-
Mads M. Pedersen authoredMads M. Pedersen authored
spectra.py 7.48 KiB
'''
Created on 27/11/2015
@author: MMPE
'''
import numpy as np
import warnings
def spectrum(x, y=None, k=1):
"""PSD or Cross spectrum (only positive half)
If input time series are two dimensional, then columns are interpreted
as different time series and the mean spectrum is returned
Parameters
----------
x : array_like
Time series
y : array_like, optional
If array_like, cross spectrum of x and y is returned
if None, cross spectrum of x and x (i.e. PSD) is returned
k : int or float, optional
Max wave number
"""
if x is None:
return None
def fft(x):
return np.fft.fft(x.T).T / len(x)
if y is None or x is y:
fftx = fft(x)
fftx = fftx * np.conj(fftx) # PSD, ~ fft(x)**2
else:
fftx = fft(x) * np.conj(fft(y)) # Cross spectra
fftx = fftx[:len(fftx) // 2] * 2 # positive half * 2
# if len(fftx.shape) == 2:
# fftx = np.mean(fftx, 1)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
return np.real(fftx * len(x) / (2 * k))[1:]
def spectra(spatial_resolution, u, v=None, w=None, detrend=True):
"""Return the wave number, the uu, vv, ww autospectra and the uw cross spectra
Parameters
----------
spatial_resolution : int, float or array_like
Sample points per meter
- For turbulence boxes: 1/dx = Nx/Lx where dx is distance between points,
Nx is number of points and Lx is box length in meters
- For time series: Sample frequency / U
u : array_like
The u-wind component\n
- if shape is (r,): One time series with *r* observations\n
- if shape is (r,c): *c* different time series with *r* observations\n
v : array_like, optional
The v-wind component
- if shape is (r,): One time series with *r* observations\n
- if shape is (r,c): *c* different time series with *r* observations\n
w : array_like, optional
The w-wind component
- if shape is (r,): One time series with *r* observations\n
- if shape is (r,c): *c* different time series with *r* observations\n
detrend : boolean, optional
if True (default) wind speeds are detrended before calculating spectra
Returns
-------
k1, uu[, vv[, ww, uw]] : 2-5 x array_like
- k1: wave number
- uu: The uu auto spectrum\n
- vv: The vv auto spectrum, only if v is provided\n
- ww: The ww auto spectrum, only if w is provided\n
- uw: The uw cross spectrum, only if w is provided\n
For two dimensional input, the mean spectra are returned
"""
assert isinstance(spatial_resolution, (int, float))
k = 2 * np.pi * spatial_resolution
if v is not None:
assert u.shape == v.shape
if w is not None:
assert u.shape == w.shape
if 1 and len(u.shape) == 2:
# assert np.abs(np.mean(u, 0)).max() < 2
# if v is not None:
# assert np.abs(np.mean(v, 0)).max() < 1
# if w is not None:
# assert np.abs(np.mean(w, 0)).max() < 1
if isinstance(k, float):
k = np.repeat(k, u.shape[1])
else:
assert u.shape[1] == k.shape[0]
k1_vec = np.array([np.linspace(0, k_ / 2, len(u) // 2)[1:] for k_ in k]).T
else:
#assert np.abs(np.mean(u)) < 1
if v is not None:
assert np.abs(np.mean(v)) < 1
if w is not None:
assert np.abs(np.mean(w)) < 1
assert isinstance(k, float)
k1_vec = np.linspace(0, k / 2, int(len(u) / 2))[1:]
if detrend:
u, v, w = detrend_wsp(u, v, w)
return [k1_vec] + [spectrum(x1, x2, k=k) for x1, x2 in [(u, u), (v, v), (w, w), (w, u)]]
def spectra_from_time_series(sample_frq, Uvw_lst):
"""Return the wave number, the uu, vv, ww autospectra and the uw cross spectra
Parameters
----------
sample_frq : int, float or array_like
Sample frequency
Uvw_lst : array_like
list of U, v and w, [(U1,v1,w1),(U2,v2,w2)...], v and w are optional
Returns
-------
k1, uu[, vv[, ww, uw]] : 2-5 x array_like
- k1: wave number
- uu: The uu auto spectrum\n
- vv: The vv auto spectrum, only if v is provided\n
- ww: The ww auto spectrum, only if w is provided\n
- uw: The uw cross spectrum, only if w is provided\n
For two dimensional input, the mean spectra are returned
"""
assert isinstance(sample_frq, (int, float))
Uvw_arr = np.array(Uvw_lst)
Ncomp = Uvw_arr.shape[1]
U, v, w = [Uvw_arr[:, i, :].T for i in range(Ncomp)] + [None] * (3 - Ncomp)
k = 2 * np.pi * sample_frq / U.mean(0)
if v is not None:
assert np.abs(np.nanmean(v, 0)).max() < 1, "Max absolute mean of v is %f" % np.abs(np.nanmean(v, 0)).max()
if w is not None:
assert np.abs(np.nanmean(w, 0)).max() < 1
k1_vec = np.array([np.linspace(0, k_ / 2, U.shape[0] // 2)[1:] for k_ in k]).T
u = U - np.nanmean(U, 0)
u, v, w = detrend_wsp(u, v, w)
return [k1_vec] + [spectrum(x1, x2, k=k) for x1, x2 in [(u, u), (v, v), (w, w), (w, u)]]
def bin_spectrum(x, y, bin_size, min_bin_count=2):
assert min_bin_count > 0
x = x / bin_size
low, high = np.floor(np.nanmin(x)), np.ceil(np.nanmax(x))
bins = int(high - low)
nbr_in_bins = np.histogram(x, bins, range=(low, high))[0]
if len(x.shape) == 2:
min_bin_count *= x.shape[1]
mask = nbr_in_bins >= min_bin_count
return np.histogram(x, bins, range=(low, high), weights=y)[0][mask] / nbr_in_bins[mask], nbr_in_bins
def logbin_spectrum(k1, xx, log10_bin_size=.2, min_bin_count=2):
ln_bin_size = np.log(10) * log10_bin_size
if xx is None:
return None
return (bin_spectrum(np.log(k1), (xx), ln_bin_size, min_bin_count)[0])
def logbin_spectra(k1, uu, vv=None, ww=None, uw=None, log10_bin_size=0.2, min_bin_count=2):
return tuple([logbin_spectrum(k1, xx, log10_bin_size, min_bin_count) for xx in [k1, uu, vv, ww, uw]])
def plot_spectrum(spatial_resolution, u, plt=None):
if plt is None:
import matplotlib.pyplot as plt
k1, uu = logbin_spectra(*spectra(spatial_resolution, u))[:2]
plt.semilogx(k1, k1 * uu, 'b-')
def detrend_wsp(u, v=None, w=None):
def _detrend(wsp):
if wsp is None:
return None
dwsp = np.atleast_2d(wsp.copy().T).T
t = np.arange(dwsp.shape[0])
A = np.vstack([t, np.ones(len(t))]).T
for i in range(dwsp.shape[1]):
m = ~np.isnan(dwsp[:, i])
trend, offset = np.linalg.lstsq(A[:m.sum()], dwsp[:, i][m], rcond=None)[0]
dwsp[:, i] = dwsp[:, i] - t * trend + t[-1] / 2 * trend
return dwsp.reshape(wsp.shape)
return [_detrend(wsp) for wsp in [u, v, w]]
def plot_spectra(k1, uu, vv=None, ww=None, uw=None, mean_u=1, log10_bin_size=.2, plt=None, marker_style='.'):
if plt is None:
import matplotlib.pyplot as plt
bk1, buu, bvv, bww, buw = logbin_spectra(k1, uu, vv, ww, uw, log10_bin_size)
def plot(xx, label, color, plt):
plt.semilogx(bk1, bk1 * xx * 10 ** 0 / mean_u ** 2, marker_style + color, label=label)
plot(buu, 'uu', 'r', plt)
plt.xlabel('Wavenumber $k_{1}$ [$m^{-1}$]')
if mean_u == 1:
plt.ylabel(r'Spectral density $k_{1} F(k_{1}) [m^2/s^2]$')
else:
plt.ylabel(r'Spectral density $k_{1} F(k_{1})/U^{2} [-]$')
if (bvv) is not None:
plot(bvv, 'vv', 'g', plt)
if bww is not None:
plot(bww, 'ww', 'b', plt)
plot(buw, 'uw', 'm', plt)