Skip to content
Snippets Groups Projects
spectra.py 7.48 KiB
'''
Created on 27/11/2015

@author: MMPE
'''


import numpy as np
import warnings


def spectrum(x, y=None, k=1):
    """PSD or Cross spectrum (only positive half)

    If input time series are two dimensional, then columns are interpreted
    as different time series and the mean spectrum is returned

    Parameters
    ----------
    x : array_like
        Time series
    y : array_like, optional
        If array_like, cross spectrum of x and y is returned
        if None, cross spectrum of x and x (i.e. PSD) is returned
    k : int or float, optional
        Max wave number
    """
    if x is None:
        return None

    def fft(x):
        return np.fft.fft(x.T).T / len(x)

    if y is None or x is y:
        fftx = fft(x)
        fftx = fftx * np.conj(fftx)  # PSD, ~ fft(x)**2
    else:
        fftx = fft(x) * np.conj(fft(y))  # Cross spectra

    fftx = fftx[:len(fftx) // 2] * 2  # positive half * 2

#    if len(fftx.shape) == 2:
#        fftx = np.mean(fftx, 1)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        return np.real(fftx * len(x) / (2 * k))[1:]


def spectra(spatial_resolution, u, v=None, w=None, detrend=True):
    """Return the wave number, the uu, vv, ww autospectra and the uw cross spectra

    Parameters
    ----------
    spatial_resolution : int, float or array_like
        Sample points per meter
        - For turbulence boxes: 1/dx = Nx/Lx where dx is distance between points,
        Nx is number of points and Lx is box length in meters
        - For time series: Sample frequency / U
    u : array_like
        The u-wind component\n
        - if shape is (r,): One time series with *r* observations\n
        - if shape is (r,c): *c* different time series with *r* observations\n
    v : array_like, optional
        The v-wind component
        - if shape is (r,): One time series with *r* observations\n
        - if shape is (r,c): *c* different time series with *r* observations\n
    w : array_like, optional
        The w-wind component
        - if shape is (r,): One time series with *r* observations\n
        - if shape is (r,c): *c* different time series with *r* observations\n
    detrend : boolean, optional
        if True (default) wind speeds are detrended before calculating spectra

    Returns
    -------
    k1, uu[, vv[, ww, uw]] : 2-5 x array_like
        - k1: wave number
        - uu: The uu auto spectrum\n
        - vv: The vv auto spectrum, only if v is provided\n
        - ww: The ww auto spectrum, only if w is provided\n
        - uw: The uw cross spectrum, only if w is provided\n

        For two dimensional input, the mean spectra are returned
    """
    assert isinstance(spatial_resolution, (int, float))

    k = 2 * np.pi * spatial_resolution
    if v is not None:
        assert u.shape == v.shape
    if w is not None:
        assert u.shape == w.shape

    if 1 and len(u.shape) == 2:
        #         assert np.abs(np.mean(u, 0)).max() < 2
        #         if v is not None:
        #             assert np.abs(np.mean(v, 0)).max() < 1
        #         if w is not None:
        #             assert np.abs(np.mean(w, 0)).max() < 1
        if isinstance(k, float):
            k = np.repeat(k, u.shape[1])
        else:
            assert u.shape[1] == k.shape[0]
        k1_vec = np.array([np.linspace(0, k_ / 2, len(u) // 2)[1:] for k_ in k]).T
    else:
        #assert np.abs(np.mean(u)) < 1
        if v is not None:
            assert np.abs(np.mean(v)) < 1
        if w is not None:
            assert np.abs(np.mean(w)) < 1
        assert isinstance(k, float)
        k1_vec = np.linspace(0, k / 2, int(len(u) / 2))[1:]
    if detrend:
        u, v, w = detrend_wsp(u, v, w)

    return [k1_vec] + [spectrum(x1, x2, k=k) for x1, x2 in [(u, u), (v, v), (w, w), (w, u)]]


def spectra_from_time_series(sample_frq, Uvw_lst):
    """Return the wave number, the uu, vv, ww autospectra and the uw cross spectra

    Parameters
    ----------
    sample_frq : int, float or array_like
        Sample frequency
    Uvw_lst : array_like
        list of U, v and w, [(U1,v1,w1),(U2,v2,w2)...], v and w are optional

    Returns
    -------
    k1, uu[, vv[, ww, uw]] : 2-5 x array_like
        - k1: wave number
        - uu: The uu auto spectrum\n
        - vv: The vv auto spectrum, only if v is provided\n
        - ww: The ww auto spectrum, only if w is provided\n
        - uw: The uw cross spectrum, only if w is provided\n

        For two dimensional input, the mean spectra are returned
    """
    assert isinstance(sample_frq, (int, float))
    Uvw_arr = np.array(Uvw_lst)
    Ncomp = Uvw_arr.shape[1]
    U, v, w = [Uvw_arr[:, i, :].T for i in range(Ncomp)] + [None] * (3 - Ncomp)
    k = 2 * np.pi * sample_frq / U.mean(0)

    if v is not None:
        assert np.abs(np.nanmean(v, 0)).max() < 1, "Max absolute mean of v is %f" % np.abs(np.nanmean(v, 0)).max()
    if w is not None:
        assert np.abs(np.nanmean(w, 0)).max() < 1
    k1_vec = np.array([np.linspace(0, k_ / 2, U.shape[0] // 2)[1:] for k_ in k]).T
    u = U - np.nanmean(U, 0)
    u, v, w = detrend_wsp(u, v, w)

    return [k1_vec] + [spectrum(x1, x2, k=k) for x1, x2 in [(u, u), (v, v), (w, w), (w, u)]]


def bin_spectrum(x, y, bin_size, min_bin_count=2):
    assert min_bin_count > 0
    x = x / bin_size
    low, high = np.floor(np.nanmin(x)), np.ceil(np.nanmax(x))
    bins = int(high - low)
    nbr_in_bins = np.histogram(x, bins, range=(low, high))[0]
    if len(x.shape) == 2:
        min_bin_count *= x.shape[1]
    mask = nbr_in_bins >= min_bin_count
    return np.histogram(x, bins, range=(low, high), weights=y)[0][mask] / nbr_in_bins[mask], nbr_in_bins


def logbin_spectrum(k1, xx, log10_bin_size=.2, min_bin_count=2):
    ln_bin_size = np.log(10) * log10_bin_size
    if xx is None:
        return None
    return (bin_spectrum(np.log(k1), (xx), ln_bin_size, min_bin_count)[0])


def logbin_spectra(k1, uu, vv=None, ww=None, uw=None, log10_bin_size=0.2, min_bin_count=2):
    return tuple([logbin_spectrum(k1, xx, log10_bin_size, min_bin_count) for xx in [k1, uu, vv, ww, uw]])


def plot_spectrum(spatial_resolution, u, plt=None):
    if plt is None:
        import matplotlib.pyplot as plt
    k1, uu = logbin_spectra(*spectra(spatial_resolution, u))[:2]

    plt.semilogx(k1, k1 * uu, 'b-')


def detrend_wsp(u, v=None, w=None):
    def _detrend(wsp):
        if wsp is None:
            return None
        dwsp = np.atleast_2d(wsp.copy().T).T
        t = np.arange(dwsp.shape[0])
        A = np.vstack([t, np.ones(len(t))]).T
        for i in range(dwsp.shape[1]):
            m = ~np.isnan(dwsp[:, i])
            trend, offset = np.linalg.lstsq(A[:m.sum()], dwsp[:, i][m], rcond=None)[0]
            dwsp[:, i] = dwsp[:, i] - t * trend + t[-1] / 2 * trend
        return dwsp.reshape(wsp.shape)
    return [_detrend(wsp) for wsp in [u, v, w]]


def plot_spectra(k1, uu, vv=None, ww=None, uw=None, mean_u=1, log10_bin_size=.2, plt=None, marker_style='.'):
    if plt is None:
        import matplotlib.pyplot as plt
    bk1, buu, bvv, bww, buw = logbin_spectra(k1, uu, vv, ww, uw, log10_bin_size)

    def plot(xx, label, color, plt):
        plt.semilogx(bk1, bk1 * xx * 10 ** 0 / mean_u ** 2, marker_style + color, label=label)
    plot(buu, 'uu', 'r', plt)
    plt.xlabel('Wavenumber $k_{1}$ [$m^{-1}$]')
    if mean_u == 1:
        plt.ylabel(r'Spectral density $k_{1} F(k_{1}) [m^2/s^2]$')
    else:
        plt.ylabel(r'Spectral density $k_{1} F(k_{1})/U^{2} [-]$')
    if (bvv) is not None:
        plot(bvv, 'vv', 'g', plt)
    if bww is not None:
        plot(bww, 'ww', 'b', plt)
        plot(buw, 'uw', 'm', plt)