From 3fe7ef5fb7d946a27b92add84301d27e6cbee0c5 Mon Sep 17 00:00:00 2001
From: "Mads M. Pedersen" <mmpe@dtu.dk>
Date: Wed, 29 Mar 2017 11:47:54 +0200
Subject: [PATCH] missing files

---
 wetb/signal/interpolation.py              |  85 ++++++++++++++++++
 wetb/signal/tests/test_differentiation.py |  20 +++++
 wetb/signal/tests/test_interpolation.py   | 105 ++++++++++++++++++++++
 wetb/signal/tests/test_nan_replace.py     |  66 ++++++++++++++
 4 files changed, 276 insertions(+)
 create mode 100644 wetb/signal/interpolation.py
 create mode 100644 wetb/signal/tests/test_differentiation.py
 create mode 100644 wetb/signal/tests/test_interpolation.py
 create mode 100644 wetb/signal/tests/test_nan_replace.py

diff --git a/wetb/signal/interpolation.py b/wetb/signal/interpolation.py
new file mode 100644
index 00000000..098a25b7
--- /dev/null
+++ b/wetb/signal/interpolation.py
@@ -0,0 +1,85 @@
+
+import numpy as np
+import numpy.ma as ma
+#from pylab import *
+def interpolate(x, xp, yp, max_xp_step=None, max_dydxp=None, cyclic_range=None, max_repeated=None):
+    """Interpolation similar to numpy.interp that handles nan and missing values
+
+    Parameters
+    ----------
+    x : array_like
+        The x-coordinates of the interpolated values.
+    xp : 1-D sequence of floats
+        The x-coordinates of the data points, must be increasing.
+    yp : 1-D sequence of floats
+        The y-coordinates of the data points, same length as xp.
+    max_xp_step : int, float or None, optional
+        Maximum xp-time step that is interpolated to x.\n
+        If time step > max_xp_step then NAN is returned for intermediate x values
+        If None, default, then this fix is not applied
+    max_dydxp : int, float, None, optional
+        Maximum absolute dydxp (slop of yp) that is interpolated to y.\n
+        If dydxp > max_dydxp then NAN is returned for intermediate y values
+        If None, default, then this fix is not applied
+    cyclick_range : int, float, None, optional
+        Range of posible values, e.g. 360 for degrees (both 0..360 and -180..180)
+        If None (default), data not interpreted as cyclic
+    max_repeated : int, float, None, optional
+        Maximum xp that yp are allowed to be repeated
+        if yp[i]==yp[i+1]==..==yp[i+j] and xp[i+j]-xp[i]>max_repeated_yp then
+        NAN is returned for xp[i]<x<=xp[i+j]
+
+
+    Returns
+    -------
+    y : {float, ndarray}
+        The interpolated values, same shape as x.
+
+    Examples
+    --------
+    >>> interpolate(x=[1,1.5,2,3], xp=[0.5,1.5,3], yp=[5,15,30])
+    [10,15,20,30]
+    >>> interpolate(x=[0, 1, 2, 7, 12], xp=[0, 2, 12], yp=[359, 0, 350], max_dydxp=45)
+    [359,nan,0,175,350]
+    """
+
+    xp = np.array(xp, dtype=np.float)
+    yp = np.array(yp, dtype=np.float)
+    assert xp.shape[0] == yp.shape[0], "xp and yp must have same length (%d!=%d)" % (xp.shape[0], yp.shape[0])
+    non_nan = ~(np.isnan(xp) & np.isnan(yp))
+    yp = yp[non_nan]
+    xp = xp[non_nan]
+    y = np.interp(x, xp, yp, np.nan, np.nan)
+
+
+    if cyclic_range is not None:
+        cr = cyclic_range
+        y2 = np.interp(x, xp, (yp + cr / 2) % cr - cr / 2, np.nan, np.nan) % cr
+        y = np.choose(np.r_[0, np.abs(np.diff(y)) > np.abs(np.diff(y2))], np.array([y, y2]))
+
+    if max_xp_step:
+        diff = np.diff(xp)
+        diff[np.isnan(diff)] = 0
+
+        indexes = (np.where(diff > max_xp_step)[0])
+        for index in indexes:
+            y[(x > xp[index]) & (x < xp[index + 1])] = np.nan
+    if max_dydxp:
+        if cyclic_range is None:
+            abs_dydxp = np.abs(np.diff(yp) / np.diff(xp))
+        else:
+            abs_dydxp = np.min([np.abs(np.diff((yp + cyclic_range / 2) % cyclic_range)) , np.abs(np.diff(yp % cyclic_range)) ], 0) / np.diff(xp)
+        abs_dydxp[np.isnan(abs_dydxp)] = 0
+
+        indexes = (np.where(abs_dydxp > max_dydxp)[0])
+        for index in indexes:
+            y[(x > xp[index]) & (x < xp[index + 1])] = np.nan
+    if max_repeated:
+        rep = np.r_[False, yp[1:] == yp[:-1], False]
+        tr = rep[1:] ^ rep[:-1]
+        itr = np.where(tr)[0]
+        for start, stop, l in zip (itr[::2] , itr[1::2], xp[itr[1::2]] - xp[itr[::2]]):
+            if l >= max_repeated:
+                y[(x > xp[start]) & (x <= xp[stop])] = np.nan
+    return y
+#print (interpolate(x=[0, 1, 2, 3, 4], xp=[0, 1, 2, 4], yp=[5, 5, 6, 6], max_repeated=1))
diff --git a/wetb/signal/tests/test_differentiation.py b/wetb/signal/tests/test_differentiation.py
new file mode 100644
index 00000000..87538cdc
--- /dev/null
+++ b/wetb/signal/tests/test_differentiation.py
@@ -0,0 +1,20 @@
+'''
+Created on 29. mar. 2017
+
+@author: mmpe
+'''
+import unittest
+import numpy as np
+from wetb.signal.filters._differentiation import differentiation
+
+class Test(unittest.TestCase):
+
+
+    def testDifferentiation(self):
+        np.testing.assert_array_equal(differentiation([1,2,1,0,1,1]), [1,0,-1,0,.5,0])
+        
+
+
+if __name__ == "__main__":
+    #import sys;sys.argv = ['', 'Test.testDifferentiation']
+    unittest.main()
\ No newline at end of file
diff --git a/wetb/signal/tests/test_interpolation.py b/wetb/signal/tests/test_interpolation.py
new file mode 100644
index 00000000..3482829e
--- /dev/null
+++ b/wetb/signal/tests/test_interpolation.py
@@ -0,0 +1,105 @@
+'''
+Created on 19/12/2014
+
+@author: MMPE
+'''
+import unittest
+
+from matplotlib.pyplot  import *
+import numpy as np
+from wetb import signal
+from wetb.signal.time_shift import find_time_shift
+from wetb import gtsdf
+import os
+from wetb.gtsdf.unix_time import from_unix
+import datetime
+
+
+class TestInterpolation(unittest.TestCase):
+
+    def test_interpolate1(self):
+        x = [1, 1.5, 2, 3]
+        xp = [0.5, 1.5, 3]
+        yp = [5, 15, 30]
+        np.testing.assert_array_equal(signal.interpolate(x, xp, yp), [10., 15., 20, 30.])
+        np.testing.assert_array_equal(signal.interpolate(x, xp, yp, 1), [10., 15., np.nan, 30.])
+
+
+
+
+    def test_interpolate2(self):
+        x = np.arange(0, 100, 5, dtype=np.float)
+        xp = np.arange(0, 100, 10, dtype=np.float)
+        xp = np.r_[xp[:3], xp[5:]]
+        yp = np.arange(10, dtype=np.float)
+        yp[7:8] = np.nan
+        yp = np.r_[yp[:3], yp[5:]]
+
+        #x = [ 0.   5.  10.  15.  20.  25.  30.  35.  40.  45.  50.  55.  60.  65.  70.  75.  80.  85.  90.  95.]
+        #xp = [0.0, 10.0, 20.0, 50.0, 60.0, 70.0, 80.0, 90.0]
+        #yp = [0.0, 1.0, 2.0, 5.0, 6.0, nan, 8.0, 9.0]
+
+        y = signal.interpolate(x, xp, yp)
+        np.testing.assert_array_equal(y[~np.isnan(y)], [0., 0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. , 4.5, 5. , 5.5, 8. , 8.5, 9. ])
+
+        y = signal.interpolate(x, xp, yp, 10)
+        np.testing.assert_array_equal(y[~np.isnan(y)], [ 0. , 0.5, 1. , 1.5, 2. , 5. , 5.5, 8. , 8.5, 9. ])
+
+
+
+    def test_interpolate_max_dydxp(self):
+        x = np.arange(7)
+        xp = [0, 2, 4, 6]
+        yp = [358, 359, 0, 1]
+        y = signal.interpolate(x, xp, yp, max_dydxp=30)
+        np.testing.assert_array_equal(y, [ 358., 358.5, 359., np.nan, 0., 0.5, 1. ])
+
+        y = signal.interpolate(x, xp, yp, max_dydxp=180)
+        np.testing.assert_array_equal(y, [ 358., 358.5, 359., 179.5, 0., 0.5, 1. ])
+
+
+    def test_interpolate_max_dydxp_cyclic(self):
+        x = np.arange(7)
+        xp = [0, 2, 4, 6]
+        yp = [358, 359, 0, 1]
+
+        y = signal.interpolate(x, xp, [178, 179, -180, -179], max_dydxp=30, cyclic_range=360)
+        np.testing.assert_array_equal(y, [ 178. , 178.5, 179. , -0.5, -180. , -179.5, -179. ])
+
+        y = signal.interpolate(x, xp, yp, max_dydxp=30, cyclic_range=360)
+        np.testing.assert_array_equal(y, [ 358., 358.5, 359., 359.5, 0., 0.5, 1. ])
+
+        y = signal.interpolate(xp, x, [ 358., 358.5, 359., 359.5, 0., 0.5, 1. ], max_dydxp=30, cyclic_range=360)
+        np.testing.assert_array_equal(y, [ 358., 359., 0., 1. ])
+
+        y = signal.interpolate(x, xp, yp, max_dydxp=180)
+        np.testing.assert_array_equal(y, [ 358., 358.5, 359., 179.5, 0., 0.5, 1. ])
+
+    def test_interpolate_max_repeated(self):
+        x = np.arange(7)
+        xp = [0, 1, 2, 3, 4, 5, 6]
+        yp = [4, 5, 5, 5, 6, 6, 7]
+        y = signal.interpolate(x, xp, yp, max_repeated=2)
+        np.testing.assert_array_equal(y, [ 4, 5, np.nan, np.nan, 6, 6, 7])
+
+
+        x = np.arange(7)
+        xp = [0, 3, 4, 5, 6]
+        yp = [5, 5, 7, 6, 6]
+        y = signal.interpolate(x, xp, yp, max_repeated=2)
+        np.testing.assert_array_equal(y, [ 5, np.nan, np.nan, np.nan, 7, 6, 6])
+
+        xp = [0, 2, 3, 4, 6]
+        y = signal.interpolate(x, xp, yp, max_repeated=1)
+        np.testing.assert_array_equal(y, [ 5, np.nan, np.nan, 7, 6, np.nan, np.nan])
+
+
+        xp = [0, 1, 2, 3, 6]
+        y = signal.interpolate(x, xp, yp, max_repeated=2)
+        np.testing.assert_array_equal(y, [ 5, 5, 7, 6, np.nan, np.nan, np.nan])
+
+
+
+if __name__ == "__main__":
+    #import sys;sys.argv = ['', 'Test.testName']
+    unittest.main()
diff --git a/wetb/signal/tests/test_nan_replace.py b/wetb/signal/tests/test_nan_replace.py
new file mode 100644
index 00000000..6211ce02
--- /dev/null
+++ b/wetb/signal/tests/test_nan_replace.py
@@ -0,0 +1,66 @@
+'''
+Created on 02/11/2015
+
+@author: MMPE
+'''
+import unittest
+import numpy as np
+from wetb.signal.nan_replace import replace_by_mean, replace_by_line, \
+    replace_by_polynomial, max_no_nan
+from matplotlib.pyplot import plot, show
+class TestNan_replace(unittest.TestCase):
+
+
+    def test_nan_replace_by_mean(self):
+        a = np.array([1, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, 4.])
+        np.testing.assert_array_equal(replace_by_mean(a), [1, 5, 6, 4, 5, 4, 3, 1, 5, 6, 4.])
+        a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 1, 5, 6, 4.])
+        np.testing.assert_array_equal(replace_by_mean(a), [1, 5, 6, 4, 5, 3, 3, 1, 5, 6, 4.])
+        a = np.array([np.nan, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, np.nan])
+        np.testing.assert_array_equal(replace_by_mean(a), [5, 5, 6, 4, 5, 4, 3, 1, 5, 6, 6])
+
+
+    def test_nan_replace_by_line(self):
+        a = np.array([1, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, 4.])
+        np.testing.assert_array_equal(replace_by_line(a), [1, 5, 6, 4, 5, 4, 3, 1, 5, 6, 4.])
+        a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 2, 5, 6, 4.])
+        np.testing.assert_array_equal(replace_by_line(a), [1, 5, 6, 4, 5, 4, 3, 2, 5, 6, 4.])
+        a = np.array([np.nan, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, np.nan])
+        np.testing.assert_array_equal(replace_by_line(a), [5, 5, 6, 4, 5, 4, 3, 1, 5, 6, 6])
+
+
+    def test_nan_replace_by_polynomial(self):
+        a = np.array([np.nan, 5, 6, 4, 5, np.nan, 3, 1, 5, 6, np.nan])
+        np.testing.assert_array_equal(replace_by_polynomial(a), [5, 5, 6, 4, 5, 4, 3, 1, 5, 6, 6])
+        a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 1, 5, 6, 4.])
+        np.testing.assert_array_almost_equal(replace_by_polynomial(a, 3, 2), [1, 5, 6, 4, 5, 3.3, 1.2, 1, 5, 6, 4.])
+
+        if 0:
+            plot(a)
+            plot(replace_by_polynomial(a, 3, 2))
+            show()
+
+
+    def test_nan_replace_by_polynomial2(self):
+        a = np.r_[np.arange(10), np.repeat(np.nan, 100), 10 - np.arange(10)]
+        self.assertLessEqual(replace_by_polynomial(a, 3, 9).max(), np.nanmax(a))
+
+        if 0:
+            plot(a, '.r')
+            plot(replace_by_polynomial(a, 3, 9), 'g-')
+            show()
+
+
+    def test_max_no_nan(self):
+        a = np.array([1, 5, 6, 4, 5, np.nan, 3, 1, 5, np.nan, 4.])
+        self.assertEqual(max_no_nan(a), 1)
+        a = np.array([1, 5, 6, 4, 5, np.nan, np.nan, 1, 5, np.nan, 4.])
+        self.assertEqual(max_no_nan(a), 2)
+        a = np.array([np.nan, np.nan, np.nan, 4, 5, np.nan, np.nan, 1, 5, np.nan, 4.])
+        self.assertEqual(max_no_nan(a), 3)
+        a = np.array([1, 5, 6, 4, 5, np.nan, 4, 1, 5, np.nan, np.nan])
+        self.assertEqual(max_no_nan(a), 2)
+
+if __name__ == "__main__":
+    #import sys;sys.argv = ['', 'Test.test_nanreplace']
+    unittest.main()
-- 
GitLab