Source code for mhkit.river.io.d3d

from mhkit.utils import unorm
import scipy.interpolate as interp
import numpy as np
import pandas as pd
import xarray as xr
import netCDF4
import warnings


[docs] def get_all_time(data): """ Returns all of the time stamps from a D3D simulation passed to the function as a NetCDF object (data) Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress generated by running a Delft3D model. Returns ------- seconds_run: array Returns an array of integers representing the number of seconds after the simulation started and that the data object contains a snapshot of simulation conditions at that time. """ if not isinstance(data, netCDF4._netCDF4.Dataset): raise TypeError("data must be a NetCDF4 object") seconds_run = np.ma.getdata(data.variables["time"][:], False) return seconds_run
[docs] def index_to_seconds(data, time_index): """ The function will return 'seconds_run' if passed a 'time_index' Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. time_index: int A positive integer to pull the time index from the dataset. 0 being closest to time 0. Default is last time index -1. Returns ------- seconds_run: int, float The 'seconds_run' is the seconds corresponding to the 'time_index' increments. """ return _convert_time(data, time_index=time_index)
[docs] def seconds_to_index(data, seconds_run): """ The function will return the nearest 'time_index' in the data if passed an integer number of 'seconds_run' Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. seconds_run: int, float A positive integer or float that represents the amount of time in seconds passed since starting the simulation. Returns ------- time_index: int The 'time_index' is a positive integer starting from 0 and incrementing until in simulation is complete. """ return _convert_time(data, seconds_run=seconds_run)
def _convert_time(data, time_index=None, seconds_run=None): """ Converts a time index to seconds or seconds to a time index. The user must specify 'time_index' or 'seconds_run' (Not both). The function will returns 'seconds_run' if passed a 'time_index' or will return the closest 'time_index' if passed a number of 'seconds_run'. Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. time_index: int An integer to pull the time index from the dataset. 0 being closest to the start time. seconds_run: int, float An integer or float that represents the amount of time in seconds passed since starting the simulation. Returns ------- QoI: int, float The quantity of interest is the unknown value either the 'time_index' or the 'seconds_run'. The 'time_index' is an integer starting from 0 and incrementing until in simulation is complete. The 'seconds_run' is the seconds corresponding to the 'time_index' increments. """ if not isinstance(data, netCDF4._netCDF4.Dataset): raise TypeError("data must be NetCDF4 object") if not (time_index or seconds_run): raise ValueError("Input of time_index or seconds_run needed") if time_index and seconds_run: raise ValueError("Only one of time_index or seconds_run should be provided") if not ( isinstance(time_index, (int, float)) or isinstance(seconds_run, (int, float)) ): raise TypeError("time_index or seconds_run input must be an int or float") times = get_all_time(data) if time_index: QoI = times[time_index] if seconds_run: try: idx = np.where(times == seconds_run) QoI = idx[0][0] except: idx = (np.abs(times - seconds_run)).argmin() QoI = idx warnings.warn( "Warning: seconds_run not found. Closest time stamp" + f"found {times[idx]}", stacklevel=2, ) return QoI
[docs] def get_layer_data(data, variable, layer_index=-1, time_index=-1, to_pandas=True): """ Get variable data from the NetCDF4 object at a specified layer and timestep. If the data is 2D the layer_index is ignored. Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. variable: string Delft3D outputs many vairables. The full list can be found using "data.variables.keys()" in the console. layer_index: int An integer to pull out a layer from the dataset. 0 being closest to the surface. Default is the bottom layer, found with input -1. time_index: int An integer to pull the time index from the dataset. 0 being closest to the start time. Default is last time index, found with input -1. to_pandas : bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- layer_data: pd.DataFrame or xr.Dataset Dataset with columns of "x", "y", "waterdepth", and "waterlevel" location of the specified layer, variable values "v", and the "time" the simulation has run. The waterdepth is measured from the water surface and the "waterlevel" is the water level diffrencein meters from the zero water level. """ if not isinstance(time_index, int): raise TypeError("time_index must be an int") if not isinstance(layer_index, int): raise TypeError("layer_index must be an int") if not isinstance(data, netCDF4._netCDF4.Dataset): raise TypeError("data must be NetCDF4 object") if variable not in data.variables.keys(): raise ValueError("variable not recognized") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") coords = str(data.variables[variable].coordinates).split() var = data.variables[variable][:] max_time_index = data["time"].shape[0] - 1 # to account for zero index if abs(time_index) > max_time_index: raise ValueError( f"time_index must be less than the absolute value of the max time index {max_time_index}" ) x = np.ma.getdata(data.variables[coords[0]][:], False) y = np.ma.getdata(data.variables[coords[1]][:], False) if type(var[0][0]) == np.ma.core.MaskedArray: max_layer = len(var[0][0]) if abs(layer_index) > max_layer: raise ValueError(f"layer_index must be less than the max layer {max_layer}") v = np.ma.getdata(var[time_index, :, layer_index], False) dimensions = 3 else: if type(var[0][0]) != np.float64: raise TypeError("data not recognized") dimensions = 2 v = np.ma.getdata(var[time_index, :], False) # waterdepth if "mesh2d" in variable: cords_to_layers = { "mesh2d_face_x mesh2d_face_y": { "name": "mesh2d_nLayers", "coords": data.variables["mesh2d_layer_sigma"][:], }, "mesh2d_edge_x mesh2d_edge_y": { "name": "mesh2d_nInterfaces", "coords": data.variables["mesh2d_interface_sigma"][:], }, } bottom_depth = np.ma.getdata( data.variables["mesh2d_waterdepth"][time_index, :], False ) waterlevel = np.ma.getdata(data.variables["mesh2d_s1"][time_index, :], False) coords = str(data.variables["waterdepth"].coordinates).split() elif str(data.variables[variable].coordinates) == "FlowElem_xcc FlowElem_ycc": cords_to_layers = { "FlowElem_xcc FlowElem_ycc": { "name": "laydim", "coords": data.variables["LayCoord_cc"][:], }, "FlowLink_xu FlowLink_yu": { "name": "wdim", "coords": data.variables["LayCoord_w"][:], }, } bottom_depth = np.ma.getdata(data.variables["waterdepth"][time_index, :], False) waterlevel = np.ma.getdata(data.variables["s1"][time_index, :], False) coords = str(data.variables["waterdepth"].coordinates).split() else: cords_to_layers = { "FlowElem_xcc FlowElem_ycc LayCoord_cc LayCoord_cc": { "name": "laydim", "coords": data.variables["LayCoord_cc"][:], }, "FlowLink_xu FlowLink_yu": { "name": "wdim", "coords": data.variables["LayCoord_w"][:], }, } bottom_depth = np.ma.getdata(data.variables["waterdepth"][time_index, :], False) waterlevel = np.ma.getdata(data.variables["s1"][time_index, :], False) coords = str(data.variables["waterdepth"].coordinates).split() layer_dim = str(data.variables[variable].coordinates) cord_sys = cords_to_layers[layer_dim]["coords"] layer_percentages = np.ma.getdata(cord_sys, False) # accumulative if layer_dim == "FlowLink_xu FlowLink_yu": # interpolate x_laydim = np.ma.getdata(data.variables[coords[0]][:], False) y_laydim = np.ma.getdata(data.variables[coords[1]][:], False) points_laydim = np.array([[x, y] for x, y in zip(x_laydim, y_laydim)]) coords_request = str(data.variables[variable].coordinates).split() x_wdim = np.ma.getdata(data.variables[coords_request[0]][:], False) y_wdim = np.ma.getdata(data.variables[coords_request[1]][:], False) points_wdim = np.array([[x, y] for x, y in zip(x_wdim, y_wdim)]) bottom_depth_wdim = interp.griddata(points_laydim, bottom_depth, points_wdim) water_level_wdim = interp.griddata(points_laydim, waterlevel, points_wdim) idx_bd = np.where(np.isnan(bottom_depth_wdim)) for i in idx_bd: bottom_depth_wdim[i] = interp.griddata( points_laydim, bottom_depth, points_wdim[i], method="nearest" ) water_level_wdim[i] = interp.griddata( points_laydim, waterlevel, points_wdim[i], method="nearest" ) waterdepth = [] if dimensions == 2: if layer_dim == "FlowLink_xu FlowLink_yu": z = [bottom_depth_wdim] waterlevel = water_level_wdim else: z = [bottom_depth] else: if layer_dim == "FlowLink_xu FlowLink_yu": z = [bottom_depth_wdim * layer_percentages[layer_index]] waterlevel = water_level_wdim else: z = [bottom_depth * layer_percentages[layer_index]] waterdepth = np.append(waterdepth, z) time = np.ma.getdata(data.variables["time"][time_index], False) * np.ones(len(x)) index = np.arange(0, len(time)) layer_data = xr.Dataset( data_vars={ "x": (["index"], x), "y": (["index"], y), "waterdepth": (["index"], waterdepth), "waterlevel": (["index"], waterlevel), "v": (["index"], v), "time": (["index"], time), }, coords={"index": index}, ) if to_pandas: layer_data = layer_data.to_pandas() return layer_data
[docs] def create_points(x, y, waterdepth, to_pandas=True): """ Generate a Dataset of points from combinations of input coordinates. This function accepts three inputs and combines them to generate a Dataset of points. The inputs can be: - 3 points - 2 points and 1 array - 1 point and 2 arrays - 3 arrays (x and y must have the same size) For 3 points or less, every combination will be in the output. For 3 arrays, x and y are treated as coordinate pairs and combined with each value from the waterdepth array. Parameters ---------- x : int, float, array-like X values (longitude) for the points. y : int, float, array-like Y values (latitude) for the points. waterdepth : int, float, array-like Waterdepth values for the points. to_pandas : bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- points : xr.Dataset or pd.DataFrame A Dataset with columns 'x', 'y', and 'waterdepth' representing the generated points. Example ------- 2 arrays and 1 point: >>> x = np.array([1, 2]) >>> y = np.array([3, 4, 5]) >>> waterdepth = 6 >>> create_points(x, y, waterdepth) x y waterdepth 0 1.0 3.0 6.0 1 2.0 3.0 6.0 2 1.0 4.0 6.0 3 2.0 4.0 6.0 4 1.0 5.0 6.0 5 2.0 5.0 6.0 3 arrays (x and y must have the same length): >>> x = np.array([1, 2, 3]) >>> y = np.array([4, 5, 6]) >>> waterdepth = np.array([1, 2]) >>> create_points(x, y, waterdepth) x y waterdepth 0 1.0 4.0 1.0 1 2.0 5.0 1.0 2 3.0 6.0 1.0 3 1.0 4.0 2.0 4 2.0 5.0 2.0 5 4.0 6.0 2.0 """ # Check input types inputs = {"x": x, "y": y, "waterdepth": waterdepth} for name, value in inputs.items(): # Convert lists to numpy arrays if isinstance(value, list): value = np.array(value) inputs[name] = value # Update the value in the dictionary # Check data type if not isinstance(value, (int, float, np.ndarray, pd.Series, xr.DataArray)): raise TypeError( f"{name} must be an int, float, np.ndarray, pd.Series, or xr.DataArray. Got: {type(value)}" ) # Check for empty arrays if isinstance(value, (np.ndarray, pd.Series, xr.DataArray)) and len(value) == 0: raise ValueError(f"{name} should not be an empty array") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") x_array_like = not isinstance(x, (int, float)) y_array_like = not isinstance(y, (int, float)) waterdepth_array_like = not isinstance(waterdepth, (int, float)) if x_array_like and y_array_like and waterdepth_array_like: # if all inputs are arrays, grid the coordinate and waterdepth y_grid, waterdepth_grid = np.meshgrid(y, waterdepth) y_grid = y_grid.ravel() waterdepth_grid = waterdepth_grid.ravel() x_grid, _ = np.meshgrid(x, waterdepth) x_grid = x_grid.ravel() else: # if at least one input is a point, grid all inputs x_grid, y_grid, waterdepth_grid = np.meshgrid(x, y, waterdepth) x_grid = x_grid.ravel() y_grid = y_grid.ravel() waterdepth_grid = waterdepth_grid.ravel() index = np.arange(0, len(x_grid)) points = xr.Dataset( data_vars={ "x": (["index"], x_grid), "y": (["index"], y_grid), "waterdepth": (["index"], waterdepth_grid), }, coords={"index": index}, ) if to_pandas: points = points.to_pandas() return points
[docs] def variable_interpolation( data, variables, points="cells", edges="none", x_max_lim=float("inf"), x_min_lim=float("-inf"), y_max_lim=float("inf"), y_min_lim=float("-inf"), to_pandas=True, ): """ Interpolate multiple variables from the Delft3D onto the same points. Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress generated by running a Delft3D model. variables: array of strings Name of variables to interpolate, e.g. 'turkin1', 'ucx', 'ucy' and 'ucz'. The full list can be found using "data.variables.keys()" in the console. points: string, pd.DataFrame, or xr.Dataset The points to interpolate data onto. 'cells'- interpolates all data onto the Delft3D cell coordinate system (Default) 'faces'- interpolates all dada onto the Delft3D face coordinate system Dataset of x, y, and waterdepth coordinates - Interpolates data onto user povided points. Can be created with `create_points` function. edges: string: 'nearest' If edges is set to 'nearest' the code will fill in nan values with nearest interpolation. Otherwise only linear interpolarion will be used. to_pandas : bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- transformed_data: pd.DataFrame or xr.Dataset Variables on specified grid points saved under the input variable names and the x, y, and waterdepth coordinates of those points. """ if not isinstance(points, (str, pd.DataFrame, xr.Dataset)): raise TypeError( f"points must be a string, pd.DataFrame, or xr.Dataset. Got {type(points)}." ) if isinstance(points, xr.Dataset): points = points.to_pandas() if isinstance(points, str): if not (points == "cells" or points == "faces"): raise ValueError( f"If a string, points must be cells or faces. Got {points}" ) if not isinstance(data, netCDF4._netCDF4.Dataset): raise TypeError(f"data must be netCDF4 object. Got {type(data)}") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") data_raw = {} for var in variables: var_data_df = get_all_data_points(data, var, time_index=-1, to_pandas=True) var_data_df["depth"] = var_data_df.waterdepth - var_data_df.waterlevel # added var_data_df = var_data_df.loc[:, ~var_data_df.T.duplicated(keep="first")] var_data_df = var_data_df[var_data_df.x > x_min_lim] var_data_df = var_data_df[var_data_df.x < x_max_lim] var_data_df = var_data_df[var_data_df.y > y_min_lim] var_data_df = var_data_df[var_data_df.y < y_max_lim] data_raw[var] = var_data_df if isinstance(points, pd.DataFrame): print("points provided") elif points == "faces": points = data_raw["ucx"][["x", "y", "waterdepth"]] elif points == "cells": points = data_raw["turkin1"][["x", "y", "waterdepth"]] transformed_data = points.copy(deep=True) for var in variables: transformed_data[var] = interp.griddata( data_raw[var][["x", "y", "waterdepth"]], # waterdepth to depth data_raw[var][var], points[["x", "y", "waterdepth"]], ) if edges == "nearest": idx = np.where(np.isnan(transformed_data[var])) if len(idx[0]): for i in idx[0]: transformed_data[var][i] = interp.griddata( data_raw[var][["x", "y", "waterdepth"]], data_raw[var][var], [points["x"][i], points["y"][i], points["waterdepth"][i]], method="nearest", ) if not to_pandas: transformed_data = transformed_data.to_dataset() return transformed_data
[docs] def get_all_data_points(data, variable, time_index=-1, to_pandas=True): """ Get data points for a passed variable for all layers at a specified time from the Delft3D NetCDF4 object by iterating over the `get_layer_data` function. Parameters ---------- data: Netcdf4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. variable: string Delft3D variable. The full list can be of variables can be found using "data.variables.keys()" in the console. time_index: int An integer to pull the time step from the dataset. Default is last time step, found with the input -1. to_pandas : bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- all_data: xr.Dataset or pd.Dataframe Dataframe with columns x, y, waterdepth, waterlevel, variable, and time. The waterdepth is measured from the water surface and the "waterlevel" is the water level diffrence in meters from the zero water level. """ if not isinstance(time_index, int): raise TypeError("time_index must be an int") if not isinstance(data, netCDF4._netCDF4.Dataset): raise TypeError("data must be NetCDF4 object") if variable not in data.variables.keys(): raise ValueError("variable not recognized") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") max_time_index = len(data.variables[variable][:]) if abs(time_index) > max_time_index: raise ValueError( f"time_index must be less than the max time index {max_time_index}" ) if "mesh2d" in variable: cords_to_layers = { "mesh2d_face_x mesh2d_face_y": { "name": "mesh2d_nLayers", "coords": data.variables["mesh2d_layer_sigma"][:], }, "mesh2d_edge_x mesh2d_edge_y": { "name": "mesh2d_nInterfaces", "coords": data.variables["mesh2d_interface_sigma"][:], }, } elif str(data.variables[variable].coordinates) == "FlowElem_xcc FlowElem_ycc": cords_to_layers = { "FlowElem_xcc FlowElem_ycc": { "name": "laydim", "coords": data.variables["LayCoord_cc"][:], }, "FlowLink_xu FlowLink_yu": { "name": "wdim", "coords": data.variables["LayCoord_w"][:], }, } else: cords_to_layers = { "FlowElem_xcc FlowElem_ycc LayCoord_cc LayCoord_cc": { "name": "laydim", "coords": data.variables["LayCoord_cc"][:], }, "FlowLink_xu FlowLink_yu": { "name": "wdim", "coords": data.variables["LayCoord_w"][:], }, } layer_dim = str(data.variables[variable].coordinates) try: cord_sys = cords_to_layers[layer_dim]["coords"] except: raise Exception("Coordinates not recognized.") else: layer_percentages = np.ma.getdata(cord_sys, False) x_all = [] y_all = [] depth_all = [] water_level_all = [] v_all = [] time_all = [] layers = range(len(layer_percentages)) for layer in layers: layer_data = get_layer_data(data, variable, layer, time_index) x_all = np.append(x_all, layer_data.x) y_all = np.append(y_all, layer_data.y) depth_all = np.append(depth_all, layer_data.waterdepth) water_level_all = np.append(water_level_all, layer_data.waterlevel) v_all = np.append(v_all, layer_data.v) time_all = np.append(time_all, layer_data.time) index = np.arange(0, len(time_all)) all_data = xr.Dataset( data_vars={ "x": (["index"], x_all), "y": (["index"], y_all), "waterdepth": (["index"], depth_all), "waterlevel": (["index"], water_level_all), f"{variable}": (["index"], v_all), "time": (["index"], time_all), }, coords={"index": index}, ) if to_pandas: all_data = all_data.to_pandas() return all_data
[docs] def turbulent_intensity( data, points="cells", time_index=-1, intermediate_values=False, to_pandas=True ): """ Calculate the turbulent intensity percentage for a given data set for the specified points. Assumes variable names: ucx, ucy, ucz and turkin1. Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. points: string, pd.DataFrame, xr.Dataset Points to interpolate data onto. 'cells': interpolates all data onto velocity coordinate system (Default). 'faces': interpolates all data onto the TKE coordinate system. DataFrame of x, y, and z coordinates: Interpolates data onto user provided points. time_index: int An integer to pull the time step from the dataset. Default is late time step -1. intermediate_values: boolean (optional) If false the function will return position and turbulent intensity values. If true the function will return position(x,y,z) and values need to calculate turbulent intensity (ucx, uxy, uxz and turkin1) in a Dataframe. Default False. to_pandas : bool (optional) Flag to output pandas instead of xarray. Default = True. Returns ------- TI_data: xr.Dataset or pd.DataFrame If intermediate_values is true all values are output. If intermediate_values is equal to false only turbulent_intesity and x, y, and z variables are output. x- position in the x direction y- position in the y direction waterdepth- position in the vertical direction turbulet_intesity- turbulent kinetic energy divided by the root mean squared velocity turkin1- turbulent kinetic energy ucx- velocity in the x direction ucy- velocity in the y direction ucz- velocity in the vertical direction """ if not isinstance(points, (str, pd.DataFrame, xr.Dataset)): raise TypeError("points must be a string, pd.DataFrame, xr.Dataset") if isinstance(points, str): if not (points == "cells" or points == "faces"): raise ValueError("points must be cells or faces") if not isinstance(time_index, int): raise TypeError("time_index must be an int") if not isinstance(to_pandas, bool): raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}") if isinstance(points, xr.Dataset): points = points.to_pandas() max_time_index = data["time"].shape[0] - 1 # to account for zero index if abs(time_index) > max_time_index: raise ValueError( f"time_index must be less than the absolute value of the max time index {max_time_index}" ) if not isinstance(data, netCDF4._netCDF4.Dataset): raise TypeError("data must be netCDF4 object") for variable in ["turkin1", "ucx", "ucy", "ucz"]: if variable not in data.variables.keys(): raise ValueError(f"Variable {variable} not present in Data") TI_vars = ["turkin1", "ucx", "ucy", "ucz"] TI_data_raw = {} for var in TI_vars: var_data_df = get_all_data_points(data, var, time_index) TI_data_raw[var] = var_data_df if type(points) == pd.DataFrame: print("points provided") elif points == "faces": points = TI_data_raw["turkin1"].drop(["waterlevel", "turkin1"], axis=1) elif points == "cells": points = TI_data_raw["ucx"].drop(["waterlevel", "ucx"], axis=1) TI_data = points.copy(deep=True) for var in TI_vars: TI_data[var] = interp.griddata( TI_data_raw[var][["x", "y", "waterdepth"]], TI_data_raw[var][var], points[["x", "y", "waterdepth"]], ) idx = np.where(np.isnan(TI_data[var])) if len(idx[0]): for i in idx[0]: TI_data[var][i] = interp.griddata( TI_data_raw[var][["x", "y", "waterdepth"]], TI_data_raw[var][var], [points["x"][i], points["y"][i], points["waterdepth"][i]], method="nearest", ) u_mag = unorm( np.array(TI_data["ucx"]), np.array(TI_data["ucy"]), np.array(TI_data["ucz"]) ) neg_index = np.where(TI_data["turkin1"] < 0) zero_bool = np.isclose( TI_data["turkin1"][TI_data["turkin1"] < 0].array, np.zeros(len(TI_data["turkin1"][TI_data["turkin1"] < 0].array)), atol=1.0e-4, ) zero_ind = neg_index[0][zero_bool] non_zero_ind = neg_index[0][~zero_bool] TI_data.loc[zero_ind, "turkin1"] = np.zeros(len(zero_ind)) TI_data.loc[non_zero_ind, "turkin1"] = [np.nan] * len(non_zero_ind) TI_data["turbulent_intensity"] = ( np.sqrt(2 / 3 * TI_data["turkin1"]) / u_mag * 100 ) # % if intermediate_values == False: TI_data = TI_data.drop(TI_vars, axis=1) if not to_pandas: TI_data = TI_data.to_dataset() return TI_data