
This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Python narrative_stack System


Purpose and Scope

The narrative_stack system is the Python machine learning component of the dual-language financial data processing architecture. This document provides an overview of the Python system's architecture, its integration with the Rust sec-fetcher component, and the high-level data flow from CSV ingestion through model training.

For detailed information on specific subsystems:

  • The Rust data fetching layer is documented in Rust sec-fetcher Application.

System Architecture Overview

The narrative_stack system processes US GAAP financial data through a multi-stage pipeline that transforms raw CSV files into learned latent representations. The architecture consists of three primary layers: storage/ingestion, preprocessing, and training.

```mermaid
graph TB
    subgraph "Storage Layer"
        DbUsGaap["DbUsGaap\nMySQL Database Interface"]
        DataStoreWsClient["DataStoreWsClient\nWebSocket Client"]
        UsGaapStore["UsGaapStore\nUnified Data Facade"]
    end

    subgraph "Data Sources"
        RustCSV["CSV Files\nfrom sec-fetcher\ntruncated_csvs/"]
        MySQL["MySQL Database\nus_gaap_test"]
        SimdRDrive["simd-r-drive\nWebSocket Server"]
    end

    subgraph "Preprocessing Components"
        IngestCSV["ingest_us_gaap_csvs\nCSV Walker & Parser"]
        GenPCA["generate_pca_embeddings\nPCA Compression"]
        RobustScaler["RobustScaler\nPer-Pair Normalization"]
        ConceptEmbed["Semantic Embeddings\nConcept/Unit Pairs"]
    end

    subgraph "Training Components"
        IterableDS["IterableConceptValueDataset\nStreaming Data Loader"]
        Stage1AE["Stage1Autoencoder\nPyTorch Lightning Module"]
        PLTrainer["pl.Trainer\nTraining Orchestration"]
        Callbacks["EarlyStopping\nModelCheckpoint\nTensorBoard Logger"]
    end

    RustCSV --> IngestCSV
    MySQL --> DbUsGaap
    SimdRDrive --> DataStoreWsClient

    DbUsGaap --> UsGaapStore
    DataStoreWsClient --> UsGaapStore

    IngestCSV --> UsGaapStore
    UsGaapStore --> GenPCA
    UsGaapStore --> RobustScaler
    UsGaapStore --> ConceptEmbed

    GenPCA --> UsGaapStore
    RobustScaler --> UsGaapStore
    ConceptEmbed --> UsGaapStore

    UsGaapStore --> IterableDS
    IterableDS --> Stage1AE
    Stage1AE --> PLTrainer
    PLTrainer --> Callbacks

    Callbacks -.->|Checkpoints| Stage1AE
```

Component Architecture


Core Component Responsibilities

| Component | Type | Responsibilities |
|---|---|---|
| DbUsGaap | Database Interface | MySQL connection management, query execution |
| DataStoreWsClient | WebSocket Client | Real-time communication with simd-r-drive server |
| UsGaapStore | Data Facade | Unified API for ingestion, lookup, embedding management |
| IterableConceptValueDataset | PyTorch Dataset | Streaming data loader with internal batching |
| Stage1Autoencoder | Lightning Module | Encoder-decoder architecture for concept embeddings |
| pl.Trainer | Training Framework | Orchestration, logging, checkpointing |


Data Flow Pipeline

The system processes financial data through a sequential pipeline from raw CSV files to trained model checkpoints.

```mermaid
graph LR
    subgraph "Input Stage"
        CSV1["Rust CSV Output\nproject_paths.rust_data/\ntruncated_csvs/"]
    end

    subgraph "Ingestion Stage"
        Walk["walk_us_gaap_csvs\nDirectory Walker"]
        Parse["UsGaapRowRecord\nParser"]
        Store1["Store to\nDbUsGaap & DataStore"]
    end

    subgraph "Preprocessing Stage"
        Extract["Extract\nconcept/unit pairs"]
        GenEmbed["generate_concept_unit_embeddings\nSemantic Embeddings"]
        Scale["RobustScaler\nPer-Pair Normalization"]
        PCA["PCA Compression\nvariance_threshold=0.95"]
        Triplet["Triplet Storage\nconcept+unit+scaled_value\n+scaler+embedding"]
    end

    subgraph "Training Stage"
        Stream["IterableConceptValueDataset\ninternal_batch_size=64"]
        DataLoader["DataLoader\nbatch_size from hparams\ncollate_with_scaler"]
        Encode["Stage1Autoencoder.encode\nInput → Latent"]
        Decode["Stage1Autoencoder.decode\nLatent → Reconstruction"]
        Loss["MSE Loss\nReconstruction Error"]
    end

    subgraph "Output Stage"
        Ckpt["Model Checkpoints\nstage1_resume-vN.ckpt"]
        TB["TensorBoard Logs\nval_loss_epoch monitoring"]
    end

    CSV1 --> Walk
    Walk --> Parse
    Parse --> Store1
    Store1 --> Extract
    Extract --> GenEmbed
    GenEmbed --> Scale
    Scale --> PCA
    PCA --> Triplet
    Triplet --> Stream
    Stream --> DataLoader
    DataLoader --> Encode
    Encode --> Decode
    Decode --> Loss
    Loss --> Ckpt
    Loss --> TB
```

End-to-End Processing Flow


Processing Stages

1. CSV Ingestion

The system ingests CSV files produced by the Rust sec-fetcher using us_gaap_store.ingest_us_gaap_csvs(). This function walks the directory structure, parses each CSV file, and stores the data in both MySQL and the WebSocket data store.
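A minimal sketch of this step is shown below. The import paths are placeholders and the exact constructor and method signatures may differ in the repository; only the object and function names (DbUsGaap, DataStoreWsClient, UsGaapStore, ingest_us_gaap_csvs, project_paths.rust_data) come from this document.

```python
from pathlib import Path

# Assumed module paths for illustration; the notebooks import the same objects
# (db_config, simd_r_drive_server_config, project_paths) from their own config module.
from narrative_stack.config import db_config, simd_r_drive_server_config, project_paths
from narrative_stack.store import DbUsGaap, DataStoreWsClient, UsGaapStore

csv_dir = Path(project_paths.rust_data) / "truncated_csvs"  # Rust sec-fetcher output

db = DbUsGaap(db_config)                                    # MySQL interface
data_store = DataStoreWsClient(simd_r_drive_server_config.host,
                               simd_r_drive_server_config.port)
us_gaap_store = UsGaapStore(data_store)

# Walk the directory tree, parse each CSV, and store the rows in both MySQL
# and the simd-r-drive data store.
us_gaap_store.ingest_us_gaap_csvs(csv_dir, db)
```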


2. Concept/Unit Pair Extraction

The preprocessing pipeline extracts unique (concept, unit) pairs from the ingested data. Each pair represents a specific financial metric with its measurement unit (e.g., ("Revenues", "USD")).
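The sketch below illustrates the idea with hypothetical in-memory rows; the real pipeline reads the ingested records from storage rather than from a literal list.

```python
from collections import defaultdict

# Hypothetical ingested rows as (concept, unit, value) tuples.
rows = [
    ("Revenues", "USD", 1_250_000.0),
    ("Revenues", "USD", 980_000.0),
    ("EarningsPerShareBasic", "USD/shares", 1.42),
]

# Group raw values by their (concept, unit) pair.
values_by_pair = defaultdict(list)
for concept, unit, value in rows:
    values_by_pair[(concept, unit)].append(value)

pairs = sorted(values_by_pair)  # unique (concept, unit) pairs
print(pairs)
# [('EarningsPerShareBasic', 'USD/shares'), ('Revenues', 'USD')]
```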


3. Semantic Embedding Generation

For each concept/unit pair, the system generates semantic embeddings that capture the meaning and relationship of financial concepts. These embeddings are then compressed using PCA with a variance threshold of 0.95.
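A minimal sketch of the compression step using scikit-learn, assuming the semantic embeddings are already available as a NumPy matrix (the project's own embedding model and helper functions are not shown here; the random matrix is a stand-in):

```python
import numpy as np
from sklearn.decomposition import PCA

# Placeholder embedding matrix: one row per (concept, unit) pair.
embeddings = np.random.default_rng(0).normal(size=(500, 384))

# Passing a float to n_components keeps the smallest number of components
# whose cumulative explained variance reaches the threshold (0.95 here).
pca = PCA(n_components=0.95)
compressed = pca.fit_transform(embeddings)

print(compressed.shape)                     # (500, k) with k <= 384
print(pca.explained_variance_ratio_.sum())  # >= 0.95
```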


4. Value Normalization

The RobustScaler normalizes financial values separately for each concept/unit pair, making the data robust to outliers while preserving relative magnitudes within each group.
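A sketch of per-pair normalization with scikit-learn's RobustScaler; the variable names and sample values are illustrative, and the real pipeline keeps each fitted scaler alongside its pair so values can be inverse-transformed later:

```python
import numpy as np
from sklearn.preprocessing import RobustScaler

# Hypothetical raw values grouped by (concept, unit) pair.
values_by_pair = {
    ("Revenues", "USD"): [1_250_000.0, 980_000.0, 47_000_000.0],
    ("EarningsPerShareBasic", "USD/shares"): [1.42, 0.87, -0.15],
}

scalers = {}        # one fitted scaler per pair, kept for later inverse_transform
scaled_by_pair = {}
for pair, values in values_by_pair.items():
    column = np.asarray(values, dtype=np.float64).reshape(-1, 1)
    scaler = RobustScaler()          # centers on the median, scales by the IQR
    scaled_by_pair[pair] = scaler.fit_transform(column).ravel()
    scalers[pair] = scaler

print(scaled_by_pair[("Revenues", "USD")])
```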


5. Model Training

The Stage1Autoencoder learns to reconstruct the concatenated embedding+value input through a bottleneck architecture, forcing the model to learn compressed representations in the latent space.
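The exact layer sizes and hyperparameters live in the notebook's hparams. The following is a minimal PyTorch Lightning sketch of the encode/decode/MSE-reconstruction pattern described above, not the repository's actual Stage1Autoencoder; the dimensions are assumptions.

```python
import torch
import torch.nn as nn
import pytorch_lightning as pl


class TinyAutoencoder(pl.LightningModule):
    """Illustrative stand-in for Stage1Autoencoder (dimensions are assumptions)."""

    def __init__(self, input_dim: int = 385, latent_dim: int = 32, lr: float = 1e-3):
        super().__init__()
        self.save_hyperparameters()
        self.encoder = nn.Sequential(nn.Linear(input_dim, 128), nn.ReLU(),
                                     nn.Linear(128, latent_dim))
        self.decoder = nn.Sequential(nn.Linear(latent_dim, 128), nn.ReLU(),
                                     nn.Linear(128, input_dim))

    def encode(self, x):   # Input -> Latent
        return self.encoder(x)

    def decode(self, z):   # Latent -> Reconstruction
        return self.decoder(z)

    def training_step(self, batch, batch_idx):
        x = batch                                 # concatenated embedding + scaled value
        x_hat = self.decode(self.encode(x))
        loss = nn.functional.mse_loss(x_hat, x)   # reconstruction error
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
```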


Storage Architecture Integration

The Python system integrates with multiple storage backends to support different access patterns and data requirements.

```mermaid
graph TB
    subgraph "Python Application"
        App["narrative_stack\nNotebooks & Scripts"]
    end

    subgraph "Storage Facade"
        UsGaapStore["UsGaapStore\nUnified Interface"]
    end

    subgraph "Backend Interfaces"
        DbInterface["DbUsGaap\ndb_config"]
        WsInterface["DataStoreWsClient\nsimd_r_drive_server_config"]
    end

    subgraph "Storage Systems"
        MySQL["MySQL Server\nus_gaap_test database"]
        WsServer["simd-r-drive\nWebSocket Server\nKey-Value Store"]
        FileCache["File System\nCache Storage"]
    end

    subgraph "Data Types"
        Raw["Raw Records\nUsGaapRowRecord"]
        Triplets["Triplets\nconcept+unit+scaled_value\n+scaler+embedding"]
        Matrix["Embedding Matrix\nnumpy arrays"]
        PCAModel["PCA Models\nsklearn objects"]
    end

    App --> UsGaapStore
    UsGaapStore --> DbInterface
    UsGaapStore --> WsInterface

    DbInterface --> MySQL
    WsInterface --> WsServer
    WsServer --> FileCache

    MySQL -.->|stores| Raw
    WsServer -.->|stores| Triplets
    WsServer -.->|stores| Matrix
    WsServer -.->|stores| PCAModel
```

Storage Backend Architecture


Storage System Characteristics

| Storage Backend | Use Case | Data Types | Access Pattern |
|---|---|---|---|
| MySQL (DbUsGaap) | Raw record storage | UsGaapRowRecord entries | Batch queries during ingestion |
| WebSocket (DataStoreWsClient) | Preprocessed data | Triplets, embeddings, scalers, PCA models | Real-time streaming during training |
| File System | Cache persistence | HTTP cache, preprocessor cache | Transparent via simd-r-drive |


Configuration and Initialization

The system uses centralized configuration for database connections and WebSocket server endpoints.

```mermaid
sequenceDiagram
    participant NB as "Jupyter Notebook"
    participant Config as "config module"
    participant DB as "DbUsGaap"
    participant WS as "DataStoreWsClient"
    participant Store as "UsGaapStore"

    NB->>Config: Import db_config
    NB->>Config: Import simd_r_drive_server_config
    NB->>DB: DbUsGaap(db_config)
    NB->>WS: DataStoreWsClient(host, port)
    NB->>Store: UsGaapStore(data_store)
    Store->>WS: Delegate operations
    Store->>DB: Query raw records
```

Initialization Sequence


Configuration Objects

| Configuration | Purpose | Key Parameters |
|---|---|---|
| db_config | MySQL connection | Host, port, database name, credentials |
| simd_r_drive_server_config | WebSocket server | Host, port |
| project_paths | File system paths | rust_data, python_data |
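As a rough illustration of the shape of these objects, they could be modeled as simple dataclasses. This is a hypothetical sketch, not the project's actual config module; all field names beyond those listed in the table, and every default value, are placeholders.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass
class DbConfig:                       # mirrors db_config: host, port, database, credentials
    host: str = "localhost"
    port: int = 3306
    database: str = "us_gaap_test"
    user: str = "example-user"
    password: str = "example-password"


@dataclass
class SimdRDriveServerConfig:         # mirrors simd_r_drive_server_config: host, port
    host: str = "localhost"
    port: int = 8080


@dataclass
class ProjectPaths:                   # mirrors project_paths: rust_data, python_data
    rust_data: Path = Path("data/rust")
    python_data: Path = Path("data/python")


db_config = DbConfig()
simd_r_drive_server_config = SimdRDriveServerConfig()
project_paths = ProjectPaths()
```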


Training Infrastructure

The training system uses PyTorch Lightning for experiment management and reproducible training workflows.

```mermaid
graph TB
    subgraph "Model Configuration"
        Model["Stage1Autoencoder\nhparams"]
        LoadCkpt["load_from_checkpoint\nckpt_path"]
        LR["Learning Rate\nlr, min_lr"]
    end

    subgraph "Data Configuration"
        DS["IterableConceptValueDataset\ninternal_batch_size=64\nreturn_scaler=True\nshuffle=True/False"]
        DL["DataLoader\nbatch_size from hparams\ncollate_fn=collate_with_scaler\nnum_workers=2\nprefetch_factor=4"]
    end

    subgraph "Callback Configuration"
        ES["EarlyStopping\nmonitor=val_loss_epoch\npatience=20\nmode=min"]
        MC["ModelCheckpoint\nmonitor=val_loss_epoch\nsave_top_k=1\nfilename=stage1_resume"]
        TB["TensorBoardLogger\nOUTPUT_PATH\nname=stage1_autoencoder"]
    end

    subgraph "Trainer Configuration"
        Trainer["pl.Trainer\nmax_epochs=1000\naccelerator=auto\ndevices=1\ngradient_clip_val"]
    end

    Model --> Trainer
    LoadCkpt -.->|optional resume| Model
    LR --> Model

    DS --> DL
    DL --> Trainer

    ES --> Trainer
    MC --> Trainer
    TB --> Trainer
```

Training Configuration


Key Training Parameters

| Parameter | Value | Purpose |
|---|---|---|
| EPOCHS | 1000 | Maximum training epochs |
| PATIENCE | 20 | Early stopping patience |
| internal_batch_size | 64 | Dataset internal batching |
| num_workers | 2 | DataLoader worker processes |
| prefetch_factor | 4 | Batches to prefetch per worker |
| gradient_clip_val | From hparams | Gradient clipping threshold |
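Putting the diagram and table together, the trainer wiring looks roughly like the following sketch. The callback, logger, and Trainer APIs are standard PyTorch Lightning; the model, dataloaders, OUTPUT_PATH, and the literal gradient_clip_val value are assumptions standing in for objects defined in the notebook.

```python
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

EPOCHS = 1000
PATIENCE = 20
OUTPUT_PATH = "outputs/stage1"   # placeholder output directory

early_stopping = EarlyStopping(monitor="val_loss_epoch", patience=PATIENCE, mode="min")
checkpointing = ModelCheckpoint(
    dirpath=OUTPUT_PATH,
    filename="stage1_resume",
    monitor="val_loss_epoch",
    mode="min",
    save_top_k=1,
)
logger = TensorBoardLogger(OUTPUT_PATH, name="stage1_autoencoder")

trainer = pl.Trainer(
    max_epochs=EPOCHS,
    accelerator="auto",
    devices=1,
    gradient_clip_val=1.0,        # the notebook takes this value from hparams
    callbacks=[early_stopping, checkpointing],
    logger=logger,
)

# trainer.fit(model, train_dataloader, val_dataloader)  # model and loaders from the notebook
```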


Data Access Patterns

The UsGaapStore provides multiple access methods for different use cases.

Access Methods

| Method | Purpose | Return Type | Usage Context |
|---|---|---|---|
| ingest_us_gaap_csvs(csv_dir, db) | Bulk CSV ingestion | None | Initial data loading |
| generate_pca_embeddings() | PCA compression | None | Preprocessing |
| lookup_by_index(idx) | Single triplet retrieval | Dict with concept, unit, scaled_value, scaler, embedding | Validation, debugging |
| batch_lookup_by_indices(indices) | Multiple triplet retrieval | List of dicts | Batch validation |
| get_triplet_count() | Count stored triplets | Integer | Monitoring |
| get_pair_count() | Count unique pairs | Integer | Statistics |
| get_embedding_matrix() | Full embedding matrix | (numpy array, pairs list) | Visualization, analysis |
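A short usage sketch of the read-oriented methods in the table, assuming us_gaap_store has been initialized as in the ingestion example earlier; the dictionary keys and return shapes follow the table, but the printed output is illustrative.

```python
# us_gaap_store is an initialized UsGaapStore (see "1. CSV Ingestion" above).
total = us_gaap_store.get_triplet_count()
pairs = us_gaap_store.get_pair_count()
print(f"{total} triplets across {pairs} unique (concept, unit) pairs")

# Inspect a single stored triplet.
triplet = us_gaap_store.lookup_by_index(0)
print(triplet["concept"], triplet["unit"], triplet["scaled_value"])

# Full embedding matrix for visualization or analysis.
matrix, pair_list = us_gaap_store.get_embedding_matrix()
print(matrix.shape, len(pair_list))
```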


Validation and Monitoring

The system includes validation mechanisms to ensure data integrity throughout the pipeline.

Scaler Validation Example

The preprocessing notebook includes validation code to verify that stored scalers correctly transform values:
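The notebook's exact cells are not reproduced here; the sketch below captures the idea under the assumption that lookup_by_index returns the fields listed in the Access Methods table and that the stored scaler is a fitted scikit-learn transformer.

```python
import numpy as np

# Spot-check a few stored triplets: inverse-transforming the stored scaled value
# and re-applying the stored scaler should reproduce the same scaled value.
for idx in range(5):
    triplet = us_gaap_store.lookup_by_index(idx)   # initialized store assumed
    scaler = triplet["scaler"]
    scaled = np.asarray([[triplet["scaled_value"]]])

    original = scaler.inverse_transform(scaled)
    round_trip = scaler.transform(original)

    assert np.allclose(round_trip, scaled), f"Scaler mismatch at index {idx}"

print("Scaler round-trip validation passed")
```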


Visualization Tools

The system provides visualization utilities for monitoring preprocessing quality:

| Function | Purpose | Output |
|---|---|---|
| plot_pca_explanation() | Show PCA variance explained | Matplotlib plot with dimensionality recommendation |
| plot_semantic_embeddings() | Visualize embedding space | 2D scatterplot of embeddings |


Integration with Rust sec-fetcher

The Python system consumes data produced by the Rust sec-fetcher application through a file-based interface.

```mermaid
graph LR
    subgraph "Rust sec-fetcher"
        Fetch["Network Fetching\nfetch_us_gaap_fundamentals"]
        Transform["Concept Transformation\ndistill_us_gaap_fundamental_concepts"]
        CSVWrite["CSV Output\nus-gaap/*.csv"]
    end

    subgraph "File System"
        CSVStore["CSV Files\nproject_paths.rust_data/\ntruncated_csvs/"]
    end

    subgraph "Python narrative_stack"
        CSVRead["CSV Ingestion\ningest_us_gaap_csvs"]
        Process["Preprocessing Pipeline"]
        Train["Model Training"]
    end

    Fetch --> Transform
    Transform --> CSVWrite
    CSVWrite --> CSVStore
    CSVStore --> CSVRead
    CSVRead --> Process
    Process --> Train
```

Rust-Python Data Bridge


Data Format Expectations

The Python system expects CSV files produced by the Rust sec-fetcher to contain US GAAP fundamental data with the following structure:

  • Each CSV file represents data for a single ticker symbol
  • Files are organized in subdirectories (e.g., truncated_csvs/)
  • Each row contains concept, unit, value, and metadata fields
  • Concepts are standardized through the Rust distill_us_gaap_fundamental_concepts transformer

The preprocessing pipeline parses these CSVs into UsGaapRowRecord objects for further processing.
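A sketch of what reading these files might look like with pandas is shown below. The column names and the record class are illustrative stand-ins, since the exact CSV schema is defined by the Rust sec-fetcher and the real ingestion builds UsGaapRowRecord objects.

```python
from dataclasses import dataclass
from pathlib import Path

import pandas as pd


@dataclass
class UsGaapRow:                 # illustrative stand-in for UsGaapRowRecord
    concept: str
    unit: str
    value: float


csv_dir = Path("data/rust/truncated_csvs")        # assumed location; see project_paths.rust_data

records = []
for csv_path in sorted(csv_dir.rglob("*.csv")):   # one file per ticker symbol
    frame = pd.read_csv(csv_path)
    for row in frame.itertuples(index=False):
        # Column names here are assumptions; the real schema comes from sec-fetcher.
        records.append(UsGaapRow(row.concept, row.unit, float(row.value)))

print(f"Parsed {len(records)} rows from {csv_dir}")
```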


Checkpoint Management

The training system supports checkpoint-based workflows for resuming training and fine-tuning.

Checkpoint Loading Patterns

The notebook demonstrates multiple checkpoint loading strategies:
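The notebook's exact cells are not shown here; two standard PyTorch Lightning patterns it corresponds to are sketched below, assuming the Stage1Autoencoder class, trainer, and dataloaders defined earlier in the notebook. The checkpoint path is a placeholder.

```python
from pathlib import Path

CKPT_PATH = Path("outputs/stage1/stage1_resume.ckpt")  # placeholder checkpoint path

# Strategy 1: restore weights and hyperparameters, then evaluate or fine-tune.
model = Stage1Autoencoder.load_from_checkpoint(CKPT_PATH)

# Strategy 2: resume the full training state (optimizer, epoch, callbacks)
# by passing the checkpoint path to Trainer.fit.
trainer.fit(model, train_dataloader, val_dataloader, ckpt_path=str(CKPT_PATH))
```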


Checkpoint Configuration

| Parameter | Purpose | Typical Value |
|---|---|---|
| dirpath | Checkpoint save directory | OUTPUT_PATH |
| filename | Checkpoint filename pattern | "stage1_resume" |
| monitor | Metric to monitor | "val_loss_epoch" |
| mode | Optimization direction | "min" |
| save_top_k | Number of checkpoints to keep | 1 |
