"""
This module provides functionalities to calculate and analyze Most
Likely Extreme Response (MLER) coefficients for wave energy converter
design and risk assessment. It includes functions to:
- Calculate MLER coefficients (`mler_coefficients`) from a sea state
spectrum and a response Amplitude Response Operator (ARO).
- Define and manipulate simulation parameters (`mler_simulation`) used
across various MLER analyses.
- Renormalize the incoming amplitude of the MLER wave
(`mler_wave_amp_normalize`) to match the desired peak height for more
accurate modeling and analysis.
- Export the wave amplitude time series (`mler_export_time_series`)
based on the calculated MLER coefficients for further analysis or
visualization.
"""
from typing import Union, List, Optional, Dict, Any
import pandas as pd
import xarray as xr
import numpy as np
from numpy.typing import NDArray
from mhkit.wave.resource import frequency_moment
SimulationParameters = Dict[str, Union[float, int, np.ndarray]]
def _calculate_spectral_values(
freq_hz: Union[np.ndarray, pd.Series],
rao_array: np.ndarray,
wave_spectrum: Union[pd.Series, pd.DataFrame, np.ndarray],
d_w: float,
) -> Dict[str, Union[float, np.ndarray]]:
"""
Calculates spectral moments and the coefficient A_{R,n} from a given sea state spectrum
and a response RAO.
Parameters
----------
spectrum_r : Union[np.ndarray, pd.Series]
Real part of the spectrum.
freq_hz : Union[np.ndarray, pd.Series]
Frequencies in Hz corresponding to spectrum_r.
rao : numpy ndarray
Response Amplitude Operator (RAO) of the system.
wave_spectrum : Union[pd.Series, pd.DataFrame, np.ndarray]
Wave spectrum values corresponding to freq_hz.
d_w : float
Delta omega, the frequency interval.
Returns
-------
Dict[str, Union[float, np.ndarray]]
A dictionary containing spectral moments (m_0, m_1, m_2) and the coefficient A_{R,n}.
"""
# Note: waves.A is "S" in Quon2016; 'waves' naming convention
# matches WEC-Sim conventions (EWQ)
# Response spectrum [(response units)^2-s/rad] -- Quon2016 Eqn. 3
spectrum_r = np.abs(rao_array) ** 2 * (2 * wave_spectrum)
# Calculate spectral moments
m_0 = frequency_moment(pd.Series(spectrum_r, index=freq_hz), 0).item()
m_1 = frequency_moment(pd.Series(spectrum_r, index=freq_hz), 1).item()
m_2 = frequency_moment(pd.Series(spectrum_r, index=freq_hz), 2).item()
# Calculate coefficient A_{R,n}
coeff_a_rn = (
np.abs(rao_array)
* np.sqrt(2 * wave_spectrum * d_w)
* ((m_2 - freq_hz * m_1) + (m_1 / m_0) * (freq_hz * m_0 - m_1))
/ (m_0 * m_2 - m_1**2)
)
return {
"m_0": m_0,
"m_1": m_1,
"m_2": m_2,
"coeff_a_rn": coeff_a_rn,
}
[docs]
def mler_coefficients(
rao: Union[NDArray[np.float64], pd.Series, List[float], List[int], xr.DataArray],
wave_spectrum: Union[pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset],
response_desired: Union[int, float],
frequency_dimension: str = "",
to_pandas: bool = True,
) -> Union[pd.DataFrame, xr.Dataset]:
"""
Calculate MLER (most likely extreme response) coefficients from a
sea state spectrum and a response RAO.
Parameters
----------
rao: numpy ndarray
Response amplitude operator.
wave_spectrum: pandas Series, pandas DataFrame, xarray DataArray, or xarray Dataset
Wave spectral density [m^2/Hz] indexed by frequency [Hz].
DataFrame and Dataset inputs should only have one data variable
response_desired: int or float
Desired response, units should correspond to a motion RAO or
units of force for a force RAO.
frequency_dimension: string (optional)
Name of the xarray dimension corresponding to frequency. If not supplied,
defaults to the first dimension. Does not affect pandas input.
to_pandas: bool (optional)
Flag to output pandas instead of xarray. Default = True.
Returns
-------
mler: pandas DataFrame or xarray Dataset
DataFrame containing conditioned wave spectral amplitude
coefficient [m^2-s], and Phase [rad] indexed by freq [Hz].
"""
if isinstance(rao, (list, pd.Series, xr.DataArray)):
rao_array = np.array(rao)
elif isinstance(rao, np.ndarray):
rao_array = rao
else:
raise TypeError(
"Unsupported type for 'rao'. Must be one of: list, pd.Series, \
np.ndarray, xr.DataArray."
)
if not isinstance(rao_array, np.ndarray):
raise TypeError(f"rao must be of type np.ndarray. Got: {type(rao_array)}")
if not isinstance(
wave_spectrum, (pd.Series, pd.DataFrame, xr.DataArray, xr.Dataset)
):
raise TypeError(
f"wave_spectrum must be of type pd.Series, pd.DataFrame, "
f"xr.DataArray, or xr.Dataset. Got: {type(wave_spectrum)}"
)
if not isinstance(response_desired, (int, float)):
raise TypeError(
f"response_desired must be of type int or float. Got: {type(response_desired)}"
)
if not isinstance(to_pandas, bool):
raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
# Convert input to xarray DataArray
if isinstance(wave_spectrum, (pd.Series, pd.DataFrame)):
wave_spectrum = wave_spectrum.squeeze().to_xarray()
if isinstance(wave_spectrum, xr.Dataset):
if len(wave_spectrum.data_vars) > 1:
raise ValueError(
f"wave_spectrum can only contain one variable. Got {list(wave_spectrum.data_vars)}."
)
wave_spectrum = wave_spectrum.to_array()
if frequency_dimension == "":
frequency_dimension = list(wave_spectrum.coords)[0]
# convert from Hz to rad/s
freq_hz = wave_spectrum.coords[frequency_dimension].values * (2 * np.pi)
wave_spectrum = wave_spectrum.to_numpy() / (2 * np.pi)
# get frequency step
d_w = 2.0 * np.pi / (len(freq_hz) - 1)
spectral_values = _calculate_spectral_values(freq_hz, rao_array, wave_spectrum, d_w)
# save the new spectral info to pass out
# Phase delay should be a positive number in this convention (AP)
_phase = -np.unwrap(np.angle(rao_array))
# for negative values of Amp, shift phase by pi and flip sign
# for negative amplitudes, add a pi phase shift, then flip sign on
# negative Amplitudes
_phase[spectral_values["coeff_a_rn"] < 0] -= np.pi
spectral_values["coeff_a_rn"][spectral_values["coeff_a_rn"] < 0] *= -1
# calculate the conditioned spectrum [m^2-s/rad]
conditioned_spectrum = (
wave_spectrum * spectral_values["coeff_a_rn"] ** 2 * response_desired**2
)
# if the response amplitude we ask for is negative, we will add
# a pi phase shift to the phase information. This is because
# the sign of self.desiredRespAmp is lost in the squaring above.
# Ordinarily this would be put into the final equation, but we
# are shaping the wave information so that it is buried in the
# new spectral information, S. (AP)
if response_desired < 0:
_phase += np.pi
mler = xr.Dataset(
{
"WaveSpectrum": (["frequency"], np.array(conditioned_spectrum)),
"Phase": (["frequency"], _phase + np.pi * (response_desired < 0)),
},
coords={"frequency": freq_hz},
)
mler.fillna(0)
return mler.to_pandas() if to_pandas else mler
[docs]
def mler_simulation(
parameters: Optional[SimulationParameters] = None,
) -> SimulationParameters:
"""
Define the simulation parameters that are used in various MLER
functionalities.
See `extreme_response_contour_example.ipynb` example for how this is
useful. If no input is given, then default values are returned.
Parameters
----------
parameters: dict (optional)
Simulation parameters.
Keys:
-----
- 'startTime': starting time [s]
- 'endTime': ending time [s]
- 'dT': time-step size [s]
- 'T0': time of maximum event [s]
- 'startx': start of simulation space [m]
- 'endX': end of simulation space [m]
- 'dX': horizontal spacing [m]
- 'X': position of maximum event [m]
The following keys are calculated from the above parameters:
- 'maxIT': int, maximum timestep index
- 'T': np.ndarray, time array
- 'maxIX': int, maximum index for space
- 'X': np.ndarray, space array
Returns
-------
sim: dict
Simulation parameters including spatial and time calculated
arrays.
"""
if not isinstance(parameters, (type(None), dict)):
raise TypeError(
f"If specified, parameters must be of type dict. Got: {type(parameters)}"
)
sim = {}
if parameters is None:
sim["startTime"] = -150.0 # [s] Starting time
sim["endTime"] = 150.0 # [s] Ending time
sim["dT"] = 1.0 # [s] Time-step size
sim["T0"] = 0.0 # [s] Time of maximum event
sim["startX"] = -300.0 # [m] Start of simulation space
sim["endX"] = 300.0 # [m] End of simulation space
sim["dX"] = 1.0 # [m] Horiontal spacing
sim["X0"] = 0.0 # [m] Position of maximum event
else:
sim = parameters
# maximum timestep index
sim["maxIT"] = int(np.ceil((sim["endTime"] - sim["startTime"]) / sim["dT"] + 1))
sim["T"] = np.linspace(sim["startTime"], sim["endTime"], sim["maxIT"])
sim["maxIX"] = int(np.ceil((sim["endX"] - sim["startX"]) / sim["dX"] + 1))
sim["X"] = np.linspace(sim["startX"], sim["endX"], sim["maxIX"])
return sim
[docs]
def mler_wave_amp_normalize(
wave_amp: float,
mler: Union[pd.DataFrame, xr.Dataset],
sim: SimulationParameters,
k: Union[NDArray[np.float64], List[float], pd.Series],
**kwargs: Any,
) -> Union[pd.DataFrame, xr.Dataset]:
"""
Function that renormalizes the incoming amplitude of the MLER wave
to the desired peak height (peak to MSL).
Parameters
----------
wave_amp: float
Desired wave amplitude (peak to MSL).
mler: pandas DataFrame or xarray Dataset
MLER coefficients generated by 'mler_coefficients' function.
sim: dict
Simulation parameters formatted by output from
'mler_simulation'.
k: numpy ndarray
Wave number
frequency_dimension: string (optional)
Name of the xarray dimension corresponding to frequency. If not supplied,
defaults to the first dimension. Does not affect pandas input.
to_pandas: bool (optional)
Flag to output pandas instead of xarray. Default = True.
Returns
-------
mler_norm : pandas DataFrame or xarray Dataset
MLER coefficients
"""
frequency_dimension = kwargs.get("frequency_dimension", "")
to_pandas = kwargs.get("to_pandas", True)
k_array = np.array(k, dtype=float) if not isinstance(k, np.ndarray) else k
if not isinstance(mler, (pd.DataFrame, xr.Dataset)):
raise TypeError(
f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}"
)
if not isinstance(wave_amp, (int, float)):
raise TypeError(f"wave_amp must be of type int or float. Got: {type(wave_amp)}")
if not isinstance(sim, dict):
raise TypeError(f"sim must be of type dict. Got: {type(sim)}")
if not isinstance(frequency_dimension, str):
raise TypeError(
"frequency_dimension must be of type bool."
+ f"Got: {type(frequency_dimension)}"
)
if not isinstance(to_pandas, bool):
raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
# If input is pandas, convert to xarray
mler_xr = mler.to_xarray() if isinstance(mler, pd.DataFrame) else mler()
# Determine frequency dimension
freq_dim = frequency_dimension or list(mler_xr.coords)[0]
# freq = mler_xr.coords[freq_dim].values * 2 * np.pi
# d_w = np.diff(freq).mean()
wave_amp_time = np.array(
[
np.sum(
np.sqrt(
2
* mler_xr["WaveSpectrum"].values
* np.diff(mler_xr.coords[freq_dim].values * 2 * np.pi).mean()
)
* np.cos(
mler_xr.coords[freq_dim].values * 2 * np.pi * (t - sim["T0"])
- k_array * (x - sim["X0"])
+ mler_xr["Phase"].values
)
)
for x in np.linspace(sim["startX"], sim["endX"], sim["maxIX"])
for t in np.linspace(sim["startTime"], sim["endTime"], sim["maxIT"])
]
).reshape(sim["maxIX"], sim["maxIT"])
rescale_fact = np.abs(wave_amp) / np.max(np.abs(wave_amp_time))
# Rescale the wave spectral amplitude coefficients and assign phase
mler_norm = xr.Dataset(
{
"WaveSpectrum": (
["frequency"],
mler_xr["WaveSpectrum"].data * rescale_fact**2,
),
"Phase": (["frequency"], mler_xr["Phase"].data),
},
coords={"frequency": (["frequency"], mler_xr.coords[freq_dim].data)},
)
return mler_norm.to_pandas() if to_pandas else mler_norm
[docs]
def mler_export_time_series(
rao: Union[NDArray[np.float64], List[float], pd.Series],
mler: Union[pd.DataFrame, xr.Dataset],
sim: SimulationParameters,
k: Union[NDArray[np.float64], List[float], pd.Series],
**kwargs: Any,
) -> Union[pd.DataFrame, xr.Dataset]:
"""
Generate the wave amplitude time series at X0 from the calculated
MLER coefficients
Parameters
----------
rao: numpy ndarray
Response amplitude operator.
mler: pandas DataFrame or xarray Dataset
MLER coefficients dataframe generated from an MLER function.
sim: dict
Simulation parameters formatted by output from
'mler_simulation'.
k: numpy ndarray
Wave number.
frequency_dimension: string (optional)
Name of the xarray dimension corresponding to frequency. If not supplied,
defaults to the first dimension. Does not affect pandas input.
to_pandas: bool (optional)
Flag to output pandas instead of xarray. Default = True.
Returns
-------
mler_ts: pandas DataFrame or xarray Dataset
Time series of wave height [m] and linear response [*] indexed
by time [s].
"""
frequency_dimension = kwargs.get("frequency_dimension", "")
to_pandas = kwargs.get("to_pandas", True)
if not isinstance(rao, np.ndarray):
raise TypeError(f"rao must be of type ndarray. Got: {type(rao)}")
if not isinstance(mler, (pd.DataFrame, xr.Dataset)):
raise TypeError(
f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}"
)
if not isinstance(sim, dict):
raise TypeError(f"sim must be of type dict. Got: {type(sim)}")
if not isinstance(k, (np.ndarray, list, pd.Series)):
raise TypeError(f"k must be of type ndarray. Got: {type(k)}")
if not isinstance(to_pandas, bool):
raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")
if not isinstance(frequency_dimension, str):
raise TypeError(
f"frequency_dimension must be of type str. Got: {type(frequency_dimension)}"
)
rao = np.array(rao, dtype=float) if not isinstance(rao, np.ndarray) else rao
k = np.array(k, dtype=float) if not isinstance(k, np.ndarray) else k
# If input is pandas, convert to xarray
mler = mler if isinstance(mler, xr.Dataset) else mler.to_xarray()
# Handle optional frequency dimension
frequency_dimension = (
frequency_dimension if frequency_dimension else list(mler.coords)[0]
)
freq = mler.coords[frequency_dimension].values * 2 * np.pi
d_w = np.diff(freq).mean()
wave_height = np.zeros(len(sim["T"]))
linear_response = np.zeros(len(sim["T"]))
for i, t_i in enumerate(sim["T"]):
cos_terms = np.cos(
freq * (t_i - sim["T0"])
- k * (sim["X0"] - sim["X0"])
+ mler["Phase"].values
)
wave_height[i] = np.sum(np.sqrt(2 * mler["WaveSpectrum"] * d_w) * cos_terms)
linear_response[i] = np.sum(
np.sqrt(2 * mler["WaveSpectrum"] * d_w)
* np.abs(rao)
* np.cos(freq * (t_i - sim["T0"]) - k * (sim["X0"] - sim["X0"]))
)
# Construct the output dataset
mler_ts = xr.Dataset(
{
"WaveHeight": (["time"], wave_height),
"LinearResponse": (["time"], linear_response),
},
coords={"time": sim["T"]},
)
# Convert to pandas DataFrame if requested
return mler_ts.to_dataframe() if to_pandas else mler_ts