import numpy as np
import xarray as xr
import warnings
from os.path import getsize
from pathlib import Path
import logging
from .rdi_lib import bin_reader
from . import rdi_defs as defs
from .base import _find_userdata, _create_dataset, _abspath
from .. import time as tmlib
from ..rotate.rdi import _calc_beam_orientmat, _calc_orientmat
from ..rotate.base import _set_coords
from ..rotate.api import set_declination
def read_rdi(
filename,
userdata=None,
nens=None,
debug_level=-1,
vmdas_search=False,
winriver=False,
**kwargs,
):
"""
Read a TRDI binary data file.
Parameters
----------
filename : string
Filename of TRDI file to read.
userdata : True, False, or string of userdata.json filename
Whether to read the '<base-filename>.userdata.json' file. Default = True
    nens : None or int
        Number of pings or ensembles to read from the file.
        Default is None, which reads the entire file
    debug_level : int
        Debug level [0 - 2]; -1 disables debug logging. Default = -1
vmdas_search : bool
Search from the end of each ensemble for the VMDAS navigation
block. The byte offsets are sometimes incorrect. Default = False
    winriver : bool
        Whether the file is a WinRiver file. This is normally detected
        automatically by dolfyn; setting it manually is helpful for
        debugging. Default = False
Returns
-------
ds : xarray.Dataset
An xarray dataset from the binary instrument data
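
    Examples
    --------
    A minimal usage sketch (the filename here is hypothetical):

    >>> from dolfyn.io.rdi import read_rdi
    >>> ds = read_rdi("adcp_data.000")  # doctest: +SKIP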
"""
# Start debugger logging
if debug_level >= 0:
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
filepath = Path(filename)
logfile = filepath.with_suffix(".dolfyn.log")
logging.basicConfig(
filename=str(logfile),
filemode="w",
level=logging.NOTSET,
format="%(name)s - %(levelname)s - %(message)s",
)
# Reads into a dictionary of dictionaries using netcdf naming conventions
# Should be easier to debug
rdr = _RDIReader(
filename, debug_level=debug_level, vmdas_search=vmdas_search, winriver=winriver
)
datNB, datBB = rdr.load_data(nens=nens)
dats = [dat for dat in [datNB, datBB] if dat is not None]
# Read in userdata
userdata = _find_userdata(filename, userdata)
dss = []
for dat in dats:
for nm in userdata:
dat["attrs"][nm] = userdata[nm]
        # Skip this dataset if its first timestamp is invalid
if not np.isfinite(dat["coords"]["time"][0]):
continue
# GPS data not necessarily sampling at the same rate as ADCP DAQ.
if "time_gps" in dat["coords"]:
dat = _remove_gps_duplicates(dat)
# Convert time coords to dt64
t_coords = [t for t in dat["coords"] if "time" in t]
for ky in t_coords:
dat["coords"][ky] = tmlib.epoch2dt64(dat["coords"][ky])
# Convert time vars to dt64
t_data = [t for t in dat["data_vars"] if "time" in t]
for ky in t_data:
dat["data_vars"][ky] = tmlib.epoch2dt64(dat["data_vars"][ky])
# Create xarray dataset from upper level dictionary
ds = _create_dataset(dat)
ds = _set_coords(ds, ref_frame=ds.coord_sys)
# Create orientation matrices
if "beam2inst_orientmat" not in ds:
ds["beam2inst_orientmat"] = xr.DataArray(
_calc_beam_orientmat(ds.beam_angle, ds.beam_pattern == "convex"),
coords={"x1": [1, 2, 3, 4], "x2": [1, 2, 3, 4]},
dims=["x1", "x2"],
attrs={"units": "1", "long_name": "Rotation Matrix"},
)
if "orientmat" not in ds:
ds["orientmat"] = _calc_orientmat(ds)
# Check magnetic declination if provided via software and/or userdata
_set_rdi_declination(ds, filename, inplace=True)
# VMDAS applies gps correction on velocity in .ENX files only
if filename.rsplit(".")[-1] == "ENX":
ds.attrs["vel_gps_corrected"] = 1
else: # (not ENR or ENS) or WinRiver files
ds.attrs["vel_gps_corrected"] = 0
dss += [ds]
if len(dss) == 2:
warnings.warn(
"\nTwo profiling configurations retrieved from file" "\nReturning first."
)
# Close handler
if debug_level >= 0:
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
handler.close()
return dss[0]
def _remove_gps_duplicates(dat):
"""
    Remove duplicate and NaN timestamp values from the 'time_gps'
    coordinate, and add a hardware (ADCP DAQ) timestamp corresponding to
    each GPS acquisition (in addition to the GPS unit's own timestamp).
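
    Examples
    --------
    The de-duplication relies on ``np.unique`` with ``return_index``, which
    keeps the first occurrence of each timestamp (illustrative values):

    >>> t, idx = np.unique([1.0, 1.0, 2.0, np.nan], return_index=True)
    >>> t
    array([ 1.,  2., nan])
    >>> idx
    array([0, 2, 3])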
"""
dat["data_vars"]["hdwtime_gps"] = dat["coords"]["time"]
# Remove duplicate timestamp values, if applicable
dat["coords"]["time_gps"], idx = np.unique(
dat["coords"]["time_gps"], return_index=True
)
# Remove nan values, if applicable
nan = np.zeros(dat["coords"]["time"].shape, dtype=bool)
if any(np.isnan(dat["coords"]["time_gps"])):
nan = np.isnan(dat["coords"]["time_gps"])
dat["coords"]["time_gps"] = dat["coords"]["time_gps"][~nan]
for key in dat["data_vars"]:
if ("gps" in key) or ("nmea" in key):
dat["data_vars"][key] = dat["data_vars"][key][idx]
if sum(nan) > 0:
dat["data_vars"][key] = dat["data_vars"][key][~nan]
return dat
def _set_rdi_declination(dat, fname, inplace):
"""
If magnetic_var_deg is set, this means that the declination is already
included in the heading and in the velocity data.
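
    For example (hypothetical values): if the binary file stores
    ``magnetic_var_deg = 12.5``, the heading and velocity data are already
    declination-corrected, so this function records ``declination = 12.5``
    and ``declination_in_orientmat = 1`` instead of rotating the data again.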
"""
declin = dat.attrs.pop("declination", None) # userdata declination
if dat.attrs["magnetic_var_deg"] != 0: # from TRDI software if set
dat.attrs["declination"] = dat.attrs["magnetic_var_deg"]
dat.attrs["declination_in_orientmat"] = 1 # logical
if dat.attrs["magnetic_var_deg"] != 0 and declin is not None:
warnings.warn(
"'magnetic_var_deg' is set to {:.2f} degrees in the binary "
"file '{}', AND 'declination' is set in the 'userdata.json' "
"file. DOLfYN WILL USE THE VALUE of {:.2f} degrees in "
"userdata.json. If you want to use the value in "
"'magnetic_var_deg', delete the value from userdata.json and "
"re-read the file.".format(dat.attrs["magnetic_var_deg"], fname, declin)
)
dat.attrs["declination"] = declin
if declin is not None:
set_declination(dat, declin, inplace)
class _RDIReader:
def __init__(
self, fname, navg=1, debug_level=-1, vmdas_search=False, winriver=False
):
self.fname = _abspath(fname)
print("\nReading file {} ...".format(fname))
self._debug_level = debug_level
self._vmdas_search = vmdas_search
self._winrivprob = winriver
self._vm_source = 0
self._pos = 0
self.progress = 0
        self._cfac = 180 / 2**31  # convert 32-bit binary angle (BAM) to degrees
self._fixoffset = 0
self._nbyte = 0
self.n_cells_diff = 0
self.n_cells_sl = 0
self.cs_diff = 0
self.cs = []
self.cfg = {}
self.cfgbb = {}
self.hdr = {}
self.f = bin_reader(self.fname)
# Check header, double buffer, and get filesize
self._filesize = getsize(self.fname)
        space = self.code_spacing()  # average bytes between 0x7F7F headers
self._npings = self._filesize // space
if self._debug_level > -1:
logging.info("Done: {}".format(self.cfg))
logging.info("self._bb {}".format(self._bb))
logging.info("self.cfgbb: {}".format(self.cfgbb))
self.f.seek(self._pos, 0)
self.n_avg = navg
self.ensemble = defs._ensemble(self.n_avg, self.cfg["n_cells"])
if self._bb:
self.ensembleBB = defs._ensemble(self.n_avg, self.cfgbb["n_cells"])
self.vars_read = defs._variable_setlist(["time"])
if self._bb:
self.vars_readBB = defs._variable_setlist(["time"])
def code_spacing(self, iternum=50):
"""
        Return the average spacing, in bytes, between pings,
        averaged over *iternum* pings (default 50).
"""
fd = self.f
p0 = self._pos
# Get basic header data and check dual profile
if not self.read_hdr():
raise RuntimeError("No header in this file")
self._bb = self.check_for_double_buffer()
# Turn off debugging to check code spacing
debug_level = self._debug_level
self._debug_level = -1
for i in range(iternum):
try:
self.read_hdr()
            except Exception:
                # Stop averaging at EOF or on a corrupt header
                break
# Compute the average of the data size:
size = (self._pos - p0) / (i + 1)
self.f = fd
self._pos = p0
self._debug_level = debug_level
return size
def read_hdr(self):
"""
        Scan the file until the 0x7F7F header ID is found.
"""
if not self.search_buffer():
return False
self._pos = self.f.tell() - 2
self.read_hdrseg()
return True
def read_hdrseg(self):
fd = self.f
hdr = self.hdr
hdr["nbyte"] = fd.read_i16(1)
spare = fd.read_ui8(1)
ndat = fd.read_ui8(1)
hdr["dat_offsets"] = fd.read_ui16(ndat)
self._nbyte = 4 + ndat * 2
def check_for_double_buffer(self):
"""
VMDAS will record two buffers in NB or NB/BB mode, so we need to
figure out if that is happening here
"""
found = False
pos = self.f.pos
if self._debug_level > -1:
logging.info(self.hdr)
logging.info("pos {}".format(pos))
self.id_positions = {}
for offset in self.hdr["dat_offsets"]:
self.f.seek(offset + pos - self.hdr["dat_offsets"][0], rel=0)
id = self.f.read_ui16(1)
self.id_positions[id] = offset
if self._debug_level > -1:
logging.info("id {} offset {}".format(id, offset))
if id == 1:
self.read_fixed(bb=True)
found = True
elif id == 0:
self.read_fixed(bb=False)
elif id == 16:
self.read_fixed_sl() # bb=True
elif id == 8192:
self._vmdas_search = True
return found
def load_data(self, nens=None):
if nens is None:
# Attempt to overshoot WinRiver2 or *Pro filesize
if (self.cfg["coord_sys"] == "ship") or (
self.cfg["inst_model"]
in [
"RiverPro",
"StreamPro",
]
):
self._nens = int(self._filesize / self.hdr["nbyte"] / self.n_avg * 1.1)
else:
# Attempt to overshoot other instrument filesizes
self._nens = int(self._npings / self.n_avg)
        elif isinstance(nens, (tuple, list)):
            raise TypeError("`nens` must be an integer")
else:
self._nens = nens
if self._debug_level > -1:
logging.info(" taking data from pings 0 - %d" % self._nens)
logging.info(" %d ensembles will be produced.\n" % self._nens)
self.init_data()
for iens in range(self._nens):
if not self.read_buffer():
self.remove_end(iens)
break
self.ensemble.clean_data()
if self._bb:
self.ensembleBB.clean_data()
ens = [self.ensemble]
vars = [self.vars_read]
datl = [self.outd]
cfgl = [self.cfg]
if self._bb:
ens += [self.ensembleBB]
vars += [self.vars_readBB]
datl += [self.outdBB]
cfgl += [self.cfgbb]
for var, en, dat in zip(vars, ens, datl):
for nm in var:
dat = self.save_profiles(dat, nm, en, iens)
# reset flag after all variables run
self.n_cells_diff = 0
# Set clock
clock = en.rtc[:, :]
if clock[0, 0] < 100:
clock[0, :] += defs.century
try:
dates = tmlib.date2epoch(
tmlib.datetime(*clock[:6, 0], microsecond=clock[6, 0] * 10000)
)[0]
except ValueError:
warnings.warn(
"Invalid time stamp in ping {}.".format(
int(self.ensemble.number[0])
)
)
                    dat["coords"]["time"][iens] = np.nan
else:
dat["coords"]["time"][iens] = np.median(dates)
# Finalize dataset (runs through both nb and bb)
for dat, cfg in zip(datl, cfgl):
dat, cfg = self.cleanup(dat, cfg)
dat = self.finalize(dat)
if "vel_bt" in dat["data_vars"]:
dat["attrs"]["rotate_vars"].append("vel_bt")
datbb = self.outdBB if self._bb else None
return self.outd, datbb
def init_data(self):
outd = {
"data_vars": {},
"coords": {},
"attrs": {},
"units": {},
"long_name": {},
"standard_name": {},
"sys": {},
}
outd["attrs"]["inst_make"] = "TRDI"
outd["attrs"]["inst_type"] = "ADCP"
outd["attrs"]["rotate_vars"] = [
"vel",
]
# Currently RDI doesn't use IMUs
outd["attrs"]["has_imu"] = 0
if self._bb:
outdbb = {
"data_vars": {},
"coords": {},
"attrs": {},
"units": {},
"long_name": {},
"standard_name": {},
"sys": {},
}
outdbb["attrs"]["inst_make"] = "TRDI"
outdbb["attrs"]["inst_type"] = "ADCP"
outdbb["attrs"]["rotate_vars"] = [
"vel",
]
outdbb["attrs"]["has_imu"] = 0
# Preallocate variables and data sizes
for nm in defs.data_defs:
outd = defs._idata(
outd, nm, sz=defs._get_size(nm, self._nens, self.cfg["n_cells"])
)
self.outd = outd
if self._bb:
for nm in defs.data_defs:
outdbb = defs._idata(
outdbb, nm, sz=defs._get_size(nm, self._nens, self.cfgbb["n_cells"])
)
self.outdBB = outdbb
if self._debug_level > 1:
logging.info(np.shape(outdbb["data_vars"]["vel"]))
if self._debug_level > 1:
logging.info("{} ncells, not BB".format(self.cfg["n_cells"]))
if self._bb:
logging.info("{} ncells, BB".format(self.cfgbb["n_cells"]))
def read_buffer(self):
fd = self.f
self.ensemble.k = -1 # so that k+=1 gives 0 on the first loop.
if self._bb:
self.ensembleBB.k = -1 # so that k+=1 gives 0 on the first loop.
self.print_progress()
hdr = self.hdr
while self.ensemble.k < self.ensemble.n_avg - 1:
if not self.search_buffer():
return False
startpos = fd.tell() - 2
self.read_hdrseg()
            if self._debug_level > -1:
                logging.info("Read Header {}".format(hdr))
byte_offset = self._nbyte + 2
self._read_vmdas = False
for n in range(len(hdr["dat_offsets"])):
id = fd.read_ui16(1)
if self._debug_level > 0:
logging.info(f"n {n}: {id} {id:04x}")
self.print_pos()
retval = self.read_dat(id)
if retval == "FAIL":
break
byte_offset += self._nbyte
if n < (len(hdr["dat_offsets"]) - 1):
oset = hdr["dat_offsets"][n + 1] - byte_offset
if oset != 0:
if self._debug_level > 0:
logging.debug(" %s: Adjust location by %d\n" % (id, oset))
fd.seek(oset, 1)
byte_offset = hdr["dat_offsets"][n + 1]
else:
if hdr["nbyte"] - 2 != byte_offset:
if not self._winrivprob:
if self._debug_level > 0:
logging.debug(
" {:d}: Adjust location by {:d}\n".format(
id, hdr["nbyte"] - 2 - byte_offset
)
)
self.f.seek(hdr["nbyte"] - 2 - byte_offset, 1)
byte_offset = hdr["nbyte"] - 2
# Check for vmdas again because vmdas doesn't set the offsets
# correctly, and we need this info:
if not self._read_vmdas and self._vmdas_search:
if self._debug_level > 0:
logging.info("Searching for vmdas nav data. Going to next ensemble")
self.search_buffer()
# now go back to where vmdas would be:
fd.seek(-98, 1)
id = self.f.read_ui16(1)
if id is not None:
if self._debug_level > 0:
logging.info(f"Found {id:04d}")
if id == 8192:
self.read_dat(id)
readbytes = fd.tell() - startpos
offset = hdr["nbyte"] + 2 - readbytes
self.check_offset(offset, readbytes)
self.print_pos(byte_offset=byte_offset)
return True
    def print_progress(self):
        # Only log progress when the file position has advanced by at
        # least 1 MB since the last report
        if (self.f.tell() - self.progress) < 1048576:
            return
        self.progress = self.f.tell()
        if self._debug_level > 1:
            logging.debug(
                " pos %0.0fmb/%0.0fmb\n"
                % (self.f.tell() / 1048576.0, self._filesize / 1048576.0)
            )
def search_buffer(self):
"""
        Check whether the next bytes indicate the beginning of a data
        block. If not, search forward byte-by-byte for the next data
        block, up to the end of the file.
"""
fd = self.f
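        # TRDI ensembles begin with the two-byte ID 0x7F7F; a 0x7F79 ID
        # (seen in some files) precedes a block that is skipped wholesale
        # below.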
id = fd.read_ui8(2)
if id is None:
return False
cfgid = list(id)
pos_7f79 = False
search_cnt = 0
if self._debug_level > -1:
logging.info("pos {}".format(fd.pos))
logging.info("cfgid0: [{:x}, {:x}]".format(*cfgid))
# If not [127, 127] or if the file ends in the next ensemble
while (cfgid != [127, 127]) or self.check_eof():
if cfgid == [127, 121]:
# Search for the next header or the end of the file
skipbytes = fd.read_i16(1)
fd.seek(skipbytes - 2, 1)
id = fd.read_ui8(2)
if id is None: # EOF
return False
cfgid = list(id)
pos_7f79 = True
else:
# Search til we find something or hit the end of the file
search_cnt += 1
nextbyte = fd.read_ui8(1)
if nextbyte is None: # EOF
return False
cfgid[0] = cfgid[1]
cfgid[1] = nextbyte
if pos_7f79 and self._debug_level > -1:
logging.info("Skipped junk data: [{:x}, {:x}]".format(*[127, 121]))
if search_cnt > 0:
if self._debug_level > 0:
logging.info(
" Searched {} bytes to find next "
"valid ensemble start [{:x}, {:x}]\n".format(search_cnt, *cfgid)
)
return True
def check_eof(self):
"""
Returns True if next header is bad or at end of file.
"""
fd = self.f
out = True
numbytes = fd.read_i16(1)
# Search for next config id
if numbytes > 0:
fd.seek(numbytes - 2, 1)
cfgid = fd.read_ui8(2)
if cfgid is None:
if self._debug_level > 1:
logging.info("EOF")
return True
# Make sure one is found, either 7f7f or 7f79
if len(cfgid) == 2:
fd.seek(-numbytes - 2, 1)
if cfgid[0] == 127 and cfgid[1] in [127, 121]:
out = False
else:
fd.seek(-2, 1)
return out
def print_pos(self, byte_offset=-1):
"""Print the position in the file, used for debugging."""
if self._debug_level > 1:
if hasattr(self, "ensemble"):
k = self.ensemble.k
else:
k = 0
logging.debug(
f" pos: {self.f.tell()}, pos_: {self._pos}, nbyte: {self._nbyte}, k: {k}, byte_offset: {byte_offset}"
)
def read_dat(self, id):
function_map = {
# 0000 1st profile fixed leader
0: (self.read_fixed, []),
# 0001 2nd profile fixed leader
1: (self.read_fixed, [True]),
# 0010 Surface layer fixed leader (RiverPro & StreamPro)
16: (self.read_fixed_sl, []),
# 0080 1st profile variable leader
128: (self.read_var, [0]),
# 0081 2nd profile variable leader
129: (self.read_var, [1]),
# 0100 1st profile velocity
256: (self.read_vel, [0]),
# 0101 2nd profile velocity
257: (self.read_vel, [1]),
# 0103 Waves first leader
259: (self.skip_Nbyte, [74]),
# 0110 Surface layer velocity (RiverPro & StreamPro)
272: (self.read_vel, [2]),
# 0200 1st profile correlation
512: (self.read_corr, [0]),
# 0201 2nd profile correlation
513: (self.read_corr, [1]),
# 0203 Waves data
515: (self.skip_Nbyte, [186]),
# 020C Ambient sound profile
524: (self.skip_Nbyte, [4]),
# 0210 Surface layer correlation (RiverPro & StreamPro)
528: (self.read_corr, [2]),
# 0300 1st profile amplitude
768: (self.read_amp, [0]),
# 0301 2nd profile amplitude
769: (self.read_amp, [1]),
# 0302 Beam 5 Sum of squared velocities
770: (self.skip_Ncol, []),
# 0303 Waves last leader
771: (self.skip_Ncol, [18]),
# 0310 Surface layer amplitude (RiverPro & StreamPro)
784: (self.read_amp, [2]),
# 0400 1st profile % good
1024: (self.read_prcnt_gd, [0]),
# 0401 2nd profile pct good
1025: (self.read_prcnt_gd, [1]),
# 0403 Waves HPR data
1027: (self.skip_Nbyte, [6]),
# 0410 Surface layer pct good (RiverPro & StreamPro)
1040: (self.read_prcnt_gd, [2]),
# 0500 1st profile status
1280: (self.read_status, [0]),
# 0501 2nd profile status
1281: (self.read_status, [1]),
# 0510 Surface layer status (RiverPro & StreamPro)
1296: (self.read_status, [2]),
1536: (self.read_bottom, []), # 0600 bottom tracking
1793: (self.skip_Ncol, [4]), # 0701 number of pings
1794: (self.skip_Ncol, [4]), # 0702 sum of squared vel
1795: (self.skip_Ncol, [4]), # 0703 sum of velocities
2560: (self.skip_Ncol, []), # 0A00 Beam 5 velocity
2816: (self.skip_Ncol, []), # 0B00 Beam 5 correlation
3072: (self.skip_Ncol, []), # 0C00 Beam 5 amplitude
3328: (self.skip_Ncol, []), # 0D00 Beam 5 pct_good
            3000: (self.skip_Nbyte, [32]),  # 0BB8
3841: (self.skip_Nbyte, [38]), # 0F01 Beam 5 leader
8192: (self.read_vmdas, []), # 2000
# 2013 Navigation parameter data
8211: (self.skip_Nbyte, [83]),
8226: (self.read_winriver2, []), # 2022
8448: (self.read_winriver, [38]), # 2100
8449: (self.read_winriver, [97]), # 2101
8450: (self.read_winriver, [45]), # 2102
8451: (self.read_winriver, [60]), # 2103
8452: (self.read_winriver, [38]), # 2104
# 3200 Transformation matrix
12800: (self.skip_Nbyte, [32]),
# 3000 Fixed attitude data format for Ocean Surveyor ADCPs
12288: (self.skip_Nbyte, [32]),
12496: (self.skip_Nbyte, [24]), # 30D0
12504: (self.skip_Nbyte, [48]), # 30D8
# 4100 beam 5 range
16640: (self.read_alt, []),
# 4400 Firmware status data (RiverPro & StreamPro)
17408: (self.skip_Nbyte, [28]),
# 4401 Auto mode setup (RiverPro & StreamPro)
17409: (self.skip_Nbyte, [82]),
# 5803 High resolution bottom track velocity
22531: (self.skip_Nbyte, [68]),
# 5804 Bottom track range
22532: (self.skip_Nbyte, [21]),
# 5901 ISM (IMU) data
22785: (self.skip_Nbyte, [65]),
# 5902 Ping attitude
22786: (self.skip_Nbyte, [105]),
# 7001 ADC data
28673: (self.skip_Nbyte, [14]),
}
# Call the correct function:
if self._debug_level > 1:
logging.debug(f"Trying to Read {id}")
if id in function_map:
if self._debug_level > 1:
logging.info(" Reading code {}...".format(hex(id)))
retval = function_map.get(id)[0](*function_map[id][1])
if retval:
return retval
if self._debug_level > 1:
logging.info(" success!")
else:
self.read_nocode(id)
def read_fixed(self, bb=False):
self.read_cfgseg(bb=bb)
self._nbyte += 2
if self._debug_level > -1:
logging.info("Read Fixed")
# Check if n_cells has increased (for winriver transect files)
if hasattr(self, "ensemble"):
self.n_cells_diff = self.cfg["n_cells"] - self.ensemble["n_cells"]
# Increase n_cells if greater than 0
if self.n_cells_diff > 0:
self.ensemble = defs._ensemble(self.n_avg, self.cfg["n_cells"])
if self._debug_level > 0:
logging.warning(
f"Maximum number of cells increased to {self.cfg['n_cells']}"
)
def read_fixed_sl(self):
# Surface layer profile
cfg = self.cfg
cfg["surface_layer"] = 1
n_cells = self.f.read_ui8(1)
# Check if n_cells is greater than what was used in prior profiles
if n_cells > self.n_cells_sl:
self.n_cells_sl = n_cells
if self._debug_level > 0:
logging.warning(
f"Maximum number of surface layer cells increased to {n_cells}"
)
cfg["n_cells_sl"] = n_cells
# Assuming surface layer profile cell size never changes
cfg["cell_size_sl"] = self.f.read_ui16(1) * 0.01
cfg["bin1_dist_m_sl"] = round(self.f.read_ui16(1) * 0.01, 4)
if self._debug_level > -1:
logging.info("Read Surface Layer Config")
self._nbyte = 2 + 5
def read_cfgseg(self, bb=False):
cfgstart = self.f.tell()
if bb:
cfg = self.cfgbb
else:
cfg = self.cfg
fd = self.f
tmp = fd.read_ui8(5)
prog_ver0 = tmp[0]
cfg["prog_ver"] = tmp[0] + tmp[1] / 100.0
cfg["inst_model"] = defs.adcp_type.get(tmp[0], "unrecognized firmware version")
config = tmp[2:4]
cfg["beam_angle"] = [15, 20, 30][(config[1] & 3)]
beam5 = [0, 1][int((config[1] & 16) == 16)]
cfg["freq"] = [75, 150, 300, 600, 1200, 2400, 38][(config[0] & 7)]
cfg["beam_pattern"] = ["concave", "convex"][int((config[0] & 8) == 8)]
cfg["orientation"] = ["down", "up"][int((config[0] & 128) == 128)]
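        # Example (hypothetical): config[0] == 0b11001010 decodes as a
        # 300 kHz (bits 0-2), convex (bit 3), up-facing (bit 7) system.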
simflag = ["real", "simulated"][tmp[4]]
fd.seek(1, 1)
cfg["n_beams"] = fd.read_ui8(1) + beam5
# Check if number of cells has changed
n_cells = fd.read_ui8(1)
if ("n_cells" not in cfg) or (n_cells != cfg["n_cells"]):
cfg["n_cells"] = n_cells
if self._debug_level > 0:
logging.info(f"Number of cells set to {cfg['n_cells']}")
cfg["pings_per_ensemble"] = fd.read_ui16(1)
# Check if cell size has changed
cs = fd.read_ui16(1) * 0.01
if ("cell_size" not in cfg) or (cs != cfg["cell_size"]):
self.cs_diff = cs if "cell_size" not in cfg else (cs - cfg["cell_size"])
cfg["cell_size"] = cs
if self._debug_level > 0:
logging.info(f"Cell size set to {cfg['cell_size']}")
cfg["blank_dist"] = fd.read_ui16(1) * 0.01
cfg["profiling_mode"] = fd.read_ui8(1)
cfg["min_corr_threshold"] = fd.read_ui8(1)
cfg["n_code_reps"] = fd.read_ui8(1)
cfg["min_prcnt_gd"] = fd.read_ui8(1)
cfg["max_error_vel"] = fd.read_ui16(1) / 1000
cfg["sec_between_ping_groups"] = np.sum(
np.array(fd.read_ui8(3)) * np.array([60.0, 1.0, 0.01])
)
coord_sys = fd.read_ui8(1)
cfg["coord_sys"] = ["beam", "inst", "ship", "earth"][((coord_sys >> 3) & 3)]
cfg["use_pitchroll"] = ["no", "yes"][(coord_sys & 4) == 4]
cfg["use_3beam"] = ["no", "yes"][(coord_sys & 2) == 2]
cfg["bin_mapping"] = ["no", "yes"][(coord_sys & 1) == 1]
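        # Example (hypothetical): a coord-sys byte of 0b00011111 decodes as
        # earth coordinates (bits 3-4 == 3) with pitch/roll, 3-beam
        # solutions, and bin mapping all enabled.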
cfg["heading_misalign_deg"] = fd.read_i16(1) * 0.01
cfg["magnetic_var_deg"] = fd.read_i16(1) * 0.01
cfg["sensors_src"] = np.binary_repr(fd.read_ui8(1), 8)
cfg["sensors_avail"] = np.binary_repr(fd.read_ui8(1), 8)
cfg["bin1_dist_m"] = round(fd.read_ui16(1) * 0.01, 4)
cfg["transmit_pulse_m"] = fd.read_ui16(1) * 0.01
cfg["water_ref_cells"] = list(fd.read_ui8(2)) # list for attrs
cfg["false_target_threshold"] = fd.read_ui8(1)
fd.seek(1, 1)
cfg["transmit_lag_m"] = fd.read_ui16(1) * 0.01
self._nbyte = 40
if cfg["prog_ver"] >= 8.14:
cpu_serialnum = fd.read_ui8(8)
self._nbyte += 8
if cfg["prog_ver"] >= 8.24:
cfg["bandwidth"] = fd.read_ui16(1)
self._nbyte += 2
if cfg["prog_ver"] >= 16.05:
cfg["power_level"] = fd.read_ui8(1)
self._nbyte += 1
if cfg["prog_ver"] >= 16.27:
# cfg['navigator_basefreqindex'] = fd.read_ui8(1)
fd.seek(1, 1)
cfg["serialnum"] = fd.read_ui32(1)
cfg["beam_angle"] = fd.read_ui8(1)
self._nbyte += 6
self.configsize = self.f.tell() - cfgstart
if self._debug_level > -1:
logging.info("Read Config")
def read_var(self, bb=False):
"""Read variable leader"""
fd = self.f
if bb:
ens = self.ensembleBB
else:
ens = self.ensemble
        ens.k += 1
        k = ens.k
        self.vars_read += [
            "number",
            "rtc",
"builtin_test_fail",
"c_sound",
"depth",
"heading",
"pitch",
"roll",
"salinity",
"temp",
"min_preping_wait",
"heading_std",
"pitch_std",
"roll_std",
"adc",
]
ens.number[k] = fd.read_ui16(1)
ens.rtc[:, k] = fd.read_ui8(7)
ens.number[k] += 65535 * fd.read_ui8(1)
ens.builtin_test_fail[k] = fd.read_ui16(1)
ens.c_sound[k] = fd.read_ui16(1)
ens.depth[k] = fd.read_ui16(1) * 0.1
ens.heading[k] = fd.read_ui16(1) * 0.01
ens.pitch[k] = fd.read_i16(1) * 0.01
ens.roll[k] = fd.read_i16(1) * 0.01
ens.salinity[k] = fd.read_i16(1)
ens.temp[k] = fd.read_i16(1) * 0.01
ens.min_preping_wait[k] = (fd.read_ui8(3) * np.array([60, 1, 0.01])).sum()
ens.heading_std[k] = fd.read_ui8(1)
ens.pitch_std[k] = fd.read_ui8(1) * 0.1
ens.roll_std[k] = fd.read_ui8(1) * 0.1
ens.adc[:, k] = fd.read_i8(8)
self._nbyte = 2 + 40
cfg = self.cfg
if cfg["inst_model"].lower() == "broadband":
if cfg["prog_ver"] >= 5.55:
fd.seek(15, 1)
cent = fd.read_ui8(1)
ens.rtc[:, k] = fd.read_ui8(7)
ens.rtc[0, k] = ens.rtc[0, k] + cent * 100
self._nbyte += 23
elif cfg["inst_model"].lower() == "ocean surveyor":
fd.seek(16, 1) # 30 bytes all set to zero, 14 read above
self._nbyte += 16
if cfg["prog_ver"] > 23:
fd.seek(2, 1)
self._nbyte += 2
else:
ens.error_status[k] = np.binary_repr(fd.read_ui32(1), 32)
self.vars_read += ["pressure", "pressure_std"]
self._nbyte += 4
if cfg["prog_ver"] >= 8.13:
# Added pressure sensor stuff in 8.13
fd.seek(2, 1)
ens.pressure[k] = fd.read_ui32(1) / 1000 # dPa to dbar
ens.pressure_std[k] = fd.read_ui32(1) / 1000
self._nbyte += 10
if cfg["prog_ver"] >= 8.24:
# Spare byte added 8.24
fd.seek(1, 1)
self._nbyte += 1
if cfg["prog_ver"] >= 16.05:
# Added more fields with century in clock
cent = fd.read_ui8(1)
ens.rtc[:, k] = fd.read_ui8(7)
ens.rtc[0, k] = ens.rtc[0, k] + cent * 100
self._nbyte += 8
if cfg["prog_ver"] >= 56:
fd.seek(1) # lag near bottom flag
self._nbyte += 1
if self._debug_level > -1:
logging.info("Read Var")
def switch_profile(self, bb):
if bb == 1:
ens = self.ensembleBB
cfg = self.cfgbb
            # Placeholder for dual profile mode; also handles VMDAS files
            # that store the profile in the BB slot (vs NB)
tag = ""
elif bb == 2:
ens = self.ensemble
cfg = self.cfg
tag = "_sl"
else:
ens = self.ensemble
cfg = self.cfg
tag = ""
return ens, cfg, tag
def read_vel(self, bb=0):
ens, cfg, tg = self.switch_profile(bb)
self.vars_read += ["vel" + tg]
n_cells = cfg["n_cells" + tg]
k = ens.k
vel = np.array(self.f.read_i16(4 * n_cells)).reshape((n_cells, 4)) * 0.001
ens["vel" + tg][:n_cells, :, k] = vel
self._nbyte = 2 + 4 * n_cells * 2
if self._debug_level > -1:
logging.info("Read Vel")
def read_corr(self, bb=0):
ens, cfg, tg = self.switch_profile(bb)
self.vars_read += ["corr" + tg]
n_cells = cfg["n_cells" + tg]
k = ens.k
ens["corr" + tg][:n_cells, :, k] = np.array(
self.f.read_ui8(4 * n_cells)
).reshape((n_cells, 4))
self._nbyte = 2 + 4 * n_cells
if self._debug_level > -1:
logging.info("Read Corr")
def read_amp(self, bb=0):
ens, cfg, tg = self.switch_profile(bb)
self.vars_read += ["amp" + tg]
n_cells = cfg["n_cells" + tg]
k = ens.k
ens["amp" + tg][:n_cells, :, k] = np.array(
self.f.read_ui8(4 * n_cells)
).reshape((n_cells, 4))
self._nbyte = 2 + 4 * n_cells
if self._debug_level > -1:
logging.info("Read Amp")
def read_prcnt_gd(self, bb=0):
ens, cfg, tg = self.switch_profile(bb)
self.vars_read += ["prcnt_gd" + tg]
n_cells = cfg["n_cells" + tg]
ens["prcnt_gd" + tg][:n_cells, :, ens.k] = np.array(
self.f.read_ui8(4 * n_cells)
).reshape((n_cells, 4))
self._nbyte = 2 + 4 * n_cells
if self._debug_level > -1:
logging.info("Read PG")
def read_status(self, bb=0):
ens, cfg, tg = self.switch_profile(bb)
self.vars_read += ["status" + tg]
n_cells = cfg["n_cells" + tg]
ens["status" + tg][:n_cells, :, ens.k] = np.array(
self.f.read_ui8(4 * n_cells)
).reshape((n_cells, 4))
self._nbyte = 2 + 4 * n_cells
if self._debug_level > -1:
logging.info("Read Status")
def read_bottom(self):
self.vars_read += ["dist_bt", "vel_bt", "corr_bt", "amp_bt", "prcnt_gd_bt"]
fd = self.f
ens = self.ensemble
k = ens.k
cfg = self.cfg
if self._vm_source == 2:
self.vars_read += ["latitude_gps", "longitude_gps"]
fd.seek(2, 1)
long1 = fd.read_ui16(1)
fd.seek(6, 1)
ens.latitude_gps[k] = fd.read_i32(1) * self._cfac
if ens.latitude_gps[k] == 0:
                ens.latitude_gps[k] = np.nan
else:
fd.seek(14, 1)
ens.dist_bt[:, k] = fd.read_ui16(4) * 0.01
ens.vel_bt[:, k] = fd.read_i16(4) * 0.001
ens.corr_bt[:, k] = fd.read_ui8(4)
ens.amp_bt[:, k] = fd.read_ui8(4)
ens.prcnt_gd_bt[:, k] = fd.read_ui8(4)
if self._vm_source == 2:
fd.seek(2, 1)
ens.longitude_gps[k] = (long1 + 65536 * fd.read_ui16(1)) * self._cfac
if ens.longitude_gps[k] > 180:
ens.longitude_gps[k] = ens.longitude_gps[k] - 360
if ens.longitude_gps[k] == 0:
                ens.longitude_gps[k] = np.nan
fd.seek(16, 1)
qual = fd.read_ui8(1)
if qual == 0:
if self._debug_level > 0:
logging.info(
" qual==%d,%f %f"
% (qual, ens.latitude_gps[k], ens.longitude_gps[k])
)
                ens.latitude_gps[k] = np.nan
                ens.longitude_gps[k] = np.nan
fd.seek(71 - 45 - 16 - 17, 1)
self._nbyte = 2 + 68
else:
# Skip reference layer data
fd.seek(26, 1)
self._nbyte = 2 + 68
if cfg["prog_ver"] >= 5.3:
fd.seek(7, 1) # skip to rangeMsb bytes
ens.dist_bt[:, k] = ens.dist_bt[:, k] + fd.read_ui8(4) * 655.36
self._nbyte += 11
if cfg["prog_ver"] >= 16.2 and (cfg.get("sourceprog") != "WINRIVER"):
fd.seek(4, 1) # not documented
self._nbyte += 4
if cfg["prog_ver"] >= 56.1:
fd.seek(4, 1) # not documented
self._nbyte += 4
if self._debug_level > -1:
logging.info("Read Bottom Track")
def read_alt(self):
"""Read altimeter (vertical beam range)"""
fd = self.f
ens = self.ensemble
k = ens.k
self.vars_read += ["alt_dist", "alt_rssi", "alt_eval", "alt_status"]
ens.alt_eval[k] = fd.read_ui8(1) # evaluation amplitude
ens.alt_rssi[k] = fd.read_ui8(1) # RSSI amplitude
ens.alt_dist[k] = fd.read_ui32(1) / 1000 # range to surface/seafloor
ens.alt_status[k] = fd.read_ui8(1) # status bit flags
self._nbyte = 7 + 2
if self._debug_level > -1:
logging.info("Read Altimeter")
def read_vmdas(self):
"""Read VMDAS Navigation block"""
fd = self.f
self.cfg["sourceprog"] = "VMDAS"
ens = self.ensemble
k = ens.k
if self._vm_source != 1 and self._debug_level > -1:
logging.info(" \n***** Apparently a VMDAS file \n\n")
self._vm_source = 1
self.vars_read += [
"time_gps",
"clock_offset_UTC_gps",
"latitude_gps",
"longitude_gps",
"avg_speed_gps",
"avg_dir_gps",
"speed_made_good_gps",
"dir_made_good_gps",
"flags_gps",
"pitch_gps",
"roll_gps",
"heading_gps",
]
# UTC date time
utim = fd.read_ui8(4)
date_utc = tmlib.datetime(utim[2] + utim[3] * 256, utim[1], utim[0])
# 1st lat/lon position after previous ADCP ping
        # This field is in ten-thousandths of a second (tenths of milliseconds):
utc_time_first_fix = tmlib.timedelta(milliseconds=(int(fd.read_ui32(1) / 10)))
ens.clock_offset_UTC_gps[k] = (
fd.read_i32(1) / 1000
) # "PC clock offset from UTC" in ms
latitude_first_gps = fd.read_i32(1) * self._cfac
longitude_first_gps = fd.read_i32(1) * self._cfac
# Last lat/lon position prior to current ADCP ping
utc_time_fix = tmlib.timedelta(milliseconds=(int(fd.read_ui32(1) / 10)))
ens.time_gps[k] = tmlib.date2epoch(date_utc + utc_time_fix)[0]
ens.latitude_gps[k] = fd.read_i32(1) * self._cfac
ens.longitude_gps[k] = fd.read_i32(1) * self._cfac
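        # e.g. (hypothetical) a raw i32 value of 2**29 gives
        # 2**29 * 180 / 2**31 = 45.0 degrees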
ens.avg_speed_gps[k] = fd.read_ui16(1) / 1000
ens.avg_dir_gps[k] = fd.read_ui16(1) * 180 / 2**15 # avg true track
fd.seek(2, 1) # avg magnetic track
ens.speed_made_good_gps[k] = fd.read_ui16(1) / 1000
ens.dir_made_good_gps[k] = fd.read_ui16(1) * 180 / 2**15
fd.seek(2, 1) # reserved
ens.flags_gps[k] = int(np.binary_repr(fd.read_ui16(1)))
fd.seek(6, 1) # reserved, ADCP ensemble #
# ADCP date time
utim = fd.read_ui8(4)
date_adcp = tmlib.datetime(utim[0] + utim[1] * 256, utim[3], utim[2])
time_adcp = tmlib.timedelta(milliseconds=(int(fd.read_ui32(1) / 10)))
ens.pitch_gps[k] = fd.read_ui16(1) * 180 / 2**15
ens.roll_gps[k] = fd.read_ui16(1) * 180 / 2**15
ens.heading_gps[k] = fd.read_ui16(1) * 180 / 2**15
fd.seek(10, 1)
self._nbyte = 2 + 76
if self._debug_level > -1:
logging.info("Read VMDAS")
self._read_vmdas = True
def read_winriver2(self):
startpos = self.f.tell()
self._winrivprob = True
self.cfg["sourceprog"] = "WinRiver2"
ens = self.ensemble
k = ens.k
if self._debug_level > -1:
logging.info("Read WinRiver2")
self._vm_source = 3
spid = self.f.read_ui16(1) # NMEA specific IDs
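        # The GGA/VTG/DBT/HDT blocks below hold fields parsed from standard
        # NMEA sentences, e.g. a (hypothetical) GGA sentence:
        # $GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47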
if spid in [4, 104]: # GGA
sz = self.f.read_ui16(1)
dtime = self.f.read_f64(1)
if sz <= 43: # If no sentence, data is still stored in nmea format
empty_gps = self.f.reads(sz - 2)
self.f.seek(2, 1)
else: # TRDI rewrites the nmea string into their format if one is found
start_string = self.f.reads(6)
if not isinstance(start_string, str):
if self._debug_level > 0:
logging.warning(
f"Invalid GGA string found in ensemble {k}," " skipping..."
)
return "FAIL"
self.f.seek(1, 1)
gga_time = self.f.reads(9)
time = tmlib.timedelta(
hours=int(gga_time[0:2]),
minutes=int(gga_time[2:4]),
seconds=int(gga_time[4:6]),
milliseconds=int(float(gga_time[6:]) * 1000),
)
clock = self.ensemble.rtc[:, :]
if clock[0, 0] < 100:
clock[0, :] += defs.century
date = tmlib.datetime(*clock[:3, 0]) + time
ens.time_gps[k] = tmlib.date2epoch(date)[0]
self.f.seek(1, 1)
ens.latitude_gps[k] = self.f.read_f64(1)
tcNS = self.f.reads(1) # 'N' or 'S'
if tcNS == "S":
ens.latitude_gps[k] *= -1
ens.longitude_gps[k] = self.f.read_f64(1)
tcEW = self.f.reads(1) # 'E' or 'W'
if tcEW == "W":
ens.longitude_gps[k] *= -1
ens.fix_gps[k] = self.f.read_ui8(1) # gps fix type/quality
                ens.n_sat_gps[k] = self.f.read_ui8(1)  # number of satellites
# horizontal dilution of precision
ens.hdop_gps[k] = self.f.read_f32(1)
ens.elevation_gps[k] = self.f.read_f32(1) # altitude
m = self.f.reads(1) # altitude unit, 'm'
h_geoid = self.f.read_f32(1) # height of geoid
m2 = self.f.reads(1) # geoid unit, 'm'
ens.rtk_age_gps[k] = self.f.read_f32(1)
station_id = self.f.read_ui16(1)
self.vars_read += [
"time_gps",
"longitude_gps",
"latitude_gps",
"fix_gps",
"n_sat_gps",
"hdop_gps",
"elevation_gps",
"rtk_age_gps",
]
self._nbyte = self.f.tell() - startpos + 2
elif spid in [5, 105]: # VTG
sz = self.f.read_ui16(1)
dtime = self.f.read_f64(1)
if sz <= 22: # if no data
empty_gps = self.f.reads(sz - 2)
self.f.seek(2, 1)
else:
start_string = self.f.reads(6)
if not isinstance(start_string, str):
if self._debug_level > 0:
logging.warning(
f"Invalid VTG string found in ensemble {k}," " skipping..."
)
return "FAIL"
self.f.seek(1, 1)
true_track = self.f.read_f32(1)
t = self.f.reads(1) # 'T'
magn_track = self.f.read_f32(1)
m = self.f.reads(1) # 'M'
speed_knot = self.f.read_f32(1)
kts = self.f.reads(1) # 'N'
speed_kph = self.f.read_f32(1)
kph = self.f.reads(1) # 'K'
mode = self.f.reads(1)
                # knots -> m/s (1 m/s ~= 1.944 knots)
                ens.speed_over_grnd_gps[k] = speed_knot / 1.944
ens.dir_over_grnd_gps[k] = true_track
self.vars_read += ["speed_over_grnd_gps", "dir_over_grnd_gps"]
self._nbyte = self.f.tell() - startpos + 2
elif spid in [6, 106]: # 'DBT' depth sounder
sz = self.f.read_ui16(1)
dtime = self.f.read_f64(1)
if sz <= 20:
empty_gps = self.f.reads(sz - 2)
self.f.seek(2, 1)
else:
start_string = self.f.reads(6)
if not isinstance(start_string, str):
if self._debug_level > 0:
logging.warning(
f"Invalid DBT string found in ensemble {k}," " skipping..."
)
return "FAIL"
self.f.seek(1, 1)
depth_ft = self.f.read_f32(1)
ft = self.f.reads(1) # 'f'
depth_m = self.f.read_f32(1)
m = self.f.reads(1) # 'm'
depth_fathom = self.f.read_f32(1)
f = self.f.reads(1) # 'F'
ens.dist_nmea[k] = depth_m
self.vars_read += ["dist_nmea"]
self._nbyte = self.f.tell() - startpos + 2
elif spid in [7, 107]: # 'HDT'
sz = self.f.read_ui16(1)
dtime = self.f.read_f64(1)
if sz <= 14:
empty_gps = self.f.reads(sz - 2)
self.f.seek(2, 1)
else:
start_string = self.f.reads(6)
if not isinstance(start_string, str):
if self._debug_level > 0:
logging.warning(
f"Invalid HDT string found in ensemble {k}," " skipping..."
)
return "FAIL"
self.f.seek(1, 1)
ens.heading_gps[k] = self.f.read_f64(1)
tt = self.f.reads(1)
self.vars_read += ["heading_gps"]
self._nbyte = self.f.tell() - startpos + 2
def read_winriver(self, nbt):
self._winrivprob = True
self.cfg["sourceprog"] = "WINRIVER"
if self._vm_source not in [2, 3]:
if self._debug_level > -1:
logging.warning(
"\n***** Apparently a WINRIVER file - "
"Raw NMEA data handler not yet implemented\n"
)
self._vm_source = 2
startpos = self.f.tell()
sz = self.f.read_ui16(1)
tmp = self.f.reads(sz - 2)
self._nbyte = self.f.tell() - startpos + 2
def skip_Ncol(self, n_skip=1):
self.f.seek(n_skip * self.cfg["n_cells"], 1)
self._nbyte = 2 + n_skip * self.cfg["n_cells"]
def skip_Nbyte(self, n_skip):
self.f.seek(n_skip, 1)
self._nbyte = 2 + n_skip
    def read_nocode(self, id):
        # Skip bytes from codes 0340-30FC; add handlers here if needed
        hxid = hex(id)
        if hxid[2:4] == "30":
            logging.warning("Skipping bytes from codes 0340-30FC")
            # Count the number of 1s in the middle 4 bits
            # of the 2nd two bytes (60 is a 0b00111100 mask)
            nflds = bin(int(hxid[3], 16) & 60).count("1") + bin(
                int(hxid[4], 16) & 60
            ).count("1")
            # Count the number of 1s in the highest 2 bits of byte 3
            # (3 is a 0b00000011 mask):
            dfac = bin(int(hxid[3], 16) & 3).count("1")
            self.skip_Nbyte(12 * nflds * dfac)
else:
if self._debug_level > -1:
logging.warning(" Unrecognized ID code: %0.4X" % id)
self.skip_nocode(id)
def skip_nocode(self, id):
# Skipping bytes if ID isn't known
        offsets = list(self.id_positions.values())
        idx = offsets.index(self.id_positions[id])
        byte_len = offsets[idx + 1] - offsets[idx] - 2
self.skip_Nbyte(byte_len)
if self._debug_level > -1:
logging.debug(f"Skipping ID code {id}\n")
def check_offset(self, offset, readbytes):
fd = self.f
if offset != 4 and self._fixoffset == 0:
if self._debug_level > 0:
if fd.tell() == self._filesize:
logging.error(
" EOF reached unexpectedly - discarding this last ensemble\n"
)
else:
logging.debug(
" Adjust location by {:d} (readbytes={:d},hdr['nbyte']={:d})\n".format(
offset, readbytes, self.hdr["nbyte"]
)
)
self._fixoffset = offset - 4
fd.seek(4 + self._fixoffset, 1)
def remove_end(self, iens):
dat = self.outd
if self._debug_level > 0:
logging.info(" Encountered end of file. Cleaning up data.")
for nm in self.vars_read:
defs._setd(dat, nm, defs._get(dat, nm)[..., :iens])
def save_profiles(self, dat, nm, en, iens):
ds = defs._get(dat, nm)
if self.n_avg == 1:
bn = en[nm][..., 0]
else:
bn = np.nanmean(en[nm], axis=-1)
# If n_cells has changed (RiverPro/StreamPro WinRiver transects)
if len(ds.shape) == 3:
if "_sl" in nm:
# This works here b/c the max number of surface layer cells
# is smaller than the min number of normal profile cells used.
# Extra nan cells created after this if-statement
# are trimmed off in self.cleanup.
bn = bn[: self.cfg["n_cells_sl"]]
else:
# Set bn to current ping size
bn = bn[: self.cfg["n_cells"]]
# If n_cells has increased, we also need to increment defs
if self.n_cells_diff > 0:
a = np.empty((self.n_cells_diff, ds.shape[1], ds.shape[2])) * np.nan
ds = np.append(ds, a.astype(ds.dtype), axis=0)
defs._setd(dat, nm, ds)
# If the number of cells decreases, set extra cells to nan instead of
# whatever is stuck in memory
if ds.shape[0] != bn.shape[0]:
n_cells = ds.shape[0] - bn.shape[0]
a = np.empty((n_cells, bn.shape[1])) * np.nan
bn = np.append(bn, a.astype(ds.dtype), axis=0)
# Keep track of when the cell size changes
if self.cs_diff:
self.cs.append([iens, self.cfg["cell_size"]])
self.cs_diff = 0
# Then copy the ensemble to the dataset.
ds[..., iens] = bn
defs._setd(dat, nm, ds)
return dat
def cleanup(self, dat, cfg):
# Clean up changing cell size, if necessary
cs = np.array(self.cs)
cell_sizes = cs[:, 1]
# If cell sizes change, depth-bin average the smaller cell sizes
if len(self.cs) > 1:
bins_to_merge = cell_sizes.max() / cell_sizes
idx_start = cs[:, 0].astype(int)
idx_end = np.append(cs[1:, 0], self._nens).astype(int)
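            # Example (hypothetical): cell sizes [0.5, 1.0] m give
            # bins_to_merge == [2.0, 1.0], i.e. pairs of 0.5 m cells are
            # averaged onto the 1.0 m grid.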
dv = dat["data_vars"]
for var in dv:
if (len(dv[var].shape) == 3) and ("_sl" not in var):
# Create a new NaN var to save data in
new_var = (np.zeros(dv[var].shape) * np.nan).astype(dv[var].dtype)
# For each cell size change, reshape and bin-average
for id1, id2, b in zip(idx_start, idx_end, bins_to_merge):
array = np.transpose(dv[var][..., id1:id2])
bin_arr = np.transpose(np.mean(self.reshape(array, b), axis=-1))
new_var[: len(bin_arr), :, id1:id2] = bin_arr
# Reset data. This often leaves nan data at farther ranges
dv[var] = new_var
# Set cell size and range
cfg["n_cells"] = self.ensemble["n_cells"]
cfg["cell_size"] = cell_sizes.max()
dat["coords"]["range"] = (
cfg["bin1_dist_m"] + np.arange(cfg["n_cells"]) * cfg["cell_size"]
)
# Save configuration data as attributes
for nm in cfg:
dat["attrs"][nm] = cfg[nm]
# Clean up surface layer profiles
if "surface_layer" in cfg: # RiverPro/StreamPro
dat["coords"]["range_sl"] = (
cfg["bin1_dist_m_sl"]
+ np.arange(0, self.n_cells_sl) * cfg["cell_size_sl"]
)
# Trim off extra nan data
dv = dat["data_vars"]
for var in dv:
if "sl" in var:
dv[var] = dv[var][: self.n_cells_sl]
dat["attrs"]["rotate_vars"].append("vel_sl")
return dat, cfg
def reshape(self, arr, n_bin=None):
"""
Reshape the array `arr` to shape (...,n,n_bin).
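
        Examples
        --------
        A quick sketch (``rdr`` is a hypothetical `_RDIReader` instance):

        >>> rdr.reshape(np.arange(6.0), n_bin=2)  # doctest: +SKIP
        array([[0., 1.],
               [2., 3.],
               [4., 5.]])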
"""
out = np.zeros(
list(arr.shape[:-1]) + [int(arr.shape[-1] // n_bin), int(n_bin)],
dtype=arr.dtype,
)
        shp = list(out.shape)
        if np.mod(n_bin, 1) == 0:
            # n_bin needs to be an int
            n_bin = int(n_bin)
            # If n_bin is an integer, we can do this simply.
            out[..., :n_bin] = (arr[..., : (shp[-2] * shp[-1])]).reshape(shp, order="C")
        else:
            inds = (np.arange(np.prod(shp[-2:])) * n_bin // int(n_bin)).astype(int)
            # If there are too many indices, drop one bin
            if inds[-1] >= arr.shape[-1]:
                inds = inds[: -int(n_bin)]
                shp[-2] -= 1
                out = out[..., 1:, :]
            n_bin = int(n_bin)
            out[..., :n_bin] = (arr[..., inds]).reshape(shp, order="C")
return out
def finalize(self, dat):
"""
        Remove the data variables that were never read from the file, and
        finish populating the dataset attributes.
"""
for nm in set(defs.data_defs.keys()) - self.vars_read:
defs._pop(dat, nm)
for nm in self.cfg:
dat["attrs"][nm] = self.cfg[nm]
        # VMDAS and WinRiver files don't store a fixed sampling frequency,
        # so estimate it from the median time step
da = dat["attrs"]
if ("sourceprog" in da) and (
da["sourceprog"].lower() in ["vmdas", "winriver", "winriver2"]
):
da["fs"] = round(1 / np.median(np.diff(dat["coords"]["time"])), 2)
else:
da["fs"] = 1 / (da["sec_between_ping_groups"] * da["pings_per_ensemble"])
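            # e.g. (hypothetical) 0.5 s between ping groups and 1 ping per
            # ensemble give fs = 2 Hz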
for nm in defs.data_defs:
shp = defs.data_defs[nm][0]
if len(shp) and shp[0] == "nc" and defs._in_group(dat, nm):
defs._setd(dat, nm, np.swapaxes(defs._get(dat, nm), 0, 1))
return dat