Module nowcastlib.pipeline.standardize
Module for standardization functionality
Expand source code
"""
Module for standardization functionality
"""
import sys
from typing import List, Tuple
import logging
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn.preprocessing as sklearn_pproc
from nowcastlib.pipeline import structs
from nowcastlib.pipeline import utils
plt.ion()
logger = logging.getLogger(__name__)
def handle_diag_plots(
input_series: pd.core.series.Series,
configured_method: structs.config.StandardizationMethod,
):
"""
Plots different rescalings of the input series,
asking the user for confirmation
"""
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(7, 7))
pwr_trnsformer = sklearn_pproc.PowerTransformer()
robust_trnsfrmr = sklearn_pproc.RobustScaler()
ax1.hist(input_series, bins=200, color="black")
ax1.set_title("Original Series")
ax2.hist(
pwr_trnsformer.fit_transform(input_series.to_numpy().reshape(-1, 1)),
bins=200,
color="darkblue"
if configured_method == structs.config.StandardizationMethod.POWER
else "darkgrey",
)
ax2.set_title("Power Transform")
ax3.hist(
robust_trnsfrmr.fit_transform(input_series.to_numpy().reshape(-1, 1)),
bins=200,
color="darkblue"
if configured_method == structs.config.StandardizationMethod.ROBUST
else "darkgrey",
)
ax3.set_title("Robust Scaling")
ax4.hist(
np.log(1 + (input_series - input_series.min())),
bins=200,
color="darkblue"
if configured_method == structs.config.StandardizationMethod.LOGNORM
else "darkgrey",
)
ax4.set_title("log(1 + (input_series - input_series.min()))")
fig.suptitle("Configured method shown in Blue")
fig.set_tight_layout(True)
logger.info("Press any button to exit. Use mouse to zoom and resize")
while True:
plt.draw()
if plt.waitforbuttonpress():
break
return utils.yes_or_no(
"Are you satisfied with the selected Standardization Method?"
)
def standardize_dataset(
options: structs.config.DataSet,
outer_split: structs.TrainTestSplit,
inner_split: structs.IteratedSplit,
) -> structs.SplitDataSet:
"""
Standardizes a DataSet, accounting for train/test nuances
"""
# destructure outer split, so we have access to locs
(train_df, train_locs), (test_df, test_locs) = outer_split
# and standardize
[proc_train_data], [proc_test_data] = standardize_splits(
options, [train_df], [test_df]
)
# destructure inner split, so we have access to locs
inner_train_data, inner_val_data = inner_split
train_dfs, inner_train_locs = [[*x] for x in zip(*inner_train_data)]
val_dfs, inner_val_locs = [[*x] for x in zip(*inner_val_data)]
(
proc_val_train_dfs,
proc_val_test_dfs,
) = standardize_splits(options, train_dfs, val_dfs)
# return in correct format
return (
((proc_train_data, train_locs), (proc_test_data, test_locs)),
(
list(zip(proc_val_train_dfs, inner_train_locs)),
list(zip(proc_val_test_dfs, inner_val_locs)),
),
)
def standardize_splits(
options: structs.config.DataSet,
train_dfs: List[pd.core.frame.DataFrame],
test_dfs: List[pd.core.frame.DataFrame],
) -> Tuple[List[pd.core.frame.DataFrame], List[pd.core.frame.DataFrame]]:
"""
Standardizes a set of train-test splits given options outlined
in the input DataSet config instance.
Returns
-------
std_train_dfs : List[pandas.core.frame.DataFrame]
List of the newly standardized train dataframes
std_test_dfs : List[pandas.core.frame.DataFrame]
List of the newly standardized test dataframes
"""
logger.info("Standardizing splits...")
# instantiate standardized dfs
std_train_dfs = train_dfs.copy()
std_test_dfs = test_dfs.copy()
# gather which fields to process into single list
raw_fields: List[structs.config.RawField] = [
field for source in options.data_sources for field in source.fields
]
# rename overwrite-protected fields so to avoid acting on the original field
fields_to_process = [utils.rename_protected_field(field) for field in raw_fields]
# proceed with standardization iteratively
for i, _ in enumerate(zip(train_dfs, test_dfs)):
# standardize new fields if necessary
if options.generated_fields is not None:
for new_field in options.generated_fields:
if new_field.std_options is not None:
logger.debug("Standardizing field %s...", new_field.target_name)
(
std_train_dfs[i][new_field.target_name],
std_test_dfs[i][new_field.target_name],
) = standardize_field(
std_train_dfs[i][new_field.target_name],
std_test_dfs[i][new_field.target_name],
new_field.std_options,
)
# standardize processed raw fields at the end
for field in fields_to_process:
if field.std_options is not None:
logger.debug("Standardizing field %s...", field.field_name)
(
std_train_dfs[i][field.field_name],
std_test_dfs[i][field.field_name],
) = standardize_field(
std_train_dfs[i][field.field_name],
std_test_dfs[i][field.field_name],
field.std_options,
)
logger.info("Standardization complete.")
return std_train_dfs, std_test_dfs
def standardize_field(
train_data: pd.core.series.Series,
test_data: pd.core.series.Series,
options: structs.config.StandardizationOptions,
):
"""
Standardizes a field based on config options,
taking care not to leak information from the training set
to the testing set
"""
if options.diagnostic_plots is True:
continue_processing = handle_diag_plots(train_data, options.method)
if continue_processing is False:
logger.info(
"Closing program prematurely to allow for configuration changes"
)
sys.exit()
if options.method == structs.config.StandardizationMethod.LOGNORM:
return (
np.log(1 + (train_data - train_data.min())),
np.log(1 + (test_data - test_data.min())),
)
# handle transformer based methods
elif options.method == structs.config.StandardizationMethod.POWER:
transformer = sklearn_pproc.PowerTransformer()
elif options.method == structs.config.StandardizationMethod.ROBUST:
transformer = sklearn_pproc.RobustScaler()
# fit only on training data, to avoid information leakage
fitted_trnsfrmr = transformer.fit(train_data.to_numpy().reshape(-1, 1))
# use the fitted transformer for transforming both train and test data
return (
fitted_trnsfrmr.transform(train_data.to_numpy().reshape(-1, 1)),
fitted_trnsfrmr.transform(test_data.to_numpy().reshape(-1, 1)),
)
Functions
def handle_diag_plots(input_series: pandas.core.series.Series, configured_method: StandardizationMethod)
-
Plots different rescalings of the input series, asking the user for confirmation
Expand source code
def handle_diag_plots( input_series: pd.core.series.Series, configured_method: structs.config.StandardizationMethod, ): """ Plots different rescalings of the input series, asking the user for confirmation """ fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(7, 7)) pwr_trnsformer = sklearn_pproc.PowerTransformer() robust_trnsfrmr = sklearn_pproc.RobustScaler() ax1.hist(input_series, bins=200, color="black") ax1.set_title("Original Series") ax2.hist( pwr_trnsformer.fit_transform(input_series.to_numpy().reshape(-1, 1)), bins=200, color="darkblue" if configured_method == structs.config.StandardizationMethod.POWER else "darkgrey", ) ax2.set_title("Power Transform") ax3.hist( robust_trnsfrmr.fit_transform(input_series.to_numpy().reshape(-1, 1)), bins=200, color="darkblue" if configured_method == structs.config.StandardizationMethod.ROBUST else "darkgrey", ) ax3.set_title("Robust Scaling") ax4.hist( np.log(1 + (input_series - input_series.min())), bins=200, color="darkblue" if configured_method == structs.config.StandardizationMethod.LOGNORM else "darkgrey", ) ax4.set_title("log(1 + (input_series - input_series.min()))") fig.suptitle("Configured method shown in Blue") fig.set_tight_layout(True) logger.info("Press any button to exit. Use mouse to zoom and resize") while True: plt.draw() if plt.waitforbuttonpress(): break return utils.yes_or_no( "Are you satisfied with the selected Standardization Method?" )
def standardize_dataset(options: DataSet, outer_split: Tuple[Tuple[pandas.core.frame.DataFrame, numpy.ndarray], Tuple[pandas.core.frame.DataFrame, numpy.ndarray]], inner_split: Tuple[List[Tuple[pandas.core.frame.DataFrame, numpy.ndarray]], List[Tuple[pandas.core.frame.DataFrame, numpy.ndarray]]]) ‑> Tuple[Tuple[Tuple[pandas.core.frame.DataFrame, numpy.ndarray], Tuple[pandas.core.frame.DataFrame, numpy.ndarray]], Tuple[List[Tuple[pandas.core.frame.DataFrame, numpy.ndarray]], List[Tuple[pandas.core.frame.DataFrame, numpy.ndarray]]]]
-
Standardizes a DataSet, accounting for train/test nuances
Expand source code
def standardize_dataset( options: structs.config.DataSet, outer_split: structs.TrainTestSplit, inner_split: structs.IteratedSplit, ) -> structs.SplitDataSet: """ Standardizes a DataSet, accounting for train/test nuances """ # destructure outer split, so we have access to locs (train_df, train_locs), (test_df, test_locs) = outer_split # and standardize [proc_train_data], [proc_test_data] = standardize_splits( options, [train_df], [test_df] ) # destructure inner split, so we have access to locs inner_train_data, inner_val_data = inner_split train_dfs, inner_train_locs = [[*x] for x in zip(*inner_train_data)] val_dfs, inner_val_locs = [[*x] for x in zip(*inner_val_data)] ( proc_val_train_dfs, proc_val_test_dfs, ) = standardize_splits(options, train_dfs, val_dfs) # return in correct format return ( ((proc_train_data, train_locs), (proc_test_data, test_locs)), ( list(zip(proc_val_train_dfs, inner_train_locs)), list(zip(proc_val_test_dfs, inner_val_locs)), ), )
def standardize_splits(options: DataSet, train_dfs: List[pandas.core.frame.DataFrame], test_dfs: List[pandas.core.frame.DataFrame]) ‑> Tuple[List[pandas.core.frame.DataFrame], List[pandas.core.frame.DataFrame]]
-
Standardizes a set of train-test splits given options outlined in the input DataSet config instance.
Returns
std_train_dfs
:List[pandas.core.frame.DataFrame]
- List of the newly standardized train dataframes
std_test_dfs
:List[pandas.core.frame.DataFrame]
- List of the newly standardized test dataframes
Expand source code
def standardize_splits( options: structs.config.DataSet, train_dfs: List[pd.core.frame.DataFrame], test_dfs: List[pd.core.frame.DataFrame], ) -> Tuple[List[pd.core.frame.DataFrame], List[pd.core.frame.DataFrame]]: """ Standardizes a set of train-test splits given options outlined in the input DataSet config instance. Returns ------- std_train_dfs : List[pandas.core.frame.DataFrame] List of the newly standardized train dataframes std_test_dfs : List[pandas.core.frame.DataFrame] List of the newly standardized test dataframes """ logger.info("Standardizing splits...") # instantiate standardized dfs std_train_dfs = train_dfs.copy() std_test_dfs = test_dfs.copy() # gather which fields to process into single list raw_fields: List[structs.config.RawField] = [ field for source in options.data_sources for field in source.fields ] # rename overwrite-protected fields so to avoid acting on the original field fields_to_process = [utils.rename_protected_field(field) for field in raw_fields] # proceed with standardization iteratively for i, _ in enumerate(zip(train_dfs, test_dfs)): # standardize new fields if necessary if options.generated_fields is not None: for new_field in options.generated_fields: if new_field.std_options is not None: logger.debug("Standardizing field %s...", new_field.target_name) ( std_train_dfs[i][new_field.target_name], std_test_dfs[i][new_field.target_name], ) = standardize_field( std_train_dfs[i][new_field.target_name], std_test_dfs[i][new_field.target_name], new_field.std_options, ) # standardize processed raw fields at the end for field in fields_to_process: if field.std_options is not None: logger.debug("Standardizing field %s...", field.field_name) ( std_train_dfs[i][field.field_name], std_test_dfs[i][field.field_name], ) = standardize_field( std_train_dfs[i][field.field_name], std_test_dfs[i][field.field_name], field.std_options, ) logger.info("Standardization complete.") return std_train_dfs, std_test_dfs
def standardize_field(train_data: pandas.core.series.Series, test_data: pandas.core.series.Series, options: StandardizationOptions)
-
Standardizes a field based on config options, taking care not to leak information from the training set to the testing set
Expand source code
def standardize_field( train_data: pd.core.series.Series, test_data: pd.core.series.Series, options: structs.config.StandardizationOptions, ): """ Standardizes a field based on config options, taking care not to leak information from the training set to the testing set """ if options.diagnostic_plots is True: continue_processing = handle_diag_plots(train_data, options.method) if continue_processing is False: logger.info( "Closing program prematurely to allow for configuration changes" ) sys.exit() if options.method == structs.config.StandardizationMethod.LOGNORM: return ( np.log(1 + (train_data - train_data.min())), np.log(1 + (test_data - test_data.min())), ) # handle transformer based methods elif options.method == structs.config.StandardizationMethod.POWER: transformer = sklearn_pproc.PowerTransformer() elif options.method == structs.config.StandardizationMethod.ROBUST: transformer = sklearn_pproc.RobustScaler() # fit only on training data, to avoid information leakage fitted_trnsfrmr = transformer.fit(train_data.to_numpy().reshape(-1, 1)) # use the fitted transformer for transforming both train and test data return ( fitted_trnsfrmr.transform(train_data.to_numpy().reshape(-1, 1)), fitted_trnsfrmr.transform(test_data.to_numpy().reshape(-1, 1)), )