Source code for mhkit.river.io.d3d

from mhkit.utils import unorm
import scipy.interpolate as interp
import numpy as np
import pandas as pd
import netCDF4
import warnings


[docs]def get_all_time(data): ''' Returns all of the time stamps from a D3D simulation passed to the function as a NetCDF object (data) Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress generated by running a Delft3D model. Returns ------- seconds_run: array Returns an array of integers representing the number of seconds after the simulation started and that the data object contains a snapshot of simulation conditions at that time. ''' assert type(data)== netCDF4._netCDF4.Dataset, 'data must be NetCDF4 object' seconds_run = np.ma.getdata(data.variables['time'][:], False) return seconds_run
[docs]def index_to_seconds(data, time_index): ''' The function will return 'seconds_run' if passed a 'time_index' Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. time_index: int A positive integer to pull the time index from the dataset. 0 being closest to time 0. Default is last time index -1. Returns ------- seconds_run: int, float The 'seconds_run' is the seconds corresponding to the 'time_index' increments. ''' return _convert_time(data, time_index=time_index)
[docs]def seconds_to_index(data, seconds_run): ''' The function will return the nearest 'time_index' in the data if passed an integer number of 'seconds_run' Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. seconds_run: int, float A positive integer or float that represents the amount of time in seconds passed since starting the simulation. Returns ------- time_index: int The 'time_index' is a positive integer starting from 0 and incrementing until in simulation is complete. ''' return _convert_time(data, seconds_run=seconds_run)
def _convert_time(data, time_index=None, seconds_run=None): ''' Converts a time index to seconds or seconds to a time index. The user must specify 'time_index' or 'seconds_run' (Not both). The function will returns 'seconds_run' if passed a 'time_index' or will return the closest 'time_index' if passed a number of 'seconds_run'. Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. time_index: int An integer to pull the time index from the dataset. 0 being closest to the start time. seconds_run: int, float An integer or float that represents the amount of time in seconds passed since starting the simulation. Returns ------- QoI: int, float The quantity of interest is the unknown value either the 'time_index' or the 'seconds_run'. The 'time_index' is an integer starting from 0 and incrementing until in simulation is complete. The 'seconds_run' is the seconds corresponding to the 'time_index' increments. ''' assert type(data)== netCDF4._netCDF4.Dataset, 'data must be NetCDF4 object' assert time_index or seconds_run, 'input of time_index or seconds_run needed' assert not(time_index and seconds_run), f'only one time_index or seconds_run' assert isinstance(time_index, (int, float)) or isinstance(seconds_run, (int, float)),'time_index or seconds_run input must be a int or float' times = get_all_time(data) if time_index: QoI= times[time_index] if seconds_run: try: idx=np.where(times == seconds_run) QoI=idx[0][0] except: idx = (np.abs(times - seconds_run)).argmin() QoI= idx warnings.warn( f'Warning: seconds_run not found. Closest time stamp' +'found {times[idx]}', stacklevel= 2) return QoI
[docs]def get_layer_data(data, variable, layer_index=-1, time_index=-1): ''' Get variable data from the NetCDF4 object at a specified layer and timestep. If the data is 2D the layer_index is ignored. Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. variable: string Delft3D outputs many vairables. The full list can be found using "data.variables.keys()" in the console. layer_index: int An integer to pull out a layer from the dataset. 0 being closest to the surface. Default is the bottom layer, found with input -1. time_index: int An integer to pull the time index from the dataset. 0 being closest to the start time. Default is last time index, found with input -1. Returns ------- layer_data: DataFrame DataFrame with columns of "x", "y", "waterdepth", and "waterlevel" location of the specified layer, variable values "v", and the "time" the simulation has run. The waterdepth is measured from the water surface and the "waterlevel" is the water level diffrencein meters from the zero water level. ''' assert isinstance(time_index, int), 'time_index must be an int' assert isinstance(layer_index, int), 'layer_index must be an int' assert type(data)== netCDF4._netCDF4.Dataset, 'data must be NetCDF4 object' assert variable in data.variables.keys(), 'variable not recognized' coords = str(data.variables[variable].coordinates).split() var=data.variables[variable][:] max_time_index= data['time'].shape[0]-1 # to account for zero index assert abs(time_index) <= max_time_index, (f'time_index must be less than' +'the absolute value of the max time index {max_time_index}') x=np.ma.getdata(data.variables[coords[0]][:], False) y=np.ma.getdata(data.variables[coords[1]][:], False) if type(var[0][0]) == np.ma.core.MaskedArray: max_layer= len(var[0][0]) assert abs(layer_index) <= max_layer,( f'layer_index must be less than' +'the max layer {max_layer}') v= np.ma.getdata(var[time_index,:,layer_index], False) dimensions= 3 else: assert type(var[0][0])== np.float64, 'data not recognized' dimensions= 2 v= np.ma.getdata(var[time_index,:], False) #waterdepth if "mesh2d" in variable: cords_to_layers= {'mesh2d_face_x mesh2d_face_y': {'name':'mesh2d_nLayers', 'coords':data.variables['mesh2d_layer_sigma'][:]}, 'mesh2d_edge_x mesh2d_edge_y': {'name':'mesh2d_nInterfaces', 'coords':data.variables['mesh2d_interface_sigma'][:]}} bottom_depth=np.ma.getdata(data.variables['mesh2d_waterdepth'][time_index, :], False) waterlevel= np.ma.getdata(data.variables['mesh2d_s1'][time_index, :], False) coords = str(data.variables['waterdepth'].coordinates).split() else: cords_to_layers= {'FlowElem_xcc FlowElem_ycc':{'name':'laydim', 'coords':data.variables['LayCoord_cc'][:]}, 'FlowLink_xu FlowLink_yu': {'name':'wdim', 'coords':data.variables['LayCoord_w'][:]}} bottom_depth=np.ma.getdata(data.variables['waterdepth'][time_index, :], False) waterlevel= np.ma.getdata(data.variables['s1'][time_index, :], False) coords = str(data.variables['waterdepth'].coordinates).split() layer_dim = str(data.variables[variable].coordinates) cord_sys= cords_to_layers[layer_dim]['coords'] layer_percentages= np.ma.getdata(cord_sys, False) #accumulative if layer_dim == 'FlowLink_xu FlowLink_yu': #interpolate x_laydim=np.ma.getdata(data.variables[coords[0]][:], False) y_laydim=np.ma.getdata(data.variables[coords[1]][:], False) points_laydim = np.array([ [x, y] for x, y in zip(x_laydim, y_laydim)]) coords_request = str(data.variables[variable].coordinates).split() x_wdim=np.ma.getdata(data.variables[coords_request[0]][:], False) y_wdim=np.ma.getdata(data.variables[coords_request[1]][:], False) points_wdim=np.array([ [x, y] for x, y in zip(x_wdim, y_wdim)]) bottom_depth_wdim = interp.griddata(points_laydim, bottom_depth, points_wdim) water_level_wdim= interp.griddata(points_laydim, waterlevel, points_wdim) idx_bd= np.where(np.isnan(bottom_depth_wdim)) for i in idx_bd: bottom_depth_wdim[i]= interp.griddata(points_laydim, bottom_depth, points_wdim[i], method='nearest') water_level_wdim[i]= interp.griddata(points_laydim, waterlevel, points_wdim[i], method='nearest') waterdepth=[] if dimensions== 2: if layer_dim == 'FlowLink_xu FlowLink_yu': z = [bottom_depth_wdim] waterlevel=water_level_wdim else: z = [bottom_depth] else: if layer_dim == 'FlowLink_xu FlowLink_yu': z = [bottom_depth_wdim*layer_percentages[layer_index]] waterlevel=water_level_wdim else: z = [bottom_depth*layer_percentages[layer_index]] waterdepth=np.append(waterdepth, z) time= np.ma.getdata(data.variables['time'][time_index], False)*np.ones(len(x)) layer= np.array([ [x_i, y_i, d_i, w_i, v_i, t_i] for x_i, y_i, d_i, w_i, v_i, t_i in zip(x, y, waterdepth, waterlevel, v, time)]) layer_data = pd.DataFrame(layer, columns=['x', 'y', 'waterdepth','waterlevel', 'v', 'time']) return layer_data
[docs]def create_points(x, y, waterdepth): ''' Turns three coordinate inputs into a single output DataFrame of points. In any order the three inputs can consist of 3 points, 2 points and 1 array, or 1 point and 2 arrays. The final output DataFrame will be the unique combinations of the 3 inputs. Parameters ---------- x: float, array or int x values to create points. y: float, array or int y values to create points. waterdepth: float, array or int waterdepth values to create points. Returns ------- points: DateFrame DataFrame with columns x, y and waterdepth points. Example ------- If the inputs are 2 arrays: and [3,4,5] and 1 point [6], the output will contain 6 array combinations of the 3 inputs as shown. x=np.array([1,2]) y=np.array([3,4,5]) waterdepth= 6 d3d.create_points(x,y,waterdepth) x y waterdepth 0 1.0 3.0 6.0 1 2.0 3.0 6.0 2 1.0 4.0 6.0 3 2.0 4.0 6.0 4 1.0 5.0 6.0 5 2.0 5.0 6.0 ''' assert isinstance(x, (int, float, np.ndarray)), ('x must be a int, float' +' or array') assert isinstance(y, (int, float, np.ndarray)), ('y must be a int, float' +' or array') assert isinstance(waterdepth, (int, float, np.ndarray)), ('waterdepth must be a int, float' +' or array') directions = {0:{'name': 'x', 'values': x}, 1:{'name': 'y', 'values': y}, 2:{'name': 'waterdepth', 'values': waterdepth}} for i in directions: try: N=len(directions[i]['values']) except: directions[i]['values'] = np.array([directions[i]['values']]) N=len(directions[i]['values']) if N == 1 : directions[i]['type']= 'point' elif N > 1 : directions[i]['type']= 'array' else: raise Exception(f'length of direction {directions[i]["name"]} was' +'neagative or zero') # Check how many times point is in "types" types= [directions[i]['type'] for i in directions] N_points = types.count('point') if N_points >= 2: lens = np.array([len(directions[d]['values']) for d in directions]) max_len_idx = lens.argmax() not_max_idxs= [i for i in directions.keys()] del not_max_idxs[max_len_idx] for not_max in not_max_idxs: N= len(directions[max_len_idx]['values']) vals =np.ones(N)*directions[not_max]['values'] directions[not_max]['values'] = np.array(vals) x_new = directions[0]['values'] y_new = directions[1]['values'] depth_new = directions[2]['values'] request= np.array([ [x_i, y_i, depth_i] for x_i, y_i, depth_i in zip(x_new, y_new, depth_new)]) points= pd.DataFrame(request, columns=[ 'x', 'y', 'waterdepth']) elif N_points == 1: # treat as plane #find index of point idx_point = types.index('point') max_idxs= [i for i in directions.keys()] print(max_idxs) del max_idxs[idx_point] #find vectors XX, YY = np.meshgrid(directions[max_idxs[0]]['values'], directions[max_idxs[1]]['values'] ) N_X=np.shape(XX)[1] N_Y=np.shape(YY)[0] ZZ= np.ones((N_Y,N_X))*directions[idx_point]['values'] request= np.array([ [x_i, y_i, z_i] for x_i, y_i, z_i in zip(XX.ravel(), YY.ravel() , ZZ.ravel())]) columns=[ directions[max_idxs[0]]['name'], directions[max_idxs[1]]['name'], directions[idx_point]['name']] points= pd.DataFrame(request, columns=columns) else: raise Exception('Can provide at most two arrays') return points
[docs]def variable_interpolation(data, variables, points='cells', edges= 'none'): ''' Interpolate multiple variables from the Delft3D onto the same points. Parameters ---------- data: NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress generated by running a Delft3D model. variables: array of strings Name of variables to interpolate, e.g. 'turkin1', 'ucx', 'ucy' and 'ucz'. The full list can be found using "data.variables.keys()" in the console. points: string, DataFrame The points to interpolate data onto. 'cells'- interpolates all data onto the Delft3D cell coordinate system (Default) 'faces'- interpolates all dada onto the Delft3D face coordinate system DataFrame of x, y, and waterdepth coordinates - Interpolates data onto user povided points. Can be created with `create_points` function. edges: sting: 'nearest' If edges is set to 'nearest' the code will fill in nan values with nearest interpolation. Otherwise only linear interpolarion will be used. Returns ------- transformed_data: DataFrame Variables on specified grid points saved under the input variable names and the x, y, and waterdepth coordinates of those points. ''' assert isinstance(points, (str, pd.DataFrame)),('points must be a string ' +'or DataFrame') if isinstance ( points, str): assert any([points == 'cells', points=='faces']), ('points must be' +' cells or faces') assert type(data)== netCDF4._netCDF4.Dataset, 'data must be nerCDF4 object' data_raw = {} for var in variables: var_data_df = get_all_data_points(data, var,time_index=-1) var_data_df=var_data_df.loc[:,~var_data_df.T.duplicated(keep='first')] data_raw[var] = var_data_df if type(points) == pd.DataFrame: print('points provided') elif points=='faces': points = data_raw['ucx'][['x','y','waterdepth']] elif points=='cells': points = data_raw['turkin1'][['x','y','waterdepth']] transformed_data= points.copy(deep=True) for var in variables : transformed_data[var] = interp.griddata(data_raw[var][['x','y','waterdepth']], data_raw[var][var], points[['x','y','waterdepth']]) if edges == 'nearest' : idx= np.where(np.isnan(transformed_data[var])) if len(idx[0]): for i in idx[0]: transformed_data[var][i]= (interp .griddata(data_raw[var][['x','y','waterdepth']], data_raw[var][var], [points['x'][i],points['y'][i], points['waterdepth'][i]], method='nearest')) return transformed_data
[docs]def get_all_data_points(data, variable, time_index=-1): ''' Get data points for a passed variable for all layers at a specified time from the Delft3D NetCDF4 object by iterating over the `get_layer_data` function. Parameters ---------- data: Netcdf4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. variable: string Delft3D variable. The full list can be of variables can be found using "data.variables.keys()" in the console. time_index: int An integer to pull the time step from the dataset. Default is last time step, found with the input -1. Returns ------- all_data: DataFrame Dataframe with columns x, y, waterdepth, waterlevel, variable, and time. The waterdepth is measured from the water surface and the "waterlevel" is the water level diffrence in meters from the zero water level. ''' assert isinstance(time_index, int), 'time_index must be a int' assert type(data)== netCDF4._netCDF4.Dataset, 'data must be NetCDF4 object' assert variable in data.variables.keys(), 'variable not recognized' max_time_index = len(data.variables[variable][:]) assert abs(time_index) <= max_time_index, (f'time_index must be less than' +'the max time index {max_time_index}') if "mesh2d" in variable: cords_to_layers= {'mesh2d_face_x mesh2d_face_y': {'name':'mesh2d_nLayers', 'coords':data.variables['mesh2d_layer_sigma'][:]}, 'mesh2d_edge_x mesh2d_edge_y': {'name':'mesh2d_nInterfaces', 'coords':data.variables['mesh2d_interface_sigma'][:]}} else: cords_to_layers= {'FlowElem_xcc FlowElem_ycc':{'name':'laydim', 'coords':data.variables['LayCoord_cc'][:]}, 'FlowLink_xu FlowLink_yu': {'name':'wdim', 'coords':data.variables['LayCoord_w'][:]}} layer_dim = str(data.variables[variable].coordinates) try: cord_sys= cords_to_layers[layer_dim]['coords'] except: raise Exception('Coordinates not recognized.') else: layer_percentages= np.ma.getdata(cord_sys, False) x_all=[] y_all=[] depth_all=[] water_level_all=[] v_all=[] time_all=[] layers = range(len(layer_percentages)) for layer in layers: layer_data= get_layer_data(data, variable, layer, time_index) x_all=np.append(x_all, layer_data.x) y_all=np.append(y_all, layer_data.y) depth_all=np.append(depth_all, layer_data.waterdepth) water_level_all=np.append(water_level_all, layer_data.waterlevel) v_all=np.append(v_all, layer_data.v) time_all= np.append(time_all, layer_data.time) known_points = np.array([ [x, y, waterdepth, waterlevel, v, time] for x, y, waterdepth, waterlevel, v, time in zip(x_all, y_all, depth_all, water_level_all, v_all, time_all)]) all_data= pd.DataFrame(known_points, columns=['x','y','waterdepth', 'waterlevel' ,f'{variable}', 'time']) return all_data
[docs]def turbulent_intensity(data, points='cells', time_index= -1, intermediate_values = False ): ''' Calculate the turbulent intensity percentage for a given data set for the specified points. Assumes variable names: ucx, ucy, ucz and turkin1. Parameters ---------- data : NetCDF4 object A NetCDF4 object that contains spatial data, e.g. velocity or shear stress, generated by running a Delft3D model. points : string, DataFrame Points to interpolate data onto. 'cells': interpolates all data onto velocity coordinate system (Default). 'faces': interpolates all data onto the TKE coordinate system. DataFrame of x, y, and z coordinates: Interpolates data onto user provided points. time_index : int An integer to pull the time step from the dataset. Default is late time step -1. intermediate_values : boolean (optional) If false the function will return position and turbulent intensity values. If true the function will return position(x,y,z) and values need to calculate turbulent intensity (ucx, uxy, uxz and turkin1) in a Dataframe. Default False. Returns ------- TI_data : Dataframe If intermediate_values is true all values are output. If intermediate_values is equal to false only turbulent_intesity and x, y, and z variables are output. x- position in the x direction y- position in the y direction waterdepth- position in the vertical direction turbulet_intesity- turbulent kinetic energy divided by the root mean squared velocity turkin1- turbulent kinetic energy ucx- velocity in the x direction ucy- velocity in the y direction ucz- velocity in the vertical direction ''' assert isinstance(points, (str, pd.DataFrame)),('points must a string or' +' DataFrame') if isinstance ( points, str): assert any([points == 'cells', points=='faces']), ('points must be cells' +' or faces') assert isinstance(time_index, int), 'time_index must be a int' max_time_index= data['time'].shape[0]-1 # to account for zero index assert abs(time_index) <= max_time_index, (f'time_index must be less than' +'the absolute value of the max time index {max_time_index}') assert type(data)== netCDF4._netCDF4.Dataset, 'data must be nerCDF4 object' assert 'turkin1' in data.variables.keys(), ('Varaiable turkin1 not' +' present in Data') assert 'ucx' in data.variables.keys(),'Varaiable ucx 1 not present in Data' assert 'ucy' in data.variables.keys(),'Varaiable ucy 1 not present in Data' assert 'ucz' in data.variables.keys(),'Varaiable ucz 1 not present in Data' TI_vars= ['turkin1', 'ucx', 'ucy', 'ucz'] TI_data_raw = {} for var in TI_vars: var_data_df = get_all_data_points(data, var ,time_index) TI_data_raw[var] = var_data_df if type(points) == pd.DataFrame: print('points provided') elif points=='faces': points = TI_data_raw['turkin1'].drop(['waterlevel','turkin1'],axis=1) elif points=='cells': points = TI_data_raw['ucx'].drop(['waterlevel','ucx'],axis=1) TI_data = points.copy(deep=True) for var in TI_vars: TI_data[var] = interp.griddata(TI_data_raw[var][['x','y','waterdepth']], TI_data_raw[var][var], points[['x','y','waterdepth']]) idx= np.where(np.isnan(TI_data[var])) if len(idx[0]): for i in idx[0]: TI_data[var][i]= interp.griddata(TI_data_raw[var][['x','y','waterdepth']], TI_data_raw[var][var], [points['x'][i],points['y'][i], points['waterdepth'][i]], method='nearest') u_mag=unorm(np.array(TI_data['ucx']),np.array(TI_data['ucy']), np.array(TI_data['ucz'])) neg_index=np.where( TI_data['turkin1']<0) zero_bool= np.isclose( TI_data['turkin1'][ TI_data['turkin1']<0].array, np.zeros(len( TI_data['turkin1'][TI_data['turkin1']<0].array)), atol=1.0e-4) zero_ind= neg_index[0][zero_bool] non_zero_ind= neg_index[0][~zero_bool] TI_data.loc[zero_ind,'turkin1']=np.zeros(len(zero_ind)) TI_data.loc[non_zero_ind,'turkin1']=[np.nan]*len(non_zero_ind) TI_data['turbulent_intensity']= np.sqrt(2/3*TI_data['turkin1'])/u_mag * 100 #% if intermediate_values == False: TI_data= TI_data.drop(TI_vars, axis = 1) return TI_data