Source code for rcatool.stats.convolve

#!/usr/bin/env python
#
#
# Functions for convolution of arrays ##
#
# By: Petter Lind
#     2014-04-11
#
#
#

# Import necessary modules
import numpy as np
from scipy import signal

"""
This module includes functions to perform convolution, for example
image smoothing, partly using scipy's convolution routines.
"""


[docs] def kernel_gen(n, ktype='square', kfun='mean'): """ Function to create a kernel, i.e. a moving window (box or disk) with side/radius equal to 'n'. Parameters ---------- n: int Side/radius of square/disk of smoothening window. ktype: string The type of box; 'square' (default) or 'disk'. kfun: string The function 'kfun' applied to each sub-sample within the moving window. Either 'mean' (default) or 'sum'. """ if (ktype == 'square'): if (kfun == 'mean'): kernel = np.ones(shape=(n, n)) * (1./n**2) if (kfun == 'sum'): kernel = np.ones(shape=(n, n)) elif (ktype == 'disk'): if (kfun == 'mean'): y, x = np.ogrid[-n: n+1, -n: n+1] disk = x**2+y**2 <= n**2 disk = np.where(disk, 1.0, 0.0) kernel = disk/n**2 return kernel
[docs] def filtering(data, wgts, mode='valid', dim=1, axis=None, fft=False, fftn=np.fft.fftn, ifftn=np.fft.ifftn): """ 1D and 2D filtering procedures. Filters input data, both 1D and 2D, with user defined weights. Set fft to True for fast fourier transform to speed things up when data is large. Parameters ---------- data: array Data to be filtered. wgts: array/list The weights (kernel) to be used in the filtering. mode: str String indicating the size of output (see https://docs.scipy.org/doc/scipy/reference/signal.html) dim: int If 1 one-dimensional filtering is performed and if 'axis' is also set, 1D-filtering is applied along this axis. If dim=2 two-dimensional filtering is applied. fft: boolean Set True to use fast fourier transform in the 2D filtering. Returns ------- data_conv: array Convoluted data """ if isinstance(data, np.ma.MaskedArray): mask = data.mask # Set masked values to zero indata = np.ma.filled(data, 0) else: indata = np.copy(data) if dim == 1: # One-dimensional filtering if axis is None: data_conv = signal.convolve(indata, wgts, mode=mode) else: data_conv = np.apply_along_axis(signal.convolve, axis, indata, wgts, mode=mode) elif dim == 2: # Two-dimensional filtering if fft: data_conv = convolve_fft(indata, wgts, fftn=fftn, ifftn=ifftn) else: data_conv = signal.convolve2d(indata, wgts, mode=mode, boundary='symm') # Set values in mask to NaN (if masked array) if isinstance(data, np.ma.MaskedArray): data_conv[mask] = np.nan return data_conv
[docs] def lanczos_filter(window, cutoff, cutoff_2=None, ftype='lowpass'): """ Calculate weights for a low pass Lanczos filter. Parameters ---------- window: int The length of the filter window. cutoff: float The cutoff frequency in inverse time steps. cutoff_2: float The second cutoff frequency in inverse time steps. Only used if ftype is 'bandpass' ftype: str The type of cutoff filtering: 'lowpass', 'highpass' or 'bandpass'. Returns ------- wgts: vector Array with calculated weights. """ def _low_pass_filter(win, cut): order = ((win - 1) // 2) + 1 nwts = 2 * order + 1 w = np.zeros([nwts]) n = nwts // 2 w[n] = 2 * cut k = np.arange(1., n) sigma = np.sin(np.pi * k / n) * n / (np.pi * k) firstfactor = np.sin(2. * np.pi * cut * k) / (np.pi * k) w[n-1:0:-1] = firstfactor * sigma w[n+1:-1] = firstfactor * sigma return w[1:-1] if ftype == 'lowpass': wgts_out = _low_pass_filter(window, cutoff) elif ftype == 'highpass': wgts_out = (-1) * _low_pass_filter(window, cutoff) elif ftype == 'bandpass': assert cutoff_2 is not None, "cutoff_2 must be set for ftype bandpass" wgts1 = _low_pass_filter(window, cutoff) wgts2 = _low_pass_filter(window, cutoff_2) wgts_out = wgts2 - wgts1 return wgts_out
[docs] def fft_prep(array, kernel, fill_value, boundary='fill', psf_pad=False, fft_pad=True): """ Prepare data array and kernel for fft computation. Parameters ---------- boundary : {'fill', 'wrap'}, optional A flag indicating how to handle boundaries: * 'fill': set values outside the array boundary to fill_value (default) * 'wrap': periodic boundary fft_pad : bool, optional Default on. Zero-pad image to the nearest 2^n psf_pad : bool, optional Default off. Zero-pad image to be at least the sum of the image sizes (in order to avoid edge-wrapping when smoothing) """ arrayshape = array.shape kernshape = kernel.shape # mask catching - masks must be turned into NaNs for use later if np.ma.is_masked(array): mask = array.mask array = np.array(array) array[mask] = np.nan if np.ma.is_masked(kernel): mask = kernel.mask kernel = np.array(kernel) kernel[mask] = np.nan # NaN and inf catching nanmaskarray = np.isnan(array) + np.isinf(array) array[nanmaskarray] = 0 nanmaskkernel = np.isnan(kernel) + np.isinf(kernel) kernel[nanmaskkernel] = 0 if boundary is None: psf_pad = True elif boundary == 'fill': # create a boundary region at least as large as the kernel psf_pad = True elif boundary == 'wrap': psf_pad = False fft_pad = False fill_value = 0 # force zero; it should not be used elif boundary == 'extend': raise NotImplementedError("The 'extend' option is not implemented " "for fft-based convolution") # find ideal size (power of 2) for fft. # Can add shapes because they are tuples if fft_pad: # default=True if psf_pad: # default=False # add the dimensions and then take the max (bigger) fsize = 2 ** np.ceil(np.log2( np.max(np.array(arrayshape) + np.array(kernshape)))) else: # add the shape lists (max of a list of length 4) (smaller) # also makes the shapes square fsize = 2 ** np.ceil(np.log2(np.max(arrayshape + kernshape))) newshape = np.array([fsize for ii in range(array.ndim)], dtype=int) else: if psf_pad: # just add the biggest dimensions newshape = np.array(arrayshape) + np.array(kernshape) else: newshape = np.array([np.max([imsh, kernsh]) for imsh, kernsh in zip(arrayshape, kernshape)]) # separate each dimension by the padding size... this is to determine the # appropriate slice size to get back to the input dimensions arrayslices = [] kernslices = [] for ii, (newdimsize, arraydimsize, kerndimsize)\ in enumerate(zip(newshape, arrayshape, kernshape)): center = newdimsize - (newdimsize + 1) // 2 arrayslices += [slice(center - arraydimsize // 2, center + (arraydimsize + 1) // 2)] kernslices += [slice(center - kerndimsize // 2, center + (kerndimsize + 1) // 2)] if not np.all(newshape == arrayshape): bigarray = np.ones(newshape, dtype=np.complex) * fill_value bigarray[arrayslices] = array else: bigarray = array if not np.all(newshape == kernshape): bigkernel = np.zeros(newshape, dtype=np.complex) bigkernel[kernslices] = kernel else: bigkernel = kernel return bigarray, bigkernel, arrayslices,\ kernslices, newshape, nanmaskarray, nanmaskkernel
[docs] def convolve_fft(array, kernel, boundary='fill', fill_value=0, crop=True, return_fft=False, fft_pad=True, psf_pad=False, interpolate_nan=False, quiet=False, ignore_edge_zeros=False, min_wt=0.0, normalize_kernel=False, allow_huge=True, fftn=np.fft.fftn, ifftn=np.fft.ifftn): """ Convolve an ndarray with an nd-kernel. Returns a convolved image with shape = array.shape. Assumes kernel is centered. `convolve_fft` differs from `scipy.signal.fftconvolve` in a few ways: * It can treat ``NaN`` values as zeros or interpolate over them. * ``inf`` values are treated as ``NaN`` * (optionally) It pads to the nearest 2^n size to improve FFT speed. * Its only valid ``mode`` is 'same' (i.e. the same shape array is returned) * It lets you use your own fft, e.g., `pyFFTW <http://pypi.python.org/pypi/pyFFTW>` or `pyFFTW3 <http://pypi.python.org/pypi/PyFFTW3/0.2.1>`, which can lead to performance improvements, depending on your system configuration. pyFFTW3 is threaded, and therefore may yield significant performance benefits on multi-core machines at the cost of greater memory requirements. Specify the ``fftn`` and ``ifftn`` keywords to override the default, which is `numpy.fft.fft` and `numpy.fft.ifft`. Parameters ---------- array : `numpy.ndarray` Array to be convolved with ``kernel`` kernel : `numpy.ndarray` Will be normalized if ``normalize_kernel`` is set. Assumed to be centered (i.e., shifts may result if your kernel is asymmetric) boundary : {'fill', 'wrap'}, optional A flag indicating how to handle boundaries: * 'fill': set values outside the array boundary to fill_value (default) * 'wrap': periodic boundary interpolate_nan : bool, optional The convolution will be re-weighted assuming ``NaN`` values are meant to be ignored, not treated as zero. If this is off, all ``NaN`` values will be treated as zero. ignore_edge_zeros : bool, optional Ignore the zero-pad-created zeros. This will effectively decrease the kernel area on the edges but will not re-normalize the kernel. This parameter may result in 'edge-brightening' effects if you're using a normalized kernel min_wt : float, optional If ignoring ``NaN`` / zeros, force all grid points with a weight less than this value to ``NaN`` (the weight of a grid point with *no* ignored neighbors is 1.0). If ``min_wt`` is zero, then all zero-weight points will be set to zero instead of ``NaN`` (which they would be otherwise, because 1/0 = nan). See the examples below normalize_kernel : function or boolean, optional If specified, this is the function to divide kernel by to normalize it. e.g., ``normalize_kernel=np.sum`` means that kernel will be modified to be: ``kernel = kernel / np.sum(kernel)``. If True, defaults to ``normalize_kernel = np.sum``. Other Parameters ---------------- fft_pad : bool, optional Default on. Zero-pad image to the nearest 2^n psf_pad : bool, optional Default off. Zero-pad image to be at least the sum of the image sizes (in order to avoid edge-wrapping when smoothing) crop : bool, optional Default on. Return an image of the size of the largest input image. If the images are asymmetric in opposite directions, will return the largest image in both directions. For example, if an input image has shape [100,3] but a kernel with shape [6,6] is used, the output will be [100,6]. return_fft : bool, optional Return the fft(image)*fft(kernel) instead of the convolution (which is ifft(fft(image)*fft(kernel))). Useful for making PSDs. fftn, ifftn : functions, optional The fft and inverse fft functions. Can be overridden to use your own ffts, e.g. an fftw3 wrapper or scipy's fftn, e.g. ``fftn=scipy.fftpack.fftn`` complex_dtype : np.complex, optional Which complex dtype to use. `numpy` has a range of options, from 64 to 256. quiet : bool, optional Silence warning message about NaN interpolation allow_huge : bool, optional Allow huge arrays in the FFT? If False, will raise an exception if the array or kernel size is >1 GB Raises ------ ValueError: If the array is bigger than 1 GB after padding, will raise this exception unless allow_huge is True See Also -------- convolve : Convolve is a non-fft version of this code. It is more memory efficient and for small kernels can be faster. Returns ------- default : ndarray **array** convolved with ``kernel``. If ``return_fft`` is set, returns fft(**array**) * fft(``kernel``). If crop is not set, returns the image, but with the fft-padded size instead of the input size """ array = np.asarray(array, dtype=np.complex) kernel = np.asarray(kernel, dtype=np.complex) # Check that the number of dimensions is compatible if array.ndim != kernel.ndim: raise ValueError("Image and kernel must have same number of " "dimensions") arrayshape = array.shape if not allow_huge: array_size_B = (np.product(arrayshape, dtype=np.int64) * np.dtype(np.complex).itemsize) if array_size_B > 1024**3: raise ValueError("Size Error: Use allow_huge=True to override\ this exception.") bigarray, bigkernel, arrayslices, kernslices,\ newshape, nanmaskarray, nanmaskkernel =\ fft_prep(array, kernel, fill_value, boundary='fill', psf_pad=psf_pad, fft_pad=fft_pad) arrayfft = fftn(bigarray) # need to shift the kernel so that, e.g., [0,0,1,0] -> [1,0,0,0] = unity kernfft = fftn(np.fft.ifftshift(bigkernel)) fftmult = arrayfft * kernfft if (interpolate_nan or ignore_edge_zeros): if ignore_edge_zeros: bigimwt = np.zeros(newshape, dtype=np.complex) else: bigimwt = np.ones(newshape, dtype=np.complex) bigimwt[arrayslices] = 1.0 - nanmaskarray * interpolate_nan wtfft = fftn(bigimwt) # I think this one HAS to be normalized (i.e., the weights can't be # computed with a non-normalized kernel) wtfftmult = wtfft * kernfft / kernel.sum() wtsm = ifftn(wtfftmult) # need to re-zero weights outside of the image (if it is padded, we # still don't weight those regions) bigimwt[arrayslices] = wtsm.real[arrayslices] # curiously, at the floating-point limit, can get slightly negative # numbers they break the min_wt=0 "flag" and must therefore be removed bigimwt[bigimwt < 0] = 0 else: bigimwt = 1 if np.isnan(fftmult).any(): # this check should be unnecessary; call it an insanity check raise ValueError("Encountered NaNs in convolve. This is disallowed.") # restore NaNs in original image (they were modified inplace earlier) # We don't have to worry about masked arrays - if input was masked, it was # copied array[nanmaskarray] = np.nan kernel[nanmaskkernel] = np.nan if return_fft: return fftmult if interpolate_nan or ignore_edge_zeros: rifft = (ifftn(fftmult)) / bigimwt if not np.isscalar(bigimwt): rifft[bigimwt < min_wt] = np.nan if min_wt == 0.0: rifft[bigimwt == 0.0] = 0.0 else: rifft = (ifftn(fftmult)) if crop: result = rifft[arrayslices].real return result else: return rifft.real