This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Python narrative_stack System
Relevant source files
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb
- src/caches.rs
Purpose and Scope
The narrative_stack system is the Python machine learning component of the dual-language financial data processing architecture. This document provides an overview of the Python system's architecture, its integration with the Rust sec-fetcher component, and the high-level data flow from CSV ingestion through model training.
For detailed information on specific subsystems:
- Data ingestion and preprocessing pipeline: See Data Ingestion & Preprocessing
- Machine learning model architecture and training: See Machine Learning Training Pipeline
- Database and storage integration: See Database & Storage Integration
The Rust data fetching layer is documented in Rust sec-fetcher Application.
System Architecture Overview
The narrative_stack system processes US GAAP financial data through a multi-stage pipeline that transforms raw CSV files into learned latent representations. The architecture consists of three primary layers: storage/ingestion, preprocessing, and training.
graph TB
subgraph "Storage Layer"
DbUsGaap["DbUsGaap\nMySQL Database Interface"]
DataStoreWsClient["DataStoreWsClient\nWebSocket Client"]
UsGaapStore["UsGaapStore\nUnified Data Facade"]
end
subgraph "Data Sources"
RustCSV["CSV Files\nfrom sec-fetcher\ntruncated_csvs/"]
MySQL["MySQL Database\nus_gaap_test"]
SimdRDrive["simd-r-drive\nWebSocket Server"]
end
subgraph "Preprocessing Components"
IngestCSV["ingest_us_gaap_csvs\nCSV Walker & Parser"]
GenPCA["generate_pca_embeddings\nPCA Compression"]
RobustScaler["RobustScaler\nPer-Pair Normalization"]
ConceptEmbed["Semantic Embeddings\nConcept/Unit Pairs"]
end
subgraph "Training Components"
IterableDS["IterableConceptValueDataset\nStreaming Data Loader"]
Stage1AE["Stage1Autoencoder\nPyTorch Lightning Module"]
PLTrainer["pl.Trainer\nTraining Orchestration"]
Callbacks["EarlyStopping\nModelCheckpoint\nTensorBoard Logger"]
end
RustCSV --> IngestCSV
MySQL --> DbUsGaap
SimdRDrive --> DataStoreWsClient
DbUsGaap --> UsGaapStore
DataStoreWsClient --> UsGaapStore
IngestCSV --> UsGaapStore
UsGaapStore --> GenPCA
UsGaapStore --> RobustScaler
UsGaapStore --> ConceptEmbed
GenPCA --> UsGaapStore
RobustScaler --> UsGaapStore
ConceptEmbed --> UsGaapStore
UsGaapStore --> IterableDS
IterableDS --> Stage1AE
Stage1AE --> PLTrainer
PLTrainer --> Callbacks
Callbacks -.->|Checkpoints| Stage1AE
Component Architecture
Sources:
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-40
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:175-176
Core Component Responsibilities
| Component | Type | Responsibilities |
|---|---|---|
| DbUsGaap | Database Interface | MySQL connection management, query execution |
| DataStoreWsClient | WebSocket Client | Real-time communication with simd-r-drive server |
| UsGaapStore | Data Facade | Unified API for ingestion, lookup, embedding management |
| IterableConceptValueDataset | PyTorch Dataset | Streaming data loader with internal batching |
| Stage1Autoencoder | Lightning Module | Encoder-decoder architecture for concept embeddings |
| pl.Trainer | Training Framework | Orchestration, logging, checkpointing |
Sources:
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-40
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:175-176
Data Flow Pipeline
The system processes financial data through a sequential pipeline from raw CSV files to trained model checkpoints.
graph LR
subgraph "Input Stage"
CSV1["Rust CSV Output\nproject_paths.rust_data/\ntruncated_csvs/"]
end
subgraph "Ingestion Stage"
Walk["walk_us_gaap_csvs\nDirectory Walker"]
Parse["UsGaapRowRecord\nParser"]
Store1["Store to\nDbUsGaap & DataStore"]
end
subgraph "Preprocessing Stage"
Extract["Extract\nconcept/unit pairs"]
GenEmbed["generate_concept_unit_embeddings\nSemantic Embeddings"]
Scale["RobustScaler\nPer-Pair Normalization"]
PCA["PCA Compression\nvariance_threshold=0.95"]
Triplet["Triplet Storage\nconcept+unit+scaled_value\n+scaler+embedding"]
end
subgraph "Training Stage"
Stream["IterableConceptValueDataset\ninternal_batch_size=64"]
DataLoader["DataLoader\nbatch_size from hparams\ncollate_with_scaler"]
Encode["Stage1Autoencoder.encode\nInput → Latent"]
Decode["Stage1Autoencoder.decode\nLatent → Reconstruction"]
Loss["MSE Loss\nReconstruction Error"]
end
subgraph "Output Stage"
Ckpt["Model Checkpoints\nstage1_resume-vN.ckpt"]
TB["TensorBoard Logs\nval_loss_epoch monitoring"]
end
CSV1 --> Walk
Walk --> Parse
Parse --> Store1
Store1 --> Extract
Extract --> GenEmbed
GenEmbed --> Scale
Scale --> PCA
PCA --> Triplet
Triplet --> Stream
Stream --> DataLoader
DataLoader --> Encode
Encode --> Decode
Decode --> Loss
Loss --> Ckpt
Loss --> TB
End-to-End Processing Flow
Sources:
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:21-68
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:502-556
Processing Stages
1. CSV Ingestion
The system ingests CSV files produced by the Rust sec-fetcher using us_gaap_store.ingest_us_gaap_csvs(). This function walks the directory structure, parses each CSV file, and stores the data in both MySQL and the WebSocket data store.
Sources:
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:21-23
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67-68
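A minimal sketch of this step, assuming the configuration objects and store classes described in this document are importable, that us_gaap_store is a UsGaapStore instance (see Configuration and Initialization below), and that the path layout follows the diagrams above; exact module paths and signatures may differ from the source:

```python
# Hypothetical wiring of the ingestion call described above; the path attributes
# and the ingest signature are assumptions, not verified against the source.
from pathlib import Path

csv_dir = Path(project_paths.rust_data) / "truncated_csvs"  # CSVs written by the Rust sec-fetcher
db = DbUsGaap(db_config)                                     # MySQL-backed raw record storage

# Walks the directory tree, parses each CSV, and persists rows to MySQL and the data store
us_gaap_store.ingest_us_gaap_csvs(csv_dir, db)
```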
2. Concept/Unit Pair Extraction
The preprocessing pipeline extracts unique (concept, unit) pairs from the ingested data. Each pair represents a specific financial metric with its measurement unit (e.g., ("Revenues", "USD")).
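As an illustration only, the extraction amounts to collecting the distinct combinations; the row attribute names below are assumptions for this sketch:

```python
# Illustrative sketch: collect distinct (concept, unit) combinations from ingested rows.
pairs = sorted({(row.concept, row.unit) for row in ingested_rows})
# e.g. [("Assets", "USD"), ("Revenues", "USD"), ...]
```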
3. Semantic Embedding Generation
For each concept/unit pair, the system generates semantic embeddings that capture the meaning and relationship of financial concepts. These embeddings are then compressed using PCA with a variance threshold of 0.95.
Sources:
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67-68
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:263-269
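A hedged sketch of the idea behind this step, using sentence-transformers and scikit-learn as stand-ins; the actual implementation is encapsulated in generate_concept_unit_embeddings and generate_pca_embeddings, and the embedding model named here is an assumption:

```python
from sentence_transformers import SentenceTransformer
from sklearn.decomposition import PCA

model = SentenceTransformer("all-MiniLM-L6-v2")       # assumed embedding model
texts = [f"{concept} ({unit})" for concept, unit in pairs]
embeddings = model.encode(texts)                       # shape: (n_pairs, embedding_dim)

# Retain enough principal components to explain 95% of the variance
pca = PCA(n_components=0.95)
compressed = pca.fit_transform(embeddings)
```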
4. Value Normalization
The RobustScaler normalizes financial values separately for each concept/unit pair, making the data robust to outliers while preserving relative magnitudes within each group.
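Conceptually this is equivalent to fitting one RobustScaler per group, along these lines (a sketch assuming a pandas DataFrame df with concept, unit, and value columns):

```python
import pandas as pd
from sklearn.preprocessing import RobustScaler

scalers = {}
for (concept, unit), group in df.groupby(["concept", "unit"]):
    scaler = RobustScaler()
    values = group["value"].to_numpy().reshape(-1, 1)
    df.loc[group.index, "scaled_value"] = scaler.fit_transform(values).ravel()
    scalers[(concept, unit)] = scaler  # kept so values can be inverse-transformed later
```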
5. Model Training
The Stage1Autoencoder learns to reconstruct the concatenated embedding+value input through a bottleneck architecture, forcing the model to learn compressed representations in the latent space.
Sources:
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:479-486
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:541-556
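A simplified sketch of this bottleneck design; the layer sizes, activations, and learning-rate handling are placeholders, not the actual Stage1Autoencoder architecture:

```python
import torch
import torch.nn as nn
import pytorch_lightning as pl

class Stage1AutoencoderSketch(pl.LightningModule):
    """Bottleneck reconstruction with MSE loss, as described above (sizes are placeholders)."""

    def __init__(self, input_dim: int, latent_dim: int, lr: float = 1e-3):
        super().__init__()
        self.save_hyperparameters()
        self.encoder = nn.Sequential(nn.Linear(input_dim, 128), nn.ReLU(), nn.Linear(128, latent_dim))
        self.decoder = nn.Sequential(nn.Linear(latent_dim, 128), nn.ReLU(), nn.Linear(128, input_dim))

    def encode(self, x):  # Input -> Latent
        return self.encoder(x)

    def decode(self, z):  # Latent -> Reconstruction
        return self.decoder(z)

    def training_step(self, batch, batch_idx):
        x = batch[0] if isinstance(batch, (tuple, list)) else batch
        loss = nn.functional.mse_loss(self.decode(self.encode(x)), x)
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
```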
Storage Architecture Integration
The Python system integrates with multiple storage backends to support different access patterns and data requirements.
graph TB
subgraph "Python Application"
App["narrative_stack\nNotebooks & Scripts"]
end
subgraph "Storage Facade"
UsGaapStore["UsGaapStore\nUnified Interface"]
end
subgraph "Backend Interfaces"
DbInterface["DbUsGaap\ndb_config"]
WsInterface["DataStoreWsClient\nsimd_r_drive_server_config"]
end
subgraph "Storage Systems"
MySQL["MySQL Server\nus_gaap_test database"]
WsServer["simd-r-drive\nWebSocket Server\nKey-Value Store"]
FileCache["File System\nCache Storage"]
end
subgraph "Data Types"
Raw["Raw Records\nUsGaapRowRecord"]
Triplets["Triplets\nconcept+unit+scaled_value\n+scaler+embedding"]
Matrix["Embedding Matrix\nnumpy arrays"]
PCAModel["PCA Models\nsklearn objects"]
end
App --> UsGaapStore
UsGaapStore --> DbInterface
UsGaapStore --> WsInterface
DbInterface --> MySQL
WsInterface --> WsServer
WsServer --> FileCache
MySQL -.->|stores| Raw
WsServer -.->|stores| Triplets
WsServer -.->|stores| Matrix
WsServer -.->|stores| PCAModel
Storage Backend Architecture
Storage System Characteristics
| Storage Backend | Use Case | Data Types | Access Pattern |
|---|---|---|---|
| MySQL (DbUsGaap) | Raw record storage | UsGaapRowRecord entries | Batch queries during ingestion |
| WebSocket (DataStoreWsClient) | Preprocessed data | Triplets, embeddings, scalers, PCA models | Real-time streaming during training |
| File System | Cache persistence | HTTP cache, preprocessor cache | Transparent via simd-r-drive |
Configuration and Initialization
The system uses centralized configuration for database connections and WebSocket server endpoints.
sequenceDiagram
participant NB as "Jupyter Notebook"
participant Config as "config module"
participant DB as "DbUsGaap"
participant WS as "DataStoreWsClient"
participant Store as "UsGaapStore"
NB->>Config: Import db_config
NB->>Config: Import simd_r_drive_server_config
NB->>DB: DbUsGaap(db_config)
NB->>WS: DataStoreWsClient(host, port)
NB->>Store: UsGaapStore(data_store)
Store->>WS: Delegate operations
Store->>DB: Query raw records
Initialization Sequence
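A sketch of that initialization in code; the import path and the host/port attribute names on simd_r_drive_server_config are assumptions based on the sequence above:

```python
# Assumed import path and attribute names; adjust to the actual config module layout.
from config import db_config, simd_r_drive_server_config

db = DbUsGaap(db_config)                     # MySQL interface
data_store = DataStoreWsClient(
    simd_r_drive_server_config.host,
    simd_r_drive_server_config.port,
)
us_gaap_store = UsGaapStore(data_store)      # facade delegating to the backends above
```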
Configuration Objects
| Configuration | Purpose | Key Parameters |
|---|---|---|
| db_config | MySQL connection | Host, port, database name, credentials |
| simd_r_drive_server_config | WebSocket server | Host, port |
| project_paths | File system paths | rust_data, python_data |
Training Infrastructure
The training system uses PyTorch Lightning for experiment management and reproducible training workflows.
graph TB
subgraph "Model Configuration"
Model["Stage1Autoencoder\nhparams"]
LoadCkpt["load_from_checkpoint\nckpt_path"]
LR["Learning Rate\nlr, min_lr"]
end
subgraph "Data Configuration"
DS["IterableConceptValueDataset\ninternal_batch_size=64\nreturn_scaler=True\nshuffle=True/False"]
DL["DataLoader\nbatch_size from hparams\ncollate_fn=collate_with_scaler\nnum_workers=2\nprefetch_factor=4"]
end
subgraph "Callback Configuration"
ES["EarlyStopping\nmonitor=val_loss_epoch\npatience=20\nmode=min"]
MC["ModelCheckpoint\nmonitor=val_loss_epoch\nsave_top_k=1\nfilename=stage1_resume"]
TB["TensorBoardLogger\nOUTPUT_PATH\nname=stage1_autoencoder"]
end
subgraph "Trainer Configuration"
Trainer["pl.Trainer\nmax_epochs=1000\naccelerator=auto\ndevices=1\ngradient_clip_val"]
end
Model --> Trainer
LoadCkpt -.->|optional resume| Model
LR --> Model
DS --> DL
DL --> Trainer
ES --> Trainer
MC --> Trainer
TB --> Trainer
Training Configuration
Key Training Parameters
| Parameter | Value | Purpose |
|---|---|---|
| EPOCHS | 1000 | Maximum training epochs |
| PATIENCE | 20 | Early stopping patience |
| internal_batch_size | 64 | Dataset internal batching |
| num_workers | 2 | DataLoader worker processes |
| prefetch_factor | 4 | Batches to prefetch per worker |
| gradient_clip_val | From hparams | Gradient clipping threshold |
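A sketch of how these parameters map onto PyTorch Lightning objects; OUTPUT_PATH, the model, and the dataloaders are assumed to be defined as in the training notebook:

```python
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

early_stop = EarlyStopping(monitor="val_loss_epoch", patience=20, mode="min")
checkpoint = ModelCheckpoint(
    dirpath=OUTPUT_PATH,
    filename="stage1_resume",
    monitor="val_loss_epoch",
    mode="min",
    save_top_k=1,
)
logger = TensorBoardLogger(OUTPUT_PATH, name="stage1_autoencoder")

trainer = pl.Trainer(
    max_epochs=1000,
    accelerator="auto",
    devices=1,
    gradient_clip_val=model.hparams.gradient_clip_val,  # clipping threshold taken from hparams
    callbacks=[early_stop, checkpoint],
    logger=logger,
)
trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)
```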
Data Access Patterns
The UsGaapStore provides multiple access methods for different use cases.
Access Methods
| Method | Purpose | Return Type | Usage Context |
|---|---|---|---|
| ingest_us_gaap_csvs(csv_dir, db) | Bulk CSV ingestion | None | Initial data loading |
| generate_pca_embeddings() | PCA compression | None | Preprocessing |
| lookup_by_index(idx) | Single triplet retrieval | Dict with concept, unit, scaled_value, scaler, embedding | Validation, debugging |
| batch_lookup_by_indices(indices) | Multiple triplet retrieval | List of dicts | Batch validation |
| get_triplet_count() | Count stored triplets | Integer | Monitoring |
| get_pair_count() | Count unique pairs | Integer | Statistics |
| get_embedding_matrix() | Full embedding matrix | (numpy array, pairs list) | Visualization, analysis |
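A short usage sketch of the lookup methods listed above, with return structures as described in the table:

```python
total = us_gaap_store.get_triplet_count()
print(f"{total} triplets across {us_gaap_store.get_pair_count()} concept/unit pairs")

triplet = us_gaap_store.lookup_by_index(0)
print(triplet["concept"], triplet["unit"], triplet["scaled_value"])

embedding_matrix, pairs = us_gaap_store.get_embedding_matrix()  # numpy array plus pairs list
```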
Validation and Monitoring
The system includes validation mechanisms to ensure data integrity throughout the pipeline.
Scaler Validation Example
The preprocessing notebook includes validation code to verify that stored scalers correctly transform values:
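The notebook cell itself is not reproduced here; a hedged reconstruction of the check is a round-trip through the stored scaler for a sample of triplets:

```python
import numpy as np

for idx in np.random.choice(us_gaap_store.get_triplet_count(), size=100, replace=False):
    entry = us_gaap_store.lookup_by_index(int(idx))
    # Invert the stored scaling, re-apply it, and confirm the scaled value is reproduced
    original = entry["scaler"].inverse_transform([[entry["scaled_value"]]])
    roundtrip = entry["scaler"].transform(original)[0][0]
    assert np.isclose(roundtrip, entry["scaled_value"])
```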
Visualization Tools
The system provides visualization utilities for monitoring preprocessing quality:
| Function | Purpose | Output |
|---|---|---|
| plot_pca_explanation() | Show PCA variance explained | Matplotlib plot with dimensionality recommendation |
| plot_semantic_embeddings() | Visualize embedding space | 2D scatterplot of embeddings |
Sources:
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:259-269
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:337-342
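A usage sketch; only the function names are documented here, so the argument lists shown are assumptions:

```python
# Hypothetical invocation of the plotting helpers named above.
embedding_matrix, pairs = us_gaap_store.get_embedding_matrix()
plot_pca_explanation()        # variance-explained curve with a suggested dimensionality
plot_semantic_embeddings()    # 2D scatterplot of the concept/unit embedding space
```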
Integration with Rust sec-fetcher
The Python system consumes data produced by the Rust sec-fetcher application through a file-based interface.
graph LR
subgraph "Rust sec-fetcher"
Fetch["Network Fetching\nfetch_us_gaap_fundamentals"]
Transform["Concept Transformation\ndistill_us_gaap_fundamental_concepts"]
CSVWrite["CSV Output\nus-gaap/*.csv"]
end
subgraph "File System"
CSVStore["CSV Files\nproject_paths.rust_data/\ntruncated_csvs/"]
end
subgraph "Python narrative_stack"
CSVRead["CSV Ingestion\ningest_us_gaap_csvs"]
Process["Preprocessing Pipeline"]
Train["Model Training"]
end
Fetch --> Transform
Transform --> CSVWrite
CSVWrite --> CSVStore
CSVStore --> CSVRead
CSVRead --> Process
Process --> Train
Rust-Python Data Bridge
Data Format Expectations
The Python system expects CSV files produced by the Rust sec-fetcher to contain US GAAP fundamental data with the following structure:
- Each CSV file represents data for a single ticker symbol
- Files are organized in subdirectories (e.g., truncated_csvs/)
- Each row contains concept, unit, value, and metadata fields
- Concepts are standardized through the Rust distill_us_gaap_fundamental_concepts transformer
The preprocessing pipeline parses these CSVs into UsGaapRowRecord objects for further processing.
Sources:
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:21-23
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67-68
Checkpoint Management
The training system supports checkpoint-based workflows for resuming training and fine-tuning.
Checkpoint Loading Patterns
The notebook demonstrates multiple checkpoint loading strategies:
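Two such strategies, sketched here with the names used in this document; the checkpoint path and hyperparameter override are placeholders:

```python
ckpt_path = OUTPUT_PATH / "stage1_resume.ckpt"

# Strategy 1: restore weights only, optionally overriding hyperparameters for fine-tuning
model = Stage1Autoencoder.load_from_checkpoint(ckpt_path, lr=1e-4)

# Strategy 2: let the Trainer restore model, optimizer, and epoch state before continuing
trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader, ckpt_path=ckpt_path)
```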
Checkpoint Configuration
| Parameter | Purpose | Typical Value |
|---|---|---|
| dirpath | Checkpoint save directory | OUTPUT_PATH |
| filename | Checkpoint filename pattern | "stage1_resume" |
| monitor | Metric to monitor | "val_loss_epoch" |
| mode | Optimization direction | "min" |
| save_top_k | Number of checkpoints to keep | 1 |