"""
Climate indices
---------------
Functions for various climate index calculations.
Authors: Petter Lind & David Lindstedt
Created: Autumn 2016
Updates:
May 2020
"""
import numpy as np
from .arithmetics import run_mean
[docs]
def hotdays_calc(data, thr_p75):
"""
Calculate number of hotdays.
Return days with mean temperature above the 75th percentile
of climatology.
Parameters
----------
data: array
1D-array of temperature input timeseries
thr_p75: float
75th percentile daily mean value from climatology
"""
hotdays = ((data > thr_p75)).sum()
return hotdays
[docs]
def extr_hotdays_calc(data, thr_p95):
"""
Calculate number of extreme hotdays.
Return days with mean temperature above the 95th percentile
of climatology.
Parameters
----------
data: array
1D-array of temperature input timeseries
thr_p95: float
95th percentile daily mean value from climatology
"""
xtr_hotdays = ((data > thr_p95)).sum()
return xtr_hotdays
[docs]
def tropnights_calc(data):
"""
Calculate number of tropical nights.
Return days with minimum temperature not below 20 degrees C.
Parameters
----------
data: array
1D-array of minimum temperatures timeseries in degrees Kelvin
"""
tropnts = ((data > 293)).sum()
return tropnts
[docs]
def ehi(data, thr_95, axis=0, keepdims=False):
"""
Calculate Excessive Heat Index (EHI).
Parameters
----------
data: list/array
1D/2D array of daily temperature timeseries
thr_95: float
95th percentile daily mean value from climatology
axis: int
The axis along which the calculation is applied (default 0).
keepdims: boolean
If data is 2d (time in third dimesion) and keepdims is set to True,
calculation is applied to the zeroth axis (time) and returns a 2d array
of calculated statistics. If set to False (default) all values are
collectively assembled before calculation.
Returns
-------
EHI: float
Excessive heat index
"""
def ehi_calc(pdata, thr_95):
if all(np.isnan(pdata)):
print("All data missing/masked!")
ehi = np.nan
else:
# run_mean = moving_average(pdata, 3)
rmean = run_mean(pdata, 3)
ehi = ((rmean > thr_95)).sum()
return ehi
if keepdims:
EHI = np.apply_along_axis(ehi_calc, axis, data, thr_95)
else:
EHI = ehi_calc(data, thr_95)
return EHI
[docs]
def cdd(data, thr=1.0, periods=np.arange(1, 61), maxper=False,
axis=0, keepdims=False):
"""
Calculate the Consecutive Dry Days index (CDD).
Parameters
----------
data: array
1D/2D daily precipitation data array in mm.
thr: float
Value of threshold to define dry day. Default 1 mm.
periods: list/array
Array of lenghts of dry periods to consider; e.g.
[1, 3, 10, 14, 21, 30] computes frequency of dry periods with lengths
1-3 days, 3-10 days, etc. Leftmost interval edge is included, not the
right. Default periods is set to 60 days with 1 day increment.
maxper: boolean
If set to True the longest CDD period and positioned at last position
in returned array. Default False.
axis: int
Along which axis to calculate cdd. Defaults to 0
keepdims: boolean
If False (default) calculation is performed on all data collectively,
otherwise for each timeseries on each point in 2d space. 'Axis' then
defines along which axis the timeseries are located.
Returns
-------
dlen: list
list with lengths of each dry day event in timeseries
cdd: list/array
1D/2D array with frequencies of cdd intervals. For intervals where non
exists, positions are set to NaN. Length of returned array (along
computed 'axis') is equal to length of 'periods' list/array minus 1.
"""
import itertools
def cdd_calc(data1d, thr, lbins):
dim_out = len(periods) if maxper else len(periods) - 1
if all(np.isnan(data1d)):
print("All data missing/masked!")
cdd_out = np.repeat(np.nan, dim_out)
else:
cdd = [list(x[1]) for x in itertools.groupby(
data1d, lambda x: x > thr) if not x[0]]
cdd_len = [len(f) for f in cdd]
cdd_max = np.max(cdd_len)
# Sort lengths into bins and return counts per bin
cdd_out = np.histogram(cdd_len, lbins)[0]
# binned = np.histogram(cdd_len, lbins)[0]
# cdd_out = np.array([v if v != 0 else np.nan for v in binned])
if maxper:
cdd_out = np.hstack((cdd_out, cdd_max))
return cdd_out
if keepdims:
CDD = np.apply_along_axis(cdd_calc, axis, data, thr, lbins=periods)
else:
CDD = cdd_calc(data, thr, lbins=periods)
return CDD
[docs]
def Rxx(data, thr=1.0, axis=0, normalize=False, keepdims=False):
"""
Rxx mm, count of any time units (days, hours, etc) when precipitation ≥
xx mm: Let RRij be the precipitation amount on time unit i in period j.
Count the number of days where RRij ≥ xx mm.
Parameters
----------
data: array
1D/2D data array, with time steps on the zeroth axis (axis=0).
thr: float/int
Threshold to be used; eg 10 for R10, 20 R20 etc. Default 1.0.
axis: int
Along which axis to calculate Rxx. Defaults to 0
normalize: boolean
If True (default False) the counts are normalized by total number of
time units in each array/grid point. Returned values will then be
fractions.
keepdims: boolean
If False (default) calculation is performed on all data collectively,
otherwise for each timeseries on each point in 2d space. 'Axis' then
defines along which axis the timeseries are located.
Returns
-------
Rxx: list/array
1D/2D array with calculated Rxx indices.
"""
def rxx_calc(pdata, thr):
# Flatten data to one dimension
if isinstance(pdata, np.ma.MaskedArray):
data1d = pdata.compressed()
elif isinstance(pdata, (list, tuple)):
data1d = np.array(pdata)
else:
data1d = pdata.ravel()
if all(np.isnan(data1d)):
print("All data missing/masked!")
Rxx = np.nan
else:
if any(np.isnan(data1d)):
data1d = data1d[~np.isnan(data1d)]
Rxx = (data1d > thr).sum()
if normalize:
Rxx /= data1d.size
return Rxx
if keepdims:
RXX = np.apply_along_axis(rxx_calc, axis, data, thr=thr)
else:
RXX = rxx_calc(data, thr=thr)
return RXX
[docs]
def RRpX(data, percentile, thr=None, axis=0, keepdims=False):
"""
RRpX mm, total amount of precipitation above the percentile threshold pX;
RR ≥ pX mm: Let RRij be the daily precipitation amount on day i in period
j. Sum the precipitation for all days where RRij ≥ pX mm.
Parameters
----------
data: array
1D/2D data array, with time steps on the zeroth axis (axis=0).
percentile: int
Percentile that defines the threshold.
thr: float/int
Pre-thresholding of data to do calculation for wet days/hours only.
keepdims: boolean
If False (default) calculation is performed on all data collectively,
otherwise for each timeseries on each point in 2d space. 'Axis' then
defines along which axis the timeseries are located.
Returns
-------
RRpx: list/array
1D/2D array with calculated RRpXX indices.
"""
def rpxx_calc(pdata, pctl, thr):
# Flatten data to one dimension
if isinstance(pdata, np.ma.MaskedArray):
data1d = pdata.compressed()
elif isinstance(pdata, (list, tuple)):
data1d = np.array(pdata)
else:
data1d = pdata.ravel()
if all(np.isnan(data1d)):
print("All data missing/masked!")
Rpxx = np.nan
else:
if thr is not None:
data1d[data1d < thr] = np.nan
if all(np.isnan(data1d)):
print("All data missing/masked!")
Rpxx = np.nan
else:
if any(np.isnan(data1d)):
data1d = data1d[~np.isnan(data1d)]
pctl_val = np.percentile(data1d, pctl)
Rpxx = data1d[data1d >= pctl_val].sum()
return Rpxx
if keepdims:
RRpx = np.apply_along_axis(rpxx_calc, axis, data, percentile, thr)
else:
RRpx = rpxx_calc(data, pctl=percentile, thr=thr)
return RRpx
[docs]
def RRtX(data, thr, axis=0, keepdims=False):
"""
RRtX mm, total amount of precipitation above the threshold 'thr'.
Parameters
----------
data: array
1D/2D data array, with time steps on the zeroth axis (axis=0).
thr: int
Threshold that defines the threshold above which data is summed.
keepdims: boolean
If False (default) calculation is performed on all data collectively,
otherwise for each timeseries on each point in 2d space. 'Axis' then
defines along which axis the timeseries are located.
Returns
-------
RRtx: list/array
1D/2D array with calculated RRtXX indices.
"""
def rtxx_calc(pdata, thr):
# Flatten data to one dimension
if isinstance(pdata, np.ma.MaskedArray):
data1d = pdata.compressed()
elif isinstance(pdata, (list, tuple)):
data1d = np.array(pdata)
else:
data1d = pdata.ravel()
if all(np.isnan(data1d)):
print("All data missing/masked!")
Rtxx = np.nan
else:
if any(np.isnan(data1d)):
data1d = data1d[~np.isnan(data1d)]
Rtxx = data1d[data1d >= thr].sum()
return Rtxx
if keepdims:
RRtx = np.apply_along_axis(rtxx_calc, axis, data, thr)
else:
RRtx = rtxx_calc(data, thr=thr)
return RRtx
[docs]
def SDII(data, thr=1.0, axis=0, keepdims=False):
"""
SDII, Simple pricipitation intensity index: Let RRwj be the daily
precipitation amount on wet days, w (RR ≥ 1mm) in period j.
Parameters
----------
data: list/array
2D array.
thr: float/int
threshold for wet events (wet days/hours etc)
axis: int
The axis along which the calculation is applied (default 0).
keepdims: boolean
If data is 2d (time in third dimesion) and keepdims is set to True,
calculation is applied to the zeroth axis (time) and returns a 2d array
of freq-int dists. If set to False (default) all values are
collectively assembled before calculation.
"""
def sdii_calc(pdata, thr):
# Flatten data to one dimension
if isinstance(pdata, np.ma.MaskedArray):
data1d = pdata.compressed()
elif isinstance(pdata, (list, tuple)):
data1d = np.array(pdata)
else:
data1d = pdata.ravel()
if all(np.isnan(data1d)):
print("All data missing/masked!")
sdii = np.nan
else:
if any(np.isnan(data1d)):
data1d = data1d[~np.isnan(data1d)]
sdii = data1d[data1d >= thr].sum()/data1d[data1d >= thr].size
return sdii
if keepdims:
SDII = np.apply_along_axis(sdii_calc, axis, data, thr)
else:
SDII = sdii_calc(data, thr)
return SDII