Module nowcastlib.pipeline.process.postprocess
Functions for post-processing of data
"""
Functions for post-processing of data
"""
import logging
from typing import Optional, List
import numpy as np
import pandas as pd
from nowcastlib.pipeline.structs import config
from nowcastlib.pipeline import utils
from nowcastlib.pipeline import sync
from nowcastlib.pipeline.process import utils as process_utils
logger = logging.getLogger(__name__)
# disable the SettingWithCopyWarning, since it was producing false positives
pd.set_option("mode.chained_assignment", None)
def postprocess_dataset(
options: config.DataSet, data_df: Optional[pd.core.frame.DataFrame] = None
) -> pd.core.frame.DataFrame:
"""
Postprocesses a dataset given options outlined
in the input DataSet config instance.
"""
# need to get data_df from syncing process if not provided
if data_df is None:
chunked_df, _ = sync.synchronize_dataset(options)
else:
chunked_df = data_df.copy()
logger.info("Postprocessing dataset...")
# instantiate our processed result
proc_df = chunked_df.copy()
    # gather which fields to process into a single list
raw_fields: List[config.RawField] = [
field for source in options.data_sources for field in source.fields
]
    # rename overwrite-protected fields so that processing writes to a new
    # column instead of overwriting the original data
fields_to_process = [utils.rename_protected_field(field) for field in raw_fields]
# finally we may perform postprocessing
for field in fields_to_process:
logger.debug("Processing field %s...", field.field_name)
if field.postprocessing_options is not None:
proc_df[field.field_name] = process_utils.process_field(
chunked_df[field.field_name],
field.postprocessing_options,
False,
)
logger.info("Dataset postprocessing complete.")
return proc_df
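For reference, a minimal usage sketch is shown below. The construction of the config.DataSet instance is elided rather than invented, and the pickle filename is a placeholder; only the two call patterns of postprocess_dataset are taken from the source above.

import pandas as pd
from nowcastlib.pipeline.structs import config
from nowcastlib.pipeline.process.postprocess import postprocess_dataset

# a config.DataSet instance, built according to your pipeline
# configuration (constructor arguments elided)
options: config.DataSet = ...

# option 1: let postprocess_dataset call sync.synchronize_dataset itself
result_df = postprocess_dataset(options)

# option 2: reuse an already-synchronized DataFrame and skip the sync step
synced_df = pd.read_pickle("synced_dataset.pkl")  # placeholder path
result_df = postprocess_dataset(options, data_df=synced_df)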
Sub-modules
nowcastlib.pipeline.process.postprocess.cli
Command-line interface functionality for postprocessing
Functions
def postprocess_dataset(options: DataSet, data_df: Optional[pandas.core.frame.DataFrame] = None) -> pandas.core.frame.DataFrame
Postprocesses a dataset given options outlined in the input DataSet config instance.
def postprocess_dataset(
    options: config.DataSet, data_df: Optional[pd.core.frame.DataFrame] = None
) -> pd.core.frame.DataFrame:
    """
    Postprocesses a dataset given options outlined
    in the input DataSet config instance.
    """
    # need to get data_df from syncing process if not provided
    if data_df is None:
        chunked_df, _ = sync.synchronize_dataset(options)
    else:
        chunked_df = data_df.copy()
    logger.info("Postprocessing dataset...")
    # instantiate our processed result
    proc_df = chunked_df.copy()
    # gather which fields to process into a single list
    raw_fields: List[config.RawField] = [
        field for source in options.data_sources for field in source.fields
    ]
    # rename overwrite-protected fields so that processing writes to a new
    # column instead of overwriting the original data
    fields_to_process = [utils.rename_protected_field(field) for field in raw_fields]
    # finally we may perform postprocessing
    for field in fields_to_process:
        logger.debug("Processing field %s...", field.field_name)
        if field.postprocessing_options is not None:
            proc_df[field.field_name] = process_utils.process_field(
                chunked_df[field.field_name],
                field.postprocessing_options,
                False,
            )
    logger.info("Dataset postprocessing complete.")
    return proc_df
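To make the per-field loop concrete, the following self-contained pandas sketch mimics its pattern with toy stand-ins: smooth plays the role of process_utils.process_field, and the hard-coded (name, protected) pairs stand in for the configured RawField objects; none of these names are nowcastlib API.

import numpy as np
import pandas as pd

def smooth(series: pd.Series) -> pd.Series:
    # toy stand-in for process_utils.process_field
    return series.rolling(3, min_periods=1).mean()

chunked_df = pd.DataFrame(
    {"wind_speed": np.random.rand(10), "temperature": np.random.rand(10)}
)
proc_df = chunked_df.copy()

# one field is "overwrite-protected": its processed values are written
# to a renamed column so the original column is left untouched
fields = [("wind_speed", False), ("temperature", True)]
for name, protected in fields:
    target_name = "protected_" + name if protected else name
    proc_df[target_name] = smooth(chunked_df[name])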