This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Database & Storage Integration

Purpose and Scope

This document describes the database and storage infrastructure used by the Python narrative_stack system to persist and retrieve US GAAP financial data. The system employs a dual-storage architecture: MySQL for structured financial records and a WebSocket-based key-value store (simd-r-drive) for high-performance embedding and triplet data. This page covers the DbUsGaap database interface, DataStoreWsClient WebSocket integration, UsGaapStore facade pattern, triplet storage format, and embedding matrix management.

For information about CSV ingestion and preprocessing operations that populate these storage systems, see Data Ingestion & Preprocessing. For information about how the training pipeline consumes this stored data, see Machine Learning Training Pipeline.

Storage Architecture Overview

The narrative_stack system uses two complementary storage backends to optimize for different data access patterns:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-40

graph TB
    subgraph "Python narrative_stack"
        UsGaapStore["UsGaapStore\nUnified Facade"]
        Preprocessing["Preprocessing Pipeline\ningest_us_gaap_csvs\ngenerate_pca_embeddings"]
        Training["ML Training\nIterableConceptValueDataset"]
    end

    subgraph "Storage Backends"
        DbUsGaap["DbUsGaap\nMySQL Interface\nus_gaap_test database"]
        DataStoreWsClient["DataStoreWsClient\nWebSocket Client\nsimd-r-drive protocol"]
    end

    subgraph "External Services"
        MySQL[("MySQL Server\nStructured Records\nSymbol/Concept/Value")]
        SimdRDrive["simd-r-drive-ws-server\nKey-Value Store\nEmbeddings/Triplets/Scalers"]
    end

    Preprocessing -->|ingest CSV data| UsGaapStore
    UsGaapStore -->|store records| DbUsGaap
    UsGaapStore -->|store triplets/embeddings| DataStoreWsClient
    DbUsGaap -->|SQL queries| MySQL
    DataStoreWsClient -->|WebSocket protocol| SimdRDrive
    UsGaapStore -->|retrieve triplets| Training
    Training -->|batch lookups| UsGaapStore

The dual-storage design separates concerns:

  • MySQL stores structured financial data with SQL query capabilities for filtering and aggregation
  • simd-r-drive stores high-dimensional embeddings and preprocessed triplets for fast sequential access during training

MySQL Database Interface (DbUsGaap)

The DbUsGaap class provides an abstraction layer over MySQL database operations for US GAAP financial records.

Database Schema

The system expects a MySQL database named us_gaap_test with the following structure:

| Table/Field | Type | Purpose |
|---|---|---|
| Raw records table | – | Stores original CSV-ingested data |
| symbol | VARCHAR | Company ticker symbol |
| concept | VARCHAR | US GAAP concept name |
| unit | VARCHAR | Unit of measurement (USD, shares, etc.) |
| value | DECIMAL | Numeric value for the concept |
| form | VARCHAR | SEC form type (10-K, 10-Q, etc.) |
| filing_date | DATE | Date of filing |

Sources: python/narrative_stack/us_gaap_store_integration_test.sh:30-35

DbUsGaap Connection Configuration

The DbUsGaap class is instantiated with configuration from db_config:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-18
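A minimal sketch of that instantiation is shown below. Only the class name, the db_config object, and the us_gaap_test database name come from the source; the constructor signature and the field names on db_config are assumptions for illustration.

```python
# Hypothetical sketch — constructor arguments and db_config fields are assumed,
# not taken from the source. The DbUsGaap import and db_config construction
# come from the notebook and are omitted here.
db_us_gaap = DbUsGaap(
    host=db_config.host,
    port=db_config.port,
    user=db_config.user,
    password=db_config.password,
    database="us_gaap_test",  # schema described in the table above
)
```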

The database connection is established during initialization and used for querying structured financial data during the ingestion phase. The DbUsGaap interface supports filtering by symbol, concept, and date ranges for selective data loading.

WebSocket Data Store (DataStoreWsClient)

The DataStoreWsClient provides a WebSocket-based interface to the simd-r-drive key-value store for high-performance binary data storage and retrieval.

simd-r-drive Integration

The simd-r-drive system is a custom WebSocket server that provides persistent key-value storage optimized for embedding matrices and preprocessed training data:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:33-40

graph TB
    subgraph "Python Client"
        WSClient["DataStoreWsClient\nWebSocket protocol handler"]
        BatchWrite["batch_write\n[(key, value)]"]
        BatchRead["batch_read\n[keys] → [values]"]
    end

    subgraph "simd-r-drive Server"
        WSServer["simd-r-drive-ws-server\nWebSocket endpoint"]
        DataStore["DataStore\nBinary file backend"]
    end

    BatchWrite -->|WebSocket message| WSClient
    BatchRead -->|WebSocket message| WSClient
    WSClient <-->|ws://host:port| WSServer
    WSServer <-->|read/write| DataStore

Connection Initialization

The WebSocket client is instantiated with server configuration:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:38
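A hedged sketch of the connection setup follows. Only the class name and the ws://host:port transport appear on this page, so the argument names and values are placeholders rather than the real constructor signature.

```python
# Sketch only — the real constructor signature is not shown on this page.
# The ws://host:port detail comes from the protocol diagram above.
data_store = DataStoreWsClient(
    host="localhost",  # placeholder for the simd-r-drive-ws-server host
    port=8080,         # placeholder port
)
```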

Batch Operations

The DataStoreWsClient supports efficient batch read and write operations:

| Operation | Input | Output | Purpose |
|---|---|---|---|
| batch_write | [(key: bytes, value: bytes)] | None | Store multiple key-value pairs atomically |
| batch_read | [key: bytes] | [value: bytes] | Retrieve multiple values by key |

The WebSocket protocol enables low-latency access to stored embeddings and preprocessed triplets during training iterations.

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:50-55
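The table maps directly onto a usage pattern like the following sketch. The key naming scheme and the pickle serialization are assumptions, since the page only specifies that keys and values are raw bytes.

```python
import pickle

# Hypothetical round trip through the batch API described in the table above.
triplet = {"concept": "Revenues", "unit": "USD", "scaled_value": 0.42}

data_store.batch_write([
    (b"triplet:0", pickle.dumps(triplet)),          # key naming is illustrative
    (b"triplet_count", (1).to_bytes(8, "little")),
])

values = data_store.batch_read([b"triplet:0", b"triplet_count"])
restored = pickle.loads(values[0])
count = int.from_bytes(values[1], "little")
```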

Rust-side Cache Storage

On the Rust side, the simd-r-drive storage system is accessed through the Caches module, which provides two cache stores:

Sources: src/caches.rs:7-65

graph TB
    ConfigManager["ConfigManager\ncache_base_dir"]
    CachesInit["Caches::init\nInitialize cache stores"]

    subgraph "Cache Stores"
        HTTPCache["SIMD_R_DRIVE_HTTP_CACHE\nhttp_storage_cache.bin\nOnceLock<Arc<DataStore>>"]
        PreprocessorCache["SIMD_R_DRIVE_PREPROCESSOR_CACHE\npreprocessor_cache.bin\nOnceLock<Arc<DataStore>>"]
    end

    HTTPCacheGet["Caches::get_http_cache_store"]
    PreprocessorCacheGet["Caches::get_preprocessor_cache"]

    ConfigManager -->|cache_base_dir path| CachesInit
    CachesInit -->|open DataStore| HTTPCache
    CachesInit -->|open DataStore| PreprocessorCache
    HTTPCache -->|Arc clone| HTTPCacheGet
    PreprocessorCache -->|Arc clone| PreprocessorCacheGet

The Rust implementation uses OnceLock<Arc<DataStore>> for thread-safe lazy initialization of cache stores. The DataStore::open method creates or opens persistent binary files at paths derived from ConfigManager:

  • http_storage_cache.bin - HTTP response caching
  • preprocessor_cache.bin - Preprocessed data caching

UsGaapStore Facade

The UsGaapStore class provides a unified interface that coordinates operations across both storage backends (MySQL and simd-r-drive):

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:40 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:65-68

graph TB
    subgraph "UsGaapStore Facade"
        IngestCSV["ingest_us_gaap_csvs\nCSV → MySQL + triplets"]
        GeneratePCA["generate_pca_embeddings\nCompress embeddings"]
        LookupByIndex["lookup_by_index\nRetrieve triplet"]
        BatchLookup["batch_lookup_by_indices\nBulk retrieval"]
        GetEmbeddingMatrix["get_embedding_matrix\nReturn all embeddings"]
        GetTripletCount["get_triplet_count\nTotal stored triplets"]
        GetPairCount["get_pair_count\nUnique concept/unit pairs"]
    end

    subgraph "Backend Operations"
        MySQLWrite["DbUsGaap.insert\nStore raw records"]
        SimdWrite["DataStoreWsClient.batch_write\nStore triplets/embeddings"]
        SimdRead["DataStoreWsClient.batch_read\nRetrieve data"]
    end

    IngestCSV -->|raw data| MySQLWrite
    IngestCSV -->|triplets| SimdWrite
    GeneratePCA -->|compressed embeddings| SimdWrite
    LookupByIndex -->|key query| SimdRead
    BatchLookup -->|batch keys| SimdRead
    GetEmbeddingMatrix -->|matrix key| SimdRead

UsGaapStore Initialization

The facade is initialized with a reference to the DataStoreWsClient:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:40
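In sketch form, with only the class names taken from the notebook:

```python
# The facade wraps the WebSocket client; DbUsGaap is supplied separately to
# ingest_us_gaap_csvs (see the method table below).
us_gaap_store = UsGaapStore(data_store)
```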

Key Methods

| Method | Parameters | Returns | Purpose |
|---|---|---|---|
| ingest_us_gaap_csvs | csv_data_dir: Path, db_us_gaap: DbUsGaap | None | Walk CSV directory, parse records, store to both backends |
| generate_pca_embeddings | None | None | Apply PCA dimensionality reduction to stored embeddings |
| lookup_by_index | index: int | Triplet dict | Retrieve single triplet by sequential index |
| batch_lookup_by_indices | indices: List[int] | List[Triplet dict] | Retrieve multiple triplets efficiently |
| get_embedding_matrix | None | (ndarray, List[Tuple]) | Return full embedding matrix and concept/unit pairs |
| get_triplet_count | None | int | Total number of stored triplets |
| get_pair_count | None | int | Number of unique (concept, unit) pairs |

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:65-96

The UsGaapStore abstracts storage implementation details, allowing preprocessing and training code to interact with a simple, high-level API.
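A hedged end-to-end sketch combining the methods above follows. The CSV directory path is hypothetical, and keyword usage follows the parameter names in the table.

```python
from pathlib import Path

# Ingest raw CSVs into MySQL and simd-r-drive, then compress embeddings.
us_gaap_store.ingest_us_gaap_csvs(
    csv_data_dir=Path("data/us_gaap_csvs"),  # hypothetical location
    db_us_gaap=db_us_gaap,
)
us_gaap_store.generate_pca_embeddings()

# Inspect what was stored.
print(us_gaap_store.get_triplet_count())  # total stored triplets
print(us_gaap_store.get_pair_count())     # unique (concept, unit) pairs

# Point and bulk lookups used by downstream consumers.
first = us_gaap_store.lookup_by_index(0)
batch = us_gaap_store.batch_lookup_by_indices([0, 1, 2])
matrix, pairs = us_gaap_store.get_embedding_matrix()
```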

Triplet Storage Format

The core data structure stored in simd-r-drive is the triplet: a tuple of (concept, unit, scaled_value) along with associated metadata.

Triplet Structure

Each triplet stored in simd-r-drive contains the following fields:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:88-96
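The exact field names are defined in the notebook rather than reproduced here. A plausible shape of one stored record, inferred only from the triplet definition above and the scaler validation described below, looks roughly like this:

```python
# Inferred, illustrative structure — the actual field names may differ.
triplet_record = {
    "concept": "Revenues",        # US GAAP concept
    "unit": "USD",                # unit of measurement
    "scaled_value": 0.42,         # value after per-(concept, unit) scaling
    "unscaled_value": 1_250_000,  # original value from the MySQL record
    "scaler": robust_scaler,      # RobustScaler fitted for this (concept, unit) pair
}
```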

Scaler Validation

The storage system maintains consistency between stored values and scalers. During retrieval, the system can verify that the scaler produces the expected scaled value:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:92-96

This validation ensures that the RobustScaler instance stored with each (concept, unit) pair correctly transforms the unscaled value to the stored scaled value, maintaining data integrity across storage and retrieval cycles.
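A minimal sketch of that check, assuming the stored scaler is a fitted scikit-learn RobustScaler and the record carries both the unscaled and scaled values (field names as in the illustrative record above):

```python
import numpy as np

# Re-apply the stored scaler and confirm it reproduces the stored scaled value.
scaler = triplet_record["scaler"]
unscaled = np.array([[triplet_record["unscaled_value"]]])  # transform expects 2D input
recomputed = scaler.transform(unscaled)[0, 0]

assert np.isclose(recomputed, triplet_record["scaled_value"]), (
    "stored scaler no longer reproduces the stored scaled value"
)
```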

graph LR
    Index0["Index 0\nTriplet 0"]
    Index1["Index 1\nTriplet 1"]
    Index2["Index 2\nTriplet 2"]
    IndexN["Index N\nTriplet N"]
    BatchLookup["batch_lookup_by_indices\n[0, 1, 2, ..., N]"]

    Index0 -->|retrieve| BatchLookup
    Index1 -->|retrieve| BatchLookup
    Index2 -->|retrieve| BatchLookup
    IndexN -->|retrieve| BatchLookup

Triplet Indexing

Triplets are stored with sequential integer indices, enabling efficient batch retrieval during training:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:87-89

The sequential indexing scheme allows the training dataset to iterate efficiently through all stored triplets without loading the entire dataset into memory.
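In sketch form, a consumer can walk the index space in fixed-size chunks rather than materializing everything at once; the chunk size here is arbitrary.

```python
# Stream triplets in chunks using the sequential index space.
chunk_size = 1024
total = us_gaap_store.get_triplet_count()

for start in range(0, total, chunk_size):
    indices = list(range(start, min(start + chunk_size, total)))
    triplets = us_gaap_store.batch_lookup_by_indices(indices)
    # ...hand `triplets` to the training pipeline...
```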

graph TB
    subgraph "Embedding Matrix Generation"
        ConceptUnitPairs["Unique Pairs\n(concept, unit)\nCollected during ingestion"]
        GenerateEmbeddings["generate_concept_unit_embeddings\nSentence transformer model"]
        PCACompression["PCA Compression\nVariance threshold: 0.95"]
        EmbeddingMatrix["Embedding Matrix\nShape: (n_pairs, embedding_dim)"]
    end

    subgraph "Storage"
        StorePairs["Store pairs list\nKey: 'concept_unit_pairs'"]
        StoreMatrix["Store matrix\nKey: 'embedding_matrix'"]
    end

    ConceptUnitPairs -->|semantic text| GenerateEmbeddings
    GenerateEmbeddings -->|full embeddings| PCACompression
    PCACompression -->|compressed| EmbeddingMatrix
    EmbeddingMatrix --> StoreMatrix
    ConceptUnitPairs --> StorePairs

Embedding Matrix Management

The UsGaapStore maintains a pre-computed embedding matrix that maps each unique (concept, unit) pair to its semantic embedding vector.

Embedding Matrix Structure

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:68 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:263-269

Retrieval and Usage

The embedding matrix can be retrieved for analysis or visualization:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:106-108 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:263

During training, each triplet's embedding is retrieved from storage rather than recomputed, ensuring consistent representations across training runs.
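A short sketch of the retrieval call, following the return type in the method table (an ndarray plus a list of (concept, unit) tuples):

```python
# Fetch the stored matrix and its pair list; row i embeds pairs[i].
matrix, pairs = us_gaap_store.get_embedding_matrix()

print(matrix.shape)  # (n_pairs, embedding_dim)
print(pairs[0])      # the (concept, unit) pair embedded by matrix[0]

row_0 = matrix[0]    # embedding vector for pairs[0]
```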

PCA Dimensionality Reduction

The generate_pca_embeddings method applies PCA to compress the raw semantic embeddings while retaining 95% of the variance:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:68

The PCA compression reduces storage requirements and computational overhead during training while maintaining semantic relationships between concept/unit pairs.
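The page does not show the implementation, but the underlying technique can be illustrated with scikit-learn, where passing a float n_components keeps just enough components to explain that fraction of the variance:

```python
import numpy as np
from sklearn.decomposition import PCA

# Illustration of variance-threshold PCA only — not the actual implementation.
full_embeddings = np.random.rand(500, 768)  # stand-in for sentence-transformer vectors

pca = PCA(n_components=0.95)                # keep 95% of the variance
compressed = pca.fit_transform(full_embeddings)

print(compressed.shape)                     # (500, k) with k <= 768
print(pca.explained_variance_ratio_.sum())  # >= 0.95
```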

Data Flow Integration

The storage system integrates with both preprocessing and training workflows:

Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:65-68 python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:175-176 python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:502-522

Training Data Access Pattern

During training, the IterableConceptValueDataset streams triplets from storage:

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:502-512

The dataset communicates with the simd-r-drive WebSocket server to stream triplets, avoiding memory exhaustion on large datasets. The collate_with_scaler function combines triplets into batches while preserving scaler metadata for potential validation or analysis.
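A hedged sketch of the consuming side follows. Only IterableConceptValueDataset and collate_with_scaler are named on this page, so the constructor arguments and batch size are placeholders.

```python
from torch.utils.data import DataLoader

# Stream triplets from storage during training instead of loading them all.
dataset = IterableConceptValueDataset(us_gaap_store)  # constructor args assumed

loader = DataLoader(
    dataset,
    batch_size=256,                  # illustrative batch size
    collate_fn=collate_with_scaler,  # keeps scaler metadata with each batch
)

for batch in loader:
    ...  # training step consumes the batched triplets
```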

Integration Testing

The storage integration is validated through automated integration tests:

Sources: python/narrative_stack/us_gaap_store_integration_test.sh:1-39

The integration test script:

  1. Starts isolated Docker containers for MySQL and simd-r-drive using docker compose with project name us_gaap_it
  2. Waits for MySQL readiness using mysqladmin ping
  3. Creates the us_gaap_test database and loads the schema from tests/integration/assets/us_gaap_schema_2025.sql
  4. Runs pytest integration tests against the live services
  5. Tears down containers and volumes in a cleanup trap

This ensures that the storage layer functions correctly across both backends before code is merged.

Sources: python/narrative_stack/us_gaap_store_integration_test.sh:8-38