This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Database & Storage Integration
Relevant source files
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb
- python/narrative_stack/us_gaap_store_integration_test.sh
- src/caches.rs
Purpose and Scope
This document describes the database and storage infrastructure used by the Python narrative_stack system to persist and retrieve US GAAP financial data. The system employs a dual-storage architecture: MySQL for structured financial records and a WebSocket-based key-value store (simd-r-drive) for high-performance embedding and triplet data. This page covers the DbUsGaap database interface, DataStoreWsClient WebSocket integration, UsGaapStore facade pattern, triplet storage format, and embedding matrix management.
For information about CSV ingestion and preprocessing operations that populate these storage systems, see Data Ingestion & Preprocessing. For information about how the training pipeline consumes this stored data, see Machine Learning Training Pipeline.
Storage Architecture Overview
The narrative_stack system uses two complementary storage backends to optimize for different data access patterns:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-40
graph TB
subgraph "Python narrative_stack"
UsGaapStore["UsGaapStore\nUnified Facade"]
Preprocessing["Preprocessing Pipeline\ningest_us_gaap_csvs\ngenerate_pca_embeddings"]
Training["ML Training\nIterableConceptValueDataset"]
end
subgraph "Storage Backends"
DbUsGaap["DbUsGaap\nMySQL Interface\nus_gaap_test database"]
DataStoreWsClient["DataStoreWsClient\nWebSocket Client\nsimd-r-drive protocol"]
end
subgraph "External Services"
MySQL[("MySQL Server\nStructured Records\nSymbol/Concept/Value")]
SimdRDrive["simd-r-drive-ws-server\nKey-Value Store\nEmbeddings/Triplets/Scalers"]
end
Preprocessing -->|ingest CSV data| UsGaapStore
UsGaapStore -->|store records| DbUsGaap
UsGaapStore -->|store triplets/embeddings| DataStoreWsClient
DbUsGaap -->|SQL queries| MySQL
DataStoreWsClient -->|WebSocket protocol| SimdRDrive
UsGaapStore -->|retrieve triplets| Training
Training -->|batch lookups| UsGaapStore
The dual-storage design separates concerns:
- MySQL stores structured financial data with SQL query capabilities for filtering and aggregation
- simd-r-drive stores high-dimensional embeddings and preprocessed triplets for fast sequential access during training
MySQL Database Interface (DbUsGaap)
The DbUsGaap class provides an abstraction layer over MySQL database operations for US GAAP financial records.
Database Schema
The system expects a MySQL database named us_gaap_test with the following structure:
| Table/Field | Type | Purpose |
|---|---|---|
| Raw records table | - | Stores original CSV-ingested data |
| symbol | VARCHAR | Company ticker symbol |
| concept | VARCHAR | US GAAP concept name |
| unit | VARCHAR | Unit of measurement (USD, shares, etc.) |
| value | DECIMAL | Numeric value for the concept |
| form | VARCHAR | SEC form type (10-K, 10-Q, etc.) |
| filing_date | DATE | Date of filing |
Sources: python/narrative_stack/us_gaap_store_integration_test.sh:30-35
DbUsGaap Connection Configuration
The DbUsGaap class is instantiated with configuration from db_config:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:13-18
The database connection is established during initialization and used for querying structured financial data during the ingestion phase. The DbUsGaap interface supports filtering by symbol, concept, and date ranges for selective data loading.
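A minimal sketch of how this instantiation might look, assuming typical MySQL connection fields; the actual db_config structure and import paths are not shown in the notebook and are assumptions here:

```python
from narrative_stack.db import DbUsGaap       # hypothetical import path
from narrative_stack.config import db_config  # hypothetical import path

# Illustrative only: constructor parameters are assumptions based on a
# typical MySQL client configuration.
db_us_gaap = DbUsGaap(
    host=db_config.host,
    port=db_config.port,          # MySQL default is 3306
    user=db_config.user,
    password=db_config.password,
    database="us_gaap_test",      # database expected by the system
)
```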
WebSocket Data Store (DataStoreWsClient)
The DataStoreWsClient provides a WebSocket-based interface to the simd-r-drive key-value store for high-performance binary data storage and retrieval.
simd-r-drive Integration
The simd-r-drive system is a custom WebSocket server that provides persistent key-value storage optimized for embedding matrices and preprocessed training data:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:33-40
graph TB
subgraph "Python Client"
WSClient["DataStoreWsClient\nWebSocket protocol handler"]
BatchWrite["batch_write\n[(key, value)]"]
BatchRead["batch_read\n[keys] → [values]"]
end
subgraph "simd-r-drive Server"
WSServer["simd-r-drive-ws-server\nWebSocket endpoint"]
DataStore["DataStore\nBinary file backend"]
end
BatchWrite -->|WebSocket message| WSClient
BatchRead -->|WebSocket message| WSClient
WSClient <-->|ws://host:port| WSServer
WSServer <-->|read/write| DataStore
Connection Initialization
The WebSocket client is instantiated with server configuration:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:38
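A sketch of client construction, assuming a simple host/port constructor; only the ws://host:port transport is documented above, so the import path and parameter values are assumptions:

```python
from simd_r_drive_ws_client import DataStoreWsClient  # hypothetical import path

# Connect to the simd-r-drive-ws-server WebSocket endpoint.
data_store = DataStoreWsClient("127.0.0.1", 34129)  # hypothetical host/port
```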
Batch Operations
The DataStoreWsClient supports efficient batch read and write operations:
| Operation | Input | Output | Purpose |
|---|---|---|---|
| batch_write | [(key: bytes, value: bytes)] | None | Store multiple key-value pairs atomically |
| batch_read | [key: bytes] | [value: bytes] | Retrieve multiple values by key |
The WebSocket protocol enables low-latency access to stored embeddings and preprocessed triplets during training iterations.
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:50-55
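A usage sketch for the batch operations in the table above; the key naming scheme is illustrative, not the store's actual layout:

```python
# Keys and values are raw bytes; "triplet:N" is an illustrative key scheme.
pairs = [
    (b"triplet:0", b"serialized-triplet-0"),
    (b"triplet:1", b"serialized-triplet-1"),
]
data_store.batch_write(pairs)  # store multiple pairs in one round trip

values = data_store.batch_read([b"triplet:0", b"triplet:1"])
assert len(values) == 2        # one value returned per requested key
```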
Rust-side Cache Storage
On the Rust side, the simd-r-drive storage system is accessed through the Caches module, which provides two cache stores:
Sources: src/caches.rs:7-65
graph TB
ConfigManager["ConfigManager\ncache_base_dir"]
CachesInit["Caches::init\nInitialize cache stores"]
subgraph "Cache Stores"
HTTPCache["SIMD_R_DRIVE_HTTP_CACHE\nhttp_storage_cache.bin\nOnceLock<Arc<DataStore>>"]
PreprocessorCache["SIMD_R_DRIVE_PREPROCESSOR_CACHE\npreprocessor_cache.bin\nOnceLock<Arc<DataStore>>"]
end
HTTPCacheGet["Caches::get_http_cache_store"]
PreprocessorCacheGet["Caches::get_preprocessor_cache"]
ConfigManager -->|cache_base_dir path| CachesInit
CachesInit -->|open DataStore| HTTPCache
CachesInit -->|open DataStore| PreprocessorCache
HTTPCache -->|Arc clone| HTTPCacheGet
PreprocessorCache -->|Arc clone| PreprocessorCacheGet
The Rust implementation uses OnceLock<Arc<DataStore>> for thread-safe lazy initialization of cache stores. The DataStore::open method creates or opens persistent binary files at paths derived from ConfigManager:
- http_storage_cache.bin - HTTP response caching
- preprocessor_cache.bin - Preprocessed data caching
UsGaapStore Facade
The UsGaapStore class provides a unified interface that coordinates operations across both storage backends (MySQL and simd-r-drive):
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:40 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:65-68
graph TB
subgraph "UsGaapStore Facade"
IngestCSV["ingest_us_gaap_csvs\nCSV → MySQL + triplets"]
GeneratePCA["generate_pca_embeddings\nCompress embeddings"]
LookupByIndex["lookup_by_index\nRetrieve triplet"]
BatchLookup["batch_lookup_by_indices\nBulk retrieval"]
GetEmbeddingMatrix["get_embedding_matrix\nReturn all embeddings"]
GetTripletCount["get_triplet_count\nTotal stored triplets"]
GetPairCount["get_pair_count\nUnique concept/unit pairs"]
end
subgraph "Backend Operations"
MySQLWrite["DbUsGaap.insert\nStore raw records"]
SimdWrite["DataStoreWsClient.batch_write\nStore triplets/embeddings"]
SimdRead["DataStoreWsClient.batch_read\nRetrieve data"]
end
IngestCSV -->|raw data| MySQLWrite
IngestCSV -->|triplets| SimdWrite
GeneratePCA -->|compressed embeddings| SimdWrite
LookupByIndex -->|key query| SimdRead
BatchLookup -->|batch keys| SimdRead
GetEmbeddingMatrix -->|matrix key| SimdRead
UsGaapStore Initialization
The facade is initialized with a reference to the DataStoreWsClient:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:40
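A sketch of construction and ingestion, reusing the data_store and db_us_gaap objects from the sketches above; the constructor and argument names are assumptions:

```python
from pathlib import Path

from narrative_stack.us_gaap_store import UsGaapStore  # hypothetical import path

us_gaap_store = UsGaapStore(data_store)

# Ingestion writes raw records to MySQL and triplets to simd-r-drive.
csv_data_dir = Path("data/us_gaap_csvs")  # hypothetical CSV location
us_gaap_store.ingest_us_gaap_csvs(csv_data_dir, db_us_gaap)
us_gaap_store.generate_pca_embeddings()   # PCA-compress stored embeddings
```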
Key Methods
| Method | Parameters | Returns | Purpose |
|---|---|---|---|
| ingest_us_gaap_csvs | csv_data_dir: Path, db_us_gaap: DbUsGaap | None | Walk CSV directory, parse records, store to both backends |
| generate_pca_embeddings | None | None | Apply PCA dimensionality reduction to stored embeddings |
| lookup_by_index | index: int | Triplet dict | Retrieve single triplet by sequential index |
| batch_lookup_by_indices | indices: List[int] | List[Triplet dict] | Retrieve multiple triplets efficiently |
| get_embedding_matrix | None | (ndarray, List[Tuple]) | Return full embedding matrix and concept/unit pairs |
| get_triplet_count | None | int | Total number of stored triplets |
| get_pair_count | None | int | Number of unique (concept, unit) pairs |
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:65-96
The UsGaapStore abstracts storage implementation details, allowing preprocessing and training code to interact with a simple, high-level API.
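A usage sketch for the retrieval methods above; the method names come from the table, while the surrounding code is illustrative:

```python
# Counts give the size of the stored dataset and embedding vocabulary.
total = us_gaap_store.get_triplet_count()
n_pairs = us_gaap_store.get_pair_count()
print(f"{total} triplets across {n_pairs} unique (concept, unit) pairs")

first = us_gaap_store.lookup_by_index(0)                        # single triplet
batch = us_gaap_store.batch_lookup_by_indices(list(range(64)))  # bulk fetch
```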
Triplet Storage Format
The core data structure stored in simd-r-drive is the triplet: a tuple of (concept, unit, scaled_value) along with associated metadata.
Triplet Structure
Each triplet stored in simd-r-drive bundles the concept, unit, and scaled value together with the unscaled value and the fitted scaler used to produce it:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:88-96
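A hypothetical shape for a stored triplet record, inferred from the surrounding text; the field names are assumptions, not the actual schema:

```python
from sklearn.preprocessing import RobustScaler

# Toy fit for illustration; real scalers are fitted per (concept, unit)
# pair during ingestion.
robust_scaler = RobustScaler().fit([[9.0e8], [1.1e9], [1.4e9]])

triplet = {
    "concept": "Revenues",            # US GAAP concept name (hypothetical)
    "unit": "USD",                    # unit of measurement
    "unscaled_value": 1_250_000_000,  # original reported value
    "scaled_value": robust_scaler.transform([[1_250_000_000]])[0, 0],
    "scaler": robust_scaler,          # scaler stored alongside the value
}
```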
Scaler Validation
The storage system maintains consistency between stored values and scalers. During retrieval, the system can verify that the scaler produces the expected scaled value:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:92-96
This validation ensures that the RobustScaler instance stored with each (concept, unit) pair correctly transforms the unscaled value to the stored scaled value, maintaining data integrity across storage and retrieval cycles.
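A sketch of that consistency check, continuing the hypothetical triplet record from the previous example:

```python
import numpy as np

# Re-applying the stored scaler to the unscaled value should reproduce
# the stored scaled value.
rescaled = triplet["scaler"].transform([[triplet["unscaled_value"]]])[0, 0]
assert np.isclose(rescaled, triplet["scaled_value"]), "scaler/value mismatch"
```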
Triplet Indexing
Triplets are stored with sequential integer indices, enabling efficient batch retrieval during training:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:87-89
graph LR
Index0["Index 0\nTriplet 0"]
Index1["Index 1\nTriplet 1"]
Index2["Index 2\nTriplet 2"]
IndexN["Index N\nTriplet N"]
BatchLookup["batch_lookup_by_indices\n[0, 1, 2, ..., N]"]
Index0 -->|retrieve| BatchLookup
Index1 -->|retrieve| BatchLookup
Index2 -->|retrieve| BatchLookup
IndexN -->|retrieve| BatchLookup
The sequential indexing scheme allows the training dataset to iterate efficiently through all stored triplets without loading the entire dataset into memory.
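A sketch of chunked sequential access built on that index scheme; the chunk size is arbitrary:

```python
# Walk the index space in fixed-size chunks instead of loading everything.
total = us_gaap_store.get_triplet_count()
chunk = 1024
for start in range(0, total, chunk):
    indices = list(range(start, min(start + chunk, total)))
    for triplet in us_gaap_store.batch_lookup_by_indices(indices):
        ...  # stream each triplet into downstream processing
```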
Embedding Matrix Management
The UsGaapStore maintains a pre-computed embedding matrix that maps each unique (concept, unit) pair to its semantic embedding vector.
Embedding Matrix Structure
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:68 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:263-269
graph TB
subgraph "Embedding Matrix Generation"
ConceptUnitPairs["Unique Pairs\n(concept, unit)\nCollected during ingestion"]
GenerateEmbeddings["generate_concept_unit_embeddings\nSentence transformer model"]
PCACompression["PCA Compression\nVariance threshold: 0.95"]
EmbeddingMatrix["Embedding Matrix\nShape: (n_pairs, embedding_dim)"]
end
subgraph "Storage"
StorePairs["Store pairs list\nKey: 'concept_unit_pairs'"]
StoreMatrix["Store matrix\nKey: 'embedding_matrix'"]
end
ConceptUnitPairs -->|semantic text| GenerateEmbeddings
GenerateEmbeddings -->|full embeddings| PCACompression
PCACompression -->|compressed| EmbeddingMatrix
EmbeddingMatrix --> StoreMatrix
ConceptUnitPairs --> StorePairs
Retrieval and Usage
The embedding matrix can be retrieved for analysis or visualization:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:106-108 python/narrative_stack/notebooks/stage1_preprocessing.ipynb:263
During training, each triplet's embedding is retrieved from storage rather than recomputed, ensuring consistent representations across training runs.
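A usage sketch: get_embedding_matrix returns the matrix together with its (concept, unit) pair list (see Key Methods above); the example pair lookup is hypothetical:

```python
matrix, pairs = us_gaap_store.get_embedding_matrix()
print(matrix.shape)  # (n_pairs, embedding_dim)

# Look up the embedding row for one pair (hypothetical pair shown).
row = pairs.index(("Revenues", "USD"))
revenue_usd_embedding = matrix[row]
```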
PCA Dimensionality Reduction
The generate_pca_embeddings method applies PCA to compress the raw semantic embeddings while retaining 95% of the variance:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:68
The PCA compression reduces storage requirements and computational overhead during training while maintaining semantic relationships between concept/unit pairs.
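A sketch of that compression step under the stated 0.95 variance threshold, using scikit-learn; the actual generate_pca_embeddings implementation may differ:

```python
import numpy as np
from sklearn.decomposition import PCA

full_embeddings = np.random.rand(500, 768)  # stand-in for raw embeddings
pca = PCA(n_components=0.95)                # keep 95% of explained variance
compressed = pca.fit_transform(full_embeddings)
print(compressed.shape)                     # (500, n_components_retained)
```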
Data Flow Integration
The storage system integrates with both preprocessing and training workflows:
Sources: python/narrative_stack/notebooks/stage1_preprocessing.ipynb:65-68 python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:175-176 python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:502-522
Training Data Access Pattern
During training, the IterableConceptValueDataset streams triplets from storage:
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb:502-512
The dataset communicates with the simd-r-drive WebSocket server to stream triplets, avoiding memory exhaustion on large datasets. The collate_with_scaler function combines triplets into batches while preserving scaler metadata for potential validation or analysis.
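A sketch of that streaming pattern; the dataset and collate names appear in the training notebook, but the import path and constructor arguments here are assumptions:

```python
from torch.utils.data import DataLoader

from narrative_stack.datasets import (  # hypothetical import path
    IterableConceptValueDataset,
    collate_with_scaler,
)

# Stream triplets from storage instead of materializing the full dataset.
dataset = IterableConceptValueDataset(us_gaap_store)
loader = DataLoader(dataset, batch_size=256, collate_fn=collate_with_scaler)

for batch in loader:
    ...  # each batch carries embeddings, scaled values, and scaler metadata
```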
Integration Testing
The storage integration is validated through automated integration tests:
Sources: python/narrative_stack/us_gaap_store_integration_test.sh:1-39
The integration test script:
- Starts isolated Docker containers for MySQL and simd-r-drive using docker compose with project name us_gaap_it
- Waits for MySQL readiness using mysqladmin ping
- Creates the us_gaap_test database and loads the schema from tests/integration/assets/us_gaap_schema_2025.sql
- Runs pytest integration tests against the live services
- Tears down containers and volumes in a cleanup trap
This ensures that the storage layer functions correctly across both backends before code is merged.
Sources: python/narrative_stack/us_gaap_store_integration_test.sh:8-38