diff --git a/wetb/signal/interpolation.py b/wetb/signal/interpolation.py new file mode 100644 index 0000000000000000000000000000000000000000..098a25b76f741eb215802ae0348807130b40e0b6 --- /dev/null +++ b/wetb/signal/interpolation.py @@ -0,0 +1,85 @@ + +import numpy as np +import numpy.ma as ma +#from pylab import * +def interpolate(x, xp, yp, max_xp_step=None, max_dydxp=None, cyclic_range=None, max_repeated=None): + """Interpolation similar to numpy.interp that handles nan and missing values + + Parameters + ---------- + x : array_like + The x-coordinates of the interpolated values. + xp : 1-D sequence of floats + The x-coordinates of the data points, must be increasing. + yp : 1-D sequence of floats + The y-coordinates of the data points, same length as xp. + max_xp_step : int, float or None, optional + Maximum xp-time step that is interpolated to x.\n + If time step > max_xp_step then NAN is returned for intermediate x values + If None, default, then this fix is not applied + max_dydxp : int, float, None, optional + Maximum absolute dydxp (slop of yp) that is interpolated to y.\n + If dydxp > max_dydxp then NAN is returned for intermediate y values + If None, default, then this fix is not applied + cyclick_range : int, float, None, optional + Range of posible values, e.g. 360 for degrees (both 0..360 and -180..180) + If None (default), data not interpreted as cyclic + max_repeated : int, float, None, optional + Maximum xp that yp are allowed to be repeated + if yp[i]==yp[i+1]==..==yp[i+j] and xp[i+j]-xp[i]>max_repeated_yp then + NAN is returned for xp[i]<x<=xp[i+j] + + + Returns + ------- + y : {float, ndarray} + The interpolated values, same shape as x. + + Examples + -------- + >>> interpolate(x=[1,1.5,2,3], xp=[0.5,1.5,3], yp=[5,15,30]) + [10,15,20,30] + >>> interpolate(x=[0, 1, 2, 7, 12], xp=[0, 2, 12], yp=[359, 0, 350], max_dydxp=45) + [359,nan,0,175,350] + """ + + xp = np.array(xp, dtype=np.float) + yp = np.array(yp, dtype=np.float) + assert xp.shape[0] == yp.shape[0], "xp and yp must have same length (%d!=%d)" % (xp.shape[0], yp.shape[0]) + non_nan = ~(np.isnan(xp) & np.isnan(yp)) + yp = yp[non_nan] + xp = xp[non_nan] + y = np.interp(x, xp, yp, np.nan, np.nan) + + + if cyclic_range is not None: + cr = cyclic_range + y2 = np.interp(x, xp, (yp + cr / 2) % cr - cr / 2, np.nan, np.nan) % cr + y = np.choose(np.r_[0, np.abs(np.diff(y)) > np.abs(np.diff(y2))], np.array([y, y2])) + + if max_xp_step: + diff = np.diff(xp) + diff[np.isnan(diff)] = 0 + + indexes = (np.where(diff > max_xp_step)[0]) + for index in indexes: + y[(x > xp[index]) & (x < xp[index + 1])] = np.nan + if max_dydxp: + if cyclic_range is None: + abs_dydxp = np.abs(np.diff(yp) / np.diff(xp)) + else: + abs_dydxp = np.min([np.abs(np.diff((yp + cyclic_range / 2) % cyclic_range)) , np.abs(np.diff(yp % cyclic_range)) ], 0) / np.diff(xp) + abs_dydxp[np.isnan(abs_dydxp)] = 0 + + indexes = (np.where(abs_dydxp > max_dydxp)[0]) + for index in indexes: + y[(x > xp[index]) & (x < xp[index + 1])] = np.nan + if max_repeated: + rep = np.r_[False, yp[1:] == yp[:-1], False] + tr = rep[1:] ^ rep[:-1] + itr = np.where(tr)[0] + for start, stop, l in zip (itr[::2] , itr[1::2], xp[itr[1::2]] - xp[itr[::2]]): + if l >= max_repeated: + y[(x > xp[start]) & (x <= xp[stop])] = np.nan + return y +#print (interpolate(x=[0, 1, 2, 3, 4], xp=[0, 1, 2, 4], yp=[5, 5, 6, 6], max_repeated=1)) diff --git a/wetb/signal/tests/test_differentiation.py b/wetb/signal/tests/test_differentiation.py new file mode 100644 index 0000000000000000000000000000000000000000..87538cdc68ca0efa1a3ce733e57cc98f0ee1ab86 --- /dev/null +++ b/wetb/signal/tests/test_differentiation.py @@ -0,0 +1,20 @@ +''' +Created on 29. mar. 2017 + +@author: mmpe +''' +import unittest +import numpy as np +from wetb.signal.filters._differentiation import differentiation + +class Test(unittest.TestCase): + + + def testDifferentiation(self): + np.testing.assert_array_equal(differentiation([1,2,1,0,1,1]), [1,0,-1,0,.5,0]) + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testDifferentiation'] + unittest.main() \ No newline at end of file diff --git a/wetb/signal/tests/test_interpolation.py b/wetb/signal/tests/test_interpolation.py new file mode 100644 index 0000000000000000000000000000000000000000..3482829ecc39c382509a31bee97c3db2f84d11b6 --- /dev/null +++ b/wetb/signal/tests/test_interpolation.py @@ -0,0 +1,105 @@ +''' +Created on 19/12/2014 + +@author: MMPE +''' +import unittest + +from matplotlib.pyplot import * +import numpy as np +from wetb import signal +from wetb.signal.time_shift import find_time_shift +from wetb import gtsdf +import os +from wetb.gtsdf.unix_time import from_unix +import datetime + + +class TestInterpolation(unittest.TestCase): + + def test_interpolate1(self): + x = [1, 1.5, 2, 3] + xp = [0.5, 1.5, 3] + yp = [5, 15, 30] + np.testing.assert_array_equal(signal.interpolate(x, xp, yp), [10., 15., 20, 30.]) + np.testing.assert_array_equal(signal.interpolate(x, xp, yp, 1), [10., 15., np.nan, 30.]) + + + + + def test_interpolate2(self): + x = np.arange(0, 100, 5, dtype=np.float) + xp = np.arange(0, 100, 10, dtype=np.float) + xp = np.r_[xp[:3], xp[5:]] + yp = np.arange(10, dtype=np.float) + yp[7:8] = np.nan + yp = np.r_[yp[:3], yp[5:]] + + #x = [ 0. 5. 10. 15. 20. 25. 30. 35. 40. 45. 50. 55. 60. 65. 70. 75. 80. 85. 90. 95.] + #xp = [0.0, 10.0, 20.0, 50.0, 60.0, 70.0, 80.0, 90.0] + #yp = [0.0, 1.0, 2.0, 5.0, 6.0, nan, 8.0, 9.0] + + y = signal.interpolate(x, xp, yp) + np.testing.assert_array_equal(y[~np.isnan(y)], [0., 0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. , 4.5, 5. , 5.5, 8. , 8.5, 9. ]) + + y = signal.interpolate(x, xp, yp, 10) + np.testing.assert_array_equal(y[~np.isnan(y)], [ 0. , 0.5, 1. , 1.5, 2. , 5. , 5.5, 8. , 8.5, 9. ]) + + + + def test_interpolate_max_dydxp(self): + x = np.arange(7) + xp = [0, 2, 4, 6] + yp = [358, 359, 0, 1] + y = signal.interpolate(x, xp, yp, max_dydxp=30) + np.testing.assert_array_equal(y, [ 358., 358.5, 359., np.nan, 0., 0.5, 1. ]) + + y = signal.interpolate(x, xp, yp, max_dydxp=180) + np.testing.assert_array_equal(y, [ 358., 358.5, 359., 179.5, 0., 0.5, 1. ]) + + + def test_interpolate_max_dydxp_cyclic(self): + x = np.arange(7) + xp = [0, 2, 4, 6] + yp = [358, 359, 0, 1] + + y = signal.interpolate(x, xp, [178, 179, -180, -179], max_dydxp=30, cyclic_range=360) + np.testing.assert_array_equal(y, [ 178. , 178.5, 179. , -0.5, -180. , -179.5, -179. ]) + + y = signal.interpolate(x, xp, yp, max_dydxp=30, cyclic_range=360) + np.testing.assert_array_equal(y, [ 358., 358.5, 359., 359.5, 0., 0.5, 1. ]) + + y = signal.interpolate(xp, x, [ 358., 358.5, 359., 359.5, 0., 0.5, 1. ], max_dydxp=30, cyclic_range=360) + np.testing.assert_array_equal(y, [ 358., 359., 0., 1. ]) + + y = signal.interpolate(x, xp, yp, max_dydxp=180) + np.testing.assert_array_equal(y, [ 358., 358.5, 359., 179.5, 0., 0.5, 1. ]) + + def test_interpolate_max_repeated(self): + x = np.arange(7) + xp = [0, 1, 2, 3, 4, 5, 6] + yp = [4, 5, 5, 5, 6, 6, 7] + y = signal.interpolate(x, xp, yp, max_repeated=2) + np.testing.assert_array_equal(y, [ 4, 5, np.nan, np.nan, 6, 6, 7]) + + + x = np.arange(7) + xp = [0, 3, 4, 5, 6] + yp = [5, 5, 7, 6, 6] + y = signal.interpolate(x, xp, yp, max_repeated=2) + np.testing.assert_array_equal(y, [ 5, np.nan, np.nan, np.nan, 7, 6, 6]) + + xp = [0, 2, 3, 4, 6] + y = signal.interpolate(x, xp, yp, max_repeated=1) + np.testing.assert_array_equal(y, [ 5, np.nan, np.nan, 7, 6, np.nan, np.nan]) + + + xp = [0, 1, 2, 3, 6] + y = signal.interpolate(x, xp, yp, max_repeated=2) + np.testing.assert_array_equal(y, [ 5, 5, 7, 6, np.nan, np.nan, np.nan]) + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testName'] + unittest.main() diff --git a/wetb/signal/tests/test_nan_replace.py b/wetb/signal/tests/test_nan_replace.py new file mode 100644 index 0000000000000000000000000000000000000000..6211ce029a04ee604af3a3589f0df31ffe3838f5 --- /dev/null +++ b/wetb/signal/tests/test_nan_replace.py @@ -0,0 +1,66 @@ +''' +Created on 02/11/2015 + +@author: MMPE +''' +import unittest +import numpy as np +from wetb.signal.nan_replace import replace_by_mean, replace_by_line, \ + replace_by_polynomial, max_no_nan +from matplotlib.pyplot import plot, show +class TestNan_replace(unittest.TestCase): + + + def test_nan_replace_by_mean(self): + a = np.array([1, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, 4.]) + np.testing.assert_array_equal(replace_by_mean(a), [1, 5, 6, 4, 5, 4, 3, 1, 5, 6, 4.]) + a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 1, 5, 6, 4.]) + np.testing.assert_array_equal(replace_by_mean(a), [1, 5, 6, 4, 5, 3, 3, 1, 5, 6, 4.]) + a = np.array([np.nan, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, np.nan]) + np.testing.assert_array_equal(replace_by_mean(a), [5, 5, 6, 4, 5, 4, 3, 1, 5, 6, 6]) + + + def test_nan_replace_by_line(self): + a = np.array([1, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, 4.]) + np.testing.assert_array_equal(replace_by_line(a), [1, 5, 6, 4, 5, 4, 3, 1, 5, 6, 4.]) + a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 2, 5, 6, 4.]) + np.testing.assert_array_equal(replace_by_line(a), [1, 5, 6, 4, 5, 4, 3, 2, 5, 6, 4.]) + a = np.array([np.nan, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, np.nan]) + np.testing.assert_array_equal(replace_by_line(a), [5, 5, 6, 4, 5, 4, 3, 1, 5, 6, 6]) + + + def test_nan_replace_by_polynomial(self): + a = np.array([np.nan, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, np.nan]) + np.testing.assert_array_equal(replace_by_polynomial(a), [5, 5, 6, 4, 5, 4, 3, 1, 5, 6, 6]) + a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 1, 5, 6, 4.]) + np.testing.assert_array_almost_equal(replace_by_polynomial(a, 3, 2), [1, 5, 6, 4, 5, 3.3, 1.2, 1, 5, 6, 4.]) + + if 0: + plot(a) + plot(replace_by_polynomial(a, 3, 2)) + show() + + + def test_nan_replace_by_polynomial2(self): + a = np.r_[np.arange(10), np.repeat(np.nan, 100), 10 - np.arange(10)] + self.assertLessEqual(replace_by_polynomial(a, 3, 9).max(), np.nanmax(a)) + + if 0: + plot(a, '.r') + plot(replace_by_polynomial(a, 3, 9), 'g-') + show() + + + def test_max_no_nan(self): + a = np.array([1, 5, 6, 4, 5, np.nan, 3, 1, 5, np.nan, 4.]) + self.assertEqual(max_no_nan(a), 1) + a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 1, 5, np.nan, 4.]) + self.assertEqual(max_no_nan(a), 2) + a = np.array([np.nan, np.nan, np.nan, 4, 5, np.nan, np.nan, 1, 5, np.nan, 4.]) + self.assertEqual(max_no_nan(a), 3) + a = np.array([1, 5, 6, 4, 5, np.nan, 4, 1, 5, np.nan, np.nan]) + self.assertEqual(max_no_nan(a), 2) + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_nanreplace'] + unittest.main()