Source code for mhkit.dolfyn.adp.clean

"""Module containing functions to clean data
"""

import numpy as np
import xarray as xr
from scipy.signal import medfilt
from ..tools.misc import medfiltnan
from ..rotate.api import rotate2
from ..rotate.base import _make_model, quaternion2orient


[docs] def set_range_offset(ds, h_deploy): """ Adds an instrument's height above seafloor (for an up-facing instrument) or depth below water surface (for a down-facing instrument) to the range coordinate. Also adds an attribute to the Dataset with the current "h_deploy" distance. Parameters ---------- ds : xarray.Dataset The adcp dataset to ajust 'range' on h_deploy : numeric Deployment location in the water column, in [m] Returns ------- None, operates "in place" Notes ----- `Center of bin 1 = h_deploy + blank_dist + cell_size` Nortek doesn't take `h_deploy` into account, so the range that DOLfYN calculates distance is from the ADCP transducers. TRDI asks for `h_deploy` input in their deployment software and is thereby known by DOLfYN. If the ADCP is mounted on a tripod on the seafloor, `h_deploy` will be the height of the tripod +/- any extra distance to the transducer faces. If the instrument is vessel-mounted, `h_deploy` is the distance between the surface and downward-facing ADCP's transducers. """ r = [s for s in ds.dims if "range" in s] for val in r: ds[val] = ds[val].values + h_deploy ds[val].attrs["units"] = "m" if hasattr(ds, "h_deploy"): ds.attrs["h_deploy"] += h_deploy else: ds.attrs["h_deploy"] = h_deploy
[docs] def find_surface(ds, thresh=10, nfilt=None): """ Find the surface (water level or seafloor) from amplitude data and adds the variable "depth" to the input Dataset. Parameters ---------- ds : xarray.Dataset The full adcp dataset thresh : int Specifies the threshold used in detecting the surface. Default = 10 (The amount that amplitude must increase by near the surface for it to be considered a surface hit) nfilt : int Specifies the width of the median filter applied, must be odd. Default is None Returns ------- None, operates "in place" """ # This finds the maximum of the echo profile: inds = np.argmax(ds.amp.values, axis=1) # This finds the first point that increases (away from the profiler) in # the echo profile edf = np.diff(ds.amp.values.astype(np.int16), axis=1) inds2 = ( np.max( (edf < 0) * np.arange(ds.vel.shape[1] - 1, dtype=np.uint8)[None, :, None], axis=1, ) + 1 ) # Calculate the depth of these quantities d1 = ds.range.values[inds] d2 = ds.range.values[inds2] # Combine them: D = np.vstack((d1, d2)) # Take the median value as the estimate of the surface: d = np.median(D, axis=0) # Throw out values that do not increase near the surface by *thresh* for ip in range(ds.vel.shape[1]): itmp = np.min(inds[:, ip]) if (edf[itmp:, :, ip] < thresh).all(): d[ip] = np.NaN if nfilt: dfilt = medfiltnan(d, nfilt, thresh=4) dfilt[dfilt == 0] = np.NaN d = dfilt ds["depth"] = xr.DataArray( d.astype("float32"), dims=["time"], attrs={ "units": "m", "long_name": "Depth", "standard_name": "depth", "positive": "down", }, )
[docs] def find_surface_from_P(ds, salinity=35): """ Calculates the distance to the water surface. Temperature and salinity are used to calculate seawater density, which is in turn used with the pressure data to calculate depth. Parameters ---------- ds : xarray.Dataset The full adcp dataset salinity: numeric Water salinity in psu. Default = 35 Returns ------- None, operates "in place" and adds the variables "water_density" and "depth" to the input dataset. Notes ----- Requires that the instrument's pressure sensor was calibrated/zeroed before deployment to remove atmospheric pressure. Calculates seawater density using a linear approximation of the sea water equation of state: .. math:: \\rho - \\rho_0 = -\\alpha (T-T_0) + \\beta (S-S_0) + \\kappa P Where :math:`\\rho` is water density, :math:`T` is water temperature, :math:`P` is water pressure, :math:`S` is practical salinity, :math:`\\alpha` is the thermal expansion coefficient, :math:`\\beta` is the haline contraction coefficient, and :math:`\\kappa` is adiabatic compressibility. """ # Density calcation P = ds.pressure.values T = ds.temp.values # temperature, degC S = salinity # practical salinity rho0 = 1027 # kg/m^3 T0 = 10 # degC S0 = 35 # psu assumed equivalent to ppt a = 0.15 # thermal expansion coefficient, kg/m^3/degC b = 0.78 # haline contraction coefficient, kg/m^3/ppt k = 4.5e-3 # adiabatic compressibility, kg/m^3/dbar rho = rho0 - a * (T - T0) + b * (S - S0) + k * P # Depth = pressure (conversion from dbar to MPa) / water weight d = (ds.pressure * 10000) / (9.81 * rho) if hasattr(ds, "h_deploy"): d += ds.h_deploy description = "Depth to Seafloor" else: description = "Depth to Instrument" ds["water_density"] = xr.DataArray( rho.astype("float32"), dims=["time"], attrs={ "units": "kg m-3", "long_name": "Water Density", "standard_name": "sea_water_density", "description": "Water density from linear approximation of sea water equation of state", }, ) ds["depth"] = xr.DataArray( d.astype("float32"), dims=["time"], attrs={ "units": "m", "long_name": description, "standard_name": "depth", "positive": "down", }, )
[docs] def nan_beyond_surface(ds, val=np.nan, beam_angle=None, inplace=False): """ Mask the values of 3D data (vel, amp, corr, echo) that are beyond the surface. Parameters ---------- ds : xarray.Dataset The adcp dataset to clean val : nan or numeric Specifies the value to set the bad values to. Default is `numpy.nan` beam_angle : int ADCP beam inclination angle. Default = dataset.attrs['beam_angle'] inplace : bool When True the existing data object is modified. When False a copy is returned. Default = False Returns ------- ds : xarray.Dataset Sets the adcp dataset where relevant arrays with values greater than `depth` set to NaN Notes ----- Surface interference expected to happen at `distance > range * cos(beam angle) - cell size` """ if not inplace: ds = ds.copy(deep=True) # Get all variables with 'range' coordinate var = [h for h in ds.keys() if any(s for s in ds[h].dims if "range" in s)] if beam_angle is None: if hasattr(ds, "beam_angle"): beam_angle = ds.beam_angle * (np.pi / 180) else: raise Exception( "'beam_angle` not found in dataset attributes. " "Please supply the ADCP's beam angle." ) # Surface interference distance calculated from distance of transducers to surface if hasattr(ds, "h_deploy"): range_limit = ( (ds.depth - ds.h_deploy) * np.cos(beam_angle) - ds.cell_size ) + ds.h_deploy else: range_limit = ds.depth * np.cos(beam_angle) - ds.cell_size bds = ds.range > range_limit # Echosounder data needs only be trimmed at water surface if "echo" in var: bds_echo = ds.range_echo > ds.depth ds["echo"].values[..., bds_echo] = val var.remove("echo") # Correct rest of "range" data for surface interference for nm in var: a = ds[nm].values try: # float dtype a[..., bds] = val except: # int dtype a[..., bds] = 0 ds[nm].values = a if not inplace: return ds
[docs] def correlation_filter(ds, thresh=50, inplace=False): """ Filters out data where correlation is below a threshold in the along-beam correlation data. Parameters ---------- ds : xarray.Dataset The adcp dataset to clean. thresh : numeric The maximum value of correlation to screen, in counts or %. Default = 50 inplace : bool When True the existing data object is modified. When False a copy is returned. Default = False Returns ------- ds : xarray.Dataset Elements in velocity, correlation, and amplitude are removed if below the correlation threshold Notes ----- Does not edit correlation or amplitude data. """ if not inplace: ds = ds.copy(deep=True) # 4 or 5 beam if hasattr(ds, "vel_b5"): tag = ["", "_b5"] else: tag = [""] # copy original ref frame coord_sys_orig = ds.coord_sys # correlation is always in beam coordinates rotate2(ds, "beam", inplace=True) # correlation is always in beam coordinates for tg in tag: mask = ds["corr" + tg].values <= thresh for var in ["vel", "corr", "amp"]: try: ds[var + tg].values[mask] = np.nan except: ds[var + tg].values[mask] = 0 ds[var + tg].attrs["Comments"] = ( "Filtered of data with a correlation value below " + str(thresh) + ds.corr.units ) rotate2(ds, coord_sys_orig, inplace=True) if not inplace: return ds
[docs] def medfilt_orient(ds, nfilt=7): """ Median filters the orientation data (heading-pitch-roll or quaternions) Parameters ---------- ds : xarray.Dataset The adcp dataset to clean nfilt : numeric The length of the median-filtering kernel. Must be odd. Default = 7 Return ------ ds : xarray.Dataset The adcp dataset with the filtered orientation data See Also -------- scipy.signal.medfilt() """ ds = ds.copy(deep=True) if getattr(ds, "has_imu"): q_filt = np.zeros(ds.quaternions.shape) for i in range(ds.quaternions.q.size): q_filt[i] = medfilt(ds.quaternions[i].values, nfilt) ds.quaternions.values = q_filt ds["orientmat"] = quaternion2orient(ds.quaternions) return ds else: # non Nortek AHRS-equipped instruments do_these = ["pitch", "roll", "heading"] for nm in do_these: ds[nm].values = medfilt(ds[nm].values, nfilt) return ds.drop_vars("orientmat")
[docs] def val_exceeds_thresh(var, thresh=5, val=np.nan): """ Find values of a variable that exceed a threshold value, and assign "val" to the velocity data where the threshold is exceeded. Parameters ---------- var : xarray.DataArray Variable to clean thresh : numeric The maximum value of velocity to screen. Default = 5 val : nan or numeric Specifies the value to set the bad values to. Default is `numpy.nan` Returns ------- ds : xarray.Dataset The adcp dataset with datapoints beyond thresh are set to `val` """ var = var.copy(deep=True) bd = np.zeros(var.shape, dtype="bool") bd |= np.abs(var.values) > thresh var.values[bd] = val return var
[docs] def fillgaps_time(var, method="cubic", maxgap=None): """ Fill gaps (nan values) in var across time using the specified method Parameters ---------- var : xarray.DataArray The variable to clean method : string Interpolation method to use. Default is 'cubic' maxgap : numeric Maximum gap of missing data to interpolate across. Default is None Returns ------- out : xarray.DataArray The input DataArray 'var' with gaps in 'var' interpolated across time See Also -------- xarray.DataArray.interpolate_na() """ time_dim = [t for t in var.dims if "time" in t][0] return var.interpolate_na( dim=time_dim, method=method, use_coordinate=True, limit=maxgap )
[docs] def fillgaps_depth(var, method="cubic", maxgap=None): """ Fill gaps (nan values) in var along the depth profile using the specified method Parameters ---------- var : xarray.DataArray The variable to clean method : string Interpolation method to use. Default is 'cubic' maxgap : numeric Maximum gap of missing data to interpolate across. Default is None Returns ------- out : xarray.DataArray The input DataArray 'var' with gaps in 'var' interpolated across depth See Also -------- xarray.DataArray.interpolate_na() """ range_dim = [t for t in var.dims if "range" in t][0] return var.interpolate_na( dim=range_dim, method=method, use_coordinate=False, limit=maxgap )