From 5364f6ed8a5e17727c04b081d78a5705de58bdf3 Mon Sep 17 00:00:00 2001 From: "Mads M. Pedersen" <mmpe@dtu.dk> Date: Thu, 12 Jan 2017 15:54:50 +0100 Subject: [PATCH] add spectrum (psd) --- wetb/signal_tools/spectrum.py | 70 ++++++++++++++++++++++++ wetb/signal_tools/tests/test_spectrum.py | 60 ++++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 wetb/signal_tools/spectrum.py create mode 100644 wetb/signal_tools/tests/test_spectrum.py diff --git a/wetb/signal_tools/spectrum.py b/wetb/signal_tools/spectrum.py new file mode 100644 index 0000000..fb68407 --- /dev/null +++ b/wetb/signal_tools/spectrum.py @@ -0,0 +1,70 @@ +import numpy as np +import warnings + +def psd(data, sample_frq=1., no_segments=1, bin_size=1): + """Map a signal to frequency domain (Power spectrum density) + + Parameters + ---------- + data : array_like + The time domain signal + sample_frq : float, optional + The sample frequency of the signal. Affects the frequency part of the result only\n + Default is 1. + no_segments : int, optional + Remove noise by dividing the signal into no_segments bins and average the PSD of the subsets.\n + As the time span of the bins are shorter, low frequencies are sacrificed.\n + If e.g. 2, the signal is split into two parts and the result is the mean + of the PSD for the first and the last part of the signal\n + Default is 1 + + Be carefull when using many segments, as the result may be corrupted as the function does not use any window function. + bin_size : int, optional + Smoothen the PSD by dividing the frequency list and PSD values into bins of size 'bin_size' and return the means of the bins + + Returns + ------- + f : array_like + Frequencies + psd : array_like + Power spectrum density + + Examples + -------- + >>> ds = OpenHawc2("scripts/test.sel") + >>> f, psd = PSD(ds(3)[:],ds.sample_frq, 2) + >>> Plot(yscale='log') + >>> PlotData(None, f, psd) + """ + if no_segments is None: + no_segments = 1 + + nfft = data.shape[-1] // no_segments + data = data[:no_segments * nfft].reshape(no_segments, nfft) + + + NumUniquePts = int(np.ceil((nfft + 1) / 2)) + f = np.linspace(0, sample_frq / 2., NumUniquePts) + fftx = np.abs(np.fft.rfft(data)) + + fftx = fftx.sum(0) + y = fftx / (nfft * no_segments) + + y = y ** 2 + y *= 2 # Because negative half is ignored + + y /= f[1] - f[0] # distribute discrete energy values to continuous areas + + + f, y = f[1:], y[1:] # Remove 0Hz frequency contribution + bins = (NumUniquePts - 1) // bin_size + def smoothen(x, N): + M = (x.shape[0] - N + 1) // N + return np.array([x[i:M * N + i].reshape(M, N).mean(1) for i in range(N)]).T.flatten() + if bin_size > 1: + f, y = smoothen(f, bin_size), smoothen(y, bin_size) +# with warnings.catch_warnings(): +# warnings.simplefilter("ignore", category=RuntimeWarning) # ignore runtime warning of nanmean([nan]) +# f = np.nanmean(f[:bins * bin_size].reshape(bins, f.shape[0] // bins), 1) +# y = np.nanmean(y[:bins * bin_size].reshape(bins, y.shape[0] // bins), 1) + return (f, y) diff --git a/wetb/signal_tools/tests/test_spectrum.py b/wetb/signal_tools/tests/test_spectrum.py new file mode 100644 index 0000000..fc20a3b --- /dev/null +++ b/wetb/signal_tools/tests/test_spectrum.py @@ -0,0 +1,60 @@ +''' +Created on 12/11/2015 + +@author: MMPE +''' +import unittest +import numpy as np +from wetb.signal_tools.spectrum import psd + +class TestSpectrum(unittest.TestCase): + + + + + def test_psd(self): + """check peak at correct frequency""" + fs = 1024 + t = np.arange(0, 100, 1.0 / fs) + sig = np.sin(2 * np.pi * t * 200) + np.sin(2 * np.pi * t * 100) + f, spectrum = psd(sig, fs) + + #import matplotlib.pyplot as plt + #plt.plot(f, spectrum) + #plt.show() + + self.assertAlmostEqual(f[spectrum > 0.0001][0], 100, 2) + self.assertAlmostEqual(f[spectrum > 0.0001][1], 200, 2) + + + + def test_map_to_frq2(self): + """check integral==variance""" + sig = np.random.randint(-3, 3, 100000).astype(np.float64) + f, y = psd(sig, 10) + self.assertAlmostEqual(np.var(sig), np.trapz(y, f), 3) + + + def test_map_to_frq_smoothing(self): + """check integral==variance""" + sig = np.random.randint(-3, 3, 10000).astype(np.float64) + sig += np.sin(np.arange(len(sig)) * 2 * np.pi / 10) + + import matplotlib.pyplot as plt +# plt.plot(sig) +# plt.figure() + for i, k in enumerate([1, 4], 1): + #plt.subplot(1, 2, i) + for n in [1, 4]: + f, y = psd(sig, 10, k, n) +# plt.semilogy(f, y, label='k=%d, n=%d' % (k, n)) + if k == 1: + self.assertAlmostEqual(np.var(sig), np.trapz(y, f), 1) + self.assertAlmostEqual(f[np.argmax(y)], 1, 1) + +# plt.legend() +# plt.show() + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testName'] + unittest.main() -- GitLab