Source code for rcatool.runtime.RCAT_stats

import sys
import numpy as np
import xarray as xa
from rcatool.stats import ASoP
from rcatool.stats import convolve
from rcatool.stats import event_duration as eda
from rcatool.stats import climateindex as ci
from rcatool.stats import precipitation_index as prix
from pandas import to_timedelta
from copy import deepcopy


############################################################
#                                                          #
#     FUNCTIONS CONTROLLING STATISTICAL CALCULATIONS       #
#                                                          #
############################################################

[docs] def default_stats_config(stats): """ The function returns a dictionary with default statistics configurations for a selection of statistics given by input stats. """ stats_dict = { 'generic': { 'vars': [], 'statistic': {'description': 'Average', 'algorithm': 'mean'}, 'function args': None, 'apply dim': 'time', 'new dims': None, 'group data': None, 'resample resolution': None, 'pool data': False, 'thr': None, 'cond analysis': None, 'chunk dimension': 'time'}, 'moments': { 'vars': [], 'moment stat': ['D', 'mean'], 'moment resample kwargs': None, 'resample resolution': None, 'pool data': False, 'thr': None, 'cond analysis': None, 'chunk dimension': 'time'}, 'seasonal cycle': { 'vars': [], 'resample resolution': None, 'pool data': False, 'stat method': 'mean', 'thr': None, 'cond analysis': None, 'chunk dimension': 'time'}, 'annual cycle': { 'vars': [], 'resample resolution': None, 'pool data': False, 'stat method': 'mean', 'thr': None, 'cond analysis': None, 'chunk dimension': 'time'}, 'diurnal cycle': { 'vars': [], 'resample resolution': None, 'hours': None, 'dcycle stat': 'amount', 'stat method': 'mean', 'method kwargs': None, 'thr': None, 'cond analysis': None, 'pool data': False, 'chunk dimension': 'space'}, 'dcycle harmonic': { 'vars': [], 'resample resolution': None, 'pool data': False, 'dcycle stat': 'amount', 'thr': None, 'cond analysis': None, 'chunk dimension': 'space'}, 'asop': { 'vars': ['pr'], 'resample resolution': None, 'pool data': False, 'nr_bins': 80, 'bin_type': 'Klingaman', 'thr': None, 'cond analysis': None, 'chunk dimension': 'space'}, 'eda': { 'vars': ['pr'], 'resample resolution': None, 'pool data': False, 'duration bins': np.arange(1, 51), 'event statistic': 'amount', 'statistic bins': [.1, .2, .5, 1, 2, 5, 10, 20, 50, 100, 150, 200], 'dry events': False, 'dry bins': None, 'event thr': 0.1, 'cond analysis': None, 'chunk dimension': 'space'}, 'pdf': { 'vars': [], 'resample resolution': None, 'pool data': False, 'bins': None, 'normalized': False, 'thr': None, 'cond analysis': None, 'dry event thr': None, 'chunk dimension': 'space'}, 'percentile': { 'vars': [], 'resample resolution': None, 'pool data': False, 'pctls': [95, 99], 'thr': None, 'cond analysis': None, 'chunk dimension': 'space'}, 'Rxx': { 'vars': ['pr'], 'resample resolution': None, 'pool data': False, 'normalize': False, 'thr': 1.0, 'cond analysis': None, 'chunk dimension': 'space'}, 'cdd': { 'vars': ['pr'], 'resample resolution': None, 'pool data': False, 'thr': 1.0, 'periods': np.arange(1, 61), 'maxper': False, 'cond analysis': None, 'chunk dimension': 'space'}, 'pr survival fraction': { 'vars': ['pr'], 'resample resolution': None, 'pool data': False, 'percentiles': np.arange(1, 100), 'thr': None, 'cond analysis': None, 'vectorize': True, 'chunk dimension': 'space'}, 'signal filtering': { 'vars': [], 'resample resolution': None, 'pool data': False, 'filter': 'lanczos', 'cutoff type': 'lowpass', 'window': 61, 'mode': 'same', '1st cutoff': None, '2nd cutoff': None, 'filter dim': 1, 'thr': None, 'cond analysis': None, 'chunk dimension': 'space'}, } return {k: stats_dict[k] for k in stats}
[docs] def mod_stats_config(requested_stats): """ Get the configuration for the input statistics 'requested_stats'. The returned configuration is a dictionary. """ stats_dd = default_stats_config(list(requested_stats.keys())) # Update dictionary based on input for k in requested_stats: if requested_stats[k] == 'default': pass else: for m in requested_stats[k]: msg = "For statistic {}, the configuration key {} is not "\ "available. Check possible configurations in "\ "default_stats_config in stats_template "\ "module.".format(k, m) try: stats_dd[k][m] = requested_stats[k][m] except KeyError: print(msg) return stats_dd
def _stats(stat): """ Dictionary that relates a statistical measure to a specific function that do the calculation. """ p = { 'generic': generic_function, 'moments': moments, 'seasonal cycle': seasonal_cycle, 'annual cycle': annual_cycle, 'percentile': percentile, 'diurnal cycle': diurnal_cycle, 'dcycle harmonic': dcycle_harmonic_fit, 'pdf': freq_int_dist, 'pr survival fraction': pr_amount_survival_fraction, 'asop': asop, 'eda': eda_calc, 'Rxx': Rxx, 'cdd': cdd, 'signal filtering': filtering, } return p[stat]
[docs] def calc_statistics(data, var, stat, stat_config): """ Calculate statistics 'stat' according to configuration in 'stat_config'. This function calls the respective stat function (defined in _stats). """ stat_data = _stats(stat)(data, var, stat, stat_config) return stat_data
def _check_hours(ds): """ Checking time stamps for subdaily data Hours in time stamps should be the right edge of interval if accumulated or resampled data. """ # Round to nearest minute ds['time'] = ds.indexes['time'].round('min') # Hour offsets offset_dict = {12: 6, 6: 3, 3: 1, 1: 0} _delta = ds['time.hour'].diff('time') delta = _delta[_delta > 0].values[0] hr_check = np.all(np.isin(ds['time.hour'], np.arange(0, 24, delta))) if hr_check and not np.any(ds.time.dt.minute > 0): pass else: offset = offset_dict[delta] print("\t\t\tShifting time stamps to whole hours!\n") hbool = np.all(ds.indexes['time'].minute >= 30) ds['time'] = ds.indexes['time'].ceil('H').shift(offset, 'H') if hbool\ else ds.indexes['time'].floor('H').shift(offset, 'H') return ds def _get_freq(tf): from functools import reduce d = [j.isdigit() for j in tf] if np.any(d): freq = int(reduce((lambda x, y: x+y), [x for x, y in zip(tf, d) if y])) else: freq = 1 unit = reduce((lambda x, y: x+y), [x for x, y in zip(tf, d) if not y]) if unit in ('M', 'Y'): freq = freq*30 if unit == 'M' else freq*365 unit = 'D' elif unit[0] == 'Q': freq = 90 unit = 'D' return freq, unit ############################################################ # # # STATISTICS FUNCTIONS # # # ############################################################
[docs] def generic_function(data, var, stat, stat_config): """ Calculate statistics applying user defined function. Arguments and returned data need to be specified in main RCAT configuration file. Statistical calculations may be done on sub-grouped data (using xarray resample), as defined by user in configuration file. """ def _calc(da, modpath, module, funcnm, **kwargs): sys.path.append(modpath) import importlib fmod = importlib.import_module(module) func = getattr(fmod, funcnm) out = func(da.values, **kwargs) da_out = xa.DataArray(out, dims=da.dims, coords=da.coords) return da_out def _dd_grp(g): if g in ('Y', 'y', 'year', 'ann', 'annual'): lbl = 'Y' elif g in ('D', 'd', 'day'): lbl = 'D' elif g in ('season', 'QS-DEC'): lbl = 'QS-DEC' elif g in ('hour', 'h', 'H'): lbl = 'H' else: lbl = g return lbl # Data thresholding in_thr = stat_config[stat]['thr'] if in_thr is not None: if var in in_thr: thr = in_thr[var] data = data.where(data[var] >= thr) else: thr = None else: thr = in_thr dim = stat_config[stat]['apply dim'] outdim = stat_config[stat]['new dims'] # Group/sub-select data? grp = stat_config[stat]['group data'] if grp is not None: data = data.chunk({'time': -1}) # NOTE: Use groupby or resample function? # indata = data[var].groupby(f'time.{grp}').apply(lambda x: x) gr = _dd_grp(grp) indata = data[var].resample(time=f'{gr}').apply(lambda x: x) else: gr = None indata = data[var] # Stat function to apply fstat = stat_config[stat]['statistic'] kwargs = stat_config[stat]['function args'] assert isinstance(fstat, dict), \ print("stat function in configuration file must be a dictionary" " with keys 'description' and 'algorithm'") if fstat['algorithm'] in ('sum', 'count', 'mean', 'max', 'min', 'std'): st_data = eval(f"indata.{fstat['algorithm']}('time')") else: if 'lambda' in fstat['algorithm']: func = fstat['algorithm'] elif isinstance(fstat['algorithm'], dict): import pathlib fpth = fstat['algorithm']['module file'].rsplit('/', 1) f = pathlib.Path(fpth[1]) fmodule = f.with_suffix('').name funcname = fstat['algorithm']['function name'] newdim = [[outdim['dim name']]] if outdim is not None else outdim dimsize = outdim['dim size'] if outdim is not None else outdim if gr is None: if newdim is not None: out = xa.apply_ufunc( func, indata, input_core_dims=[[dim]], output_core_dims=newdim, dask='parallelized', output_sizes={newdim: dimsize}, output_dtypes=[float], kwargs=kwargs) else: out = xa.apply_ufunc(func, indata, input_core_dims=[[dim]], dask='parallelized', output_dtypes=[float], kwargs=kwargs) dims = list(out.dims) st_data = out.to_dataset().transpose(dims[-1], dims[0], dims[1]) else: out = xa.map_blocks(_calc, indata, (fpth[0], fmodule, funcname), kwargs, template=indata) st_data = out.to_dataset(name=var) # Add back time stamps st_data = st_data.assign( {'time': data['time'].resample(time=f'{gr}').apply( lambda x: x).values}) st_data.attrs['Description'] =\ (f"Statistic: {fstat['description']} | Threshold: {thr} | " f"Grouped data: {gr}") return st_data
[docs] def moments(data, var, stat, stat_config): """ Calculate standard moment statistics: avg, median, std, max/min """ # Data thresholding in_thr = stat_config[stat]['thr'] if in_thr is not None: if var in in_thr: thr = in_thr[var] data = data.where(data[var] >= thr) else: thr = None else: thr = in_thr # Moment stats configuration _mstat = deepcopy(stat_config[stat]['moment stat']) mstat = _mstat[var] if isinstance(_mstat, dict) else _mstat if mstat is None: st_data = data.copy() st_data.attrs['Description'] =\ f"Moment statistic: No statistics applied | Threshold: {thr}" else: if mstat[0] == 'all': st_data = eval(f"data.{mstat[1]}(dim='time', skipna=True)") else: # Resample expression res_kw = stat_config[stat]['moment resample kwargs'] if res_kw is None: if mstat[1] == 'apply function': expr = (f"data[var].resample(time='{mstat[0]}')" f".apply({mstat[2]}).dropna('time', 'all')") elif mstat[1] == 'interpolate': expr = (f"data[var].resample(time='{mstat[0]}')" f".interpolate({mstat[2]}).dropna('time', 'all')") else: expr = (f"data[var].resample(time='{mstat[0]}')" f".{mstat[1]}('time').dropna('time', 'all')") else: if mstat[1] == 'apply function': expr = (f"data[var].resample(time='{mstat[0]}', **res_kw)" f".apply({mstat[2]}).dropna('time', 'all')") elif mstat[1] == 'interpolate': expr = (f"data[var].resample(time='{mstat[0]}', **res_kw)" f".interpolate({mstat[2]}).dropna('time', 'all')") else: expr = (f"data[var].resample(time='{mstat[0]}', **res_kw)" f".{mstat[1]}('time').dropna('time', 'all')") diff = data.time.values[1] - data.time.values[0] nsec = to_timedelta(diff).total_seconds() tr, fr = _get_freq(mstat[0]) sec_resample = to_timedelta(tr, fr).total_seconds() if nsec >= sec_resample or mstat is None: print("\t\t* Moment statistics:\n\t\tData already at the same " "or coarser time resolution as selected resample " "frequency!\n\t\tKeeping data as is ...\n") st_data = data.copy() else: _st_data = eval(expr) st_data = _st_data.to_dataset() st_data.attrs['Description'] =\ "Moment statistic: {} | Threshold: {}".format( ' '.join(s.upper() for s in mstat), thr) return st_data
[docs] def seasonal_cycle(data, var, stat, stat_config): """ Calculate seasonal cycle """ tstat = stat_config[stat]['stat method'] in_thr = stat_config[stat]['thr'] if in_thr is not None: if var in in_thr: thr = in_thr[var] data = data.where(data[var] >= thr) else: thr = None else: thr = in_thr if 'percentile' in tstat: q = tstat.partition(' ')[2] errmsg = ("Make sure percentile(s) in stat method is given correctly; " "i.e. with a white space e.g. 'percentile 95'") if not q: raise ValueError(errmsg) else: q = float(q) sc_pctls = xa.apply_ufunc( _percentile_func, data[var].groupby('time.season'), input_core_dims=[['time']], dask='parallelized', output_dtypes=[float], kwargs={'q': q, 'axis': -1, 'thr': thr}) st_data = sc_pctls.to_dataset() else: st_data = eval("data.groupby('time.season').{}('time')".format( tstat)) st_data = st_data.reindex(season=['DJF', 'MAM', 'JJA', 'SON']) st_data.attrs['Description'] =\ "Seasonal cycle | Season stat: {} | Threshold: {}".format( tstat, thr) return st_data
[docs] def annual_cycle(data, var, stat, stat_config): """ Calculate annual cycle """ tstat = stat_config[stat]['stat method'] in_thr = stat_config[stat]['thr'] if in_thr is not None: if var in in_thr: thr = in_thr[var] data = data.where(data[var] >= thr) else: thr = None else: thr = in_thr if 'percentile' in tstat: q = tstat.partition(' ')[2] errmsg = ("Make sure percentile(s) in stat method is given correctly; " "i.e. with a white space e.g. 'percentile 95'") if not q: raise ValueError(errmsg) else: q = float(q) ac_pctls = xa.apply_ufunc( _percentile_func, data[var].groupby('time.month'), input_core_dims=[['time']], dask='parallelized', output_dtypes=[float], kwargs={'q': q, 'axis': -1, 'thr': thr}) st_data = ac_pctls.to_dataset() elif isinstance(tstat, dict): func = tstat['function'] st_data = eval("data.groupby('time.month').reduce(" f"{func}, dim='time')") else: st_data = eval("data.groupby('time.month').{}('time')".format( tstat)) st_data.attrs['Description'] =\ "Annual cycle | Month stat: {} | Threshold: {}".format( tstat, thr) st_data = st_data.chunk({'month': -1}) return st_data
[docs] def diurnal_cycle(data, var, stat, stat_config): """ Calculate diurnal cycle """ # Type of diurnal cycle; amount or frequency dcycle_stat = stat_config[stat]['dcycle stat'] # Threshold; must be defined for frequency in_thr = stat_config[stat]['thr'] if in_thr is not None: if var in in_thr: thr = in_thr[var] data = data.where(data[var] >= thr) else: thr = None else: thr = in_thr # Check time stamps in data data = _check_hours(data) if dcycle_stat == 'amount': tstat = stat_config[stat]['stat method'] if 'percentile' in tstat: q = tstat.partition(' ')[2] errmsg = ("Make sure percentile(s) in stat method is given " "correctly; i.e. with a white space e.g. " "'percentile 95'") if not q: raise ValueError(errmsg) else: q = float(q) dc_pctls = xa.apply_ufunc( _percentile_func, data[var].groupby('time.hour'), input_core_dims=[['time']], dask='parallelized', output_dtypes=[float], kwargs={'q': q, 'axis': -1, 'thr': thr}) dcycle = dc_pctls.to_dataset() elif 'pdf' in tstat: # Bins assert 'bins' in stat_config[stat]['method kwargs'], \ "\n\tBins are missing in 'method kwargs'!\n" bin_r = stat_config[stat]['method kwargs']['bins'] bins = np.arange(bin_r[0], bin_r[1], bin_r[2]) lbins = bins.size - 1 dc_pdf = xa.apply_ufunc( _pdf_calc, data[var].groupby('time.hour'), input_core_dims=[['time']], output_core_dims=[['bins']], dask='parallelized', output_dtypes=[float], dask_gufunc_kwargs={'output_sizes': {'bins': lbins+1}}, kwargs={ 'keepdims': True, 'bins': bins, 'axis': -1, 'thr': thr}) dims = list(dc_pdf.dims) dcycle = dc_pdf.to_dataset().assign_coords( {'bins': bins}).transpose('bins', 'hour', dims[0], dims[1]) else: dcycle = eval("data.groupby('time.hour').{}('time')".format(tstat)) statnm = "Amount | stat: {} | thr: {}".format(tstat, thr) elif dcycle_stat == 'frequency': errmsg = "For frequency analysis, a threshold ('thr') must be set!" assert thr is not None, errmsg dcycle = data.groupby('time.hour').count('time') totdays = np.array([(data['time.hour'].values == h).sum() for h in np.arange(24)]) statnm = "Frequency | stat: counts | thr: {}".format(thr) else: print("Unknown configured diurnal cycle stat: {}".format(dcycle_stat)) sys.exit() dcycle = dcycle.chunk({'hour': -1}) _hrs = stat_config[stat]['hours'] hrs = _hrs if _hrs is not None else dcycle.hour st_data = dcycle.sel(hour=hrs) if dcycle_stat == 'frequency': st_data = st_data.assign({'ndays_per_hour': ('nday', totdays)}) st_data.attrs['Description'] =\ "Diurnal cycle | {}".format(statnm) return st_data
[docs] def dcycle_harmonic_fit(data, var, stat, stat_config): """ Calculate diurnal cycle with Harmonic oscillation fit """ # Type of diurnal cycle; amount or frequency dcycle_stat = stat_config[stat]['dcycle stat'] # Threshold; must be defined for frequency in_thr = stat_config[stat]['thr'] if in_thr is not None: if var in in_thr: thr = in_thr[var] data = data.where(data[var] >= thr) else: thr = None else: thr = in_thr if dcycle_stat == 'amount': data = _check_hours(data) dcycle = data.groupby('time.hour').mean('time') statnm = "Amount | thr: {}".format(thr) elif dcycle_stat == 'frequency': ermsg = "For frequency analysis, a threshold must be set" assert thr is not None, ermsg data_sub = data.where(data[var] >= thr) data_sub = _check_hours(data_sub) dcycle = data_sub.groupby('time.hour').count('time') totdays = np.array([(data_sub['time.hour'].values == h).sum() for h in np.arange(24)]) statnm = "Frequency | thr: {}".format(thr) else: print("Unknown configured diurnal cycle stat: {}".format(dcycle_stat)) sys.exit() dcycle = dcycle.chunk({'hour': -1}) dc_fit = xa.apply_ufunc( _harmonic_linefit, dcycle[var], input_core_dims=[['hour']], output_core_dims=[['fit']], dask='parallelized', output_dtypes=[float], output_sizes={'fit': 204}, kwargs={'keepdims': True, 'axis': -1, 'var': var}) dims = list(dc_fit.dims) st_data = dc_fit.to_dataset().transpose(dims[-1], dims[0], dims[1]) if dcycle_stat == 'frequency': st_data = st_data.assign({'ndays_per_hour': ('nday', totdays)}) st_data.attrs['Description'] =\ "Harmonic fit of diurnal cycle | Statistic: {}".format(statnm) st_data.attrs['Data info'] = ( """First four values in each array with fitted data """ """are fit parameters; (c1, p1, c2, p2), where 1/c2 """ """and p1/p2 represents amplitude and phase of 1st/2nd """ """harmonic of the fit.""") return st_data
[docs] def percentile(data, var, stat, stat_config): """ Calculate percentiles """ in_thr = stat_config[stat]['thr'] if in_thr is not None: thr = None if var not in in_thr else in_thr[var] else: thr = in_thr pctls = stat_config[stat]['pctls'] lpctls = [pctls] if not isinstance(pctls, (list, tuple)) else pctls pctl_c = xa.apply_ufunc( _percentile_func, data[var], input_core_dims=[['time']], output_core_dims=[['pctls']], dask='parallelized', output_sizes={'pctls': len(lpctls)}, output_dtypes=[float], kwargs={'q': lpctls, 'axis': -1, 'thr': thr}) dims = list(pctl_c.dims) pctl_ds = pctl_c.to_dataset().transpose(dims[-1], dims[0], dims[1]) st_data = pctl_ds.assign({'percentiles': ('pctls', lpctls)}) st_data.attrs['Description'] =\ "Percentile | q: {} | threshold: {}".format(lpctls, thr) return st_data
[docs] def freq_int_dist(data, var, stat, stat_config): """ Calculate frequency intensity distributions """ # Bins if var not in stat_config[stat]['bins']: dmn = data[var].min(skipna=True) dmx = data[var].max(skipna=True) bins = np.linspace(dmn, dmx, 20) else: bin_r = stat_config[stat]['bins'][var] bins = np.arange(bin_r[0], bin_r[1], bin_r[2]) # Data threshold in_thr = stat_config[stat]['thr'] if in_thr is not None: thr = None if var not in in_thr else in_thr[var] else: thr = in_thr # Dry event threshold in_dry_thr = stat_config[stat]['dry event thr'] if in_dry_thr is not None: dry_thr = None if var not in in_dry_thr else in_dry_thr[var] else: dry_thr = in_dry_thr # Output size lbins_out = bins.size - 1 if dry_thr is None else bins.size # Normalization normalized = stat_config[stat]['normalized'] if isinstance(normalized, bool): norm = normalized else: norm = False if var not in normalized else normalized[var] pdf = xa.apply_ufunc( _pdf_calc, data[var], input_core_dims=[['time']], output_core_dims=[['bins']], dask='parallelized', output_dtypes=[float], output_sizes={'bins': lbins_out}, kwargs={'keepdims': True, 'bins': bins, 'axis': -1, 'norm': norm, 'thr': thr, 'dry_event_thr': dry_thr}) dims = list(pdf.dims) pdf_ds = pdf.to_dataset().transpose(dims[-1], dims[0], dims[1]) st_data = pdf_ds.assign(bin_edges=['dry_events']+list(bins)) st_data.attrs['Description'] =\ "PDF | threshold: {} | Normalized bin data: {}".format(thr, norm) return st_data
[docs] def asop(data, var, stat, stat_config): """ Calculate ASoP components for precipitation """ if stat_config[stat]['nr_bins'] is None: nbins = np.arange(50) else: nbins = np.arange(stat_config[stat]['nr_bins']) bintype = stat_config[stat]['bin_type'] # Define bins bins = ASoP.bins_calc(nbins, bintype) bins = np.insert(bins, 0, 0.0) lbins = bins.size - 1 in_thr = stat_config[stat]['thr'] if in_thr is not None: thr = None if var not in in_thr else in_thr[var] else: thr = in_thr asop_out = xa.apply_ufunc( ASoP.asop, data[var], input_core_dims=[['time']], output_core_dims=[['factors', 'bins']], dask='parallelized', output_dtypes=[float], output_sizes={'factors': 2, 'bins': lbins}, kwargs={'keepdims': True, 'axis': -1, 'bins': bins}) dims = list(asop_out.dims) # N.B. This does not work in rcat yet! Variable name need still to be 'pr' # C = asop.isel(factors=0) # FC = asop.isel(factors=1) # dims = list(C.dims) # C_ds = C.to_dataset().transpose(dims[-1], dims[0], dims[1]) # FC_ds = FC.to_dataset().transpose(dims[-1], dims[0], dims[1]) # asop_ds = = xa.Dataset.merge(C_ds, FC_ds) asop_ds = asop_out.to_dataset().transpose(dims[-2], dims[-1], dims[0], dims[1]) st_data = asop_ds.assign(bin_edges=bins, factors=['C', 'FC']) st_data.attrs['Description'] =\ f"ASoP analysis | bin type: {bintype} | threshold: {thr}" return st_data
[docs] def eda_calc(data, var, stat, stat_config): """ Event duration analysis for precipitation """ # Statistic used for events event_stat = stat_config[stat]['event statistic'] # Bins dur_bins = stat_config[stat]['duration bins'] dur_bins = np.array(dur_bins) if dur_bins is not None else dur_bins st_bins = stat_config[stat]['statistic bins'] st_bins = np.array(st_bins) if st_bins is not None else st_bins # Dry intervals dry = stat_config[stat]['dry events'] dry_bins = stat_config[stat]['dry bins'] dry_bins = np.array(dry_bins) if dry_bins is not None else dry_bins dur_dim = dur_bins.size+1 if dry else dur_bins.size frq_dim = st_bins.size-1 # Event threshold thr = stat_config[stat]['event thr'] eda_out = xa.apply_ufunc( eda.eda, data[var], input_core_dims=[['time']], output_core_dims=[['frequency', 'duration']], dask='parallelized', output_dtypes=[float], dask_gufunc_kwargs={'output_sizes': { 'frequency': frq_dim, 'duration': dur_dim}}, exclude_dims={'time'}, kwargs={ 'thr': thr, 'axis': -1, 'duration_bins': dur_bins, 'event_statistic': event_stat, 'statistic_bins': st_bins, 'dry_events': dry, 'dry_bins': dry_bins, 'keepdims': True}) dims = list(eda_out.dims) eda_ds = eda_out.to_dataset().transpose(dims[-2], dims[-1], dims[0], dims[1]) st_data = eda_ds.assign(duration_bins=dur_bins, statistic_bins=st_bins, dry_bins=dry_bins) st_data.attrs['Description'] =\ "EDA analysis | event statistic: {} | threshold: {}".format( event_stat, thr) return st_data
[docs] def Rxx(data, var, stat, stat_config): """ Count of any time units (days, hours, etc) when precipitation ≥ xx mm. """ in_thr = stat_config[stat]['thr'] if in_thr is not None: thr = None if var not in in_thr else in_thr[var] else: thr = in_thr errmsg = "\nA threshold must be set for Rxx calculation" assert thr is not None, errmsg # Normalized values or not norm = stat_config[stat]['normalize'] frq = xa.apply_ufunc( ci.Rxx, data[var], input_core_dims=[['time']], dask='parallelized', output_dtypes=[float], kwargs={'keepdims': True, 'axis': -1, 'thr': thr, 'normalize': norm}) st_data = frq.to_dataset() st_data.attrs['Description'] =\ "Rxx; frequency above threshold | threshold: {} | normalized: {}".\ format(thr, norm) return st_data
[docs] def cdd(data, var, stat, stat_config): """ Calculate frequencies of consecutive dry days (CDD) periods. See cdd function in rcatool/stats/climateindex.py for more details and options. """ in_thr = stat_config[stat]['thr'] if in_thr is not None: thr = None if var not in in_thr else in_thr[var] else: thr = in_thr errmsg = "\nA threshold must be set for CDD calculation" assert thr is not None, errmsg # Array of CDD interval lengths to consider periods = stat_config[stat]['periods'] # Calculate longest dry period? # N.B. If so, value will be positioned last in returned array. maxper = stat_config[stat]['maxper'] # Output size dim_out = len(periods) if maxper else len(periods) - 1 frq_cdd = xa.apply_ufunc( ci.cdd, data[var], input_core_dims=[['time']], output_core_dims=[['frequencies']], dask='parallelized', output_dtypes=[float], output_sizes={'frequencies': dim_out}, kwargs={'keepdims': True, 'axis': -1, 'thr': thr, 'periods': periods, 'maxper': maxper}) dims = list(frq_cdd.dims) frq_cdd_ds = frq_cdd.to_dataset().transpose(dims[-1], dims[0], dims[1]) st_data = frq_cdd_ds.assign(cdd_intervals=periods) st_data.attrs['Description'] =\ f"CDD frequencies | threshold: {thr} | Longest CDD period: {maxper}" return st_data
[docs] def pr_amount_survival_fraction(data, var, stat, stat_config): """ Calculate the frequency distribution, normalize by total precipitation, and sum the fractions. It answers the question 'what fraction of total precipitation occurs beyond the top p percentile of days in a period?', where p is any percentile of interest. """ in_thr = stat_config[stat]['thr'] if in_thr is not None: thr = None if var not in in_thr else in_thr[var] else: thr = in_thr # Quantiles pctls = stat_config[stat]['percentiles'] da_pctls = xa.DataArray(pctls, dims={'perc': len(pctls)}) # Vectorize the unvectorized function? vectorize = stat_config[stat]['vectorize'] fracs = xa.apply_ufunc( prix.precip_amount_survival_fraction, data[var], da_pctls, input_core_dims=[['time'], ['perc']], output_core_dims=[['pctl']], dask_gufunc_kwargs={'output_sizes': {'pctl': len(pctls)}}, dask='parallelized', output_dtypes=[data[var].dtype], vectorize=vectorize, kwargs={'keepdims': True}) dims = list(fracs.dims) st_data = fracs.to_dataset(name=var).transpose( dims[-1], dims[0], dims[1]).assign(percentiles=pctls) st_data.attrs['Description'] =\ f"Pr survival fractions | threshold: {thr}" return st_data
[docs] def filtering(data, var, stat, stat_config): """ Filter the input data """ # The type of frequency cutoff ftype = stat_config[stat]['cutoff type'] # The type of filter filt = stat_config[stat]['filter'] # The length of filter window window = stat_config[stat]['window'] assert window % 2 == 1, "Filter window must be odd" # The filter mode mode = stat_config[stat]['mode'] # First cutoff frequency cutoff = stat_config[stat]['1st cutoff'] cutoff2 = stat_config[stat]['2nd cutoff'] if ftype == 'bandpass': errmsg = "'2nd cutoff' must be set for bandpass filtering" assert cutoff2 is not None, errmsg # The filtering dimensions (1D or 2D filtering) filt_dim = stat_config[stat]['filter dim'] # Thresholding data in_thr = stat_config[stat]['thr'] if in_thr is not None: if var in in_thr: thr = in_thr[var] data = data.where(data[var] >= thr) else: thr = None else: thr = in_thr if filt == 'lanczos': wgts = convolve.lanczos_filter(window, 1/cutoff, 1/cutoff2, ftype) else: # TODO print("No other filter implemented yet. To be done.") sys.exit() if mode == 'valid' or mode is None: out_dim = data.time.size - window + 1 else: out_dim = data.time.size if filt_dim == 1: filtered = xa.apply_ufunc( convolve.filtering, data[var], input_core_dims=[['time']], output_core_dims=[['filtered']], dask='parallelized', dask_gufunc_kwargs={'output_sizes': {'filtered': out_dim}}, output_dtypes=[float], kwargs={'wgts': wgts, 'dim': filt_dim, 'axis': -1, 'mode': mode}) dims = list(filtered.dims) st_data = filtered.to_dataset().transpose('filtered', dims[0], dims[1]) elif filt_dim == 2: print("\nSorry, 2D filtering not available yet.") sys.exit() else: print("\nOnly 1D and 2D filtering (dim = 1 or 2) is possible ...") sys.exit() statnm = ( f"Filter Dimension: {filt_dim} | Filter: {filt} | Cutoff Type: {ftype}" f" | 1st Cutoff (time steps): {cutoff} | 2nd Cutoff (time steps): " f"{cutoff2} | Filter Window Size: {window}" ) st_data.attrs['Description'] = "Convolved data | {}".format(statnm) return st_data
def _percentile_func(arr, axis=0, q=95, thr=None): if thr is not None: arr[arr < thr] = np.nan pctl = np.nanpercentile(arr, axis=axis, q=q) if axis == -1 and pctl.ndim > 2: pctl = np.moveaxis(pctl, 0, -1) return pctl def _harmonic_linefit(data, keepdims=False, axis=0, var=None): """ Non-linear regression line fit using first two harmonics (diurnal cycle) """ from scipy import optimize def _f1(t, m, c1, p1): return m + c1*np.cos(2*np.pi*t/24 - p1) def _f2(t, m, c2, p2): return m + c2*np.cos(4*np.pi*t/24 - p2) def _compute(data1d, v): if any(np.isnan(data1d)): print("Data missing/masked!") dcycle = np.repeat(np.nan, 204) else: m, c1, p1 = optimize.curve_fit(_f1, np.arange(data1d.size), data1d)[0] m, c2, p2 = optimize.curve_fit(_f2, np.arange(data1d.size), data1d)[0] t = np.linspace(0, 23, 200) r = m + c1*np.cos(2*np.pi*t/24 - p1) +\ c2*np.cos(4*np.pi*t/24 - p2) dcycle = np.hstack(((c1, p1, c2, p2), r)) return dcycle if keepdims: dcycle_fit = np.apply_along_axis(_compute, axis, data, var) else: if isinstance(data, np.ma.MaskedArray): data1d = data.copy() else: data1d = np.array(data) msg = "If keepdims is False, data must be one dimensional" assert data1d.ndim == 1, msg dcycle_fit = _compute(data1d, var) return dcycle_fit def _pdf_calc(data, bins=None, norm=False, keepdims=False, axis=0, thr=None, dry_event_thr=None): """ Calculate pdf """ def _compute(data1d, bins, lbins, norm, thr, dry_thr): if all(np.isnan(data1d)): print("All data missing/masked!") hdata = np.repeat(np.nan, lbins+1) if dry_thr is not None\ else np.repeat(np.nan, lbins) else: if any(np.isnan(data1d)): data1d = data1d[~np.isnan(data1d)] if thr is not None: indata = data1d[data1d >= thr] else: indata = data1d.copy() if norm: binned = np.digitize(indata, bins) binned_dict = {bint: indata[np.where(binned == bint)] if bint in binned else np.nan for bint in range(1, len(bins))} # Mean value for each bin means = np.array([np.mean(arr) if not np.all(np.isnan(arr)) else 0.0 for k, arr in binned_dict.items()]) # Occurrences and frequencies ocrns = np.array([arr.size if not np.all(np.isnan(arr)) else 0 for k, arr in binned_dict.items()]) frequency = ocrns/np.nansum(ocrns) C = frequency*means # Relative contribution per bin hdata = C/np.nansum(C) # Normalized contribution per bin else: hdata = np.histogram(indata, bins=bins, density=True)[0] # Add dry events if dry_thr is not None: dry_events = np.sum(data1d < dry_thr) hdata = np.hstack((dry_events, hdata)) return hdata # Set number of bins to 10 (np.histogram default) if bins not provided. inbins = 10 if bins is None else bins lbins = inbins if isinstance(inbins, int) else len(inbins) - 1 if keepdims: hist = np.apply_along_axis(_compute, axis, data, bins=inbins, lbins=lbins, norm=norm, thr=thr, dry_thr=dry_event_thr) else: if isinstance(data, np.ma.MaskedArray): data1d = data.compressed() else: data1d = np.array(data).ravel() hist = _compute(data1d, bins=inbins, lbins=lbins, norm=norm, thr=thr, dry_thr=dry_event_thr) return hist