Module nowcastlib.pipeline.sync
Functions for synchronizing data
Expand source code
"""
Functions for synchronizing data
"""
import sys
import logging
from typing import Optional, List
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from nowcastlib.pipeline.structs import config
from nowcastlib.pipeline.process import preprocess
from nowcastlib.pipeline import utils
from nowcastlib import datasets
plt.ion()
logger = logging.getLogger(__name__)
def handle_diag_plots(
    options: config.SyncOptions, dataframes: List[pd.core.frame.DataFrame]
):
    """
    Produces plots that can aid the user in noticing mistakes
    in their configuration before proceeding with the
    synchronization process

    A histogram of the index spacing (in seconds) of each data source is
    drawn from a random sample of at most 10000 spacings, together with a
    vertical line marking the configured target sample spacing. The call
    blocks until a key is pressed in the figure window, then asks the user
    for confirmation.

    Parameters
    ----------
    options : nowcastlib.pipeline.structs.config.SyncOptions
        Only `options.sample_spacing` is read here.
    dataframes : list[pandas.core.frame.DataFrame]
        The set of dataframes one wishes to synchronize.

    Returns
    -------
    bool
        whether the user wishes to continue with
        running the pipeline or not
        (the value returned by `utils.yes_or_no`)
    """
    n_samples = 10000
    fig, ax1 = plt.subplots(1, 1, figsize=(6, 4))
    for i, data_df in enumerate(dataframes):
        # the first difference is always NaT, so it carries no information
        sample_spacing = data_df.index.to_series().diff().dropna()
        if sample_spacing.empty:
            logger.warning(
                "DataSource %d has fewer than two samples, skipping its plot", i
            )
            continue
        # sampling without replacement cannot draw more rows than exist
        n_drawn = min(n_samples, len(sample_spacing))
        # total_seconds() yields plain floats regardless of pandas version
        data = sample_spacing.sample(n_drawn).dt.total_seconds()
        # normalise by the number of samples actually drawn so that the
        # histogram heights sum to (at most) one
        weights = np.full(n_drawn, 1.0 / n_drawn)
        ax1.hist(
            data,
            bins=np.arange(150, step=1),
            weights=weights,
            label="DataSource {} sample spacing".format(i),
            histtype="step",
            linewidth=1.5,
        )
    ax1.axvline(options.sample_spacing, color="black", label="Selected Sample Spacing")
    ax1.set_xlabel("Sample Spacing [s]")
    ax1.set_ylabel("Prevalence")
    ax1.set_title(
        "Sample spacing of {} random samples across the input Data Sources".format(
            n_samples
        )
    )
    ax1.legend()
    fig.set_tight_layout(True)
    logger.info("Press any button to exit. Use mouse to zoom and resize")
    # waitforbuttonpress() is truthy only for key presses, so mouse
    # interaction (zooming, resizing) keeps the figure open
    while True:
        plt.draw()
        if plt.waitforbuttonpress():
            break
    return utils.yes_or_no("Are you satisfied with the target sample rate?")
def handle_chunking(
    data_df: pd.core.frame.DataFrame,
    options: config.ChunkOptions,
    column_names: Optional[List[str]] = None,
):
    """
    Finds overlapping chunks of data across dataframe columns,
    taking into account gap size preferences.

    Parameters
    ----------
    data_df: pandas.core.frame.DataFrame
        the sparse dataframe to perform chunking on. Its index must
        carry a fixed frequency (`data_df.index.freq`).
    options : nowcastlib.pipeline.structs.config.ChunkOptions
        chunking configuration options. `max_gap_size` and
        `min_chunk_size` are read and divided by the sample spacing
        in seconds.
    column_names : list[str], default None
        the names of the columns to check for overlaps.
        If `None`, all columns will be used.

    Returns
    -------
    pandas.core.frame.DataFrame
        The resulting dataframe, rows where not
        all columns contained data now contain NaN
        across all columns, depending on gap preferences.
    numpy.ndarray
        2D numpy array containing the start and end
        integer indices of the contiguous chunks of data
        in the input dataframe. Shape is (-1, 2).

    Raises
    ------
    ValueError
        If the index of `data_df` has no frequency set.
    """
    freq = data_df.index.freq
    if freq is None:
        raise ValueError(
            "`data_df` must have an index with a fixed frequency to be chunked"
        )
    # find overlapping data, ignoring small gaps
    # total_seconds() is used instead of `.seconds`, which only holds the
    # seconds component of the timedelta (0 for sub-second or whole-day spacings)
    sample_spacing_secs = pd.Timedelta(freq).total_seconds()
    max_spacing_steps = np.floor((options.max_gap_size / sample_spacing_secs)).astype(
        int
    )
    min_chunk_length = int(options.min_chunk_size / sample_spacing_secs)
    final_mask, chunk_locs = datasets.compute_dataframe_mask(
        data_df, max_spacing_steps, min_chunk_length, 0, column_names
    )
    # imputing gaps, restoring large gaps
    interpolated_df = data_df.interpolate("linear", limit_direction="both")
    chunked_df = interpolated_df.where(final_mask)  # type: ignore
    return chunked_df, chunk_locs
