This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Machine Learning Training Pipeline

Purpose and Scope

This page documents the machine learning training pipeline for the narrative_stack system, specifically the Stage 1 autoencoder that learns latent representations of US GAAP financial concepts. The training pipeline consumes preprocessed concept/unit/value triplets and their semantic embeddings (see Data Ingestion & Preprocessing) to train a neural network that can encode financial data into a compressed latent space.

The pipeline uses PyTorch Lightning for training orchestration, implements custom iterable datasets for efficient data streaming from the simd-r-drive WebSocket server, and provides comprehensive experiment tracking through TensorBoard. For details on the underlying storage mechanisms and data retrieval, see Database & Storage Integration.

Training Pipeline Architecture

The training pipeline operates as a streaming system that continuously fetches preprocessed triplets from the UsGaapStore and feeds them through the autoencoder model. The architecture emphasizes memory efficiency and reproducibility.

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 456-556)

graph TB
    subgraph "Data Source Layer"
        DataStoreWsClient["DataStoreWsClient\n(simd_r_drive_ws_client)"]
        UsGaapStore["UsGaapStore\nlookup_by_index()"]
    end

    subgraph "Dataset Layer"
        IterableDataset["IterableConceptValueDataset\ninternal_batch_size=64\nreturn_scaler=True\nshuffle=True/False"]
        CollateFunction["collate_with_scaler()\nBatch construction"]
    end

    subgraph "PyTorch Lightning Training Loop"
        DataLoader["DataLoader\nbatch_size from hparams\nnum_workers=2\npin_memory=True\npersistent_workers=True"]
        Model["Stage1Autoencoder\nEncoder → Latent → Decoder"]
        Optimizer["Adam Optimizer\n+ CosineAnnealingWarmRestarts\nReduceLROnPlateau"]
        Trainer["pl.Trainer\nEarlyStopping\nModelCheckpoint\ngradient_clip_val"]
    end

    subgraph "Monitoring & Persistence"
        TensorBoard["TensorBoardLogger\ntrain_loss\nval_loss_epoch\nlearning_rate"]
        Checkpoints["Model Checkpoints\n.ckpt files\nsave_top_k=1\nmonitor='val_loss_epoch'"]
    end

    DataStoreWsClient --> UsGaapStore
    UsGaapStore --> IterableDataset
    IterableDataset --> CollateFunction
    CollateFunction --> DataLoader

    DataLoader --> Model
    Model --> Optimizer
    Optimizer --> Trainer

    Trainer --> TensorBoard
    Trainer --> Checkpoints

    Checkpoints -.->|Resume training| Model

Stage1Autoencoder Model

Model Architecture

The Stage1Autoencoder is a fully-connected autoencoder that learns to compress financial concept embeddings combined with their scaled values into a lower-dimensional latent space. The model reconstructs its input, forcing the latent representation to capture the most important features.

graph LR
    Input["Input Tensor\n[embedding + scaled_value]\nDimension: embedding_dim + 1"]
    Encoder["Encoder Network\nfc1 → dropout → ReLU\nfc2 → dropout → ReLU"]
    Latent["Latent Space\nDimension: latent_dim"]
    Decoder["Decoder Network\nfc3 → dropout → ReLU\nfc4 → dropout → output"]
    Output["Reconstructed Input\nSame dimension as input"]
    Loss["MSE Loss\ninput vs output"]

    Input --> Encoder
    Encoder --> Latent
    Latent --> Decoder
    Decoder --> Output
    Output --> Loss
    Input --> Loss
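
The architecture in the diagram can be approximated with a short PyTorch sketch, assuming PyTorch is installed. The class name AutoencoderSketch, the hidden_dim width, and the placeholder embedding size are illustrative assumptions: the source does not state layer widths, and the project's actual Stage1Autoencoder may differ in detail. For simplicity, the final decoder layer is left here as a plain linear projection.

```python
import torch
from torch import nn


class AutoencoderSketch(nn.Module):
    """Illustrative fully connected autoencoder; not the project's Stage1Autoencoder."""

    def __init__(self, input_dim: int, latent_dim: int = 64,
                 hidden_dim: int = 256, dropout_rate: float = 0.1):
        super().__init__()
        # hidden_dim is a hypothetical width chosen for this sketch.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim), nn.Dropout(dropout_rate), nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim), nn.Dropout(dropout_rate), nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim), nn.Dropout(dropout_rate), nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),  # reconstruct the full input vector
        )

    def forward(self, x: torch.Tensor):
        z = self.encoder(x)           # compressed latent representation
        return self.decoder(z), z     # reconstruction and latent code


embedding_dim = 16                    # placeholder; the real embedding size is not given here
model = AutoencoderSketch(input_dim=embedding_dim + 1, latent_dim=8)
model.eval()                          # disable dropout for a deterministic forward pass

x = torch.randn(4, embedding_dim + 1)  # batch of [embedding + scaled_value] rows
x_hat, z = model(x)
loss = nn.functional.mse_loss(x_hat, x)  # reconstruction error: input vs output
```

The target of the loss is the input itself, which is why the dataset described later yields identical x and y tensors.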

Hyperparameters

The model exposes the following configurable hyperparameters through its hparams attribute:

| Parameter | Description | Typical Value |
| --- | --- | --- |
| input_dim | Dimension of input (embedding + 1 for value) | Varies based on embedding size |
| latent_dim | Dimension of compressed latent space | 64-128 |
| dropout_rate | Dropout probability for regularization | 0.0-0.2 |
| lr | Initial learning rate | 1e-5 to 5e-5 |
| min_lr | Minimum learning rate for scheduler | 1e-7 to 1e-6 |
| batch_size | Training batch size | 32 (typical) |
| weight_decay | L2 regularization parameter | 1e-8 to 1e-4 |
| gradient_clip | Maximum gradient norm | 0.0-1.0 |

The model can be instantiated with default parameters or loaded from a checkpoint with overridden hyperparameters:

Loading from a checkpoint with a modified learning rate: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 479-486)

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 479-490)
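
As a rough illustration of the override pattern, the hyperparameters in the table can be modeled as an immutable record that is copied with one field changed. The class Stage1HParamsSketch, its defaults, and the input size are hypothetical stand-ins picked from the typical ranges above; this is not how the project stores its hparams.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Stage1HParamsSketch:
    """Hypothetical container mirroring the hyperparameter table."""
    input_dim: int
    latent_dim: int = 64
    dropout_rate: float = 0.1
    lr: float = 5e-5
    min_lr: float = 1e-6
    batch_size: int = 32
    weight_decay: float = 1e-8
    gradient_clip: float = 1.0

    def __post_init__(self):
        # Basic sanity checks on the ranges listed in the table.
        if not 0.0 <= self.dropout_rate < 1.0:
            raise ValueError("dropout_rate must be in [0, 1)")
        if self.min_lr > self.lr:
            raise ValueError("min_lr must not exceed lr")


embedding_dim = 384                                   # placeholder embedding size
base = Stage1HParamsSketch(input_dim=embedding_dim + 1)
fine_tune = replace(base, lr=1e-5)                    # same settings, lower learning rate
```

Every field other than lr carries over unchanged, which is the behavior one would want when resuming from a checkpoint with a single override.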

Loss Function and Optimization

The model uses Mean Squared Error (MSE) loss between the input and reconstructed output. The optimization strategy combines:

  • Adam optimizer with configurable learning rate and weight decay
  • CosineAnnealingWarmRestarts scheduler for cyclical learning rate annealing
  • ReduceLROnPlateau for adaptive learning rate reduction when validation loss plateaus
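
To make the cyclical schedule concrete, the sketch below evaluates the standard cosine-annealing-with-warm-restarts formula, lr = min_lr + 0.5 * (base_lr - min_lr) * (1 + cos(pi * t_cur / t_i)), in plain Python. The function name and the cycle length t_0 = 10 are assumptions made for illustration; the source does not state the restart period used in training.

```python
import math


def cosine_warm_restart_lr(epoch: int, base_lr: float, min_lr: float,
                           t_0: int, t_mult: int = 1) -> float:
    """Learning rate at a given epoch for cosine annealing with warm restarts."""
    t_i, t_cur = t_0, epoch
    # Walk through completed cycles; each cycle is t_mult times longer than the last.
    while t_cur >= t_i:
        t_cur -= t_i
        t_i *= t_mult
    return min_lr + 0.5 * (base_lr - min_lr) * (1 + math.cos(math.pi * t_cur / t_i))


# Hypothetical settings: lr and min_lr come from the typical ranges, t_0 is assumed.
schedule = [cosine_warm_restart_lr(e, base_lr=5e-5, min_lr=1e-6, t_0=10) for e in range(20)]
```

Within each cycle the rate decays from base_lr toward min_lr, then jumps back to base_lr at the restart. Validation loss can rise briefly after such a jump, which is one reason a generous early-stopping patience is helpful.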

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 479-490)

Dataset and Data Loading

IterableConceptValueDataset

The IterableConceptValueDataset is a custom PyTorch IterableDataset that streams training data from the UsGaapStore without loading the entire dataset into memory. This design enables training on datasets larger than available RAM.

Key characteristics:

  • Iterable streaming: Data is fetched on demand during iteration, not preloaded
  • Internal batching: Fetches internal_batch_size items (typically 64) at once from the WebSocket server to reduce network overhead
  • Optional shuffling: Randomizes index order for training or keeps sequential order for validation
  • Scaler inclusion: Optionally returns the RobustScaler object with each sample for validation purposes

graph TB
    subgraph "Dataset Initialization"
        Config["simd_r_drive_server_config\nhost + port"]
        Params["Dataset Parameters\ninternal_batch_size\nreturn_scaler\nshuffle"]
    end

    subgraph "Data Streaming Process"
        Store["UsGaapStore instance\nget_triplet_count()"]
        IndexGen["Index Generator\nSequential or shuffled\nbased on shuffle param"]
        BatchFetch["Internal Batching\nFetch internal_batch_size items\nvia batch_lookup_by_indices()"]
        Unpack["Unpack Triplet Data\nembedding\nscaled_value\nscaler (optional)"]
    end

    subgraph "Output"
        Tensor["PyTorch Tensors\nx: [embedding + scaled_value]\ny: [embedding + scaled_value]\nscaler: RobustScaler object"]
    end

    Config --> Store
    Params --> IndexGen
    Store --> IndexGen
    IndexGen --> BatchFetch
    BatchFetch --> Unpack
    Unpack --> Tensor
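
The streaming and internal batching behavior described above can be sketched without any network or PyTorch dependency. Everything here is a simplified stand-in: iter_index_chunks, stream_samples, and fake_fetch are hypothetical names, and fake_fetch only imitates a batched lookup against the store.

```python
import random
from typing import Callable, Iterator, List, Optional, Sequence, Tuple

Sample = Tuple[List[float], float]  # (embedding, scaled_value), a simplified stand-in


def iter_index_chunks(total: int, chunk_size: int, shuffle: bool,
                      seed: Optional[int] = None) -> Iterator[List[int]]:
    """Yield index lists of at most chunk_size, in sequential or shuffled order."""
    indices = list(range(total))
    if shuffle:
        random.Random(seed).shuffle(indices)
    for start in range(0, total, chunk_size):
        yield indices[start:start + chunk_size]


def stream_samples(total: int,
                   fetch_batch: Callable[[Sequence[int]], List[Sample]],
                   internal_batch_size: int = 64,
                   shuffle: bool = False,
                   seed: Optional[int] = None) -> Iterator[List[float]]:
    """Fetch one chunk per round trip, then yield rows one at a time."""
    for chunk in iter_index_chunks(total, internal_batch_size, shuffle, seed):
        for embedding, scaled_value in fetch_batch(chunk):
            yield embedding + [scaled_value]  # input row: embedding followed by the value


calls: List[int] = []


def fake_fetch(indices: Sequence[int]) -> List[Sample]:
    """Stand-in for a batched store lookup; records how many items each call requested."""
    calls.append(len(indices))
    return [([float(i), 0.0], i / 10) for i in indices]


rows = list(stream_samples(total=150, fetch_batch=fake_fetch))
fetch_sizes = list(calls)  # sizes of the three lookups made for 150 items
```

Only one chunk of indices is resolved at a time, so memory use depends on internal_batch_size and not on the total number of triplets.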

Dataset instantiation: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 502-522)

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 175-176, 502-522)

DataLoader Configuration

PyTorch DataLoader instances wrap the iterable dataset with additional configuration for efficient batch loading:

| Parameter | Value | Purpose |
| --- | --- | --- |
| batch_size | From model.hparams.batch_size | Outer batch size for model training |
| collate_fn | collate_with_scaler | Custom collation function to handle scaler objects |
| num_workers | 2 | Number of parallel data loading processes |
| pin_memory | True | Enables faster host-to-GPU transfers |
| persistent_workers | True | Keeps worker processes alive between epochs |
| prefetch_factor | 4 | Number of batches each worker pre-fetches |

The training loader uses shuffle=True in the dataset, while the validation loader uses shuffle=False so that validation metrics are reproducible.

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 502-522)

collate_with_scaler Function

The collate_with_scaler function handles batch construction when the dataset returns triplets (x, y, scaler). It stacks the tensors into batches while preserving the scaler objects in a list:

graph LR
    Input["Batch Items\n[(x1, y1, scaler1),\n(x2, y2, scaler2),\n...]"]
    Stack["Stack Tensors\ntorch.stack(x_list)\ntorch.stack(y_list)"]
    Output["Batched Output\n(x_batch, y_batch, scaler_list)"]

    Input --> Stack
    Stack --> Output

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 175-176, 506-507)
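
A collate function with this behavior might look like the following sketch, assuming PyTorch is installed. The name collate_with_scaler_sketch is hypothetical and the strings stand in for real scaler objects; the project's own collate_with_scaler may handle additional cases.

```python
import torch


def collate_with_scaler_sketch(batch):
    """Stack x and y tensors into batches; keep the scalers as a plain Python list."""
    xs, ys, scalers = zip(*batch)   # transpose the list of (x, y, scaler) triplets
    return torch.stack(xs), torch.stack(ys), list(scalers)


# Two toy samples; "s0" and "s1" are placeholders for per-sample scaler objects.
batch = [
    (torch.zeros(3), torch.zeros(3), "s0"),
    (torch.ones(3), torch.ones(3), "s1"),
]
x_batch, y_batch, scaler_list = collate_with_scaler_sketch(batch)
```

Scaler objects are arbitrary Python objects and cannot be stacked into a tensor, which is why a custom collate function is needed here.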

Training Configuration

PyTorch Lightning Trainer Setup

The training pipeline uses PyTorch Lightning's Trainer class to orchestrate the training loop, validation, and callbacks:

graph TB
    subgraph "Training Configuration"
        MaxEpochs["max_epochs\nTypically 1000"]
        Accelerator["accelerator='auto'\ndevices=1"]
        GradientClip["gradient_clip_val\nFrom model.hparams"]
    end

    subgraph "Callbacks"
        EarlyStopping["EarlyStopping\nmonitor='val_loss_epoch'\npatience=20\nmode='min'"]
        ModelCheckpoint["ModelCheckpoint\nmonitor='val_loss_epoch'\nsave_top_k=1\nfilename='stage1_resume'"]
    end

    subgraph "Logging"
        TensorBoardLogger["TensorBoardLogger\nsave_dir=OUTPUT_PATH\nname='stage1_autoencoder'"]
    end

    Trainer["pl.Trainer"]

    MaxEpochs --> Trainer
    Accelerator --> Trainer
    GradientClip --> Trainer
    EarlyStopping --> Trainer
    ModelCheckpoint --> Trainer
    TensorBoardLogger --> Trainer

Configuration values:

  • EPOCHS: 1000 (an upper bound; early stopping normally ends training sooner)
  • PATIENCE: 20 (large enough to span the learning rate annealing periods)
  • OUTPUT_PATH: project_paths.python_data / "stage1_23_(no_pre_dedupe)" or a similar versioned directory

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 468-548)

Callbacks

EarlyStopping

The EarlyStopping callback monitors val_loss_epoch and stops training if no improvement occurs for 20 consecutive epochs. This prevents overfitting and saves computational resources.

python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 528-530)
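
The patience rule can be illustrated with a small pure-Python simulation. This is only a sketch of the stopping logic as described above, with a hypothetical function name; Lightning's EarlyStopping callback offers further options that are not modeled here.

```python
from typing import Optional, Sequence


def first_stop_epoch(val_losses: Sequence[float], patience: int = 20,
                     min_delta: float = 0.0) -> Optional[int]:
    """Return the epoch index at which training would stop, or None if it never stops."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best - min_delta:
            best = loss   # improvement: remember it and reset the counter
            wait = 0
        else:
            wait += 1     # no improvement this epoch
            if wait >= patience:
                return epoch
    return None


# Toy validation losses: improvement for three epochs, then a plateau.
losses = [1.0, 0.9, 0.8, 0.85, 0.85, 0.85, 0.7]
stop_with_small_patience = first_stop_epoch(losses, patience=3)
stop_with_default_patience = first_stop_epoch(losses, patience=20)
```

With a patience of 3 the run stops during the plateau and never reaches the later improvement to 0.7, whereas a patience of 20 lets training continue through it.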

ModelCheckpoint

The ModelCheckpoint callback monitors val_loss_epoch and, with save_top_k=1, keeps only the checkpoint with the lowest validation loss:

python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 532-539)

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 528-539)

Training Execution

Training starts with a call to the Trainer.fit() method:

python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 550-556)

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 550-556)

Checkpointing and Resuming Training

Saving Checkpoints

Checkpoints are automatically saved by the ModelCheckpoint callback when validation loss improves. The checkpoint file contains:

  • Model weights (encoder and decoder parameters)
  • Optimizer state
  • Learning rate scheduler state
  • All hyperparameters (hparams)
  • Current epoch number
  • Best validation loss

Checkpoint files are saved with the .ckpt extension in the OUTPUT_PATH directory.

Loading and Resuming

The training pipeline supports two modes of checkpoint loading:

1. Resume Training with Existing Configuration

Pass ckpt_path to trainer.fit() to resume training with the exact state from the checkpoint:

python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (line 555)

2. Fine-tune with Modified Hyperparameters

Load the checkpoint explicitly and override specific hyperparameters before training. This approach is useful for fine-tuning with a lower learning rate after the initial training run has converged.

python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 479-486)

Checkpoint File Naming

The checkpoint filename is determined by the ModelCheckpoint configuration:

  • filename: "stage1_resume" produces files like stage1_resume.ckpt, stage1_resume-v1.ckpt, stage1_resume-v2.ckpt, etc.
  • Versioning: PyTorch Lightning automatically increments the version suffix when a file with the same name already exists
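
The naming pattern described above can be mimicked with a few lines of Python. This is an illustrative sketch only: next_checkpoint_name is a hypothetical helper that reproduces the suffix pattern listed here, not Lightning's internal implementation.

```python
from typing import Set


def next_checkpoint_name(existing: Set[str], stem: str = "stage1_resume",
                         ext: str = ".ckpt") -> str:
    """Pick the first unused name: stem.ckpt, then stem-v1.ckpt, stem-v2.ckpt, ..."""
    candidate = f"{stem}{ext}"
    version = 1
    while candidate in existing:
        candidate = f"{stem}-v{version}{ext}"
        version += 1
    return candidate


first = next_checkpoint_name(set())
second = next_checkpoint_name({"stage1_resume.ckpt"})
third = next_checkpoint_name({"stage1_resume.ckpt", "stage1_resume-v1.ckpt"})
```

When resuming, this means the newest checkpoint is not always the unsuffixed file, so it is worth checking which version is passed as ckpt_path.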

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 475-486, 532-539, 550-556)

Monitoring and Logging

TensorBoard Integration

The TensorBoardLogger automatically logs training metrics for visualization in TensorBoard:

graph TB
    subgraph "Logged Metrics"
        TrainLoss["train_loss\nPer-batch training loss"]
        ValLoss["val_loss_epoch\nEpoch-averaged validation loss"]
        LearningRate["learning_rate\nCurrent optimizer LR"]
    end

    subgraph "TensorBoard Logger"
        Logger["TensorBoardLogger\nsave_dir=OUTPUT_PATH\nname='stage1_autoencoder'"]
    end

    subgraph "Log Directory Structure"
        OutputPath["OUTPUT_PATH/"]
        VersionDir["stage1_autoencoder/"]
        Events["events.out.tfevents.*\nhparams.yaml"]
        Checkpoints["*.ckpt checkpoint files"]
    end

    TrainLoss --> Logger
    ValLoss --> Logger
    LearningRate --> Logger

    Logger --> OutputPath
    OutputPath --> VersionDir
    VersionDir --> Events
    OutputPath --> Checkpoints

Log directory structure:

OUTPUT_PATH/
├── stage1_autoencoder/
│   └── version_0/
│       ├── events.out.tfevents.*
│       ├── hparams.yaml
│       └── checkpoints/
├── stage1_resume.ckpt
└── optuna_study.db (if using Optuna)

python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (line 543)

Visualizing Training Progress

Launch TensorBoard with its --logdir option pointed at OUTPUT_PATH to monitor training.

TensorBoard displays:

  • Loss curves: Training and validation loss over epochs
  • Learning rate schedule: Visualization of LR changes during training
  • Hyperparameters: Table of all model hyperparameters
  • Gradient histograms: Distribution of gradients (if enabled)

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 541-548)

Training Workflow Summary

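The complete training workflow follows this sequence: instantiate Stage1Autoencoder or load it from a checkpoint, build the training and validation datasets and their DataLoaders, configure the EarlyStopping and ModelCheckpoint callbacks, create the TensorBoardLogger and the Trainer, then call trainer.fit(), optionally passing ckpt_path to resume.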

This pipeline design enables:

  • Memory-efficient training through streaming data loading
  • Reproducible experiments with comprehensive logging
  • Flexible resumption with hyperparameter overrides
  • Robust training with gradient clipping and early stopping

Sources: python/narrative_stack/notebooks/old.stage1_training_(no_pre_dedupe).ipynb (lines 456-556)