import warnings
from scipy.optimize import fsolve as _fsolve
from scipy import signal as _signal
import pandas as pd
import xarray as xr
import numpy as np
from mhkit.utils import to_numeric_array, convert_to_dataarray, convert_to_dataset
### Spectrum
[docs]
def elevation_spectrum(
    eta,
    sample_rate,
    nnft,
    window="hann",
    detrend=True,
    noverlap=None,
    time_dimension="",
    to_pandas=True,
):
    """
    Calculates the wave energy spectrum from wave elevation time-series
    Parameters
    ------------
    eta: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Wave surface elevation [m] indexed by time [datetime or s]
    sample_rate: float
        Data frequency [Hz]
    nnft: integer
        Number of bins in the Fast Fourier Transform
    window: string (optional)
        Signal window type. 'hann' is used by default given the broadband
        nature of waves. See scipy.signal.get_window for more options.
    detrend: bool (optional)
        Specifies if a linear trend is removed from the data before
        calculating the wave energy spectrum.  Data is detrended by default.
    noverlap: int, optional
        Number of points to overlap between segments. If None,
        ``noverlap = nperseg / 2``.  Defaults to None.
    time_dimension: string (optional)
        Name of the xarray dimension corresponding to time. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz].
    """
    # TODO: Add confidence intervals, equal energy frequency spacing, and NDBC
    #       frequency spacing
    # TODO: may need to raise an error for the length of nnft- signal.welch breaks when nfft is too short
    eta = convert_to_dataset(eta, "eta")
    if not isinstance(sample_rate, (float, int)):
        raise TypeError(
            f"sample_rate must be of type int or float. Got: {type(sample_rate)}"
        )
    if not isinstance(nnft, int):
        raise TypeError(f"nnft must be of type int. Got: {type(nnft)}")
    if not isinstance(window, str):
        raise TypeError(f"window must be of type str. Got: {type(window)}")
    if not isinstance(detrend, bool):
        raise TypeError(f"detrend must be of type bool. Got: {type(detrend)}")
    if not nnft > 0:
        raise ValueError(f"nnft must be > 0. Got: {nnft}")
    if not sample_rate > 0:
        raise ValueError(f"sample_rate must be > 0. Got: {sample_rate}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if time_dimension == "":
        time_dimension = list(eta.dims)[0]
    else:
        if time_dimension not in list(eta.dims):
            raise ValueError(
                f"time_dimension is not a dimension of eta ({list(eta.dims)}). Got: {time_dimension}."
            )
    time = eta[time_dimension]
    delta_t = time.values[1] - time.values[0]
    if not np.allclose(time.diff(dim=time_dimension)[1:], delta_t):
        raise ValueError(
            "Time bins are not evenly spaced. Create a constant "
            + f"temporal spacing for eta."
        )
    S = xr.Dataset()
    for var in eta.data_vars:
        eta_subset = eta[var]
        if detrend:
            eta_subset = _signal.detrend(
                eta_subset.dropna(dim=time_dimension), axis=-1, type="linear", bp=0
            )
        [f, wave_spec_measured] = _signal.welch(
            eta_subset,
            fs=sample_rate,
            window=window,
            nperseg=nnft,
            nfft=nnft,
            noverlap=noverlap,
        )
        S[var] = (["Frequency"], wave_spec_measured)
    S = S.assign_coords({"Frequency": f})
    if to_pandas:
        S = S.to_pandas()
    return S 
[docs]
def pierson_moskowitz_spectrum(f, Tp, Hs, to_pandas=True):
    """
    Calculates Pierson-Moskowitz Spectrum from IEC TS 62600-2 ED2 Annex C.2 (2019)
    Parameters
    ------------
    f: list, np.ndarray, pd.Series, xr.DataArray
        Frequency [Hz]
    Tp: float/int
        Peak period [s]
    Hs: float/int
        Significant wave height [m]
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    S: xarray Dataset
        Spectral density [m^2/Hz] indexed frequency [Hz]
    """
    f = to_numeric_array(f, "f")
    if not isinstance(Tp, (int, float)):
        raise TypeError(f"Tp must be of type int or float. Got: {type(Tp)}")
    if not isinstance(Hs, (int, float)):
        raise TypeError(f"Hs must be of type int or float. Got: {type(Hs)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    f.sort()
    B_PM = (5 / 4) * (1 / Tp) ** 4
    A_PM = B_PM * (Hs / 2) ** 2
    # Avoid a divide by zero if the 0 frequency is provided
    # The zero frequency should always have 0 amplitude, otherwise
    # we end up with a mean offset when computing the surface elevation.
    Sf = np.zeros(f.size)
    if f[0] == 0.0:
        inds = range(1, f.size)
    else:
        inds = range(0, f.size)
    Sf[inds] = A_PM * f[inds] ** (-5) * np.exp(-B_PM * f[inds] ** (-4))
    name = "Pierson-Moskowitz (" + str(Tp) + "s)"
    S = xr.Dataset(data_vars={name: (["Frequency"], Sf)}, coords={"Frequency": f})
    if to_pandas:
        S = S.to_pandas()
    return S 
[docs]
def jonswap_spectrum(f, Tp, Hs, gamma=None, to_pandas=True):
    """
    Calculates JONSWAP Spectrum from IEC TS 62600-2 ED2 Annex C.2 (2019)
    Parameters
    ------------
    f: list, np.ndarray, pd.Series, xr.DataArray
        Frequency [Hz]
    Tp: float/int
        Peak period [s]
    Hs: float/int
        Significant wave height [m]
    gamma: float (optional)
        Gamma
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    S: pandas Series or xarray DataArray
        Spectral density [m^2/Hz] indexed frequency [Hz]
    """
    f = to_numeric_array(f, "f")
    if not isinstance(Tp, (int, float)):
        raise TypeError(f"Tp must be of type int or float. Got: {type(Tp)}")
    if not isinstance(Hs, (int, float)):
        raise TypeError(f"Hs must be of type int or float. Got: {type(Hs)}")
    if not isinstance(gamma, (int, float, type(None))):
        raise TypeError(
            f"If specified, gamma must be of type int or float. Got: {type(gamma)}"
        )
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    f.sort()
    B_PM = (5 / 4) * (1 / Tp) ** 4
    A_PM = B_PM * (Hs / 2) ** 2
    # Avoid a divide by zero if the 0 frequency is provided
    # The zero frequency should always have 0 amplitude, otherwise
    # we end up with a mean offset when computing the surface elevation.
    S_f = np.zeros(f.size)
    if f[0] == 0.0:
        inds = range(1, f.size)
    else:
        inds = range(0, f.size)
    S_f[inds] = A_PM * f[inds] ** (-5) * np.exp(-B_PM * f[inds] ** (-4))
    if not gamma:
        TpsqrtHs = Tp / np.sqrt(Hs)
        if TpsqrtHs <= 3.6:
            gamma = 5
        elif TpsqrtHs > 5:
            gamma = 1
        else:
            gamma = np.exp(5.75 - 1.15 * TpsqrtHs)
    # Cutoff frequencies for gamma function
    siga = 0.07
    sigb = 0.09
    fp = 1 / Tp  # peak frequency
    lind = np.where(f <= fp)
    hind = np.where(f > fp)
    Gf = np.zeros(f.shape)
    Gf[lind] = gamma ** np.exp(-((f[lind] - fp) ** 2) / (2 * siga**2 * fp**2))
    Gf[hind] = gamma ** np.exp(-((f[hind] - fp) ** 2) / (2 * sigb**2 * fp**2))
    C = 1 - 0.287 * np.log(gamma)
    Sf = C * S_f * Gf
    name = "JONSWAP (" + str(Hs) + "m," + str(Tp) + "s)"
    S = xr.Dataset(data_vars={name: (["Frequency"], Sf)}, coords={"Frequency": f})
    if to_pandas:
        S = S.to_pandas()
    return S 
### Metrics
[docs]
def surface_elevation(
    S,
    time_index,
    seed=None,
    frequency_bins=None,
    phases=None,
    method="ifft",
    frequency_dimension="",
    to_pandas=True,
):
    """
    Calculates wave elevation time-series from spectrum
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    time_index: numpy array
        Time used to create the wave elevation time-series [s],
        for example, time = np.arange(0,100,0.01)
    seed: int (optional)
        Random seed
    frequency_bins: numpy array, pandas Series, or xarray DataArray (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    phases: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Explicit phases for frequency components (overrides seed)
        for example, phases = np.random.rand(len(S)) * 2 * np.pi
    method: str (optional)
        Method used to calculate the surface elevation. 'ifft'
        (Inverse Fast Fourier Transform) used by default if the
        given frequency_bins==None or is evenly spaced.
        'sum_of_sines' explicitly sums each frequency component
        and used by default if uneven frequency_bins are provided.
        The 'ifft' method is significantly faster.
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension (the index for pandas input).
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    eta: pandas DataFrame or xarray Dataset
        Wave surface elevation [m] indexed by time [s].
    """
    S = convert_to_dataset(S, "S")
    time_index = to_numeric_array(time_index, "time_index")
    if not isinstance(seed, (type(None), int)):
        raise TypeError(f"If specified, seed must be of type int. Got: {type(seed)}")
    if not isinstance(phases, type(None)):
        phases = convert_to_dataset(phases)
    if not isinstance(method, str):
        raise TypeError(f"method must be of type str. Got: {type(method)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if frequency_dimension == "":
        frequency_dimension = list(S.coords)[0]
    elif frequency_dimension not in list(S.dims):
        # frequency_dimension given, but not in list of possible dimensions
        raise ValueError(
            f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}."
        )
    frequency_axis = list(S.dims).index(frequency_dimension)
    # Create dimensions and coordinates for the new dataset (frequency becomes time)
    new_dims = list(S.dims)
    new_dims[frequency_axis] = "Time"
    new_coords = S.sum(dim=frequency_dimension).coords
    new_coords = new_coords.assign({"Time": time_index})
    f = S[frequency_dimension]
    if not isinstance(frequency_bins, (type(None), np.ndarray)):
        frequency_bins = convert_to_dataarray(frequency_bins)
    elif isinstance(frequency_bins, np.ndarray):
        frequency_bins = xr.DataArray(
            data=frequency_bins,
            dims=frequency_dimension,
            coords={frequency_dimension: f},
        )
    if frequency_bins is not None:
        if not frequency_bins.squeeze().shape == f.shape:
            raise ValueError(
                "shape of frequency_bins must only contain 1 column and match the shape of the frequency dimension of S"
            )
        delta_f = frequency_bins
        delta_f_even = np.allclose(frequency_bins, frequency_bins[0])
        if delta_f_even:
            # reduce delta_f to a scalar if it is uniform
            delta_f = delta_f[0].item()
    else:
        delta_f = f.values[1] - f.values[0]
        delta_f_even = np.allclose(f.diff(dim=frequency_dimension)[1:], delta_f)
    if phases is not None:
        for var in phases.data_vars:
            if not phases[var].shape == S[var].shape:
                raise ValueError(
                    "shape of variables in phases must match shape of variables in S"
                )
    if method == "ifft":
        # ifft method must have a zero frequency and evenly spaced frequency bins
        if not f[0] == 0:
            warnings.warn(
                f"ifft method must have zero frequency defined. Lowest frequency is: {f[0].values}. Setting method to less efficient `sum_of_sines` method."
            )
            method = "sum_of_sines"
        if not delta_f_even:
            warnings.warn(
                f"ifft method must have evenly spaced frequency bins. Setting method to less efficient `sum_of_sines` method."
            )
            method = "sum_of_sines"
    elif method == "sum_of_sines":
        # For sum of sines, does not matter if there is a zero frequency or if frequency bins are evenly spaced
        pass
    else:
        raise ValueError(f"Method must be 'ifft' or 'sum_of_sines'. Got: {method}")
    omega = xr.DataArray(
        data=2 * np.pi * f, dims=frequency_dimension, coords={frequency_dimension: f}
    )
    eta = xr.Dataset()
    for var in S.data_vars:
        S_subset = S[var]
        if phases is None:
            np.random.seed(seed)
            phase = xr.DataArray(
                data=2
                * np.pi
                * np.random.random_sample(S_subset[frequency_dimension].shape),
                dims=frequency_dimension,
                coords={frequency_dimension: f},
            )
        else:
            phase = phases[var]
        # Wave amplitude times delta f
        A = np.sqrt(2 * S_subset * delta_f)
        if method == "ifft":
            A_cmplx = A * (np.cos(phase) + 1j * np.sin(phase))
            eta_tmp = np.fft.irfftn(
                0.5 * A_cmplx * time_index.size,
                list(time_index.shape),
                axes=[frequency_axis],
            )
            eta[var] = xr.DataArray(data=eta_tmp, dims=new_dims, coords=new_coords)
        elif method == "sum_of_sines":
            # Product of omega and time
            B = np.outer(time_index, omega)
            B = B.reshape((len(time_index), len(omega)))
            B = xr.DataArray(
                data=B,
                dims=["Time", frequency_dimension],
                coords={"Time": time_index, frequency_dimension: f},
            )
            # wave elevation
            C = np.cos(B + phase)
            eta[var] = (C * A).sum(dim=frequency_dimension)
    if to_pandas:
        eta = eta.to_pandas()
    if isinstance(eta, (pd.Series, pd.DataFrame, xr.DataArray)):
        eta.name = "eta"
    return eta 
[docs]
def frequency_moment(S, N, frequency_bins=None, frequency_dimension="", to_pandas=True):
    """
    Calculates the Nth frequency moment of the spectrum
    Parameters
    -----------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    N: int
        Moment (0 for 0th, 1 for 1st ....)
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    -------
    m: pandas DataFrame or xarray Dataset
        Nth Frequency Moment indexed by S.columns
    """
    S = convert_to_dataarray(S)
    if not isinstance(N, int):
        raise TypeError(f"N must be of type int. Got: {type(N)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if frequency_dimension == "":
        frequency_dimension = list(S.coords)[0]
    elif frequency_dimension not in list(S.dims):
        raise ValueError(
            f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}."
        )
    f = S[frequency_dimension]
    # Eq 8 in IEC 62600-101
    S = S.sel({frequency_dimension: slice(1e-12, f.max())})  # omit frequency of 0
    f = S[frequency_dimension]  # reset frequency_dimension without the 0 frequency
    fn = np.power(f, N)
    if frequency_bins is None:
        delta_f = f.diff(dim=frequency_dimension)
        delta_f0 = f[1] - f[0]
        delta_f0 = delta_f0.assign_coords({frequency_dimension: f[0]})
        delta_f = xr.concat([delta_f0, delta_f], dim=frequency_dimension)
    else:
        delta_f = xr.DataArray(
            data=convert_to_dataarray(frequency_bins),
            dims=frequency_dimension,
            coords={frequency_dimension: f},
        )
    m = S * fn * delta_f
    m = m.sum(dim=frequency_dimension)
    if to_pandas:
        m = m.to_pandas()
    if isinstance(m, (pd.Series, pd.DataFrame, xr.DataArray)):
        m.name = "m" + str(N)
    return m 
[docs]
def significant_wave_height(
    S, frequency_dimension="", frequency_bins=None, to_pandas=True
):
    """
    Calculates wave height from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    Hm0: pandas DataFrame or xarray Dataset
        Significant wave height [m] index by S.columns
    """
    S = convert_to_dataarray(S)
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    # Eq 12 in IEC 62600-101
    m0 = frequency_moment(
        S,
        0,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    Hm0 = 4 * np.sqrt(m0)
    if to_pandas:
        Hm0 = Hm0.to_pandas()
    if isinstance(Hm0, (pd.Series, pd.DataFrame, xr.DataArray)):
        Hm0.name = "Hm0"
    return Hm0 
[docs]
def average_zero_crossing_period(
    S, frequency_dimension="", frequency_bins=None, to_pandas=True
):
    """
    Calculates wave average zero crossing period from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    Tz: pandas DataFrame or xarray Dataset
        Average zero crossing period [s] indexed by S.columns
    """
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    # Eq 15 in IEC 62600-101
    m0 = frequency_moment(
        S,
        0,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    m2 = frequency_moment(
        S,
        2,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    Tz = np.sqrt(m0 / m2)
    if to_pandas:
        Tz = Tz.to_pandas()
    if isinstance(Tz, (pd.Series, pd.DataFrame, xr.DataArray)):
        Tz.name = "Tz"
    return Tz 
[docs]
def average_crest_period(
    S, frequency_dimension="", frequency_bins=None, to_pandas=True
):
    """
    Calculates wave average crest period from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    Tavg: pandas DataFrame or xarray Dataset
        Average wave period [s] indexed by S.columns
    """
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    m2 = frequency_moment(
        S,
        2,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    m4 = frequency_moment(
        S,
        4,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    Tavg = np.sqrt(m2 / m4)
    if to_pandas:
        Tavg = Tavg.to_pandas()
    if isinstance(Tavg, (pd.Series, pd.DataFrame, xr.DataArray)):
        Tavg.name = "Tavg"
    return Tavg 
[docs]
def average_wave_period(S, frequency_dimension="", frequency_bins=None, to_pandas=True):
    """
    Calculates mean wave period from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    Tm: pandas DataFrame or xarray Dataset
        Mean wave period [s] indexed by S.columns
    """
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    m0 = frequency_moment(
        S,
        0,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    m1 = frequency_moment(
        S,
        1,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    Tm = np.sqrt(m0 / m1)
    if to_pandas:
        Tm = Tm.to_pandas()
    if isinstance(Tm, (pd.Series, pd.DataFrame, xr.DataArray)):
        Tm.name = "Tm"
    return Tm 
[docs]
def peak_period(S, frequency_dimension="", to_pandas=True):
    """
    Calculates wave peak period from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    Tp: pandas DataFrame or xarray Dataset
        Wave peak period [s] indexed by S.columns
    """
    S = convert_to_dataarray(S)
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if frequency_dimension == "":
        frequency_dimension = list(S.coords)[0]
    elif frequency_dimension not in list(S.dims):
        raise ValueError(
            f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}."
        )
    # Eq 14 in IEC 62600-101
    fp = S.idxmax(dim=frequency_dimension)  # Hz
    Tp = 1 / fp
    if to_pandas:
        Tp = Tp.to_pandas()
    if isinstance(Tp, (pd.Series, pd.DataFrame, xr.DataArray)):
        Tp.name = "Tp"
    return Tp 
[docs]
def energy_period(S, frequency_dimension="", frequency_bins=None, to_pandas=True):
    """
    Calculates wave energy period from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    Te: pandas DataFrame or xarray Dataset
        Wave energy period [s] indexed by S.columns
    """
    S = convert_to_dataarray(S)
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    mn1 = frequency_moment(
        S,
        -1,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    m0 = frequency_moment(
        S,
        0,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    # Eq 13 in IEC 62600-101
    Te = mn1 / m0
    if to_pandas:
        Te = Te.to_pandas()
    if isinstance(Te, (pd.Series, pd.DataFrame, xr.DataArray)):
        Te.name = "Te"
    return Te 
[docs]
def spectral_bandwidth(S, frequency_dimension="", frequency_bins=None, to_pandas=True):
    """
    Calculates bandwidth from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    e: pandas DataFrame or xarray Dataset
        Spectral bandwidth [s] indexed by S.columns
    """
    S = convert_to_dataarray(S)
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    m2 = frequency_moment(
        S,
        2,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    m0 = frequency_moment(
        S,
        0,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    m4 = frequency_moment(
        S,
        4,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    e = np.sqrt(1 - (m2**2) / (m0 / m4))
    if to_pandas:
        e = e.to_pandas()
    if isinstance(e, (pd.Series, pd.DataFrame, xr.DataArray)):
        e.name = "e"
    return e 
[docs]
def spectral_width(S, frequency_dimension="", frequency_bins=None, to_pandas=True):
    """
    Calculates wave spectral width from spectra
    Parameters
    ------------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    frequency_bins: numpy array or pandas Series (optional)
        Bin widths for frequency of S. Required for unevenly sized bins
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    ---------
    v: pandas DataFrame or xarray Dataset
        Spectral width [m] indexed by S.columns
    """
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    mn2 = frequency_moment(
        S,
        -2,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    m0 = frequency_moment(
        S,
        0,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    mn1 = frequency_moment(
        S,
        -1,
        frequency_bins=frequency_bins,
        frequency_dimension=frequency_dimension,
        to_pandas=False,
    )
    # Eq 16 in IEC 62600-101
    v = np.sqrt((m0 * mn2 / np.power(mn1, 2)) - 1)
    if to_pandas:
        v = v.to_pandas()
    if isinstance(v, (pd.Series, pd.DataFrame, xr.DataArray)):
        v.name = "v"
    return v 
[docs]
def energy_flux(
    S,
    h,
    deep=False,
    rho=1025,
    g=9.80665,
    ratio=2,
    frequency_dimension="",
    to_pandas=True,
):
    """
    Calculates the omnidirectional wave energy flux of the spectra
    Parameters
    -----------
    S: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Spectral density [m^2/Hz] indexed by frequency [Hz]
    h: float
        Water depth [m]
    deep: bool (optional)
        If True use the deep water approximation. Default False. When
        False a depth check is run to check for shallow water. The ratio
        of the shallow water regime can be changed using the ratio
        keyword.
    rho: float (optional)
        Water Density [kg/m^3]. Default = 1025 kg/m^3
    g : float (optional)
        Gravitational acceleration [m/s^2]. Default = 9.80665 m/s^2
    ratio: float or int (optional)
        Only applied if depth=False. If h/l > ratio,
        water depth will be set to deep. Default ratio = 2.
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    -------
    J: pandas DataFrame or xarray Dataset
        Omni-directional wave energy flux [W/m] indexed by S.columns
    """
    S = convert_to_dataarray(S)
    if not isinstance(h, (int, float)):
        raise TypeError(f"h must be of type int or float. Got: {type(h)}")
    if not isinstance(deep, bool):
        raise TypeError(f"deep must be of type bool. Got: {type(deep)}")
    if not isinstance(rho, (int, float)):
        raise TypeError(f"rho must be of type int or float. Got: {type(rho)}")
    if not isinstance(g, (int, float)):
        raise TypeError(f"g must be of type int or float. Got: {type(g)}")
    if not isinstance(ratio, (int, float)):
        raise TypeError(f"ratio must be of type int or float. Got: {type(ratio)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if frequency_dimension == "":
        frequency_dimension = list(S.coords)[0]
    elif frequency_dimension not in list(S.dims):
        raise ValueError(
            f"frequency_dimension is not a dimension of S ({list(S.dims)}). Got: {frequency_dimension}."
        )
    f = S[frequency_dimension]
    if deep:
        # Eq 8 in IEC 62600-100, deep water simplification
        Te = energy_period(S, to_pandas=False)
        Hm0 = significant_wave_height(S, to_pandas=False)
        coeff = rho * (g**2) / (64 * np.pi)
        J = coeff * (Hm0**2) * Te
    else:
        # deep water flag is false
        k = wave_number(f, h, rho, g, to_pandas=False)
        # wave celerity (group velocity)
        Cg = wave_celerity(k, h, g, depth_check=True, ratio=ratio, to_pandas=False)
        # Calculating the wave energy flux, Eq 9 in IEC 62600-101
        delta_f = f.diff(dim=frequency_dimension)
        delta_f0 = f[1] - f[0]
        delta_f0 = delta_f0.assign_coords({frequency_dimension: f[0]})
        delta_f = xr.concat([delta_f0, delta_f], dim=frequency_dimension)
        CgSdelF = S * delta_f * Cg
        J = rho * g * CgSdelF.sum(dim=frequency_dimension)
    if to_pandas:
        J = J.to_pandas()
    if isinstance(J, (pd.Series, pd.DataFrame, xr.DataArray)):
        J.name = "J"
    return J 
[docs]
def energy_period_to_peak_period(Te, gamma):
    """
    Convert from spectral energy period (Te) to peak period (Tp) using ITTC approximation for JONSWAP Spectrum.
    Approximation is given in "The Specialist Committee on Waves, Final Report
    and Recommendations to the 23rd ITTC", Proceedings of the 23rd ITTC - Volume
    2, Table A4.
    Parameters
    ----------
    Te: int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset
    gamma: float or int
        Peak enhancement factor for JONSWAP spectrum
    Returns
    -------
    Tp: float or array
        Spectral peak period [s]
    """
    if not isinstance(
        Te, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
    ):
        raise TypeError(
            f"Te must be an int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray or xr.Dataset. Got: {type(Te)}"
        )
    if not isinstance(gamma, (float, int)):
        raise TypeError(f"gamma must be of type float or int. Got: {type(gamma)}")
    factor = 0.8255 + 0.03852 * gamma - 0.005537 * gamma**2 + 0.0003154 * gamma**3
    Tp = Te / factor
    if isinstance(Tp, (pd.Series, pd.DataFrame, xr.DataArray)):
        Tp.name = "Tp"
    return Tp 
[docs]
def wave_celerity(
    k, h, g=9.80665, depth_check=False, ratio=2, frequency_dimension="", to_pandas=True
):
    """
    Calculates wave celerity (group velocity)
    Parameters
    ----------
    k: pandas DataFrame, pandas Series, xarray DataArray, or xarray Dataset
        Wave number [1/m] indexed by frequency [Hz]
    h: float
        Water depth [m]
    g : float (optional)
        Gravitational acceleration [m/s^2]. Default 9.80665 m/s.
    depth_check: bool (optional)
        If True check depth regime. Default False.
    ratio: float or int (optional)
        Only applied if depth_check=True. If h/l > ratio,
        water depth will be set to deep. Default ratio = 2
    frequency_dimension: string (optional)
        Name of the xarray dimension corresponding to frequency. If not supplied,
        defaults to the first dimension. Does not affect pandas input.
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    -------
    Cg: pandas DataFrame or xarray Dataset
        Water celerity [m/s] indexed by frequency [Hz]
    """
    k = convert_to_dataarray(k)
    if not isinstance(h, (int, float)):
        raise TypeError(f"h must be of type int or float. Got: {type(h)}")
    if not isinstance(g, (int, float)):
        raise TypeError(f"g must be of type int or float. Got: {type(g)}")
    if not isinstance(depth_check, bool):
        raise TypeError(f"depth_check must be of type bool. Got: {type(depth_check)}")
    if not isinstance(ratio, (int, float)):
        raise TypeError(f"ratio must be of type int or float. Got: {type(ratio)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    if frequency_dimension == "":
        frequency_dimension = list(k.coords)[0]
    elif frequency_dimension not in list(k.dims):
        raise ValueError(
            f"frequency_dimension is not a dimension of k ({list(k.dims)}). Got: {frequency_dimension}."
        )
    f = k[frequency_dimension]
    k = k.values
    if depth_check:
        l = wave_length(k)
        # get depth regime
        dr = depth_regime(l, h, ratio=ratio)
        # deep frequencies
        df = f[dr]
        dk = k[dr]
        # deep water approximation
        dCg = np.pi * df / dk
        dCg = xr.DataArray(
            data=dCg, dims=frequency_dimension, coords={frequency_dimension: df}
        )
        dCg.name = "Cg"
        # shallow frequencies
        sf = f[~dr]
        sk = k[~dr]
        sCg = (np.pi * sf / sk) * (1 + (2 * h * sk) / np.sinh(2 * h * sk))
        sCg = xr.DataArray(
            data=sCg, dims=frequency_dimension, coords={frequency_dimension: sf}
        )
        sCg.name = "Cg"
        Cg = xr.concat([dCg, sCg], dim=frequency_dimension).sortby(frequency_dimension)
        Cg.name = "Cg"
    else:
        # Eq 10 in IEC 62600-101
        Cg = (np.pi * f / k) * (1 + (2 * h * k) / np.sinh(2 * h * k))
        Cg = xr.DataArray(
            data=Cg, dims=frequency_dimension, coords={frequency_dimension: f}
        )
        Cg.name = "Cg"
    if to_pandas:
        Cg = Cg.to_pandas()
    if isinstance(Cg, (pd.Series, pd.DataFrame, xr.DataArray)):
        Cg.name = "Cg"
    return Cg 
[docs]
def wave_length(k):
    """
    Calculates wave length from wave number
    To compute: 2*pi/wavenumber
    Parameters
    -------------
    k: int, float, numpy ndarray, pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
        Wave number [1/m] indexed by frequency
    Returns
    ---------
    l: int, float, numpy ndarray, pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
        Wave length [m] indexed by frequency. Output type is identical to the type of k.
    """
    if not isinstance(
        k, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
    ):
        raise TypeError(
            f"k must be an int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray or xr.Dataset. Got: {type(k)}"
        )
    l = 2 * np.pi / k
    if isinstance(l, (pd.Series, pd.DataFrame, xr.DataArray)):
        l.name = "l"
    return l 
[docs]
def wave_number(f, h, rho=1025, g=9.80665, to_pandas=True):
    """
    Calculates wave number
    To compute wave number from angular frequency (w), convert w to f before
    using this function (f = w/2*pi)
    Parameters
    -----------
    f: int, float, numpy ndarray, pandas DataFrame, pandas Series, xarray DataArray
        Frequency [Hz]
    h: float
        Water depth [m]
    rho: float (optional)
        Water density [kg/m^3]
    g: float (optional)
        Gravitational acceleration [m/s^2]
    to_pandas: bool (optional)
        Flag to output pandas instead of xarray. Default = True.
    Returns
    -------
    k: pandas DataFrame or xarray Dataset
        Wave number [1/m] indexed by frequency [Hz]
    """
    if isinstance(f, (int, float)):
        f = np.asarray([f])
    if not isinstance(h, (int, float)):
        raise TypeError(f"h must be of type int or float. Got: {type(h)}")
    if not isinstance(rho, (int, float)):
        raise TypeError(f"rho must be of type int or float. Got: {type(rho)}")
    if not isinstance(g, (int, float)):
        raise TypeError(f"g must be of type int or float. Got: {type(g)}")
    if not isinstance(to_pandas, bool):
        raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
    w = 2 * np.pi * f  # angular frequency
    xi = w / np.sqrt(g / h)  # note: =h*wa/sqrt(h*g/h)
    yi = xi * xi / np.power(1.0 - np.exp(-np.power(xi, 2.4908)), 0.4015)
    k0 = yi / h  # Initial guess without current-wave interaction
    # Eq 11 in IEC 62600-101 using initial guess from Guo (2002)
    def func(kk):
        val = np.power(w, 2) - g * kk * np.tanh(kk * h)
        return val
    mask = np.abs(func(k0)) > 1e-9
    if mask.sum() > 0:
        k0_mask = k0[mask]
        w = w[mask]
        k, info, ier, mesg = _fsolve(func, k0_mask, full_output=True)
        if not ier == 1:
            raise ValueError("Wave number not found. " + mesg)
        k0[mask] = k
    k = k0
    if isinstance(k, (pd.Series, pd.DataFrame, xr.DataArray)):
        k.name = "k"
    return k 
[docs]
def depth_regime(l, h, ratio=2):
    """
    Calculates the depth regime based on wavelength and height
    Deep water: h/l > ratio
    This function exists so sinh in wave celerity doesn't blow
    up to infinity.
    P.K. Kundu, I.M. Cohen (2000) suggest h/l >> 1 for deep water (pg 209)
    Same citation as above, they also suggest for 3% accuracy, h/l > 0.28 (pg 210)
    However, since this function allows multiple wavelengths, higher ratio
    numbers are more accurate across varying wavelengths.
    Parameters
    ----------
    l: int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset
        wavelength [m]
    h: float or int
        water column depth [m]
    ratio: float or int (optional)
        if h/l > ratio, water depth will be set to deep. Default ratio = 2
    Returns
    -------
    depth_reg: boolean or boolean array-like
        Boolean True if deep water, False otherwise
    """
    if not isinstance(
        l, (int, float, np.ndarray, pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
    ):
        raise TypeError(
            f"l must be of type int, float, np.ndarray, pd.DataFrame, pd.Series, xr.DataArray, or xr.Dataset. Got: {type(l)}"
        )
    if not isinstance(h, (int, float)):
        raise TypeError(f"h must be of type int or float. Got: {type(h)}")
    depth_reg = h / l > ratio
    if isinstance(depth_reg, (pd.Series, pd.DataFrame, xr.DataArray)):
        depth_reg.name = "depth_reg"
    return depth_reg