Source code for bmorph.core.workflows

import bmorph
import pandas as pd
import numpy as np
import xarray as xr
from functools import partial
from tqdm.autonotebook import tqdm
from bmorph.util import mizuroute_utils as mizutil
import os


def apply_bmorph(raw_ts, train_ts, ref_ts,
                 apply_window, raw_train_window, ref_train_window,
                 condition_ts=None, raw_y=None, train_y=None, ref_y=None,
                 interval=pd.DateOffset(years=1), overlap=60,
                 n_smooth_long=None, n_smooth_short=5,
                 bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8,
                 method='hist', train_cdf_min=1e-6, **kwargs):
    """Bias correction is performed by bmorph on user-defined intervals.

    Parameters
    ----------
    raw_ts : pandas.Series
        Raw flow timeseries.
    train_ts : pandas.Series
        Flow timeseries to train the bias correction model with.
    ref_ts : pandas.Series
        Observed/reference flow timeseries.
    apply_window : pandas.date_range
        Date range over which to apply bmorph to the flow timeseries.
    raw_train_window : pandas.date_range
        Date range of the raw flows used to train the bias correction model.
    ref_train_window : pandas.date_range
        Date range of the reference flows used to train the bias correction
        model.
    condition_ts : pandas.Series, optional
        A timeseries with a variable to condition on. This will be used in
        place of `raw_y`, `train_y`, and `ref_y`. It is mainly added as a
        convenience over specifying the same timeseries for each of those
        variables.
    raw_y : pandas.Series, optional
        Raw timeseries of the second variable used for conditioning.
    train_y : pandas.Series, optional
        Training timeseries of the second variable used for conditioning.
    ref_y : pandas.Series, optional
        Target timeseries of the second variable used for conditioning.
    interval : pandas.DateOffset
        Length of the bmorph application intervals.
    overlap : int
        Total number of days of overlap between successive CDF windows,
        distributed evenly before and after the application window.
    n_smooth_long : int, optional
        Number of elements that will be smoothed in `raw_ts` and `bmorph_ts`.
        The nsmooth value in this case is typically much larger than the one
        used for the bmorph function itself, for example, 365 days.
    n_smooth_short : int, optional
        Number of elements that will be smoothed when determining the CDFs
        used for the bmorph function itself.
    bw : int, optional
        Bandwidth for KernelDensity. This should only be used if
        `method='kde'`.
    xbins : int, optional
        Bins for the first time series. This should only be used if
        `method='hist'`.
    ybins : int, optional
        Bins for the second time series. This should only be used if
        `method='hist'`.
    rtol : float, optional
        The desired relative tolerance of the result for KernelDensity.
        This should only be used if `method='kde'`.
    atol : float, optional
        The desired absolute tolerance of the result for KernelDensity.
        This should only be used if `method='kde'`.
    method : str
        Method to use for conditioning. Currently 'hist' using hist2D and
        'kde' using kde2D are the only supported methods.
    train_cdf_min : float, optional
        Minimum allowed value of the training CDF, passed through to
        `bmorph.bmorph`.
    **kwargs :
        Additional keyword arguments. Mainly implemented for
        cross-compatibility with other methods so that a unified
        configuration can be used.

    Returns
    -------
    bmorph_corr_ts : pandas.Series
        The bias corrected flow timeseries over the application window.
    bmorph_multipliers : pandas.Series
        The multipliers used to scale the raw flow values into the bias
        corrected values returned in `bmorph_corr_ts`.
    """
    assert isinstance(interval, pd.DateOffset)
    if condition_ts is not None:
        raw_y = condition_ts
        ref_y = condition_ts
        train_y = condition_ts
    if interval == pd.DateOffset(days=1):
        raise Exception("Please enter an interval greater than 1 day(s)")

    raw_train_window = slice(*raw_train_window)
    apply_window = slice(*apply_window)
    ref_train_window = slice(*ref_train_window)
    raw_ts_window = slice(pd.to_datetime(raw_ts.index.values[0]),
                          pd.to_datetime(raw_ts.index.values[-1]))
    overlap_period = int(overlap / 2)

    bmorph_ts = pd.Series([])
    bmorph_multipliers = pd.Series([])
    bmorph_range = pd.date_range(apply_window.start, apply_window.stop + interval,
                                 freq=interval)
    for i in range(0, len(bmorph_range) - 1):
        bmorph_start = bmorph_range[i]
        bmorph_end = bmorph_range[i + 1]
        raw_apply_window = slice(pd.to_datetime(str(bmorph_start)),
                                 pd.to_datetime(str(bmorph_end)))
        raw_cdf_window = slice(
            pd.to_datetime(str(bmorph_start - pd.DateOffset(days=overlap_period))),
            pd.to_datetime(str(bmorph_end + pd.DateOffset(days=overlap_period))))

        # No overlap for the first/last periods
        if ((raw_cdf_window.start < raw_ts_window.start)
                or (raw_cdf_window.stop > raw_ts_window.stop)):
            raw_cdf_window = slice(raw_ts_window.start, raw_ts_window.stop)

        # Perform the bias correction
        bc_total, bc_mult = bmorph.bmorph(
            raw_ts, train_ts, ref_ts,
            raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
            raw_y, ref_y, train_y, n_smooth_short,
            bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
            method=method, train_cdf_min=train_cdf_min)
        bmorph_ts = bmorph_ts.append(bc_total)
        bmorph_multipliers = bmorph_multipliers.append(bc_mult)

    # Eliminate duplicate timestamps caused by the overlapping periods
    bmorph_ts = bmorph_ts.groupby(bmorph_ts.index).mean()
    bmorph_multipliers = bmorph_multipliers.groupby(bmorph_multipliers.index).mean()

    # Apply the correction to the change in mean
    if n_smooth_long:
        nrni_mean = ref_ts[ref_train_window].mean()
        train_mean = train_ts[raw_train_window].mean()
        bmorph_corr_ts, corr_ts = bmorph.bmorph_correct(
            raw_ts, bmorph_ts, raw_ts_window,
            nrni_mean, train_mean, n_smooth_long)
    else:
        bmorph_corr_ts = bmorph_ts
    return bmorph_corr_ts[apply_window], bmorph_multipliers[apply_window]

def apply_blendmorph(raw_upstream_ts, raw_downstream_ts,
                     train_upstream_ts, train_downstream_ts,
                     ref_upstream_ts, ref_downstream_ts,
                     apply_window, raw_train_window, ref_train_window,
                     interval, overlap, blend_factor,
                     raw_upstream_y=None, raw_downstream_y=None,
                     train_upstream_y=None, train_downstream_y=None,
                     ref_upstream_y=None, ref_downstream_y=None,
                     n_smooth_long=None, n_smooth_short=5,
                     bw=3, xbins=200, ybins=10, rtol=1e-6, atol=1e-8,
                     method='hist', train_cdf_min=1e-6, **kwargs):
    """Bias correction performed by blending bmorphed flows on user-defined intervals.

    Blendmorph is used to perform spatially consistent bias correction; this
    function does so on a user-defined interval. This is done by performing
    bmorph bias correction for each site's timeseries according to upstream
    and downstream gauge sites (or proxies) where true flows are known. The
    upstream and downstream corrected timeseries are then multiplied by
    fractional weights, `blend_factor`, that sum to 1 between them, so the
    corrected flows can be combined, or "blended," into one representative
    corrected flow series for the site. It is therefore important to specify
    upstream and downstream values so bias corrections are performed with
    values that most closely represent each site being corrected.

    Parameters
    ----------
    raw_upstream_ts : pandas.Series
        Raw flow timeseries corresponding to the upstream flows.
    raw_downstream_ts : pandas.Series
        Raw flow timeseries corresponding to the downstream flows.
    train_upstream_ts : pandas.Series
        Flow timeseries to train the bias correction model with for the
        upstream flows.
    train_downstream_ts : pandas.Series
        Flow timeseries to train the bias correction model with for the
        downstream flows.
    ref_upstream_ts : pandas.Series
        Observed/reference flow timeseries corresponding to the upstream
        flows.
    ref_downstream_ts : pandas.Series
        Observed/reference flow timeseries corresponding to the downstream
        flows.
    apply_window : pandas.date_range
        Date range over which to apply bmorph to the flow timeseries.
    raw_train_window : pandas.date_range
        Date range of the raw flows used to train the bias correction model.
    ref_train_window : pandas.date_range
        Date range of the reference flows used to train the bias correction
        model.
    interval : pandas.DateOffset
        Length of the bmorph application intervals.
    overlap : int
        Total number of days of overlap between successive CDF windows,
        distributed evenly before and after the application window.
    blend_factor : numpy.array
        An array determining how the upstream and downstream bmorph results
        are proportioned. This is determined by the fill_method used in
        mizuroute_utils. Each blend_factor entry is the weight applied to the
        upstream multipliers and totals, with 1 - blend_factor applied to the
        downstream multipliers and totals.
    n_smooth_long : int, optional
        Number of elements that will be smoothed in `raw_ts` and `bmorph_ts`.
        The nsmooth value in this case is typically much larger than the one
        used for the bmorph function itself, for example, 365 days.
    n_smooth_short : int
        Number of elements that will be smoothed when determining the CDFs
        used for the bmorph function itself.
    raw_upstream_y : pandas.Series, optional
        Raw timeseries of the second conditioning variable corresponding to
        the upstream flows.
    raw_downstream_y : pandas.Series, optional
        Raw timeseries of the second conditioning variable corresponding to
        the downstream flows.
    train_upstream_y : pandas.Series, optional
        Training timeseries of the second conditioning variable corresponding
        to the upstream flows.
    train_downstream_y : pandas.Series, optional
        Training timeseries of the second conditioning variable corresponding
        to the downstream flows.
    ref_upstream_y : pandas.Series, optional
        Target timeseries of the second conditioning variable corresponding
        to the upstream flows.
    ref_downstream_y : pandas.Series, optional
        Target timeseries of the second conditioning variable corresponding
        to the downstream flows.
    bw : int, optional
        Bandwidth for KernelDensity. This should only be used if
        `method='kde'`.
    xbins : int, optional
        Bins for the first time series. This should only be used if
        `method='hist'`.
    ybins : int, optional
        Bins for the second time series. This should only be used if
        `method='hist'`.
    rtol : float, optional
        The desired relative tolerance of the result for KernelDensity.
        This should only be used if `method='kde'`.
    atol : float, optional
        The desired absolute tolerance of the result for KernelDensity.
        This should only be used if `method='kde'`.
    method : str, optional
        Method to use for conditioning. Currently 'hist' using hist2D and
        'kde' using kde2D are the only supported methods.
    train_cdf_min : float, optional
        Minimum allowed value of the training CDF, passed through to
        `bmorph.bmorph`.
    **kwargs :
        Additional keyword arguments. Mainly implemented for
        cross-compatibility with other methods so that a unified
        configuration can be used.

    Returns
    -------
    bc_totals : pandas.Series
        The blended, bias corrected flow timeseries over the application
        window.
    bc_multipliers : pandas.Series
        The multipliers used to scale the raw flow values into the bias
        corrected values returned in `bc_totals`; equal in length to
        `bc_totals`.
    """
    assert isinstance(interval, pd.DateOffset)
    if interval == pd.DateOffset(days=1):
        raise Exception("Please enter an interval greater than 1 day(s)")

    bc_multipliers = pd.Series([])
    bc_totals = pd.Series([])

    apply_window = slice(*apply_window)
    raw_train_window = slice(*raw_train_window)
    ref_train_window = slice(*ref_train_window)
    raw_ts_window = slice(pd.to_datetime(raw_upstream_ts.index.values[0]),
                          pd.to_datetime(raw_upstream_ts.index.values[-1]))

    # Check whether there is enough data to run conditioning for both the
    # upstream and downstream bmorphs. A boolean is used here, instead of
    # later, to make certain both upstream and downstream use the same
    # method and to minimize checks within the for-loop.
    run_mdcd = False
    y_varlist = [raw_upstream_y, train_upstream_y, ref_upstream_y,
                 raw_downstream_y, train_downstream_y, ref_downstream_y]
    if np.any(list(map(lambda x: x is not None, y_varlist))):
        run_mdcd = True

    # bmorph the series
    overlap_period = int(overlap / 2)
    bmorph_ts = pd.Series([])
    bmorph_multipliers = pd.Series([])
    bmorph_range = pd.date_range(apply_window.start, apply_window.stop + interval,
                                 freq=interval)
    for i in range(0, len(bmorph_range) - 1):
        bmorph_start = bmorph_range[i]
        bmorph_end = bmorph_range[i + 1]
        # we don't need to adjust for overlap if it is the last entry at i+1
        if i < len(bmorph_range) - 2:
            bmorph_end -= pd.DateOffset(days=1)
        raw_apply_window = slice(pd.to_datetime(str(bmorph_start)),
                                 pd.to_datetime(str(bmorph_end)))
        raw_cdf_window = slice(
            pd.to_datetime(str(bmorph_start - pd.DateOffset(days=overlap_period))),
            pd.to_datetime(str(bmorph_end + pd.DateOffset(days=overlap_period))))
        if raw_cdf_window.start < raw_ts_window.start:
            offset = raw_ts_window.start - raw_cdf_window.start
            raw_cdf_window = slice(raw_ts_window.start, raw_cdf_window.stop + offset)
        if raw_cdf_window.stop > raw_ts_window.stop:
            offset = raw_ts_window.stop - raw_cdf_window.stop
            raw_cdf_window = slice(raw_cdf_window.start + offset, raw_ts_window.stop)

        # Upstream & downstream bias correction
        if run_mdcd:
            bc_up_total, bc_up_mult = bmorph.bmorph(
                raw_upstream_ts, train_upstream_ts, ref_upstream_ts,
                raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
                raw_upstream_y, ref_upstream_y, train_upstream_y, n_smooth_short,
                bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
                method=method, train_cdf_min=train_cdf_min)
            bc_down_total, bc_down_mult = bmorph.bmorph(
                raw_downstream_ts, train_downstream_ts, ref_downstream_ts,
                raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
                raw_downstream_y, ref_downstream_y, train_downstream_y, n_smooth_short,
                bw=bw, xbins=xbins, ybins=ybins, rtol=rtol, atol=atol,
                method=method, train_cdf_min=train_cdf_min)
        else:
            bc_up_total, bc_up_mult = bmorph.bmorph(
                raw_upstream_ts, train_upstream_ts, ref_upstream_ts,
                raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
                n_smooth_short, train_cdf_min=train_cdf_min)
            bc_down_total, bc_down_mult = bmorph.bmorph(
                raw_downstream_ts, train_downstream_ts, ref_downstream_ts,
                raw_apply_window, raw_train_window, ref_train_window, raw_cdf_window,
                n_smooth_short, train_cdf_min=train_cdf_min)

        bc_multiplier = (blend_factor * bc_up_mult) + ((1 - blend_factor) * bc_down_mult)
        bc_total = (blend_factor * bc_up_total) + ((1 - blend_factor) * bc_down_total)
        bc_multipliers = bc_multipliers.append(bc_multiplier)
        bc_totals = bc_totals.append(bc_total)

    bc_totals = bc_totals.groupby(bc_totals.index).mean()
    bc_multipliers = bc_multipliers.groupby(bc_multipliers.index).mean()

    # Apply the correction to preserve the mean change
    if n_smooth_long:
        raw_ts = (blend_factor * raw_upstream_ts) + ((1 - blend_factor) * raw_downstream_ts)
        ref_ts = (blend_factor * ref_upstream_ts) + ((1 - blend_factor) * ref_downstream_ts)
        train_ts = (blend_factor * train_upstream_ts) + ((1 - blend_factor) * train_downstream_ts)
        nrni_mean = ref_ts[ref_train_window].mean()
        train_mean = train_ts[raw_train_window].mean()
        bc_totals, corr_ts = bmorph.bmorph_correct(
            raw_ts, bc_totals, raw_ts_window,
            nrni_mean, train_mean, n_smooth_long)
        bc_multipliers *= corr_ts
    return bc_totals[apply_window], bc_multipliers[apply_window]
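
# --- Illustrative sketch of the blending step (not part of the original module) ---
# `apply_blendmorph` combines the upstream- and downstream-trained corrections
# as a convex combination:
#     blended = blend_factor * upstream + (1 - blend_factor) * downstream
# The weight and series below are made-up values that only demonstrate the
# arithmetic used inside the function.
def _example_blend_weights():
    blend_factor = 0.75  # e.g. a site that sits closer to its upstream gauge
    bc_up_total = pd.Series([10.0, 12.0, 11.0])
    bc_down_total = pd.Series([14.0, 13.0, 15.0])
    # identical in form to the combination performed inside apply_blendmorph
    return blend_factor * bc_up_total + (1 - blend_factor) * bc_down_total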

def _scbc_c_seg(ds, apply_window, raw_train_window, ref_train_window,
                interval, overlap, condition_var, **kwargs):
    up_raw_ts = ds['IRFroutedRunoff'].to_series()
    up_train_ts = ds['up_raw_flow'].to_series()
    up_ref_ts = ds['up_ref_flow'].to_series()
    up_seg = int(ds['up_ref_seg'].values[()])
    up_cond = ds[f'up_{condition_var}'].to_series()

    dn_raw_ts = ds['IRFroutedRunoff'].to_series()
    dn_train_ts = ds['down_raw_flow'].to_series()
    dn_ref_ts = ds['down_ref_flow'].to_series()
    dn_seg = int(ds['down_ref_seg'].values[()])
    dn_cond = ds[f'down_{condition_var}'].to_series()

    blend_factor = ds['cdf_blend_factor'].values[()]
    local_flow = ds['dlayRunoff']

    scbc_c_flows, scbc_c_mults = apply_blendmorph(
        up_raw_ts, dn_raw_ts,
        up_train_ts, dn_train_ts,
        up_ref_ts, dn_ref_ts,
        apply_window, raw_train_window, ref_train_window,
        interval, overlap, blend_factor,
        raw_upstream_y=up_cond, raw_downstream_y=dn_cond,
        train_upstream_y=up_cond, train_downstream_y=dn_cond,
        ref_upstream_y=up_cond, ref_downstream_y=dn_cond, **kwargs)

    scbc_c_locals = scbc_c_mults * local_flow.sel(time=scbc_c_mults.index)
    return scbc_c_flows, scbc_c_mults, scbc_c_locals


def _scbc_u_seg(ds, apply_window, raw_train_window, ref_train_window,
                interval, overlap, condition_var=None, **kwargs):
    up_raw_ts = ds['IRFroutedRunoff'].to_series()
    up_train_ts = ds['up_raw_flow'].to_series()
    up_ref_ts = ds['up_ref_flow'].to_series()
    up_seg = int(ds['up_ref_seg'].values[()])

    dn_raw_ts = ds['IRFroutedRunoff'].to_series()
    dn_train_ts = ds['down_raw_flow'].to_series()
    dn_ref_ts = ds['down_ref_flow'].to_series()
    dn_seg = int(ds['down_ref_seg'].values[()])

    blend_factor = ds['cdf_blend_factor'].values[()]
    local_flow = ds['dlayRunoff'].copy(deep=True)

    scbc_u_flows, scbc_u_mults = apply_blendmorph(
        up_raw_ts, dn_raw_ts,
        up_train_ts, dn_train_ts,
        up_ref_ts, dn_ref_ts,
        apply_window, raw_train_window, ref_train_window,
        interval, overlap, blend_factor, **kwargs)

    scbc_u_locals = scbc_u_mults * local_flow.sel(time=scbc_u_mults.index)
    return scbc_u_flows, scbc_u_mults, scbc_u_locals


def _scbc_pass(ds, apply_window, **kwargs):
    """Formats flows that are not bias corrected, because they come from
    segments without HRUs, into a format compatible with flows that are bias
    corrected. This ensures all of the data is still returned to the user,
    while the flows that are not bias corrected stay unchanged.
    """
    raw_ts = ds['IRFroutedRunoff'].to_series()
    local_flow = ds['dlayRunoff']
    # taken from apply_blendmorph
    apply_window = slice(*apply_window)
    # multipliers of 1 represent no change
    pass_mults = 0 * local_flow.sel(time=apply_window).to_pandas() + 1
    # properly format the locals
    pass_locals = pass_mults * local_flow.sel(time=pass_mults.index)
    return raw_ts, pass_mults, pass_locals

def apply_scbc(ds, mizuroute_exe, bmorph_config, client=None, save_mults=False, **tqdm_kwargs):
    """Applies Spatially Consistent Bias Correction (SCBC) by bias correcting
    local flows and re-routing them through mizuroute. This method can be run
    in parallel by providing a dask `client`.

    Parameters
    ----------
    ds : xr.Dataset
        An xarray dataset containing `time` and `seg` dimensions and the
        variables to be bias corrected. This will most likely come from the
        provided preprocessing utility, `mizuroute_utils.to_bmorph`.
    mizuroute_exe : str
        The path to the mizuroute executable.
    bmorph_config : dict
        The configuration for the bias correction. See the documentation on
        input specifications and selecting bias correction techniques for
        descriptions of the options and their choices.
    client : dask.Client, optional
        A `client` object to manage parallel computation.
    save_mults : boolean, optional
        Whether to save the multipliers from bmorph for diagnosis. If True,
        the multipliers are saved in the same directory as the local flows.
        Defaults to False so that multipliers are not saved.
    **tqdm_kwargs : optional
        Keyword arguments for the tqdm loops within apply_scbc.

    Returns
    -------
    region_totals : xr.Dataset
        The rerouted, total, bias corrected flows for the region.
    """
    def unpack_and_write_netcdf(results, segs, file_path,
                                out_varname='scbc_flow', mult_path=None):
        flows = [r[0] for r in results]
        mults = [r[1] for r in results]
        local = [r[2] for r in results]
        local_ds = xr.DataArray(np.vstack(local), dims=('seg', 'time'))
        local_ds['seg'] = segs
        local_ds['time'] = flows[0].index
        local_ds.name = out_varname
        local_ds = local_ds.where(local_ds >= 1e-4, other=1e-4)
        try:
            os.remove(file_path)
        except OSError:
            pass
        local_ds.transpose().to_netcdf(file_path)
        if mult_path:
            try:
                os.remove(mult_path)
            except OSError:
                pass
            mult_ds = xr.DataArray(np.vstack(mults), dims=('seg', 'time'))
            mult_ds['seg'] = segs
            mult_ds['time'] = flows[0].index
            mult_ds.name = 'mults'
            mult_ds.transpose().to_netcdf(mult_path)

    if 'condition_var' in bmorph_config.keys() and bmorph_config['condition_var']:
        scbc_type = 'conditional'
        scbc_fun = partial(_scbc_c_seg, **bmorph_config)
    else:
        scbc_type = 'univariate'
        scbc_fun = partial(_scbc_u_seg, **bmorph_config)

    sel_vars = ['IRFroutedRunoff', 'up_raw_flow', 'up_ref_flow', 'up_ref_seg',
                'IRFroutedRunoff', 'down_raw_flow', 'down_ref_flow', 'down_ref_seg',
                'cdf_blend_factor', 'dlayRunoff']
    if scbc_type == 'conditional':
        sel_vars.append(f'down_{bmorph_config["condition_var"]}')
        sel_vars.append(f'up_{bmorph_config["condition_var"]}')

    # Select out segs that have an hru and prep the pass function
    # if there are segs without an hru
    bc_segs_idx = np.where(ds['has_hru'])[0]
    if len(bc_segs_idx) != len(ds['seg'].values):
        scbc_pass_fun = partial(_scbc_pass, **bmorph_config)

    if client:
        big_futures = [client.scatter(ds[sel_vars].sel(seg=seg))
                       for seg in tqdm(ds['seg'].values, **tqdm_kwargs)]
        futures = []
        for bf, seg_idx in zip(big_futures, np.arange(len(ds['seg'].values))):
            if seg_idx in bc_segs_idx:
                futures.append(client.submit(scbc_fun, bf))
            else:
                futures.append(client.submit(scbc_pass_fun, bf))
        results = client.gather(futures)
    else:
        results = []
        for seg_idx in tqdm(np.arange(len(ds['seg'].values)), **tqdm_kwargs):
            if seg_idx in bc_segs_idx:
                results.append(scbc_fun(ds.isel(seg=seg_idx)))
            else:
                results.append(scbc_pass_fun(ds.isel(seg=seg_idx)))

    out_file = (f'{bmorph_config["data_path"]}/input/'
                f'{bmorph_config["output_prefix"]}_local_{scbc_type}_scbc.nc')
    if save_mults:
        mult_file = (f'{bmorph_config["data_path"]}/input/'
                     f'{bmorph_config["output_prefix"]}_local_mult_{scbc_type}_scbc.nc')
    else:
        mult_file = None
    unpack_and_write_netcdf(results, ds['seg'], out_file, mult_path=mult_file)

    config_path, mizuroute_config = mizutil.write_mizuroute_config(
        bmorph_config["output_prefix"], scbc_type, bmorph_config['apply_window'],
        config_dir=bmorph_config.get('mizuroute_configs_path',
                                     bmorph_config['data_path'] + '/mizuroute_configs/'),
        topo_dir=bmorph_config.get('topologies_path',
                                   bmorph_config['data_path'] + '/topologies/'),
        input_dir=bmorph_config.get('input_path',
                                    bmorph_config['data_path'] + '/input/'),
        output_dir=bmorph_config.get('output_path',
                                     bmorph_config['data_path'] + '/output/'),
        out_name=bmorph_config.get('out_name', None)
    )
    mizutil.run_mizuroute(mizuroute_exe, config_path)

    region_totals = xr.open_mfdataset(
        f'{mizuroute_config["output_dir"]}{mizuroute_config["out_name"]}*')
    region_totals = region_totals.sel(time=slice(*bmorph_config['apply_window']))
    region_totals['seg'] = region_totals['reachID'].isel(time=0)
    return region_totals.load()
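
# --- Illustrative configuration sketch (not part of the original module) ---
# Shows the `bmorph_config` keys this module reads: `data_path` and
# `output_prefix` for file locations, `apply_window` for rerouting, and the
# windows/interval/overlap/condition_var forwarded to the per-segment
# corrections. The directory, prefix, dates, executable path, and the helper
# name `_example_apply_scbc` are placeholders.
def _example_apply_scbc(ds, client=None):
    bmorph_config = {
        'data_path': './bmorph_data',        # placeholder project directory
        'output_prefix': 'example_region',   # placeholder file prefix
        'apply_window': (pd.Timestamp('1991-01-01'), pd.Timestamp('2000-12-31')),
        'raw_train_window': (pd.Timestamp('1981-01-01'), pd.Timestamp('1990-12-31')),
        'ref_train_window': (pd.Timestamp('1981-01-01'), pd.Timestamp('1990-12-31')),
        'interval': pd.DateOffset(years=1),
        'overlap': 60,
        'condition_var': None,               # set to a variable name for conditional SCBC
    }
    # `ds` is assumed to come from `mizuroute_utils.to_bmorph`; the mizuroute
    # executable path below is a placeholder.
    return apply_scbc(ds, './route_runoff.exe', bmorph_config, client=client)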

def bmorph_to_dataarray(dict_flows, name):
    da = xr.DataArray(np.vstack(dict_flows.values()), dims=('site', 'time'))
    da['site'] = list(dict_flows.keys())
    da['time'] = list(dict_flows.values())[0].index
    da.name = name
    return da.transpose()
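
# --- Illustrative usage sketch (not part of the original module) ---
# `bmorph_to_dataarray` stacks a {site: pandas.Series} dictionary of bias
# corrected flows into a single (time, site) xarray.DataArray. The site names
# and flow values below are placeholders.
def _example_bmorph_to_dataarray():
    times = pd.date_range('2000-01-01', periods=3, freq='D')
    flows = {'site_a': pd.Series([1.0, 2.0, 3.0], index=times),
             'site_b': pd.Series([4.0, 5.0, 6.0], index=times)}
    return bmorph_to_dataarray(flows, name='scbc_flow')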