Source code for mhkit.dolfyn.tools.fft

import numpy as np
from .misc import detrend_array

fft = np.fft.fft


[docs] def fft_frequency(nfft, fs, full=False): """ Compute the frequency vector for a given `nfft` and `fs`. Parameters ---------- fs : float The sampling frequency (e.g. samples/sec) nfft : int The number of samples in a window. full : bool Whether to return half frequencies (positive), or the full frequencies. Default = False Returns ------- freq : numpy.ndarray The frequency vector, in same units as 'fs' """ fs = np.float64(fs) f = np.fft.fftfreq(int(nfft), 1 / fs) if full: return f else: return np.abs(f[1 : int(nfft / 2.0 + 1)])
def _getwindow(window, nfft): if window is None: window = np.ones(nfft) elif isinstance(window, (int, float)) and window == 1: window = np.ones(nfft) elif isinstance(window, str): if "hann" in window: window = np.hanning(nfft) elif "hamm" in window: window = np.hamming(nfft) else: raise ValueError("Unsupported window type: {}".format(window)) elif isinstance(window, np.ndarray): if len(window) != nfft: raise ValueError("Custom window length must be equal to nfft") else: raise ValueError("Invalid window parameter") return window def _stepsize(l, nfft, nens=None, step=None): """ Calculates the fft-step size for a length *l* array. If nens is None, the step size is chosen to maximize data use, minimize nens and have a minimum of 50% overlap. If nens is specified, the step-size is computed directly. Parameters ---------- l : The length of the array. nfft : The number of points in the fft. nens : The number of nens to perform (default compute this). Returns ------- step : The step size. nens : The number of ensemble ffts to average together. nfft : The number of points in the fft (set to l if nfft>l). """ if l < nfft: nfft = l if nens is None and step is None: if l == nfft: return 0, 1, int(nfft) nens = int(2.0 * l / nfft) return int((l - nfft) / (nens - 1)), nens, int(nfft) elif nens is None: return int(step), int((l - nfft) / step + 1), int(nfft) else: if nens == 1: return 0, 1, int(nfft) return int((l - nfft) / (nens - 1)), int(nens), int(nfft)
[docs] def cpsd_quasisync_1D(a, b, nfft, fs, window="hann"): """ Compute the cross power spectral density (CPSD) of the signals `a` and `b`. Parameters ---------- a : numpy.ndarray The first signal. b : numpy.ndarray The second signal. nfft : int The number of points in the fft. fs : float The sample rate (e.g. sample/second). window : {None, 1, 'hann', numpy.ndarray} The window to use (default: 'hann'). Valid entries are: - None,1 : uses a 'boxcar' or ones window. - 'hann' : hanning window. - a length(nfft) array : use this as the window directly. Returns ------- cpsd : numpy.ndarray The cross-spectral density of `a` and `b`. See Also --------- :func:`psd`, :func:`coherence_1D`, :func:`cpsd_1D`, numpy.fft Notes ----- `a` and `b` do not need to be 'tightly' synchronized, and can even be different lengths, but the first- and last-index of both series should be synchronized (to whatever degree you want unbiased phases). This performs: .. math:: fft(a)*conj(fft(b)) Note that this is consistent with :func:`numpy.correlate`. It detrends the data and uses a minimum of 50% overlap for the shorter of `a` and `b`. For the longer, the overlap depends on the difference in size. 1-(l_short/l_long) data will be underutilized (where l_short and l_long are the length of the shorter and longer series, respectively). The units of the spectra is the product of the units of `a` and `b`, divided by the units of fs. """ if np.iscomplexobj(a) or np.iscomplexobj(b): raise Exception("Velocity cannot be complex") l = [len(a), len(b)] if l[0] == l[1]: return cpsd_1D(a, b, nfft, fs, window=window) elif l[0] > l[1]: a, b = b, a l = l[::-1] step = [0, 0] step[0], nens, nfft = _stepsize(l[0], nfft) step[1], nens, nfft = _stepsize(l[1], nfft, nens=nens) fs = np.float64(fs) window = _getwindow(window, nfft) fft_inds = slice(1, int(nfft / 2.0 + 1)) wght = 2.0 / (window**2).sum() pwr = fft(detrend_array(a[0:nfft]) * window)[fft_inds] * np.conj( fft(detrend_array(b[0:nfft]) * window)[fft_inds] ) if nens - 1: for i1, i2 in zip( range(step[0], l[0] - nfft + 1, step[0]), range(step[1], l[1] - nfft + 1, step[1]), ): pwr += fft(detrend_array(a[i1 : (i1 + nfft)]) * window)[fft_inds] * np.conj( fft(detrend_array(b[i2 : (i2 + nfft)]) * window)[fft_inds] ) pwr *= wght / nens / fs return pwr
[docs] def cpsd_1D(a, b, nfft, fs, window="hann", step=None): """ Compute the cross power spectral density (CPSD) of the signals `a` and `b`. Parameters ---------- a : numpy.ndarray The first signal. b : numpy.ndarray The second signal. nfft : int The number of points in the fft. fs : float The sample rate (e.g. sample/second). window : {None, 1, 'hann', numpy.ndarray} The window to use (default: 'hann'). Valid entries are: - None,1 : uses a 'boxcar' or ones window. - 'hann' : hanning window. - a length(nfft) array : use this as the window directly. step : int Use this to specify the overlap. For example: - step : nfft/2 specifies a 50% overlap. - step : nfft specifies no overlap. - step=2*nfft means that half the data will be skipped. By default, `step` is calculated to maximize data use, have at least 50% overlap and minimize the number of ensembles. Returns ------- cpsd : numpy.ndarray The cross-spectral density of `a` and `b`. See also -------- :func:`psd` :func:`coherence_1D` `numpy.fft` Notes ----- cpsd removes a linear trend from the signals. The two signals should be the same length, and should both be real. This performs: .. math:: fft(a)*conj(fft(b)) This implementation is consistent with the numpy.correlate definition of correlation. (The conjugate of D.B. Chelton's definition of correlation.) The units of the spectra is the product of the units of `a` and `b`, divided by the units of fs. """ if np.iscomplexobj(a) or np.iscomplexobj(b): raise Exception("Velocity cannot be complex") auto_psd = False if a is b: auto_psd = True l = len(a) step, nens, nfft = _stepsize(l, nfft, step=step) fs = np.float64(fs) window = _getwindow(window, nfft) fft_inds = slice(1, int(nfft / 2.0 + 1)) wght = 2.0 / (window**2).sum() s1 = fft(detrend_array(a[0:nfft]) * window)[fft_inds] if auto_psd: pwr = np.abs(s1) ** 2 else: pwr = s1 * np.conj(fft(detrend_array(b[0:nfft]) * window)[fft_inds]) if nens - 1: for i in range(step, l - nfft + 1, step): s1 = fft(detrend_array(a[i : (i + nfft)]) * window)[fft_inds] if auto_psd: pwr += np.abs(s1) ** 2 else: pwr += s1 * np.conj( fft(detrend_array(b[i : (i + nfft)]) * window)[fft_inds] ) pwr *= wght / nens / fs return pwr
[docs] def psd_1D(a, nfft, fs, window="hann", step=None): """ Compute the power spectral density (PSD). This function computes the one-dimensional `n`-point PSD. The units of the spectra is the product of the units of `a` and `b`, divided by the units of fs. Parameters ---------- a : numpy.ndarray The first signal, as a 1D vector nfft : int The number of points in the fft. fs : float The sample rate (e.g. sample/second). window : {None, 1, 'hann', numpy.ndarray} The window to use (default: 'hann'). Valid entries are: - None,1 : uses a 'boxcar' or ones window. - 'hann' : hanning window. - a length(nfft) array : use this as the window directly. step : int Use this to specify the overlap. For example: - step : nfft/2 specifies a 50% overlap. - step : nfft specifies no overlap. - step=2*nfft means that half the data will be skipped. By default, `step` is calculated to maximize data use, have at least 50% overlap and minimize the number of ensembles. Returns ------- cpsd : numpy.ndarray The cross-spectral density of `a` and `b`. Notes ----- Credit: This function's line of code was copied from JN's fast_psd.m routine. See Also -------- :func:`cpsd_1D` :func:`coherence_1D` `numpy.fft` """ return np.abs(cpsd_1D(a, a, nfft, fs, window=window, step=step))