def synchronize_dataset(
    options: config.DataSet, dataset: Optional[List[pd.core.frame.DataFrame]] = None
):
    """
    Synchronizes a set of data sources given options outlined
    in the input DataSet config instance. Optionally writes the
    results to disk.

    Each data source is resampled (by mean) onto a common grid of
    `sync_options.sample_spacing` seconds, the resampled sources are
    inner-joined on their index, and the result is split into
    contiguous chunks via `handle_chunking`. The input dataframes
    are not modified.

    Parameters
    ----------
    options : nowcastlib.pipeline.structs.config.DataSet
        `options.sync_options` must not be `None`.
    dataset : list[pandas.core.frame.DataFrame], default None
        The set of dataframes one wishes to synchronize.
        If `None`, the preprocessing output produced by the
        config options will be synchronized.

    Returns
    -------
    pandas.core.frame.DataFrame
        A single dataframe containing the
        synchronized data. The dataframe is sparse,
        with some rows consisting solely of NaNs.
    numpy.ndarray
        2D numpy array containing the start and end
        integer indices of the contiguous chunks of data
        in the returned dataframe. Shape is (-1, 2).
    """
    sync_config = options.sync_options
    assert (
        sync_config is not None
    ), "`options.sync_options` must be defined to perform synchronization"
    # avoid preprocessing if datasets are passed directly
    if dataset is None:
        data_dfs = preprocess.preprocess_dataset(options)
    else:
        data_dfs = dataset
    logger.info("Synchronizing dataset...")
    if sync_config.diagnostic_plots is not False:
        continue_processing = handle_diag_plots(sync_config, data_dfs)
        if continue_processing is False:
            logger.info(
                "Closing program prematurely to allow for configuration changes"
            )
            sys.exit()
    total_dfs = len(data_dfs)
    # a Timedelta is accepted by both `resample` and `Timestamp.floor` and
    # avoids relying on the version-dependent "S" frequency alias
    spacing = pd.Timedelta(seconds=sync_config.sample_spacing)
    resampled_dfs = []
    for i, data_df in enumerate(data_dfs):
        logger.debug("Resampling DataSource %d of %d...", i + 1, total_dfs)
        # flooring the origin to a multiple of the spacing puts every
        # data source on the same grid, so the inner join below can match rows
        resampled_df = data_df.resample(
            spacing,
            origin=data_df.index[0].floor(spacing),
        ).mean()
        # clear the index name on the resampled copy rather than on the
        # caller's dataframe, which must not be mutated
        resampled_df.index.name = None
        resampled_dfs.append(resampled_df)
    logger.debug("Finding overlapping range and joining into single dataframe...")
    synced_df = pd.concat(resampled_dfs, axis=1, join="inner")
    logger.debug("Splitting data into contiguous chunks...")
    # only the first column of each data source is checked for overlaps
    chunked_df, chunk_locs = handle_chunking(
        synced_df, sync_config.chunk_options, [df.columns[0] for df in data_dfs]
    )
    if sync_config.data_output is not None:
        logger.debug("Serializing chunked dataframe...")
        utils.handle_serialization(chunked_df, sync_config.data_output)
    if sync_config.chunks_output is not None:
        logger.debug("Serializing chunk locations...")
        utils.handle_serialization(chunk_locs, sync_config.chunks_output)
    logger.info("Dataset synchronization complete.")
    return chunked_df, chunk_locs
