"""Module containing functions to clean data."""
import warnings
from typing import Optional
import numpy as np
import xarray as xr
from scipy.signal import medfilt
from ..tools.misc import medfiltnan
from ..rotate.api import rotate2
from ..rotate.base import quaternion2orient
def __check_for_range_offset(ds) -> float:
"""
Determines the range offset based on a variety of possible dataset attributes.
The function first checks if specific attributes are present in the dataset (`ds`)
and calculates the range offset accordingly. If the attribute `h_deploy`
is found, it is renamed to `range_offset` with a deprecation warning.
Parameters
----------
ds : xarray.Dataset
Dataset containing attributes used to calculate the range offset.
Returns
-------
float
The calculated or retrieved range offset. Returns 0 if no
relevant attributes are present.
Raises
------
DeprecationWarning
Warns that the attribute `h_deploy` is deprecated and has been
renamed to `range_offset`.
"""
if "bin1_dist_m" in ds.attrs:
return ds.attrs["bin1_dist_m"] - ds.attrs["blank_dist"] - ds.attrs["cell_size"]
elif "range_offset" in ds.attrs:
return ds.attrs["range_offset"]
elif "h_deploy" in ds.attrs:
warnings.warn(
"The attribute 'h_deploy' is no longer in use."
"It will now be renamed to 'range_offset'.",
DeprecationWarning,
)
ds.attrs["range_offset"] = ds.attrs.pop("h_deploy")
return ds.attrs["range_offset"]
else:
return 0.0
[docs]
def set_range_offset(ds, range_offset) -> None:
"""
Adds an instrument's height above seafloor (for an up-facing instrument)
or depth below water surface (for a down-facing instrument) to the range
coordinate. Also adds an attribute to the Dataset with the current
"range_offset" distance.
Parameters
----------
ds : xarray.Dataset
The ADCP dataset to adjust 'range' on
range_offset : numeric
Deployment location in the water column, in [m]
Returns
-------
ds : xarray.Dataset
The ADCP dataset with applied range_offset
Notes
-----
`Center of bin 1 = range_offset + blank_dist + cell_size`
Nortek doesn't take `range_offset` into account, so the range that DOLfYN
calculates distance is from the ADCP transducers. TRDI asks for `range_offset`
input in their deployment software and is thereby known by DOLfYN.
If the ADCP is mounted on a tripod on the seafloor, `range_offset` will be
the height of the tripod +/- any extra distance to the transducer faces.
If the instrument is vessel-mounted, `range_offset` is the distance between
the surface and downward-facing ADCP's transducers.
"""
current_offset = __check_for_range_offset(ds)
if current_offset:
warnings.warn(
"The 'range_offset' is either already known or can be calculated "
f"from 'bin1_dist_m': {current_offset} m. If you would like to "
f"override this value with {range_offset} m, ignore this warning. "
"If you do not want to override this value, you do not need to use "
"this function."
)
# Remove offset from depth variable if exists
if "depth" in ds.data_vars:
ds["depth"].values -= current_offset
# Add offset to each range coordinate
r = [s for s in ds.dims if "range" in s]
for coord in r:
coord_attrs = ds[coord].attrs
ds[coord] = ds[coord].values + range_offset
ds[coord].attrs = coord_attrs
# Add to depth variable if exists
if "depth" in ds.data_vars:
ds["depth"].values += range_offset
# Add to dataset
ds.attrs["range_offset"] = range_offset
[docs]
def find_surface(*args, **kwargs):
"""
Deprecated function. Use `water_depth_from_amplitude` instead.
"""
warnings.warn(
"The 'find_surface' function was renamed to 'water_depth_from_amplitude"
"and will be dropped in a future release.",
DeprecationWarning,
)
return water_depth_from_amplitude(*args, **kwargs)
[docs]
def water_depth_from_amplitude(ds, thresh=10, nfilt=None) -> None:
"""
Find the surface (water level or seafloor) from amplitude data and
adds the variable "depth" to the input Dataset.
Depth is either water depth or the distance from the ADCP to
surface/seafloor, depending on if "range_offset" has been set.
Parameters
----------
ds : xarray.Dataset
The full adcp dataset
thresh : int
Specifies the threshold used in detecting the surface. Default = 10
(The amount that amplitude must increase by near the surface for it to
be considered a surface hit.)
nfilt : int
Specifies the width of the median filter applied, must be odd.
Default is None
Returns
-------
None, operates "in place" and adds the variable "depth" to the
input dataset
"""
if "depth" in ds.data_vars:
raise Exception(
"The variable 'depth' already exists. "
"Please manually remove 'depth' if it needs to be recalculated."
)
# This finds the maximum of the echo profile:
inds = np.argmax(ds["amp"].values, axis=1)
# This finds the first point that increases (away from the profiler) in
# the echo profile
edf = np.diff(ds["amp"].values.astype(np.int16), axis=1)
inds2 = (
np.max(
(edf < 0)
* np.arange(ds["vel"].shape[1] - 1, dtype=np.uint8)[None, :, None],
axis=1,
)
+ 1
)
# Calculate the depth of these quantities
d1 = ds["range"].values[inds]
d2 = ds["range"].values[inds2]
# Combine them:
D = np.vstack((d1, d2))
# Take the median value as the estimate of the surface:
d = np.median(D, axis=0)
# Throw out values that do not increase near the surface by *thresh*
for ip in range(ds["vel"].shape[1]):
itmp = np.min(inds[:, ip])
if (edf[itmp:, :, ip] < thresh).all():
d[ip] = np.nan
if nfilt:
dfilt = medfiltnan(d, nfilt, thresh=4)
dfilt[dfilt == 0] = np.nan
d = dfilt
range_offset = __check_for_range_offset(ds)
if range_offset:
d += range_offset
long_name = "Water Depth"
else:
long_name = "Instrument Depth"
ds["depth"] = xr.DataArray(
d.astype("float32"),
dims=["time"],
attrs={"units": "m", "long_name": long_name, "standard_name": "depth"},
)
[docs]
def find_surface_from_P(*args, **kwargs):
"""
Deprecated function. Use `water_depth_from_pressure` instead.
"""
warnings.warn(
"The 'find_surface_from_P' function was renamed to 'water_depth_from_pressure"
"and will be dropped in a future release.",
DeprecationWarning,
)
return water_depth_from_pressure(*args, **kwargs)
[docs]
def water_depth_from_pressure(ds, salinity=35) -> None:
"""
Calculates the distance to the water surface. Temperature and salinity
are used to calculate seawater density, which is in turn used with the
pressure data to calculate depth.
Depth is either water depth or the distance from the ADCP to
surface/seafloor, depending on if "range_offset" has been set.
Parameters
----------
ds : xarray.Dataset
The full adcp dataset
salinity: numeric
Water salinity in psu. Default = 35
Returns
-------
None, operates "in place" and adds the variables "water_density" and
"depth" to the input dataset.
Notes
-----
Requires that the instrument's pressure sensor was calibrated/zeroed
before deployment to remove atmospheric pressure.
Calculates seawater density using a linear approximation of the sea
water equation of state:
.. math:: \\rho - \\rho_0 = -\\alpha (T-T_0) + \\beta (S-S_0) + \\kappa P
Where :math:`\\rho` is water density, :math:`T` is water temperature,
:math:`P` is water pressure, :math:`S` is practical salinity,
:math:`\\alpha` is the thermal expansion coefficient, :math:`\\beta` is
the haline contraction coefficient, and :math:`\\kappa` is adiabatic
compressibility.
"""
if "depth" in ds.data_vars:
raise Exception(
"The variable 'depth' already exists. "
"Please manually remove 'depth' if it needs to be recalculated."
)
if "pressure" not in ds.data_vars:
raise NameError("The variable 'pressure' does not exist.")
elif not ds["pressure"].sum():
raise ValueError("Pressure data not recorded.")
if "temp" not in ds.data_vars:
raise NameError("The variable 'temp' does not exist.")
# Density calcation
P = ds["pressure"].values
T = ds["temp"].values # temperature, degC
S = salinity # practical salinity
rho0 = 1027 # kg/m^3
T0 = 10 # degC
S0 = 35 # psu assumed equivalent to ppt
a = 0.15 # thermal expansion coefficient, kg/m^3/degC
b = 0.78 # haline contraction coefficient, kg/m^3/ppt
k = 4.5e-3 # adiabatic compressibility, kg/m^3/dbar
rho = rho0 - a * (T - T0) + b * (S - S0) + k * P
# Depth = pressure (conversion from dbar to MPa) / water weight
d = (P * 10000) / (9.81 * rho)
# Apply range_offset if available
range_offset = __check_for_range_offset(ds)
if range_offset:
d += range_offset
long_name = "Water Depth"
else:
long_name = "Instrument Depth"
ds["water_density"] = xr.DataArray(
rho.astype("float32"),
dims=["time"],
attrs={
"units": "kg m-3",
"long_name": "Water Density",
"standard_name": "sea_water_density",
"description": "Water density from linear approximation of sea water equation of state",
},
)
ds["depth"] = xr.DataArray(
d.astype("float32"),
dims=["time"],
attrs={"units": "m", "long_name": long_name, "standard_name": "depth"},
)
[docs]
def nan_beyond_surface(*args, **kwargs):
"""
Deprecated function. Use `remove_surface_interference` instead.
"""
warnings.warn(
"The 'nan_beyond_surface' function was renamed to 'remove_surface_interference"
"and will be dropped in a future release.",
DeprecationWarning,
)
return remove_surface_interference(*args, **kwargs)
[docs]
def remove_surface_interference(
ds, val=np.nan, beam_angle=None, inplace=False
) -> Optional[xr.Dataset]:
"""
Mask the values of 3D data (vel, amp, corr, echo) that are beyond the surface.
Parameters
----------
ds : xarray.Dataset
The adcp dataset to clean
val : nan or numeric
Specifies the value to set the bad values to. Default is `numpy.nan`
beam_angle : int
ADCP beam inclination angle in degrees. Default = dataset.attrs['beam_angle']
inplace : bool
When True the existing data object is modified. When False
a copy is returned. Default = False
Returns
-------
ds : xarray.Dataset
Sets the adcp dataset where relevant arrays with values greater than `depth`
set to NaN
Notes
-----
Surface interference expected to happen at
`distance > range * cos(beam angle) - cell size`
"""
if "depth" not in ds.data_vars:
raise KeyError(
"Depth variable 'depth' does not exist in input dataset."
"Please calculate 'depth' using the function 'water_depth_from_pressure'"
"or 'water_depth_from_amplitude."
)
if beam_angle is None:
if hasattr(ds, "beam_angle"):
beam_angle = np.deg2rad(ds.attrs["beam_angle"])
else:
raise Exception(
"'beam_angle` not found in dataset attributes. "
"Please supply the ADCP's beam angle."
)
else:
beam_angle = np.deg2rad(beam_angle)
if not inplace:
ds = ds.copy(deep=True)
# Get all variables with 'range' coordinate
profile_vars = [h for h in ds.keys() if any(s for s in ds[h].dims if "range" in s)]
# Surface interference distance
# Apply range_offset if available
range_offset = __check_for_range_offset(ds)
if range_offset:
range_limit = (
(ds["depth"] - range_offset) * np.cos(beam_angle) - ds.attrs["cell_size"]
) + range_offset
else:
range_limit = ds["depth"] * np.cos(beam_angle) - ds.attrs["cell_size"]
bds = ds["range"] > range_limit
# Echosounder data needs only be trimmed at water surface
if "echo" in profile_vars:
mask_echo = ds["range_echo"] > ds["depth"]
ds["echo"].values[..., mask_echo] = val
profile_vars.remove("echo")
# Correct rest of "range" data for surface interference
for var in profile_vars:
a = ds[var].values
try: # float dtype
a[..., bds] = val
except: # int dtype
a[..., bds] = 0
ds[var].values = a
if not inplace:
return ds
[docs]
def correlation_filter(ds, thresh=50, inplace=False) -> Optional[xr.Dataset]:
"""
Filters out data where correlation is below a threshold in the
along-beam correlation data.
Parameters
----------
ds : xarray.Dataset
The adcp dataset to clean.
thresh : numeric
The maximum value of correlation to screen, in counts or %.
Default = 50
inplace : bool
When True the existing data object is modified. When False
a copy is returned. Default = False
Returns
-------
ds : xarray.Dataset
Elements in velocity, correlation, and amplitude are removed if below the
correlation threshold
Notes
-----
Does not edit correlation or amplitude data.
"""
if not inplace:
ds = ds.copy(deep=True)
# 4 or 5 beam
if hasattr(ds, "vel_b5"):
tag = ["", "_b5"]
else:
tag = [""]
# copy original ref frame
coord_sys_orig = ds.coord_sys
# correlation is always in beam coordinates
rotate2(ds, "beam", inplace=True)
# correlation is always in beam coordinates
for tg in tag:
mask = ds["corr" + tg].values <= thresh
for var in ["vel", "corr", "amp"]:
try:
ds[var + tg].values[mask] = np.nan
except:
ds[var + tg].values[mask] = 0
ds[var + tg].attrs["Comments"] = (
"Filtered of data with a correlation value below "
+ str(thresh)
+ ds.corr.units
)
rotate2(ds, coord_sys_orig, inplace=True)
if not inplace:
return ds
[docs]
def medfilt_orient(ds, nfilt=7) -> xr.Dataset:
"""
Median filters the orientation data (heading-pitch-roll or quaternions)
Parameters
----------
ds : xarray.Dataset
The adcp dataset to clean
nfilt : numeric
The length of the median-filtering kernel. Must be odd.
Default = 7
Return
------
ds : xarray.Dataset
The adcp dataset with the filtered orientation data
See Also
--------
scipy.signal.medfilt()
"""
ds = ds.copy(deep=True)
if getattr(ds, "has_imu"):
q_filt = np.zeros(ds.quaternions.shape)
for i in range(ds.quaternions.q.size):
q_filt[i] = medfilt(ds.quaternions[i].values, nfilt)
ds.quaternions.values = q_filt
ds["orientmat"] = quaternion2orient(ds.quaternions)
return ds
else:
# non Nortek AHRS-equipped instruments
do_these = ["pitch", "roll", "heading"]
for nm in do_these:
ds[nm].values = medfilt(ds[nm].values, nfilt)
return ds.drop_vars("orientmat")
[docs]
def val_exceeds_thresh(var, thresh=5, val=np.nan) -> xr.DataArray:
"""
Find values of a variable that exceed a threshold value,
and assign "val" to the velocity data where the threshold is
exceeded.
Parameters
----------
var : xarray.DataArray
Variable to clean
thresh : numeric
The maximum value of velocity to screen. Default = 5
val : nan or numeric
Specifies the value to set the bad values to. Default is `numpy.nan`
Returns
-------
ds : xarray.DataArray
The adcp dataarray with datapoints beyond thresh are set to `val`
"""
var = var.copy(deep=True)
bd = np.zeros(var.shape, dtype="bool")
bd |= np.abs(var.values) > thresh
var.values[bd] = val
return var
[docs]
def fillgaps_time(var, method="cubic", maxgap=None) -> xr.DataArray:
"""
Fill gaps (nan values) in var across time using the specified method
Parameters
----------
var : xarray.DataArray
The variable to clean
method : string
Interpolation method to use. Default is 'cubic'
maxgap : numeric
Maximum gap of missing data to interpolate across. Default is None
Returns
-------
out : xarray.DataArray
The input DataArray 'var' with gaps in 'var' interpolated across time
See Also
--------
xarray.DataArray.interpolate_na()
"""
time_dim = [t for t in var.dims if "time" in t][0]
return var.interpolate_na(
dim=time_dim, method=method, use_coordinate=True, limit=maxgap
)
[docs]
def fillgaps_depth(var, method="cubic", maxgap=None) -> xr.DataArray:
"""
Fill gaps (nan values) in var along the depth profile using the specified method
Parameters
----------
var : xarray.DataArray
The variable to clean
method : string
Interpolation method to use. Default is 'cubic'
maxgap : numeric
Maximum gap of missing data to interpolate across. Default is None
Returns
-------
out : xarray.DataArray
The input DataArray 'var' with gaps in 'var' interpolated across depth
See Also
--------
xarray.DataArray.interpolate_na()
"""
range_dim = [t for t in var.dims if "range" in t][0]
return var.interpolate_na(
dim=range_dim, method=method, use_coordinate=False, limit=maxgap
)