Source code for mhkit.wave.contours

import warnings

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate as interp
import scipy.optimize as optim
import scipy.stats as stats
from sklearn.decomposition import PCA as skPCA
from sklearn.metrics import mean_squared_error
from statsmodels.nonparametric.kde import KDEUnivariate

from mhkit.utils import to_numeric_array


# Contours
[docs] def environmental_contours(x1, x2, sea_state_duration, return_period, method, **kwargs): """ Returns a Dictionary of x1 and x2 components for each contour method passed. A method may be one of the following: Principal Component Analysis, Gaussian, Gumbel, Clayton, Rosenblatt, nonparametric Gaussian, nonparametric Clayton, nonparametric Gumbel, bivariate KDE, log bivariate KDE Parameters ---------- x1: list, np.ndarray, pd.Series, xr.DataArray Component 1 data x2: list, np.ndarray, pd.Series, xr.DataArray Component 2 data sea_state_duration : int or float `x1` and `x2` averaging period in seconds return_period: int, float Return period of interest in years method: string or list Copula method to apply. Options include ['PCA','gaussian', 'gumbel', 'clayton', 'rosenblatt', 'nonparametric_gaussian', 'nonparametric_clayton', 'nonparametric_gumbel', 'bivariate_KDE' 'bivariate_KDE_log'] **kwargs min_bin_count: int Passed to _copula_parameters to sets the minimum number of bins allowed. Default = 40. initial_bin_max_val: int, float Passed to _copula_parameters to set the max value of the first bin. Default = 1. bin_val_size: int, float Passed to _copula_parameters to set the size of each bin after the initial bin. Default 0.25. nb_steps: int Discretization of the circle in the normal space is used for copula component calculation. Default nb_steps=1000. bandwidth: Must specify bandwidth for bivariate KDE method. Default = None. Ndata_bivariate_KDE: int Must specify bivariate KDE method. Defines the contoured space from which samples are taken. Default = 100. max_x1: float Defines the max value of x1 to discretize the KDE space max_x2: float Defines the max value of x2 to discretize the KDE space PCA: dict If provided, the principal component analysis (PCA) on x1, x2 is skipped. The PCA will be the same for a given x1, x2 therefore this step may be skipped if multiple calls to environmental contours are made for the same x1, x2 pair. The PCA dict may be obtained by setting return_fit=True when calling the PCA method. return_fit: boolean Will return fitting parameters used for each method passed. Default False. Returns ------- copulas: Dictionary Dictionary of x1 and x2 copula components for each copula method """ x1 = to_numeric_array(x1, "x1") x2 = to_numeric_array(x2, "x2") if not isinstance(x1, np.ndarray) or x1.ndim == 0: raise TypeError(f"x1 must be a non-scalar array. Got: {type(x1)}") if not isinstance(x2, np.ndarray) or x2.ndim == 0: raise TypeError(f"x2 must be a non-scalar array. Got: {type(x2)}") if len(x1) != len(x2): raise ValueError("The lengths of x1 and x2 must be equal.") if not isinstance(sea_state_duration, (int, float)): raise TypeError( f"sea_state_duration must be of type int or float. Got: {type(sea_state_duration)}" ) if not isinstance(return_period, (int, float, np.ndarray)): raise TypeError( f"return_period must be of type int, float, or np.ndarray. 
Got: {type(return_period)}" ) bin_val_size = kwargs.get("bin_val_size", 0.25) nb_steps = kwargs.get("nb_steps", 1000) initial_bin_max_val = kwargs.get("initial_bin_max_val", 1.0) min_bin_count = kwargs.get("min_bin_count", 40) bandwidth = kwargs.get("bandwidth", None) Ndata_bivariate_KDE = kwargs.get("Ndata_bivariate_KDE", 100) max_x1 = kwargs.get("max_x1", None) max_x2 = kwargs.get("max_x2", None) PCA = kwargs.get("PCA", None) PCA_bin_size = kwargs.get("PCA_bin_size", 250) return_fit = kwargs.get("return_fit", False) if not isinstance(max_x1, (int, float, type(None))): raise TypeError(f"If specified, max_x1 must be a dict. Got: {type(PCA)}") if not isinstance(max_x2, (int, float, type(None))): raise TypeError(f"If specified, max_x2 must be a dict. Got: {type(PCA)}") if not isinstance(PCA, (dict, type(None))): raise TypeError(f"If specified, PCA must be a dict. Got: {type(PCA)}") if not isinstance(PCA_bin_size, int): raise TypeError(f"PCA_bin_size must be of type int. Got: {type(PCA_bin_size)}") if not isinstance(return_fit, bool): raise TypeError(f"return_fit must be of type bool. Got: {type(return_fit)}") if not isinstance(bin_val_size, (int, float)): raise TypeError( f"bin_val_size must be of type int or float. Got: {type(bin_val_size)}" ) if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") if not isinstance(min_bin_count, int): raise TypeError( f"min_bin_count must be of type int. Got: {type(min_bin_count)}" ) if not isinstance(initial_bin_max_val, (int, float)): raise TypeError( f"initial_bin_max_val must be of type int or float. Got: {type(initial_bin_max_val)}" ) if "bivariate_KDE" in method and bandwidth == None: raise TypeError( f"Must specify keyword bandwidth with bivariate KDE method. Got: {type(bandwidth)}" ) if isinstance(method, str): method = [method] if not (len(set(method)) == len(method)): raise ValueError( f"Can only pass a unique " + "method once per function call. 
Consider wrapping this " + "function in a for loop to investage variations on the same method" ) method_class = { "PCA": "parametric", "gaussian": "parametric", "gumbel": "parametric", "clayton": "parametric", "rosenblatt": "parametric", "nonparametric_gaussian": "nonparametric", "nonparametric_clayton": "nonparametric", "nonparametric_gumbel": "nonparametric", "bivariate_KDE": "KDE", "bivariate_KDE_log": "KDE", } classification = [] methods = method for method in methods: classification.append(method_class[method]) fit = _iso_prob_and_quantile(sea_state_duration, return_period, nb_steps) fit_parametric = None fit_nonparametric = None component_1 = None if "parametric" in classification: (para_dist_1, para_dist_2, mean_cond, std_cond) = _copula_parameters( x1, x2, min_bin_count, initial_bin_max_val, bin_val_size ) x_quantile = fit["x_quantile"] a = para_dist_1[0] c = para_dist_1[1] loc = para_dist_1[2] scale = para_dist_1[3] component_1 = stats.exponweib.ppf(x_quantile, a, c, loc=loc, scale=scale) fit_parametric = fit fit_parametric["para_dist_1"] = para_dist_1 fit_parametric["para_dist_2"] = para_dist_2 fit_parametric["mean_cond"] = mean_cond fit_parametric["std_cond"] = std_cond if PCA == None: PCA = fit_parametric if "nonparametric" in classification: ( nonpara_dist_1, nonpara_dist_2, nonpara_pdf_2, ) = _nonparametric_copula_parameters(x1, x2, nb_steps=nb_steps) fit_nonparametric = fit fit_nonparametric["nonpara_dist_1"] = nonpara_dist_1 fit_nonparametric["nonpara_dist_2"] = nonpara_dist_2 fit_nonparametric["nonpara_pdf_2"] = nonpara_pdf_2 copula_functions = { "PCA": { "func": PCA_contour, "vals": ( x1, x2, PCA, { "nb_steps": nb_steps, "return_fit": return_fit, "bin_size": PCA_bin_size, }, ), }, "gaussian": { "func": _gaussian_copula, "vals": (x1, x2, fit_parametric, component_1, {"return_fit": return_fit}), }, "gumbel": { "func": _gumbel_copula, "vals": ( x1, x2, fit_parametric, component_1, nb_steps, {"return_fit": return_fit}, ), }, "clayton": { "func": _clayton_copula, "vals": (x1, x2, fit_parametric, component_1, {"return_fit": return_fit}), }, "rosenblatt": { "func": _rosenblatt_copula, "vals": (x1, x2, fit_parametric, component_1, {"return_fit": return_fit}), }, "nonparametric_gaussian": { "func": _nonparametric_gaussian_copula, "vals": (x1, x2, fit_nonparametric, nb_steps, {"return_fit": return_fit}), }, "nonparametric_clayton": { "func": _nonparametric_clayton_copula, "vals": (x1, x2, fit_nonparametric, nb_steps, {"return_fit": return_fit}), }, "nonparametric_gumbel": { "func": _nonparametric_gumbel_copula, "vals": (x1, x2, fit_nonparametric, nb_steps, {"return_fit": return_fit}), }, "bivariate_KDE": { "func": _bivariate_KDE, "vals": ( x1, x2, bandwidth, fit, nb_steps, Ndata_bivariate_KDE, {"max_x1": max_x1, "max_x2": max_x2, "return_fit": return_fit}, ), }, "bivariate_KDE_log": { "func": _bivariate_KDE, "vals": ( x1, x2, bandwidth, fit, nb_steps, Ndata_bivariate_KDE, { "max_x1": max_x1, "max_x2": max_x2, "log_transform": True, "return_fit": return_fit, }, ), }, } copulas = {} for method in methods: vals = copula_functions[method]["vals"] if return_fit: component_1, component_2, fit = copula_functions[method]["func"](*vals) copulas[f"{method}_fit"] = fit else: component_1, component_2 = copula_functions[method]["func"](*vals) copulas[f"{method}_x1"] = component_1 copulas[f"{method}_x2"] = component_2 return copulas
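# Illustrative usage sketch (not part of the original MHKiT module): one way
# to call ``environmental_contours`` on synthetic hourly sea-state data. The
# synthetic ``hs``/``te`` arrays, the 1-hour duration, and the 100-year
# return period are assumptions chosen only for demonstration.
def _example_environmental_contours():
    """Return 100-year PCA and Gaussian-copula contours for synthetic data."""
    rng = np.random.default_rng(42)
    hs = 2.0 * rng.weibull(1.5, 1000) + 0.1  # significant wave height [m]
    te = 5.0 + 0.8 * hs + rng.random(1000)  # energy period [s]
    contours = environmental_contours(
        hs,
        te,
        sea_state_duration=3600,  # hourly sea states
        return_period=100,  # 100-year contour
        method=["PCA", "gaussian"],
    )
    # Each method returns an x1 (Hs) and x2 (Te) component of the contour
    return (
        contours["PCA_x1"],
        contours["PCA_x2"],
        contours["gaussian_x1"],
        contours["gaussian_x2"],
    )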
[docs] def PCA_contour(x1, x2, fit, kwargs): """ Calculates environmental contours of extreme sea states using the improved joint probability distributions with the inverse first-order reliability method (I-FORM) probability for the desired return period (`return_period`). Given the return_period of interest, a circle of iso-probability is created in the principal component analysis (PCA) joint probability (`x1`, `x2`) reference frame. Using the joint probability value, the cumulative distribution function (CDF) of the marginal distribution is used to find the quantile of each component. Finally, using the improved PCA methodology, the component 2 contour lines are calculated from component 1 using the relationships defined in Eckert-Gallup et. al. 2016. Eckert-Gallup, A. C., Sallaberry, C. J., Dallman, A. R., & Neary, V. S. (2016). Application of principal component analysis (PCA) and improved joint probability distributions to the inverse first-order reliability method (I-FORM) for predicting extreme sea states. Ocean Engineering, 112, 307-319. Parameters ---------- x1: list, np.ndarray, pd.Series, xr.DataArray Component 1 data x2: list, np.ndarray, pd.Series, xr.DataArray Component 2 data fit: dict Dictionary of the iso-probability results. May additionally contain the principal component analysis (PCA) on x1, x2 The PCA will be the same for a given x1, x2 therefore this step may be skipped if multiple calls to environmental contours are made for the same x1, x2 pair. The PCA dict may be obtained by setting return_fit=True when calling the PCA method. kwargs : optional bin_size : int Data points in each bin for the PCA fit. Default bin_size=250. nb_steps : int Discretization of the circle in the normal space used for I-FORM calculation. Default nb_steps=1000. return_fit: boolean Default False, if True will return the PCA fit dictionary Returns ------- x1_contour : numpy array Calculated x1 values along the contour boundary following return to original input orientation. x2_contour : numpy array Calculated x2 values along the contour boundary following return to original input orientation. fit: dict (optional) principal component analysis dictionary Keys: ----- 'principal_axes': sign corrected PCA axes 'shift' : The shift applied to x2 'x1_fit' : gaussian fit of x1 data 'mu_param' : fit to _mu_fcn 'sigma_param' : fit to _sig_fits """ x1 = to_numeric_array(x1, "x1") x2 = to_numeric_array(x2, "x2") if not isinstance(x1, np.ndarray) or x1.ndim == 0: raise TypeError(f"x1 must be a non-scalar array. Got: {type(x1)}") if not isinstance(x2, np.ndarray) or x2.ndim == 0: raise TypeError(f"x2 must be a non-scalar array. Got: {type(x2)}") if len(x1) != len(x2): raise ValueError("The lengths of x1 and x2 must be equal.") bin_size = kwargs.get("bin_size", 250) nb_steps = kwargs.get("nb_steps", 1000) return_fit = kwargs.get("return_fit", False) if not isinstance(bin_size, int): raise TypeError(f"bin_size must be of type int. Got: {type(bin_size)}") if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") if not isinstance(return_fit, bool): raise TypeError(f"return_fit must be of type bool. 
Got: {type(return_fit)}") if "x1_fit" not in fit: pca_fit = _principal_component_analysis(x1, x2, bin_size=bin_size) for key in pca_fit: fit[key] = pca_fit[key] x_quantile = fit["x_quantile"] y_quantile = fit["y_quantile"] # Use the inverse of cdf to calculate component 1 values component_1 = stats.invgauss.ppf( x_quantile, mu=fit["x1_fit"]["mu"], loc=fit["x1_fit"]["loc"], scale=fit["x1_fit"]["scale"], ) # Find Component 2 mu using first order linear regression mu_slope = fit["mu_fit"].slope mu_intercept = fit["mu_fit"].intercept component_2_mu = mu_slope * component_1 + mu_intercept # Find Componenet 2 sigma using second order polynomial fit sigma_polynomial_coeffcients = fit["sigma_fit"].x component_2_sigma = np.polyval(sigma_polynomial_coeffcients, component_1) # Use calculated mu and sigma values to calculate C2 along the contour component_2 = stats.norm.ppf( y_quantile, loc=component_2_mu, scale=component_2_sigma ) # Convert contours back to the original reference frame principal_axes = fit["principal_axes"] shift = fit["shift"] pa00 = principal_axes[0, 0] pa01 = principal_axes[0, 1] x1_contour = (pa00 * component_1 + pa01 * (component_2 - shift)) / ( pa01**2 + pa00**2 ) x2_contour = (pa01 * component_1 - pa00 * (component_2 - shift)) / ( pa01**2 + pa00**2 ) # Assign 0 value to any negative x1 contour values x1_contour = np.maximum(0, x1_contour) if return_fit: return np.transpose(x1_contour), np.transpose(x2_contour), fit return np.transpose(x1_contour), np.transpose(x2_contour)
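# Illustrative sketch (not part of the original MHKiT module): calling
# ``PCA_contour`` directly with an iso-probability dictionary built by
# ``_iso_prob_and_quantile``. The 1-hour duration, 100-year return period,
# and bin size are assumptions for demonstration; in normal use this is
# orchestrated by ``environmental_contours``.
def _example_pca_contour(x1, x2):
    """Return the 100-year PCA contour for 1-hour sea states."""
    fit = _iso_prob_and_quantile(
        sea_state_duration=3600, return_period=100, nb_steps=1000
    )
    # PCA_contour adds the principal component fit to ``fit`` on first use,
    # so the same dictionary can be passed to later calls on the same data.
    x1_contour, x2_contour = PCA_contour(x1, x2, fit, {"bin_size": 250})
    return x1_contour, x2_contour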
def _principal_component_analysis(x1, x2, bin_size=250): """ Performs a modified principal component analysis (PCA) [Eckert et. al 2016] on two variables (`x1`, `x2`). The additional PCA is performed in 5 steps: 1) Transform `x1` & `x2` into the principal component domain and shift the y-axis so that all values are positive and non-zero 2) Fit the `x1` data in the transformed reference frame with an inverse Gaussian Distribution 3) Bin the transformed data into groups of size bin and find the mean of `x1`, the mean of `x2`, and the standard deviation of `x2` 4) Perform a first-order linear regression to determine a continuous the function relating the mean of the `x1` bins to mean of the `x2` bins 5) Find a second-order polynomial which best relates the means of `x1` to the standard deviation of `x2` using constrained optimization Eckert-Gallup, A. C., Sallaberry, C. J., Dallman, A. R., & Neary, V. S. (2016). Application of principal component analysis (PCA) and improved joint probability distributions to the inverse first-order reliability method (I-FORM) for predicting extreme sea states. Ocean Engineering, 112, 307-319. Parameters ---------- x1: numpy array Component 1 data x2: numpy array Component 2 data bin_size : int Number of data points in each bin Returns ------- PCA: dict Keys: ----- 'principal_axes': sign corrected PCA axes 'shift' : The shift applied to x2 'x1_fit' : gaussian fit of x1 data 'mu_param' : fit to _mu_fcn 'sigma_param' : fit to _sig_fits """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(bin_size, int): raise TypeError(f"bin_size must be of type int. Got: {type(bin_size)}") # Step 0: Perform Standard PCA mean_location = 0 x1_mean_centered = x1 - x1.mean(axis=0) x2_mean_centered = x2 - x2.mean(axis=0) n_samples_by_n_features = np.column_stack((x1_mean_centered, x2_mean_centered)) pca = skPCA(n_components=2) pca.fit(n_samples_by_n_features) principal_axes = pca.components_ # STEP 1: Transform data into new reference frame # Apply correct/expected sign convention principal_axes = abs(principal_axes) principal_axes[1, 1] = -principal_axes[1, 1] # Rotate data into Principal direction x1_and_x2 = np.column_stack((x1, x2)) x1_x2_components = np.dot(x1_and_x2, principal_axes) x1_components = x1_x2_components[:, 0] x2_components = x1_x2_components[:, 1] # Apply shift to Component 2 to make all values positive shift = abs(min(x2_components)) + 0.1 x2_components = x2_components + shift # STEP 2: Fit Component 1 data using a Gaussian Distribution x1_sorted_index = x1_components.argsort() x1_sorted = x1_components[x1_sorted_index] x2_sorted = x2_components[x1_sorted_index] x1_fit_results = stats.invgauss.fit(x1_sorted, floc=mean_location) x1_fit = { "mu": x1_fit_results[0], "loc": x1_fit_results[1], "scale": x1_fit_results[2], } # Step 3: Bin Data & find order 1 linear relation between x1 & x2 means N = len(x1) minimum_4_bins = np.floor(N * 0.25) if bin_size > minimum_4_bins: bin_size = minimum_4_bins msg = ( "To allow for a minimum of 4 bins, the bin size has been " + f"set to {minimum_4_bins}" ) warnings.warn(msg, UserWarning) N_multiples = int(N // bin_size) max_N_multiples_index = int(N_multiples * bin_size) x1_integer_multiples_of_bin_size = x1_sorted[0:max_N_multiples_index] x2_integer_multiples_of_bin_size = x2_sorted[0:max_N_multiples_index] x1_bins = np.split(x1_integer_multiples_of_bin_size, 
N_multiples) x2_bins = np.split(x2_integer_multiples_of_bin_size, N_multiples) x1_last_bin = x1_sorted[max_N_multiples_index:] x2_last_bin = x2_sorted[max_N_multiples_index:] x1_bins.append(x1_last_bin) x2_bins.append(x2_last_bin) x1_means = np.array([]) x2_means = np.array([]) x2_sigmas = np.array([]) for x1_bin, x2_bin in zip(x1_bins, x2_bins): x1_means = np.append(x1_means, x1_bin.mean()) x2_means = np.append(x2_means, x2_bin.mean()) x2_sigmas = np.append(x2_sigmas, x2_bin.std()) mu_fit = stats.linregress(x1_means, x2_means) # STEP 4: Find order 2 relation between x1_mean and x2 standard deviation sigma_polynomial_order = 2 sig_0 = 0.1 * np.ones(sigma_polynomial_order + 1) def _objective_function(sig_p, x1_means, x2_sigmas): return mean_squared_error(np.polyval(sig_p, x1_means), x2_sigmas) # Constraint Functions def y_intercept_gt_0(sig_p): return sig_p[2] def sig_polynomial_min_gt_0(sig_p): return sig_p[2] - (sig_p[1] ** 2) / (4 * sig_p[0]) constraints = ( {"type": "ineq", "fun": y_intercept_gt_0}, {"type": "ineq", "fun": sig_polynomial_min_gt_0}, ) sigma_fit = optim.minimize( _objective_function, x0=sig_0, args=(x1_means, x2_sigmas), method="SLSQP", constraints=constraints, ) PCA = { "principal_axes": principal_axes, "shift": shift, "x1_fit": x1_fit, "mu_fit": mu_fit, "sigma_fit": sigma_fit, } return PCA def _iso_prob_and_quantile(sea_state_duration, return_period, nb_steps): """ Calculates the iso-probability and the x, y quantiles along the iso-probability radius Parameters ---------- sea_state_duration : int or float `x1` and `x2` sample rate (seconds) return_period: int, float Return period of interest in years nb_steps: int Discretization of the circle in the normal space. Default nb_steps=1000. Returns ------- results: Dictionay Dictionary of the iso-probability results Keys: 'exceedance_probability' - probability of exceedance 'x_component_iso_prob' - x-component of iso probability circle 'y_component_iso_prob' - y-component of iso probability circle 'x_quantile' - CDF of x-component 'y_quantile' - CDF of y-component """ if not isinstance(sea_state_duration, (int, float)): raise TypeError( f"sea_state_duration must be of type int or float. Got: {type(sea_state_duration)}" ) if not isinstance(return_period, (int, float)): raise TypeError( f"return_period must be of type int or float. Got: {type(return_period)}" ) if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") dt_yrs = sea_state_duration / (3600 * 24 * 365) exceedance_probability = 1 / (return_period / dt_yrs) iso_probability_radius = stats.norm.ppf( (1 - exceedance_probability), loc=0, scale=1 ) discretized_radians = np.linspace(0, 2 * np.pi, nb_steps) x_component_iso_prob = iso_probability_radius * np.cos(discretized_radians) y_component_iso_prob = iso_probability_radius * np.sin(discretized_radians) x_quantile = stats.norm.cdf(x_component_iso_prob, loc=0, scale=1) y_quantile = stats.norm.cdf(y_component_iso_prob, loc=0, scale=1) results = { "exceedance_probability": exceedance_probability, "x_component_iso_prob": x_component_iso_prob, "y_component_iso_prob": y_component_iso_prob, "x_quantile": x_quantile, "y_quantile": y_quantile, } return results def _copula_parameters(x1, x2, min_bin_count, initial_bin_max_val, bin_val_size): """ Returns an estimate of the Weibull and Lognormal distribution for x1 and x2 respectively. Additionally returns the estimates of the coefficients from the mean and standard deviation of the Log of x2 given x1. 
Parameters ---------- x1: array Component 1 data x2: array Component 2 data min_bin_count: int Sets the minimum number of bins allowed initial_bin_max_val: int, float Sets the max value of the first bin bin_val_size: int, float The size of each bin after the initial bin Returns ------- para_dist_1: array Weibull distribution parameters for for component 1 para_dist_2: array Lognormal distribution parameters for component 2 mean_cond: array Estimate coefficients of mean of Ln(x2|x1) std_cond: array Estimate coefficients of the standard deviation of Ln(x2|x1) """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(min_bin_count, int): raise TypeError( f"min_bin_count must be of type int. Got: {type(min_bin_count)}" ) if not isinstance(bin_val_size, (int, float)): raise TypeError( f"bin_val_size must be of type int or float. Got: {type(bin_val_size)}" ) if not isinstance(initial_bin_max_val, (int, float)): raise TypeError( f"initial_bin_max_val must be of type int or float. Got: {type(initial_bin_max_val)}" ) # Binning x1_sorted_index = x1.argsort() x1_sorted = x1[x1_sorted_index] x2_sorted = x2[x1_sorted_index] # Because x1 is sorted we can find the max index as follows: ind = np.array([]) N_vals_lt_limit = sum(x1_sorted <= initial_bin_max_val) ind = np.append(ind, N_vals_lt_limit) # Make sure first bin isn't empty or too small to avoid errors while ind == 0 or ind < min_bin_count: ind = np.array([]) initial_bin_max_val += bin_val_size N_vals_lt_limit = sum(x1_sorted <= initial_bin_max_val) ind = np.append(ind, N_vals_lt_limit) # Add bins until the total number of vals in between bins is # < the min bin size i = 0 bin_size_i = np.inf while bin_size_i >= min_bin_count: i += 1 bin_i_max_val = initial_bin_max_val + bin_val_size * (i) N_vals_lt_limit = sum(x1_sorted <= bin_i_max_val) ind = np.append(ind, N_vals_lt_limit) bin_size_i = ind[i] - ind[i - 1] # Weibull distribution parameters for component 1 using MLE para_dist_1 = stats.exponweib.fit(x1_sorted, floc=0, fa=1) # Lognormal distribution parameters for component 2 using MLE para_dist_2 = stats.norm.fit(np.log(x2_sorted)) # Parameters for conditional distribution of T|Hs for each bin num = len(ind) # num+1: number of bins para_dist_cond = [] hss = [] # Bin zero special case (lognormal dist over only 1 bin) # parameters for zero bin ind0 = range(0, int(ind[0])) x2_log0 = np.log(x2_sorted[ind0]) x2_lognormal_dist0 = stats.norm.fit(x2_log0) para_dist_cond.append(x2_lognormal_dist0) # mean of x1 (component 1 for zero bin) x1_bin0 = x1_sorted[range(0, int(ind[0]) - 1)] hss.append(np.mean(x1_bin0)) # Special case 2-bin lognormal Dist # parameters for 1 bin ind1 = range(0, int(ind[1])) x2_log1 = np.log(x2_sorted[ind1]) x2_lognormal_dist1 = stats.norm.fit(x2_log1) para_dist_cond.append(x2_lognormal_dist1) # mean of Hs (component 1 for bin 1) hss.append(np.mean(x1_sorted[range(0, int(ind[1]) - 1)])) # lognormal Dist (lognormal dist over only 2 bins) for i in range(2, num): ind_i = range(int(ind[i - 2]), int(ind[i])) x2_log_i = np.log(x2_sorted[ind_i]) x2_lognormal_dist_i = stats.norm.fit(x2_log_i) para_dist_cond.append(x2_lognormal_dist_i) hss.append(np.mean(x1_sorted[ind_i])) # Estimate coefficient using least square solution (mean: 3rd order, # sigma: 2nd order) ind_f = range(int(ind[num - 2]), int(len(x1))) x2_log_f = np.log(x2_sorted[ind_f]) x2_lognormal_dist_f = 
stats.norm.fit(x2_log_f) para_dist_cond.append(x2_lognormal_dist_f) # parameters for last bin # mean of Hs (component 1 for last bin) hss.append(np.mean(x1_sorted[ind_f])) para_dist_cond = np.array(para_dist_cond) hss = np.array(hss) # cubic in Hs: a + bx + cx**2 + dx**3 phi_mean = np.column_stack((np.ones(num + 1), hss, hss**2, hss**3)) # quadratic in Hs a + bx + cx**2 phi_std = np.column_stack((np.ones(num + 1), hss, hss**2)) # Estimate coefficients of mean of Ln(T|Hs)(vector 4x1) (cubic in Hs) mean_cond = np.linalg.lstsq(phi_mean, para_dist_cond[:, 0], rcond=None)[0] # Estimate coefficients of standard deviation of Ln(T|Hs) # (vector 3x1) (quadratic in Hs) std_cond = np.linalg.lstsq(phi_std, para_dist_cond[:, 1], rcond=None)[0] return para_dist_1, para_dist_2, mean_cond, std_cond def _gaussian_copula(x1, x2, fit, component_1, kwargs): """ Extreme Sea State Gaussian Copula Contour function. This function calculates environmental contours of extreme sea states using a Gaussian copula and the inverse first-order reliability method. Parameters ---------- x1: numpy array Component 1 data x2: numpy array Component 2 data fit: Dictionay Dictionary of the iso-probability results component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. Returns ------- component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. component_2_Gaussian Calculated x2 values along the contour boundary following return to original input orientation. fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ try: x1 = np.array(x1) except: pass try: x2 = np.array(x2) except: pass if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(component_1, np.ndarray): raise TypeError( f"component_1 must be of type np.ndarray. Got: {type(component_1)}" ) return_fit = kwargs.get("return_fit", False) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be of type bool. Got: {type(return_fit)}" ) x_component_iso_prob = fit["x_component_iso_prob"] y_component_iso_prob = fit["y_component_iso_prob"] # Calculate Kendall's tau tau = stats.kendalltau(x2, x1)[0] rho_gau = np.sin(tau * np.pi / 2.0) z2_Gauss = stats.norm.cdf( y_component_iso_prob * np.sqrt(1.0 - rho_gau**2.0) + rho_gau * x_component_iso_prob ) para_dist_2 = fit["para_dist_2"] s = para_dist_2[1] loc = 0 scale = np.exp(para_dist_2[0]) # lognormal inverse component_2_Gaussian = stats.lognorm.ppf(z2_Gauss, s=s, loc=loc, scale=scale) fit["tau"] = tau fit["rho"] = rho_gau fit["z2"] = z2_Gauss if return_fit: return component_1, component_2_Gaussian, fit return component_1, component_2_Gaussian def _gumbel_density(u, alpha): """ Calculates the Gumbel copula density. Parameters ---------- u: np.array Vector of equally spaced points between 0 and twice the maximum value of T. alpha: float Copula parameter. Must be greater than or equal to 1. 
Returns ------- y: np.array Copula density function. """ # Ignore divide by 0 warnings and resulting NaN warnings np.seterr(all="ignore") v = -np.log(u) v = np.sort(v, axis=0) vmin = v[0, :] vmax = v[1, :] nlogC = vmax * (1 + (vmin / vmax) ** alpha) ** (1 / alpha) y = (alpha - 1 + nlogC) * np.exp( -nlogC + np.sum((alpha - 1) * np.log(v) + v, axis=0) + (1 - 2 * alpha) * np.log(nlogC) ) np.seterr(all="warn") return y def _gumbel_copula(x1, x2, fit, component_1, nb_steps, kwargs): """ This function calculates environmental contours of extreme sea states using a Gumbel copula and the inverse first-order reliability method. Parameters ---------- x1: numpy array Component 1 data x2: numpy array Component 2 data fit: Dictionay Dictionary of the iso-probability results component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. nb_steps: int Discretization of the circle in the normal space used for copula component calculation. kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. Returns ------- component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. component_2_Gumbel: array Calculated x2 values along the contour boundary following return to original input orientation. fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ try: x1 = np.array(x1) except: pass try: x2 = np.array(x2) except: pass if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(component_1, np.ndarray): raise TypeError( f"component_1 must be of type np.ndarray. Got: {type(component_1)}" ) return_fit = kwargs.get("return_fit", False) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be of type bool. 
Got: {type(return_fit)}" ) x_quantile = fit["x_quantile"] y_quantile = fit["y_quantile"] para_dist_2 = fit["para_dist_2"] # Calculate Kendall's tau tau = stats.kendalltau(x2, x1)[0] theta_gum = 1.0 / (1.0 - tau) min_limit_2 = 0 max_limit_2 = np.ceil(np.amax(x2) * 2) Ndata = 1000 x = np.linspace(min_limit_2, max_limit_2, Ndata) s = para_dist_2[1] scale = np.exp(para_dist_2[0]) z2 = stats.lognorm.cdf(x, s=s, loc=0, scale=scale) fit["tau"] = tau fit["theta"] = theta_gum fit["z2"] = z2 component_2_Gumbel = np.zeros(nb_steps) for k in range(nb_steps): z1 = np.array([x_quantile[k]] * Ndata) Z = np.array((z1, z2)) Y = _gumbel_density(Z, theta_gum) Y = np.nan_to_num(Y) # pdf 2|1, f(comp_2|comp_1)=c(z1,z2)*f(comp_2) p_x_x1 = Y * (stats.lognorm.pdf(x, s=s, loc=0, scale=scale)) # Estimate CDF from PDF dum = np.cumsum(p_x_x1) cdf = dum / (dum[Ndata - 1]) # Result of conditional CDF derived based on Gumbel copula table = np.array((x, cdf)) table = table.T for j in range(Ndata): if y_quantile[k] <= table[0, 1]: component_2_Gumbel[k] = min(table[:, 0]) break elif y_quantile[k] <= table[j, 1]: component_2_Gumbel[k] = (table[j, 0] + table[j - 1, 0]) / 2 break else: component_2_Gumbel[k] = table[:, 0].max() if return_fit: return component_1, component_2_Gumbel, fit return component_1, component_2_Gumbel def _clayton_copula(x1, x2, fit, component_1, kwargs): """ This function calculates environmental contours of extreme sea states using a Clayton copula and the inverse first-order reliability method. Parameters ---------- x1: numpy array Component 1 data x2: numpy array Component 2 data fit: Dictionay Dictionary of the iso-probability results component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. nb_steps: int Discretization of the circle in the normal space used for copula component calculation. kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. Returns ------- component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. component_2_Clayton: array Calculated x2 values along the contour boundary following return to original input orientation. fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(component_1, np.ndarray): raise TypeError( f"component_1 must be of type np.ndarray. Got: {type(component_1)}" ) return_fit = kwargs.get("return_fit", False) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be of type bool. 
Got: {type(return_fit)}" ) x_quantile = fit["x_quantile"] y_quantile = fit["y_quantile"] para_dist_2 = fit["para_dist_2"] # Calculate Kendall's tau tau = stats.kendalltau(x2, x1)[0] theta_clay = (2.0 * tau) / (1.0 - tau) s = para_dist_2[1] scale = np.exp(para_dist_2[0]) z2_Clay = ( (1.0 - x_quantile ** (-theta_clay) + x_quantile ** (-theta_clay) / y_quantile) ** (theta_clay / (1.0 + theta_clay)) ) ** (-1.0 / theta_clay) # lognormal inverse component_2_Clayton = stats.lognorm.ppf(z2_Clay, s=s, loc=0, scale=scale) fit["theta_clay"] = theta_clay fit["tau"] = tau fit["z2_Clay"] = z2_Clay if return_fit: return component_1, component_2_Clayton, fit return component_1, component_2_Clayton def _rosenblatt_copula(x1, x2, fit, component_1, kwargs): """ This function calculates environmental contours of extreme sea states using a Rosenblatt transformation and the inverse first-order reliability method. Parameters ---------- x1: numpy array Component 1 data x2: numpy array Component 2 data fit: Dictionay Dictionary of the iso-probability results component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. nb_steps: int Discretization of the circle in the normal space used for copula component calculation. kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. Returns ------- component_1: array Calculated x1 values along the contour boundary following return to original input orientation. component_1 is not specifically used in this calculation but is passed through to create a consistent output from all copula methods. component_2_Rosenblatt: array Calculated x2 values along the contour boundary following return to original input orientation. fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ try: x1 = np.array(x1) except: pass try: x2 = np.array(x2) except: pass if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(component_1, np.ndarray): raise TypeError( f"component_1 must be of type np.ndarray. Got: {type(component_1)}" ) return_fit = kwargs.get("return_fit", False) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be of type bool. 
Got: {type(return_fit)}" ) y_quantile = fit["y_quantile"] mean_cond = fit["mean_cond"] std_cond = fit["std_cond"] # mean of Ln(T) as a function of x1 lamda_cond = ( mean_cond[0] + mean_cond[1] * component_1 + mean_cond[2] * component_1**2 + mean_cond[3] * component_1**3 ) # Standard deviation of Ln(x2) as a function of x1 sigma_cond = std_cond[0] + std_cond[1] * component_1 + std_cond[2] * component_1**2 # lognormal inverse component_2_Rosenblatt = stats.lognorm.ppf( y_quantile, s=sigma_cond, loc=0, scale=np.exp(lamda_cond) ) fit["lamda_cond"] = lamda_cond fit["sigma_cond"] = sigma_cond if return_fit: return component_1, component_2_Rosenblatt, fit return component_1, component_2_Rosenblatt def _nonparametric_copula_parameters(x1, x2, max_x1=None, max_x2=None, nb_steps=1000): """ Calculates nonparametric copula parameters Parameters ---------- x1: array Component 1 data x2: array Component 2 data max_x1: float Defines the max value of x1 to discretize the KDE space max_x2:float Defines the max value of x2 to discretize the KDE space nb_steps: int number of points used to discretize KDE space Returns ------- nonpara_dist_1: x1 points in KDE space and Nonparametric CDF for x1 nonpara_dist_2: x2 points in KDE space and Nonparametric CDF for x2 nonpara_pdf_2: x2 points in KDE space and Nonparametric PDF for x2 """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not max_x1: max_x1 = x1.max() * 2 if not max_x2: max_x2 = x2.max() * 2 if not isinstance(max_x1, float): raise TypeError(f"max_x1 must be of type float. Got: {type(max_x1)}") if not isinstance(max_x2, float): raise TypeError(f"max_x2 must be of type float. Got: {type(max_x2)}") if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") # Binning x1_sorted_index = x1.argsort() x1_sorted = x1[x1_sorted_index] x2_sorted = x2[x1_sorted_index] # Calcualte KDE bounds (potential input) min_limit_1 = 0 min_limit_2 = 0 # Discretize for KDE pts_x1 = np.linspace(min_limit_1, max_x1, nb_steps) pts_x2 = np.linspace(min_limit_2, max_x2, nb_steps) # Calculate optimal bandwidth for T and Hs sig = stats.median_abs_deviation(x2_sorted) num = float(len(x2_sorted)) bwT = sig * (4.0 / (3.0 * num)) ** (1.0 / 5.0) sig = stats.median_abs_deviation(x1_sorted) num = float(len(x1_sorted)) bwHs = sig * (4.0 / (3.0 * num)) ** (1.0 / 5.0) # Nonparametric PDF for x2 temp = KDEUnivariate(x2_sorted) temp.fit(bw=bwT) f_x2 = temp.evaluate(pts_x2) # Nonparametric CDF for x1 temp = KDEUnivariate(x1_sorted) temp.fit(bw=bwHs) tempPDF = temp.evaluate(pts_x1) F_x1 = tempPDF / sum(tempPDF) F_x1 = np.cumsum(F_x1) # Nonparametric CDF for x2 F_x2 = f_x2 / sum(f_x2) F_x2 = np.cumsum(F_x2) nonpara_dist_1 = np.transpose(np.array([pts_x1, F_x1])) nonpara_dist_2 = np.transpose(np.array([pts_x2, F_x2])) nonpara_pdf_2 = np.transpose(np.array([pts_x2, f_x2])) return nonpara_dist_1, nonpara_dist_2, nonpara_pdf_2 def _nonparametric_component(z, nonpara_dist, nb_steps): """ A generalized method for calculating copula components Parameters ---------- z: array CDF of isoprobability nonpara_dist: array x1 or x2 points in KDE space and Nonparametric CDF for x1 or x2 nb_steps: int Discretization of the circle in the normal space used for copula component calculation. 
Returns ------- component: array nonparametic component values """ if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") component = np.zeros(nb_steps) for k in range(0, nb_steps): for j in range(0, np.size(nonpara_dist, 0)): if z[k] <= nonpara_dist[0, 1]: component[k] = min(nonpara_dist[:, 0]) break elif z[k] <= nonpara_dist[j, 1]: component[k] = (nonpara_dist[j, 0] + nonpara_dist[j - 1, 0]) / 2 break else: component[k] = max(nonpara_dist[:, 0]) return component def _nonparametric_gaussian_copula(x1, x2, fit, nb_steps, kwargs): """ This function calculates environmental contours of extreme sea states using a Gaussian copula with non-parametric marginal distribution fits and the inverse first-order reliability method. Parameters ---------- x1: array Component 1 data x2: array Component 2 data fit: Dictionary Dictionary of the iso-probability results nb_steps: int Discretization of the circle in the normal space used for copula component calculation. kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. Returns ------- component_1_np: array Component 1 nonparametric copula component_2_np_gaussian: array Component 2 nonparametric Gaussian copula fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") return_fit = kwargs.get("return_fit", False) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be of type bool. Got: {type(return_fit)}" ) x_component_iso_prob = fit["x_component_iso_prob"] y_component_iso_prob = fit["y_component_iso_prob"] nonpara_dist_1 = fit["nonpara_dist_1"] nonpara_dist_2 = fit["nonpara_dist_2"] # Calculate Kendall's tau tau = stats.kendalltau(x2, x1)[0] rho_gau = np.sin(tau * np.pi / 2.0) # Component 1 z1 = stats.norm.cdf(x_component_iso_prob) z2 = stats.norm.cdf( y_component_iso_prob * np.sqrt(1.0 - rho_gau**2.0) + rho_gau * x_component_iso_prob ) comps = { 1: {"z": z1, "nonpara_dist": nonpara_dist_1}, 2: {"z": z2, "nonpara_dist": nonpara_dist_2}, } for c in comps: z = comps[c]["z"] nonpara_dist = comps[c]["nonpara_dist"] comps[c]["comp"] = _nonparametric_component(z, nonpara_dist, nb_steps) component_1_np = comps[1]["comp"] component_2_np_gaussian = comps[2]["comp"] fit["tau"] = tau fit["rho"] = rho_gau fit["z1"] = z1 fit["z2"] = z2 if return_fit: return component_1_np, component_2_np_gaussian, fit return component_1_np, component_2_np_gaussian def _nonparametric_clayton_copula(x1, x2, fit, nb_steps, kwargs): """ This function calculates environmental contours of extreme sea states using a Clayton copula with non-parametric marginal distribution fits and the inverse first-order reliability method. Parameters ---------- x1: array Component 1 data x2: array Component 2 data fit: Dictionary Dictionary of the iso-probability results nb_steps: int Discretization of the circle in the normal space used for copula component calculation. kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. 
Returns ------- component_1_np: array Component 1 nonparametric copula component_2_np_gaussian: array Component 2 nonparametric Clayton copula fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") return_fit = kwargs.get("return_fit", False) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be of type bool. Got: {type(return_fit)}" ) x_component_iso_prob = fit["x_component_iso_prob"] x_quantile = fit["x_quantile"] y_quantile = fit["y_quantile"] nonpara_dist_1 = fit["nonpara_dist_1"] nonpara_dist_2 = fit["nonpara_dist_2"] nonpara_pdf_2 = fit["nonpara_pdf_2"] # Calculate Kendall's tau tau = stats.kendalltau(x2, x1)[0] theta_clay = (2.0 * tau) / (1.0 - tau) # Component 1 (Hs) z1 = stats.norm.cdf(x_component_iso_prob) z2_clay = ( (1 - x_quantile ** (-theta_clay) + x_quantile ** (-theta_clay) / y_quantile) ** (theta_clay / (1.0 + theta_clay)) ) ** (-1.0 / theta_clay) comps = { 1: {"z": z1, "nonpara_dist": nonpara_dist_1}, 2: {"z": z2_clay, "nonpara_dist": nonpara_dist_2}, } for c in comps: z = comps[c]["z"] nonpara_dist = comps[c]["nonpara_dist"] comps[c]["comp"] = _nonparametric_component(z, nonpara_dist, nb_steps) component_1_np = comps[1]["comp"] component_2_np_clayton = comps[2]["comp"] fit["tau"] = tau fit["theta"] = theta_clay fit["z1"] = z1 fit["z2"] = z2_clay if return_fit: return component_1_np, component_2_np_clayton, fit return component_1_np, component_2_np_clayton def _nonparametric_gumbel_copula(x1, x2, fit, nb_steps, kwargs): """ This function calculates environmental contours of extreme sea states using a Gumbel copula with non-parametric marginal distribution fits and the inverse first-order reliability method. Parameters ---------- x1: array Component 1 data x2: array Component 2 data results: Dictionay Dictionary of the iso-probability results nb_steps: int Discretization of the circle in the normal space used for copula component calculation. kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. Returns ------- component_1_np: array Component 1 nonparametric copula component_2_np_gumbel: array Component 2 nonparametric Gumbel copula fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") return_fit = kwargs.get("return_fit", False) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be a bool. 
Got: {type(return_fit)}" ) Ndata = 1000 x_quantile = fit["x_quantile"] y_quantile = fit["y_quantile"] nonpara_dist_1 = fit["nonpara_dist_1"] nonpara_dist_2 = fit["nonpara_dist_2"] nonpara_pdf_2 = fit["nonpara_pdf_2"] # Calculate Kendall's tau tau = stats.kendalltau(x2, x1)[0] theta_gum = 1.0 / (1.0 - tau) # Component 1 (Hs) z1 = x_quantile component_1_np = _nonparametric_component(z1, nonpara_dist_1, nb_steps) pts_x2 = nonpara_pdf_2[:, 0] f_x2 = nonpara_pdf_2[:, 1] F_x2 = nonpara_dist_2[:, 1] component_2_np_gumbel = np.zeros(nb_steps) for k in range(nb_steps): z1 = np.array([x_quantile[k]] * Ndata) Z = np.array((z1.T, F_x2)) Y = _gumbel_density(Z, theta_gum) Y = np.nan_to_num(Y) # pdf 2|1 p_x2_x1 = Y * f_x2 # Estimate CDF from PDF dum = np.cumsum(p_x2_x1) cdf = dum / (dum[Ndata - 1]) table = np.array((pts_x2, cdf)) table = table.T for j in range(Ndata): if y_quantile[k] <= table[0, 1]: component_2_np_gumbel[k] = min(table[:, 0]) break elif y_quantile[k] <= table[j, 1]: component_2_np_gumbel[k] = (table[j, 0] + table[j - 1, 0]) / 2 break else: component_2_np_gumbel[k] = max(table[:, 0]) fit["tau"] = tau fit["theta"] = theta_gum fit["z1"] = z1 fit["pts_x2"] = pts_x2 fit["f_x2"] = f_x2 fit["F_x2"] = F_x2 if return_fit: return component_1_np, component_2_np_gumbel, fit return component_1_np, component_2_np_gumbel def _bivariate_KDE(x1, x2, bw, fit, nb_steps, Ndata_bivariate_KDE, kwargs): """ Contours generated under this class will use a non-parametric KDE to fit the joint distribution. This function calculates environmental contours of extreme sea states using a bivariate KDE to estimate the joint distribution. The contour is then calculated directly from the joint distribution. Parameters ---------- x1: array Component 1 data x2: array Component 2 data bw: np.array Array containing KDE bandwidth for x1 and x2 fit: Dictionay Dictionary of the iso-probability results nb_steps: int number of points used to discretize KDE space max_x1: float Defines the max value of x1 to discretize the KDE space max_x2: float Defines the max value of x2 to discretize the KDE space kwargs : optional return_fit: boolean Will return fitting parameters used. Default False. Returns ------- x1_bivariate_KDE: array Calculated x1 values along the contour boundary following return to original input orientation. x2_bivariate_KDE: array Calculated x2 values along the contour boundary following return to original input orientation. fit: Dictionary (optional) If return_fit=True. Dictionary with iso-probabilities passed with additional fit metrics from the copula method. """ if not isinstance(x1, np.ndarray): raise TypeError(f"x1 must be of type np.ndarray. Got: {type(x1)}") if not isinstance(x2, np.ndarray): raise TypeError(f"x2 must be of type np.ndarray. Got: {type(x2)}") if not isinstance(nb_steps, int): raise TypeError(f"nb_steps must be of type int. Got: {type(nb_steps)}") max_x1 = kwargs.get("max_x1", None) max_x2 = kwargs.get("max_x2", None) log_transform = kwargs.get("log_transform", False) return_fit = kwargs.get("return_fit", False) if isinstance(max_x1, type(None)): max_x1 = x1.max() * 2 if isinstance(max_x2, type(None)): max_x2 = x2.max() * 2 if not isinstance(max_x1, float): raise TypeError(f"max_x1 must be of type float. Got: {type(max_x1)}") if not isinstance(max_x2, float): raise TypeError(f"max_x2 must be of type float. Got: {type(max_x2)}") if not isinstance(log_transform, bool): raise TypeError( f"If specified, log_transform must be of type bool. 
Got: {type(log_transform)}" ) if not isinstance(return_fit, bool): raise TypeError( f"If specified, return_fit must be of type bool. Got: {type(return_fit)}" ) p_f = fit["exceedance_probability"] min_limit_1 = 0.01 min_limit_2 = 0.01 pts_x1 = np.linspace(min_limit_1, max_x1, Ndata_bivariate_KDE) pts_x2 = np.linspace(min_limit_2, max_x2, Ndata_bivariate_KDE) pt1, pt2 = np.meshgrid(pts_x2, pts_x1) mesh_pts_x2 = pt1.flatten() mesh_pts_x1 = pt2.flatten() # Transform gridded points using log ty = [x2, x1] xi = [mesh_pts_x2, mesh_pts_x1] txi = xi if log_transform: ty = [np.log(x2), np.log(x1)] txi = [np.log(mesh_pts_x2), np.log(mesh_pts_x1)] m = len(txi[0]) n = len(ty[0]) d = 2 # Create contour f = np.zeros((1, m)) weight = np.ones((1, n)) for i in range(0, m): ftemp = np.ones((n, 1)) for j in range(0, d): z = (txi[j][i] - ty[j]) / bw[j] fk = stats.norm.pdf(z) if log_transform: fnew = fk * (1 / np.transpose(xi[j][i])) else: fnew = fk fnew = np.reshape(fnew, (n, 1)) ftemp = np.multiply(ftemp, fnew) f[:, i] = np.dot(weight, ftemp) fhat = f.reshape(100, 100) vals = plt.contour(pt1, pt2, fhat, levels=[p_f]) plt.clf() x1_bivariate_KDE = [] x2_bivariate_KDE = [] segments = [path.vertices for path in vals.get_paths()] for seg in segments: x1_bivariate_KDE.append(seg[:, 1]) x2_bivariate_KDE.append(seg[:, 0]) x1_bivariate_KDE = np.transpose(np.asarray(x1_bivariate_KDE)[0]) x2_bivariate_KDE = np.transpose(np.asarray(x2_bivariate_KDE)[0]) fit["mesh_pts_x1"] = mesh_pts_x1 fit["mesh_pts_x2"] = mesh_pts_x2 fit["ty"] = ty fit["xi"] = xi fit["contour_vals"] = vals if return_fit: return x1_bivariate_KDE, x2_bivariate_KDE, fit return x1_bivariate_KDE, x2_bivariate_KDE # Sampling
[docs] def samples_full_seastate( x1, x2, points_per_interval, return_periods, sea_state_duration, method="PCA", bin_size=250, ): """ Sample a sea state between contours of specified return periods. This function is used for the full sea state approach for the extreme load. See Coe et al. 2018 for more details. It was originally part of WDRT. Coe, R. G., Michelen, C., Eckert-Gallup, A., & Sallaberry, C. (2018). Full long-term design response analysis of a wave energy converter. Renewable Energy, 116, 356-366. Parameters ---------- x1: list, np.ndarray, pd.Series, xr.DataArray Component 1 data x2: list, np.ndarray, pd.Series, xr.DataArray Component 2 data points_per_interval : int Number of sample points to be calculated per contour interval. return_periods: np.array Vector of return periods that define the contour intervals in which samples will be taken. Values must be greater than zero and must be in increasing order. sea_state_duration : int or float `x1` and `x2` sample rate (seconds) method: string or list Copula method to apply. Currently only 'PCA' is implemented. bin_size : int Number of data points in each bin Returns ------- Hs_Samples: np.array Vector of Hs values for each sample point. Te_Samples: np.array Vector of Te values for each sample point. weight_points: np.array Vector of probabilistic weights for each sampling point to be used in risk calculations. """ if method != "PCA": raise NotImplementedError( "Full sea state sampling is currently only implemented using " + "the 'PCA' method." ) x1 = to_numeric_array(x1, "x1") x2 = to_numeric_array(x2, "x2") if not isinstance(points_per_interval, int): raise TypeError( f"points_per_interval must be of int. Got: {type(points_per_interval)}" ) if not isinstance(return_periods, np.ndarray): raise TypeError( f"return_periods must be of type np.ndarray. Got: {type(return_periods)}" ) if not isinstance(sea_state_duration, (int, float)): raise TypeError( f"sea_state_duration must be of int or float. Got: {type(sea_state_duration)}" ) if not isinstance(method, (str, list)): raise TypeError(f"method must be of type string or list. Got: {type(method)}") if not isinstance(bin_size, int): raise TypeError(f"bin_size must be of int. 
Got: {type(bin_size)}") pca_fit = _principal_component_analysis(x1, x2, bin_size) # Calculate line where Hs = 0 to avoid sampling Hs in negative space t_zeroline = np.linspace(2.5, 30, 1000) t_zeroline = np.transpose(t_zeroline) h_zeroline = np.zeros(len(t_zeroline)) # Transform zero line into principal component space coeff = pca_fit["principal_axes"] shift = pca_fit["shift"] comp_zeroline = np.dot(np.transpose(np.vstack([h_zeroline, t_zeroline])), coeff) comp_zeroline[:, 1] = comp_zeroline[:, 1] + shift comp1 = pca_fit["x1_fit"] c1_zeroline_prob = stats.invgauss.cdf( comp_zeroline[:, 0], mu=comp1["mu"], loc=0, scale=comp1["scale"] ) mu_slope = pca_fit["mu_fit"].slope mu_intercept = pca_fit["mu_fit"].intercept mu_zeroline = mu_slope * comp_zeroline[:, 0] + mu_intercept sigma_polynomial_coeffcients = pca_fit["sigma_fit"].x sigma_zeroline = np.polyval(sigma_polynomial_coeffcients, comp_zeroline[:, 0]) c2_zeroline_prob = stats.norm.cdf( comp_zeroline[:, 1], loc=mu_zeroline, scale=sigma_zeroline ) c1_normzeroline = stats.norm.ppf(c1_zeroline_prob, 0, 1) c2_normzeroline = stats.norm.ppf(c2_zeroline_prob, 0, 1) return_periods = np.asarray(return_periods) contour_probs = 1 / (365 * 24 * 60 * 60 / sea_state_duration * return_periods) # Reliability contour generation # Calculate reliability beta_lines = stats.norm.ppf((1 - contour_probs), 0, 1) # Add zero as lower bound to first contour beta_lines = np.hstack((0, beta_lines)) # Discretize the circle theta_lines = np.linspace(0, 2 * np.pi, 1000) # Add probablity of 1 to the reliability set, corresponding to # probability of the center point of the normal space contour_probs = np.hstack((1, contour_probs)) # Vary U1,U2 along circle sqrt(U1^2+U2^2) = beta u1_lines = np.dot(np.cos(theta_lines[:, None]), beta_lines[None, :]) # Removing values on the H_s = 0 line that are far from the circles in the # normal space that will be evaluated to speed up calculations minval = np.amin(u1_lines) - 0.5 mask = c1_normzeroline > minval c1_normzeroline = c1_normzeroline[mask] c2_normzeroline = c2_normzeroline[mask] # Transform to polar coordinates theta_zeroline = np.arctan2(c2_normzeroline, c1_normzeroline) rho_zeroline = np.sqrt(c1_normzeroline**2 + c2_normzeroline**2) theta_zeroline[theta_zeroline < 0] = theta_zeroline[theta_zeroline < 0] + 2 * np.pi sample_alpha, sample_beta, weight_points = _generate_sample_data( beta_lines, rho_zeroline, theta_zeroline, points_per_interval, contour_probs ) # Sample transformation to principal component space sample_u1 = sample_beta * np.cos(sample_alpha) sample_u2 = sample_beta * np.sin(sample_alpha) comp1_sample = stats.invgauss.ppf( stats.norm.cdf(sample_u1, loc=0, scale=1), mu=comp1["mu"], loc=0, scale=comp1["scale"], ) mu_sample = mu_slope * comp1_sample + mu_intercept # Calculate sigma values at each point on the circle sigma_sample = np.polyval(sigma_polynomial_coeffcients, comp1_sample) # Use calculated mu and sigma values to calculate C2 along the contour comp2_sample = stats.norm.ppf( stats.norm.cdf(sample_u2, loc=0, scale=1), loc=mu_sample, scale=sigma_sample ) # Sample transformation into Hs-T space h_sample, t_sample = _princomp_inv(comp1_sample, comp2_sample, coeff, shift) return h_sample, t_sample, weight_points
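# Illustrative sketch (not part of the original MHKiT module): drawing
# full-sea-state samples between the 1-, 50-, and 100-year contours. The
# synthetic data, 1-hour duration, and 10 points per interval are
# assumptions chosen only for demonstration.
def _example_samples_full_seastate():
    """Return Hs/Te sample points and their probabilistic weights."""
    rng = np.random.default_rng(0)
    hs = 2.0 * rng.weibull(1.5, 1000) + 0.1
    te = 5.0 + 0.8 * hs + rng.random(1000)
    hs_samples, te_samples, weights = samples_full_seastate(
        hs,
        te,
        points_per_interval=10,
        return_periods=np.array([1, 50, 100]),
        sea_state_duration=3600,
    )
    return hs_samples, te_samples, weights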
[docs] def samples_contour(t_samples, t_contour, hs_contour):
    """
    Get Hs points along a specified environmental contour using
    user-defined T values.

    Parameters
    ----------
    t_samples : list, np.ndarray, pd.Series, xr.DataArray
        Points for sampling along return contour
    t_contour : list, np.ndarray, pd.Series, xr.DataArray
        T values along contour
    hs_contour : list, np.ndarray, pd.Series, xr.DataArray
        Hs values along contour

    Returns
    -------
    hs_samples : np.ndarray
        points sampled along return contour
    """
    t_samples = to_numeric_array(t_samples, "t_samples")
    t_contour = to_numeric_array(t_contour, "t_contour")
    hs_contour = to_numeric_array(hs_contour, "hs_contour")

    # finds minimum and maximum energy period values
    amin = np.argmin(t_contour)
    amax = np.argmax(t_contour)
    aamin = np.min([amin, amax])
    aamax = np.max([amin, amax])

    # finds points along the contour
    w1 = hs_contour[aamin:aamax]
    w2 = np.concatenate((hs_contour[aamax:], hs_contour[:aamin]))
    if np.max(w1) > np.max(w2):
        x1 = t_contour[aamin:aamax]
        y1 = hs_contour[aamin:aamax]
    else:
        x1 = np.concatenate((t_contour[aamax:], t_contour[:aamin]))
        y1 = np.concatenate((hs_contour[aamax:], hs_contour[:aamin]))

    # sorts data based on the max and min energy period values
    ms = np.argsort(x1)
    x = x1[ms]
    y = y1[ms]

    # interpolates the sorted data
    si = interp.interp1d(x, y)

    # finds the wave height based on the user specified energy period values
    hs_samples = si(t_samples)

    return hs_samples
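# Illustrative sketch (not part of the original MHKiT module): interpolating
# Hs along a previously computed contour at user-specified Te values. The
# contour arrays and the chosen sample periods (which must lie within the
# contour's Te range) are assumptions for demonstration.
def _example_samples_contour(te_contour, hs_contour):
    """Return contour Hs values at Te = 6, 8, and 10 s."""
    te_samples = np.array([6.0, 8.0, 10.0])
    return samples_contour(te_samples, te_contour, hs_contour)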
def _generate_sample_data( beta_lines, rho_zeroline, theta_zeroline, points_per_interval, contour_probs ): """ Calculate radius, angle, and weight for each sample point Parameters ---------- beta_lines: list, np.ndarray, pd.Series, xr.DataArray Array of mu fitting function parameters. rho_zeroline: list, np.ndarray, pd.Series, xr.DataArray Array of radii theta_zeroline: list, np.ndarray, pd.Series, xr.DataArray points_per_interval: int contour_probs: list, np.ndarray, pd.Series, xr.DataArray Returns ------- sample_alpha: np.array Array of fitted sample angle values. sample_beta: np.array Array of fitted sample radius values. weight_points: np.array Array of weights for each point. """ beta_lines = to_numeric_array(beta_lines, "beta_lines") rho_zeroline = to_numeric_array(rho_zeroline, "rho_zeroline") theta_zeroline = to_numeric_array(theta_zeroline, "theta_zeroline") contour_probs = to_numeric_array(contour_probs, "contour_probs") if not isinstance(points_per_interval, int): raise TypeError( f"points_per_interval must be of type int. Got: {type(points_per_interval)}" ) num_samples = (len(beta_lines) - 1) * points_per_interval alpha_bounds = np.zeros((len(beta_lines) - 1, 2)) angular_dist = np.zeros(len(beta_lines) - 1) angular_ratio = np.zeros(len(beta_lines) - 1) alpha = np.zeros((len(beta_lines) - 1, points_per_interval + 1)) weight = np.zeros(len(beta_lines) - 1) sample_beta = np.zeros(num_samples) sample_alpha = np.zeros(num_samples) weight_points = np.zeros(num_samples) # Loop over contour intervals for i in range(len(beta_lines) - 1): # Check if any of the radii for the Hs=0, line are smaller than # the radii of the contour, meaning that these lines intersect r = rho_zeroline - beta_lines[i + 1] + 0.01 if any(r < 0): left = np.amin(np.where(r < 0)) right = np.amax(np.where(r < 0)) # Save sampling bounds alpha_bounds[i, :] = ( theta_zeroline[left], theta_zeroline[right] - 2 * np.pi, ) else: alpha_bounds[i, :] = np.array((0, 2 * np.pi)) # Find the angular distance that will be covered by sampling the disc angular_dist[i] = sum(abs(alpha_bounds[i])) # Calculate ratio of area covered for each contour angular_ratio[i] = angular_dist[i] / (2 * np.pi) # Discretize the remaining portion of the disc into 10 equally spaced # areas to be sampled alpha[i, :] = np.arange( min(alpha_bounds[i]), max(alpha_bounds[i]) + 0.1, angular_dist[i] / points_per_interval, ) # Calculate the weight of each point sampled per contour weight[i] = ( (contour_probs[i] - contour_probs[i + 1]) * angular_ratio[i] / points_per_interval ) for j in range(points_per_interval): # Generate sample radius by adding a randomly sampled distance to # the 'disc' lower bound sample_beta[(i) * points_per_interval + j] = beta_lines[ i ] + np.random.random_sample() * (beta_lines[i + 1] - beta_lines[i]) # Generate sample angle by adding a randomly sampled distance to # the lower bound of the angle defining a discrete portion of the # 'disc' sample_alpha[(i) * points_per_interval + j] = alpha[ i, j ] + np.random.random_sample() * (alpha[i, j + 1] - alpha[i, j]) # Save the weight for each sample point weight_points[i * points_per_interval + j] = weight[i] return sample_alpha, sample_beta, weight_points def _princomp_inv(princip_data1, princip_data2, coeff, shift): """ Take the inverse of the principal component rotation given data, coefficients, and shift. Parameters ---------- princip_data1: np.array Array of Component 1 values. princip_data2: np.array Array of Component 2 values. 
coeff: np.array Array of principal component coefficients. shift: float Shift applied to Component 2 to make all values positive. Returns ------- original1: np.array Hs values following rotation from principal component space. original2: np.array T values following rotation from principal component space. """ if not isinstance(princip_data1, np.ndarray): raise TypeError( f"princip_data1 must be of type np.ndarray. Got: {type(princip_data1)}" ) if not isinstance(princip_data2, np.ndarray): raise TypeError( f"princip_data2 must be of type np.ndarray. Got: {type(princip_data2)}" ) if not isinstance(coeff, np.ndarray): raise TypeError(f"coeff must be of type np.ndarray. Got: {type(coeff)}") if not isinstance(shift, float): raise TypeError(f"shift must be of type float. Got: {type(shift)}") original1 = np.zeros(len(princip_data1)) original2 = np.zeros(len(princip_data1)) for i in range(len(princip_data2)): original1[i] = ( (coeff[0, 1] * (princip_data2[i] - shift)) + (coeff[0, 0] * princip_data1[i]) ) / (coeff[0, 1] ** 2 + coeff[0, 0] ** 2) original2[i] = ( (coeff[0, 1] * princip_data1[i]) - (coeff[0, 0] * (princip_data2[i] - shift)) ) / (coeff[0, 1] ** 2 + coeff[0, 0] ** 2) return original1, original2
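# Illustrative sketch (not part of the original MHKiT module): a numerical
# check that ``_princomp_inv`` undoes the forward principal component
# rotation and shift used by the PCA-based methods above. The synthetic data
# and bin size are assumptions chosen only for demonstration.
def _example_princomp_round_trip():
    """Rotate synthetic Hs/Te data into PC space and back; return the max error."""
    rng = np.random.default_rng(1)
    hs = 2.0 * rng.weibull(1.5, 500) + 0.1
    te = 5.0 + 0.8 * hs + rng.random(500)
    pca_fit = _principal_component_analysis(hs, te, bin_size=100)
    coeff = pca_fit["principal_axes"]
    shift = pca_fit["shift"]
    # Forward rotation (as in samples_full_seastate), then shift component 2
    components = np.dot(np.column_stack((hs, te)), coeff)
    comp1 = components[:, 0]
    comp2 = components[:, 1] + shift
    hs_back, te_back = _princomp_inv(comp1, comp2, coeff, shift)
    return max(np.abs(hs_back - hs).max(), np.abs(te_back - te).max())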