diff --git a/wetb/signal/fix/__init__.py b/wetb/signal/fix/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..d78316911251e8084e53f096c205f3f3620b9c22 --- /dev/null +++ b/wetb/signal/fix/__init__.py @@ -0,0 +1 @@ +from ._rotor_position import * \ No newline at end of file diff --git a/wetb/signal/fix/_rotor_position.py b/wetb/signal/fix/_rotor_position.py new file mode 100644 index 0000000000000000000000000000000000000000..bb12f6abaf9f9c2eed26ca82fb97bff6d2d17925 --- /dev/null +++ b/wetb/signal/fix/_rotor_position.py @@ -0,0 +1,74 @@ +''' +Created on 30. mar. 2017 + +@author: mmpe +''' +import numpy as np +def fix_rotor_position(rotor_position, sample_frq, rotor_speed, polynomial_sample_length=500): + """Rotor position fitted with multiple polynomials + + Parameters + ---------- + rotor_position : array_like + Rotor position [deg] (0-360) + sample_frq : int or float + Sample frequency [Hz] + rotor_speed : array_like + Rotor speed [RPM] + polynomial_sample_length : int, optional + Sample lengths of polynomial used for fit + + Returns + ------- + y : nd_array + Fitted rotor position + """ + + from wetb.signal.subset_mean import revolution_trigger + + t = np.arange(len(rotor_position)) + indexes = revolution_trigger(rotor_position[:].copy(), sample_frq, rotor_speed, max_no_round_diff=4) + + rp = rotor_position[:].copy() + + for i in indexes: + rp[i:] += 360 + + N = polynomial_sample_length + N2 = N/2 + + rp_fita = np.empty_like(rp)+np.nan + rp_fitb = np.empty_like(rp)+np.nan + + for i in range(0,len(rp)+N, N): + #indexes for subsets for overlapping a and b polynomials + ia1 = max(0,i-N2) + ia2 = min(len(rp),i+N2) + ib1 = i + ib2 = i+N + + #fit a polynomial + if ia1<len(rp): + z = np.polyfit(t[ia1:ia2]-t[ia1], rp[ia1:ia2], 3) + rp_fita[ia1:ia2] = np.poly1d(z)(t[ia1:ia2]-t[ia1]) + + if ib1<len(rp): + #fit b polynomial + z = np.polyfit(t[ib1:ib2]-t[ib1], rp[ib1:ib2], 3) + rp_fitb[ib1:ib2] = np.poly1d(z)(t[ib1:ib2]-t[ib1]) + + weighta = (np.cos(np.arange(len(rp))/N*2*np.pi))/2+.5 + weightb = (-np.cos(np.arange(len(rp))/N*2*np.pi))/2+.5 + + return (rp_fita*weighta + rp_fitb*weightb)%360 + + +def find_polynomial_sample_length(rotor_position, sample_frq, rotor_speed): + from wetb.signal.filters import differentiation + def err(i): + rpm_pos = differentiation(fix_rotor_position(rotor_position, sample_frq, rotor_speed, i))%180 / 360 * sample_frq * 60 + return np.sum((rpm_pos - rotor_speed)**2) + x_lst = np.arange(100,2000,100) + res = [err(x) for x in x_lst] + return x_lst[np.argmin(res)] + \ No newline at end of file diff --git a/wetb/signal/tests/test_fix.py b/wetb/signal/tests/test_fix.py new file mode 100644 index 0000000000000000000000000000000000000000..36c3296c15865d3d44e6e1e6c506a779f7368bc3 --- /dev/null +++ b/wetb/signal/tests/test_fix.py @@ -0,0 +1,40 @@ +''' +Created on 30. mar. 2017 + +@author: mmpe +''' +import os +import unittest +from wetb import gtsdf +from wetb.signal.fix._rotor_position import fix_rotor_position,\ + find_polynomial_sample_length +from wetb.signal.filters._differentiation import differentiation + + +tfp = os.path.join(os.path.dirname(__file__), 'test_files/') +import numpy as np +class TestFix(unittest.TestCase): + + + def testRotorPositionFix(self): + ds = gtsdf.Dataset(tfp+'azi.hdf5') + sample_frq = 25 + print (find_polynomial_sample_length(ds.azi, sample_frq, ds.Rot_cor)) + rp_fit = fix_rotor_position(ds.azi, sample_frq, ds.Rot_cor, 500) + + rpm_pos = differentiation(rp_fit)%180 / 360 * sample_frq * 60 + + self.assertLess(np.sum((rpm_pos - ds.Rot_cor)**2),50) + if 0: + import matplotlib.pyplot as plt + plt.plot( differentiation(ds.azi)%180 / 360 * sample_frq * 60, label='fit') + plt.plot(ds.Rot_cor) + plt.plot( differentiation(rp_fit)%180 / 360 * sample_frq * 60, label='fit') + plt.ylim(10,16) + + plt.show() + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testRotorPositionFix'] + unittest.main() \ No newline at end of file diff --git a/wetb/signal/tests/test_frq_filters.py b/wetb/signal/tests/test_frq_filters.py new file mode 100644 index 0000000000000000000000000000000000000000..b4537de2c1f7986c67204db1046a22c1a5277247 --- /dev/null +++ b/wetb/signal/tests/test_frq_filters.py @@ -0,0 +1,94 @@ +''' +Created on 27. mar. 2017 + +@author: mmpe +''' +import unittest + +import numpy as np +from wetb.signal.filters.frq_filters import sine_generator, low_pass, \ + frequency_response, high_pass + + +class Test(unittest.TestCase): + + + def test_sine_generator(self): + t,y = sine_generator(10,1,2) + self.assertEqual(np.diff(t).mean(),.1) + self.assertAlmostEqual(t.max(),1.9) + if 0: + import matplotlib.pyplot as plt + plt.plot(*sine_generator(10,1,2), label="1Hz sine") + plt.plot(*sine_generator(100,2,2), label="2Hz sine") + plt.legend() + plt.show() + + def test_low_pass(self): + sf = 100 # sample frequency + t,y1 = sine_generator(sf,1,5) # 1Hz sine + t,y10 = sine_generator(sf,10,5) # 10Hz sine + sig = y1+y10 + y_lp = low_pass(sig, sf, 1, order=1) + + # 1 order: + # cut off frq: -3db + # Above cut off: 20db/decade + np.testing.assert_almost_equal(y_lp[100:400], y1[100:400]*.501, 1) + np.testing.assert_almost_equal((y_lp-y1*.501)[100:400], y10[100:400]*.01, 1) + + if 0: + import matplotlib.pyplot as plt + plt.plot(t,sig, label='Input signal') + plt.plot(t, y1*0.501, label='1Hz sine (-3db)') + plt.plot(t,y_lp, label='Output signal') + plt.plot(t,y_lp - y1*0.501, label="Output - 1Hz sine(-3db)") + plt.plot(t, y10*0.01, label='10Hz sine (-20db)') + plt.plot(t, y_lp - y1*0.501 - y10*.01, label="(Output - 1Hz sine(-3db)) - 10Hz sine (-20db)") + plt.legend() + plt.show() + + def test_frq_response(self): + w,h = frequency_response(100,10,'low',1) + self.assertAlmostEqual(np.interp(10, w, h), -3.01,2) # cut off frq -3.01 db + self.assertAlmostEqual(np.interp(100, w, h), -20,1) # -20 db per decade + if 0: + import matplotlib.pyplot as plt + frequency_response(100,10,'low',1, plt=plt) + frequency_response(100,10,'low',2, plt=plt) + frequency_response(100,10,'low',5, plt=plt) + frequency_response(100,10,'low',8, plt=plt) + frequency_response(100,10,'low',10, plt=plt) + frequency_response(100,10,'high',1, plt=plt) + frequency_response(100,10,'high',2, plt=plt) + plt.show() + + def test_high_pass(self): + sf = 100 # sample frequency + t,y1 = sine_generator(sf,1,5) # 1Hz sine + t,y10 = sine_generator(sf,10,5) # 10Hz sine + sig = y1+y10 + y_lp = high_pass(sig, sf, 10, order=1) + + # 1 order: + # cut off frq: -3db + # Below cut off: 20db/decade + np.testing.assert_almost_equal(y_lp[100:400], y10[100:400]*.501, 1) + np.testing.assert_almost_equal((y_lp-y10*.501)[100:400], y1[100:400]*.01, 1) + + if 0: + import matplotlib.pyplot as plt + plt.plot(t,sig, label='Input signal') + plt.plot(t, y10*0.501, label='10Hz sine (-3db)') + plt.plot(t,y_lp, label='Output signal') + plt.plot(t,y_lp - y10*0.501, label="Output - 10Hz sine(-3db)") + plt.plot(t, y1*0.01, label='1Hz sine (-20db)') + plt.plot(t, y_lp - y10*0.501 - y1*.01, label="(Output - 10Hz sine(-3db)) - 1Hz sine (-20db)") + plt.legend() + plt.show() + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_sine_generator'] + unittest.main() \ No newline at end of file