Source code for rcatool.stats.sal

"""
SAL module
---------------
Functions for calculation of SAL statistics.

Based on Wernli et al 2008
http://journals.ametsoc.org/doi/abs/10.1175/2008MWR2415.1

Created: Spring 2018
Authors: Petter Lind & David Lindstedt
"""

# Global modules
import sys
import xarray as xa
import numpy as np
from skimage.measure import label, regionprops, find_contours
from skimage.morphology import remove_small_objects
import multiprocessing as mp


[docs] def A_stat(data, refdata): """ Calculate the amplitude component (A). Parameters ---------- data, refdata: arrays 2D data arrays to be compared, where refdata is the reference data e.g. observations. Returns ------- A: float The calculated amplitude component """ if isinstance(data, np.ma.MaskedArray): data_mean = np.ma.mean(data) else: data_mean = np.nanmean(data) if isinstance(refdata, np.ma.MaskedArray): refdata_mean = np.ma.mean(refdata) else: refdata_mean = np.nanmean(refdata) A = (data_mean - refdata_mean) / (0.5 * (data_mean + refdata_mean)) return A
[docs] def S_stat(data, data_label, refdata, refdata_label, obj_prop=True, lsmask=None): """ Function to calculate the structure component (S). The basic idea is to compare the volume of the normalized precipitation objects. This property captures information of the geometrical characteristics (size and shape) and how they differ between model (M) and reference (O). Parameters ---------- data, refdata: arrays 2D data arrays to be compared, where refdata is the reference data e.g. observations. data_label, refdata_label: arrays Arrays with labeled objects. Returned from the label() function. obj_prop: boolean If True, individual object (rain fall area) properties are calculated and returned. lsmask: array Land/sea mask (2d boolean array) to characterize identified objects as land, ocean or coastal objects. Returns ------- S: float The calculated structure component. area_props/ref_area_props: dictionary Dictionary containing properties of identified objects in data and refdata respectively. """ def get_object_properties(darray, lbl, area_prop): # Properties of identified rain areas props = regionprops(lbl, darray) # Number of rain areas nobj = len(props) if area_prop: obj_cm_x = np.zeros(nobj) obj_cm_y = np.zeros(nobj) obj_areas = np.zeros(nobj) obj_mxint = np.zeros(nobj) obj_int_ratio = np.zeros(nobj) obj_major_ax = np.zeros(nobj) obj_minor_ax = np.zeros(nobj) obj_extent = np.zeros(nobj) if lsmask is not None: obj_ident = np.zeros(nobj) else: obj_dict = None data_Rn = np.zeros(nobj) data_RnVn = np.zeros(nobj) # Loop over objects for i, obj in enumerate(props): obj_data = np.array([darray[tuple(coord)] for coord in obj.coords]) Rn = obj_data.sum() # Object mean value Rmax = obj.max_intensity # Object max value data_Rn[i] = Rn data_RnVn[i] = (Rn * Rn) / Rmax if area_prop: obj_cm_y[i] = obj.centroid[0] obj_cm_x[i] = obj.centroid[1] obj_areas[i] = obj.area obj_mxint[i] = obj.max_intensity obj_int_ratio[i] = obj.max_intensity/obj.mean_intensity obj_major_ax[i] = obj.major_axis_length obj_minor_ax[i] = obj.minor_axis_length obj_extent[i] = obj.extent if lsmask is not None: assert lsmask.shape == darray.shape,\ """--- ERROR ---\n Land sea mask must have same """ """dimension as input data! Now {} vs {}""".format( lsmask.shape, darray.shape) objmask = lbl == obj.label ocean_pts = lsmask[objmask].sum() ratio = ocean_pts/obj.area if ratio >= 0.90: objid = 0 elif ratio <= 0.10: objid = 1 else: objid = 2 obj_ident[i] = objid if area_prop: obj_dict = {'cm_x': obj_cm_x, 'cm_y': obj_cm_y, 'size': obj_areas, 'extent': obj_extent, 'major axis': obj_major_ax, 'minor axis': obj_minor_ax, 'max intensity': obj_mxint, 'intensity ratio': obj_int_ratio} if lsmask is not None: obj_dict.update({'obj_id': obj_ident}) # Calculate the weighted mean of all objects' scaled volume data_V = np.nansum(data_RnVn) / np.nansum(data_Rn) return data_V, obj_dict # Get object properties data_vol, area_props = get_object_properties(data, data_label, obj_prop) refdata_vol, ref_area_props = get_object_properties(refdata, refdata_label, obj_prop) # S component S = (data_vol - refdata_vol) / (0.5 * (data_vol + refdata_vol)) return S, {'data': area_props, 'refdata': ref_area_props}
[docs] def L_stat(data, data_label, refdata, refdata_label): """ Function to determine the location component (L). It consists of two components, L1 and L2. L1: measures the normalized distance between the centers of mass of the modelled and observed fields. L2: The second considers the averaged distance between the center of mass of the total field and individual field objects. Parameters ---------- data, refdata: arrays 2D data arrays to be compared, where refdata is the reference data e.g. observations. data_label, refdata_label: arrays Arrays with labeled objects. Returned from the label() function. Returns ------- L1, L2, L: dictionary Dictionary with the calculated location components L1 and L2 as well as its composite L (L1 + L2). """ from scipy.ndimage.measurements import center_of_mass from scipy.spatial.distance import pdist def calc_L2(darray, lbl, data_CM): # Properties of identified rain areas props = regionprops(lbl, darray) # Number of rain areas nobj = len(props) obj_Rn = np.zeros(nobj) obj_CM = np.zeros(nobj) for i, obj in enumerate(props): obj_data = np.array([darray[tuple(coord)] for coord in obj.coords]) Rn = obj_data.sum() # Object mean value R_CM = distfunc(center_of_mass(darray, lbl, obj.label)) obj_Rn[i] = Rn obj_CM[i] = R_CM R = np.nansum((obj_Rn * np.abs(obj_CM - data_CM))) / np.nansum(obj_Rn) return R # Center of mass of modelled total field: X=1/R * sum( Rn * dn ), # where R is domain total field, Rn object total field and # dn object distance to reference point. # Center of mass for total field data_CM = distfunc(center_of_mass(data, data_label)) refdata_CM = distfunc(center_of_mass(refdata, refdata_label)) # Find maximum distance in domain fld_mask = data.mask if isinstance(data, np.ma.MaskedArray) \ else ~np.isnan(data) poly = find_contours(fld_mask, 0.9) if poly: bounds = poly[0].astype(int) distances = pdist(bounds) dmax = distances.max() else: wrn_msg = """WARNING -- No domain data mask found, using the whole input array to define maximum distance 'dmax'.""" print("\n{}\n".format(wrn_msg)) dmax = distfunc(fld_mask.shape) L1 = np.abs(data_CM - refdata_CM) / dmax # Calculate L2 component R_data = calc_L2(data, data_label, data_CM) R_refdata = calc_L2(refdata, refdata_label, refdata_CM) L2 = 2 * (np.abs(R_data - R_refdata) / dmax) # Add L1 and L2 L = L1 + L2 return {'L': L, 'L1': L1, 'L2': L2}
[docs] def threshold(data, thr_type, value): """ Function to calculate the threshold to be used to identify objects. Parameters ---------- data: array 2D data array. thr_type: string Type of threshold. Can be either "S" for specified (any absolute value), "F" for a fraction (between 0 and 1) of the maximum value, and "P" for a percentile (95 for the 95th percentile etc). value: int/float The corresponding value based on the chosen threhold type. """ if (thr_type == "S"): T = value elif (thr_type == "F"): if isinstance(data, np.ma.MaskedArray): T = value * np.ma.max(data) else: T = value * np.nanmax(data) elif (thr_type == "P"): if isinstance(data, np.ma.MaskedArray): T = np.percentile(data.compressed(), value) else: T = np.percentile(data.ravel(), value) # print("Threshold: {}{} --> value: {}".format(thr_type, value, T)) return T
[docs] def distfunc(x): """ Calculate distances """ return np.sqrt(x[0]**2 + x[1]**2)
[docs] def remove_large_objects(segments, max_size): """ Remove large objects based on the maximum size limit defined by 'max_size'. Parameters ---------- segments: array Array with labeled objects. Returned from the label() function. max_size: int Maximum size (number of grid points) Returns ------- out: array The segments array with too large objects removed. """ out = np.copy(segments) component_sizes = np.bincount(segments.ravel()) too_large = component_sizes > max_size too_large_mask = too_large[segments] out[too_large_mask] = 0 return out
[docs] def sal_calc(tstep, data, refdata, thr_t, thr_v, obj_prop=True, olsl=None, ousl=None, smlvl=None, land_sea_mask=None): """ Perform the SAL calculation using the S, A, L functions. """ import convolve import warnings print("\nCalculating SAL stats for time step: {}\n".format(tstep+1)) # Smoothening of data if smlvl is not None: kernel = convolve.kernel_gen(smlvl) indata = convolve.convolve2Dfunc(data, kernel, fft=False) inrefdata = convolve.convolve2Dfunc(refdata, kernel, fft=False) else: indata = data inrefdata = refdata # Calculate threshold thr_data = threshold(indata, thr_t, thr_v) thr_rdata = threshold(inrefdata, thr_t, thr_v) # ---- Identify rain areas ---- # mask = np.ones(data.shape) with warnings.catch_warnings(): warnings.simplefilter("ignore") data_mask = np.logical_and(mask, indata > thr_data) refdata_mask = np.logical_and(mask, inrefdata > thr_rdata) # Identify separate rain areas data_lbl = label(data_mask) refdata_lbl = label(refdata_mask) if olsl is not None: data_lbl = remove_small_objects(data_lbl, olsl) refdata_lbl = remove_small_objects(refdata_lbl, olsl) if ousl is not None: data_lbl = remove_large_objects(data_lbl, ousl) refdata_lbl = remove_large_objects(refdata_lbl, ousl) if np.all((np.any(data_lbl.astype(bool)), np.any(refdata_lbl.astype(bool)))): A = A_stat(indata, inrefdata) S, obj_prop_dict = S_stat(indata, data_lbl, inrefdata, refdata_lbl, obj_prop=obj_prop, lsmask=land_sea_mask) L = L_stat(indata, data_lbl, inrefdata, refdata_lbl) if obj_prop: sal_dict = {'S': S, 'A': A, 'L': L, 'obj props': obj_prop_dict} else: sal_dict = {'S': S, 'A': A, 'L': L} else: print('Time step has no objects in either or both data sets ...') print('Moving on ...') sal_dict = None return tstep, sal_dict
[docs] def write_to_disk(ddict, nt, fname, attrs): if fname is None: from datetime import datetime dtime = datetime.now().strftime('%Y-%m-%dT%H%M%S') fn = './sal_analysis_' + dtime + '.nc' else: fn = fname # L component L, L1, L2 = zip(*[tuple(ld.values()) for ld in ddict['L']]) # Object properties prop_dict = {} for dname in ('data', 'refdata'): prop_data = zip(*[tuple(ld[dname].values()) for ld in ddict['obj props']]) prop_dict[dname] =\ np.array([[item for sublist in ll for item in sublist] for ll in prop_data]) # Coordinates prop_names = list(ddict['obj props'][0]['data'].keys()) data_nobj = np.arange(prop_dict['data'].shape[1]) refdata_nobj = np.arange(prop_dict['refdata'].shape[1]) # Extract object identity ds = xa.Dataset({ "Structure_component": (['t'], ddict['S']), "Amplitude_component": (['t'], ddict['A']), "Location_total_component": (['t'], list(L)), "Location_L1_component": (['t'], list(L1)), "Location_L2_component": (['t'], list(L2)), "data_area_props": (['p', 'dnobj'], prop_dict['data']), "refdata_area_props": (['p', 'rdnobj'], prop_dict['refdata'])}, coords={"tsteps": (['t'], np.arange(1, nt+1)), "object_properties": (['p'], prop_names), "N_objects_data": (['dnobj'], data_nobj), "N_objects_refdata": (['rdnobj'], refdata_nobj)}) ds.attrs = attrs obj_id_comment = """Identified objects may be interpreted as land, """ """ocean or coastal objects, if a land-sea mask has been provided """ """to the analysis. These identifications are represented by an """ """integer: ocean (0), land (1), coastal (2).""" ds.attrs['Comment'] = obj_id_comment # Write to disk ds.to_netcdf(fn)
[docs] def run_sal_analysis(data, refdata, thr_type, thr_val, obj_prop=True, obj_lower_size_limit=None, obj_upper_size_limit=None, smoothening_data_level=None, land_sea_mask=None, write_to_file=False, filename=None, nproc=1): """ Run the SAL analysis on the two data sets 'data' and 'refdata', where the latter is supposed to represent the 'truth'. Parameters ---------- data/refdata: arrays 2D data arrays with zeroth dimension representing time steps. Both data sets must have the same dimension sizes, i.e. both in time and space. thr_type: string Type of threshold to use. See 'threshold' function for more information. thr_val: float/int Value of threshold. obj_prop: boolean If True (default), a number of object area properties are returned for each of the identified objects. See 'S_stat' function for more information. obj_lower_size_limit: int If set, all objects with an area (number of connected grid points) lower than the value set is removed from analysis. obj_upper_size_limit: int If set, all objects with an area (number of connected grid points) greater than the value set is removed from analysis. smoothening_data_level: int If set, the number represents the # of grid points of the side of a moving window used to smooth the data arrays. Mean value within window is calculated. land_sea_mask: array/None If set, land_sea_mask must be a 2d boolean array with same dimension as input data. The land/sea-mask is then used to identify objects as either land (1), ocean (0) or coastal (2) in the object properties dictionary. Thus, mask only used if obj_prop=True. N.B. Mask must have True for ocean points and False for land points. write_to_file: boolean Whether to write results to disk. filename: str Name of file for writing to disk. nproc: int Number of processors to use in calculation. If larger than 1 (default), the multiprocessing module is used to distribute the calculation in the time dimension. Returns ------- out_dict: dictionary Dictionary with calculated SAL statistics and area properties (if obj_prop is set to True). nc: file If 'write_to_file' is True, results are written to disk in a netcdf file. """ from collections import defaultdict err_msg = """\n#--- ERROR ! ---#\nData must be numpy array or numpy masked array.""" assert isinstance(data, (np.ndarray, np.ma.MaskedArray)), err_msg err_msg = """\n#--- ERROR ! ---#\nReference data must be numpy array or numpy masked array.""" assert isinstance(refdata, (np.ndarray, np.ma.MaskedArray)), err_msg if np.squeeze(data).ndim > 2: nt = data.shape[0] else: nt = 1 if data.ndim == 2: data = np.expand_dims(data, 0) if refdata.ndim == 2: refdata = np.expand_dims(refdata, 0) if nproc > 1: nr_procs_set = np.minimum(nproc, mp.cpu_count()) pool = mp.Pool(processes=nr_procs_set) computations = [pool.apply_async(sal_calc, args=(t, data[t, :], refdata[t, :], thr_type, thr_val, obj_prop, obj_lower_size_limit, obj_upper_size_limit, smoothening_data_level, land_sea_mask)) for t in range(nt)] pool.close() pool.join() outp = [k.get() for k in computations] outp.sort() else: outp = [sal_calc(t, data[t, :], refdata[t, :], thr_type, thr_val, obj_prop, obj_lower_size_limit, obj_upper_size_limit, smoothening_data_level, land_sea_mask) for t in range(nt)] if nt > 1: out_dict = defaultdict(list) no_data = 0 for i, d in outp: if d is not None: for key, value in d.items(): out_dict[key].append(value) else: no_data += 1 nt = nt - no_data else: out_dict = outp[0][1] if out_dict is None: print("No precip data detected for analysis") sys.exit() if write_to_file: attrs = { "threshold type": thr_type, "threshold value": thr_val, "object size lower limit": obj_lower_size_limit if obj_lower_size_limit is not None else "-", "object size upper limit": obj_upper_size_limit if obj_upper_size_limit is not None else "-", "smoothening level (# grid points)": smoothening_data_level**2 if smoothening_data_level is not None else "-", } write_to_disk(out_dict, nt, filename, attrs) return out_dict