This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Machine Learning Training Pipeline
Relevant source files
- python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb
- python/narrative_stack/notebooks/stage1_preprocessing.ipynb
Purpose and Scope
This page documents the machine learning training pipeline for the narrative_stack system, specifically the Stage 1 autoencoder that learns latent representations of US GAAP financial concepts. The training pipeline consumes preprocessed concept/unit/value triplets and their semantic embeddings (see Data Ingestion & Preprocessing) to train a neural network that can encode financial data into a compressed latent space.
The pipeline uses PyTorch Lightning for training orchestration, implements custom iterable datasets for efficient data streaming from the simd-r-drive WebSocket server, and provides comprehensive experiment tracking through TensorBoard. For details on the underlying storage mechanisms and data retrieval, see Database & Storage Integration.
Training Pipeline Architecture
The training pipeline operates as a streaming system that continuously fetches preprocessed triplets from the UsGaapStore and feeds them through the autoencoder model. The architecture emphasizes memory efficiency and reproducibility.
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 456-556
graph TB
subgraph "Data Source Layer"
DataStoreWsClient["DataStoreWsClient\n(simd_r_drive_ws_client)"]
UsGaapStore["UsGaapStore\nlookup_by_index()"]
end
subgraph "Dataset Layer"
IterableDataset["IterableConceptValueDataset\ninternal_batch_size=64\nreturn_scaler=True\nshuffle=True/False"]
CollateFunction["collate_with_scaler()\nBatch construction"]
end
subgraph "PyTorch Lightning Training Loop"
DataLoader["DataLoader\nbatch_size from hparams\nnum_workers=2\npin_memory=True\npersistent_workers=True"]
Model["Stage1Autoencoder\nEncoder → Latent → Decoder"]
Optimizer["Adam Optimizer\n+ CosineAnnealingWarmRestarts\nReduceLROnPlateau"]
Trainer["pl.Trainer\nEarlyStopping\nModelCheckpoint\ngradient_clip_val"]
end
subgraph "Monitoring & Persistence"
TensorBoard["TensorBoardLogger\ntrain_loss\nval_loss_epoch\nlearning_rate"]
Checkpoints["Model Checkpoints\n.ckpt files\nsave_top_k=1\nmonitor='val_loss_epoch'"]
end
DataStoreWsClient --> UsGaapStore
UsGaapStore --> IterableDataset
IterableDataset --> CollateFunction
CollateFunction --> DataLoader
DataLoader --> Model
Model --> Optimizer
Optimizer --> Trainer
Trainer --> TensorBoard
Trainer --> Checkpoints
Checkpoints -.->|Resume training| Model
Stage1Autoencoder Model
Model Architecture
The Stage1Autoencoder is a fully-connected autoencoder that learns to compress financial concept embeddings combined with their scaled values into a lower-dimensional latent space. The model reconstructs its input, forcing the latent representation to capture the most important features.
graph LR
Input["Input Tensor\n[embedding + scaled_value]\nDimension: embedding_dim + 1"]
Encoder["Encoder Network\nfc1 → dropout → ReLU\nfc2 → dropout → ReLU"]
Latent["Latent Space\nDimension: latent_dim"]
Decoder["Decoder Network\nfc3 → dropout → ReLU\nfc4 → dropout → output"]
Output["Reconstructed Input\nSame dimension as input"]
Loss["MSE Loss\ninput vs output"]
Input --> Encoder
Encoder --> Latent
Latent --> Decoder
Decoder --> Output
Output --> Loss
Input --> Loss
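The notebook contains the authoritative implementation; the following is a minimal sketch of the architecture described above, written against the PyTorch Lightning API. The layer names fc1 through fc4 and the dropout placement follow the diagram, while hidden_dim and the exact activation wiring are illustrative assumptions.

```python
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl


class Stage1Autoencoder(pl.LightningModule):
    """Sketch of the fully-connected autoencoder; not the notebook's exact code."""

    def __init__(self, input_dim: int, latent_dim: int = 64,
                 dropout_rate: float = 0.1, lr: float = 5e-5,
                 min_lr: float = 1e-6, batch_size: int = 32,
                 weight_decay: float = 1e-6, gradient_clip: float = 0.5,
                 hidden_dim: int = 256):  # hidden_dim is an assumption
        super().__init__()
        self.save_hyperparameters()  # exposes all arguments via self.hparams

        # Encoder: input -> hidden -> latent
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, latent_dim)
        # Decoder: latent -> hidden -> reconstructed input
        self.fc3 = nn.Linear(latent_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, input_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def encode(self, x):
        x = F.relu(self.dropout(self.fc1(x)))
        return F.relu(self.dropout(self.fc2(x)))

    def decode(self, z):
        z = F.relu(self.dropout(self.fc3(z)))
        return self.fc4(z)  # linear output layer

    def forward(self, x):
        return self.decode(self.encode(x))

    def training_step(self, batch, batch_idx):
        x, y, _scalers = batch  # collate_with_scaler yields (x, y, scaler_list)
        loss = F.mse_loss(self(x), y)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y, _scalers = batch
        loss = F.mse_loss(self(x), y)
        self.log("val_loss_epoch", loss, on_step=False, on_epoch=True)

    # configure_optimizers() is sketched in "Loss Function and Optimization" below.
```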
Hyperparameters
The model exposes the following configurable hyperparameters through its hparams attribute:
| Parameter | Description | Typical Value |
|---|---|---|
| input_dim | Dimension of input (embedding + 1 for value) | Varies based on embedding size |
| latent_dim | Dimension of compressed latent space | 64-128 |
| dropout_rate | Dropout probability for regularization | 0.0-0.2 |
| lr | Initial learning rate | 1e-5 to 5e-5 |
| min_lr | Minimum learning rate for scheduler | 1e-7 to 1e-6 |
| batch_size | Training batch size | 32 (typical) |
| weight_decay | L2 regularization parameter | 1e-8 to 1e-4 |
| gradient_clip | Maximum gradient norm | 0.0-1.0 |
The model can be instantiated with default parameters or loaded from a checkpoint with overridden hyperparameters:
Loading from checkpoint with modified learning rate: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 479-486
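A hedged sketch of that pattern, using PyTorch Lightning's standard load_from_checkpoint API (the checkpoint path and the specific override values are placeholders, not copied from the notebook):

```python
# Load a previously trained model and override selected hyperparameters
# for fine-tuning; path and values are illustrative.
model = Stage1Autoencoder.load_from_checkpoint(
    "path/to/stage1_resume.ckpt",
    lr=1e-5,      # lower learning rate for fine-tuning
    min_lr=1e-7,  # scheduler floor
)
```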
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 479-490
Loss Function and Optimization
The model uses Mean Squared Error (MSE) loss between the input and reconstructed output. The optimization strategy combines:
- Adam optimizer with configurable learning rate and weight decay
- CosineAnnealingWarmRestarts scheduler for cyclical learning rate annealing
- ReduceLROnPlateau for adaptive learning rate reduction when validation loss plateaus
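The following sketch shows one way to wire these pieces together in the model's configure_optimizers method. Adam, CosineAnnealingWarmRestarts, and ReduceLROnPlateau are standard torch.optim APIs; the restart period, reduction factor, and plateau patience shown here are assumptions rather than values taken from the notebook.

```python
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, ReduceLROnPlateau


# configure_optimizers() on the Stage1Autoencoder sketch above.
def configure_optimizers(self):
    optimizer = Adam(
        self.parameters(),
        lr=self.hparams.lr,
        weight_decay=self.hparams.weight_decay,
    )
    # Cyclical annealing: restart the cosine schedule periodically.
    cosine = CosineAnnealingWarmRestarts(
        optimizer, T_0=10, T_mult=2, eta_min=self.hparams.min_lr
    )
    # Adaptive reduction when the monitored validation loss plateaus.
    plateau = ReduceLROnPlateau(
        optimizer, mode="min", factor=0.5, patience=5, min_lr=self.hparams.min_lr
    )
    return (
        [optimizer],
        [
            {"scheduler": cosine, "interval": "epoch"},
            {"scheduler": plateau, "interval": "epoch", "monitor": "val_loss_epoch"},
        ],
    )
```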
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 479-490
Dataset and Data Loading
IterableConceptValueDataset
The IterableConceptValueDataset is a custom PyTorch IterableDataset that streams training data from the UsGaapStore without loading the entire dataset into memory. This design enables training on datasets larger than available RAM.
Key characteristics:
graph TB
subgraph "Dataset Initialization"
Config["simd_r_drive_server_config\nhost + port"]
Params["Dataset Parameters\ninternal_batch_size\nreturn_scaler\nshuffle"]
end
subgraph "Data Streaming Process"
Store["UsGaapStore instance\nget_triplet_count()"]
IndexGen["Index Generator\nSequential or shuffled\nbased on shuffle param"]
BatchFetch["Internal Batching\nFetch internal_batch_size items\nvia batch_lookup_by_indices()"]
Unpack["Unpack Triplet Data\nembedding\nscaled_value\nscaler (optional)"]
end
subgraph "Output"
Tensor["PyTorch Tensors\nx: [embedding + scaled_value]\ny: [embedding + scaled_value]\nscaler: RobustScaler object"]
end
Config --> Store
Params --> IndexGen
Store --> IndexGen
IndexGen --> BatchFetch
BatchFetch --> Unpack
Unpack --> Tensor
- Iterable streaming: Data is fetched on demand during iteration, not preloaded
- Internal batching: Fetches internal_batch_size items at once from the WebSocket server to reduce network overhead (typically 64)
- Optional shuffling: Can randomize index order for training or maintain sequential order for validation
- Scaler inclusion: Optionally returns the RobustScaler object with each sample for validation purposes
Dataset instantiation: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 502-522
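A sketch of what that instantiation looks like. The constructor arguments mirror the parameters shown in the dataset diagram; the exact class signature, import path, and config object live in the narrative_stack codebase and may differ from this illustration.

```python
# Import paths omitted: IterableConceptValueDataset and the server config
# object come from the narrative_stack package.
train_dataset = IterableConceptValueDataset(
    simd_r_drive_server_config,   # host/port of the simd-r-drive WebSocket server
    internal_batch_size=64,       # items fetched per WebSocket round trip
    return_scaler=True,           # include the RobustScaler with each sample
    shuffle=True,                 # randomize index order for training
)

val_dataset = IterableConceptValueDataset(
    simd_r_drive_server_config,
    internal_batch_size=64,
    return_scaler=True,
    shuffle=False,                # keep validation order deterministic
)
```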
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 175-176, python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 502-522
DataLoader Configuration
PyTorch DataLoader instances wrap the iterable dataset with additional configuration for efficient batch loading:
| Parameter | Value | Purpose |
|---|---|---|
| batch_size | From model.hparams.batch_size | Outer batch size for model training |
| collate_fn | collate_with_scaler | Custom collation function to handle scaler objects |
| num_workers | 2 | Number of parallel data-loading worker processes |
| pin_memory | True | Enables faster host-to-GPU transfers |
| persistent_workers | True | Keeps worker processes alive between epochs |
| prefetch_factor | 4 | Number of batches each worker pre-fetches |
The training loader sets shuffle=True on its dataset, while the validation loader uses shuffle=False to ensure reproducible validation metrics.
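Putting the table together, the loaders are built roughly as follows (a sketch that reuses the dataset objects from the previous section; the notebook remains the source of truth for exact values):

```python
from torch.utils.data import DataLoader

train_loader = DataLoader(
    train_dataset,
    batch_size=model.hparams.batch_size,
    collate_fn=collate_with_scaler,
    num_workers=2,
    pin_memory=True,
    persistent_workers=True,
    prefetch_factor=4,
)

val_loader = DataLoader(
    val_dataset,
    batch_size=model.hparams.batch_size,
    collate_fn=collate_with_scaler,
    num_workers=2,
    pin_memory=True,
    persistent_workers=True,
    prefetch_factor=4,
)
```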
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 502-522
collate_with_scaler Function
The collate_with_scaler function handles batch construction when the dataset returns triplets (x, y, scaler). It stacks the tensors into batches while preserving the scaler objects in a list:
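A minimal sketch that mirrors the behavior described here; the actual implementation lives in the narrative_stack codebase.

```python
import torch


def collate_with_scaler(batch):
    """Stack (x, y, scaler) triplets into tensors while keeping scalers in a list."""
    xs, ys, scalers = zip(*batch)
    return torch.stack(xs), torch.stack(ys), list(scalers)
```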
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 175-176, python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 506-507
graph LR
Input["Batch Items\n[(x1, y1, scaler1),\n(x2, y2, scaler2),\n...]"]
Stack["Stack Tensors\ntorch.stack(x_list)\ntorch.stack(y_list)"]
Output["Batched Output\n(x_batch, y_batch, scaler_list)"]
Input --> Stack
Stack --> Output
Training Configuration
PyTorch Lightning Trainer Setup
The training pipeline uses PyTorch Lightning’s Trainer class to orchestrate the training loop, validation, and callbacks:
Configuration values:
graph TB
subgraph "Training Configuration"
MaxEpochs["max_epochs\nTypically 1000"]
Accelerator["accelerator='auto'\ndevices=1"]
GradientClip["gradient_clip_val\nFrom model.hparams"]
end
subgraph "Callbacks"
EarlyStopping["EarlyStopping\nmonitor='val_loss_epoch'\npatience=20\nmode='min'"]
ModelCheckpoint["ModelCheckpoint\nmonitor='val_loss_epoch'\nsave_top_k=1\nfilename='stage1_resume'"]
end
subgraph "Logging"
TensorBoardLogger["TensorBoardLogger\nlog_dir=OUTPUT_PATH\nname='stage1_autoencoder'"]
end
Trainer["pl.Trainer"]
MaxEpochs --> Trainer
Accelerator --> Trainer
GradientClip --> Trainer
EarlyStopping --> Trainer
ModelCheckpoint --> Trainer
TensorBoardLogger --> Trainer
- EPOCHS: 1000 (allows for long training with early stopping)
- PATIENCE: 20 (compensates for learning rate annealing periods)
- OUTPUT_PATH: project_paths.python_data / "stage1_23_(no_pre_dedupe)" or similar versioned directory
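The corresponding constants and logger can be expressed as follows; the directory name is taken from the list above, project_paths comes from the project's utilities, and the rest follows standard PyTorch Lightning usage.

```python
from pytorch_lightning.loggers import TensorBoardLogger

EPOCHS = 1000    # upper bound; early stopping usually ends training sooner
PATIENCE = 20    # epochs without val_loss_epoch improvement before stopping
OUTPUT_PATH = project_paths.python_data / "stage1_23_(no_pre_dedupe)"

logger = TensorBoardLogger(save_dir=str(OUTPUT_PATH), name="stage1_autoencoder")
```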
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 468-548
Callbacks
EarlyStopping
The EarlyStopping callback monitors val_loss_epoch and stops training if no improvement occurs for 20 consecutive epochs. This prevents overfitting and saves computational resources.
python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 528-530
ModelCheckpoint
The ModelCheckpoint callback saves the best model based on validation loss:
python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 532-539
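A sketch of both callbacks using the values quoted above (standard pytorch_lightning.callbacks API; dirpath follows the statement that checkpoints land in the OUTPUT_PATH directory):

```python
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

early_stopping = EarlyStopping(
    monitor="val_loss_epoch",
    patience=PATIENCE,      # 20 epochs without improvement
    mode="min",
)

checkpoint_callback = ModelCheckpoint(
    monitor="val_loss_epoch",
    save_top_k=1,           # keep only the best checkpoint
    mode="min",
    dirpath=OUTPUT_PATH,
    filename="stage1_resume",
)
```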
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 528-539
Training Execution
The training process is initiated with the Trainer.fit() method:
python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 550-556
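Assembled with the configuration, callbacks, and loaders sketched above, the call looks roughly like this:

```python
import pytorch_lightning as pl

trainer = pl.Trainer(
    max_epochs=EPOCHS,
    accelerator="auto",
    devices=1,
    gradient_clip_val=model.hparams.gradient_clip,
    callbacks=[early_stopping, checkpoint_callback],
    logger=logger,
)

trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)
```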
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 550-556
Checkpointing and Resuming Training
Saving Checkpoints
Checkpoints are automatically saved by the ModelCheckpoint callback when validation loss improves. The checkpoint file contains:
- Model weights (encoder and decoder parameters)
- Optimizer state
- Learning rate scheduler state
- All hyperparameters (hparams)
- Current epoch number
- Best validation loss
Checkpoint files are saved with the .ckpt extension in the OUTPUT_PATH directory.
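The stored state can be inspected directly with torch.load; the keys shown below are the standard PyTorch Lightning checkpoint fields, and the file name is a placeholder:

```python
import torch

ckpt = torch.load(OUTPUT_PATH / "stage1_resume.ckpt", map_location="cpu")

print(ckpt["epoch"])              # epoch at which the checkpoint was written
print(ckpt["hyper_parameters"])   # saved hparams
print(ckpt["state_dict"].keys())  # encoder/decoder weights
print(ckpt["optimizer_states"])   # optimizer state
print(ckpt["lr_schedulers"])      # learning rate scheduler state
```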
Loading and Resuming
The training pipeline supports two modes of checkpoint loading:
1. Resume Training with Existing Configuration
Pass ckpt_path to trainer.fit() to resume training with the exact state from the checkpoint:
python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 555
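For example (a sketch; the checkpoint path is a placeholder):

```python
# Resume from the saved checkpoint, restoring optimizer and scheduler state.
trainer.fit(
    model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader,
    ckpt_path=str(OUTPUT_PATH / "stage1_resume.ckpt"),
)
```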
2. Fine-tune with Modified Hyperparameters
Load the checkpoint explicitly and override specific hyperparameters before training:
This approach is useful for fine-tuning with a lower learning rate after initial training convergence.
python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 479-486
Checkpoint File Naming
The checkpoint filename is determined by the ModelCheckpoint configuration:
- filename: "stage1_resume" produces files like stage1_resume.ckpt, stage1_resume-v1.ckpt, stage1_resume-v2.ckpt, etc.
- Versioning: PyTorch Lightning automatically increments the version suffix when a file with the same name already exists
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 475-486, python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 532-539, python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 550-556
graph TB
subgraph "Logged Metrics"
TrainLoss["train_loss\nPer-batch training loss"]
ValLoss["val_loss_epoch\nEpoch-averaged validation loss"]
LearningRate["learning_rate\nCurrent optimizer LR"]
end
subgraph "TensorBoard Logger"
Logger["TensorBoardLogger\nsave_dir=OUTPUT_PATH\nname='stage1_autoencoder'"]
end
subgraph "Log Directory Structure"
OutputPath["OUTPUT_PATH/"]
VersionDir["stage1_autoencoder/"]
Events["events.out.tfevents.*\nhparams.yaml"]
Checkpoints["*.ckpt checkpoint files"]
end
TrainLoss --> Logger
ValLoss --> Logger
LearningRate --> Logger
Logger --> OutputPath
OutputPath --> VersionDir
VersionDir --> Events
OutputPath --> Checkpoints
Monitoring and Logging
TensorBoard Integration
The TensorBoardLogger automatically logs training metrics for visualization in TensorBoard:
Log directory structure:
OUTPUT_PATH/
├── stage1_autoencoder/
│   └── version_0/
│       ├── events.out.tfevents.*
│       ├── hparams.yaml
│       └── checkpoints/
├── stage1_resume.ckpt
└── optuna_study.db (if using Optuna)
python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 543
Visualizing Training Progress
Launch TensorBoard pointed at the output directory to monitor training (for example, tensorboard --logdir OUTPUT_PATH).
TensorBoard displays:
- Loss curves: Training and validation loss over epochs
- Learning rate schedule: Visualization of LR changes during training
- Hyperparameters: Table of all model hyperparameters
- Gradient histograms: Distribution of gradients (if enabled)
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 541-548
Training Workflow Summary
The complete training workflow follows this sequence:
1. Connect to the simd-r-drive WebSocket server through DataStoreWsClient and wrap it in a UsGaapStore.
2. Construct training and validation IterableConceptValueDataset instances and wrap them in DataLoader objects using collate_with_scaler.
3. Instantiate Stage1Autoencoder, or load it from a checkpoint.
4. Configure pl.Trainer with EarlyStopping, ModelCheckpoint, and the TensorBoardLogger.
5. Call trainer.fit() and monitor progress in TensorBoard; the best checkpoint is saved for later resumption.
This pipeline design enables:
- Memory-efficient training through streaming data loading
- Reproducible experiments with comprehensive logging
- Flexible resumption with hyperparameter overrides
- Robust training with gradient clipping and early stopping
Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb 456-556