Source code for mhkit.acoustics.io

"""
This submodule provides input/output functions for passive acoustics data,
focusing on hydrophone recordings stored in WAV files. The main functionality
includes reading and processing hydrophone data from various manufacturers 
and exporting audio files for easy playback and analysis.

Supported Hydrophone Models
---------------------------
- **SoundTrap** (Ocean Instruments)
- **icListen** (Ocean Sonics)

Functions Overview
------------------

1. **Data Reading**:

   - `read_hydrophone`: Main function to read a WAV file from a hydrophone and 
     convert it to either a voltage or pressure time series, depending on the 
     availability of sensitivity data.

   - `read_soundtrap`: Wrapper for reading Ocean Instruments SoundTrap hydrophone 
     files, automatically using appropriate metadata.

   - `read_iclisten`: Wrapper for reading Ocean Sonics icListen hydrophone files, 
     including metadata processing to apply hydrophone sensitivity for direct 
     sound pressure calculation.

2. **Audio Export**:

   - `export_audio`: Converts processed sound pressure data back into a WAV file 
     format, with optional gain adjustment to improve playback quality.

3. **Data Extraction**:

   - `_read_wav_metadata`: Extracts metadata from a WAV file, including bit depth 
     and other header information.

   - `_calculate_voltage_and_time`: Converts raw WAV data into voltage values and 
     generates a time index based on the sampling frequency.
"""

from typing import BinaryIO, Tuple, Dict, Union, Optional, Any
import io
import struct
import wave
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
from scipy.io import wavfile


def _read_wav_metadata(f: BinaryIO) -> dict:
    """
    Extracts the bit depth from a WAV file and skips over any metadata blocks
    that might be present (e.g., 'LIST' chunks).

    Parameters
    ----------
    f : BinaryIO
        An open WAV file in binary mode.

    Returns
    -------
    header : dict
        Dictionary containing .wav file's header data
    """

    header = {}
    f.read(4)  # riff_key
    header["filesize"] = struct.unpack("<I", f.read(4))[0]
    f.read(4)  # wave_key
    list_key = f.read(4)

    # Skip metadata if it exists
    if "LIST" in list_key.decode():
        list_size = struct.unpack("<I", f.read(4))[0]
        f.seek(f.tell() + list_size)
    else:
        f.seek(f.tell() - 4)

    f.read(4)  # fmt_key
    fmt_size = struct.unpack("<I", f.read(4))[0]
    header["compression_code"] = struct.unpack("<H", f.read(2))[0]
    header["n_channels"] = struct.unpack("<H", f.read(2))[0]
    header["sample_rate"] = struct.unpack("<I", f.read(4))[0]
    header["bytes_per_sec"] = struct.unpack("<I", f.read(4))[0]
    header["block_align"] = struct.unpack("<H", f.read(2))[0]
    header["bits_per_sample"] = struct.unpack("<H", f.read(2))[0]
    f.seek(f.tell() + fmt_size - 16)

    return header


def _calculate_voltage_and_time(
    fs: int,
    raw: np.ndarray,
    bits_per_sample: int,
    peak_voltage: Union[int, float],
    start_time: str,
) -> Tuple[np.ndarray, pd.DatetimeIndex, int]:
    """
    Normalizes the raw data from the WAV file to the appropriate voltage and
    calculates the time array based on the sampling frequency.

    Parameters
    ----------
    fs : int
        Sampling frequency of the audio data in Hertz.
    raw : numpy.ndarray
        Raw audio data extracted from the WAV file.
    bits_per_sample : int
        Number of bits per sample in the WAV file.
    peak_voltage : int or float
        Peak voltage supplied to the analog-to-digital converter (ADC) in volts.
    start_time : str, np.datetime64
        Start time of the recording in ISO 8601 format (e.g., '2024-06-06T00:00:00').

    Returns
    -------
    raw_voltage : numpy.ndarray
        Normalized voltage values corresponding to the raw audio data.
    time : pandas.DatetimeIndex
        Time index for the audio data based on the sample rate and start time.
    max_count : int
        Maximum possible count value for the given bit depth, used for normalization.
    """

    if not isinstance(fs, int):
        raise TypeError("Sampling frequency 'fs' must be an integer.")
    if not isinstance(raw, np.ndarray):
        raise TypeError("Raw audio data 'raw' must be a numpy.ndarray.")
    if not isinstance(bits_per_sample, int):
        raise TypeError("'bits_per_sample' must be an integer.")
    if not isinstance(peak_voltage, (int, float)):
        raise TypeError("'peak_voltage' must be numeric (int or float).")
    if not isinstance(start_time, (str, np.datetime64)):
        raise TypeError("'start_time' must be a string or np.datetime64.")

    length = raw.shape[0] // fs  # length of recording in seconds

    if bits_per_sample in [16, 32]:
        max_count = 2 ** (bits_per_sample - 1)
    elif bits_per_sample == 12:
        max_count = 2 ** (16 - 1) - 2**4  # 12 bit read in as 16 bit
    elif bits_per_sample == 24:
        max_count = 2 ** (32 - 1) - 2**8  # 24 bit read in as 32 bit
    else:
        raise IOError(
            f"Unknown how to read {bits_per_sample} bit ADC."
            "Please notify MHKiT team."
        )

    # Normalize and then scale to peak voltage
    # Use 64 bit float for decimal accuracy
    raw_voltage = raw.astype(float) / max_count * peak_voltage

    # Get time
    end_time = np.datetime64(start_time) + np.timedelta64(length * 1000, "ms")
    time = pd.date_range(start_time, end_time, raw.size + 1)

    return raw_voltage, time, max_count


