Data Ingestion & Preprocessing

Purpose and Scope

This document describes the data ingestion and preprocessing pipeline in the Python narrative_stack system. This pipeline transforms raw US GAAP financial data from CSV files (generated by the Rust sec-fetcher application) into normalized, embedded triplets suitable for machine learning training. The pipeline handles CSV parsing, concept/unit pair extraction, semantic embedding generation, robust statistical normalization, and PCA-based dimensionality reduction.

For information about the machine learning training pipeline that consumes this preprocessed data, see Machine Learning Training Pipeline. For details about the Rust data fetching and CSV generation, see Rust sec-fetcher Application.

Overview

The preprocessing pipeline orchestrates multiple transformation stages to prepare raw financial data for machine learning. The system reads CSV files containing US GAAP fundamental concepts (Assets, Revenues, etc.) with associated values and units of measure, then generates semantic embeddings for each concept/unit pair, normalizes values using robust statistical techniques, and compresses embeddings via PCA.

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:1-383

graph TB
    subgraph "Input"
        CSVFiles["CSV Files\nrust_data/us-gaap/*.csv\nSymbol-specific filings"]
    end

    subgraph "Ingestion Layer"
        WalkCSV["walk_us_gaap_csvs()\nIterator over CSV files"]
        ParseRow["UsGaapRowRecord\nParsed entries per symbol"]
    end

    subgraph "Extraction & Embedding"
        ExtractPairs["Extract Concept/Unit Pairs\n(concept, unit) tuples"]
        GenEmbeddings["generate_concept_unit_embeddings()\nSemantic embeddings per pair"]
    end

    subgraph "Normalization"
        GroupValues["Group by (concept, unit)\nCollect all values per pair"]
        FitScaler["RobustScaler.fit()\nPer-pair scaling parameters"]
        TransformValues["Transform values\nScaled to robust statistics"]
    end

    subgraph "Dimensionality Reduction"
        PCAFit["PCA.fit()\nvariance_threshold=0.95"]
        PCATransform["PCA.transform()\nCompress embeddings"]
    end

    subgraph "Storage"
        BuildTriplets["Build Triplets\n(concept, unit, scaled_value,\nscaler, embedding)"]
        StoreDS["DataStoreWsClient\nWrite to simd-r-drive"]
        UsGaapStore["UsGaapStore\nUnified access facade"]
    end

    CSVFiles --> WalkCSV
    WalkCSV --> ParseRow
    ParseRow --> ExtractPairs
    ExtractPairs --> GenEmbeddings
    ParseRow --> GroupValues
    GroupValues --> FitScaler
    FitScaler --> TransformValues
    GenEmbeddings --> PCAFit
    PCAFit --> PCATransform
    TransformValues --> BuildTriplets
    PCATransform --> BuildTriplets
    BuildTriplets --> StoreDS
    StoreDS --> UsGaapStore

Architecture Components

The preprocessing system is built around three primary components that handle data access, transformation, and storage:

graph TB
    subgraph "Database Layer"
        DbUsGaap["DbUsGaap\nMySQL interface\nus_gaap_test database"]
    end

    subgraph "Storage Layer"
        DataStoreWsClient["DataStoreWsClient\nWebSocket client\nsimd_r_drive_server_config"]
    end

    subgraph "Facade Layer"
        UsGaapStore["UsGaapStore\nUnified data access\nOrchestrates ingestion & retrieval"]
    end

    subgraph "Core Operations"
        Ingest["ingest_us_gaap_csvs()\nCSV → Database → WebSocket"]
        GenPCA["generate_pca_embeddings()\nPCA compression pipeline"]
        Lookup["lookup_by_index()\nRetrieve triplet + metadata"]
    end

    DbUsGaap --> UsGaapStore
    DataStoreWsClient --> UsGaapStore
    UsGaapStore --> Ingest
    UsGaapStore --> GenPCA
    UsGaapStore --> Lookup

| Component | Type | Purpose |
| --- | --- | --- |
| DbUsGaap | Database Interface | Provides async MySQL access to stored US GAAP data |
| DataStoreWsClient | WebSocket Client | Connects to simd-r-drive server for key-value storage |
| UsGaapStore | Facade | Coordinates ingestion, embedding generation, and data retrieval |

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-40

CSV Data Ingestion

The ingestion process begins by reading CSV files produced by the Rust sec-fetcher application. These files are organized by ticker symbol and contain US GAAP fundamental concepts extracted from SEC filings.

Directory Structure

CSV files are read from a configurable directory path that points to the Rust application's output:

graph LR
    ProjectPaths["project_paths.rust_data"] --> CSVDir["csv_data_dir\nPath to CSV directory"]
    CSVDir --> Files["Symbol CSV Files\nAAPL.csv, GOOGL.csv, etc."]
    Files --> Walker["walk_us_gaap_csvs()\nGenerator function"]

The ingestion system walks through the directory structure, processing one CSV file per symbol. Each file contains multiple filing entries with concept/value/unit triplets.
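The walking step can be sketched with standard library tools. The generator below is only an illustration, assuming one CSV file per ticker symbol; the real walk_us_gaap_csvs() signature and yield type live in the narrative_stack package.

```python
from pathlib import Path
import csv

def walk_us_gaap_csvs_sketch(csv_data_dir: Path):
    """Illustrative stand-in for walk_us_gaap_csvs(): yield (symbol, rows) per CSV file."""
    for csv_path in sorted(csv_data_dir.glob("*.csv")):
        with csv_path.open(newline="") as f:
            rows = list(csv.DictReader(f))
        yield csv_path.stem, rows  # e.g. ("AAPL", [...parsed rows...])

# Usage, assuming the directory layout shown above:
# for symbol, rows in walk_us_gaap_csvs_sketch(Path("rust_data/us-gaap")):
#     ...
```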

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:20-23

Ingestion Invocation

The primary ingestion method is us_gaap_store.ingest_us_gaap_csvs(), which accepts a CSV directory path and a database connection.
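A minimal sketch of the invocation, with constructor arguments and the exact argument order assumed rather than confirmed:

```python
# Hedged sketch only: constructor arguments and argument order are assumptions,
# not the project's confirmed API.
from pathlib import Path

csv_data_dir = Path("rust_data/us-gaap")   # CSV output of the Rust sec-fetcher (path assumed)

db_us_gaap = DbUsGaap(...)                 # async MySQL interface for the us_gaap_test database
data_store = DataStoreWsClient(...)        # WebSocket client (simd_r_drive_server_config)
us_gaap_store = UsGaapStore(data_store)    # facade over the storage and database layers

# Walk CSVs, extract (concept, unit) pairs, fit scalers, and persist triplets.
us_gaap_store.ingest_us_gaap_csvs(csv_data_dir, db_us_gaap)
```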

This method orchestrates:

  1. CSV file walking and parsing
  2. Concept/unit pair extraction
  3. Value aggregation per pair
  4. Scaler fitting and transformation
  5. Storage in both MySQL and simd-r-drive

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67

Concept/Unit Pair Extraction

Financial data in CSV files contains concept/value/unit triplets. The preprocessing pipeline extracts unique (concept, unit) pairs and groups all values associated with each pair.

Data Structure

Each row in the CSV contains:

  • Concept: A FundamentalConcept variant (e.g., "Assets", "Revenues", "NetIncomeLoss")
  • Unit of Measure (UOM): The measurement unit (e.g., "USD", "shares", "USD_per_share")
  • Value: The numeric value reported in the filing
  • Additional Metadata: Period type, balance type, filing date, etc.
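Purely for illustration, the row shape described above could be modeled as a dataclass; the field names below are assumptions, not the actual UsGaapRowRecord definition.

```python
from dataclasses import dataclass

@dataclass
class UsGaapRowSketch:
    """Hypothetical stand-in for UsGaapRowRecord; field names are assumptions."""
    concept: str        # e.g. "Assets", "Revenues", "NetIncomeLoss"
    unit: str           # unit of measure, e.g. "USD", "shares"
    value: float        # numeric value reported in the filing
    period_type: str    # additional metadata (period type, balance type, filing date, ...)
```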

Grouping Strategy

Values are grouped by (concept, unit) combinations because:

  • Different concepts have different value ranges (e.g., Assets vs. EPS)
  • The same concept in different units requires separate scaling (e.g., Revenue in USD vs. Revenue in thousands)
  • This grouping enables per-pair statistical normalization

Each pair becomes associated with:

  • A semantic embedding vector
  • A fitted RobustScaler instance
  • A collection of normalized values
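A minimal sketch of the grouping step, assuming parsed rows expose concept, unit, and value attributes:

```python
from collections import defaultdict

# Collect every reported value under its (concept, unit) key so that each
# pair can be scaled independently later.
values_by_pair: dict[tuple[str, str], list[float]] = defaultdict(list)

for row in rows:  # rows: parsed CSV entries for one or more symbols
    values_by_pair[(row.concept, row.unit)].append(row.value)

# e.g. values_by_pair[("Revenues", "USD")] -> [3.2e9, 2.9e9, ...]
```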

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:190-249

Semantic Embedding Generation

The system generates semantic embeddings for each unique (concept, unit) pair using a pre-trained language model. These embeddings capture the semantic meaning of the financial concept and its measurement unit.

graph LR
    Pairs["Concept/Unit Pairs\n(Revenues, USD)\n(Assets, USD)\n(NetIncomeLoss, USD)"] --> Model["Embedding Model\nPre-trained transformer"]
    Model --> Vectors["Embedding Vectors\nFixed dimensionality\nSemantic representation"]
    Vectors --> EmbedMap["embedding_map\nDict[(concept, unit) → embedding]"]

Embedding Process

The generate_concept_unit_embeddings() function creates a fixed-dimensional vector representation for each pair.
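The sketch below illustrates the idea with a sentence-transformers style encoder; the model choice and text formatting are assumptions, not the project's confirmed implementation.

```python
from sentence_transformers import SentenceTransformer

def generate_embeddings_sketch(pairs):
    """Embed each (concept, unit) pair as a fixed-size semantic vector."""
    model = SentenceTransformer("all-MiniLM-L6-v2")   # 384-dim; model assumed for illustration
    texts = [f"{concept} measured in {unit}" for concept, unit in pairs]
    vectors = model.encode(texts)                     # deterministic for a fixed model
    return dict(zip(pairs, vectors))                  # embedding_map: (concept, unit) -> ndarray

# embedding_map = generate_embeddings_sketch([("Revenues", "USD"), ("Assets", "USD")])
```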

Embedding Properties

| Property | Description |
| --- | --- |
| Dimensionality | Fixed size (typically 384 or 768 dimensions) |
| Semantic Preservation | Similar concepts produce similar embeddings |
| Deterministic | Same input always produces same output |
| Device-Agnostic | Can be computed on CPU or GPU |

The embeddings are stored in a mapping structure that associates each (concept, unit) pair with its corresponding vector. This mapping is used throughout the preprocessing and training pipelines.

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:279-292

Value Normalization with RobustScaler

Financial values exhibit extreme variance and outliers. The preprocessing pipeline uses RobustScaler from scikit-learn to normalize values within each (concept, unit) group.

RobustScaler Characteristics

RobustScaler is chosen over standard normalization techniques because:

  • Uses median and interquartile range (IQR) instead of mean and standard deviation
  • Resistant to outliers that are common in financial data
  • Scales each feature independently
  • Preserves zero values when appropriate

Scaling Formula

For each value in a (concept, unit) group:

scaled_value = (raw_value - median) / IQR

Where:

  • median is the 50th percentile of all values in the group
  • IQR is the interquartile range (75th percentile - 25th percentile)
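A minimal, self-contained example of per-pair scaling with scikit-learn, including the inverse transform used later for interpretation:

```python
import numpy as np
from sklearn.preprocessing import RobustScaler

# Values for one (concept, unit) group, e.g. ("Revenues", "USD"); note the outlier.
values = np.array([1.2e9, 1.5e9, 1.1e9, 9.8e8, 4.0e10]).reshape(-1, 1)

scaler = RobustScaler()                 # centers on the median, scales by the IQR
scaled = scaler.fit_transform(values)

# Round-trip back to raw values, e.g. when interpreting model outputs.
restored = scaler.inverse_transform(scaled)
assert np.allclose(restored, values)
```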

Scaler Persistence

Each fitted RobustScaler instance is stored alongside the scaled values. This enables:

  • Inverse transformation for interpreting model outputs
  • Validation of scaling correctness
  • Consistent processing of new data points

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:108-166

PCA Dimensionality Reduction

The preprocessing pipeline applies Principal Component Analysis (PCA) to compress semantic embeddings while preserving most of the variance. This reduces memory footprint and training time.

PCA Configuration

| Parameter | Value | Purpose |
| --- | --- | --- |
| variance_threshold | 0.95 | Retain 95% of original variance |
| fit_data | All concept/unit embeddings | Learn principal components |
| transform_data | Same embeddings | Apply compression |
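In scikit-learn terms, the variance threshold can be expressed by passing it as n_components; a minimal sketch (the project's own wrapper may differ):

```python
import numpy as np
from sklearn.decomposition import PCA

# embedding_matrix: one row per (concept, unit) pair, e.g. shape (n_pairs, 384)
embedding_matrix = np.random.rand(1000, 384)   # placeholder data for illustration

pca = PCA(n_components=0.95)                   # keep enough components for 95% variance
compressed = pca.fit_transform(embedding_matrix)

print(pca.n_components_, compressed.shape)     # number of retained components, compressed shape
```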

Variance Explanation Visualization

The system provides the plot_pca_explanation() visualization tool to analyze PCA effectiveness.

This function generates plots showing:

  • Cumulative explained variance vs. number of components
  • Individual component variance contribution
  • The optimal number of components to retain 95% variance

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:67-68 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:259-270

Triplet Storage Structure

The final output of preprocessing is a collection of triplets stored in the simd-r-drive key-value store via DataStoreWsClient. Each triplet contains all information needed for training and inference.

Triplet Schema

graph TB
    subgraph "Triplet Components"
        Concept["concept: str\nFundamentalConcept variant"]
        Unit["unit: str\nUnit of measure"]
        ScaledVal["scaled_value: float\nRobustScaler output"]
        UnscaledVal["unscaled_value: float\nOriginal raw value"]
        Scaler["scaler: RobustScaler\nFitted scaler instance"]
        Embedding["embedding: ndarray\nPCA-compressed vector"]
    end

    subgraph "Storage Format"
        Index["Index: int\nSequential triplet ID"]
        Serialized["Serialized Data\nPickled Python object"]
    end

    Concept --> Serialized
    Unit --> Serialized
    ScaledVal --> Serialized
    UnscaledVal --> Serialized
    Scaler --> Serialized
    Embedding --> Serialized
    Index --> Serialized

    Serialized --> WebSocket["DataStoreWsClient\nWebSocket write"]

Storage Interface

The UsGaapStore facade provides methods for retrieving stored triplets:

| Method | Purpose |
| --- | --- |
| lookup_by_index(idx: int) | Retrieve single triplet by sequential index |
| batch_lookup_by_indices(indices: List[int]) | Retrieve multiple triplets efficiently |
| get_triplet_count() | Get total number of stored triplets |
| get_pair_count() | Get number of unique (concept, unit) pairs |
| get_embedding_matrix() | Retrieve full embedding matrix and pair list |

Retrieval Example
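A hedged sketch of retrieval using the methods listed above; the shape of the returned objects is an assumption:

```python
# Sketch only: the exact structure of the returned triplet is an assumption.
total = us_gaap_store.get_triplet_count()
print(f"{total} triplets across {us_gaap_store.get_pair_count()} (concept, unit) pairs")

triplet = us_gaap_store.lookup_by_index(0)                  # single triplet by sequential index
batch = us_gaap_store.batch_lookup_by_indices([0, 1, 2])    # several triplets at once

# Full embedding matrix plus the pair list (return order assumed).
embeddings, pairs = us_gaap_store.get_embedding_matrix()
```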

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:78-97

Data Validation

The preprocessing pipeline includes validation mechanisms to ensure data integrity throughout the transformation process.

Scaler Verification

A validation check confirms that stored scaled values match the transformation applied by the stored scaler:

graph LR
    Lookup["Lookup triplet"] --> Extract["Extract unscaled_value\nand scaler"]
    Extract --> Transform["scaler.transform()"]
    Transform --> Compare["np.isclose()\ncheck vs scaled_value"]
    Compare --> Pass["Validation Pass"]
    Compare --> Fail["Validation Fail"]

This test:

  1. Retrieves a random triplet
  2. Re-applies the stored scaler to the unscaled value
  3. Verifies the result matches the stored scaled value
  4. Uses np.isclose() for floating-point tolerance
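A minimal sketch of that check, assuming the retrieved triplet exposes unscaled_value, scaled_value, and the fitted scaler as attributes:

```python
import random
import numpy as np

# Pick a random triplet; attribute names below are assumptions.
idx = random.randrange(us_gaap_store.get_triplet_count())
triplet = us_gaap_store.lookup_by_index(idx)

# Re-apply the stored scaler to the raw value and compare with the stored result.
recomputed = triplet.scaler.transform([[triplet.unscaled_value]])[0][0]
assert np.isclose(recomputed, triplet.scaled_value), "stored scaling is inconsistent"
```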

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:91-96

Visualization Tools

The preprocessing module provides visualization utilities for analyzing embedding quality and PCA effectiveness.

PCA Explanation Plot

The plot_pca_explanation() function generates cumulative variance plots.
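An equivalent cumulative-variance plot can be produced directly from a fitted scikit-learn PCA object; this sketch approximates what the function displays:

```python
import numpy as np
import matplotlib.pyplot as plt

# pca: a fitted sklearn.decomposition.PCA instance (e.g. from the PCA section above)
cumulative = np.cumsum(pca.explained_variance_ratio_)

plt.plot(range(1, len(cumulative) + 1), cumulative, marker=".")
plt.axhline(0.95, linestyle="--", label="95% variance threshold")
plt.xlabel("Number of components")
plt.ylabel("Cumulative explained variance")
plt.legend()
plt.show()
```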

This visualization shows:

  • How many principal components are needed to reach the variance threshold
  • The diminishing returns of adding more components
  • The optimal dimensionality for the compressed embeddings

Semantic Embedding Scatterplot

The plot_semantic_embeddings() function creates 2D projections of the embeddings.
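A rough equivalent using a 2D PCA projection (t-SNE or UMAP would serve the same purpose); the variable names are assumptions:

```python
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# embedding_matrix: (n_pairs, dim) array; pairs: matching list of (concept, unit) tuples
points = PCA(n_components=2).fit_transform(embedding_matrix)

plt.scatter(points[:, 0], points[:, 1], s=8)
for (concept, unit), (x, y) in zip(pairs[:20], points[:20]):   # label a few points
    plt.annotate(f"{concept}/{unit}", (x, y), fontsize=6)
plt.title("Concept/unit embeddings (2D projection)")
plt.show()
```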

This visualization helps assess:

  • Clustering patterns in concept/unit pairs
  • Separation between different financial concepts
  • Embedding space structure

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:259-270 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:337-343

Integration with Training Pipeline

The preprocessed data serves as input to the machine learning training pipeline. The stored triplets are loaded via streaming datasets during training.

graph LR
    subgraph "Preprocessing Output"
        Triplets["Stored Triplets\nsimd-r-drive storage"]
    end

    subgraph "Training Input"
        Dataset["IterableConceptValueDataset\nStreaming data loader"]
        Collate["collate_with_scaler()\nBatch construction"]
        DataLoader["PyTorch DataLoader\nTraining batches"]
    end

    Triplets --> Dataset
    Dataset --> Collate
    Collate --> DataLoader

| Component | Purpose |
| --- | --- |
| Triplets in simd-r-drive | Persistent storage of preprocessed data |
| IterableConceptValueDataset | Streams triplets without loading all into memory |
| collate_with_scaler | Custom collation function for batch processing |
| PyTorch DataLoader | Manages batching, shuffling, and parallel loading |
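A hedged sketch of how the stored triplets might feed a PyTorch DataLoader; the dataset constructor below is an assumption:

```python
from torch.utils.data import DataLoader

# Constructor arguments are assumptions; see the training pipeline documentation.
dataset = IterableConceptValueDataset(us_gaap_store)   # streams triplets from simd-r-drive

loader = DataLoader(
    dataset,
    batch_size=256,
    collate_fn=collate_with_scaler,   # keeps each sample's fitted scaler with the batch
)

for batch in loader:
    ...  # embeddings and scaled values ready for training
```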

For detailed information about the training pipeline, see Machine Learning Training Pipeline.

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:502-522

Performance Considerations

The preprocessing pipeline is designed to handle large-scale financial datasets efficiently.

Memory Management

| Strategy | Implementation |
| --- | --- |
| Streaming CSV reading | Process files one at a time, not all in memory |
| Incremental scaler fitting | Use online algorithms for large groups |
| Compressed storage | PCA reduces embedding footprint by ~50-70% |
| WebSocket batching | DataStoreWsClient batches writes for efficiency |

Caching Strategy

The Rust layer provides HTTP and preprocessor caches that the preprocessing pipeline can leverage:

graph TB
    subgraph "Rust Caching Layer"
        HTTPCache["SIMD_R_DRIVE_HTTP_CACHE\nhttp_storage_cache.bin\n1 week TTL"]
        PreCache["SIMD_R_DRIVE_PREPROCESSOR_CACHE\npreprocessor_cache.bin\nPersistent storage"]
    end

    subgraph "Python Preprocessing"
        CSVRead["Read CSV files"]
        Process["Transform & normalize"]
    end

    HTTPCache -.->|Cached SEC data| CSVRead
    PreCache -.->|Cached computations| Process

Sources: src/caches.rs:1-66