import numpy as np
import xarray as xr
import warnings
from os.path import getsize
from pathlib import Path
import logging
from . import base
from .. import time as tmlib
from . import rdi_lib as lib
from . import rdi_defs as defs
from ..rotate.rdi import _calc_beam_orientmat, _calc_orientmat
from ..rotate.base import _set_coords
from ..rotate.api import set_declination
def read_rdi(
filename,
userdata=None,
nens=None,
debug_level=-1,
vmdas_search=False,
winriver=False,
**kwargs,
) -> xr.Dataset:
"""
Read a TRDI binary data file.
Parameters
----------
filename : string
Filename of TRDI file to read.
userdata : True, False, or string of userdata.json filename
Whether to read the '<base-filename>.userdata.json' file. Default = True
    nens : None or int
        Number of pings or ensembles to read from the file.
        Default is None, which reads the entire file
    debug_level : int
        Debug level [0 - 2]. Values below 0 disable debug logging. Default = -1
vmdas_search : bool
Search from the end of each ensemble for the VMDAS navigation
block. The byte offsets are sometimes incorrect. Default = False
    winriver : bool
        Whether the file was written by WinRiver. Automatically set by DOLfYN;
        useful for debugging. Default = False
Returns
-------
ds : xarray.Dataset
An xarray dataset from the binary instrument data
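    Examples
    --------
    A minimal usage sketch (the filename and `nens` value below are
    illustrative, not taken from a real dataset):
    >>> ds = read_rdi('adcp01.000', nens=1000)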
"""
# Start debugger logging
if debug_level >= 0:
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
filepath = Path(filename)
logfile = filepath.with_suffix(".dolfyn.log")
logging.basicConfig(
filename=str(logfile),
filemode="w",
level=logging.NOTSET,
format="%(name)s - %(levelname)s - %(message)s",
)
# Reads into a dictionary of dictionaries using netcdf naming conventions
# Should be easier to debug
rdr = _RDIReader(
filename, debug_level=debug_level, vmdas_search=vmdas_search, winriver=winriver
)
datNB, datBB = rdr.load_data(nens=nens)
dats = [dat for dat in [datNB, datBB] if dat is not None]
# Read in userdata
userdata = base._find_userdata(filename, userdata)
dss = []
for dat in dats:
for nm in userdata:
dat["attrs"][nm] = userdata[nm]
        # Skip a configuration with no valid time data (can happen when only one profile was actually recorded)
if not np.isfinite(dat["coords"]["time"][0]):
continue
# GPS data not necessarily sampling at the same rate as ADCP DAQ.
if "time_gps" in dat["coords"]:
dat = base._remove_gps_duplicates(dat)
# Convert time coords to dt64
t_coords = [t for t in dat["coords"] if "time" in t]
for ky in t_coords:
dat["coords"][ky] = tmlib.epoch2dt64(dat["coords"][ky])
# Convert time vars to dt64
t_data = [t for t in dat["data_vars"] if "time" in t]
for ky in t_data:
dat["data_vars"][ky] = tmlib.epoch2dt64(dat["data_vars"][ky])
# Create xarray dataset from upper level dictionary
ds = base._create_dataset(dat)
ds = _set_coords(ds, ref_frame=ds.coord_sys)
# Create orientation matrices
if "beam2inst_orientmat" not in ds:
ds["beam2inst_orientmat"] = xr.DataArray(
_calc_beam_orientmat(ds.beam_angle, ds.beam_pattern == "convex"),
coords={"x1": [1, 2, 3, 4], "x2": [1, 2, 3, 4]},
dims=["x1", "x2"],
attrs={"units": "1", "long_name": "Rotation Matrix"},
)
if "orientmat" not in ds:
ds["orientmat"] = _calc_orientmat(ds)
# Check magnetic declination if provided via software and/or userdata
_set_rdi_declination(ds, filename, inplace=True)
dss += [ds]
if len(dss) == 2:
        warnings.warn(
            "\nTwo profiling configurations retrieved from file."
            "\nReturning the first."
        )
# Close handler
if debug_level >= 0:
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
handler.close()
return dss[0]
def _set_rdi_declination(dat, fname, inplace):
"""
If magnetic_var_deg is set, this means that the declination is already
included in the heading and in the velocity data.
"""
declin = dat.attrs.pop("declination", None) # userdata declination
if dat.attrs["magnetic_var_deg"] != 0: # from TRDI software if set
dat.attrs["declination"] = dat.attrs["magnetic_var_deg"]
dat.attrs["declination_in_orientmat"] = 1 # logical
if dat.attrs["magnetic_var_deg"] != 0 and declin is not None:
warnings.warn(
"'magnetic_var_deg' is set to {:.2f} degrees in the binary "
"file '{}', AND 'declination' is set in the 'userdata.json' "
"file. DOLfYN WILL USE THE VALUE of {:.2f} degrees in "
"userdata.json. If you want to use the value in "
"'magnetic_var_deg', delete the value from userdata.json and "
"re-read the file.".format(dat.attrs["magnetic_var_deg"], fname, declin)
)
dat.attrs["declination"] = declin
if declin is not None:
set_declination(dat, declin, inplace)
class _RDIReader:
def __init__(
self, fname, navg=1, debug_level=-1, vmdas_search=False, winriver=False
):
self.fname = base._abspath(fname)
print("\nReading file {} ...".format(fname))
self._debug_level = debug_level
self._vmdas_search = vmdas_search
self._winrivprob = winriver
self._vm_source = 0
self._pos = 0
self.progress = 0
self._cfac32 = np.float32(180 / 2**31) # signed 32 to float
self._cfac16 = np.float32(180 / 2**15) # unsigned16 to float
self._fixoffset = 0
self._nbyte = 0
self.n_cells_diff = 0
self.n_cells_sl = 0
self.cs_diff = 0
self.cs = []
self.cfg = {}
self.cfgbb = {}
self.hdr = {}
self.f = lib.bin_reader(self.fname)
# Check header, double buffer, and get filesize
self._filesize = getsize(self.fname)
space = self.code_spacing() # '0x7F'
self._npings = self._filesize // space
if self._debug_level > -1:
logging.info("Done: {}".format(self.cfg))
logging.info("self._bb {}".format(self._bb))
logging.info("self.cfgbb: {}".format(self.cfgbb))
self.f.seek(self._pos, 0)
self.n_avg = navg
self.ensemble = lib._ensemble(self.n_avg, self.cfg["n_cells"])
if self._bb:
self.ensembleBB = lib._ensemble(self.n_avg, self.cfgbb["n_cells"])
self.vars_read = lib._variable_setlist(["time"])
if self._bb:
self.vars_readBB = lib._variable_setlist(["time"])
def code_spacing(self, iternum=50):
"""
Returns the average spacing, in bytes, between pings.
        The average is computed over the first *iternum* headers (default 50).
"""
fd = self.f
p0 = self._pos
# Get basic header data and check dual profile
if not self.read_hdr():
raise RuntimeError("No header in this file")
self._bb = self.check_for_double_buffer()
# Turn off debugging to check code spacing
debug_level = self._debug_level
self._debug_level = -1
for i in range(iternum):
try:
self.read_hdr()
            except Exception:
break
# Compute the average of the data size:
size = (self._pos - p0) / (i + 1)
self.f = fd
self._pos = p0
self._debug_level = debug_level
return size
def read_hdr(self):
"""Scan file until 7f7f is found"""
if not self.search_buffer():
return False
self._pos = self.f.tell() - 2
self.read_hdrseg()
return True
def read_hdrseg(self):
fd = self.f
hdr = self.hdr
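        # Header layout: int16 total byte count, one spare byte, a uint8 count
        # of data types, then uint16 byte offsets to each data type.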
hdr["nbyte"] = fd.read_i16(1)
spare = fd.read_ui8(1)
ndat = fd.read_ui8(1)
hdr["dat_offsets"] = fd.read_ui16(ndat)
self._nbyte = 4 + ndat * 2
def check_for_double_buffer(self):
"""
VMDAS will record two buffers in NB or NB/BB mode, so we need to
figure out if that is happening here
"""
found = False
pos = self.f.pos
if self._debug_level > -1:
logging.info(self.hdr)
logging.info("pos {}".format(pos))
self.id_positions = {}
for offset in self.hdr["dat_offsets"]:
self.f.seek(offset + pos - self.hdr["dat_offsets"][0], rel=0)
id = self.f.read_ui16(1)
self.id_positions[id] = offset
if self._debug_level > -1:
logging.info("id {} offset {}".format(id, offset))
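            # IDs handled here: 0x0000 primary fixed leader, 0x0001 second
            # (broadband) fixed leader, 0x0010 surface-layer fixed leader,
            # 0x2000 VMDAS navigation.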
if id == 1:
defs.read_fixed(self, bb=True)
found = True
elif id == 0:
defs.read_fixed(self, bb=False)
elif id == 16:
defs.read_fixed_sl(self) # bb=True
elif id == 8192:
self._vmdas_search = True
return found
def load_data(self, nens=None):
"""Main function run after reader class is initiated."""
if nens is None:
# Attempt to overshoot WinRiver2 or *Pro filesize
if (self.cfg["coord_sys"] == "ship") or (
self.cfg["inst_model"]
in [
"RiverPro",
"StreamPro",
]
):
self._nens = int(self._filesize / self.hdr["nbyte"] / self.n_avg * 1.1)
else:
# Attempt to overshoot other instrument filesizes
self._nens = int(self._npings / self.n_avg)
        elif isinstance(nens, (tuple, list)):
            raise Exception("`nens` must be an integer")
else:
self._nens = nens
if self._debug_level > -1:
logging.info(" taking data from pings 0 - %d" % self._nens)
logging.info(" %d ensembles will be produced.\n" % self._nens)
self.init_data()
for iens in range(self._nens):
if not self.read_buffer():
self.remove_end(iens)
break
self.ensemble.clean_data()
if self._bb:
self.ensembleBB.clean_data()
ens = [self.ensemble]
vars = [self.vars_read]
datl = [self.outd]
cfgl = [self.cfg]
if self._bb:
ens += [self.ensembleBB]
vars += [self.vars_readBB]
datl += [self.outdBB]
cfgl += [self.cfgbb]
for var, en, dat in zip(vars, ens, datl):
for nm in var:
dat = self.save_profiles(dat, nm, en, iens)
# reset flag after all variables run
self.n_cells_diff = 0
# Set clock
clock = en.rtc[:, :]
if clock[0, 0] < 100:
clock[0, :] += defs.century
try:
dates = tmlib.date2epoch(
tmlib.datetime(
*clock[:6, 0], microsecond=int(float(clock[6, 0]) * 10000)
)
)[0]
except ValueError:
warnings.warn(
"Invalid time stamp in ping {}.".format(
int(self.ensemble.number[0])
)
)
dat["coords"]["time"][iens] = np.nan
else:
dat["coords"]["time"][iens] = np.median(dates)
# Finalize dataset (runs through both nb and bb)
for dat, cfg in zip(datl, cfgl):
dat, cfg = self.cleanup(dat, cfg)
dat = self.finalize(dat)
if "vel_bt" in dat["data_vars"]:
dat["attrs"]["rotate_vars"].append("vel_bt")
datbb = self.outdBB if self._bb else None
return self.outd, datbb
def init_data(self):
"""Initiate data structure"""
outd = {
"data_vars": {},
"coords": {},
"attrs": {},
"units": {},
"long_name": {},
"standard_name": {},
"sys": {},
}
outd["attrs"]["inst_make"] = "TRDI"
outd["attrs"]["inst_type"] = "ADCP"
outd["attrs"]["rotate_vars"] = [
"vel",
]
# Currently RDI doesn't use IMUs
outd["attrs"]["has_imu"] = 0
if self._bb:
outdbb = {
"data_vars": {},
"coords": {},
"attrs": {},
"units": {},
"long_name": {},
"standard_name": {},
"sys": {},
}
outdbb["attrs"]["inst_make"] = "TRDI"
outdbb["attrs"]["inst_type"] = "ADCP"
outdbb["attrs"]["rotate_vars"] = [
"vel",
]
outdbb["attrs"]["has_imu"] = 0
# Preallocate variables and data sizes
for nm in defs.data_defs:
outd = lib._idata(
outd, nm, sz=lib._get_size(nm, self._nens, self.cfg["n_cells"])
)
self.outd = outd
if self._bb:
for nm in defs.data_defs:
outdbb = lib._idata(
outdbb, nm, sz=lib._get_size(nm, self._nens, self.cfgbb["n_cells"])
)
self.outdBB = outdbb
if self._debug_level > 1:
logging.info(np.shape(outdbb["data_vars"]["vel"]))
if self._debug_level > 1:
logging.info("{} ncells, not BB".format(self.cfg["n_cells"]))
if self._bb:
logging.info("{} ncells, BB".format(self.cfgbb["n_cells"]))
def read_buffer(self):
"""Read through the file"""
fd = self.f
self.ensemble.k = -1 # so that k+=1 gives 0 on the first loop.
if self._bb:
self.ensembleBB.k = -1 # so that k+=1 gives 0 on the first loop.
self.print_progress()
hdr = self.hdr
while self.ensemble.k < self.ensemble.n_avg - 1:
if not self.search_buffer():
return False
startpos = fd.tell() - 2
self.read_hdrseg()
if self._debug_level > -1:
                logging.info("Read Header {}".format(hdr))
byte_offset = self._nbyte + 2
self._read_vmdas = False
for n in range(len(hdr["dat_offsets"])):
id = fd.read_ui16(1)
if self._debug_level > 0:
logging.info(f"n {n}: {id} {id:04x}")
self.print_pos()
retval = self.read_dat(id)
if retval == "FAIL":
break
byte_offset += self._nbyte
if n < (len(hdr["dat_offsets"]) - 1):
oset = hdr["dat_offsets"][n + 1] - byte_offset
if oset != 0:
if self._debug_level > 0:
logging.debug(" %s: Adjust location by %d\n" % (id, oset))
fd.seek(oset, 1)
byte_offset = hdr["dat_offsets"][n + 1]
else:
if hdr["nbyte"] - 2 != byte_offset:
if not self._winrivprob:
if self._debug_level > 0:
logging.debug(
" {:d}: Adjust location by {:d}\n".format(
id, hdr["nbyte"] - 2 - byte_offset
)
)
self.f.seek(hdr["nbyte"] - 2 - byte_offset, 1)
byte_offset = hdr["nbyte"] - 2
# Check for vmdas again because vmdas doesn't set the offsets
# correctly, and we need this info:
if not self._read_vmdas and self._vmdas_search:
if self._debug_level > 0:
logging.info("Searching for vmdas nav data. Going to next ensemble")
self.search_buffer()
# now go back to where vmdas would be:
fd.seek(-98, 1)
id = self.f.read_ui16(1)
if id is not None:
if self._debug_level > 0:
logging.info(f"Found {id:04d}")
if id == 8192:
self.read_dat(id)
readbytes = fd.tell() - startpos
offset = hdr["nbyte"] + 2 - readbytes
self.check_offset(offset, readbytes)
self.print_pos(byte_offset=byte_offset)
return True
def search_buffer(self):
"""
        Check whether the next bytes mark the beginning of a data
        block. If not, search byte by byte for the next data block
        until one is found or the end of the file is reached.
"""
fd = self.f
id = fd.read_ui8(2)
if id is None:
return False
cfgid = list(id)
pos_7f79 = False
search_cnt = 0
if self._debug_level > -1:
logging.info("pos {}".format(fd.pos))
logging.info("cfgid0: [{:x}, {:x}]".format(*cfgid))
# If not [127, 127] or if the file ends in the next ensemble
while (cfgid != [127, 127]) or self.check_eof():
if cfgid == [127, 121]:
# Search for the next header or the end of the file
skipbytes = fd.read_i16(1)
fd.seek(skipbytes - 2, 1)
id = fd.read_ui8(2)
if id is None: # EOF
return False
cfgid = list(id)
pos_7f79 = True
else:
# Search til we find something or hit the end of the file
search_cnt += 1
nextbyte = fd.read_ui8(1)
if nextbyte is None: # EOF
return False
cfgid[0] = cfgid[1]
cfgid[1] = nextbyte
if pos_7f79 and self._debug_level > -1:
logging.info("Skipped junk data: [{:x}, {:x}]".format(*[127, 121]))
if search_cnt > 0:
if self._debug_level > 0:
logging.info(
" Searched {} bytes to find next "
"valid ensemble start [{:x}, {:x}]\n".format(search_cnt, *cfgid)
)
return True
def check_eof(self):
"""Returns True if next header is bad or at end of file."""
fd = self.f
out = True
numbytes = fd.read_i16(1)
# Search for next config id
if numbytes > 0:
fd.seek(numbytes - 2, 1)
cfgid = fd.read_ui8(2)
if cfgid is None:
if self._debug_level > 1:
logging.info("EOF")
return True
# Make sure one is found, either 7f7f or 7f79
if len(cfgid) == 2:
fd.seek(-numbytes - 2, 1)
if cfgid[0] == 127 and cfgid[1] in [127, 121]:
out = False
else:
fd.seek(-2, 1)
return out
def print_progress(self):
"""Print the buffer progress, used for debugging."""
        # Only report progress once per megabyte read
        if (self.f.tell() - self.progress) < 1048576:
            return
        self.progress = self.f.tell()
        if self._debug_level > 1:
            logging.debug(
                " pos %0.0fmb/%0.0fmb\n"
                % (self.f.tell() / 1048576, self._filesize / 1048576)
            )
def print_pos(self, byte_offset=-1):
"""Print the position in the file, used for debugging."""
if self._debug_level > 1:
if hasattr(self, "ensemble"):
k = self.ensemble.k
else:
k = 0
logging.debug(
f" pos: {self.f.tell()}, pos_: {self._pos}, nbyte: {self._nbyte}, k: {k}, byte_offset: {byte_offset}"
)
def read_dat(self, id):
"""Main function map used to read or skip stored IDs"""
function_map = {
# 0000 1st profile fixed leader
0: (defs.read_fixed, [False]),
# 0001 2nd profile fixed leader
1: (defs.read_fixed, [True]),
# 0010 Surface layer fixed leader (RiverPro & StreamPro)
16: (defs.read_fixed_sl, []),
# 0080 1st profile variable leader
128: (defs.read_var, [False]),
# 0081 2nd profile variable leader
129: (defs.read_var, [True]),
# 0100 1st profile velocity
256: (defs.read_vel, [0]),
# 0101 2nd profile velocity
257: (defs.read_vel, [1]),
# 0103 Waves first leader
259: (defs.skip_Nbyte, [74]),
# 0110 Surface layer velocity (RiverPro & StreamPro)
272: (defs.read_vel, [2]),
# 0200 1st profile correlation
512: (defs.read_corr, [0]),
# 0201 2nd profile correlation
513: (defs.read_corr, [1]),
# 0203 Waves data
515: (defs.skip_Nbyte, [186]),
# 020C Ambient sound profile
524: (defs.skip_Nbyte, [4]),
# 0210 Surface layer correlation (RiverPro & StreamPro)
528: (defs.read_corr, [2]),
# 0300 1st profile amplitude
768: (defs.read_amp, [0]),
# 0301 2nd profile amplitude
769: (defs.read_amp, [1]),
# 0302 Beam 5 Sum of squared velocities
770: (defs.skip_Ncol, []),
# 0303 Waves last leader
771: (defs.skip_Ncol, [18]),
# 0310 Surface layer amplitude (RiverPro & StreamPro)
784: (defs.read_amp, [2]),
# 0400 1st profile % good
1024: (defs.read_prcnt_gd, [0]),
# 0401 2nd profile pct good
1025: (defs.read_prcnt_gd, [1]),
# 0403 Waves HPR data
1027: (defs.skip_Nbyte, [6]),
# 0410 Surface layer pct good (RiverPro & StreamPro)
1040: (defs.read_prcnt_gd, [2]),
# 0500 1st profile status
1280: (defs.read_status, [0]),
# 0501 2nd profile status
1281: (defs.read_status, [1]),
# 0510 Surface layer status (RiverPro & StreamPro)
1296: (defs.read_status, [2]),
1536: (defs.read_bottom, []), # 0600 bottom tracking
1793: (defs.skip_Ncol, [4]), # 0701 number of pings
1794: (defs.skip_Ncol, [4]), # 0702 sum of squared vel
1795: (defs.skip_Ncol, [4]), # 0703 sum of velocities
2560: (defs.skip_Ncol, []), # 0A00 Beam 5 velocity
2816: (defs.skip_Ncol, []), # 0B00 Beam 5 correlation
3072: (defs.skip_Ncol, []), # 0C00 Beam 5 amplitude
3328: (defs.skip_Ncol, []), # 0D00 Beam 5 pct_good
# Fixed attitude data format for Ocean Surveyor ADCPs
3000: (defs.skip_Nbyte, [32]),
3841: (defs.skip_Nbyte, [38]), # 0F01 Beam 5 leader
8192: (defs.read_vmdas, []), # 2000
# 2013 Navigation parameter data
8211: (defs.skip_Nbyte, [83]),
8226: (defs.read_winriver2, []), # 2022
8448: (defs.read_winriver, []), # 2100
8449: (defs.read_winriver, []), # 2101
8450: (defs.read_winriver, []), # 2102
8451: (defs.read_winriver, []), # 2103
8452: (defs.read_winriver, []), # 2104
# 3200 Transformation matrix
12800: (defs.skip_Nbyte, [32]),
# 3000 Fixed attitude data format for Ocean Surveyor ADCPs
12288: (defs.skip_Nbyte, [32]),
12496: (defs.skip_Nbyte, [24]), # 30D0
12504: (defs.skip_Nbyte, [48]), # 30D8
# 4100 beam 5 range
16640: (defs.read_alt, []),
# 4400 Firmware status data (RiverPro & StreamPro)
17408: (defs.skip_Nbyte, [28]),
# 4401 Auto mode setup (RiverPro & StreamPro)
17409: (defs.skip_Nbyte, [82]),
# 5803 High resolution bottom track velocity
22531: (defs.skip_Nbyte, [68]),
# 5804 Bottom track range
22532: (defs.skip_Nbyte, [21]),
# 5901 ISM (IMU) data
22785: (defs.skip_Nbyte, [65]),
# 5902 Ping attitude
22786: (defs.skip_Nbyte, [105]),
# 7001 ADC data
28673: (defs.skip_Nbyte, [14]),
}
# Call the correct function:
if self._debug_level > 1:
logging.debug(f"Trying to Read {id}")
if id in function_map:
if self._debug_level > 1:
logging.info(" Reading code {}...".format(hex(id)))
retval = function_map.get(id)[0](self, *function_map[id][1])
if retval:
return retval
if self._debug_level > 1:
logging.info(" success!")
else:
self.read_nocode(id)
def read_nocode(self, id):
"""Identify filler or unknown bytes and bypass them"""
# Skipping bytes from codes 0340-30FC, commented if needed
hxid = hex(id)
if hxid[2:4] == "30":
logging.warning("Skipping bytes from codes 0340-30FC")
# I want to count the number of 1s in the middle 4 bits
# of the 2nd two bytes.
# 60 is a 0b00111100 mask
            # Hex digits are parsed in base 16 so ID codes containing a-f
            # don't raise a ValueError.
            nflds = bin(int(hxid[3], 16) & 60).count("1") + bin(
                int(hxid[4], 16) & 60
            ).count("1")
            # I want to count the number of 1s in the highest
            # 2 bits of byte 3
            # 3 is a 0b00000011 mask:
            dfac = bin(int(hxid[3], 16) & 3).count("1")
defs.skip_Nbyte(self, 12 * nflds * dfac)
else:
if self._debug_level > -1:
logging.warning(" Unrecognized ID code: %0.4X" % id)
self.skip_nocode(id)
def skip_nocode(self, id):
"""
Skips bytes when an ID code is not found in the function map.
This method calculates the byte length to skip based on the positions
of known ID codes and uses this length to bypass filler or irrelevant data.
Parameters
----------
id : int
The ID code that is not present in the function map.
"""
offsets = list(self.id_positions.values())
        idx = offsets.index(self.id_positions[id])
byte_len = offsets[idx + 1] - offsets[idx] - 2
defs.skip_Nbyte(self, byte_len)
if self._debug_level > -1:
logging.debug(f"Skipping ID code {id}\n")
def check_offset(self, offset, readbytes):
"""
Checks and adjusts the file position based on the distance to the nearest function ID.
If the provided `offset` differs from the expected value and `_fixoffset` is zero,
this method updates `_fixoffset` and adjusts the file position in the data file
(`self.f`) accordingly. This adjustment is logged if `_debug_level` is set to a
positive value.
Parameters
----------
offset : int
The current offset from the expected position.
readbytes : int
The number of bytes that have been read so far.
"""
fd = self.f
if offset != 4 and self._fixoffset == 0:
if self._debug_level > 0:
if fd.tell() == self._filesize:
logging.error(
" EOF reached unexpectedly - discarding this last ensemble\n"
)
else:
logging.debug(
" Adjust location by {:d} (readbytes={:d},hdr['nbyte']={:d})\n".format(
offset, readbytes, self.hdr["nbyte"]
)
)
self._fixoffset = offset - 4
fd.seek(4 + self._fixoffset, 1)
def remove_end(self, iens):
"""
Removes incomplete measurements from the dataset.
This method cleans up any partially read data by truncating measurements
to the specified ensemble index (`iens`). This is typically called upon
reaching the end of the file to ensure only complete data is retained.
Parameters
----------
iens : int
The index up to which data is considered complete and should be retained.
"""
dat = self.outd
if self._debug_level > 0:
logging.info(" Encountered end of file. Cleaning up data.")
for nm in self.vars_read:
lib._setd(dat, nm, lib._get(dat, nm)[..., :iens])
def save_profiles(self, dat, nm, en, iens):
"""
        Copies profile measurements from the current ensemble into the output
        dataset dictionary, adapting to changing cell counts and cell sizes as
        needed (e.g., WinRiver2 transects recorded with one of the *Pro ADCPs).
Parameters
----------
dat : dict
Raw data dictionary
nm : str
The name of the profile variable
en : dict
The dictionary containing ensemble profiles
iens : int
The index of the current ensemble
Returns
-------
dict
The updated dataset dictionary with the reformatted profile measurements.
"""
ds = lib._get(dat, nm)
if self.n_avg == 1:
bn = en[nm][..., 0]
else:
bn = np.nanmean(en[nm], axis=-1)
# If n_cells has changed (RiverPro/StreamPro WinRiver transects)
if len(ds.shape) == 3:
if "_sl" in nm:
# This works here b/c the max number of surface layer cells
# is smaller than the min number of normal profile cells used.
# Extra nan cells created after this if-statement
# are trimmed off in self.cleanup.
bn = bn[: self.cfg["n_cells_sl"]]
else:
# Set bn to current ping size
bn = bn[: self.cfg["n_cells"]]
# If n_cells has increased, we also need to increment defs
if self.n_cells_diff > 0:
a = np.empty((self.n_cells_diff, ds.shape[1], ds.shape[2])) * np.nan
ds = np.append(ds, a.astype(ds.dtype), axis=0)
lib._setd(dat, nm, ds)
# If the number of cells decreases, set extra cells to nan instead of
# whatever is stuck in memory
if ds.shape[0] != bn.shape[0]:
n_cells = ds.shape[0] - bn.shape[0]
a = np.empty((n_cells, bn.shape[1])) * np.nan
bn = np.append(bn, a.astype(ds.dtype), axis=0)
# Keep track of when the cell size changes
if self.cs_diff:
self.cs.append([iens, self.cfg["cell_size"]])
self.cs_diff = 0
# Then copy the ensemble to the dataset.
ds[..., iens] = bn
lib._setd(dat, nm, ds)
return dat
def cleanup(self, dat, cfg):
"""
Cleans up recorded data by adjusting variable cell sizes and profile ranges.
This method handles adjustments when cell sizes change during data collection,
performing depth-bin averaging for smaller cells if needed. It also updates
the configuration data, range coordinates, and manages any surface layer profiles.
Parameters
----------
dat : dict
The dataset dictionary containing data variables and coordinates to be cleaned up.
cfg : dict
Configuration dictionary, which is updated with cell size, range, and additional
attributes after cleanup.
Returns
-------
tuple
- dict : The updated dataset dictionary with cleaned data.
- dict : The updated configuration dictionary with new attributes.
"""
# Clean up changing cell size, if necessary
cs = np.array(self.cs, dtype=np.float32)
cell_sizes = cs[:, 1]
# If cell sizes change, depth-bin average the smaller cell sizes
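        # (e.g., if the cell size changed from 0.25 m to 0.5 m partway through
        # the file, pairs of 0.25 m cells are averaged onto the coarser 0.5 m grid)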
if len(self.cs) > 1:
bins_to_merge = cell_sizes.max() / cell_sizes
idx_start = cs[:, 0].astype(int)
idx_end = np.append(cs[1:, 0], self._nens).astype(int)
dv = dat["data_vars"]
for var in dv:
if (len(dv[var].shape) == 3) and ("_sl" not in var):
# Create a new NaN var to save data in
new_var = (np.zeros(dv[var].shape) * np.nan).astype(dv[var].dtype)
# For each cell size change, reshape and bin-average
for id1, id2, b in zip(idx_start, idx_end, bins_to_merge):
array = np.transpose(dv[var][..., id1:id2])
bin_arr = np.transpose(np.mean(self.reshape(array, b), axis=-1))
new_var[: len(bin_arr), :, id1:id2] = bin_arr
# Reset data. This often leaves nan data at farther ranges
dv[var] = new_var
# Set cell size and range
cfg["n_cells"] = self.ensemble["n_cells"]
cfg["cell_size"] = round(cell_sizes.max(), 3)
dat["coords"]["range"] = (
cfg["bin1_dist_m"] + np.arange(cfg["n_cells"]) * cfg["cell_size"]
).astype(np.float32)
# Save configuration data as attributes
for nm in cfg:
dat["attrs"][nm] = cfg[nm]
# Clean up surface layer profiles
if "surface_layer" in cfg: # RiverPro/StreamPro
dat["coords"]["range_sl"] = (
cfg["bin1_dist_m_sl"]
+ np.arange(0, self.n_cells_sl) * cfg["cell_size_sl"]
)
# Trim off extra nan data
dv = dat["data_vars"]
for var in dv:
if "sl" in var:
dv[var] = dv[var][: self.n_cells_sl]
dat["attrs"]["rotate_vars"].append("vel_sl")
return dat, cfg
def reshape(self, arr, n_bin=None):
"""
Reshapes the input array `arr` to a shape of (..., n, n_bin).
Parameters
----------
arr : np.ndarray
The array to reshape. The last dimension of `arr` will be divided into
`(n, n_bin)` based on the value of `n_bin`.
n_bin : int or float, optional
The bin size for reshaping. If `n_bin` is an integer, it divides the
last dimension directly. If not, indices are adjusted, and excess
bins may be removed. Default is None.
Returns
-------
np.ndarray
The reshaped array with shape (..., n, n_bin).
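        Examples
        --------
        A rough sketch of the integer-bin case (`rdr` stands for an existing
        `_RDIReader` instance; the array is constructed, not read from a file):
        >>> arr = np.arange(12.0).reshape(2, 6)
        >>> rdr.reshape(arr, n_bin=3).shape
        (2, 2, 3)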
"""
out = np.zeros(
list(arr.shape[:-1]) + [int(arr.shape[-1] // n_bin), int(n_bin)],
dtype=arr.dtype,
)
        # Use a list (not a tuple) so the shape can be shrunk below if a bin is dropped
        shp = list(out.shape)
if np.mod(n_bin, 1) == 0:
# n_bin needs to be int
n_bin = int(n_bin)
# If n_bin is an integer, we can do this simply.
out[..., :n_bin] = (arr[..., : (shp[-2] * shp[-1])]).reshape(shp, order="C")
else:
inds = np.arange(np.prod(shp[-2:])) * n_bin // int(n_bin)
# If there are too many indices, drop one bin
if inds[-1] >= arr.shape[-1]:
inds = inds[: -int(n_bin)]
shp[-2] -= 1
out = out[..., 1:, :]
n_bin = int(n_bin)
out[..., :n_bin] = (arr[..., inds]).reshape(shp, order="C")
n_bin = int(n_bin)
return out
def finalize(self, dat):
"""
This method cleans up the dataset by removing any attributes that were
defined but not loaded, updates configuration attributes, and sets the
sampling frequency (fs) based on the data source program. Additionally,
it adjusts the axes of certain variables as defined in data_defs.
Parameters
----------
dat : dict
The dataset dictionary to be finalized. This dictionary is modified
in place by removing unused attributes, setting configuration values
as attributes, and calculating `fs`.
Returns
-------
dict
The finalized dataset dictionary with cleaned attributes and added metadata.
"""
for nm in set(defs.data_defs.keys()) - self.vars_read:
lib._pop(dat, nm)
for nm in self.cfg:
dat["attrs"][nm] = self.cfg[nm]
        # VMDAS and WinRiver set the sampling frequency differently, so estimate it from the timestamps
da = dat["attrs"]
if ("sourceprog" in da) and (
da["sourceprog"].lower() in ["vmdas", "winriver", "winriver2"]
):
da["fs"] = round(1 / np.median(np.diff(dat["coords"]["time"])), 2)
else:
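            # e.g., 0.5 s between ping groups with 2 pings per ensemble gives fs = 1 Hz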
da["fs"] = 1 / (da["sec_between_ping_groups"] * da["pings_per_ensemble"])
for nm in defs.data_defs:
shp = defs.data_defs[nm][0]
if len(shp) and shp[0] == "nc" and lib._in_group(dat, nm):
lib._setd(dat, nm, np.swapaxes(lib._get(dat, nm), 0, 1))
return dat