[docs] def read_hydrophone( filename: Union[str, Path], peak_voltage: Union[int, float], sensitivity: Optional[Union[int, float]] = None, gain: Union[int, float] = 0, start_time: str = "2024-01-01T00:00:00", ) -> xr.DataArray: """ Read .wav file from a hydrophone. Returns voltage timeseries if sensitivity not provided, returns pressure timeseries if it is provided. Parameters ---------- filename: str or pathlib.Path Input filename peak_voltage: int or float Peak voltage supplied to the analog to digital converter (ADC) in V. (Or 1/2 of the peak to peak voltage). sensitivity: int or float Hydrophone calibration sensitivity in dB re 1 V/uPa. Should be negative. Default: None. gain: int or float Amplifier gain in dB re 1 V/uPa. Default 0. start_time: str Start time in the format yyyy-mm-ddTHH:MM:SS Returns ------- out: numpy.array Sound pressure [Pa] or Voltage [V] indexed by time[s] """ if not isinstance(filename, (str, Path)): raise TypeError("Filename must be a string or a pathlib.Path object.") if not isinstance(peak_voltage, (int, float)): raise TypeError("'peak_voltage' must be numeric (int or float).") if sensitivity is not None and not isinstance(sensitivity, (int, float)): raise TypeError("'sensitivity' must be numeric (int, float) or None.") if not isinstance(gain, (int, float)): raise TypeError("'gain' must be numeric (int or float).") if not isinstance(start_time, (str, np.datetime64)): raise TypeError("'start_time' must be a string or np.datetime64") if (sensitivity is not None) and (sensitivity > 0): raise ValueError( "Hydrophone calibrated sensitivity should be entered as a negative number." ) # Read metadata from WAV file with open(filename, "rb") as f: header = _read_wav_metadata(f) # Read data using scipy (will auto drop as int16 or int32) fs, raw = wavfile.read(filename) # Calculate raw voltage and time array raw_voltage, time, max_count = _calculate_voltage_and_time( fs, raw, header["bits_per_sample"], peak_voltage, start_time ) # If sensitivity is provided, convert to sound pressure if sensitivity is not None: # Subtract gain # Hydrophone with sensitivity of -177 dB and gain of -3 dB = sensitivity of -174 dB if gain: sensitivity -= gain # Convert calibration from dB rel 1 V/uPa into ratio sensitivity = 10 ** (sensitivity / 20) # V/uPa # Sound pressure pressure = raw_voltage / sensitivity # uPa pressure = pressure / 1e6 # Pa out = xr.DataArray( pressure, coords={"time": time[:-1]}, attrs={ "units": "Pa", "sensitivity": np.round(sensitivity, 12), # Pressure min resolution "resolution": np.round(peak_voltage / max_count / sensitivity / 1e6, 9), # Minimum pressure sensor can read "valid_min": np.round(-peak_voltage / sensitivity / 1e6, 6), # Pressure at which sensor is saturated "valid_max": np.round(peak_voltage / sensitivity / 1e6, 6), "fs": fs, "filename": Path(filename).stem, }, ) else: out = xr.DataArray( raw_voltage, coords={"time": time[:-1]}, attrs={ "units": "V", # Voltage min resolution "resolution": np.round(peak_voltage / max_count, 6), # Minimum voltage sensor can read "valid_min": -peak_voltage, # Voltage at which sensor is saturated "valid_max": peak_voltage, "fs": fs, "filename": Path(filename).stem, }, ) return out
[docs] def read_soundtrap( filename: str, sensitivity: Optional[Union[int, float]] = None, gain: Union[int, float] = 0, ) -> xr.DataArray: """ Read .wav file from an Ocean Instruments SoundTrap hydrophone. Returns voltage timeseries if sensitivity not provided, returns pressure timeseries if it is provided. Parameters ---------- filename : str Input filename. sensitivity : int or float, optional Hydrophone calibration sensitivity in dB re 1 V/μPa. Should be negative. Default is None. gain : int or float Amplifier gain in dB re 1 V/μPa. Default is 0. Returns ------- out : xarray.DataArray Sound pressure [Pa] or Voltage [V] indexed by time[s]. """ # Get time from filename st = filename.split(".")[-2] start_time = ( "20" + st[:2] + "-" + st[2:4] + "-" + st[4:6] + "T" + st[6:8] + ":" + st[8:10] + ":" + st[10:12] ) # Soundtrap uses a peak voltage of 1 V out = read_hydrophone( filename, peak_voltage=1, sensitivity=sensitivity, gain=gain, start_time=start_time, ) out.attrs["make"] = "SoundTrap" return out
def _read_iclisten_metadata(f: io.BufferedIOBase) -> Dict[str, Any]: """ Reads the metadata from the icListen .wav file and returns the metadata in a dictionary. Parameters ---------- f: io.BufferedIOBase Opened .wav file for reading metadata. Returns ------- metadata: dict A dictionary containing metadata such as peak_voltage, stored_sensitivity, humidity, temperature, etc. """ def read_string(f: io.BufferedIOBase) -> dict: """Reads a string from the file based on its size.""" key = f.read(4).decode().lower() # skip 4 bytes to bypass key name item = struct.unpack("<I", f.read(4))[0] return {key: f.read(item).decode().rstrip("\x00")} metadata: Dict[str, Any] = {} # Read header keys riff_key = f.read(4) metadata["filesize"] = struct.unpack("<I", f.read(4))[0] wave_key = f.read(4) # Check if headers are as expected if riff_key != b"RIFF" or wave_key != b"WAVE": raise IOError("Invalid file format or file corrupted.") # Read metadata keys list_key = f.read(4) if list_key != b"LIST": raise KeyError("Missing LIST chunk in WAV file.") list_size_bytes = f.read(4) if len(list_size_bytes) < 4: raise EOFError("Unexpected end of file when reading list size.") info_key = f.read(4) if info_key != b"INFO": raise KeyError("Expected INFO key in metadata but got different key.") # Read metadata and store in the dictionary metadata.update(read_string(f)) # Hydrophone make and SN metadata.update(read_string(f)) # Hydrophone model metadata.update(read_string(f)) # File creation date metadata.update(read_string(f)) # Hydrophone software version metadata.update(read_string(f)) # Original filename # Additional comments icmt_key = f.read(4) if icmt_key != b"ICMT": raise KeyError("Expected ICMT key in metadata but got different key.") icmt_size_bytes = f.read(4) if len(icmt_size_bytes) < 4: raise EOFError("Unexpected end of file when reading ICMT size.") icmt_size = struct.unpack("<I", icmt_size_bytes)[0] icmt_bytes = f.read(icmt_size) if len(icmt_bytes) < icmt_size: raise EOFError("Unexpected end of file when reading ICMT data.") icmt = icmt_bytes.decode().rstrip("\x00") # Parse the fields from comments and update the metadata dictionary fields = icmt.split(",") try: metadata["peak_voltage"] = float(fields[0].split(" ")[0]) metadata["stored_sensitivity"] = int(fields[1].strip().split(" ")[0]) metadata["humidity"] = fields[2].strip() metadata["temperature"] = fields[3].strip() metadata["accelerometer"] = ( ",".join(fields[4:7]).strip() if len(fields) > 6 else None ) metadata["magnetometer"] = ( ",".join(fields[7:10]).strip() if len(fields) > 9 else None ) metadata["count_at_peak_voltage"] = fields[-2].strip() metadata["sequence_num"] = fields[-1].strip() except (IndexError, ValueError) as e: raise ValueError(f"Error parsing metadata comments: {e}") from e # Return a dictionary with metadata return metadata
[docs] def read_iclisten( filename: str, sensitivity: Optional[Union[int, float]] = None, use_metadata: bool = True, ) -> xr.DataArray: """ Read .wav file from an Ocean Sonics icListen "Smart" hydrophone. Returns voltage timeseries if sensitivity not provided, returns pressure timeseries if it is provided. Parameters ---------- filename : str Input filename. sensitivity : int or float, optional Hydrophone calibration sensitivity in dB re 1 V/μPa. Should be negative. Default is None. use_metadata : bool If True and `sensitivity` is None, applies sensitivity value stored in the .wav file's LIST block. If False and `sensitivity` is None, a sensitivity value isn't applied. Returns ------- out : xarray.DataArray Sound pressure [Pa] or Voltage [V] indexed by time[s]. """ if not isinstance(use_metadata, bool): raise TypeError("'use_metadata' must be a boolean value.") # Read icListen metadata from file header with open(filename, "rb") as f: metadata = _read_iclisten_metadata(f) # Use stored sensitivity if use_metadata and sensitivity is None: sensitivity = metadata["stored_sensitivity"] if sensitivity is None: raise ValueError("Stored sensitivity not found in metadata.") # Convert metadata creation date to datetime64 try: start_time = np.datetime64(metadata["icrd"]) except ValueError as e: raise ValueError(f"Invalid creation date format in metadata: {e}") from e out = read_hydrophone( filename, peak_voltage=metadata["peak_voltage"], sensitivity=sensitivity, gain=0, start_time=start_time, ) # Update attributes with metadata out.attrs.update( { "serial_num": metadata["iart"], "model": metadata["iprd"], "software_ver": metadata["isft"], "filename": metadata["inam"] + ".wav", "peak_voltage": metadata["peak_voltage"], "sensitivity": sensitivity, "humidity": metadata["humidity"], "temperature": metadata["temperature"], "accelerometer": metadata["accelerometer"], "magnetometer": metadata["magnetometer"], "count_at_peak_voltage": metadata["count_at_peak_voltage"], "sequence_num": metadata["sequence_num"], } ) return out
[docs] def export_audio( filename: str, pressure: xr.DataArray, gain: Union[int, float] = 1 ) -> None: """ Creates human-scaled audio file from underwater recording. Parameters ---------- filename : str Output filename for the WAV file (without extension). pressure : xarray.DataArray Sound pressure data with attributes: - 'values' (numpy.ndarray): Pressure data array. - 'sensitivity' (int or float): Sensitivity of the hydrophone in dB. - 'fs' (int or float): Sampling frequency in Hz. gain : int or float, optional Gain to multiply the original time series by. Default is 1. Returns ------- None """ if not isinstance(filename, str): raise TypeError("'filename' must be a string.") if not isinstance(pressure, xr.DataArray): raise TypeError("'pressure' must be an xarray.DataArray.") if not hasattr(pressure, "values") or not isinstance(pressure.values, np.ndarray): raise TypeError("'pressure.values' must be a numpy.ndarray.") if not hasattr(pressure, "sensitivity") or not isinstance( pressure.sensitivity, (int, float) ): raise TypeError("'pressure.sensitivity' must be a numeric type (int or float).") if not hasattr(pressure, "fs") or not isinstance(pressure.fs, (int, float)): raise TypeError("'pressure.fs' must be a numeric type (int or float).") if not isinstance(gain, (int, float)): raise TypeError("'gain' must be a numeric type (int or float).") # Convert from Pascals to UPa upa = pressure.values.T * 1e6 # Change to voltage waveform v = upa * 10 ** (pressure.sensitivity / 20) # in V # Normalize v = v / max(abs(v)) * gain # Convert to (little-endian) 16 bit integers. audio = (v * (2**16 - 1)).astype("<h") # pylint incorrectly thinks this is opening in read mode with wave.open(f"{filename}.wav", mode="w") as f: f.setnchannels(1) # pylint: disable=no-member f.setsampwidth(2) # pylint: disable=no-member f.setframerate(pressure.fs) # pylint: disable=no-member f.writeframes(audio.tobytes()) # pylint: disable=no-member