Sub-modules
nowcastlib.pipeline.sync.cli
-
Command-Line interface functionality for synchronization
Functions
def handle_diag_plots(options: SyncOptions, dataframes: List[pandas.core.frame.DataFrame])
-
Produces plots that can aid the user in noticing mistakes in their configuration before proceeding with the synchronization process
Parameters
options
:SyncOptions
dataframes
:list[pandas.core.frame.DataFrame]
- The set of dataframes one wishes to synchronize.
Returns
bool
- whether the user wishes to continue with running the pipeline or not
Expand source code
def handle_diag_plots(
    options: config.SyncOptions, dataframes: List[pd.core.frame.DataFrame]
):
    """
    Produces plots that can aid the user in noticing mistakes
    in their configuration before proceeding with the
    synchronization process

    A histogram of the index spacing (in seconds) of each data source is
    drawn from a random sample of at most 10000 spacings, together with a
    vertical line marking the configured target sample spacing. The call
    blocks until a key is pressed in the figure window, then asks the user
    for confirmation.

    Parameters
    ----------
    options : nowcastlib.pipeline.structs.config.SyncOptions
        Only `options.sample_spacing` is read here.
    dataframes : list[pandas.core.frame.DataFrame]
        The set of dataframes one wishes to synchronize.

    Returns
    -------
    bool
        whether the user wishes to continue with
        running the pipeline or not
        (the value returned by `utils.yes_or_no`)
    """
    n_samples = 10000
    fig, ax1 = plt.subplots(1, 1, figsize=(6, 4))
    for i, data_df in enumerate(dataframes):
        # the first difference is always NaT, so it carries no information
        sample_spacing = data_df.index.to_series().diff().dropna()
        if sample_spacing.empty:
            logger.warning(
                "DataSource %d has fewer than two samples, skipping its plot", i
            )
            continue
        # sampling without replacement cannot draw more rows than exist
        n_drawn = min(n_samples, len(sample_spacing))
        # total_seconds() yields plain floats regardless of pandas version
        data = sample_spacing.sample(n_drawn).dt.total_seconds()
        # normalise by the number of samples actually drawn so that the
        # histogram heights sum to (at most) one
        weights = np.full(n_drawn, 1.0 / n_drawn)
        ax1.hist(
            data,
            bins=np.arange(150, step=1),
            weights=weights,
            label="DataSource {} sample spacing".format(i),
            histtype="step",
            linewidth=1.5,
        )
    ax1.axvline(options.sample_spacing, color="black", label="Selected Sample Spacing")
    ax1.set_xlabel("Sample Spacing [s]")
    ax1.set_ylabel("Prevalence")
    ax1.set_title(
        "Sample spacing of {} random samples across the input Data Sources".format(
            n_samples
        )
    )
    ax1.legend()
    fig.set_tight_layout(True)
    logger.info("Press any button to exit. Use mouse to zoom and resize")
    # waitforbuttonpress() is truthy only for key presses, so mouse
    # interaction (zooming, resizing) keeps the figure open
    while True:
        plt.draw()
        if plt.waitforbuttonpress():
            break
    return utils.yes_or_no("Are you satisfied with the target sample rate?")
def handle_chunking(data_df: pandas.core.frame.DataFrame, options: ChunkOptions, column_names: Union[List[str], NoneType] = None)
-
Finds overlapping chunks of data across dataframe columns, taking into account gap size preferences.
Parameters
data_df
:pandas.core.frame.DataFrame
- the sparse dataframe to perform chunking on
options
:ChunkOptions
- chunking configuration options
column_names
:list[str]
, default None
- the names of the columns to check for overlaps.
If
None
, all columns will be used.
Returns
pandas.core.frame.DataFrame
- The resulting dataframe, rows where not all columns contained data now contain NaN across all columns, depending on gap preferences.
numpy.ndarray
- 2D numpy array containing the start and end integer indices of the contiguous chunks of data in the input dataframe. Shape is (-1, 2).
Expand source code
def handle_chunking(
    data_df: pd.core.frame.DataFrame,
    options: config.ChunkOptions,
    column_names: Optional[List[str]] = None,
):
    """
    Finds overlapping chunks of data across dataframe columns,
    taking into account gap size preferences.

    Parameters
    ----------
    data_df: pandas.core.frame.DataFrame
        the sparse dataframe to perform chunking on. Its index must
        carry a fixed frequency (`data_df.index.freq`).
    options : nowcastlib.pipeline.structs.config.ChunkOptions
        chunking configuration options. `max_gap_size` and
        `min_chunk_size` are read and divided by the sample spacing
        in seconds.
    column_names : list[str], default None
        the names of the columns to check for overlaps.
        If `None`, all columns will be used.

    Returns
    -------
    pandas.core.frame.DataFrame
        The resulting dataframe, rows where not
        all columns contained data now contain NaN
        across all columns, depending on gap preferences.
    numpy.ndarray
        2D numpy array containing the start and end
        integer indices of the contiguous chunks of data
        in the input dataframe. Shape is (-1, 2).

    Raises
    ------
    ValueError
        If the index of `data_df` has no frequency set.
    """
    freq = data_df.index.freq
    if freq is None:
        raise ValueError(
            "`data_df` must have an index with a fixed frequency to be chunked"
        )
    # find overlapping data, ignoring small gaps
    # total_seconds() is used instead of `.seconds`, which only holds the
    # seconds component of the timedelta (0 for sub-second or whole-day spacings)
    sample_spacing_secs = pd.Timedelta(freq).total_seconds()
    max_spacing_steps = np.floor((options.max_gap_size / sample_spacing_secs)).astype(
        int
    )
    min_chunk_length = int(options.min_chunk_size / sample_spacing_secs)
    final_mask, chunk_locs = datasets.compute_dataframe_mask(
        data_df, max_spacing_steps, min_chunk_length, 0, column_names
    )
    # imputing gaps, restoring large gaps
    interpolated_df = data_df.interpolate("linear", limit_direction="both")
    chunked_df = interpolated_df.where(final_mask)  # type: ignore
    return chunked_df, chunk_locs
