Source code for mhkit.dolfyn.binned

import numpy as np
import warnings
from .tools.fft import fft_frequency, psd_1D, cpsd_1D, cpsd_quasisync_1D
from .tools.misc import slice1d_along_axis, detrend_array
from .time import epoch2dt64, dt642epoch

warnings.simplefilter("ignore", RuntimeWarning)


[docs] class TimeBinner: def __init__(self, n_bin, fs, n_fft=None, n_fft_coh=None, noise=[0, 0, 0]): """ Initialize an averaging object Parameters ---------- n_bin : int Number of data points to include in a 'bin' (ensemble), not the number of bins fs : int Instrument sampling frequency in Hz n_fft : int Number of data points to use for fft (`n_fft`<=`n_bin`). Default: `n_fft`=`n_bin` n_fft_coh : int Number of data points to use for coherence and cross-spectra ffts Default: `n_fft_coh`=`n_fft` noise : list or ndarray Instrument's doppler noise in same units as velocity """ self.n_bin = n_bin self.fs = fs self.n_fft = n_fft self.n_fft_coh = n_fft_coh self.noise = noise if n_fft is None: self.n_fft = n_bin elif n_fft > n_bin: self.n_fft = n_bin warnings.warn("n_fft must be smaller than n_bin, setting n_fft = n_bin") if n_fft_coh is None: self.n_fft_coh = int(self.n_fft) elif n_fft_coh > n_bin: self.n_fft_coh = int(n_bin) warnings.warn( "n_fft_coh must be smaller than or equal to n_bin, " "setting n_fft_coh = n_bin" ) def _outshape(self, inshape, n_pad=0, n_bin=None): """ Returns `outshape` (the 'reshape'd shape) for an `inshape` array. """ n_bin = int(self._parse_nbin(n_bin)) return list(inshape[:-1]) + [int(inshape[-1] // n_bin), int(n_bin + n_pad)] def _outshape_fft(self, inshape, n_fft=None, n_bin=None): """ Returns `outshape` (the fft 'reshape'd shape) for an `inshape` array. """ n_fft = self._parse_nfft(n_fft) n_bin = self._parse_nbin(n_bin) return list(inshape[:-1]) + [int(inshape[-1] // n_bin), int(n_fft // 2)] def _parse_fs(self, fs=None): if fs is None: return self.fs return fs def _parse_nbin(self, n_bin=None): if n_bin is None: return self.n_bin return n_bin def _parse_nfft(self, n_fft=None): if n_fft is None: return self.n_fft if n_fft > self.n_bin: n_fft = self.n_bin warnings.warn("n_fft must be smaller than n_bin, setting n_fft = n_bin") return n_fft def _parse_nfft_coh(self, n_fft_coh=None): if n_fft_coh is None: return self.n_fft_coh if n_fft_coh > self.n_bin: n_fft_coh = int(self.n_bin) warnings.warn( "n_fft_coh must be smaller than or equal to n_bin, " "setting n_fft_coh = n_bin" ) return n_fft_coh def _check_ds(self, raw_ds, out_ds): """ Check that the attributes between two datasets match up. Parameters ---------- raw_ds : xarray.Dataset Input dataset out_ds : xarray.Dataset Dataset to append `raw_ds` to. If None is supplied, this dataset is created from `raw_ds`. Returns ------- out_ds : xarray.Dataset """ for v in raw_ds.data_vars: if np.any(np.array(raw_ds[v].shape) == 0): raise RuntimeError(f"{v} cannot be averaged " "because it is empty.") if ( "DutyCycle_NBurst" in raw_ds.attrs and raw_ds.attrs["DutyCycle_NBurst"] < self.n_bin ): warnings.warn( f"The averaging interval (n_bin = {self.n_bin})" "is larger than the burst interval " "(NBurst = {dat.attrs['DutyCycle_NBurst']})" ) if raw_ds.fs != self.fs: raise Exception( f"The input data sample rate ({raw_ds.fs}) does not " "match the sample rate of this binning-object " "({self.fs})" ) if out_ds is None: out_ds = type(raw_ds)() o_attrs = out_ds.attrs props = {} props["fs"] = self.fs props["n_bin"] = self.n_bin props["n_fft"] = self.n_fft props["description"] = ( "Binned averages calculated from " 'ensembles of size "n_bin"' ) props.update(raw_ds.attrs) for ky in props: if ky in o_attrs and o_attrs[ky] != props[ky]: # The values in out_ds must match `props` (raw_ds.attrs, # plus those defined above) raise AttributeError( "The attribute '{}' of `out_ds` is inconsistent " "with this `VelBinner` or the input data (`raw_ds`)".format(ky) ) else: o_attrs[ky] = props[ky] return out_ds def _new_coords(self, array): """ Function for setting up a new xarray.DataArray regardless of how many dimensions the input data-array has """ dims = array.dims dims_list = [] coords_dict = {} if len(array.shape) == 1 & ("dir" in array.coords): array = array.drop_vars("dir") for ky in dims: dims_list.append(ky) if "time" in ky: coords_dict[ky] = self.mean(array.time.values) else: coords_dict[ky] = array.coords[ky].values return dims_list, coords_dict
[docs] def reshape(self, arr, n_pad=0, n_bin=None): """ Reshape the array `arr` to shape (...,n,n_bin+n_pad). Parameters ---------- arr : numpy.ndarray n_pad : int Is used to add `n_pad`/2 points from the end of the previous ensemble to the top of the current, and `n_pad`/2 points from the top of the next ensemble to the bottom of the current. Zeros are padded in the upper-left and lower-right corners of the matrix (beginning/end of timeseries). In this case, the array shape will be (...,`n`,`n_pad`+`n_bin`) n_bin : int Override this binner's n_bin. Default is `binner.n_bin` Returns ------- out : numpy.ndarray Notes ----- `n_bin` can be non-integer, in which case the output array size will be `n_pad`+`n_bin`, and the decimal will cause skipping of some data points in `arr`. In particular, every mod(`n_bin`,1) bins will have a skipped point. For example: - for n_bin=2048.2 every 1/5 bins will have a skipped point. - for n_bin=4096.9 every 9/10 bins will have a skipped point. """ n_bin = self._parse_nbin(n_bin) if arr.shape[-1] < n_bin: raise Exception("n_bin is larger than length of input array") npd0 = int(n_pad // 2) npd1 = int((n_pad + 1) // 2) shp = self._outshape(arr.shape, n_pad=0, n_bin=n_bin) out = np.zeros( self._outshape(arr.shape, n_pad=n_pad, n_bin=n_bin), dtype=arr.dtype ) if np.mod(n_bin, 1) == 0: # n_bin needs to be int n_bin = int(n_bin) # If n_bin is an integer, we can do this simply. out[..., npd0 : n_bin + npd0] = (arr[..., : (shp[-2] * shp[-1])]).reshape( shp, order="C" ) else: inds = (np.arange(np.prod(shp[-2:])) * n_bin // int(n_bin)).astype(int) # If there are too many indices, drop one bin if inds[-1] >= arr.shape[-1]: inds = inds[: -int(n_bin)] shp[-2] -= 1 out = out[..., 1:, :] n_bin = int(n_bin) out[..., npd0 : n_bin + npd0] = (arr[..., inds]).reshape(shp, order="C") n_bin = int(n_bin) if n_pad != 0: out[..., 1:, :npd0] = out[..., :-1, n_bin : n_bin + npd0] out[..., :-1, -npd1:] = out[..., 1:, npd0 : npd0 + npd1] return out
[docs] def detrend(self, arr, axis=-1, n_pad=0, n_bin=None): """ Reshape the array `arr` to shape (...,n,n_bin+n_pad) and remove the best-fit trend line from each bin. Parameters ---------- arr : numpy.ndarray axis : int Axis along which to take mean. Default = -1 n_pad : int Is used to add `n_pad`/2 points from the end of the previous ensemble to the top of the current, and `n_pad`/2 points from the top of the next ensemble to the bottom of the current. Zeros are padded in the upper-left and lower-right corners of the matrix (beginning/end of timeseries). In this case, the array shape will be (...,`n`,`n_pad`+`n_bin`). Default = 0 n_bin : int Override this binner's n_bin. Default is `binner.n_bin` Returns ------- out : numpy.ndarray """ return detrend_array(self.reshape(arr, n_pad=n_pad, n_bin=n_bin), axis=axis)
[docs] def demean(self, arr, axis=-1, n_pad=0, n_bin=None): """ Reshape the array `arr` to shape (...,n,n_bin+n_pad) and remove the mean from each bin. Parameters ---------- arr : numpy.ndarray axis : int Axis along which to take mean. Default = -1 n_pad : int Is used to add `n_pad`/2 points from the end of the previous ensemble to the top of the current, and `n_pad`/2 points from the top of the next ensemble to the bottom of the current. Zeros are padded in the upper-left and lower-right corners of the matrix (beginning/end of timeseries). In this case, the array shape will be (...,`n`,`n_pad`+`n_bin`). Default = 0 n_bin : int Override this binner's n_bin. Default is `binner.n_bin` Returns ------- out : numpy.ndarray """ dt = self.reshape(arr, n_pad=n_pad, n_bin=n_bin) return dt - np.nanmean(dt, axis)[..., None]
[docs] def mean(self, arr, axis=-1, n_bin=None): """ Reshape the array `arr` to shape (...,n,n_bin+n_pad) and take the mean of each bin along the specified `axis`. Parameters ---------- arr : numpy.ndarray axis : int Axis along which to take mean. Default = -1 n_bin : int Override this binner's n_bin. Default is `binner.n_bin` Returns ------- out : numpy.ndarray """ if np.issubdtype(arr.dtype, np.datetime64): return epoch2dt64(self.mean(dt642epoch(arr), axis=axis, n_bin=n_bin)) if axis != -1: arr = np.swapaxes(arr, axis, -1) n_bin = self._parse_nbin(n_bin) tmp = self.reshape(arr, n_bin=n_bin) return np.nanmean(tmp, -1)
[docs] def variance(self, arr, axis=-1, n_bin=None): """ Reshape the array `arr` to shape (...,n,n_bin+n_pad) and take the variance of each bin along the specified `axis`. Parameters ---------- arr : numpy.ndarray axis : int Axis along which to take variance. Default = -1 n_bin : int Override this binner's n_bin. Default is `binner.n_bin` Returns ------- out : numpy.ndarray """ return np.nanvar(self.reshape(arr, n_bin=n_bin), axis=axis, dtype=np.float32)
[docs] def standard_deviation(self, arr, axis=-1, n_bin=None): """ Reshape the array `arr` to shape (...,n,n_bin+n_pad) and take the standard deviation of each bin along the specified `axis`. Parameters ---------- arr : numpy.ndarray axis : int Axis along which to take std dev. Default = -1 n_bin : int Override this binner's n_bin. Default is `binner.n_bin` Returns ------- out : numpy.ndarray """ return np.nanstd(self.reshape(arr, n_bin=n_bin), axis=axis, dtype=np.float32)
def _psd_base( self, dat, fs=None, window="hann", noise=0, n_bin=None, n_fft=None, n_pad=None, step=None, ): """ Calculate power spectral density of `dat` Parameters ---------- dat : xarray.DataArray The raw dataArray of which to calculate the psd. fs : float (optional) The sample rate (Hz). window : str String indicating the window function to use. Default is 'hanning' noise : float The white-noise level of the measurement (in the same units as `dat`). n_bin : int n_bin of veldat2, number of elements per bin if 'None' is taken from VelBinner n_fft : int n_fft of veldat2, number of elements per bin if 'None' is taken from VelBinner n_pad : int (optional) The number of values to pad with zero. Default = 0 step : int (optional) Controls amount of overlap in fft. Default: the step size is chosen to maximize data use, minimize nens, and have a minimum of 50% overlap. Returns ------- out : numpy.ndarray The power spectral density of `dat` Notes ----- PSD's are calculated based on sample rate units """ fs = self._parse_fs(fs) n_bin = self._parse_nbin(n_bin) n_fft = self._parse_nfft(n_fft) if n_pad is None: n_pad = min(n_bin - n_fft, n_fft) out = np.empty(self._outshape_fft(dat.shape, n_fft=n_fft, n_bin=n_bin)) # The data is detrended in psd, so we don't need to do it here. dat = self.reshape(dat, n_pad=n_pad) for slc in slice1d_along_axis(dat.shape, -1): out[slc] = psd_1D(dat[slc], n_fft, fs, window=window, step=step) if np.any(noise): out -= noise**2 / (fs / 2) # Make sure all values of the PSD are >0 (but still small): out[out < 0] = np.min(np.abs(out)) / 100 return out def _csd_base(self, dat1, dat2, fs=None, window="hann", n_fft=None, n_bin=None): """ Calculate the cross power spectral density of `dat`. Parameters ---------- dat1 : numpy.ndarray The first (shorter, if applicable) raw dataArray of which to calculate the cpsd. dat2 : numpy.ndarray The second (the shorter, if applicable) raw dataArray of which to calculate the cpsd. fs : float (optional) The sample rate (Hz). window : str String indicating the window function to use. Default is 'hanning' n_fft : int n_fft of veldat2, number of elements per bin if 'None' is taken from VelBinner n_bin : int n_bin of veldat2, number of elements per bin if 'None' is taken from VelBinner Returns ------- out : numpy.ndarray The cross power spectral density of `dat1` and `dat2` Notes ----- PSD's are calculated based on sample rate units The two velocity inputs do not have to be perfectly synchronized, but they should have the same start and end timestamps """ fs = self._parse_fs(fs) if n_fft is None: n_fft = self.n_fft_coh # want each slice to carry the same timespan n_bin2 = self._parse_nbin(n_bin) # bins for shorter array n_bin1 = int(dat1.shape[-1] / (dat2.shape[-1] / n_bin2)) oshp = self._outshape_fft(dat1.shape, n_fft=n_fft, n_bin=n_bin1) oshp[-2] = np.min([oshp[-2], int(dat2.shape[-1] // n_bin2)]) # The data is detrended in psd, so we don't need to do it here: dat1 = self.reshape(dat1, n_pad=n_fft) dat2 = self.reshape(dat2, n_pad=n_fft) out = np.empty(oshp, dtype="c{}".format(dat1.dtype.itemsize * 2)) if dat1.shape == dat2.shape: cross = cpsd_1D else: cross = cpsd_quasisync_1D for slc in slice1d_along_axis(out.shape, -1): out[slc] = cross(dat1[slc], dat2[slc], n_fft, fs, window=window) return out def _fft_freq(self, fs=None, units="Hz", n_fft=None, coh=False): """ Wrapper to calculate the ordinary or radial frequency vector Parameters ---------- fs : float (optional) The sample rate (Hz). units : string Frequency units in either Hz or rad/s (f or omega) coh : bool Calculate the frequency vector for coherence/cross-spectra (default: False) i.e. use self.n_fft_coh instead of self.n_fft. n_fft : int n_fft of veldat2, number of elements per bin if 'None' is taken from VelBinner Returns ------- out: numpy.ndarray Spectrum frequency array in units of 'Hz' or 'rad/s' """ if n_fft is None: n_fft = self.n_fft if coh: n_fft = self.n_fft_coh fs = self._parse_fs(fs) if ("Hz" not in units) and ("rad" not in units): raise Exception( "Valid fft frequency vector units are Hz \ or rad/s" ) if "rad" in units: return fft_frequency(n_fft, 2 * np.pi * fs) else: return fft_frequency(n_fft, fs)