import numpy as np
import xarray as xr
import warnings
import scipy.signal as ss
from scipy.integrate import cumulative_trapezoid
from ..rotate import vector as rot
from ..rotate.api import _make_model, rotate2
[docs]
class MissingDataError(ValueError):
pass
[docs]
class DataAlreadyProcessedError(Exception):
pass
[docs]
class MissingRequiredDataError(Exception):
pass
def _get_body2imu(make_model):
if make_model == "nortek vector":
# In inches it is: (0.25, 0.25, 5.9)
return np.array([0.00635, 0.00635, 0.14986])
else:
raise Exception("The imu->body vector is unknown for this instrument.")
[docs]
class CalcMotion:
"""
A 'calculator' for computing the velocity of points that are
rigidly connected to an ADV-body with an IMU.
Parameters
----------
ds : xarray.Dataset
The IMU-adv data that will be used to compute motion.
accel_filtfreq : float
The frequency at which to high-pass filter the acceleration
sigal to remove low-frequency drift. Default = 0.03 Hz
vel_filtfreq : float (optional)
a second frequency to high-pass filter the integrated
acceleration. Default = 1/3 of `accel_filtfreq`
"""
_default_accel_filtfreq = 0.03
def __init__(self, ds, accel_filtfreq=None, vel_filtfreq=None, to_earth=True):
self.ds = ds
self._check_filtfreqs(accel_filtfreq, vel_filtfreq)
self.to_earth = to_earth
self._set_accel()
self._set_acclow()
self.angrt = ds["angrt"].values # No copy because not modified.
def _check_filtfreqs(self, accel_filtfreq, vel_filtfreq):
datval = self.ds.attrs.get("motion accel_filtfreq Hz", None)
if datval is None:
if accel_filtfreq is None:
accel_filtfreq = self._default_accel_filtfreq
# else use the accel_filtfreq value
else:
if accel_filtfreq is None:
accel_filtfreq = datval
else:
if datval != accel_filtfreq:
warnings.warn(
f"The default accel_filtfreq is {datval} Hz. "
"Overriding this with the user-specified "
"value: {accel_filtfreq} Hz."
)
if vel_filtfreq is None:
vel_filtfreq = self.ds.attrs.get("motion vel_filtfreq Hz", None)
if vel_filtfreq is None:
vel_filtfreq = accel_filtfreq / 3.0
self.accel_filtfreq = accel_filtfreq
self.accelvel_filtfreq = vel_filtfreq
def _set_accel(
self,
):
ds = self.ds
if ds.coord_sys == "inst":
self.accel = np.einsum(
"ij...,i...->j...", ds["orientmat"].values, ds["accel"].values
)
elif self.ds.coord_sys == "earth":
self.accel = ds["accel"].values.copy()
else:
raise Exception(
(
"Invalid coordinate system '%s'. The coordinate "
"system must either be 'earth' or 'inst' to "
"perform motion correction."
)
% (self.ds.coord_sys)
)
def _check_duty_cycle(
self,
):
"""
Function to check if duty cycle exists and if it is followed
consistently in the datafile
"""
n_burst = self.ds.attrs.get("duty_cycle_n_burst")
if not n_burst:
return
# duty cycle interval in seconds
interval = self.ds.attrs.get("duty_cycle_interval")
actual_interval = (
self.ds.time[n_burst:].values - self.ds.time[:-n_burst].values
) / 1e9
rng = actual_interval.max() - actual_interval.min()
mean = actual_interval.mean()
# Range will vary depending on how datetime64 rounds the timestamp
# But isn't an issue if it does
if rng > 2 or (mean > interval + 1 and mean < interval - 1):
raise Exception("Bad duty cycle detected")
# If this passes, it means we're safe to blindly skip n_burst for every integral
return n_burst
[docs]
def reshape(self, dat, n_bin):
# Assumes shape is (3, time)
length = dat.shape[-1] // n_bin
return np.reshape(dat[..., : length * n_bin], (dat.shape[0], length, n_bin))
def _set_acclow(
self,
):
# Check if file is duty cycled
n = self._check_duty_cycle()
if n:
warnings.warn(
" Duty Cycle detected. "
"Motion corrected data may contain edge effects "
"at the beginning and end of each duty cycle."
)
self.accel = self.reshape(self.accel, n_bin=n)
self.acclow = acc = self.accel.copy()
if self.accel_filtfreq == 0:
acc[:] = acc.mean(-1)[..., None]
else:
flt = ss.butter(1, self.accel_filtfreq / (self.ds.fs / 2))
for idx in range(3):
acc[idx] = ss.filtfilt(flt[0], flt[1], acc[idx], axis=-1)
# Fill nan with zeros - happens for some filter frequencies
if np.isnan(acc).any():
warnings.warn(
"Error filtering acceleration data. "
"Please decrease `accel_filtfreq`."
)
acc = np.nan_to_num(acc)
[docs]
def calc_velacc(
self,
):
"""
Calculates the translational velocity from the high-pass
filtered acceleration signal.
Returns
-------
velacc : numpy.ndarray (3 x n_time)
The acceleration-induced velocity array (3, n_time).
"""
samp_freq = self.ds.fs
# Check if file is duty cycled
n = self._check_duty_cycle()
# accel & accel-low will already be reshaped if n isn't none
# Get high-pass accelerations
hp = self.accel - self.acclow
# Integrate in time to get velocities
dat = np.concatenate(
(
np.zeros(list(hp.shape[:-1]) + [1]),
cumulative_trapezoid(hp, dx=1 / samp_freq, axis=-1),
),
axis=-1,
)
if self.accelvel_filtfreq > 0:
filt_freq = self.accelvel_filtfreq
# 2nd order Butterworth filter
# Applied twice by 'filtfilt' = 4th order butterworth
filt = ss.butter(2, float(filt_freq) / (samp_freq / 2))
for idx in range(hp.shape[0]):
dat[idx] = dat[idx] - ss.filtfilt(filt[0], filt[1], dat[idx], axis=-1)
# Fill nan with zeros - happens for some filter frequencies
if np.isnan(dat).any():
warnings.warn(
"Error filtering acceleration data. "
"Please decrease `vel_filtfreq`. "
"(default is 1/3 `accel_filtfreq`)"
)
dat = np.nan_to_num(dat)
if n:
# remove reshape
velacc_shaped = np.empty(self.angrt.shape)
acclow_shaped = np.empty(self.angrt.shape)
accel_shaped = np.empty(self.angrt.shape)
for idx in range(hp.shape[0]):
velacc_shaped[idx] = np.ravel(dat[idx], "C")
acclow_shaped[idx] = np.ravel(self.acclow[idx], "C")
accel_shaped[idx] = np.ravel(self.accel[idx], "C")
# return acclow and velacc
self.acclow = acclow_shaped
self.accel = accel_shaped
return velacc_shaped
else:
return dat
[docs]
def calc_velrot(self, vec, to_earth=None):
"""
Calculate the induced velocity due to rotations of the
instrument about the IMU center.
Parameters
----------
vec : numpy.ndarray (len(3) or 3 x M)
The vector in meters (or vectors) from the body-origin
(center of head end-cap) to the point of interest (in the
body coord-sys).
Returns
-------
velrot : numpy.ndarray (3 x M x N_time)
The rotation-induced velocity array (3, n_time).
"""
if to_earth is None:
to_earth = self.to_earth
dimflag = False
if vec.ndim == 1:
vec = vec[:3].copy().reshape((3, 1))
dimflag = True
# Correct for the body->imu distance.
# The nortek_body2imu vector is subtracted because of
# vector addition:
# body2head = body2imu + imu2head
# Thus:
# imu2head = body2head - body2imu
vec = vec - _get_body2imu(_make_model(self.ds))[:, None]
# This motion of the point *vec* due to rotations should be the
# cross-product of omega (rotation vector) and the vector.
# u=dz*omegaY-dy*omegaZ,v=dx*omegaZ-dz*omegaX,w=dy*omegaX-dx*omegaY
# where vec=[dx,dy,dz], and angrt=[omegaX,omegaY,omegaZ]
velrot = np.array(
[
(vec[2][:, None] * self.angrt[1] - vec[1][:, None] * self.angrt[2]),
(vec[0][:, None] * self.angrt[2] - vec[2][:, None] * self.angrt[0]),
(vec[1][:, None] * self.angrt[0] - vec[0][:, None] * self.angrt[1]),
]
)
if to_earth:
velrot = np.einsum("ji...,j...->i...", self.ds["orientmat"].values, velrot)
if dimflag:
return velrot[:, 0, :]
return velrot
def _calc_probe_pos(ds, separate_probes=False):
"""
Calculates the position of probe (or "head") of an ADV.
Paratmeters
-----------
ds : xarray.Dataset
ADV dataset
separate_probes : bool
If a Nortek Vector ADV, this function returns the
transformation matrix of positions of the probe's
acoustic recievers to the ADV's instrument frame of
reference. Optional, default = False
Returns
-------
vec : 3x3 numpy.ndarray
Transformation matrix to convert from ADV probe to
instrument frame of reference
"""
vec = ds.inst2head_vec
if type(vec) != np.ndarray:
vec = np.array(vec)
# According to the ADV technical drawing, the probe-length radius
# is 8.6 cm @ 120 deg from probe-stem axis. If I subtract 1 cm
# to get the center of a acoustic receiver, this is 7.6 cm.
# In the coordinate system of the center of the probe (origin at
# the acoustic transmitter) then, the positions of the centers of
# the receivers is:
if separate_probes and _make_model(ds) == "nortek vector":
r = 0.076
# The angle between the x-y plane and the probes
phi = np.deg2rad(-30)
# The angles of the probes from the x-axis:
theta = np.deg2rad(np.array([0.0, 120.0, 240.0]))
return (
np.dot(
ds["inst2head_rotmat"].values.T,
np.array(
[r * np.cos(theta), r * np.sin(theta), r * np.tan(phi) * np.ones(3)]
),
)
+ vec[:, None]
)
else:
return vec
[docs]
def correct_motion(
ds, accel_filtfreq=None, vel_filtfreq=None, to_earth=True, separate_probes=False
):
"""
This function performs motion correction on an IMU-ADV data
object. The IMU and ADV data should be tightly synchronized and
contained in a single data object.
Parameters
----------
ds : xarray.Dataset
Cleaned ADV dataset in "inst" coordinates
accel_filtfreq : float
the frequency at which to high-pass filter the acceleration
sigal to remove low-frequency drift.
vel_filtfreq : float
a second frequency to high-pass filter the integrated
acceleration. Optional, default = 1/3 of `accel_filtfreq`
to_earth : bool
All variables in the ds.props['rotate_vars'] list will be
rotated into either the earth frame (to_earth=True) or the
instrument frame (to_earth=False). Optional, default = True
separate_probes : bool
a flag to perform motion-correction at the probe tips, and
perform motion correction in beam-coordinates, then transform
back into XYZ/earth coordinates. This correction seems to be
lower than the noise levels of the ADV, so the default is to not
use it (False).
Returns
-------
This function returns None, it operates on the input data object,
``ds``. The following attributes are added to `ds`:
``velraw`` is the uncorrected velocity
``velrot`` is the rotational component of the head motion (from
angrt)
``velacc`` is the translational component of the head motion (from
accel, the high-pass filtered accel sigal)
``acclow`` is the low-pass filtered accel sigal (i.e.,
The primary velocity vector attribute, ``vel``, is motion corrected
such that:
vel = velraw + velrot + velacc
The sigs are correct in this equation. The measured velocity
induced by head-motion is *in the opposite direction* of the head
motion. i.e. when the head moves one way in stationary flow, it
measures a velocity in the opposite direction. Therefore, to
remove the motion from the raw sigal we *add* the head motion.
Notes
-----
Acceleration signals from inertial sensors are notorious for
having a small bias that can drift slowly in time. When
integrating these sigals to estimate velocity the bias is
amplified and leads to large errors in the estimated
velocity. There are two methods for removing these errors,
1) high-pass filter the acceleration sigal prior and/or after
integrating. This implicitly assumes that the low-frequency
translational velocity is zero.
2) provide a slowly-varying reference position (often from a GPS)
to an IMU that can use the sigal (usually using Kalman
filters) to debias the acceleration sigal.
Because method (1) removes `real` low-frequency acceleration,
method (2) is more accurate. However, providing reference position
estimates to undersea instruments is practically challenging and
expensive. Therefore, lacking the ability to use method (2), this
function utilizes method (1).
For deployments in which the ADV is mounted on a mooring, or other
semi-fixed structure, the assumption of zero low-frequency
translational velocity is a reasonable one. However, for
deployments on ships, gliders, or other moving objects it is
not. The measured velocity, after motion-correction, will still
hold some of this contamination and will be a sum of the ADV
motion and the measured velocity on long time scales. If
low-frequency motion is known separate from the ADV (e.g. from a
bottom-tracking ADP, or from a ship's GPS), it may be possible to
remove that sigal from the ADV sigal in post-processing.
"""
# Ensure acting on new dataset
ds = ds.copy(deep=True)
# Check that no nan's exist
if ds["accel"].isnull().sum():
raise MissingDataError("There should be no missing data in `accel` variable")
if ds["angrt"].isnull().sum():
raise MissingDataError("There should be no missing data in `angrt` variable")
if hasattr(ds, "velrot") or ds.attrs.get("motion corrected", False):
raise DataAlreadyProcessedError(
"The data appears to already have been " "motion corrected."
)
if not hasattr(ds, "has_imu") or ("accel" not in ds):
raise MissingRequiredDataError("The instrument does not appear to have an IMU.")
if ds.coord_sys != "inst":
rotate2(ds, "inst", inplace=True)
# Returns True/False if head2inst_rotmat has been set/not-set.
# Bad configs raises errors (this is to check for those)
rot._check_inst2head_rotmat(ds)
# Create the motion 'calculator':
calcobj = CalcMotion(
ds, accel_filtfreq=accel_filtfreq, vel_filtfreq=vel_filtfreq, to_earth=to_earth
)
##########
# Calculate the translational velocity (from the accel):
ds["velacc"] = xr.DataArray(
calcobj.calc_velacc(),
dims=["dirIMU", "time"],
attrs={"units": "m s-1", "long_name": "Velocity from IMU Accelerometer"},
).astype("float32")
# Copy acclow to the adv-object.
ds["acclow"] = xr.DataArray(
calcobj.acclow,
dims=["dirIMU", "time"],
attrs={"units": "m s-2", "long_name": "Low-Frequency Acceleration from IMU"},
).astype("float32")
##########
# Calculate rotational velocity (from angrt):
pos = _calc_probe_pos(ds, separate_probes)
# Calculate the velocity of the head (or probes).
velrot = calcobj.calc_velrot(pos, to_earth=False)
if separate_probes:
# The head->beam transformation matrix
transMat = ds.get("beam2inst_orientmat", None)
# The inst->head transformation matrix
rmat = ds["inst2head_rotmat"]
# 1) Rotate body-coordinate velocities to head-coord.
velrot = np.dot(rmat, velrot)
# 2) Rotate body-coord to beam-coord (einsum),
# 3) Take along beam-component (diagonal),
# 4) Rotate back to head-coord (einsum),
velrot = np.einsum(
"ij,kj->ik",
transMat,
np.diagonal(np.einsum("ij,j...->i...", np.linalg.inv(transMat), velrot)),
)
# 5) Rotate back to body-coord.
velrot = np.dot(rmat.T, velrot)
ds["velrot"] = xr.DataArray(
velrot,
dims=["dirIMU", "time"],
attrs={"units": "m s-1", "long_name": "Velocity from IMU Gyroscope"},
).astype("float32")
##########
# Rotate the data into the correct coordinate system.
# inst2earth expects a 'rotate_vars' property.
# Add velrot, velacc, acclow, to it.
if "rotate_vars" not in ds.attrs:
ds.attrs["rotate_vars"] = [
"vel",
"velrot",
"velacc",
"accel",
"acclow",
"angrt",
"mag",
]
else:
ds.attrs["rotate_vars"].extend(["velrot", "velacc", "acclow"])
# NOTE: accel, acclow, and velacc are in the earth-frame after
# calc_velacc() call.
inst2earth = rot._inst2earth
if to_earth:
# accel was converted to earth coordinates
ds["accel"].values = calcobj.accel
to_remove = ["accel", "acclow", "velacc"]
ds = inst2earth(
ds, rotate_vars=[e for e in ds.attrs["rotate_vars"] if e not in to_remove]
)
else:
# rotate these variables back to the instrument frame.
ds = inst2earth(ds, reverse=True, rotate_vars=["acclow", "velacc"], force=True)
##########
# Copy vel -> velraw prior to motion correction:
ds["vel_raw"] = ds.vel.copy(deep=True)
# Add it to rotate_vars:
ds.attrs["rotate_vars"].append("vel_raw")
##########
# Remove motion from measured velocity
# NOTE: The plus sign is because the measured-induced velocities
# are in the opposite direction of the head motion.
# i.e. when the head moves one way in stationary flow, it
# measures a velocity in the opposite direction.
# use xarray to keep dimensions consistent
velmot = ds["velrot"] + ds["velacc"]
ds["vel"].values += velmot.values
ds.attrs["motion corrected"] = 1
ds.attrs["motion accel_filtfreq Hz"] = calcobj.accel_filtfreq
return ds