def synchronize_dataset(options: DataSet, dataset: Union[List[pandas.core.frame.DataFrame], NoneType] = None)
-
Synchronizes a set of data sources given options outlined in the input DataSet config instance. Optionally writes the results to disk.
Parameters
options
:DataSet
dataset
:list[pandas.core.frame.DataFrame]
, default None
- The set of dataframes one wishes to synchronize.
If
None
, the preprocessing output produced by the config options will be synchronized.
Returns
pandas.core.frame.DataFrame
- A single dataframe containing the synchronized data. The dataframe is sparse, with some rows consisting solely of NaNs.
numpy.ndarray
- 2D numpy array containing the start and end integer indices of the contiguous chunks of data in the input dataframe. Shape is (-1, 2).
Expand source code
def synchronize_dataset(
    options: config.DataSet, dataset: Optional[List[pd.core.frame.DataFrame]] = None
):
    """
    Synchronizes a set of data sources given options outlined
    in the input DataSet config instance. Optionally writes the
    results to disk.

    Each data source is resampled (by mean) onto a common grid of
    `sync_options.sample_spacing` seconds, the resampled sources are
    inner-joined on their index, and the result is split into
    contiguous chunks via `handle_chunking`. The input dataframes
    are not modified.

    Parameters
    ----------
    options : nowcastlib.pipeline.structs.config.DataSet
        `options.sync_options` must not be `None`.
    dataset : list[pandas.core.frame.DataFrame], default None
        The set of dataframes one wishes to synchronize.
        If `None`, the preprocessing output produced by the
        config options will be synchronized.

    Returns
    -------
    pandas.core.frame.DataFrame
        A single dataframe containing the
        synchronized data. The dataframe is sparse,
        with some rows consisting solely of NaNs.
    numpy.ndarray
        2D numpy array containing the start and end
        integer indices of the contiguous chunks of data
        in the returned dataframe. Shape is (-1, 2).
    """
    sync_config = options.sync_options
    assert (
        sync_config is not None
    ), "`options.sync_options` must be defined to perform synchronization"
    # avoid preprocessing if datasets are passed directly
    if dataset is None:
        data_dfs = preprocess.preprocess_dataset(options)
    else:
        data_dfs = dataset
    logger.info("Synchronizing dataset...")
    if sync_config.diagnostic_plots is not False:
        continue_processing = handle_diag_plots(sync_config, data_dfs)
        if continue_processing is False:
            logger.info(
                "Closing program prematurely to allow for configuration changes"
            )
            sys.exit()
    total_dfs = len(data_dfs)
    # a Timedelta is accepted by both `resample` and `Timestamp.floor` and
    # avoids relying on the version-dependent "S" frequency alias
    spacing = pd.Timedelta(seconds=sync_config.sample_spacing)
    resampled_dfs = []
    for i, data_df in enumerate(data_dfs):
        logger.debug("Resampling DataSource %d of %d...", i + 1, total_dfs)
        # flooring the origin to a multiple of the spacing puts every
        # data source on the same grid, so the inner join below can match rows
        resampled_df = data_df.resample(
            spacing,
            origin=data_df.index[0].floor(spacing),
        ).mean()
        # clear the index name on the resampled copy rather than on the
        # caller's dataframe, which must not be mutated
        resampled_df.index.name = None
        resampled_dfs.append(resampled_df)
    logger.debug("Finding overlapping range and joining into single dataframe...")
    synced_df = pd.concat(resampled_dfs, axis=1, join="inner")
    logger.debug("Splitting data into contiguous chunks...")
    # only the first column of each data source is checked for overlaps
    chunked_df, chunk_locs = handle_chunking(
        synced_df, sync_config.chunk_options, [df.columns[0] for df in data_dfs]
    )
    if sync_config.data_output is not None:
        logger.debug("Serializing chunked dataframe...")
        utils.handle_serialization(chunked_df, sync_config.data_output)
    if sync_config.chunks_output is not None:
        logger.debug("Serializing chunk locations...")
        utils.handle_serialization(chunk_locs, sync_config.chunks_output)
    logger.info("Dataset synchronization complete.")
    return chunked_df, chunk_locs