Source code for mhkit.wave.resource

from scipy.optimize import fsolve as _fsolve
from scipy import signal as _signal
import pandas as pd
import xarray as xr
import numpy as np
from mhkit.utils import to_numeric_array, convert_to_dataarray, convert_to_dataset


### Spectrum
[docs] def elevation_spectrum( eta, sample_rate, nnft, window="hann", detrend=True, noverlap=None, time_dimension="", to_pandas=True, ): """ Calculates the wave energy spectrum from wave elevation time-series Parameters ------------ eta: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Wave surface elevation [m] indexed by time [datetime or s] sample_rate: float Data frequency [Hz] nnft: integer Number of bins in the Fast Fourier Transform window: string (optional) Signal window type. 'hann' is used by default given the broadband nature of waves. See scipy.signal.get_window for more options. detrend: bool (optional) Specifies if a linear trend is removed from the data before calculating the wave energy spectrum. Data is detrended by default. noverlap: int, optional Number of points to overlap between segments. If None, ``noverlap = nperseg / 2``. Defaults to None. time_dimension: string (optional) Name of the xarray dimension corresponding to time. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- S: pandas DataFrame or xr.Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] """ # TODO: Add confidence intervals, equal energy frequency spacing, and NDBC # frequency spacing # TODO: may need to raise an error for the length of nnft- signal.welch breaks when nfft is too short eta = convert_to_dataset(eta) if not isinstance(sample_rate, (float, int)): raise TypeError( f"sample_rate must be of type int or float. Got: {type(sample_rate)}" ) if not isinstance(nnft, int): raise TypeError(f"nnft must be of type int. Got: {type(nnft)}") if not isinstance(window, str): raise TypeError(f"window must be of type str. Got: {type(window)}") if not isinstance(detrend, bool): raise TypeError(f"detrend must be of type bool. Got: {type(detrend)}") if not nnft > 0: raise ValueError(f"nnft must be > 0. Got: {nnft}") if not sample_rate > 0: raise ValueError(f"sample_rate must be > 0. Got: {sample_rate}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if time_dimension == "": time_dimension = list(eta.dims)[0] else: if time_dimension not in list(eta.dims): raise ValueError( f"time_dimension is not a dimension of eta ({list(eta.dims)}). Got: {time_dimension}." ) time = eta[time_dimension] delta_t = time.values[1] - time.values[0] if not np.allclose(time.diff(dim=time_dimension)[1:], delta_t): raise ValueError( "Time bins are not evenly spaced. Create a constant " + "temporal spacing for eta." ) S = xr.Dataset() for var in eta.data_vars: data = eta[var] if detrend: data = _signal.detrend( data.dropna(dim=time_dimension), axis=-1, type="linear", bp=0 ) [f, wave_spec_measured] = _signal.welch( data, fs=sample_rate, window=window, nperseg=nnft, nfft=nnft, noverlap=noverlap, ) S[var] = (["Frequency"], wave_spec_measured) S = S.assign_coords({"Frequency": f}) if to_pandas: S = S.to_dataframe() return S
[docs] def pierson_moskowitz_spectrum(f, Tp, Hs, to_pandas=True): """ Calculates Pierson-Moskowitz Spectrum from IEC TS 62600-2 ED2 Annex C.2 (2019) Parameters ------------ f: list, np.ndarray, pd.Series, xr.DataArray Frequency [Hz] Tp: float/int Peak period [s] Hs: float/int Significant wave height [m] to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- S: xarray Dataset Spectral density [m^2/Hz] indexed frequency [Hz] """ f = to_numeric_array(f, "f") if not isinstance(Tp, (int, float)): raise TypeError(f"Tp must be of type int or float. Got: {type(Tp)}") if not isinstance(Hs, (int, float)): raise TypeError(f"Hs must be of type int or float. Got: {type(Hs)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") f.sort() B_PM = (5 / 4) * (1 / Tp) ** 4 A_PM = B_PM * (Hs / 2) ** 2 # Avoid a divide by zero if the 0 frequency is provided # The zero frequency should always have 0 amplitude, otherwise # we end up with a mean offset when computing the surface elevation. Sf = np.zeros(f.size) if f[0] == 0.0: inds = range(1, f.size) else: inds = range(0, f.size) Sf[inds] = A_PM * f[inds] ** (-5) * np.exp(-B_PM * f[inds] ** (-4)) name = "Pierson-Moskowitz (" + str(Tp) + "s)" S = xr.Dataset(data_vars={name: (["Frequency"], Sf)}, coords={"Frequency": f}) if to_pandas: S = S.to_pandas() return S
[docs] def jonswap_spectrum(f, Tp, Hs, gamma=None, to_pandas=True): """ Calculates JONSWAP Spectrum from IEC TS 62600-2 ED2 Annex C.2 (2019) Parameters ------------ f: list, np.ndarray, pd.Series, xr.DataArray Frequency [Hz] Tp: float/int Peak period [s] Hs: float/int Significant wave height [m] gamma: float (optional) Gamma to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- S: pandas Series or xarray DataArray Spectral density [m^2/Hz] indexed frequency [Hz] """ f = to_numeric_array(f, "f") if not isinstance(Tp, (int, float)): raise TypeError(f"Tp must be of type int or float. Got: {type(Tp)}") if not isinstance(Hs, (int, float)): raise TypeError(f"Hs must be of type int or float. Got: {type(Hs)}") if not isinstance(gamma, (int, float, type(None))): raise TypeError( f"If specified, gamma must be of type int or float. Got: {type(gamma)}" ) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") f.sort() B_PM = (5 / 4) * (1 / Tp) ** 4 A_PM = B_PM * (Hs / 2) ** 2 # Avoid a divide by zero if the 0 frequency is provided # The zero frequency should always have 0 amplitude, otherwise # we end up with a mean offset when computing the surface elevation. S_f = np.zeros(f.size) if f[0] == 0.0: inds = range(1, f.size) else: inds = range(0, f.size) S_f[inds] = A_PM * f[inds] ** (-5) * np.exp(-B_PM * f[inds] ** (-4)) if not gamma: TpsqrtHs = Tp / np.sqrt(Hs) if TpsqrtHs <= 3.6: gamma = 5 elif TpsqrtHs > 5: gamma = 1 else: gamma = np.exp(5.75 - 1.15 * TpsqrtHs) # Cutoff frequencies for gamma function siga = 0.07 sigb = 0.09 fp = 1 / Tp # peak frequency lind = np.where(f <= fp) hind = np.where(f > fp) Gf = np.zeros(f.shape) Gf[lind] = gamma ** np.exp(-((f[lind] - fp) ** 2) / (2 * siga**2 * fp**2)) Gf[hind] = gamma ** np.exp(-((f[hind] - fp) ** 2) / (2 * sigb**2 * fp**2)) C = 1 - 0.287 * np.log(gamma) Sf = C * S_f * Gf name = "JONSWAP (" + str(Hs) + "m," + str(Tp) + "s)" S = xr.Dataset(data_vars={name: (["Frequency"], Sf)}, coords={"Frequency": f}) if to_pandas: S = S.to_pandas() return S
### Metrics
[docs] def surface_elevation( S, time_index, seed=None, frequency_bins=None, phases=None, method="ifft", frequency_dimension="", to_pandas=True, ): """ Calculates wave elevation time-series from spectrum Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] time_index: numpy array Time used to create the wave elevation time-series [s], for example, time = np.arange(0,100,0.01) seed: int (optional) Random seed frequency_bins: numpy array, pandas Series, or xarray DataArray (optional) Bin widths for frequency of S. Required for unevenly sized bins phases: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Explicit phases for frequency components (overrides seed) for example, phases = np.random.rand(len(S)) * 2 * np.pi method: str (optional) Method used to calculate the surface elevation. 'ifft' (Inverse Fast Fourier Transform) used by default if the given frequency_bins==None. 'sum_of_sines' explicitly sums each frequency component and used by default if frequency_bins are provided. The 'ifft' method is significantly faster. frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- eta: pandas DataFrame or xarray Dataset Wave surface elevation [m] indexed by time [s] """ time_index = to_numeric_array(time_index, "time_index") S = convert_to_dataset(S) if not isinstance(seed, (type(None), int)): raise TypeError(f"If specified, seed must be of type int. Got: {type(seed)}") if not isinstance(phases, type(None)): phases = convert_to_dataset(phases) if not isinstance(method, str): raise TypeError(f"method must be of type str. Got: {type(method)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) f = S[frequency_dimension] if not isinstance(frequency_bins, (type(None), np.ndarray)): frequency_bins = convert_to_dataarray(frequency_bins) elif isinstance(frequency_bins, np.ndarray): frequency_bins = xr.DataArray( data=frequency_bins, dims=frequency_dimension, coords={frequency_dimension: f}, ) if frequency_bins is not None: if not frequency_bins.squeeze().shape == f.shape: raise ValueError( "shape of frequency_bins must match shape of the frequency dimension of S" ) if phases is not None: if not list(phases.data_vars) == list(S.data_vars): raise ValueError("phases must have the same variable names as S") for var in phases.data_vars: if not phases[var].shape == S[var].shape: raise ValueError( "shape of variables in phases must match shape of variables in S" ) if method is not None: if not (method == "ifft" or method == "sum_of_sines"): raise ValueError(f"Method must be 'ifft' or 'sum_of_sines'. Got: {method}") if method == "ifft": if not f[0] == 0: raise ValueError( f"ifft method must have zero frequency defined. Lowest frequency is: {S.index.values[0]}" ) if frequency_bins is None: delta_f = f.values[1] - f.values[0] if not np.allclose(f.diff(dim=frequency_dimension)[1:], delta_f): raise ValueError( "Frequency bins are not evenly spaced. " + "Define 'frequency_bins' or create a constant " + "frequency spacing for S." ) else: if not len(frequency_bins.squeeze().shape) == 1: raise ValueError("frequency_bins must only contain 1 column") delta_f = frequency_bins method = "sum_of_sines" omega = xr.DataArray( data=2 * np.pi * f, dims=frequency_dimension, coords={frequency_dimension: f} ) eta = xr.Dataset() for var in S.data_vars: if phases is None: np.random.seed(seed) phase = xr.DataArray( data=2 * np.pi * np.random.rand(S[var].size), dims="Frequency", coords={"Frequency": f}, ) else: phase = phases[var] # Wave amplitude times delta f A = 2 * S[var] A = A * delta_f A = np.sqrt(A) if method == "ifft": A_cmplx = A * (np.cos(phase) + 1j * np.sin(phase)) eta_tmp = np.fft.irfft( 0.5 * A_cmplx.values * time_index.size, time_index.size ) eta[var] = xr.DataArray( data=eta_tmp, dims="Time", coords={"Time": time_index} ) elif method == "sum_of_sines": # Product of omega and time B = np.outer(time_index, omega) B = B.reshape((len(time_index), len(omega))) B = xr.DataArray( data=B, dims=["Time", "Frequency"], coords={"Time": time_index, "Frequency": f}, ) # wave elevation # eta = xr.DataArray(columns=S.columns, index=time_index) # for mcol in eta.columns: C = np.cos(B + phase) # C = xr.DataArray(data=C, index=time_index, columns=omega.index) eta[var] = (C * A).sum(axis=1) if to_pandas: eta = eta.to_dataframe() return eta
[docs] def frequency_moment(S, N, frequency_bins=None, frequency_dimension="", to_pandas=True): """ Calculates the Nth frequency moment of the spectrum Parameters ----------- S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] N: int Moment (0 for 0th, 1 for 1st ....) frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- m: pandas DataFrame or xarray Dataset Nth Frequency Moment indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(N, int): raise TypeError(f"N must be of type int. Got: {type(N)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) f = S[frequency_dimension] # Eq 8 in IEC 62600-101 S = S.sel({frequency_dimension: slice(1e-12, f.max())}) # omit frequency of 0 f = S[frequency_dimension] # reset frequency_dimension without the 0 frequency fn = np.power(f, N) if frequency_bins is None: delta_f = f.diff(dim=frequency_dimension) delta_f0 = f[1] - f[0] delta_f0 = delta_f0.assign_coords({frequency_dimension: f[0]}) delta_f = xr.concat([delta_f0, delta_f], dim=frequency_dimension) else: delta_f = xr.DataArray( data=convert_to_dataarray(frequency_bins), dims=frequency_dimension, coords={frequency_dimension: f}, ) m = S * fn * delta_f m = m.sum(dim=frequency_dimension) m = _transform_dataset(m, "m" + str(N)) if to_pandas: m = m.to_dataframe() return m
[docs] def significant_wave_height(S, frequency_bins=None, to_pandas=True): """ Calculates wave height from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Hm0: pandas DataFrame or xarray Dataset Significant wave height [m] index by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") # Eq 12 in IEC 62600-101 m0 = frequency_moment(S, 0, frequency_bins=frequency_bins, to_pandas=False).rename( {"m0": "Hm0"} ) Hm0 = 4 * np.sqrt(m0) if to_pandas: Hm0 = Hm0.to_dataframe() return Hm0
[docs] def average_zero_crossing_period(S, frequency_bins=None, to_pandas=True): """ Calculates wave average zero crossing period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tz: pandas DataFrame or xarray Dataset Average zero crossing period [s] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") # Eq 15 in IEC 62600-101 m0 = frequency_moment(S, 0, frequency_bins=frequency_bins, to_pandas=False).rename( {"m0": "Tz"} ) m2 = frequency_moment(S, 2, frequency_bins=frequency_bins, to_pandas=False).rename( {"m2": "Tz"} ) Tz = np.sqrt(m0 / m2) if to_pandas: Tz = Tz.to_dataframe() return Tz
[docs] def average_crest_period(S, frequency_bins=None, to_pandas=True): """ Calculates wave average crest period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tavg: pandas DataFrame or xarray Dataset Average wave period [s] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") m2 = frequency_moment(S, 2, frequency_bins=frequency_bins, to_pandas=False).rename( {"m2": "Tavg"} ) m4 = frequency_moment(S, 4, frequency_bins=frequency_bins, to_pandas=False).rename( {"m4": "Tavg"} ) Tavg = np.sqrt(m2 / m4) if to_pandas: Tavg = Tavg.to_dataframe() return Tavg
[docs] def average_wave_period(S, frequency_bins=None, to_pandas=True): """ Calculates mean wave period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tm: pandas DataFrame or xarray Dataset Mean wave period [s] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") m0 = frequency_moment(S, 0, frequency_bins=frequency_bins, to_pandas=False).rename( {"m0": "Tm"} ) m1 = frequency_moment(S, 1, frequency_bins=frequency_bins, to_pandas=False).rename( {"m1": "Tm"} ) Tm = np.sqrt(m0 / m1) if to_pandas: Tm = Tm.to_dataframe() return Tm
[docs] def peak_period(S, frequency_dimension="", to_pandas=True): """ Calculates wave peak period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Tp: pandas DataFrame or xarray Dataset Wave peak period [s] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) # Eq 14 in IEC 62600-101 fp = S.idxmax(dim=frequency_dimension) # Hz Tp = 1 / fp Tp = _transform_dataset(Tp, "Tp") if to_pandas: Tp = Tp.to_dataframe() return Tp
[docs] def energy_period(S, frequency_bins=None, to_pandas=True): """ Calculates wave energy period from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- Te: pandas DataFrame or xarray Dataset Wave energy period [s] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") mn1 = frequency_moment( S, -1, frequency_bins=frequency_bins, to_pandas=False ).rename({"m-1": "Te"}) m0 = frequency_moment(S, 0, frequency_bins=frequency_bins, to_pandas=False).rename( {"m0": "Te"} ) # Eq 13 in IEC 62600-101 Te = mn1 / m0 if to_pandas: Te = Te.to_dataframe() return Te
[docs] def spectral_bandwidth(S, frequency_bins=None, to_pandas=True): """ Calculates bandwidth from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- e: pandas DataFrame or xarray Dataset Spectral bandwidth [s] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") m2 = frequency_moment(S, 2, frequency_bins=frequency_bins, to_pandas=False).rename( {"m2": "e"} ) m0 = frequency_moment(S, 0, frequency_bins=frequency_bins, to_pandas=False).rename( {"m0": "e"} ) m4 = frequency_moment(S, 4, frequency_bins=frequency_bins, to_pandas=False).rename( {"m4": "e"} ) e = np.sqrt(1 - (m2**2) / (m0 / m4)) if to_pandas: e = e.to_dataframe() return e
[docs] def spectral_width(S, frequency_bins=None, to_pandas=True): """ Calculates wave spectral width from spectra Parameters ------------ S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] frequency_bins: numpy array or pandas Series (optional) Bin widths for frequency of S. Required for unevenly sized bins to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns --------- v: pandas DataFrame or xarray Dataset Spectral width [m] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") mn2 = frequency_moment( S, -2, frequency_bins=frequency_bins, to_pandas=False ).rename({"m-2": "v"}) m0 = frequency_moment(S, 0, frequency_bins=frequency_bins, to_pandas=False).rename( {"m0": "v"} ) mn1 = frequency_moment( S, -1, frequency_bins=frequency_bins, to_pandas=False ).rename({"m-1": "v"}) # Eq 16 in IEC 62600-101 v = np.sqrt((m0 * mn2 / np.power(mn1, 2)) - 1) if to_pandas: v = v.to_dataframe() return v
[docs] def energy_flux( S, h, deep=False, rho=1025, g=9.80665, ratio=2, frequency_dimension="", to_pandas=True, ): """ Calculates the omnidirectional wave energy flux of the spectra Parameters ----------- S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Spectral density [m^2/Hz] indexed by frequency [Hz] h: float Water depth [m] deep: bool (optional) If True use the deep water approximation. Default False. When False a depth check is run to check for shallow water. The ratio of the shallow water regime can be changed using the ratio keyword. rho: float (optional) Water Density [kg/m^3]. Default = 1025 kg/m^3 g : float (optional) Gravitational acceleration [m/s^2]. Default = 9.80665 m/s^2 ratio: float or int (optional) Only applied if depth=False. If h/l > ratio, water depth will be set to deep. Default ratio = 2. frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- J: pandas DataFrame or xarray Dataset Omni-directional wave energy flux [W/m] indexed by S.columns """ S = convert_to_dataset(S) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") if not isinstance(deep, bool): raise TypeError(f"deep must be of type bool. Got: {type(deep)}") if not isinstance(rho, (int, float)): raise TypeError(f"rho must be of type int or float. Got: {type(rho)}") if not isinstance(g, (int, float)): raise TypeError(f"g must be of type int or float. Got: {type(g)}") if not isinstance(ratio, (int, float)): raise TypeError(f"ratio must be of type int or float. Got: {type(ratio)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(S.coords)[0] elif frequency_dimension not in list(S.dims): raise ValueError( f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}." ) f = S[frequency_dimension] if deep: # Eq 8 in IEC 62600-100, deep water simplification Te = energy_period(S, to_pandas=False).rename({"Te": "J"}) Hm0 = significant_wave_height(S, to_pandas=False).rename({"Hm0": "J"}) coeff = rho * (g**2) / (64 * np.pi) J = coeff * (Hm0**2) * Te else: # deep water flag is false k = wave_number(f, h, rho, g, to_pandas=False) # wave celerity (group velocity) Cg = wave_celerity(k, h, g, depth_check=True, ratio=ratio, to_pandas=False)[ "Cg" ] # Calculating the wave energy flux, Eq 9 in IEC 62600-101 delta_f = f.diff(dim=frequency_dimension) delta_f0 = f[1] - f[0] delta_f0 = delta_f0.assign_coords({frequency_dimension: f[0]}) delta_f = xr.concat([delta_f0, delta_f], dim=frequency_dimension) CgSdelF = S * delta_f * Cg J = rho * g * CgSdelF.sum(dim=frequency_dimension) J = _transform_dataset(J, "J") if to_pandas: J = J.to_dataframe() return J
[docs] def energy_period_to_peak_period(Te, gamma): """ Convert from spectral energy period (Te) to peak period (Tp) using ITTC approximation for JONSWAP Spectrum. Approximation is given in "The Specialist Committee on Waves, Final Report and Recommendations to the 23rd ITTC", Proceedings of the 23rd ITTC - Volume 2, Table A4. Parameters ---------- Te: int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset gamma: float or int Peak enhancement factor for JONSWAP spectrum Returns ------- Tp: float or array Spectral peak period [s] """ if not isinstance( Te, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset) ): raise TypeError( f"Te must be an int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray or xr.Dataset. Got: {type(Te)}" ) if not isinstance(gamma, (float, int)): raise TypeError(f"gamma must be of type float or int. Got: {type(gamma)}") factor = 0.8255 + 0.03852 * gamma - 0.005537 * gamma**2 + 0.0003154 * gamma**3 Tp = Te / factor if isinstance(Tp, xr.Dataset): Tp.rename({"Te": "Tp"}) return Tp
[docs] def wave_celerity( k, h, g=9.80665, depth_check=False, ratio=2, frequency_dimension="", to_pandas=True ): """ Calculates wave celerity (group velocity) Parameters ---------- k: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Wave number [1/m] indexed by frequency [Hz] h: float Water depth [m] g : float (optional) Gravitational acceleration [m/s^2]. Default 9.80665 m/s. depth_check: bool (optional) If True check depth regime. Default False. ratio: float or int (optional) Only applied if depth_check=True. If h/l > ratio, water depth will be set to deep. Default ratio = 2 frequency_dimension: string (optional) Name of the xarray dimension corresponding to frequency. If not supplied, defaults to the first dimension. Does not affect pandas input. to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- Cg: pandas DataFrame or xarray Dataset Water celerity [m/s] indexed by frequency [Hz] """ k = convert_to_dataarray(k) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") if not isinstance(g, (int, float)): raise TypeError(f"g must be of type int or float. Got: {type(g)}") if not isinstance(depth_check, bool): raise TypeError(f"depth_check must be of type bool. Got: {type(depth_check)}") if not isinstance(ratio, (int, float)): raise TypeError(f"ratio must be of type int or float. Got: {type(ratio)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if frequency_dimension == "": frequency_dimension = list(k.coords)[0] elif frequency_dimension not in list(k.dims): raise ValueError( f"frequency_dimension is not a dimension of k ({list(k.dims)}). Got: {frequency_dimension}." ) f = k[frequency_dimension] k = k.values if depth_check: l = wave_length(k) # get depth regime dr = depth_regime(l, h, ratio=ratio) # deep frequencies df = f[dr] dk = k[dr] # deep water approximation dCg = np.pi * df / dk dCg = xr.DataArray( data=dCg, dims=frequency_dimension, coords={frequency_dimension: df} ) dCg.name = "Cg" # shallow frequencies sf = f[~dr] sk = k[~dr] sCg = (np.pi * sf / sk) * (1 + (2 * h * sk) / np.sinh(2 * h * sk)) sCg = xr.DataArray( data=sCg, dims=frequency_dimension, coords={frequency_dimension: sf} ) sCg.name = "Cg" Cg = xr.concat([dCg, sCg], dim=frequency_dimension).sortby(frequency_dimension) Cg.name = "Cg" else: # Eq 10 in IEC 62600-101 Cg = (np.pi * f / k) * (1 + (2 * h * k) / np.sinh(2 * h * k)) Cg = xr.DataArray( data=Cg, dims=frequency_dimension, coords={frequency_dimension: f} ) Cg.name = "Cg" Cg = Cg.to_dataset() if to_pandas: Cg = Cg.to_dataframe() return Cg
[docs] def wave_length(k): """ Calculates wave length from wave number To compute: 2*pi/wavenumber Parameters ------------- k: int, float, numpy ndarray, pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset Wave number [1/m] indexed by frequency Returns --------- l: int, float, numpy ndarray, pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset Wave length [m] indexed by frequency. Output type is identical to the type of k. """ if not isinstance( k, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset) ): raise TypeError( f"k must be an int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray or xr.Dataset. Got: {type(k)}" ) l = 2 * np.pi / k return l
[docs] def wave_number(f, h, rho=1025, g=9.80665, to_pandas=True): """ Calculates wave number To compute wave number from angular frequency (w), convert w to f before using this function (f = w/2*pi) Parameters ----------- f: int, float, numpy ndarray, pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset Frequency [Hz] h: float Water depth [m] rho: float (optional) Water density [kg/m^3] g: float (optional) Gravitational acceleration [m/s^2] to_pandas: bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- k: pandas DataFrame or xarray Dataset Wave number [1/m] indexed by frequency [Hz] """ if isinstance(f, (int, float)): f = np.asarray([f]) f = convert_to_dataarray(f) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") if not isinstance(rho, (int, float)): raise TypeError(f"rho must be of type int or float. Got: {type(rho)}") if not isinstance(g, (int, float)): raise TypeError(f"g must be of type int or float. Got: {type(g)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") w = 2 * np.pi * f # angular frequency xi = w / np.sqrt(g / h) # note: =h*wa/sqrt(h*g/h) yi = xi * xi / np.power(1.0 - np.exp(-np.power(xi, 2.4908)), 0.4015) k0 = yi / h # Initial guess without current-wave interaction # Eq 11 in IEC 62600-101 using initial guess from Guo (2002) def func(kk): val = np.power(w, 2) - g * kk * np.tanh(kk * h) return val mask = np.abs(func(k0)) > 1e-9 if mask.sum() > 0: k0_mask = k0[mask] w = w[mask] k, info, ier, mesg = _fsolve(func, k0_mask, full_output=True) if not ier == 1: raise ValueError("Wave number not found. " + mesg) k0[mask] = k k0.name = "k" k = k0.to_dataset() if to_pandas: k = k.to_dataframe() return k
[docs] def depth_regime(l, h, ratio=2): """ Calculates the depth regime based on wavelength and height Deep water: h/l > ratio This function exists so sinh in wave celerity doesn't blow up to infinity. P.K. Kundu, I.M. Cohen (2000) suggest h/l >> 1 for deep water (pg 209) Same citation as above, they also suggest for 3% accuracy, h/l > 0.28 (pg 210) However, since this function allows multiple wavelengths, higher ratio numbers are more accurate across varying wavelengths. Parameters ---------- l: int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset wavelength [m] h: float or int water column depth [m] ratio: float or int (optional) if h/l > ratio, water depth will be set to deep. Default ratio = 2 Returns ------- depth_reg: boolean or boolean array-like Boolean True if deep water, False otherwise """ if not isinstance( l, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset) ): raise TypeError( f"l must be of type int, float, np.ndarray, pd.DataFrame, pd.Series, xr.DataArray, or xr.Dataset. Got: {type(l)}" ) if not isinstance(h, (int, float)): raise TypeError(f"h must be of type int or float. Got: {type(h)}") depth_reg = h / l > ratio return depth_reg
def _transform_dataset(data, name): # Converting data from a Dataset into a DataArray will turn the variables # columns into a 'variable' dimension. # Converting it back to a dataset will keep this concise variable dimension # but in the expected xr.Dataset/pd.DataFrame format data = data.to_array() data = convert_to_dataset(data, name=name) data = data.rename({"variable": "index"}) return data