Source code for mhkit.wave.resource

import warnings
from scipy.optimize import fsolve as _fsolve
from scipy import signal as _signal
import pandas as pd
import xarray as xr
import numpy as np
from mhkit.utils import to_numeric_array, convert_to_dataarray, convert_to_dataset


### Spectrum
[docs] def elevation_spectrum( eta, sample_rate, nnft, window="hann", detrend=True, noverlap=None, time_dimension="", to_pandas=True, ): """ Calculates the wave energy spectrum from wave elevation time-series Parameters ------------ eta: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Wave surface elevation [m] indexed by time [datetime or s] sample_rate: float Data frequency [Hz] nnft: integer Number of bins in the Fast Fourier Transform window: string (optional) Signal window type. 'hann' is used by default given the broadband nature of waves. See scipy.signal.get_window for more options. detrend: bool (optional) Specifies if a linear trend is removed from the data before calculating the wave energy spectrum. Data is detrended by default. noverlap: int, optional Number of points to overlap between segments. If None, ``noverlap = nperseg / 2``. Defaults to None. time_dimension: string (optional) Name of the xarray dimension corresponding to time. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz]. """ # TODO: Add confidence intervals, equal energy frequency spacing, and NDBC # frequency spacing # TODO: may need to raise an error for the length of nnft- signal.welch breaks when nfft is too short eta = convert_to_dataset(eta, "eta") if not isinstance(sample_rate, (float, int)): raise TypeError( f"sample_rate must be of type int or float. Got: {type(sample_rate)}" ) if not isinstance(nnft, int): raise TypeError(f"nnft must be of type int. Got: {type(nnft)}") if not isinstance(window, str): raise TypeError(f"window must be of type str. Got: {type(window)}") if not isinstance(detrend, bool): raise TypeError(f"detrend must be of type bool. Got: {type(detrend)}") if not nnft > 0: raise ValueError(f"nnft must be > 0. Got: {nnft}") if not sample_rate > 0: raise ValueError(f"sample_rate must be > 0. Got: {sample_rate}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if time_dimension == "": time_dimension = list(eta.dims)[0] else: if time_dimension not in list(eta.dims): raise ValueError( f"time_dimension is not a dimension of eta ({list(eta.dims)}). Got: {time_dimension}." ) time = eta[time_dimension] delta_t = time.values[1] - time.values[0] if not np.allclose(time.diff(dim=time_dimension)[1:], delta_t): raise ValueError( "Time bins are not evenly spaced. Create a constant " + f"temporal spacing for eta." ) S = xr.Dataset() for var in eta.data_vars: eta_subset = eta[var] if detrend: eta_subset = _signal.detrend( eta_subset.dropna(dim=time_dimension), axis=-1, type="linear", bp=0 ) [f, wave_spec_measured] = _signal.welch( eta_subset, fs=sample_rate, window=window, nperseg=nnft, nfft=nnft, noverlap=noverlap, ) S[var] = (["Frequency"], wave_spec_measured) S = S.assign_coords({"Frequency": f}) if to_pandas: S = S.to_pandas() return S
[docs] def pierson_moskowitz_spectrum(f, Tp, Hs, to_pandas=True): """ Calculates Pierson-Moskowitz Spectrum from IEC TS 62600-2 ED2 Annex C.2 (2019) Parameters ------------ f: list, np.ndarray, pd.Series, xr.DataArray Frequency [Hz] Tp: float/int Peak period [s] Hs: float/int Significant wave height [m] to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- S: xarray Dataset Spectral density [m^2/Hz] indexed frequency [Hz] """ f = to_numeric_array(f, "f") if not isinstance(Tp, (int, float)): raise TypeError(f"Tp must be of type int or float. Got: {type(Tp)}") if not isinstance(Hs, (int, float)): raise TypeError(f"Hs must be of type int or float. Got: {type(Hs)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") f.sort() B_PM = (5 / 4) * (1 / Tp) ** 4 A_PM = B_PM * (Hs / 2) ** 2 # Avoid a divide by zero if the 0 frequency is provided # The zero frequency should always have 0 amplitude, otherwise # we end up with a mean offset when computing the surface elevation. Sf = np.zeros(f.size) if f[0] == 0.0: inds = range(1, f.size) else: inds = range(0, f.size) Sf[inds] = A_PM * f[inds] ** (-5) * np.exp(-B_PM * f[inds] ** (-4)) name = "Pierson-Moskowitz (" + str(Tp) + "s)" S = xr.Dataset(data_vars={name: (["Frequency"], Sf)}, coords={"Frequency": f}) if to_pandas: S = S.to_pandas() return S
[docs] def jonswap_spectrum(f, Tp, Hs, gamma=None, to_pandas=True): """ Calculates JONSWAP Spectrum from IEC TS 62600-2 ED2 Annex C.2 (2019) Parameters ------------ f: list, np.ndarray, pd.Series, xr.DataArray Frequency [Hz] Tp: float/int Peak period [s] Hs: float/int Significant wave height [m] gamma: float (optional) Gamma to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- S: pandas Series or xarray DataArray Spectral density [m^2/Hz] indexed frequency [Hz] """ f = to_numeric_array(f, "f") if not isinstance(Tp, (int, float)): raise TypeError(f"Tp must be of type int or float. Got: {type(Tp)}") if not isinstance(Hs, (int, float)): raise TypeError(f"Hs must be of type int or float. Got: {type(Hs)}") if not isinstance(gamma, (int, float, type(None))): raise TypeError( f"If specified, gamma must be of type int or float. Got: {type(gamma)}" ) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") f.sort() B_PM = (5 / 4) * (1 / Tp) ** 4 A_PM = B_PM * (Hs / 2) ** 2 # Avoid a divide by zero if the 0 frequency is provided # The zero frequency should always have 0 amplitude, otherwise # we end up with a mean offset when computing the surface elevation. S_f = np.zeros(f.size) if f[0] == 0.0: inds = range(1, f.size) else: inds = range(0, f.size) S_f[inds] = A_PM * f[inds] ** (-5) * np.exp(-B_PM * f[inds] ** (-4)) if not gamma: TpsqrtHs = Tp / np.sqrt(Hs) if TpsqrtHs <= 3.6: gamma = 5 elif TpsqrtHs > 5: gamma = 1 else: gamma = np.exp(5.75 - 1.15 * TpsqrtHs) # Cutoff frequencies for gamma function siga = 0.07 sigb = 0.09 fp = 1 / Tp # peak frequency lind = np.where(f <= fp) hind = np.where(f > fp) Gf = np.zeros(f.shape) Gf[lind] = gamma ** np.exp(-((f[lind] - fp) ** 2) / (2 * siga**2 * fp**2)) Gf[hind] = gamma ** np.exp(-((f[hind] - fp) ** 2) / (2 * sigb**2 * fp**2)) C = 1 - 0.287 * np.log(gamma) Sf = C * S_f * Gf name = "JONSWAP (" + str(Hs) + "m," + str(Tp) + "s)" S = xr.Dataset(data_vars={name: (["Frequency"], Sf)}, coords={"Frequency": f}) if to_pandas: S = S.to_pandas() return S
### Metrics
[docs] def surface_elevation( S, time_index, seed=None, frequency_bins=None, phases=None, method="ifft", frequency_dimension="", to_pandas=True, ): """ Calculates wave elevation time-series from spectrum Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] time_index: numpy array Time used to create the wave elevation time-series [s], for example, time = np.arange(0,100,0.01) seed: int (optional) Random seed frequency_bins: numpy array, pandas Series, or xarray DataArray (optional) Bin widths for frequency of S. Required for unevenly sized bins phases: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Explicit phases for frequency components (overrides seed) for example, phases = np.random.rand(len(S)) * 2 * np.pi method: str (optional) Method used to calculate the surface elevation. 'ifft' (Inverse Fast Fourier Transform) used by default if the given frequency_bins==None or is evenly spaced. 'sum_of_sines' explicitly sums each frequency component and used by default if uneven frequency_bins are provided. The 'ifft' method is significantly faster. frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension (the index for pandas input). to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- eta: pandas DataFrame or xarray Dataset Wave surface elevation [m] indexed by time [s]. """ S = convert_to_dataset(S, "S") time_index = to_numeric_array(time_index, "time_index") if not isinstance(seed, (type(None), int)): raise TypeError(f"If specified, seed must be of type int. Got: {type(seed)}") if not isinstance(phases, type(None)): phases = convert_to_dataset(phases) if not isinstance(method, str): raise TypeError(f"method must be of type str. Got: {type(method)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): # frequency_dimension given, but not in list of possible dimensions raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) frequency_axis = list(S.dims).index(frequency_dimension) # Create dimensions and coordinates for the new dataset (frequency becomes time) new_dims = list(S.dims) new_dims[frequency_axis] = "Time" new_coords = S.sum(dim=frequency_dimension).coords new_coords = new_coords.assign({"Time": time_index}) f = S[frequency_dimension] if not isinstance(frequency_bins, (type(None), np.ndarray)): frequency_bins = convert_to_dataarray(frequency_bins) elif isinstance(frequency_bins, np.ndarray): frequency_bins = xr.DataArray( data=frequency_bins, dims=frequency_dimension, coords={frequency_dimension: f}, ) if frequency_bins is not None: if not frequency_bins.squeeze().shape == f.shape: raise ValueError( "shape of frequency_bins must only contain 1 column and match the shape of the frequency dimension of S" ) delta_f = frequency_bins delta_f_even = np.allclose(frequency_bins, frequency_bins[0]) if delta_f_even: # reduce delta_f to a scalar if it is uniform delta_f = delta_f[0].item() else: delta_f = f.values[1] - f.values[0] delta_f_even = np.allclose(f.diff(dim=frequency_dimension)[1:], delta_f) if phases is not None: for var in phases.data_vars: if not phases[var].shape == S[var].shape: raise ValueError( "shape of variables in phases must match shape of variables in S" ) if method == "ifft": # ifft method must have a zero frequency and evenly spaced frequency bins if not f[0] == 0: warnings.warn( f"ifft method must have zero frequency defined. Lowest frequency is: {f[0].values}. Setting method to less efficient `sum_of_sines` method." ) method = "sum_of_sines" if not delta_f_even: warnings.warn( f"ifft method must have evenly spaced frequency bins. Setting method to less efficient `sum_of_sines` method." ) method = "sum_of_sines" elif method == "sum_of_sines": # For sum of sines, does not matter if there is a zero frequency or if frequency bins are evenly spaced pass else: raise ValueError(f"Method must be 'ifft' or 'sum_of_sines'. Got: {method}") omega = xr.DataArray( data=2 * np.pi * f, dims=frequency_dimension, coords={frequency_dimension: f} ) eta = xr.Dataset() for var in S.data_vars: S_subset = S[var] if phases is None: np.random.seed(seed) phase = xr.DataArray( data=2 * np.pi * np.random.random_sample(S_subset[frequency_dimension].shape), dims=frequency_dimension, coords={frequency_dimension: f}, ) else: phase = phases[var] # Wave amplitude times delta f A = np.sqrt(2 * S_subset * delta_f) if method == "ifft": A_cmplx = A * (np.cos(phase) + 1j * np.sin(phase)) eta_tmp = np.fft.irfftn( 0.5 * A_cmplx * time_index.size, list(time_index.shape), axes=[frequency_axis], ) eta[var] = xr.DataArray(data=eta_tmp, dims=new_dims, coords=new_coords) elif method == "sum_of_sines": # Product of omega and time B = np.outer(time_index, omega) B = B.reshape((len(time_index), len(omega))) B = xr.DataArray( data=B, dims=["Time", frequency_dimension], coords={"Time": time_index, frequency_dimension: f}, ) # wave elevation C = np.cos(B + phase) eta[var] = (C * A).sum(dim=frequency_dimension) if to_pandas: eta = eta.to_pandas() if isinstance(eta, (pd.Series, pd.DataFrame, xr.DataArray)): eta.name = "eta" return eta
[docs] def frequency_moment(S, N, frequency_bins=None, frequency_dimension="", to_pandas=True): """ Calculates the Nth frequency moment of the spectrum Parameters ----------- S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] N: int Moment (0 for 0th, 1 for 1st ....) frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- m: pandas DataFrame or xarray Dataset Nth Frequency Moment indexed by S.columns """ S = convert_to_dataarray(S) if not isinstance(N, int): raise TypeError(f"N must be of type int. Got: {type(N)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) f = S[frequency_dimension] # Eq 8 in IEC 62600-101 S = S.sel({frequency_dimension: slice(1e-12, f.max())}) # omit frequency of 0 f = S[frequency_dimension] # reset frequency_dimension without the 0 frequency fn = np.power(f, N) if frequency_bins is None: delta_f = f.diff(dim=frequency_dimension) delta_f0 = f[1] - f[0] delta_f0 = delta_f0.assign_coords({frequency_dimension: f[0]}) delta_f = xr.concat([delta_f0, delta_f], dim=frequency_dimension) else: delta_f = xr.DataArray( data=convert_to_dataarray(frequency_bins), dims=frequency_dimension, coords={frequency_dimension: f}, ) m = S * fn * delta_f m = m.sum(dim=frequency_dimension) if to_pandas: m = m.to_pandas() if isinstance(m, (pd.Series, pd.DataFrame, xr.DataArray)): m.name = "m" + str(N) return m
[docs] def significant_wave_height( S, frequency_dimension="", frequency_bins=None, to_pandas=True ): """ Calculates wave height from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Hm0: pandas DataFrame or xarray Dataset Significant wave height [m] index by S.columns """ S = convert_to_dataarray(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") # Eq 12 in IEC 62600-101 m0 = frequency_moment( S, 0, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) Hm0 = 4 * np.sqrt(m0) if to_pandas: Hm0 = Hm0.to_pandas() if isinstance(Hm0, (pd.Series, pd.DataFrame, xr.DataArray)): Hm0.name = "Hm0" return Hm0
[docs] def average_zero_crossing_period( S, frequency_dimension="", frequency_bins=None, to_pandas=True ): """ Calculates wave average zero crossing period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tz: pandas DataFrame or xarray Dataset Average zero crossing period [s] indexed by S.columns """ if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") # Eq 15 in IEC 62600-101 m0 = frequency_moment( S, 0, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) m2 = frequency_moment( S, 2, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) Tz = np.sqrt(m0 / m2) if to_pandas: Tz = Tz.to_pandas() if isinstance(Tz, (pd.Series, pd.DataFrame, xr.DataArray)): Tz.name = "Tz" return Tz
[docs] def average_crest_period( S, frequency_dimension="", frequency_bins=None, to_pandas=True ): """ Calculates wave average crest period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tavg: pandas DataFrame or xarray Dataset Average wave period [s] indexed by S.columns """ if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") m2 = frequency_moment( S, 2, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) m4 = frequency_moment( S, 4, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) Tavg = np.sqrt(m2 / m4) if to_pandas: Tavg = Tavg.to_pandas() if isinstance(Tavg, (pd.Series, pd.DataFrame, xr.DataArray)): Tavg.name = "Tavg" return Tavg
[docs] def average_wave_period(S, frequency_dimension="", frequency_bins=None, to_pandas=True): """ Calculates mean wave period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tm: pandas DataFrame or xarray Dataset Mean wave period [s] indexed by S.columns """ if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") m0 = frequency_moment( S, 0, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) m1 = frequency_moment( S, 1, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) Tm = np.sqrt(m0 / m1) if to_pandas: Tm = Tm.to_pandas() if isinstance(Tm, (pd.Series, pd.DataFrame, xr.DataArray)): Tm.name = "Tm" return Tm
[docs] def peak_period(S, frequency_dimension="", to_pandas=True): """ Calculates wave peak period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tp: pandas DataFrame or xarray Dataset Wave peak period [s] indexed by S.columns """ S = convert_to_dataarray(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) # Eq 14 in IEC 62600-101 fp = S.idxmax(dim=frequency_dimension) # Hz Tp = 1 / fp if to_pandas: Tp = Tp.to_pandas() if isinstance(Tp, (pd.Series, pd.DataFrame, xr.DataArray)): Tp.name = "Tp" return Tp
[docs] def energy_period(S, frequency_dimension="", frequency_bins=None, to_pandas=True): """ Calculates wave energy period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Te: pandas DataFrame or xarray Dataset Wave energy period [s] indexed by S.columns """ S = convert_to_dataarray(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") mn1 = frequency_moment( S, -1, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) m0 = frequency_moment( S, 0, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) # Eq 13 in IEC 62600-101 Te = mn1 / m0 if to_pandas: Te = Te.to_pandas() if isinstance(Te, (pd.Series, pd.DataFrame, xr.DataArray)): Te.name = "Te" return Te
[docs] def spectral_bandwidth(S, frequency_dimension="", frequency_bins=None, to_pandas=True): """ Calculates bandwidth from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- e: pandas DataFrame or xarray Dataset Spectral bandwidth [s] indexed by S.columns """ S = convert_to_dataarray(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") m2 = frequency_moment( S, 2, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) m0 = frequency_moment( S, 0, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) m4 = frequency_moment( S, 4, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) e = np.sqrt(1 - (m2**2) / (m0 / m4)) if to_pandas: e = e.to_pandas() if isinstance(e, (pd.Series, pd.DataFrame, xr.DataArray)): e.name = "e" return e
[docs] def spectral_width(S, frequency_dimension="", frequency_bins=None, to_pandas=True): """ Calculates wave spectral width from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- v: pandas DataFrame or xarray Dataset Spectral width [m] indexed by S.columns """ if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") mn2 = frequency_moment( S, -2, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) m0 = frequency_moment( S, 0, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) mn1 = frequency_moment( S, -1, frequency_bins=frequency_bins, frequency_dimension=frequency_dimension, to_pandas=False, ) # Eq 16 in IEC 62600-101 v = np.sqrt((m0 * mn2 / np.power(mn1, 2)) - 1) if to_pandas: v = v.to_pandas() if isinstance(v, (pd.Series, pd.DataFrame, xr.DataArray)): v.name = "v" return v
[docs] def energy_flux( S, h, deep=False, rho=1025, g=9.80665, ratio=2, frequency_dimension="", to_pandas=True, ): """ Calculates the omnidirectional wave energy flux of the spectra Parameters ----------- S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] h: float Water depth [m] deep: bool (optional) If True use the deep water approximation. Default False. When False a depth check is run to check for shallow water. The ratio of the shallow water regime can be changed using the ratio keyword. rho: float (optional) Water Density [kg/m^3]. Default = 1025 kg/m^3 g : float (optional) Gravitational acceleration [m/s^2]. Default = 9.80665 m/s^2 ratio: float or int (optional) Only applied if depth=False. If h/l > ratio, water depth will be set to deep. Default ratio = 2. frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- J: pandas DataFrame or xarray Dataset Omni-directional wave energy flux [W/m] indexed by S.columns """ S = convert_to_dataarray(S) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") if not isinstance(deep, bool): raise TypeError(f"deep must be of type bool. Got: {type(deep)}") if not isinstance(rho, (int, float)): raise TypeError(f"rho must be of type int or float. Got: {type(rho)}") if not isinstance(g, (int, float)): raise TypeError(f"g must be of type int or float. Got: {type(g)}") if not isinstance(ratio, (int, float)): raise TypeError(f"ratio must be of type int or float. Got: {type(ratio)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) f = S[frequency_dimension] if deep: # Eq 8 in IEC 62600-100, deep water simplification Te = energy_period(S, to_pandas=False) Hm0 = significant_wave_height(S, to_pandas=False) coeff = rho * (g**2) / (64 * np.pi) J = coeff * (Hm0**2) * Te else: # deep water flag is false k = wave_number(f, h, rho, g, to_pandas=False) # wave celerity (group velocity) Cg = wave_celerity(k, h, g, depth_check=True, ratio=ratio, to_pandas=False) # Calculating the wave energy flux, Eq 9 in IEC 62600-101 delta_f = f.diff(dim=frequency_dimension) delta_f0 = f[1] - f[0] delta_f0 = delta_f0.assign_coords({frequency_dimension: f[0]}) delta_f = xr.concat([delta_f0, delta_f], dim=frequency_dimension) CgSdelF = S * delta_f * Cg J = rho * g * CgSdelF.sum(dim=frequency_dimension) if to_pandas: J = J.to_pandas() if isinstance(J, (pd.Series, pd.DataFrame, xr.DataArray)): J.name = "J" return J
[docs] def energy_period_to_peak_period(Te, gamma): """ Convert from spectral energy period (Te) to peak period (Tp) using ITTC approximation for JONSWAP Spectrum. Approximation is given in "The Specialist Committee on Waves, Final Report and Recommendations to the 23rd ITTC", Proceedings of the 23rd ITTC - Volume 2, Table A4. Parameters ---------- Te: int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset gamma: float or int Peak enhancement factor for JONSWAP spectrum Returns ------- Tp: float or array Spectral peak period [s] """ if not isinstance( Te, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset) ): raise TypeError( f"Te must be an int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray or xr.Dataset. Got: {type(Te)}" ) if not isinstance(gamma, (float, int)): raise TypeError(f"gamma must be of type float or int. Got: {type(gamma)}") factor = 0.8255 + 0.03852 * gamma - 0.005537 * gamma**2 + 0.0003154 * gamma**3 Tp = Te / factor if isinstance(Tp, (pd.Series, pd.DataFrame, xr.DataArray)): Tp.name = "Tp" return Tp
[docs] def wave_celerity( k, h, g=9.80665, depth_check=False, ratio=2, frequency_dimension="", to_pandas=True ): """ Calculates wave celerity (group velocity) Parameters ---------- k: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Wave number [1/m] indexed by frequency [Hz] h: float Water depth [m] g : float (optional) Gravitational acceleration [m/s^2]. Default 9.80665 m/s. depth_check: bool (optional) If True check depth regime. Default False. ratio: float or int (optional) Only applied if depth_check=True. If h/l > ratio, water depth will be set to deep. Default ratio = 2 frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- Cg: pandas DataFrame or xarray Dataset Water celerity [m/s] indexed by frequency [Hz] """ k = convert_to_dataarray(k) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") if not isinstance(g, (int, float)): raise TypeError(f"g must be of type int or float. Got: {type(g)}") if not isinstance(depth_check, bool): raise TypeError(f"depth_check must be of type bool. Got: {type(depth_check)}") if not isinstance(ratio, (int, float)): raise TypeError(f"ratio must be of type int or float. Got: {type(ratio)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(k.coords)[0] elif frequency_dimension not in list(k.dims): raise ValueError( f"frequency_dimension is not a dimension of k ({list(k.dims)}). Got: {frequency_dimension}." ) f = k[frequency_dimension] k = k.values if depth_check: l = wave_length(k) # get depth regime dr = depth_regime(l, h, ratio=ratio) # deep frequencies df = f[dr] dk = k[dr] # deep water approximation dCg = np.pi * df / dk dCg = xr.DataArray( data=dCg, dims=frequency_dimension, coords={frequency_dimension: df} ) dCg.name = "Cg" # shallow frequencies sf = f[~dr] sk = k[~dr] sCg = (np.pi * sf / sk) * (1 + (2 * h * sk) / np.sinh(2 * h * sk)) sCg = xr.DataArray( data=sCg, dims=frequency_dimension, coords={frequency_dimension: sf} ) sCg.name = "Cg" Cg = xr.concat([dCg, sCg], dim=frequency_dimension).sortby(frequency_dimension) Cg.name = "Cg" else: # Eq 10 in IEC 62600-101 Cg = (np.pi * f / k) * (1 + (2 * h * k) / np.sinh(2 * h * k)) Cg = xr.DataArray( data=Cg, dims=frequency_dimension, coords={frequency_dimension: f} ) Cg.name = "Cg" if to_pandas: Cg = Cg.to_pandas() if isinstance(Cg, (pd.Series, pd.DataFrame, xr.DataArray)): Cg.name = "Cg" return Cg
[docs] def wave_length(k): """ Calculates wave length from wave number To compute: 2*pi/wavenumber Parameters ------------- k: int, float, numpy ndarray, pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset Wave number [1/m] indexed by frequency Returns --------- l: int, float, numpy ndarray, pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset Wave length [m] indexed by frequency. Output type is identical to the type of k. """ if not isinstance( k, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset) ): raise TypeError( f"k must be an int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray or xr.Dataset. Got: {type(k)}" ) l = 2 * np.pi / k if isinstance(l, (pd.Series, pd.DataFrame, xr.DataArray)): l.name = "l" return l
[docs] def wave_number(f, h, rho=1025, g=9.80665, to_pandas=True): """ Calculates wave number To compute wave number from angular frequency (w), convert w to f before using this function (f = w/2*pi) Parameters ----------- f: int, float, numpy ndarray, pandas DataFrame, pandas Series, xarray DataArray Frequency [Hz] h: float Water depth [m] rho: float (optional) Water density [kg/m^3] g: float (optional) Gravitational acceleration [m/s^2] to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- k: pandas DataFrame or xarray Dataset Wave number [1/m] indexed by frequency [Hz] """ if isinstance(f, (int, float)): f = np.asarray([f]) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") if not isinstance(rho, (int, float)): raise TypeError(f"rho must be of type int or float. Got: {type(rho)}") if not isinstance(g, (int, float)): raise TypeError(f"g must be of type int or float. Got: {type(g)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") w = 2 * np.pi * f # angular frequency xi = w / np.sqrt(g / h) # note: =h*wa/sqrt(h*g/h) yi = xi * xi / np.power(1.0 - np.exp(-np.power(xi, 2.4908)), 0.4015) k0 = yi / h # Initial guess without current-wave interaction # Eq 11 in IEC 62600-101 using initial guess from Guo (2002) def func(kk): val = np.power(w, 2) - g * kk * np.tanh(kk * h) return val mask = np.abs(func(k0)) > 1e-9 if mask.sum() > 0: k0_mask = k0[mask] w = w[mask] k, info, ier, mesg = _fsolve(func, k0_mask, full_output=True) if not ier == 1: raise ValueError("Wave number not found. " + mesg) k0[mask] = k k = k0 if isinstance(k, (pd.Series, pd.DataFrame, xr.DataArray)): k.name = "k" return k
[docs] def depth_regime(l, h, ratio=2): """ Calculates the depth regime based on wavelength and height Deep water: h/l > ratio This function exists so sinh in wave celerity doesn't blow up to infinity. P.K. Kundu, I.M. Cohen (2000) suggest h/l >> 1 for deep water (pg 209) Same citation as above, they also suggest for 3% accuracy, h/l > 0.28 (pg 210) However, since this function allows multiple wavelengths, higher ratio numbers are more accurate across varying wavelengths. Parameters ---------- l: int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset wavelength [m] h: float or int water column depth [m] ratio: float or int (optional) if h/l > ratio, water depth will be set to deep. Default ratio = 2 Returns ------- depth_reg: boolean or boolean array-like Boolean True if deep water, False otherwise """ if not isinstance( l, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset) ): raise TypeError( f"l must be of type int, float, np.ndarray, pd.DataFrame, pd.Series, xr.DataArray, or xr.Dataset. Got: {type(l)}" ) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") depth_reg = h / l > ratio if isinstance(depth_reg, (pd.Series, pd.DataFrame, xr.DataArray)): depth_reg.name = "depth_reg" return depth_reg