import warnings
import logging
import numpy as np
from struct import unpack
from pathlib import Path
from datetime import datetime
from . import nortek_defs
from .. import time
from .base import _find_userdata, _create_dataset, _handle_nan, _abspath
from ..tools import misc as tbx
from ..rotate.vector import _calc_omat
from ..rotate.base import _set_coords
from ..rotate import api as rot
[docs]def read_nortek(filename, userdata=True, debug=False, do_checksum=False,
nens=None, **kwargs):
"""
Read a classic Nortek (AWAC and Vector) datafile
Parameters
----------
filename : string
Filename of Nortek file to read.
userdata : bool, or string of userdata.json filename
Whether to read the '<base-filename>.userdata.json' file.
Default = True
debug : bool
Logs debugger ouput if true. Default = False
do_checksum : bool
Whether to perform the checksum of each data block. Default = False
nens : None, int or 2-element tuple (start, stop)
Number of pings or ensembles to read from the file.
Default is None, read entire file
Returns
-------
ds : xarray.Dataset
An xarray dataset from the binary instrument data
"""
# Start debugger logging
if debug:
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
filepath = Path(filename)
logfile = filepath.with_suffix('.dolfyn.log')
logging.basicConfig(filename=str(logfile),
filemode='w',
level=logging.NOTSET,
format='%(name)s - %(levelname)s - %(message)s')
userdata = _find_userdata(filename, userdata)
with _NortekReader(filename, debug=debug, do_checksum=do_checksum,
nens=nens) as rdr:
rdr.readfile()
rdr.dat2sci()
dat = rdr.data
# Remove trailing nan's in time and orientation data
dat = _handle_nan(dat)
# Search for missing timestamps and interpolate them
coords = dat['coords']
t_list = [t for t in coords if 'time' in t]
for ky in t_list:
tdat = coords[ky]
tdat[tdat == 0] = np.NaN
if np.isnan(tdat).any():
tag = ky.lstrip('time')
warnings.warn("Zero/NaN values found in '{}'. Interpolating and "
"extrapolating them. To identify which values were filled later, "
"look for 0 values in 'status{}'".format(ky, tag))
tdat = time._fill_time_gaps(
tdat, sample_rate_hz=dat['attrs']['fs'])
coords[ky] = time.epoch2dt64(tdat).astype('datetime64[ns]')
# Apply rotation matrix and declination
rotmat = None
declin = None
for nm in userdata:
if 'rotmat' in nm:
rotmat = userdata[nm]
elif 'dec' in nm:
declin = userdata[nm]
else:
dat['attrs'][nm] = userdata[nm]
# Create xarray dataset from upper level dictionary
ds = _create_dataset(dat)
ds = _set_coords(ds, ref_frame=ds.coord_sys)
if 'orientmat' not in ds:
ds['orientmat'] = _calc_omat(ds['time'],
ds['heading'],
ds['pitch'],
ds['roll'],
ds.get('orientation_down', None))
if rotmat is not None:
rot.set_inst2head_rotmat(ds, rotmat, inplace=True)
if declin is not None:
rot.set_declination(ds, declin, inplace=True)
# Close handler
if debug:
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
handler.close()
return ds
def _bcd2char(cBCD):
"""Taken from the Nortek System Integrator Manual
"Example Program" Chapter.
"""
cBCD = min(cBCD, 153)
c = (cBCD & 15)
c += 10 * (cBCD >> 4)
return c
def _bitshift8(val):
return val >> 8
def _int2binarray(val, n):
out = np.zeros(n, dtype='bool')
for idx, n in enumerate(range(n)):
out[idx] = val & (2 ** n)
return out
class _NortekReader():
"""
A class for reading reading nortek binary files.
This reader currently only supports AWAC and Vector data formats.
Parameters
----------
fname : string
Nortek filename to read.
endian : {'<','>'} (optional)
Specifies if the file is in 'little' or 'big' endian format. By
default the reader will attempt to determine this.
debug : {True, False*} (optional)
Print debug/progress information?
do_checksum : {True*, False} (optional)
Specifies whether to perform the checksum.
bufsize : int
The size of the read buffer to use. Default = 100000
nens : None, int or 2-element tuple (start, stop)
Number of pings or ensembles to read from the file.
Default is None, read entire file
"""
_lastread = [None, None, None, None, None]
fun_map = {'0x00': 'read_user_cfg',
'0x04': 'read_head_cfg',
'0x05': 'read_hw_cfg',
'0x07': 'read_vec_checkdata',
'0x10': 'read_vec_data',
'0x11': 'read_vec_sysdata',
'0x12': 'read_vec_hdr',
'0x71': 'read_microstrain',
'0x20': 'read_awac_profile',
}
def __init__(self, fname, endian=None, debug=False,
do_checksum=True, bufsize=100000, nens=None):
self.fname = fname
self._bufsize = bufsize
self.f = open(_abspath(fname), 'rb', 1000)
self.do_checksum = do_checksum
self.filesize # initialize the filesize.
self.debug = debug
self.c = 0
self._dtypes = []
self._n_start = 0
try:
len(nens)
except TypeError:
# not a tuple, so we assume None or int
self._npings = nens
else:
if len(nens) != 2:
raise TypeError('nens must be: None (), int, or len 2')
warnings.warn("A 'start ensemble' is not yet supported "
"for the Nortek reader. This function will read "
"the entire file, then crop the beginning at "
"nens[0].")
self._npings = nens[1]
self._n_start = nens[0]
if endian is None:
if unpack('<HH', self.read(4)) == (1445, 24):
endian = '<'
elif unpack('>HH', self.read(4)) == (1445, 24):
endian = '>'
else:
raise Exception("I/O error: could not determine the "
"'endianness' of the file. Are you sure this is a Nortek "
"file?")
self.endian = endian
self.f.seek(0, 0)
# This is the configuration data:
self.config = {}
err_msg = ("I/O error: The file does not "
"appear to be a Nortek data file.")
# Read the header:
if self.read_id() == 5:
self.read_hw_cfg()
else:
raise Exception()
if self.read_id() == 4:
self.read_head_cfg()
else:
raise Exception(err_msg)
if self.read_id() == 0:
self.read_user_cfg()
else:
raise Exception(err_msg)
if self.config['hdw']['serial_number'][0:3].upper() == 'WPR':
self.config['config_type'] = 'AWAC'
elif self.config['hdw']['serial_number'][0:3].upper() == 'VEC':
self.config['config_type'] = 'ADV'
# Initialize the instrument type:
self._inst = self.config.pop('config_type')
# This is the position after reading the 'hardware',
# 'head', and 'user' configuration.
pnow = self.pos
# Run the appropriate initialization routine (e.g. init_ADV).
getattr(self, 'init_' + self._inst)()
self.f.close() # This has a small buffer, so close it.
# This has a large buffer...
self.f = open(_abspath(fname), 'rb', bufsize)
self.close = self.f.close
if self._npings is not None:
self.n_samp_guess = self._npings
self.f.seek(pnow, 0) # Seek to the previous position.
da = self.data['attrs']
if self.config['n_burst'] > 0:
fs = round(self.config['fs'], 7)
da['duty_cycle_n_burst'] = self.config['n_burst']
da['duty_cycle_interval'] = self.config['burst_interval']
if fs > 1:
burst_seconds = self.config['n_burst']/fs
else:
burst_seconds = round(1/fs, 3)
da['duty_cycle_description'] = "{} second bursts collected at {} Hz, with bursts taken every {} minutes".format(
burst_seconds, fs, self.config['burst_interval']/60)
self.burst_start = np.zeros(self.n_samp_guess, dtype='bool')
da['fs'] = self.config['fs']
da['coord_sys'] = {'XYZ': 'inst',
'ENU': 'earth',
'beam': 'beam'}[self.config['coord_sys_axes']]
da['has_imu'] = 0 # Initiate attribute
if self.debug:
logging.info('Init completed')
@property
def filesize(self,):
if not hasattr(self, '_filesz'):
pos = self.pos
self.f.seek(0, 2)
# Seek to the end of the file to determine the filesize.
self._filesz = self.pos
self.f.seek(pos, 0) # Return to the initial position.
return self._filesz
@property
def pos(self,):
return self.f.tell()
def init_ADV(self,):
dat = self.data = {'data_vars': {}, 'coords': {}, 'attrs': {},
'units': {}, 'long_name': {}, 'standard_name': {},
'sys': {}}
da = dat['attrs']
dv = dat['data_vars']
da['inst_make'] = 'Nortek'
da['inst_model'] = 'Vector'
da['inst_type'] = 'ADV'
da['rotate_vars'] = ['vel']
dv['beam2inst_orientmat'] = self.config.pop('beam2inst_orientmat')
self.config['fs'] = 512 / self.config['awac']['avg_interval']
da.update(self.config['usr'])
da.update(self.config['adv'])
da.update(self.config['head'])
da.update(self.config['hdw'])
# No apparent way to determine how many samples are in a file
dlta = self.code_spacing('0x11')
self.n_samp_guess = int(self.filesize / dlta + 1)
self.n_samp_guess *= int(self.config['fs'])
def init_AWAC(self,):
dat = self.data = {'data_vars': {}, 'coords': {}, 'attrs': {},
'units': {}, 'long_name': {}, 'standard_name': {},
'sys': {}}
da = dat['attrs']
dv = dat['data_vars']
da['inst_make'] = 'Nortek'
da['inst_model'] = 'AWAC'
da['inst_type'] = 'ADCP'
dv['beam2inst_orientmat'] = self.config.pop('beam2inst_orientmat')
da['rotate_vars'] = ['vel']
self.config['fs'] = 1. / self.config['awac']['avg_interval']
da.update(self.config['usr'])
da.update(self.config['awac'])
da.update(self.config['head'])
da.update(self.config['hdw'])
space = self.code_spacing('0x20')
if space == 0:
# code spacing is zero if there's only 1 profile
self.n_samp_guess = 1
else:
self.n_samp_guess = int(self.filesize / space + 1)
def read(self, nbyte):
byts = self.f.read(nbyte)
if not (len(byts) == nbyte):
raise EOFError('Reached the end of the file')
return byts
def findnext(self, do_cs=True):
"""Find the next data block by checking the checksum and the
sync byte(0xa5)
"""
sum = np.uint16(int('0xb58c', 0)) # Initialize the sum
cs = 0
func = _bitshift8
func2 = np.uint8
if self.endian == '<':
func = np.uint8
func2 = _bitshift8
while True:
val = unpack(self.endian + 'H', self.read(2))[0]
if func(val) == 165 and (not do_cs or cs == np.uint16(sum)):
self.f.seek(-2, 1)
return hex(func2(val))
sum += cs
cs = val
def read_id(self,):
"""Read the next 'ID' from the file.
"""
self._thisid_bytes = bts = self.read(2)
tmp = unpack(self.endian + 'BB', bts)
if self.debug:
logging.info('Position: {}, codes: {}'.format(self.f.tell(), tmp))
if tmp[0] != 165: # This catches a corrupted data block.
if self.debug:
logging.warning("Corrupted data block sync code (%d, %d) found "
"in ping %d. Searching for next valid code..." %
(tmp[0], tmp[1], self.c))
val = int(self.findnext(do_cs=False), 0)
self.f.seek(2, 1)
if self.debug:
logging.debug(
' ...FOUND {} at position: {}.'.format(val, self.pos))
return val
return tmp[1]
def readnext(self,):
id = '0x%02x' % self.read_id()
if id in self.fun_map:
func_name = self.fun_map[id]
out = getattr(self, func_name)() # Should return None
self._lastread = [func_name[5:]] + self._lastread[:-1]
return out
else:
logging.warning('Unrecognized identifier: ' + id)
self.f.seek(-2, 1)
return 10
def readfile(self, nlines=None):
print('Reading file %s ...' % self.fname)
retval = None
try:
while not retval:
if self.c == nlines:
break
retval = self.readnext()
if retval == 10:
self.findnext()
retval = None
if self._npings is not None and self.c >= self._npings:
if 'microstrain' in self._dtypes:
try:
self.readnext()
except:
pass
break
except EOFError:
if self.debug:
logging.info(' end of file at {} bytes.'.format(self.pos))
else:
if self.debug:
logging.info(' stopped at {} bytes.'.format(self.pos))
self.c -= 1
_crop_data(self.data, slice(0, self.c), self.n_samp_guess)
def findnextid(self, id):
if id.__class__ is str:
id = int(id, 0)
nowid = None
while nowid != id:
nowid = self.read_id()
if nowid == 16:
shift = 22
else:
sz = 2 * unpack(self.endian + 'H', self.read(2))[0]
shift = sz - 4
self.f.seek(shift, 1)
return self.pos
def code_spacing(self, searchcode, iternum=50):
"""
Find the spacing, in bytes, between a specific hardware code.
Repeat this * iternum * times(default 50).
Returns the average spacing, in bytes, between the code.
"""
p0 = self.findnextid(searchcode)
for i in range(iternum):
try:
self.findnextid(searchcode)
except EOFError:
break
if self.debug:
logging.info('p0={}, pos={}, i={}'.format(p0, self.pos, i))
# Compute the average of the data size:
return (self.pos - p0) / (i + 1)
def checksum(self, byts):
"""Perform a checksum on `byts` and read the checksum value.
"""
if self.do_checksum:
if not np.sum(unpack(self.endian + str(int(1 + len(byts) / 2)) + 'H',
self._thisid_bytes + byts)) + \
46476 - unpack(self.endian + 'H', self.read(2)):
raise Exception("CheckSum Failed at {}".format(self.pos))
else:
self.f.seek(2, 1)
def read_user_cfg(self,):
# ID: '0x00 = 00
if self.debug:
logging.info('Reading user configuration (0x00) ping #{} @ {}...'
.format(self.c, self.pos))
cfg_u = self.config
byts = self.read(508)
# the first two bytes are the size.
tmp = unpack(self.endian +
'2x18H6s4HI9H90H80s48xH50x6H4xH2x2H2xH30x8H',
byts)
cfg_u['usr'] = {}
cfg_u['adv'] = {}
cfg_u['awac'] = {}
cfg_u['transmit_pulse_length_m'] = tmp[0] # counts
cfg_u['blank_dist'] = tmp[1] # overridden below
cfg_u['receive_length_m'] = tmp[2] # counts
cfg_u['time_between_pings'] = tmp[3] # counts
cfg_u['time_between_bursts'] = tmp[4] # counts
cfg_u['adv']['n_pings_per_burst'] = tmp[5]
cfg_u['awac']['avg_interval'] = tmp[6]
cfg_u['usr']['n_beams'] = tmp[7]
TimCtrlReg = _int2binarray(tmp[8], 16).astype(int)
# From the nortek system integrator manual
# (note: bit numbering is zero-based)
cfg_u['usr']['profile_mode'] = [
'single', 'continuous'][TimCtrlReg[1]]
cfg_u['usr']['burst_mode'] = str(bool(~TimCtrlReg[2]))
cfg_u['usr']['power_level'] = TimCtrlReg[5] + 2 * TimCtrlReg[6] + 1
cfg_u['usr']['sync_out_pos'] = ['middle', 'end', ][TimCtrlReg[7]]
cfg_u['usr']['sample_on_sync'] = str(bool(TimCtrlReg[8]))
cfg_u['usr']['start_on_sync'] = str(bool(TimCtrlReg[9]))
cfg_u['PwrCtrlReg'] = _int2binarray(tmp[9], 16)
cfg_u['A1'] = tmp[10]
cfg_u['B0'] = tmp[11]
cfg_u['B1'] = tmp[12]
cfg_u['usr']['compass_update_rate'] = tmp[13]
cfg_u['coord_sys_axes'] = ['ENU', 'XYZ', 'beam'][tmp[14]]
cfg_u['usr']['n_bins'] = tmp[15]
cfg_u['bin_length'] = tmp[16]
cfg_u['burst_interval'] = tmp[17]
cfg_u['usr']['deployment_name'] = tmp[18].partition(b'\x00')[
0].decode('utf-8')
cfg_u['usr']['wrap_mode'] = str(bool(tmp[19]))
cfg_u['deployment_time'] = np.array(tmp[20:23])
cfg_u['diagnotics_interval'] = tmp[23]
Mode0 = _int2binarray(tmp[24], 16)
cfg_u['user_soundspeed_adj_factor'] = tmp[25]
cfg_u['n_samples_diag'] = tmp[26]
cfg_u['n_beams_cells_diag'] = tmp[27]
cfg_u['n_pings_diag_wave'] = tmp[28]
ModeTest = _int2binarray(tmp[29], 16)
cfg_u['usr']['analog_in'] = tmp[30]
sfw_ver = str(tmp[31])
cfg_u['usr']['software_version'] = sfw_ver[0] + \
'.'+sfw_ver[1:3]+'.'+sfw_ver[3:]
cfg_u['usr']['salinity'] = tmp[32]/10
cfg_u['VelAdjTable'] = np.array(tmp[33:123])
cfg_u['usr']['comments'] = tmp[123].partition(b'\x00')[
0].decode('utf-8')
cfg_u['awac']['wave_processing_method'] = [
'PUV', 'SUV', 'MLM', 'MLMST', 'None'][tmp[124]]
Mode1 = _int2binarray(tmp[125], 16)
cfg_u['awac']['prc_dyn_wave_cell_pos'] = int(tmp[126]/32767 * 100)
cfg_u['wave_transmit_pulse'] = tmp[127]
cfg_u['wave_blank_dist'] = tmp[128]
cfg_u['awac']['wave_cell_size'] = tmp[129]
cfg_u['awac']['n_samples_wave'] = tmp[130]
cfg_u['n_burst'] = tmp[131]
cfg_u['analog_out_scale'] = tmp[132]
cfg_u['corr_thresh'] = tmp[133]
cfg_u['transmit_pulse_lag2'] = tmp[134] # counts
cfg_u['QualConst'] = np.array(tmp[135:143])
self.checksum(byts)
cfg_u['usr']['user_specified_sound_speed'] = str(Mode0[0])
cfg_u['awac']['wave_mode'] = ['Disabled', 'Enabled'][int(Mode0[1])]
cfg_u['usr']['analog_output'] = str(Mode0[2])
cfg_u['usr']['output_format'] = ['Vector', 'ADV'][int(Mode0[3])] # noqa
cfg_u['vel_scale_mm'] = [1, 0.1][int(Mode0[4])]
cfg_u['usr']['serial_output'] = str(Mode0[5])
cfg_u['reserved_EasyQ'] = str(Mode0[6])
cfg_u['usr']['power_output_analog'] = str(Mode0[8])
cfg_u['mode_test_use_DSP'] = str(ModeTest[0])
cfg_u['mode_test_filter_output'] = ['total', 'correction_only'][int(ModeTest[1])] # noqa
cfg_u['awac']['wave_fs'] = ['1 Hz', '2 Hz'][int(Mode1[0])]
cfg_u['awac']['wave_cell_position'] = ['fixed', 'dynamic'][int(Mode1[1])] # noqa
cfg_u['awac']['type_wave_cell_pos'] = ['pct_of_mean_pressure', 'pct_of_min_re'][int(Mode1[2])] # noqa
def read_head_cfg(self,):
# ID: '0x04 = 04
if self.debug:
logging.info('Reading head configuration (0x04) ping #{} @ {}...'
.format(self.c, self.pos))
cfg = self.config
cfg['head'] = {}
byts = self.read(220)
tmp = unpack(self.endian + '2x3H12s176s22sH', byts)
head_config = _int2binarray(tmp[0], 16).astype(int)
cfg['head']['pressure_sensor'] = ['no', 'yes'][head_config[0]]
cfg['head']['compass'] = ['no', 'yes'][head_config[1]]
cfg['head']['tilt_sensor'] = ['no', 'yes'][head_config[2]]
cfg['head']['carrier_freq_kHz'] = tmp[1]
cfg['beam2inst_orientmat'] = np.array(
unpack(self.endian + '9h', tmp[4][8:26])).reshape(3, 3) / 4096.
self.checksum(byts)
def read_hw_cfg(self,):
# ID 0x05 = 05
if self.debug:
logging.info('Reading hardware configuration (0x05) ping #{} @ {}...'
.format(self.c, self.pos))
cfg_hw = self.config
cfg_hw['hdw'] = {}
byts = self.read(44)
tmp = unpack(self.endian + '2x14s6H12x4s', byts)
cfg_hw['hdw']['serial_number'] = tmp[0][:8].decode('utf-8')
cfg_hw['ProLogID'] = unpack('B', tmp[0][8:9])[0]
cfg_hw['hdw']['ProLogFWver'] = tmp[0][10:].decode('utf-8')
cfg_hw['board_config'] = tmp[1]
cfg_hw['board_freq'] = tmp[2]
cfg_hw['hdw']['PIC_version'] = tmp[3]
cfg_hw['hdw']['hardware_rev'] = tmp[4]
cfg_hw['hdw']['recorder_size_bytes'] = tmp[5] * 65536
status = _int2binarray(tmp[6], 16).astype(int)
cfg_hw['hdw']['vel_range'] = ['normal', 'high'][status[0]]
cfg_hw['hdw']['firmware_version'] = tmp[7].decode('utf-8')
self.checksum(byts)
def rd_time(self, strng):
"""Read the time from the first 6bytes of the input string.
"""
min, sec, day, hour, year, month = unpack('BBBBBB', strng[:6])
return time.date2epoch(datetime(time._fullyear(_bcd2char(year)),
_bcd2char(month),
_bcd2char(day),
_bcd2char(hour),
_bcd2char(min),
_bcd2char(sec)))[0]
def _init_data(self, vardict):
"""Initialize the data object according to vardict.
Parameters
----------
vardict : (dict of :class:`<VarAttrs>`)
The variable definitions in the :class:`<VarAttrs>` specify
how to initialize each data variable.
"""
shape_args = {'n': self.n_samp_guess}
try:
shape_args['nbins'] = self.config['usr']['n_bins']
except KeyError:
pass
for nm, va in list(vardict.items()):
if va.group is None:
# These have to stay separated.
if nm not in self.data:
self.data[nm] = va._empty_array(**shape_args)
else:
if nm not in self.data[va.group]:
self.data[va.group][nm] = va._empty_array(**shape_args)
self.data['units'][nm] = va.units
self.data['long_name'][nm] = va.long_name
if va.standard_name:
self.data['standard_name'][nm] = va.standard_name
def read_vec_data(self,):
# ID: 0x10 = 16
c = self.c
dat = self.data
if self.debug:
logging.info('Reading vector velocity data (0x10) ping #{} @ {}...'
.format(self.c, self.pos))
if 'vel' not in dat['data_vars']:
self._init_data(nortek_defs.vec_data)
self._dtypes += ['vec_data']
byts = self.read(20)
ds = dat['sys']
dv = dat['data_vars']
(ds['AnaIn2LSB'][c],
ds['Count'][c],
dv['PressureMSB'][c],
ds['AnaIn2MSB'][c],
dv['PressureLSW'][c],
ds['AnaIn1'][c],
dv['vel'][0, c],
dv['vel'][1, c],
dv['vel'][2, c],
dv['amp'][0, c],
dv['amp'][1, c],
dv['amp'][2, c],
dv['corr'][0, c],
dv['corr'][1, c],
dv['corr'][2, c]) = unpack(self.endian + '4B2H3h6B', byts)
self.checksum(byts)
self.c += 1
def read_vec_checkdata(self,):
# ID: 0x07 = 07
if self.debug:
logging.info('Reading vector check data (0x07) ping #{} @ {}...'
.format(self.c, self.pos))
byts0 = self.read(6)
checknow = {}
tmp = unpack(self.endian + '2x2H', byts0) # The first two are size.
checknow['Samples'] = tmp[0]
n = checknow['Samples']
checknow['First_samp'] = tmp[1]
checknow['Amp1'] = tbx._nans(n, dtype=np.uint8) + 8
checknow['Amp2'] = tbx._nans(n, dtype=np.uint8) + 8
checknow['Amp3'] = tbx._nans(n, dtype=np.uint8) + 8
byts1 = self.read(3 * n)
tmp = unpack(self.endian + (3 * n * 'B'), byts1)
for idx, nm in enumerate(['Amp1', 'Amp2', 'Amp3']):
checknow[nm] = np.array(tmp[idx * n:(idx + 1) * n], dtype=np.uint8)
self.checksum(byts0 + byts1)
if 'checkdata' not in self.config:
self.config['checkdata'] = checknow
else:
if not isinstance(self.config['checkdata'], list):
self.config['checkdata'] = [self.config['checkdata']]
self.config['checkdata'] += [checknow]
def _sci_data(self, vardict):
"""
Convert the data to scientific units accordint to vardict.
Parameters
----------
vardict : (dict of :class:`<VarAttrs>`)
The variable definitions in the :class:`<VarAttrs>` specify
how to scale each data variable.
"""
for nm, vd in list(vardict.items()):
if vd.group is None:
dat = self.data
else:
dat = self.data[vd.group]
retval = vd.sci_func(dat[nm])
# This checks whether a new data object was created:
# sci_func returns None if it modifies the existing data.
if retval is not None:
dat[nm] = retval
def sci_vec_data(self,):
self._sci_data(nortek_defs.vec_data)
dat = self.data
dat['data_vars']['pressure'] = (
dat['data_vars']['PressureMSB'].astype('float32') * 65536 +
dat['data_vars']['PressureLSW'].astype('float32')) / 1000.
dat['units']['pressure'] = 'dbar'
dat['long_name']['pressure'] = 'Pressure'
dat['standard_name']['pressure'] = 'sea_water_pressure'
dat['data_vars'].pop('PressureMSB')
dat['data_vars'].pop('PressureLSW')
# Apply velocity scaling (1 or 0.1)
dat['data_vars']['vel'] *= self.config['vel_scale_mm']
def read_vec_hdr(self,):
# ID: '0x12 = 18
if self.debug:
logging.info('Reading vector header data (0x12) ping #{} @ {}...'
.format(self.c, self.pos))
byts = self.read(38)
# The first two are size, the next 6 are time.
tmp = unpack(self.endian + '8xH7B21x', byts)
hdrnow = {}
hdrnow['time'] = self.rd_time(byts[2:8])
hdrnow['NRecords'] = tmp[0]
hdrnow['Noise1'] = tmp[1]
hdrnow['Noise2'] = tmp[2]
hdrnow['Noise3'] = tmp[3]
hdrnow['Spare0'] = byts[13:14].decode('utf-8')
hdrnow['Corr1'] = tmp[5]
hdrnow['Corr2'] = tmp[6]
hdrnow['Corr3'] = tmp[7]
hdrnow['Spare1'] = byts[17:].decode('utf-8')
self.checksum(byts)
if 'data_header' not in self.config:
self.config['data_header'] = hdrnow
else:
if not isinstance(self.config['data_header'], list):
self.config['data_header'] = [self.config['data_header']]
self.config['data_header'] += [hdrnow]
def read_vec_sysdata(self,):
# ID: 0x11 = 17
c = self.c
if self.debug:
logging.info('Reading vector system data (0x11) ping #{} @ {}...'
.format(self.c, self.pos))
dat = self.data
if self._lastread[:2] == ['vec_checkdata', 'vec_hdr', ]:
self.burst_start[c] = True
if 'time' not in dat['coords']:
self._init_data(nortek_defs.vec_sysdata)
self._dtypes += ['vec_sysdata']
byts = self.read(24)
# The first two are size (skip them).
dat['coords']['time'][c] = self.rd_time(byts[2:8])
ds = dat['sys']
dv = dat['data_vars']
(dv['batt'][c],
dv['c_sound'][c],
dv['heading'][c],
dv['pitch'][c],
dv['roll'][c],
dv['temp'][c],
dv['error'][c],
dv['status'][c],
ds['AnaIn'][c]) = unpack(self.endian + '2H3hH2BH', byts[8:])
self.checksum(byts)
def sci_vec_sysdata(self,):
"""Translate the data in the vec_sysdata structure into
scientific units.
"""
dat = self.data
fs = dat['attrs']['fs']
self._sci_data(nortek_defs.vec_sysdata)
t = dat['coords']['time']
dv = dat['data_vars']
dat['sys']['_sysi'] = ~np.isnan(t)
# These are the indices in the sysdata variables
# that are not interpolated.
nburst = self.config['n_burst']
dv['orientation_down'] = tbx._nans(len(t), dtype='bool')
if nburst == 0:
num_bursts = 1
nburst = len(t)
else:
num_bursts = int(len(t) // nburst + 1)
for nb in range(num_bursts):
iburst = slice(nb * nburst, (nb + 1) * nburst)
sysi = dat['sys']['_sysi'][iburst]
if len(sysi) == 0:
break
# Skip the first entry for the interpolation process
inds = np.nonzero(sysi)[0][1:]
arng = np.arange(len(t[iburst]), dtype=np.float64)
if len(inds) >= 2:
p = np.poly1d(np.polyfit(inds, t[iburst][inds], 1))
t[iburst] = p(arng)
elif len(inds) == 1:
t[iburst] = ((arng - inds[0]) / (fs * 3600 * 24) +
t[iburst][inds[0]])
else:
t[iburst] = (t[iburst][0] + arng / (fs * 24 * 3600))
tmpd = tbx._nans_like(dv['heading'][iburst])
# The first status bit should be the orientation.
tmpd[sysi] = dv['status'][iburst][sysi] & 1
tbx.fillgaps(tmpd, extrapFlg=True)
tmpd = np.nan_to_num(tmpd, nan=0) # nans in pitch roll heading
slope = np.diff(tmpd)
tmpd[1:][slope < 0] = 1
tmpd[:-1][slope > 0] = 0
dv['orientation_down'][iburst] = tmpd.astype('bool')
tbx.interpgaps(dv['batt'], t)
tbx.interpgaps(dv['c_sound'], t)
tbx.interpgaps(dv['heading'], t)
tbx.interpgaps(dv['pitch'], t)
tbx.interpgaps(dv['roll'], t)
tbx.interpgaps(dv['temp'], t)
def read_microstrain(self,):
"""Read ADV microstrain sensor (IMU) data
"""
def update_defs(dat, mag=False, orientmat=False):
imu_data = {'accel': ['m s-2', 'Acceleration'],
'angrt': ['rad s-1', 'Angular Velocity'],
'mag': ['gauss', 'Compass'],
'orientmat': ['1', 'Orientation Matrix']}
for ky in imu_data:
dat['units'].update({ky: imu_data[ky][0]})
dat['long_name'].update({ky: imu_data[ky][1]})
if not mag:
dat['units'].pop('mag')
dat['long_name'].pop('mag')
if not orientmat:
dat['units'].pop('orientmat')
dat['long_name'].pop('orientmat')
# 0x71 = 113
if self.c == 0:
logging.warning('First "microstrain data" block '
'is before first "vector system data" block.')
else:
self.c -= 1
if self.debug:
logging.info('Reading vector microstrain data (0x71) ping #{} @ {}...'
.format(self.c, self.pos))
byts0 = self.read(4)
# The first 2 are the size, 3rd is count, 4th is the id.
ahrsid = unpack(self.endian + '3xB', byts0)[0]
if hasattr(self, '_ahrsid') and self._ahrsid != ahrsid:
logging.warning('AHRS_ID changes mid-file!')
if ahrsid in [195, 204, 210, 211]:
self._ahrsid = ahrsid
c = self.c
dat = self.data
dv = dat['data_vars']
da = dat['attrs']
da['has_imu'] = 1 # logical
if 'accel' not in dv:
self._dtypes += ['microstrain']
if ahrsid == 195:
self._orient_dnames = ['accel', 'angrt', 'orientmat']
dv['accel'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
dv['angrt'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
dv['orientmat'] = tbx._nans((3, 3, self.n_samp_guess),
dtype=np.float32)
rv = ['accel', 'angrt']
if not all(x in da['rotate_vars'] for x in rv):
da['rotate_vars'].extend(rv)
update_defs(dat, mag=False, orientmat=True)
if ahrsid in [204, 210]:
self._orient_dnames = ['accel', 'angrt', 'mag', 'orientmat']
dv['accel'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
dv['angrt'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
dv['mag'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
rv = ['accel', 'angrt', 'mag']
if not all(x in da['rotate_vars'] for x in rv):
da['rotate_vars'].extend(rv)
if ahrsid == 204:
dv['orientmat'] = tbx._nans((3, 3, self.n_samp_guess),
dtype=np.float32)
update_defs(dat, mag=True, orientmat=True)
if ahrsid == 211:
self._orient_dnames = ['angrt', 'accel', 'mag']
dv['angrt'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
dv['accel'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
dv['mag'] = tbx._nans((3, self.n_samp_guess),
dtype=np.float32)
rv = ['angrt', 'accel', 'mag']
if not all(x in da['rotate_vars'] for x in rv):
da['rotate_vars'].extend(rv)
update_defs(dat, mag=True, orientmat=False)
byts = ''
if ahrsid == 195: # 0xc3
byts = self.read(64)
dt = unpack(self.endian + '6f9f4x', byts)
(dv['angrt'][:, c],
dv['accel'][:, c]) = (dt[0:3], dt[3:6],)
dv['orientmat'][:, :, c] = ((dt[6:9], dt[9:12], dt[12:15]))
elif ahrsid == 204: # 0xcc
byts = self.read(78)
# This skips the "DWORD" (4 bytes) and the AHRS checksum
# (2 bytes)
dt = unpack(self.endian + '18f6x', byts)
(dv['accel'][:, c],
dv['angrt'][:, c],
dv['mag'][:, c]) = (dt[0:3], dt[3:6], dt[6:9],)
dv['orientmat'][:, :, c] = ((dt[9:12], dt[12:15], dt[15:18]))
elif ahrsid == 211:
byts = self.read(42)
dt = unpack(self.endian + '9f6x', byts)
(dv['angrt'][:, c],
dv['accel'][:, c],
dv['mag'][:, c]) = (dt[0:3], dt[3:6], dt[6:9],)
else:
logging.warning('Unrecognized IMU identifier: ' + str(ahrsid))
self.f.seek(-2, 1)
return 10
self.checksum(byts0 + byts)
self.c += 1 # reset the increment
def sci_microstrain(self,):
"""Rotate orientation data into ADV coordinate system.
"""
# MS = MicroStrain
dv = self.data['data_vars']
for nm in self._orient_dnames:
# Rotate the MS orientation data (in MS coordinate system)
# to be consistent with the ADV coordinate system.
# (x,y,-z)_ms = (z,y,x)_adv
(dv[nm][2],
dv[nm][0]) = (dv[nm][0],
-dv[nm][2].copy())
if 'orientmat' in self._orient_dnames:
# MS coordinate system is in North-East-Down (NED),
# we want East-North-Up (ENU)
dv['orientmat'][:, 2] *= -1
(dv['orientmat'][:, 0],
dv['orientmat'][:, 1]) = (dv['orientmat'][:, 1],
dv['orientmat'][:, 0].copy())
if 'accel' in dv:
# This value comes from the MS 3DM-GX3 MIP manual
dv['accel'] *= 9.80665
if self._ahrsid in [195, 211]:
# These are DAng and DVel, so we convert them to angrt, accel here
dv['angrt'] *= self.config['fs']
dv['accel'] *= self.config['fs']
def read_awac_profile(self,):
# ID: '0x20' = 32
dat = self.data
if self.debug:
logging.info('Reading AWAC velocity data (0x20) ping #{} @ {}...'
.format(self.c, self.pos))
nbins = self.config['usr']['n_bins']
if 'temp' not in dat['data_vars']:
self._init_data(nortek_defs.awac_profile)
self._dtypes += ['awac_profile']
# Note: docs state there is 'fill' byte at the end, if nbins is odd,
# but doesn't appear to be the case
n = self.config['usr']['n_beams']
byts = self.read(116 + n*3 * nbins)
c = self.c
dat['coords']['time'][c] = self.rd_time(byts[2:8])
ds = dat['sys']
dv = dat['data_vars']
(dv['error'][c],
ds['AnaIn1'][c],
dv['batt'][c],
dv['c_sound'][c],
dv['heading'][c],
dv['pitch'][c],
dv['roll'][c],
p_msb,
dv['status'][c],
p_lsw,
dv['temp'][c],) = unpack(self.endian + '7HBB2H', byts[8:28])
dv['pressure'][c] = (65536 * p_msb + p_lsw)
# The nortek system integrator manual specifies an 88byte 'spare'
# field, therefore we start at 116.
tmp = unpack(self.endian + str(n * nbins) + 'h' +
str(n * nbins) + 'B', byts[116:116 + n*3 * nbins])
for idx in range(n):
dv['vel'][idx, :, c] = tmp[idx * nbins: (idx + 1) * nbins]
dv['amp'][idx, :, c] = tmp[(idx + n) * nbins: (idx + n+1) * nbins]
self.checksum(byts)
self.c += 1
def sci_awac_profile(self,):
self._sci_data(nortek_defs.awac_profile)
# Calculate the ranges.
cs_coefs = {2000: 0.0239,
1000: 0.0478,
600: 0.0797,
400: 0.1195}
h_ang = 25 * (np.pi / 180) # Head angle is 25 degrees for all awacs.
# Cell size
cs = round(float(self.config['bin_length']) / 256. *
cs_coefs[self.config['head']['carrier_freq_kHz']] * np.cos(h_ang), ndigits=2)
# Blanking distance
bd = round(self.config['blank_dist'] *
0.0229 * np.cos(h_ang) - cs, ndigits=2)
r = (np.float32(np.arange(self.config['usr']['n_bins']))+1)*cs + bd
self.data['coords']['range'] = r
self.data['attrs']['cell_size'] = cs
self.data['attrs']['blank_dist'] = bd
def dat2sci(self,):
for nm in self._dtypes:
getattr(self, 'sci_' + nm)()
for nm in ['data_header', 'checkdata']:
if nm in self.config and isinstance(self.config[nm], list):
self.config[nm] = _recatenate(self.config[nm])
def __exit__(self, type, value, trace):
self.close()
def __enter__(self):
return self
def _crop_data(obj, range, n_lastdim):
for nm, dat in obj.items():
if isinstance(dat, np.ndarray) and (dat.shape[-1] == n_lastdim):
obj[nm] = dat[..., range]
def _recatenate(obj):
out = type(obj[0])()
for ky in list(obj[0].keys()):
if ky in ['__data_groups__', '_type']:
continue
val0 = obj[0][ky]
if isinstance(val0, np.ndarray) and val0.size > 1:
out[ky] = np.concatenate([val[ky][..., None] for val in obj],
axis=-1)
else:
out[ky] = np.array([val[ky] for val in obj])
return out