This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Python narrative_stack System


Purpose and Scope

The narrative_stack system is the Python machine learning component of the dual-language financial data processing architecture. This system is responsible for transforming raw financial data fetched by the Rust sec-fetcher into learned latent representations using deep learning techniques.

For detailed information on specific subsystems:

  • Data Ingestion & Preprocessing: See Data Ingestion & Preprocessing for details on CSV parsing, semantic embedding generation, and PCA dimensionality reduction.
  • Machine Learning Training Pipeline: See Machine Learning Training Pipeline for documentation on the Stage1Autoencoder model, PyTorch Lightning setup, and dataset streaming.
  • Database & Storage Integration: See Database & Storage Integration for details on the DbUsGaap interface and DataStoreWsClient WebSocket integration.
  • US GAAP Distribution Analyzer: See US GAAP Distribution Analyzer for information on the Rust-based clustering tool used to analyze concept distributions.

The Rust data fetching layer is documented in Rust sec-fetcher Application.

System Architecture Overview

The narrative_stack system processes US GAAP financial data through a multi-stage pipeline that transforms raw CSV files into learned latent representations. The architecture consists of three primary layers: storage/ingestion, preprocessing, and training.

```mermaid
graph TB
    subgraph "Storage Layer"
        DbUsGaap["DbUsGaap\nMySQL Database Interface"]
        DataStoreWsClient["DataStoreWsClient\nWebSocket Client"]
        UsGaapStore["UsGaapStore\nUnified Data Facade"]
    end

    subgraph "Data Sources"
        RustCSV["CSV Files\nfrom sec-fetcher\ntruncated_csvs/"]
        MySQL["MySQL Database\nus_gaap_test"]
        SimdRDrive["simd-r-drive\nWebSocket Server"]
    end

    subgraph "Preprocessing Components"
        IngestCSV["ingest_us_gaap_csvs\nCSV Walker & Parser"]
        GenPCA["generate_pca_embeddings\nPCA Compression"]
        RobustScaler["RobustScaler\nPer-Pair Normalization"]
        ConceptEmbed["Semantic Embeddings\nConcept/Unit Pairs"]
    end

    subgraph "Training Components"
        IterableDS["IterableConceptValueDataset\nStreaming Data Loader"]
        Stage1AE["Stage1Autoencoder\nPyTorch Lightning Module"]
        PLTrainer["pl.Trainer\nTraining Orchestration"]
        Callbacks["EarlyStopping\nModelCheckpoint\nTensorBoard Logger"]
    end

    RustCSV --> IngestCSV
    MySQL --> DbUsGaap
    SimdRDrive --> DataStoreWsClient

    DbUsGaap --> UsGaapStore
    DataStoreWsClient --> UsGaapStore

    IngestCSV --> UsGaapStore
    UsGaapStore --> GenPCA
    UsGaapStore --> RobustScaler
    UsGaapStore --> ConceptEmbed

    GenPCA --> UsGaapStore
    RobustScaler --> UsGaapStore
    ConceptEmbed --> UsGaapStore

    UsGaapStore --> IterableDS
    IterableDS --> Stage1AE
    Stage1AE --> PLTrainer
    PLTrainer --> Callbacks

    Callbacks -.->|Checkpoints| Stage1AE
```

Component Architecture

Core Component Responsibilities

| Component | Type | Responsibilities |
|---|---|---|
| DbUsGaap | Database Interface | MySQL connection management, query execution. |
| DataStoreWsClient | WebSocket Client | Real-time communication with simd-r-drive server. |
| UsGaapStore | Data Facade | Unified API for ingestion, lookup, and embedding management. |
| IterableConceptValueDataset | PyTorch Dataset | Streaming data loader with internal batching to handle large datasets. |
| Stage1Autoencoder | Lightning Module | Encoder-decoder architecture for learning financial concept embeddings. |
| pl.Trainer | Training Framework | Orchestration of the training loop, logging, and checkpointing. |
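The facade role of UsGaapStore can be sketched in a few lines: a single object that routes raw-record reads to the database backend and triplet lookups to the key-value backend. This is a minimal illustration of the pattern using stub backends; the method names (`raw_records`, `put_triplet`, `get_triplet`) and stub classes are assumptions for illustration, not the project's real API.

```python
# Hypothetical sketch of the facade pattern UsGaapStore embodies.
# The stub classes stand in for the real MySQL and WebSocket backends;
# their method names are illustrative assumptions.
class DbUsGaapStub:
    """Stands in for the MySQL-backed DbUsGaap interface."""
    def __init__(self, rows):
        self._rows = rows

    def fetch_rows(self, concept):
        return [r for r in self._rows if r["concept"] == concept]


class DataStoreWsClientStub:
    """Stands in for the simd-r-drive WebSocket key-value client."""
    def __init__(self):
        self._kv = {}

    def put(self, key, value):
        self._kv[key] = value

    def get(self, key):
        return self._kv.get(key)


class UsGaapStore:
    """Unified data facade: one API over both storage backends."""
    def __init__(self, db, ws):
        self._db = db
        self._ws = ws

    def raw_records(self, concept):
        # Raw UsGaapRowRecord-style rows live in MySQL.
        return self._db.fetch_rows(concept)

    def put_triplet(self, concept, unit, triplet):
        # Triplets live in the key-value store, keyed by (concept, unit).
        self._ws.put((concept, unit), triplet)

    def get_triplet(self, concept, unit):
        return self._ws.get((concept, unit))
```

The payoff of this arrangement is that notebooks and scripts talk only to the facade; swapping a backend (or adding a cache) never touches calling code.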

Data Flow Pipeline

The system processes financial data through a sequential pipeline from raw CSV files to trained model checkpoints.

```mermaid
graph LR
    subgraph "Input Stage"
        CSV1["Rust CSV Output\nproject_paths.rust_data/\ntruncated_csvs/"]
    end

    subgraph "Ingestion Stage"
        Walk["walk_us_gaap_csvs\nDirectory Walker"]
        Parse["UsGaapRowRecord\nParser"]
        Store1["Store to\nDbUsGaap & DataStore"]
    end

    subgraph "Preprocessing Stage"
        Extract["Extract\nconcept/unit pairs"]
        GenEmbed["generate_concept_unit_embeddings\nSemantic Embeddings"]
        Scale["RobustScaler\nPer-Pair Normalization"]
        PCA["PCA Compression\nvariance_threshold=0.95"]
        Triplet["Triplet Storage\nconcept+unit+scaled_value\n+scaler+embedding"]
    end

    subgraph "Training Stage"
        Stream["IterableConceptValueDataset\ninternal_batch_size=64"]
        DataLoader["DataLoader\nbatch_size from hparams\ncollate_with_scaler"]
        Encode["Stage1Autoencoder.encode\nInput → Latent"]
        Decode["Stage1Autoencoder.decode\nLatent → Reconstruction"]
        Loss["MSE Loss\nReconstruction Error"]
    end

    subgraph "Output Stage"
        Ckpt["Model Checkpoints\nstage1_resume-vN.ckpt"]
        TB["TensorBoard Logs\nval_loss_epoch monitoring"]
    end

    CSV1 --> Walk
    Walk --> Parse
    Parse --> Store1
    Store1 --> Extract
    Extract --> GenEmbed
    GenEmbed --> Scale
    Scale --> PCA
    PCA --> Triplet
    Triplet --> Stream
    Stream --> DataLoader
    DataLoader --> Encode
    Encode --> Decode
    Decode --> Loss
    Loss --> Ckpt
    Loss --> TB
```

End-to-End Processing Flow

Processing Stages

  1. CSV Ingestion: The system ingests CSV files produced by the Rust sec-fetcher using us_gaap_store.ingest_us_gaap_csvs(). python/narrative_stack/notebooks/stage1_preprocessing.ipynb:21-23
  2. Concept/Unit Pair Extraction: Unique (concept, unit) pairs are extracted to define the semantic space. python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67-68
  3. Semantic Embedding Generation: Embeddings capture relationships between financial concepts and are compressed via PCA with a 0.95 variance threshold. python/narrative_stack/notebooks/stage1_preprocessing.ipynb:263-269
  4. Value Normalization: RobustScaler is applied per (concept, unit) pair to handle outliers in financial magnitudes. python/narrative_stack/notebooks/stage1_preprocessing.ipynb:88-96
  5. Model Training: The Stage1Autoencoder learns a bottleneck representation of the concatenated embedding and scaled value. python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:479-486
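The per-pair normalization in step 4 can be sketched with the standard library alone. The formula mirrors what sklearn's RobustScaler computes, (x − median) / IQR, applied independently to each (concept, unit) group so that extreme financial magnitudes in one concept do not distort another; the grouping helper is illustrative, not the project's actual code.

```python
# Minimal stdlib sketch of per-(concept, unit) robust scaling. It mirrors
# sklearn's RobustScaler: center on the median, divide by the
# interquartile range. Function names here are illustrative.
from statistics import quantiles, median
from collections import defaultdict


def robust_scale(values):
    """Return (scaled_values, (med, iqr)) for one concept/unit pair."""
    med = median(values)
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = (q3 - q1) or 1.0  # guard against zero spread
    return [(v - med) / iqr for v in values], (med, iqr)


def scale_per_pair(rows):
    """rows: iterable of (concept, unit, value); scale each pair separately."""
    groups = defaultdict(list)
    for concept, unit, value in rows:
        groups[(concept, unit)].append(value)
    return {pair: robust_scale(vals) for pair, vals in groups.items()}
```

Note how the outlier 100 in a series like [1, 2, 3, 4, 100] inflates a mean/std scaler but leaves the median and IQR untouched, which is exactly why a robust scaler suits heavy-tailed financial values.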

Storage Architecture Integration

The Python system integrates with multiple storage backends to support different access patterns and data requirements.

```mermaid
graph TB
    subgraph "Python Application"
        App["narrative_stack\nNotebooks & Scripts"]
    end

    subgraph "Storage Facade"
        UsGaapStore["UsGaapStore\nUnified Interface"]
    end

    subgraph "Backend Interfaces"
        DbInterface["DbUsGaap\ndb_config"]
        WsInterface["DataStoreWsClient\nsimd_r_drive_server_config"]
    end

    subgraph "Storage Systems"
        MySQL["MySQL Server\nus_gaap_test database"]
        WsServer["simd-r-drive\nWebSocket Server\nKey-Value Store"]
        FileCache["File System\nCache Storage"]
    end

    subgraph "Data Types"
        Raw["Raw Records\nUsGaapRowRecord"]
        Triplets["Triplets\nconcept+unit+scaled_value\n+scaler+embedding"]
        Matrix["Embedding Matrix\nnumpy arrays"]
        PCAModel["PCA Models\nsklearn objects"]
    end

    App --> UsGaapStore
    UsGaapStore --> DbInterface
    UsGaapStore --> WsInterface

    DbInterface --> MySQL
    WsInterface --> WsServer
    WsServer --> FileCache

    MySQL -.->|stores| Raw
    WsServer -.->|stores| Triplets
    WsServer -.->|stores| Matrix
    WsServer -.->|stores| PCAModel
```

Storage Backend Architecture
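The triplet records that the key-value server holds can be pictured as one flat record per (concept, unit, value) observation, bundling the scaled value with the scaler parameters and the PCA-compressed embedding needed to invert or reuse it later. The field names below are assumptions inferred from the diagrams, not the project's real serialization format.

```python
# Illustrative layout of one stored triplet record; the actual
# serialization used with simd-r-drive may differ. Field names are
# assumptions based on the "Triplets" node in the diagram above.
from dataclasses import dataclass, field, asdict


@dataclass
class TripletRecord:
    concept: str
    unit: str
    scaled_value: float
    scaler_params: dict                            # e.g. {"median": ..., "iqr": ...}
    embedding: list = field(default_factory=list)  # PCA-compressed vector


record = TripletRecord(
    concept="Revenues",
    unit="USD",
    scaled_value=0.42,
    scaler_params={"median": 3.0, "iqr": 2.0},
    embedding=[0.1, -0.2, 0.05],
)
payload = asdict(record)  # plain dict, ready for any key-value serialization
```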

Configuration and Initialization

The system uses centralized configuration for database connections and WebSocket server endpoints. The .vscode/settings.json file points to the specific Python environment for the stack.
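One way to picture the centralized configuration is as a pair of frozen dataclasses. The names db_config and simd_r_drive_server_config appear in the architecture diagrams, and us_gaap_test is the database named there; the individual fields and defaults below are assumptions for illustration, not the project's schema.

```python
# Hedged sketch of centralized configuration objects. Field names and
# defaults are assumptions; only the object names and the us_gaap_test
# database name come from the diagrams in this page.
from dataclasses import dataclass


@dataclass(frozen=True)
class DbConfig:
    host: str = "localhost"
    port: int = 3306
    database: str = "us_gaap_test"
    user: str = "root"


@dataclass(frozen=True)
class SimdRDriveServerConfig:
    host: str = "localhost"
    port: int = 8080


# Frozen instances make the configuration safe to share across
# notebooks, scripts, and backend interfaces.
db_config = DbConfig()
simd_r_drive_server_config = SimdRDriveServerConfig()
```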

Training Infrastructure

The training system uses PyTorch Lightning for experiment management.

Training Configuration

Key Training Parameters

| Parameter | Value | Purpose |
|---|---|---|
| EPOCHS | 1000 | Maximum training epochs. |
| internal_batch_size | 64 | Dataset internal batching size. |
| num_workers | 2 | DataLoader worker processes. |
| gradient_clip_val | From hparams | Gradient clipping threshold. |
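The internal batching that internal_batch_size controls can be sketched as a lazy generator: rows stream from storage and are grouped into chunks of 64 before the DataLoader sees them, so the full dataset never has to fit in memory. This is a pure-Python illustration of the pattern, not the IterableConceptValueDataset implementation.

```python
# Pure-Python sketch of internal batching in a streaming dataset:
# read rows lazily, yield them in chunks of internal_batch_size.
# Illustrative only; the real dataset wraps this in a torch
# IterableDataset with collate_with_scaler downstream.
from itertools import islice


def iter_internal_batches(row_iter, internal_batch_size=64):
    """Yield lists of up to internal_batch_size rows from a lazy iterator."""
    it = iter(row_iter)
    while True:
        batch = list(islice(it, internal_batch_size))
        if not batch:
            return
        yield batch
```

Because `islice` pulls from the iterator on demand, memory use is bounded by one chunk regardless of how many rows the backing store holds.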
