Source code for mhkit.dolfyn.adv.turbulence

import numpy as np
from ..velocity import VelBinner
import warnings
from ..tools.misc import slice1d_along_axis, _nans_like
from scipy.special import cbrt
import xarray as xr


[docs] class ADVBinner(VelBinner): """ A class that builds upon `VelBinner` for calculating turbulence statistics and velocity spectra from ADV data. Parameters ---------- n_bin : int The length of each bin, in number of points, for this averaging operator. fs : int Instrument sampling frequency in Hz n_fft : int The length of the FFT for computing spectra (must be <= `n_bin`). Optional, default `n_fft` = `n_bin` n_fft_coh : int Number of data points to use for coherence and cross-spectra FFT's. Optional, default `n_fft_coh` = `n_fft` noise : float or array-like Instrument noise level in same units as velocity. Typically found from :func:`doppler_noise_level <mhkit.dolfyn.adv.turbulence.ADVBinner.doppler_noise_level>`. Default: None. """ def __call__(self, ds, freq_units="rad/s", window="hann"): out = type(ds)() out = self.bin_average(ds, out) out["tke_vec"] = self.turbulent_kinetic_energy(ds["vel"]) out["stress_vec"] = self.reynolds_stress(ds["vel"]) out["psd"] = self.power_spectral_density( ds["vel"], window=window, freq_units=freq_units ) for key in list(ds.attrs.keys()): if "config" in key: ds.attrs.pop(key) out.attrs = ds.attrs out.attrs["n_bin"] = self.n_bin out.attrs["n_fft"] = self.n_fft out.attrs["n_fft_coh"] = self.n_fft_coh return out
[docs] def reynolds_stress(self, veldat, detrend=True): """ Calculate the specific Reynolds shear stresses (:math:`\\overline{u'v'}`, :math:`\\overline{u'w'}`, :math:`\\overline{v'w'}`). Parameters ---------- veldat : xr.DataArray A velocity data array. The last dimension is assumed to be time. detrend : bool Detrend the velocity data (True), or simply de-mean it (False), prior to computing stress. Note: the psd routines use detrend, so if you want to have the same amount of variance here as there use `detrend=True`. Default = True Returns ------- out : xarray.DataArray """ if not isinstance(veldat, xr.DataArray): raise TypeError("`veldat` must be an instance of `xarray.DataArray`.") time = self.mean(veldat["time"].values) vel = veldat.values out = np.empty(self._outshape(vel[:3].shape)[:-1], dtype=np.float32) if detrend: vel = self.detrend(vel) else: vel = self.demean(vel) for idx, p in enumerate(self._cross_pairs): out[idx] = np.nanmean(vel[p[0]] * vel[p[1]], -1, dtype=np.float64).astype( np.float32 ) da = xr.DataArray( out.astype("float32"), dims=veldat.dims, attrs={"units": "m2 s-2", "long_name": "Specific Reynolds Stress Vector"}, ) da = da.rename({"dir": "tau"}) da = da.assign_coords({"tau": self.tau, "time": time}) return da
[docs] def cross_spectral_density( self, veldat, freq_units="rad/s", fs=None, window="hann", n_bin=None, n_fft_coh=None, ): """ Calculate the cross-spectral density of velocity components. Parameters ---------- veldat : xarray.DataArray The raw 3D velocity data. freq_units : string Frequency units of the returned spectra in either Hz or rad/s (`f` or :math:`\\omega`) fs : float (optional) The sample rate. Default = `self.fs` window : string or array Specify the window function. Options: 1, None, 'hann', 'hamm' n_bin : int (optional) The bin-size. Default = `self.n_bin` n_fft_coh : int (optional) The fft size. Default = `self.n_fft_coh` Returns ------- csd : xarray.DataArray (3, M, N_FFT) The first-dimension of the cross-spectrum is the three different cross-spectra: :math:`uv`, :math:`uw`, :math:`vw`. """ if not isinstance(veldat, xr.DataArray): raise TypeError("`veldat` must be an instance of `xarray.DataArray`.") if ("rad" not in freq_units) and ("Hz" not in freq_units): raise ValueError("`freq_units` should be one of 'Hz' or 'rad/s'") fs_in = self._parse_fs(fs) n_fft = self._parse_nfft_coh(n_fft_coh) time = self.mean(veldat["time"].values) veldat = veldat.values if len(np.shape(veldat)) != 2: raise Exception( "This function is only valid for calculating TKE using " "the 3D velocity vector from an ADV." ) out = np.empty( self._outshape_fft(veldat[:3].shape, n_fft=n_fft, n_bin=n_bin), dtype="complex", ) # Create frequency vector, also checks whether using f or omega if "rad" in freq_units: fs = 2 * np.pi * fs_in freq_units = "rad s-1" units = "m2 s-1 rad-1" else: fs = fs_in freq_units = "Hz" units = "m2 s-2 Hz-1" coh_freq = xr.DataArray( self._fft_freq(fs=fs_in, units=freq_units, n_fft=n_fft, coh=True), dims=["coh_freq"], name="coh_freq", attrs={ "units": freq_units, "long_name": "FFT Frequency Vector", "coverage_content_type": "coordinate", }, ).astype("float32") for ip, ipair in enumerate(self._cross_pairs): out[ip] = self._csd_base( veldat[ipair[0]], veldat[ipair[1]], fs=fs, window=window, n_bin=n_bin, n_fft=n_fft, ) csd = xr.DataArray( out.astype("complex64"), coords={"C": self.C, "time": time, "coh_freq": coh_freq}, dims=["C", "time", "coh_freq"], attrs={ "units": units, "n_fft_coh": n_fft, "long_name": "Cross Spectral Density", }, ) csd["coh_freq"].attrs["units"] = freq_units return csd
[docs] def doppler_noise_level(self, psd, pct_fN=0.8): """ Calculate bias due to Doppler noise using the noise floor of the velocity spectra. Parameters ---------- psd : xarray.DataArray (dir, time, freq) The ADV power spectral density of velocity (auto-spectra) pct_fN : float Percent of Nyquist frequency to calculate characeristic frequency. Default = 0.8 (80%) Returns ------- doppler_noise (xarray.DataArray): Doppler noise level in units of m/s Notes ----- Approximates bias from .. math:: \\sigma^{2}_{noise} = N * f_{c} where :math:`\\sigma_{noise}` is the bias due to Doppler noise, :math:`N` is the constant variance or spectral density, and :math:`f_{c}` is the characteristic frequency. The characteristic frequency is then found as .. math:: f_{c} = pct_fN * (f_{s}/2) where :math:`f_{s}/2` is the Nyquist frequency. Richard, Jean-Baptiste, et al. "Method for identification of Doppler noise levels in turbulent flow measurements dedicated to tidal energy." International Journal of Marine Energy 3 (2013): 52-64. ThiƩbaut, Maxime, et al. "Investigating the flow dynamics and turbulence at a tidal-stream energy site in a highly energetic estuary." Renewable Energy 195 (2022): 252-262. """ if not isinstance(psd, xr.DataArray): raise TypeError("`psd` must be an instance of `xarray.DataArray`.") if not isinstance(pct_fN, float) or not 0 <= pct_fN <= 1: raise ValueError("`pct_fN` must be a float within the range [0, 1].") # Characteristic frequency set to 80% of Nyquist frequency fN = self.fs / 2 fc = pct_fN * fN # Get units right if psd.freq.units == "Hz": f_range = slice(fc, fN) else: f_range = slice(2 * np.pi * fc, 2 * np.pi * fN) # Noise floor N2 = psd.sel(freq=f_range) * psd.freq.sel(freq=f_range) noise_level = np.sqrt(N2.mean(dim="freq")) return xr.DataArray( noise_level.values.astype("float32"), coords={"S": psd["S"], "time": psd["time"]}, attrs={ "units": "m/s", "long_name": "Doppler Noise Level", "description": "Doppler noise level calculated " "from PSD white noise", }, )
[docs] def check_turbulence_cascade_slope(self, psd, freq_range=[6.28, 12.57]): """ This function calculates the slope of the PSD, the power spectra of velocity, within the given frequency range. The purpose of this function is to check that the region of the PSD containing the isotropic turbulence cascade decreases at a rate of :math:`f^{-5/3}`. Parameters ---------- psd : xarray.DataArray ([time,] freq) The power spectral density (1D or 2D) freq_range : iterable(2) The range over which the isotropic turbulence cascade occurs, in units of the psd frequency vector (Hz or rad/s). Default = [6.28, 12.57] rad/s Returns ------- (m, b): tuple (slope, y-intercept) A tuple containing the coefficients of the log-adjusted linear regression between PSD and frequency Notes ----- Calculates slope based on the `standard` formula for dissipation: .. math:: S(k) = \\alpha \\epsilon^{2/3} k^{-5/3} + N The slope of the isotropic turbulence cascade, which should be equal to :math:`k^{-5/3}` or :math:`f^{-5/3}`, where k and f are the wavenumber and frequency vectors, is estimated using linear regression with a log transformation: .. math:: log10(y) = m*log10(x) + b Which is equivalent to .. math:: y = 10^{b} x^{m} Where :math:`y` is S(k) or S(f), :math:`x` is k or f, :math:`m` is the slope (ideally -5/3), and :math:`10^{b}` is the intercept of :math:`y` at :math:`x^{m}=1'. """ if not isinstance(psd, xr.DataArray): raise TypeError("`psd` must be an instance of `xarray.DataArray`.") if not hasattr(freq_range, "__iter__") or len(freq_range) != 2: raise ValueError("`freq_range` must be an iterable of length 2.") idx = np.where((freq_range[0] < psd.freq) & (psd.freq < freq_range[1])) idx = idx[0] x = np.log10(psd["freq"].isel(freq=idx)) y = np.log10(psd.isel(freq=idx)) y_bar = y.mean("freq") x_bar = x.mean("freq") # using the formula to calculate the slope and intercept n = np.sum((x - x_bar) * (y - y_bar), axis=0) d = np.sum((x - x_bar) ** 2, axis=0) m = n / d b = y_bar - m * x_bar return m, b
[docs] def dissipation_rate_LT83( self, psd, U_mag, freq_range=[6.28, 12.57], k_constant=[0.5, 0.67, 0.67], noise=None, ): """ Calculate the dissipation rate from the power spectral density of velocity. Parameters ---------- psd : xarray.DataArray ([dir,] time, freq) The power spectral density U_mag : xarray.DataArray (time) The bin-averaged horizontal velocity [m/s] (i.e., computed using :func:`U_mag <mhkit.dolfyn.velocity.Velocity.U_mag>`) freq_range : iterable(2) The range over which to integrate/average the spectrum, in units of the psd frequency vector (Hz or rad/s). Default = [6.28, 12.57] rad/s k_constant : float or iterable(3) Kolmogorov Constant (\\alpha in Notes section below) to use. If a three dimensional PSD is provided, \\alpha defaults to [0.5, 0.67, 0.67]; i.e. 0.5 for the streamwise PSD and 0.67 for the transverse and vertical PSDs. If the PSD is provided for a single velocity direction, \\alpha is taken to be 0.5 unless otherwise specified. noise : float or array-like Instrument noise level in same units as velocity. Typically found from `adv.turbulence.calc_doppler_noise`. Default: None. Returns ------- epsilon : xarray.DataArray ([dir,] time) dataArray of the dissipation rate Notes ----- This uses the `standard` formula for dissipation: .. math:: S(k) = \\alpha \\epsilon^{2/3} k^{-5/3} + N where :math:`\\alpha is the Kolmogorov constant, `k` is wavenumber, `S(k)` is the turbulent kinetic energy spectrum, and `N' is the doppler noise level associated with the TKE spectrum. With :math:`k \\rightarrow \\omega / U`, then -- to preserve variance -- :math:`S(k) = U S(\\omega)`, and so this becomes: .. math:: S(\\omega) = \\alpha \\epsilon^{2/3} \\omega^{-5/3} U^{2/3} + N With :math:`k \\rightarrow (2\\pi f) / U`, then .. math:: S(\\omega) = \\alpha \\epsilon^{2/3} f^{-5/3} (U/(2*\\pi))^{2/3} + N LT83 : Lumley and Terray, "Kinematics of turbulence convected by a random wave field". JPO, 1983, vol13, pp2000-2007. """ if not isinstance(psd, xr.DataArray): raise TypeError("`psd` must be an instance of `xarray.DataArray`.") if len(U_mag.shape) != 1: raise Exception("U_mag should be 1-dimensional (time).") if len(psd["time"]) != len(U_mag["time"]): raise Exception("`U_mag` should be from ensembled-averaged dataset.") if not hasattr(freq_range, "__iter__") or len(freq_range) != 2: raise ValueError("`freq_range` must be an iterable of length 2.") # if the spectra are 1D, then the first dimension should be time (any length) if (psd.shape[0] != 3) and (np.size(k_constant) != 1): raise ValueError("`k_constant` should be a single value.") elif (psd.shape[0] == 3) and (np.size(k_constant) != 3): raise ValueError("`k_constant` should be an iterable of length 3.") if noise is not None: if np.shape(noise)[0] != np.shape(psd)[0]: raise Exception("Noise should have same first dimension as `psd`.") else: noise = np.array([0, 0, 0])[:, None, None] # Noise subtraction from binner.TimeBinner.calc_psd_base psd = psd.copy() if noise is not None: psd -= noise**2 / (self.fs / 2) psd = psd.where(psd > 0, np.min(np.abs(psd)) / 100) freq = psd.freq idx = np.where((freq_range[0] < freq) & (freq < freq_range[1])) idx = idx[0] # Set the correct magnitude whether the frequency is in Hz or rad/s if freq.units == "Hz": U = U_mag / (2 * np.pi) else: U = U_mag # Set Kolmogorov constant a = np.array(k_constant) if psd.shape[0] == 3: a = a[:, None, None] # stack properly else: a = np.squeeze(k_constant) # Calculate dissipation out = (psd.isel(freq=idx) * freq.isel(freq=idx) ** (5 / 3) / a).mean( axis=-1 ) ** (3 / 2) / U return xr.DataArray( out.astype("float32"), attrs={ "units": "m2 s-3", "long_name": "TKE Dissipation Rate", "standard_name": "specific_turbulent_kinetic_energy_dissipation_in_sea_water", "description": "TKE dissipation rate calculated using " "the method from Lumley and Terray, 1983", }, )
[docs] def dissipation_rate_SF(self, vel_raw, U_mag, fs=None, freq_range=[2.0, 4.0]): """ Calculate dissipation rate using the "structure function" (SF) method Parameters ---------- vel_raw : xarray.DataArray (time) The raw velocity data upon which to perform the SF technique. U_mag : xarray.DataArray The bin-averaged horizontal velocity (i.e., computed using :func:`U_mag <mhkit.dolfyn.velocity.Velocity.U_mag>`) fs : float The sample rate of `vel_raw` in Hz freq_range : iterable(2) The frequency range over which to compute the SF in Hz (i.e. the frequency range within which the isotropic turbulence cascade falls). Default = [2., 4.] Hz Returns ------- epsilon : xarray.DataArray dataArray of the dissipation rate """ if not isinstance(vel_raw, xr.DataArray): raise TypeError("`vel_raw` must be an instance of `xarray.DataArray`.") if len(vel_raw["time"]) == len(U_mag["time"]): raise Exception("`U_mag` should be from ensembled-averaged dataset") if not hasattr(freq_range, "__iter__") or len(freq_range) != 2: raise ValueError("`freq_range` must be an iterable of length 2.") veldat = vel_raw.values if len(veldat.shape) > 1: raise Exception("Function input should be a 1D velocity vector") fs = self._parse_fs(fs) if freq_range[1] > fs: warnings.warn("Max freq_range cannot be greater than fs") dt = self.reshape(veldat) out = np.empty(dt.shape[:-1], dtype=dt.dtype) for slc in slice1d_along_axis(dt.shape, -1): up = dt[slc] lag = U_mag.values[slc[:-1]] / fs * np.arange(up.shape[0]) DAA = _nans_like(lag) for L in range(int(fs / freq_range[1]), int(fs / freq_range[0])): DAA[L] = np.nanmean((up[L:] - up[:-L]) ** 2, dtype=np.float64) cv2 = DAA / (lag ** (2 / 3)) cv2m = np.median(cv2[np.logical_not(np.isnan(cv2))]) out[slc[:-1]] = (cv2m / 2.1) ** (3 / 2) return xr.DataArray( out.astype("float32"), coords=U_mag.coords, dims=U_mag.dims, attrs={ "units": "m2 s-3", "long_name": "TKE Dissipation Rate", "standard_name": "specific_turbulent_kinetic_energy_dissipation_in_sea_water", "description": "TKE dissipation rate calculated using the " '"structure function" method', }, )
def _up_angle(self, U_complex): """ Calculate the angle of the turbulence fluctuations. Parameters ---------- U_complex : numpy.ndarray (..., n_time * n_bin) The complex, raw horizontal velocity (non-binned) Returns ------- theta : numpy.ndarray (..., n_time) The angle of the turbulence [rad] """ dt = self.demean(U_complex) fx = dt.imag <= 0 dt[fx] = dt[fx] * np.exp(1j * np.pi) return np.angle(np.mean(dt, -1, dtype=np.complex128)) def _integral_TE01(self, I_tke, theta): """ The integral, equation A13, in [TE01]. Parameters ---------- I_tke : numpy.ndarray (beta in TE01) is the turbulence intensity ratio: \\sigma_u / V theta : numpy.ndarray is the angle between the mean flow and the primary axis of velocity fluctuations """ x = np.arange(-20, 20, 1e-2) # I think this is a long enough range. out = np.empty_like(I_tke.flatten()) for i, (b, t) in enumerate(zip(I_tke.flatten(), theta.flatten())): out[i] = np.trapezoid( cbrt(x**2 - 2 / b * np.cos(t) * x + b ** (-2)) * np.exp(-0.5 * x**2), x, ) return out.reshape(I_tke.shape) * (2 * np.pi) ** (-0.5) * I_tke ** (2 / 3)
[docs] def dissipation_rate_TE01(self, dat_raw, dat_avg, freq_range=[6.28, 12.57]): """ Calculate the dissipation rate according to TE01. Parameters ---------- dat_raw : xarray.Dataset The raw (off the instrument) adv dataset dat_avg : xarray.Dataset The bin-averaged adv dataset (calculated from `ADVBinner.calc_turbulence` or `VelBinner.bin_average`). The spectra (PSD) and Reynolds stresses (`tke_vec` and `stress_vec`) must already be computed. freq_range : iterable(2) The range over which to integrate/average the spectrum, in units of the psd frequency vector (Hz or rad/s). Default = [6.28, 12.57] rad/s Notes ----- TE01 : Trowbridge, J and Elgar, S, "Turbulence measurements in the Surf Zone". JPO, 2001, vol31, pp2403-2417. """ if not isinstance(dat_raw, xr.Dataset): raise TypeError("`dat_raw` must be an instance of `xarray.Dataset`.") if not isinstance(dat_avg, xr.Dataset): raise TypeError("`dat_avg` must be an instance of `xarray.Dataset`.") if not hasattr(freq_range, "__iter__") or len(freq_range) != 2: raise ValueError("`freq_range` must be an iterable of length 2.") # Assign local names U_mag = dat_avg.velds.U_mag.values I_tke = dat_avg.velds.I_tke.values theta = np.angle(dat_avg.velds.U.values) - self._up_angle( dat_raw.velds.U.values ) freq = dat_avg["freq"].values # Calculate constants alpha = 1.5 intgrl = self._integral_TE01(I_tke, theta) # Index data to be used inds = (freq_range[0] < freq) & (freq < freq_range[1]) psd = dat_avg["psd"][..., inds].values freq = freq[inds].reshape([1] * (dat_avg["psd"].ndim - 2) + [sum(inds)]) # Estimate values # u & v components (equation 6) out = ( np.nanmean((psd[0] + psd[1]) * freq ** (5 / 3), -1) / (21 / 55 * alpha * intgrl) ) ** (3 / 2) / U_mag # Add w component out += ( np.nanmean(psd[2] * freq ** (5 / 3), -1) / (12 / 55 * alpha * intgrl) ) ** (3 / 2) / U_mag # Average the two estimates out *= 0.5 return xr.DataArray( out.astype("float32"), coords={"time": dat_avg["time"]}, dims="time", attrs={ "units": "m2 s-3", "long_name": "TKE Dissipation Rate", "standard_name": "specific_turbulent_kinetic_energy_dissipation_in_sea_water", "description": "TKE dissipation rate calculated using the " "method from Trowbridge and Elgar, 2001", }, )
[docs] def integral_length_scales(self, a_cov, U_mag, fs=None): """ Calculate integral length scales from the autocovariance (or autocorrelation). Parameters ---------- a_cov : xarray.DataArray ([dir,] time, lag) The autocovariance or autocorrelation array (i.e., computed using :func:`autocovariance <mhkit.dolfyn.velocity.VelBinner.autocovariance>`) U_mag : xarray.DataArray (time) The bin-averaged horizontal velocity (i.e., computed using :func:`U_mag <mhkit.dolfyn.velocity.Velocity.U_mag>`) fs : numeric The raw sample rate Returns ------- L_int : numpy.ndarray ([dir,] time) The integral length scale. Notes ---- The integral time scale (:math:`T_{int}`) is integral of the normalized autocovariance (autocorrelation) function, which theoretically decays to zero over time. Practically, :math:`T_{int}` is the integral from zero to the first zero-crossing lag-time of the autocorrelation function. The integral length scale (:math:`L_{int}`) then is the integral time scale multiplied by the bin speed. """ if not isinstance(a_cov, xr.DataArray): raise TypeError("`a_cov` must be an instance of `xarray.DataArray`.") if len(a_cov["time"]) != len(U_mag["time"]): raise Exception("`U_mag` should be from ensembled-averaged dataset") fs = self._parse_fs(fs) # Normalize autocovariance/autocorrelation acov = a_cov / a_cov[..., 0] # Calculate first zero crossing in auto-correlation zero_crossing = np.nanargmin(~(acov < 0), axis=-1) # Calculate integral time scale T_int = np.zeros(acov.shape[:2]) for i in range(3): for t in range(a_cov["time"].size): T_int[i, t] = np.trapezoid(acov[i, t][: zero_crossing[i, t]], dx=1 / fs) L_int = U_mag.values * T_int return xr.DataArray( L_int.astype("float32"), coords={"dir": a_cov["dir"], "time": a_cov["time"]}, attrs={ "units": "m", "long_name": "Integral Length Scale", "standard_name": "turbulent_mixing_length_of_sea_water", }, )
[docs] def turbulence_statistics( ds_raw, n_bin, fs, n_fft=None, freq_units="rad/s", window="hann" ): """ Functional version of `ADVBinner` that computes a suite of turbulence statistics for the input dataset, and returns a `binned` data object. Parameters ---------- ds_raw : xarray.Dataset The raw adv datset to bin, average, and compute turbulence statistics from. freq_units : string Frequency units of the returned spectra in either Hz or rad/s (`f` or :math:`\\omega`). Default = 'rad/s' window : string or array The window to use for calculating spectra. Returns ------- ds : xarray.Dataset Returns an 'binned' (i.e. 'averaged') data object. All fields (variables) of the input data object are averaged in `n_bin` chunks. This object also computes the following items over those chunks: - tke_vec : The energy in each component, each components is alternatively accessible as: :attr:`upup_ <dolfyn.velocity.Velocity.upup_>`, :attr:`vpvp_ <dolfyn.velocity.Velocity.vpvp_>`, :attr:`wpwp_ <dolfyn.velocity.Velocity.wpwp_>`) - stress_vec : The Reynolds stresses, each component is alternatively accessible as: :attr:`upwp_ <dolfyn.data.velocity.Velocity.upwp_>`, :attr:`vpwp_ <dolfyn.data.velocity.Velocity.vpwp_>`, :attr:`upvp_ <dolfyn.data.velocity.Velocity.upvp_>`) - U_std : The standard deviation of the horizontal velocity `U_mag`. - psd : DataArray containing the spectra of the velocity in radial frequency units. The data-array contains: - vel : the velocity spectra array (m^2/s/rad)) - omega : the radial frequncy (rad/s) """ calculator = ADVBinner(n_bin, fs, n_fft=n_fft) return calculator(ds_raw, freq_units=freq_units, window=window)