import numpy as np
from ..velocity import VelBinner
import warnings
from ..tools.misc import slice1d_along_axis, _nans_like
from scipy.special import cbrt
import xarray as xr
[docs]
class ADVBinner(VelBinner):
"""
A class that builds upon `VelBinner` for calculating turbulence
statistics and velocity spectra from ADV data
Parameters
----------
n_bin : int
The length of each `bin`, in number of points, for this averaging
operator.
fs : int
Instrument sampling frequency in Hz
n_fft : int
The length of the FFT for computing spectra (must be <= n_bin).
Optional, default `n_fft` = `n_bin`
n_fft_coh : int
Number of data points to use for coherence and cross-spectra fft's.
Optional, default `n_fft_coh` = `n_fft`
noise : float or array-like
Instrument noise level in same units as velocity. Typically
found from `adv.turbulence.doppler_noise_level`.
Default: None.
"""
def __call__(self, ds, freq_units="rad/s", window="hann"):
out = type(ds)()
out = self.bin_average(ds, out)
noise = ds.get("doppler_noise", [0, 0, 0])
out["tke_vec"] = self.turbulent_kinetic_energy(ds["vel"], noise=noise)
out["stress_vec"] = self.reynolds_stress(ds["vel"])
out["psd"] = self.power_spectral_density(
ds["vel"], window=window, freq_units=freq_units, noise=noise
)
for key in list(ds.attrs.keys()):
if "config" in key:
ds.attrs.pop(key)
out.attrs = ds.attrs
out.attrs["n_bin"] = self.n_bin
out.attrs["n_fft"] = self.n_fft
out.attrs["n_fft_coh"] = self.n_fft_coh
return out
[docs]
def reynolds_stress(self, veldat, detrend=True):
"""
Calculate the specific Reynolds stresses
(covariances of u,v,w in m^2/s^2)
Parameters
----------
veldat : xr.DataArray
A velocity data array. The last dimension is assumed
to be time.
detrend : bool
Detrend the velocity data (True), or simply de-mean it
(False), prior to computing stress. Note: the psd routines
use detrend, so if you want to have the same amount of
variance here as there use ``detrend=True``.
Default = True
Returns
-------
out : xarray.DataArray
"""
if not isinstance(veldat, xr.DataArray):
raise TypeError("`veldat` must be an instance of `xarray.DataArray`.")
time = self.mean(veldat["time"].values)
vel = veldat.values
out = np.empty(self._outshape(vel[:3].shape)[:-1], dtype=np.float32)
if detrend:
vel = self.detrend(vel)
else:
vel = self.demean(vel)
for idx, p in enumerate(self._cross_pairs):
out[idx] = np.nanmean(vel[p[0]] * vel[p[1]], -1, dtype=np.float64).astype(
np.float32
)
da = xr.DataArray(
out.astype("float32"),
dims=veldat.dims,
attrs={"units": "m2 s-2", "long_name": "Specific Reynolds Stress Vector"},
)
da = da.rename({"dir": "tau"})
da = da.assign_coords({"tau": self.tau, "time": time})
return da
[docs]
def cross_spectral_density(
self,
veldat,
freq_units="rad/s",
fs=None,
window="hann",
n_bin=None,
n_fft_coh=None,
):
"""
Calculate the cross-spectral density of velocity components.
Parameters
----------
veldat : xarray.DataArray
The raw 3D velocity data.
freq_units : string
Frequency units of the returned spectra in either Hz or rad/s
(`f` or :math:`\\omega`)
fs : float (optional)
The sample rate. Default = `binner.fs`
window : string or array
Specify the window function.
Options: 1, None, 'hann', 'hamm'
n_bin : int (optional)
The bin-size. Default = `binner.n_bin`
n_fft_coh : int (optional)
The fft size. Default = `binner.n_fft_coh`
Returns
-------
csd : xarray.DataArray (3, M, N_FFT)
The first-dimension of the cross-spectrum is the three
different cross-spectra: 'uv', 'uw', 'vw'.
"""
if not isinstance(veldat, xr.DataArray):
raise TypeError("`veldat` must be an instance of `xarray.DataArray`.")
if ("rad" not in freq_units) and ("Hz" not in freq_units):
raise ValueError("`freq_units` should be one of 'Hz' or 'rad/s'")
fs_in = self._parse_fs(fs)
n_fft = self._parse_nfft_coh(n_fft_coh)
time = self.mean(veldat["time"].values)
veldat = veldat.values
if len(np.shape(veldat)) != 2:
raise Exception(
"This function is only valid for calculating TKE using "
"the 3D velocity vector from an ADV."
)
out = np.empty(
self._outshape_fft(veldat[:3].shape, n_fft=n_fft, n_bin=n_bin),
dtype="complex",
)
# Create frequency vector, also checks whether using f or omega
if "rad" in freq_units:
fs = 2 * np.pi * fs_in
freq_units = "rad s-1"
units = "m2 s-1 rad-1"
else:
fs = fs_in
freq_units = "Hz"
units = "m2 s-2 Hz-1"
coh_freq = xr.DataArray(
self._fft_freq(fs=fs_in, units=freq_units, n_fft=n_fft, coh=True),
dims=["coh_freq"],
name="coh_freq",
attrs={
"units": freq_units,
"long_name": "FFT Frequency Vector",
"coverage_content_type": "coordinate",
},
).astype("float32")
for ip, ipair in enumerate(self._cross_pairs):
out[ip] = self._csd_base(
veldat[ipair[0]],
veldat[ipair[1]],
fs=fs,
window=window,
n_bin=n_bin,
n_fft=n_fft,
)
csd = xr.DataArray(
out.astype("complex64"),
coords={"C": self.C, "time": time, "coh_freq": coh_freq},
dims=["C", "time", "coh_freq"],
attrs={
"units": units,
"n_fft_coh": n_fft,
"long_name": "Cross Spectral Density",
},
)
csd["coh_freq"].attrs["units"] = freq_units
return csd
[docs]
def doppler_noise_level(self, psd, pct_fN=0.8):
"""
Calculate bias due to Doppler noise using the noise floor
of the velocity spectra.
Parameters
----------
psd : xarray.DataArray (dir, time, f)
The ADV power spectral density of velocity (auto-spectra)
pct_fN : float
Percent of Nyquist frequency to calculate characeristic frequency
Returns
-------
doppler_noise (xarray.DataArray):
Doppler noise level in units of m/s
Notes
-----
Approximates bias from
.. :math: \\sigma^{2}_{noise} = N x f_{c}
where :math: `\\sigma_{noise}` is the bias due to Doppler noise,
`N` is the constant variance or spectral density, and `f_{c}`
is the characteristic frequency.
The characteristic frequency is then found as
.. :math: f_{c} = pct_fN * (f_{s}/2)
where `f_{s}/2` is the Nyquist frequency.
Richard, Jean-Baptiste, et al. "Method for identification of Doppler noise
levels in turbulent flow measurements dedicated to tidal energy." International
Journal of Marine Energy 3 (2013): 52-64.
ThiƩbaut, Maxime, et al. "Investigating the flow dynamics and turbulence at a
tidal-stream energy site in a highly energetic estuary." Renewable Energy 195
(2022): 252-262.
"""
if not isinstance(psd, xr.DataArray):
raise TypeError("`psd` must be an instance of `xarray.DataArray`.")
if not isinstance(pct_fN, float) or not 0 <= pct_fN <= 1:
raise ValueError("`pct_fN` must be a float within the range [0, 1].")
# Characteristic frequency set to 80% of Nyquist frequency
fN = self.fs / 2
fc = pct_fN * fN
# Get units right
if psd.freq.units == "Hz":
f_range = slice(fc, fN)
else:
f_range = slice(2 * np.pi * fc, 2 * np.pi * fN)
# Noise floor
N2 = psd.sel(freq=f_range) * psd.freq.sel(freq=f_range)
noise_level = np.sqrt(N2.mean(dim="freq"))
return xr.DataArray(
noise_level.values.astype("float32"),
coords={"S": psd["S"], "time": psd["time"]},
attrs={
"units": "m/s",
"long_name": "Doppler Noise Level",
"description": "Doppler noise level calculated " "from PSD white noise",
},
)
[docs]
def check_turbulence_cascade_slope(self, psd, freq_range=[6.28, 12.57]):
"""
This function calculates the slope of the PSD, the power spectra
of velocity, within the given frequency range. The purpose of this
function is to check that the region of the PSD containing the
isotropic turbulence cascade decreases at a rate of :math:`f^{-5/3}`.
Parameters
----------
psd : xarray.DataArray ([time,] freq)
The power spectral density (1D or 2D)
freq_range : iterable(2) (default: [6.28, 12.57])
The range over which the isotropic turbulence cascade occurs, in
units of the psd frequency vector (Hz or rad/s)
Returns
-------
(m, b): tuple (slope, y-intercept)
A tuple containing the coefficients of the log-adjusted linear
regression between PSD and frequency
Notes
-----
Calculates slope based on the `standard` formula for dissipation:
.. math:: S(k) = \\alpha \\epsilon^{2/3} k^{-5/3} + N
The slope of the isotropic turbulence cascade, which should be
equal to :math:`k^{-5/3}` or :math:`f^{-5/3}`, where k and f are
the wavenumber and frequency vectors, is estimated using linear
regression with a log transformation:
.. math:: log10(y) = m*log10(x) + b
Which is equivalent to
.. math:: y = 10^{b} x^{m}
Where :math:`y` is S(k) or S(f), :math:`x` is k or f, :math:`m`
is the slope (ideally -5/3), and :math:`10^{b}` is the intercept of
y at x^m=1.
"""
if not isinstance(psd, xr.DataArray):
raise TypeError("`psd` must be an instance of `xarray.DataArray`.")
if not hasattr(freq_range, "__iter__") or len(freq_range) != 2:
raise ValueError("`freq_range` must be an iterable of length 2.")
idx = np.where((freq_range[0] < psd.freq) & (psd.freq < freq_range[1]))
idx = idx[0]
x = np.log10(psd["freq"].isel(freq=idx))
y = np.log10(psd.isel(freq=idx))
y_bar = y.mean("freq")
x_bar = x.mean("freq")
# using the formula to calculate the slope and intercept
n = np.sum((x - x_bar) * (y - y_bar), axis=0)
d = np.sum((x - x_bar) ** 2, axis=0)
m = n / d
b = y_bar - m * x_bar
return m, b
[docs]
def dissipation_rate_LT83(self, psd, U_mag, freq_range=[6.28, 12.57], noise=None):
"""
Calculate the dissipation rate from the PSD
Parameters
----------
psd : xarray.DataArray (...,time,f)
The power spectral density
U_mag : xarray.DataArray (...,time)
The bin-averaged horizontal velocity [m/s] (from dataset shortcut)
freq_range : iterable(2)
The range over which to integrate/average the spectrum, in units
of the psd frequency vector (Hz or rad/s).
Default = [6.28, 12.57] rad/s
noise : float or array-like
Instrument noise level in same units as velocity. Typically
found from `adv.turbulence.calc_doppler_noise`.
Default: None.
Returns
-------
epsilon : xarray.DataArray (...,n_time)
dataArray of the dissipation rate
Notes
-----
This uses the `standard` formula for dissipation:
.. math:: S(k) = \\alpha \\epsilon^{2/3} k^{-5/3} + N
where :math:`\\alpha = 0.5` (1.5 for all three velocity
components), `k` is wavenumber, `S(k)` is the turbulent
kinetic energy spectrum, and `N' is the doppler noise level
associated with the TKE spectrum.
With :math:`k \\rightarrow \\omega / U`, then -- to preserve variance --
:math:`S(k) = U S(\\omega)`, and so this becomes:
.. math:: S(\\omega) = \\alpha \\epsilon^{2/3} \\omega^{-5/3} U^{2/3} + N
With :math:`k \\rightarrow (2\\pi f) / U`, then
.. math:: S(\\omega) = \\alpha \\epsilon^{2/3} f^{-5/3} (U/(2*\\pi))^{2/3} + N
LT83 : Lumley and Terray, "Kinematics of turbulence convected
by a random wave field". JPO, 1983, vol13, pp2000-2007.
"""
if not isinstance(psd, xr.DataArray):
raise TypeError("`psd` must be an instance of `xarray.DataArray`.")
if len(U_mag.shape) != 1:
raise Exception("U_mag should be 1-dimensional (time)")
if len(psd["time"]) != len(U_mag["time"]):
raise Exception("`U_mag` should be from ensembled-averaged dataset")
if not hasattr(freq_range, "__iter__") or len(freq_range) != 2:
raise ValueError("`freq_range` must be an iterable of length 2.")
if noise is not None:
if np.shape(noise)[0] != 3:
raise Exception("Noise should have same first dimension as velocity")
else:
noise = np.array([0, 0, 0])[:, None, None]
# Noise subtraction from binner.TimeBinner.calc_psd_base
psd = psd.copy()
if noise is not None:
psd -= noise**2 / (self.fs / 2)
psd = psd.where(psd > 0, np.min(np.abs(psd)) / 100)
freq = psd.freq
idx = np.where((freq_range[0] < freq) & (freq < freq_range[1]))
idx = idx[0]
if freq.units == "Hz":
U = U_mag / (2 * np.pi)
else:
U = U_mag
a = 0.5
out = (psd.isel(freq=idx) * freq.isel(freq=idx) ** (5 / 3) / a).mean(
axis=-1
) ** (3 / 2) / U
return xr.DataArray(
out.astype("float32"),
attrs={
"units": "m2 s-3",
"long_name": "TKE Dissipation Rate",
"standard_name": "specific_turbulent_kinetic_energy_dissipation_in_sea_water",
"description": "TKE dissipation rate calculated using "
"the method from Lumley and Terray, 1983",
},
)
[docs]
def dissipation_rate_SF(self, vel_raw, U_mag, fs=None, freq_range=[2.0, 4.0]):
"""
Calculate dissipation rate using the "structure function" (SF) method
Parameters
----------
vel_raw : xarray.DataArray (time)
The raw velocity data upon which to perform the SF technique.
U_mag : xarray.DataArray
The bin-averaged horizontal velocity (from dataset shortcut)
fs : float
The sample rate of `vel_raw` [Hz]
freq_range : iterable(2)
The frequency range over which to compute the SF [Hz]
(i.e. the frequency range within which the isotropic
turbulence cascade falls).
Default = [2., 4.] Hz
Returns
-------
epsilon : xarray.DataArray
dataArray of the dissipation rate
"""
if not isinstance(vel_raw, xr.DataArray):
raise TypeError("`vel_raw` must be an instance of `xarray.DataArray`.")
if len(vel_raw["time"]) == len(U_mag["time"]):
raise Exception("`U_mag` should be from ensembled-averaged dataset")
if not hasattr(freq_range, "__iter__") or len(freq_range) != 2:
raise ValueError("`freq_range` must be an iterable of length 2.")
veldat = vel_raw.values
if len(veldat.shape) > 1:
raise Exception("Function input should be a 1D velocity vector")
fs = self._parse_fs(fs)
if freq_range[1] > fs:
warnings.warn("Max freq_range cannot be greater than fs")
dt = self.reshape(veldat)
out = np.empty(dt.shape[:-1], dtype=dt.dtype)
for slc in slice1d_along_axis(dt.shape, -1):
up = dt[slc]
lag = U_mag.values[slc[:-1]] / fs * np.arange(up.shape[0])
DAA = _nans_like(lag)
for L in range(int(fs / freq_range[1]), int(fs / freq_range[0])):
DAA[L] = np.nanmean((up[L:] - up[:-L]) ** 2, dtype=np.float64)
cv2 = DAA / (lag ** (2 / 3))
cv2m = np.median(cv2[np.logical_not(np.isnan(cv2))])
out[slc[:-1]] = (cv2m / 2.1) ** (3 / 2)
return xr.DataArray(
out.astype("float32"),
coords=U_mag.coords,
dims=U_mag.dims,
attrs={
"units": "m2 s-3",
"long_name": "TKE Dissipation Rate",
"standard_name": "specific_turbulent_kinetic_energy_dissipation_in_sea_water",
"description": "TKE dissipation rate calculated using the "
'"structure function" method',
},
)
def _up_angle(self, U_complex):
"""
Calculate the angle of the turbulence fluctuations.
Parameters
----------
U_complex : numpy.ndarray (..., n_time * n_bin)
The complex, raw horizontal velocity (non-binned)
Returns
-------
theta : numpy.ndarray (..., n_time)
The angle of the turbulence [rad]
"""
dt = self.demean(U_complex)
fx = dt.imag <= 0
dt[fx] = dt[fx] * np.exp(1j * np.pi)
return np.angle(np.mean(dt, -1, dtype=np.complex128))
def _integral_TE01(self, I_tke, theta):
"""
The integral, equation A13, in [TE01].
Parameters
----------
I_tke : numpy.ndarray
(beta in TE01) is the turbulence intensity ratio:
\\sigma_u / V
theta : numpy.ndarray
is the angle between the mean flow and the primary axis of
velocity fluctuations
"""
x = np.arange(-20, 20, 1e-2) # I think this is a long enough range.
out = np.empty_like(I_tke.flatten())
for i, (b, t) in enumerate(zip(I_tke.flatten(), theta.flatten())):
out[i] = np.trapz(
cbrt(x**2 - 2 / b * np.cos(t) * x + b ** (-2)) * np.exp(-0.5 * x**2),
x,
)
return out.reshape(I_tke.shape) * (2 * np.pi) ** (-0.5) * I_tke ** (2 / 3)
[docs]
def dissipation_rate_TE01(self, dat_raw, dat_avg, freq_range=[6.28, 12.57]):
"""
Calculate the dissipation rate according to TE01.
Parameters
----------
dat_raw : xarray.Dataset
The raw (off the instrument) adv dataset
dat_avg : xarray.Dataset
The bin-averaged adv dataset (calc'd from 'calc_turbulence' or
'do_avg'). The spectra (psd) and basic turbulence statistics
('tke_vec' and 'stress_vec') must already be computed.
freq_range : iterable(2)
The range over which to integrate/average the spectrum, in units
of the psd frequency vector (Hz or rad/s).
Default = [6.28, 12.57] rad/s
Notes
-----
TE01 : Trowbridge, J and Elgar, S, "Turbulence measurements in
the Surf Zone". JPO, 2001, vol31, pp2403-2417.
"""
if not isinstance(dat_raw, xr.Dataset):
raise TypeError("`dat_raw` must be an instance of `xarray.Dataset`.")
if not isinstance(dat_avg, xr.Dataset):
raise TypeError("`dat_avg` must be an instance of `xarray.Dataset`.")
if not hasattr(freq_range, "__iter__") or len(freq_range) != 2:
raise ValueError("`freq_range` must be an iterable of length 2.")
# Assign local names
U_mag = dat_avg.velds.U_mag.values
I_tke = dat_avg.velds.I_tke.values
theta = np.angle(dat_avg.velds.U.values) - self._up_angle(
dat_raw.velds.U.values
)
freq = dat_avg["psd"].freq.values
# Calculate constants
alpha = 1.5
intgrl = self._integral_TE01(I_tke, theta)
# Index data to be used
inds = (freq_range[0] < freq) & (freq < freq_range[1])
psd = dat_avg["psd"][..., inds].values
freq = freq[inds].reshape([1] * (dat_avg["psd"].ndim - 2) + [sum(inds)])
# Estimate values
# u & v components (equation 6)
out = (
np.nanmean((psd[0] + psd[1]) * freq ** (5 / 3), -1)
/ (21 / 55 * alpha * intgrl)
) ** (3 / 2) / U_mag
# Add w component
out += (
np.nanmean(psd[2] * freq ** (5 / 3), -1) / (12 / 55 * alpha * intgrl)
) ** (3 / 2) / U_mag
# Average the two estimates
out *= 0.5
return xr.DataArray(
out.astype("float32"),
coords={"time": dat_avg["psd"]["time"]},
dims="time",
attrs={
"units": "m2 s-3",
"long_name": "TKE Dissipation Rate",
"standard_name": "specific_turbulent_kinetic_energy_dissipation_in_sea_water",
"description": "TKE dissipation rate calculated using the "
"method from Trowbridge and Elgar, 2001",
},
)
[docs]
def integral_length_scales(self, a_cov, U_mag, fs=None):
"""
Calculate integral length scales.
Parameters
----------
a_cov : xarray.DataArray
The auto-covariance array (i.e. computed using `autocovariance`).
U_mag : xarray.DataArray
The bin-averaged horizontal velocity (from dataset shortcut)
fs : numeric
The raw sample rate
Returns
-------
L_int : numpy.ndarray (..., n_time)
The integral length scale (T_int*U_mag).
Notes
----
The integral time scale (T_int) is the lag-time at which the
auto-covariance falls to 1/e.
If T_int is not reached, L_int will default to '0'.
"""
if not isinstance(a_cov, xr.DataArray):
raise TypeError("`a_cov` must be an instance of `xarray.DataArray`.")
if len(a_cov["time"]) != len(U_mag["time"]):
raise Exception("`U_mag` should be from ensembled-averaged dataset")
acov = a_cov.values
fs = self._parse_fs(fs)
scale = np.argmin((acov / acov[..., :1]) > (1 / np.e), axis=-1)
L_int = U_mag.values / fs * scale
return xr.DataArray(
L_int.astype("float32"),
coords={"dir": a_cov["dir"], "time": a_cov["time"]},
attrs={
"units": "m",
"long_name": "Integral Length Scale",
"standard_name": "turbulent_mixing_length_of_sea_water",
},
)
[docs]
def turbulence_statistics(
ds_raw, n_bin, fs, n_fft=None, freq_units="rad/s", window="hann"
):
"""
Functional version of `ADVBinner` that computes a suite of turbulence
statistics for the input dataset, and returns a `binned` data object.
Parameters
----------
ds_raw : xarray.Dataset
The raw adv datset to `bin`, average and compute
turbulence statistics of.
freq_units : string
Frequency units of the returned spectra in either Hz or rad/s
(`f` or :math:`\\omega`). Default is 'rad/s'
window : string or array
The window to use for calculating spectra.
Returns
-------
ds : xarray.Dataset
Returns an 'binned' (i.e. 'averaged') data object. All
fields (variables) of the input data object are averaged in n_bin
chunks. This object also computes the following items over
those chunks:
- tke_vec : The energy in each component, each components is
alternatively accessible as:
:attr:`upup_ <dolfyn.velocity.Velocity.upup_>`,
:attr:`vpvp_ <dolfyn.velocity.Velocity.vpvp_>`,
:attr:`wpwp_ <dolfyn.velocity.Velocity.wpwp_>`)
- stress_vec : The Reynolds stresses, each component is
alternatively accessible as:
:attr:`upwp_ <dolfyn.data.velocity.Velocity.upwp_>`,
:attr:`vpwp_ <dolfyn.data.velocity.Velocity.vpwp_>`,
:attr:`upvp_ <dolfyn.data.velocity.Velocity.upvp_>`)
- U_std : The standard deviation of the horizontal
velocity `U_mag`.
- psd : DataArray containing the spectra of the velocity
in radial frequency units. The data-array contains:
- vel : the velocity spectra array (m^2/s/rad))
- omega : the radial frequncy (rad/s)
"""
calculator = ADVBinner(n_bin, fs, n_fft=n_fft)
return calculator(ds_raw, freq_units=freq_units, window=window)