Module nowcastlib.pipeline

Data processing and Model evaluation pipeline for the Nowcast Library

Pipeline

The Nowcast Library Pipeline is intended as a holistic yet modular system for processing and synchronizing data, fitting models to it, and evaluating these models.

While these steps are modular and hence able to be run independently, in practice the pipeline is intended to be run into 2 main steps:

  1. data wrangling
  2. model training and evaluation

pipeline flow

The dashed red line indicates the boundary between the two steps, with the output of the data wrangling step being fed into the model training and evaluation step.

Structs

The nowcastlib.pipeline submodule includes the nowcastlib.pipeline.structs module. These are custom classes that serve as skeletons for return types and more importantly as configuration options (from the config submodule) for setting up the various pipeline steps. To use the pipeline and any sub-pipeline within, users will use the top level DataSet class to specify their configuration, with the other structs being specified within. Configuration is intended to be provided in JSON format via a .json file. Example configuration files can be found in the examples folder on github under the naming convention pipeline_{cli subcommand}.json

Data Wrangling

The data wrangling "step" takes a set of data sources and outputs the resulting synchronized and processed data, organized in train, test and train-validation splits to feed into the model training and evaluation step. This step includes the following sub-pipelines, shown here in their intended order of execution.

  1. preprocessing
  2. synchronization
  3. postprocessing
  4. splitprocessing

While each of these steps can be run independently of the others, each step will implicitly run the processes of the previous steps leading up to it. For example, if the user were to run a postprocessing subroutine, the pipeline will also run the preprocessing and synchronization steps leading up to it. Of course, if the configuration is minimal for these implicit previous steps, then the actual processing itself will also be kept to a minimum.

Future iterations of the pipeline will aim to implement caching to avoid repeating previously computed processes.

Future iterations of the pipeline will also aim to be more modular, avoiding the cascading nature of processing steps by allowing users to provide input files at later stages.

Processing

The Nowcast Library Pipeline categorizes data processing in two types:

  • preprocessing: processing applied that is corrective and/or filter-like in nature
  • postprocessing: processing applied that is additive or trasformative in nature

Synchronization

Synchronization refers to the process of temporally aligning data originating from multiple sources with different sample rates and operational time ranges.

The simplest edge case requiring non-trivial synchronization is when the sources operate at roughly the same sample rate and at the same times almost continuously, but for longer and shorter periods. In this case we simply have to resample to a single sample rate and find the latest start period and earliest end period across the data sources.

On the opposite end we have the edge case where each data source has its own sample rate and presents large gaps of different sizes at different times. In this case a synchronization process would want to keep the data only when all data sources overlap. This leaves large chunks between overlaps, as well as smaller ones due to differences in sample rates.

Handling the latter of these two edge case allow us to handle all other cases in between. As such, besides the target sample rate, the user needs to specify additional configuration options under the chunk_config of the SyncOptions struct.

Model Training and Evaluation (Not yet available)

The model training and evaluation "step" is concerned with 3 general processes, reliant on the

  • Model fitting
    • including a preliminary and optional feature selection stage
  • Model performance validation:
    • to select hyperparameters
    • to compare and select multiple models
  • Model performance evaluation
    • to estimate how the selected models will perform in production
Expand source code
"""
Data processing and Model evaluation pipeline for the Nowcast Library
.. include:: README.md
"""
import logging
import pathlib
import pandas as pd
from .structs import config
from . import process
from . import sync
from . import features
from . import split
from . import standardize
from . import utils

logger = logging.getLogger(__name__)


def pipe_dataset(options: config.DataSet):
    """
    Runs all configured data-wrangling steps of the
    Nowcast Library Pipeline on a set of data described
    by the input DataSet instance
    """
    # preprocessing
    preprocessed_dfs = process.preprocess.preprocess_dataset(options)
    # syncing
    chunked_df, chunk_locs = sync.synchronize_dataset(options, preprocessed_dfs)
    # postprocessing
    postprocessed_df = process.postprocess.postprocess_dataset(options, chunked_df)
    # add generated fields if necessary
    if options.generated_fields is not None:
        postprocessed_df = features.generate_fields(options, postprocessed_df)
    # serializing postprocessing if necessary
    if options.postprocessing_output is not None:
        logger.info("Serializing postprocessing results...")
        utils.handle_serialization(postprocessed_df, options.postprocessing_output)
        logger.info("Serialization complete.")
    # splitting
    if options.split_options is not None:
        outer_split, inner_split = split.split_dataset(
            options, (postprocessed_df, chunk_locs)
        )
        # standardization (requires splits)
        outer_split, inner_split = standardize.standardize_dataset(
            options, outer_split, inner_split
        )
        # serialization
        split.serialize_splits(
            options.split_options.output_options, outer_split, inner_split
        )

Sub-modules

nowcastlib.pipeline.cli

Command-Line interface functionality for preprocessing

nowcastlib.pipeline.features

module containing functionality related to feature engineering and selection

nowcastlib.pipeline.process

Functions for processing data

nowcastlib.pipeline.split

Functions for splitting data

nowcastlib.pipeline.standardize

Module for standardization functionality

nowcastlib.pipeline.structs

Module containing custom structures used throughout the pipeline submodule.

nowcastlib.pipeline.sync

Functions for synchronizing data

nowcastlib.pipeline.utils

Shared functionality across the Nowcast Library Pipeline submodule

Functions

def pipe_dataset(options: DataSet)

Runs all configured data-wrangling steps of the Nowcast Library Pipeline on a set of data described by the input DataSet instance

Expand source code
def pipe_dataset(options: config.DataSet):
    """
    Runs all configured data-wrangling steps of the
    Nowcast Library Pipeline on a set of data described
    by the input DataSet instance
    """
    # preprocessing
    preprocessed_dfs = process.preprocess.preprocess_dataset(options)
    # syncing
    chunked_df, chunk_locs = sync.synchronize_dataset(options, preprocessed_dfs)
    # postprocessing
    postprocessed_df = process.postprocess.postprocess_dataset(options, chunked_df)
    # add generated fields if necessary
    if options.generated_fields is not None:
        postprocessed_df = features.generate_fields(options, postprocessed_df)
    # serializing postprocessing if necessary
    if options.postprocessing_output is not None:
        logger.info("Serializing postprocessing results...")
        utils.handle_serialization(postprocessed_df, options.postprocessing_output)
        logger.info("Serialization complete.")
    # splitting
    if options.split_options is not None:
        outer_split, inner_split = split.split_dataset(
            options, (postprocessed_df, chunk_locs)
        )
        # standardization (requires splits)
        outer_split, inner_split = standardize.standardize_dataset(
            options, outer_split, inner_split
        )
        # serialization
        split.serialize_splits(
            options.split_options.output_options, outer_split, inner_split
        )