This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Data Ingestion & Preprocessing
Relevant source files
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb
- src/caches.rs
Purpose and Scope
This document describes the data ingestion and preprocessing pipeline in the Python narrative_stack system. This pipeline transforms raw US GAAP financial data from CSV files (generated by the Rust sec-fetcher application) into normalized, embedded triplets suitable for machine learning training. The pipeline handles CSV parsing, concept/unit pair extraction, semantic embedding generation, robust statistical normalization, and PCA-based dimensionality reduction.
For information about the machine learning training pipeline that consumes this preprocessed data, see Machine Learning Training Pipeline. For details about the Rust data fetching and CSV generation, see Rust sec-fetcher Application.
Overview
The preprocessing pipeline orchestrates multiple transformation stages to prepare raw financial data for machine learning. The system reads CSV files containing US GAAP fundamental concepts (Assets, Revenues, etc.) with associated values and units of measure, then generates semantic embeddings for each concept/unit pair, normalizes values using robust statistical techniques, and compresses embeddings via PCA.
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:1-383
graph TB
subgraph "Input"
CSVFiles["CSV Files\nrust_data/us-gaap/*.csv\nSymbol-specific filings"]
end
subgraph "Ingestion Layer"
WalkCSV["walk_us_gaap_csvs()\nIterator over CSV files"]
ParseRow["UsGaapRowRecord\nParsed entries per symbol"]
end
subgraph "Extraction & Embedding"
ExtractPairs["Extract Concept/Unit Pairs\n(concept, unit) tuples"]
GenEmbeddings["generate_concept_unit_embeddings()\nSemantic embeddings per pair"]
end
subgraph "Normalization"
GroupValues["Group by (concept, unit)\nCollect all values per pair"]
FitScaler["RobustScaler.fit()\nPer-pair scaling parameters"]
TransformValues["Transform values\nScaled to robust statistics"]
end
subgraph "Dimensionality Reduction"
PCAFit["PCA.fit()\nvariance_threshold=0.95"]
PCATransform["PCA.transform()\nCompress embeddings"]
end
subgraph "Storage"
BuildTriplets["Build Triplets\n(concept, unit, scaled_value,\nscaler, embedding)"]
StoreDS["DataStoreWsClient\nWrite to simd-r-drive"]
UsGaapStore["UsGaapStore\nUnified access facade"]
end
CSVFiles --> WalkCSV
WalkCSV --> ParseRow
ParseRow --> ExtractPairs
ExtractPairs --> GenEmbeddings
ParseRow --> GroupValues
GroupValues --> FitScaler
FitScaler --> TransformValues
GenEmbeddings --> PCAFit
PCAFit --> PCATransform
TransformValues --> BuildTriplets
PCATransform --> BuildTriplets
BuildTriplets --> StoreDS
StoreDS --> UsGaapStore
Architecture Components
The preprocessing system is built around three primary components that handle data access, transformation, and storage:
graph TB
subgraph "Database Layer"
DbUsGaap["DbUsGaap\nMySQL interface\nus_gaap_test database"]
end
subgraph "Storage Layer"
DataStoreWsClient["DataStoreWsClient\nWebSocket client\nsimd_r_drive_server_config"]
end
subgraph "Facade Layer"
UsGaapStore["UsGaapStore\nUnified data access\nOrchestrates ingestion & retrieval"]
end
subgraph "Core Operations"
Ingest["ingest_us_gaap_csvs()\nCSV → Database → WebSocket"]
GenPCA["generate_pca_embeddings()\nPCA compression pipeline"]
Lookup["lookup_by_index()\nRetrieve triplet + metadata"]
end
DbUsGaap --> UsGaapStore
DataStoreWsClient --> UsGaapStore
UsGaapStore --> Ingest
UsGaapStore --> GenPCA
UsGaapStore --> Lookup
| Component | Type | Purpose |
|---|---|---|
| DbUsGaap | Database Interface | Provides async MySQL access to stored US GAAP data |
| DataStoreWsClient | WebSocket Client | Connects to the simd-r-drive server for key-value storage |
| UsGaapStore | Facade | Coordinates ingestion, embedding generation, and data retrieval |
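The snippet below sketches how these components might be wired together. It is a minimal sketch under assumptions: the constructor arguments are placeholders, and the classes are imported from the narrative_stack package (module paths omitted); the real values come from the notebook configuration, e.g. simd_r_drive_server_config.

```python
# Illustrative wiring only: constructor arguments are placeholders, and the
# DbUsGaap / DataStoreWsClient / UsGaapStore classes come from the
# narrative_stack package (module paths omitted here).
db_us_gaap = DbUsGaap("mysql://localhost/us_gaap_test")   # async MySQL interface (placeholder DSN)
data_store = DataStoreWsClient("127.0.0.1", 34129)        # simd-r-drive WebSocket endpoint (placeholders)
us_gaap_store = UsGaapStore(data_store)                   # facade coordinating ingestion & retrieval
```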
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-40
CSV Data Ingestion
The ingestion process begins by reading CSV files produced by the Rust sec-fetcher application. These files are organized by ticker symbol and contain US GAAP fundamental concepts extracted from SEC filings.
Directory Structure
CSV files are read from a configurable directory path that points to the Rust application's output:
graph LR
ProjectPaths["project_paths.rust_data"] --> CSVDir["csv_data_dir\nPath to CSV directory"]
CSVDir --> Files["Symbol CSV Files\nAAPL.csv, GOOGL.csv, etc."]
Files --> Walker["walk_us_gaap_csvs()\nGenerator function"]
The ingestion system walks through the directory structure, processing one CSV file per symbol. Each file contains multiple filing entries with concept/value/unit triplets.
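A hedged iteration sketch follows; walk_us_gaap_csvs() and UsGaapRowRecord are the names used by the pipeline, but the exact shape of what the generator yields, and the project_paths import, are assumptions here.

```python
# Sketch only: the yield shape of walk_us_gaap_csvs() (per-symbol batches of
# UsGaapRowRecord entries) is an assumption; project_paths and walk_us_gaap_csvs
# come from the narrative_stack package (import paths omitted).
csv_data_dir = project_paths.rust_data / "us-gaap"    # CSV output of the Rust sec-fetcher

for symbol, rows in walk_us_gaap_csvs(csv_data_dir):  # one CSV file per ticker symbol
    for row in rows:                                  # each row is a parsed UsGaapRowRecord
        print(symbol, row.concept, row.unit, row.value)
```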
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:20-23
Ingestion Invocation
The primary ingestion method is us_gaap_store.ingest_us_gaap_csvs(), which accepts a CSV directory path and database connection:
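A minimal invocation sketch, assuming the directory path and the DbUsGaap connection from the setup above are passed positionally (the exact signature lives in the notebook):

```python
# Method name is from the notebook; passing the arguments positionally is an
# assumption based on the description above.
us_gaap_store.ingest_us_gaap_csvs(csv_data_dir, db_us_gaap)
```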
This method orchestrates:
- CSV file walking and parsing
- Concept/unit pair extraction
- Value aggregation per pair
- Scaler fitting and transformation
- Storage in both MySQL and simd-r-drive
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67
Concept/Unit Pair Extraction
Financial data in CSV files contains concept/value/unit triplets. The preprocessing pipeline extracts unique (concept, unit) pairs and groups all values associated with each pair.
Data Structure
Each row in the CSV contains:
- Concept : A FundamentalConcept variant (e.g., "Assets", "Revenues", "NetIncomeLoss")
- Unit of Measure (UOM) : The measurement unit (e.g., "USD", "shares", "USD_per_share")
- Value : The numeric value reported in the filing
- Additional Metadata : Period type, balance type, filing date, etc.
Grouping Strategy
Values are grouped by (concept, unit) combinations because:
- Different concepts have different value ranges (e.g., Assets vs. EPS)
- The same concept in different units requires separate scaling (e.g., Revenue in USD vs. Revenue in thousands)
- This grouping enables per-pair statistical normalization
Each pair becomes associated with:
- A semantic embedding vector
- A fitted RobustScaler instance
- A collection of normalized values
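The sketch below illustrates the grouping step described above, under the assumption that parsed rows expose concept, unit, and value attributes (the variable names are illustrative).

```python
# Group every reported value under its (concept, unit) key so each pair can be
# scaled independently. Attribute names on the parsed rows are assumptions.
from collections import defaultdict

values_by_pair: dict[tuple[str, str], list[float]] = defaultdict(list)

for row in parsed_rows:  # UsGaapRowRecord entries gathered from the CSV walk
    values_by_pair[(row.concept, row.unit)].append(row.value)

# e.g. values_by_pair[("Revenues", "USD")] now holds every Revenues-in-USD value
```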
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:190-249
Semantic Embedding Generation
The system generates semantic embeddings for each unique (concept, unit) pair using a pre-trained language model. These embeddings capture the semantic meaning of the financial concept and its measurement unit.
graph LR
Pairs["Concept/Unit Pairs\n(Revenues, USD)\n(Assets, USD)\n(NetIncomeLoss, USD)"] --> Model["Embedding Model\nPre-trained transformer"]
Model --> Vectors["Embedding Vectors\nFixed dimensionality\nSemantic representation"]
Vectors --> EmbedMap["embedding_map\nDict[(concept, unit) → embedding]"]
Embedding Process
The generate_concept_unit_embeddings() function creates fixed-dimensional vector representations:
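The block below is an illustrative stand-in for this step rather than the function's actual implementation; it assumes a sentence-transformers model (all-MiniLM-L6-v2, 384 dimensions) and a simple text form for each pair, and reuses values_by_pair from the grouping sketch above.

```python
# Illustrative stand-in for generate_concept_unit_embeddings(); the model choice
# and the text form of each pair are assumptions consistent with 384-dim output.
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")        # 384-dim, deterministic per input

pairs = sorted(values_by_pair.keys())                  # unique (concept, unit) pairs
texts = [f"{concept} {unit}" for concept, unit in pairs]

vectors = model.encode(texts)                          # shape: (num_pairs, 384)
embedding_map = dict(zip(pairs, vectors))              # (concept, unit) -> embedding
```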
Embedding Properties
| Property | Description |
|---|---|
| Dimensionality | Fixed size (typically 384 or 768 dimensions) |
| Semantic Preservation | Similar concepts produce similar embeddings |
| Deterministic | Same input always produces same output |
| Device-Agnostic | Can be computed on CPU or GPU |
The embeddings are stored in a mapping structure that associates each (concept, unit) pair with its corresponding vector. This mapping is used throughout the preprocessing and training pipelines.
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:279-292
Value Normalization with RobustScaler
Financial values exhibit extreme variance and outliers. The preprocessing pipeline uses RobustScaler from scikit-learn to normalize values within each (concept, unit) group.
RobustScaler Characteristics
RobustScaler is chosen over standard normalization techniques because:
- Uses median and interquartile range (IQR) instead of mean and standard deviation
- Resistant to outliers that are common in financial data
- Scales each feature independently
- Preserves zero values when appropriate
Scaling Formula
For each value in a (concept, unit) group:
scaled_value = (raw_value - median) / IQR
Where:
- median is the 50th percentile of all values in the group
- IQR is the interquartile range (75th percentile - 25th percentile)
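A minimal sketch with scikit-learn's RobustScaler, whose default quantile range (25th to 75th percentile) implements exactly this formula; values_by_pair reuses the grouping sketch above.

```python
# Per-pair scaling with scikit-learn's RobustScaler, which centers on the median
# and scales by the IQR -- the formula shown above.
import numpy as np
from sklearn.preprocessing import RobustScaler

values = np.array(values_by_pair[("Revenues", "USD")], dtype=float).reshape(-1, 1)

scaler = RobustScaler(quantile_range=(25.0, 75.0))  # median / IQR scaling (sklearn default)
scaled = scaler.fit_transform(values)

# The fitted scaler is persisted with the data, so model outputs can be mapped back:
restored = scaler.inverse_transform(scaled)
assert np.allclose(restored, values)
```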
Scaler Persistence
Each fitted RobustScaler instance is stored alongside the scaled values. This enables:
- Inverse transformation for interpreting model outputs
- Validation of scaling correctness
- Consistent processing of new data points
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:108-166
PCA Dimensionality Reduction
The preprocessing pipeline applies Principal Component Analysis (PCA) to compress semantic embeddings while preserving most of the variance. This reduces memory footprint and training time.
PCA Configuration
| Parameter | Value | Purpose |
|---|---|---|
| variance_threshold | 0.95 | Retain 95% of original variance |
| fit_data | All concept/unit embeddings | Learn principal components |
| transform_data | Same embeddings | Apply compression |
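A minimal sketch of this configuration with scikit-learn, where passing a float in (0, 1) as n_components keeps just enough components to explain that fraction of the variance; embedding_map comes from the embedding sketch above.

```python
# Fit PCA so that 95% of the embedding variance is retained, then compress.
import numpy as np
from sklearn.decomposition import PCA

embedding_matrix = np.stack(list(embedding_map.values()))   # (num_pairs, embedding_dim)

pca = PCA(n_components=0.95)                                # variance_threshold = 0.95
compressed = pca.fit_transform(embedding_matrix)            # (num_pairs, reduced_dim)

print(pca.n_components_, pca.explained_variance_ratio_.sum())
```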
Variance Explanation Visualization
The system provides visualization tools to analyze PCA effectiveness. The plot_pca_explanation() function, covered under Visualization Tools below, generates plots showing:
- Cumulative explained variance vs. number of components
- Individual component variance contribution
- The optimal number of components to retain 95% variance
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67-68 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:259-270
graph TB
subgraph "Triplet Components"
Concept["concept: str\nFundamentalConcept variant"]
Unit["unit: str\nUnit of measure"]
ScaledVal["scaled_value: float\nRobustScaler output"]
UnscaledVal["unscaled_value: float\nOriginal raw value"]
Scaler["scaler: RobustScaler\nFitted scaler instance"]
Embedding["embedding: ndarray\nPCA-compressed vector"]
end
subgraph "Storage Format"
Index["Index: int\nSequential triplet ID"]
Serialized["Serialized Data\nPickled Python object"]
end
Concept --> Serialized
Unit --> Serialized
ScaledVal --> Serialized
UnscaledVal --> Serialized
Scaler --> Serialized
Embedding --> Serialized
Index --> Serialized
Serialized --> WebSocket["DataStoreWsClient\nWebSocket write"]
Triplet Storage Structure
The final output of preprocessing is a collection of triplets stored in the simd-r-drive key-value store via DataStoreWsClient. Each triplet contains all information needed for training and inference.
Triplet Schema
The diagram above enumerates the fields serialized for each triplet: the concept and unit strings, the scaled and unscaled values, the fitted RobustScaler instance, the PCA-compressed embedding, and a sequential index used as the storage key.
Storage Interface
The UsGaapStore facade provides methods for retrieving stored triplets:
| Method | Purpose |
|---|---|
| lookup_by_index(idx: int) | Retrieve a single triplet by sequential index |
| batch_lookup_by_indices(indices: List[int]) | Retrieve multiple triplets efficiently |
| get_triplet_count() | Get the total number of stored triplets |
| get_pair_count() | Get the number of unique (concept, unit) pairs |
| get_embedding_matrix() | Retrieve the full embedding matrix and pair list |
Retrieval Example
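A hedged usage sketch of the methods in the table above; the field names on the returned triplet are assumptions based on the schema described in this section.

```python
# Method names come from the table above; triplet field names are assumptions
# based on the triplet schema (concept, unit, scaled/unscaled value, scaler, embedding).
total = us_gaap_store.get_triplet_count()
pairs = us_gaap_store.get_pair_count()

triplet = us_gaap_store.lookup_by_index(0)
print(triplet.concept, triplet.unit, triplet.scaled_value, triplet.unscaled_value)
print(triplet.embedding.shape)                                  # PCA-compressed vector

batch = us_gaap_store.batch_lookup_by_indices(list(range(16)))  # efficient multi-lookup
```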
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:78-97
Data Validation
The preprocessing pipeline includes validation mechanisms to ensure data integrity throughout the transformation process.
Scaler Verification
A validation check confirms that stored scaled values match the transformation applied by the stored scaler:
graph LR
Lookup["Lookup triplet"] --> Extract["Extract unscaled_value\nand scaler"]
Extract --> Transform["scaler.transform()"]
Transform --> Compare["np.isclose()\ncheck vs scaled_value"]
Compare --> Pass["Validation Pass"]
Compare --> Fail["Validation Fail"]
This test:
- Retrieves a random triplet
- Re-applies the stored scaler to the unscaled value
- Verifies the result matches the stored scaled value
- Uses np.isclose() for floating-point tolerance
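A sketch of that check, assuming the triplet fields named in the schema above:

```python
# Re-apply the stored scaler to the stored unscaled value and compare against
# the stored scaled value. Triplet field names are assumptions from the schema above.
import random
import numpy as np

idx = random.randrange(us_gaap_store.get_triplet_count())
triplet = us_gaap_store.lookup_by_index(idx)

recomputed = triplet.scaler.transform([[triplet.unscaled_value]])[0][0]
assert np.isclose(recomputed, triplet.scaled_value)
```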
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:91-96
Visualization Tools
The preprocessing module provides visualization utilities for analyzing embedding quality and PCA effectiveness.
PCA Explanation Plot
The plot_pca_explanation() function generates cumulative variance plots:
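The snippet below reproduces the kind of plot this function renders (an illustrative equivalent, not the function itself), using the fitted pca object from the dimensionality-reduction sketch above.

```python
# Cumulative explained variance vs. number of components -- the same information
# plot_pca_explanation() visualizes (illustrative equivalent, not the actual function).
import numpy as np
import matplotlib.pyplot as plt

cumulative = np.cumsum(pca.explained_variance_ratio_)

plt.plot(range(1, len(cumulative) + 1), cumulative)
plt.axhline(0.95, linestyle="--")        # the 95% variance threshold
plt.xlabel("Number of principal components")
plt.ylabel("Cumulative explained variance")
plt.show()
```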
This visualization shows:
- How many principal components are needed to reach the variance threshold
- The diminishing returns of adding more components
- The optimal dimensionality for the compressed embeddings
Semantic Embedding Scatterplot
The plot_semantic_embeddings() function creates 2D projections of embeddings:
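The sketch below shows one way such a 2D projection can be produced (an illustrative equivalent, not the function's actual implementation), reusing embedding_matrix and embedding_map from the earlier sketches.

```python
# Project the concept/unit embeddings to 2D and scatter them, labeling each pair.
# Illustrative equivalent of plot_semantic_embeddings(), not its actual code.
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

coords = PCA(n_components=2).fit_transform(embedding_matrix)   # (num_pairs, 2)

plt.scatter(coords[:, 0], coords[:, 1], s=10)
for (concept, unit), (x, y) in zip(embedding_map.keys(), coords):
    plt.annotate(f"{concept}/{unit}", (x, y), fontsize=6)
plt.title("Concept/unit embedding projection")
plt.show()
```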
This visualization helps assess:
- Clustering patterns in concept/unit pairs
- Separation between different financial concepts
- Embedding space structure
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:259-270 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:337-343
graph LR
subgraph "Preprocessing Output"
Triplets["Stored Triplets\nsimd-r-drive storage"]
end
subgraph "Training Input"
Dataset["IterableConceptValueDataset\nStreaming data loader"]
Collate["collate_with_scaler()\nBatch construction"]
DataLoader["PyTorch DataLoader\nTraining batches"]
end
Triplets --> Dataset
Dataset --> Collate
Collate --> DataLoader
Integration with Training Pipeline
The preprocessed data serves as input to the machine learning training pipeline. The stored triplets are loaded via streaming datasets during training.
| Component | Purpose |
|---|---|
| Triplets in simd-r-drive | Persistent storage of preprocessed data |
| IterableConceptValueDataset | Streams triplets without loading all into memory |
| collate_with_scaler | Custom collation function for batch processing |
| PyTorch DataLoader | Manages batching, shuffling, and parallel loading |
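A hedged sketch of this hand-off; the component names come from the table above, but the constructor arguments and batch size are assumptions for illustration.

```python
# Component names come from the table above; IterableConceptValueDataset and
# collate_with_scaler are imported from the narrative_stack training code
# (module paths omitted); constructor arguments and batch size are assumptions.
from torch.utils.data import DataLoader

dataset = IterableConceptValueDataset(us_gaap_store)  # streams triplets from simd-r-drive
loader = DataLoader(
    dataset,
    batch_size=256,                   # illustrative batch size
    collate_fn=collate_with_scaler,   # keeps each sample's fitted scaler with its batch
    num_workers=4,
)

for batch in loader:
    ...  # consumed by the training loop
```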
For detailed information about the training pipeline, see Machine Learning Training Pipeline.
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:502-522
Performance Considerations
The preprocessing pipeline is designed to handle large-scale financial datasets efficiently.
Memory Management
| Strategy | Implementation |
|---|---|
| Streaming CSV reading | Process files one at a time, not all in memory |
| Incremental scaler fitting | Use online algorithms for large groups |
| Compressed storage | PCA reduces embedding footprint by ~50-70% |
| WebSocket batching | DataStoreWsClient batches writes for efficiency |
graph TB
subgraph "Rust Caching Layer"
HTTPCache["SIMD_R_DRIVE_HTTP_CACHE\nhttp_storage_cache.bin\n1 week TTL"]
PreCache["SIMD_R_DRIVE_PREPROCESSOR_CACHE\npreprocessor_cache.bin\nPersistent storage"]
end
subgraph "Python Preprocessing"
CSVRead["Read CSV files"]
Process["Transform & normalize"]
end
HTTPCache -.->|Cached SEC data| CSVRead
PreCache -.->|Cached computations| Process
Caching Strategy
The Rust layer provides HTTP and preprocessor caches that the preprocessing pipeline can leverage, as shown in the diagram above.
Sources: src/caches.rs:1-66