Module nowcastlib.pipeline.process.preprocess

Functions for pre-processing of data.

Source code
"""
Functions for pre-processing of data.
"""
import logging
from typing import Union
import pandas as pd
from nowcastlib.pipeline.structs import config
from nowcastlib.pipeline import utils as pipeline_utils
from nowcastlib.pipeline.process import utils as process_utils

logger = logging.getLogger(__name__)


def preprocess_datasource(options: config.DataSource):
    """
    Runs preprocessing on a given data source, following the options
    outlined in the input DataSource instance.

    Parameters
    ----------
    options : nowcastlib.pipeline.structs.config.DataSource
        configuration describing the data source's path, fields,
        and per-field preprocessing options

    Returns
    -------
    pandas.core.frame.DataFrame
        the resulting processed dataframe
    """
    logger.debug("Preprocessing %s...", options.name)
    index_field = next(field for field in options.fields if field.is_date)
    logger.debug("Reading file...")
    data_df = pd.read_csv(
        options.path,
        usecols=[field.field_name for field in options.fields],
        index_col=index_field.field_name,
        parse_dates=False,
        comment=options.comment_format,
    )
    data_df.index = pd.to_datetime(data_df.index, format=index_field.date_format)
    # drop duplicate timestamps, keeping the most recent occurrence,
    # then sort chronologically
    data_df = data_df[  # pylint: disable=unsubscriptable-object
        ~data_df.index.duplicated(keep="last")
    ]
    data_df.sort_index(inplace=True)
    for field in options.fields:
        logger.debug("Processing field %s of %s...", field.field_name, options.name)
        proc_options = field.preprocessing_options
        if proc_options is not None:
            # the next two statements handle whether the user wishes
            # to overwrite the original field or create a new one
            computed_field_name = pipeline_utils.build_field_name(
                proc_options, field.field_name
            )
            data_df[computed_field_name] = data_df[field.field_name].copy()
            data_df[computed_field_name] = process_utils.process_field(
                data_df[computed_field_name], proc_options, True
            )
    logger.debug("Dropping NaNs...")
    data_df = data_df.dropna()
    if options.preprocessing_output is not None:
        logger.debug("Serializing preprocessing output...")
        pipeline_utils.handle_serialization(data_df, options.preprocessing_output)
    return data_df


def preprocess_dataset(options: config.DataSet):
    """
    Runs preprocessing on a given set of data sources, following the
    options outlined in the input DataSet instance.

    Parameters
    ----------
    options : nowcastlib.pipeline.structs.config.DataSet
        configuration holding the data sources to preprocess

    Returns
    -------
    list[pandas.core.frame.DataFrame]
        list containing each of the resulting processed dataframes
    """
    logger.info("Preprocessing dataset...")
    processed_dfs = []
    for ds_config in options.data_sources:
        processed_dfs.append(preprocess_datasource(ds_config))
    logger.info("Dataset preprocessing complete.")
    return processed_dfs

Sub-modules

nowcastlib.pipeline.process.preprocess.cli

Command-line interface functionality for preprocessing

Functions

def preprocess_datasource(options: DataSource)

Runs preprocessing on a given data source, following the options outlined in the input DataSource instance.

Parameters

options : DataSource
    configuration describing the data source's path, fields, and per-field preprocessing options

Returns

pandas.core.frame.DataFrame
the resulting processed dataframe
def preprocess_dataset(options: DataSet)

Runs preprocessing on a given set of data sources, following the options outlined in the input DataSet instance.

Parameters

options : DataSet
    configuration holding the data sources to preprocess

Returns

list[pandas.core.frame.DataFrame]
list containing each of the resulting processed dataframes