diff --git a/wetb/signal/error_measures.py b/wetb/signal/error_measures.py new file mode 100644 index 0000000000000000000000000000000000000000..6fbcdddc2bfff3458016b04631992a14cb1f551f --- /dev/null +++ b/wetb/signal/error_measures.py @@ -0,0 +1,219 @@ +''' +Created on 30/06/2016 + +@author: MMPE +''' + +from scipy.interpolate.interpolate import interp1d + +import numpy as np +from wetb.signal.fit import bin_fit + + +def rms(a, b): + """Calculate the Root-Mean-Squared Error of two value sets + + Parameters + --------- + a : array_like + First value set + b : array_like + Second value set + + Returns + ------- + y : float + Root mean squared error of a and b + + """ + a, b = [np.array(ab[:]) for ab in [a, b]] + if a.shape != b.shape: + raise ValueError("Dimensions differ: %s!=%s" % (a.shape, b.shape)) + if len(a) == 0: + return np.nan + return np.sqrt(np.nanmean((a - b) ** 2)) + + +def rms2fit(x, y, fit_func=bin_fit): + """ + Calculate the rms error of the points (xi, yi) relative to the mean curve + + The mean curve is computed by:\n + - Divide x into bins + 1 bins\n + - Remove bins with less than 2 elements\n + - Calculate the mean of x and y in the bins\n + - Do a linear interpolation between the bin mean values\n + - Extrapolate to the minimum and maximum value of x using the slope of the first and last line segment\n + + Usefull for calculating e.g. power curve scatter + + Parameters + --------- + x : array_like + x values + y : array_like + y values + bins : int or array_like, optional + If int: Number of control points for the mean curve, default is 10\n + If array_like: Bin egdes + kind : str or int, optional + Specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', + 'quadratic','cubic' where 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation + of first, second or third order) or as an integer specifying the order of the spline + interpolator to use. Default is 'cubic'. + fit_func : function, optional + Function to apply on each bin to find control points for fit + + Returns + ------- + err : float + Mean error of points compared to mean curve + f : function + Interpolation function + """ + x, y = np.array(x[:]), np.array(y[:]) + _, fit = fit_func(x,y) + return rms(fit(x),y), fit + +def rms2fit_old(x, y, bins=10, kind='cubic', fit_func=np.nanmean, normalize_with_slope=False): + """ + Calculate the rms error of the points (xi, yi) relative to the mean curve + + The mean curve is computed by:\n + - Divide x into bins + 1 bins\n + - Remove bins with less than 2 elements\n + - Calculate the mean of x and y in the bins\n + - Do a linear interpolation between the bin mean values\n + - Extrapolate to the minimum and maximum value of x using the slope of the first and last line segment\n + + Usefull for calculating e.g. power curve scatter + + Parameters + --------- + x : array_like + x values + y : array_like + y values + bins : int or array_like, optional + If int: Number of control points for the mean curve, default is 10\n + If array_like: Bin egdes + kind : str or int, optional + Specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', + 'quadratic','cubic' where 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation + of first, second or third order) or as an integer specifying the order of the spline + interpolator to use. Default is 'cubic'. + fit_func : function, optional + Function to apply on each bin to find control points for fit + normalize_with_slope : boolean, optional + If True, the mean error in each bin is normalized with the slope of the corresponding line segment + + Returns + ------- + err : float + Mean error of points compared to mean curve + f : function + Interpolation function + """ + x, y = np.array(x[:]), np.array(y[:]) + if isinstance(bins, int): + bins = np.linspace(np.nanmin(x), np.nanmax(x) + 1e-10, bins + 1) + + digitized = np.digitize(x, bins) + digitized[np.isnan(x) | np.isnan(y)] = -1 + + masks = [digitized == i for i in range(1, len(bins))] + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + bin_x = np.array([np.nanmean(x[mask]) for mask in masks]) + bin_y = np.array([fit_func(y[mask]) for mask in masks]) + bin_count = np.array([np.sum(mask) for mask in masks]) + bin_x_fit, bin_y = [b[bin_count >= 1] for b in [bin_x, bin_y]] + + #extrapolate to first and last value of x + if bin_x_fit[0] > np.nanmin(x): + bin_y = np.r_[bin_y[0] - (bin_x_fit[0] - np.nanmin(x)) * (bin_y[1] - bin_y[0]) / (bin_x_fit[1] - bin_x_fit[0]), bin_y] + bin_x_fit = np.r_[np.nanmin(x), bin_x_fit] + + if bin_x_fit[-1] < np.nanmax(x): + bin_y = np.r_[bin_y, bin_y[-1] + (np.nanmax(x) - bin_x_fit[-1]) * (bin_y[-1] - bin_y[-2]) / (bin_x_fit[-1] - bin_x_fit[-2]) ] + bin_x_fit = np.r_[bin_x_fit, np.nanmax(x)] + + #Create mean function + f = lambda x : interp1d(bin_x_fit, bin_y, kind)(x[:]) + + #calculate error of segment + digitized = np.digitize(x, bin_x[bin_count > 0]) + bin_err = np.array([rms(y[digitized == i], f(x[digitized == i])) for i in range(1, len(bin_x_fit))]) + if normalize_with_slope: + slopes = np.diff(bin_y) / np.diff(bin_x_fit) + return np.nanmean(bin_err / np.abs(slopes)), f + return np.sqrt(np.nanmean(bin_err ** 2)), f + + +def rms2mean(x, y, bins=10, kind='cubic', normalize_with_slope=False): + """ + Calculate the rms error of the points (xi, yi) relative to the mean curve + + The mean curve is computed by:\n + - Divide x into bins + 1 bins\n + - Remove bins with less than 2 elements\n + - Calculate the mean of x and y in the bins\n + - Do a linear interpolation between the bin mean values\n + - Extrapolate to the minimum and maximum value of x using the slope of the first and last line segment\n + + Usefull for calculating e.g. power curve scatter + + Parameters + --------- + x : array_like + x values + y : array_like + y values + bins : int or array_like, optional + If int: Number of control points for the mean curve, default is 10\n + If array_like: Bin egdes + kind : str or int, optional + Specifies the kind of interpolation as a string ('linear', 'nearest', 'zero', 'slinear', + 'quadratic','cubic' where 'slinear', 'quadratic' and 'cubic' refer to a spline interpolation + of first, second or third order) or as an integer specifying the order of the spline + interpolator to use. Default is 'cubic'. + normalize_with_slope : boolean, optional + If True, the mean error in each bin is normalized with the slope of the corresponding line segment + + Returns + ------- + err : float + Mean error of points compared to mean curve + f : function + Interpolation function + """ + return rms2fit(x, y, lambda x,y : bin_fit(x, y, bins, kind)) + + +def bootstrap_comparison(x, y, kind=1, N=15, M=100): + f_lst = [] + y_lst = [] + x_min, x_max = max(np.percentile(x, 2), np.sort(x)[2]), min(np.percentile(x, 98), np.sort(x)[-2]) + + y_arr = np.empty((M, N * 10)) + np.NaN + inside = 0 + for i in range(M): + indexes = np.random.randint(0, len(x) - 1, len(x)) + while x[indexes].min() > x_min or x[indexes].max() < x_max: + indexes = np.random.randint(0, len(x) - 1, len(x)) + #indexes = np.arange(i, len(x), M) + + _, f = rms2fit(x[indexes], y[indexes], lambda x,y : bin_fit(x,y, kind=kind, bins=N)) + x_ = np.linspace(x_min, x_max, N * 10) + y_ = (f(x_)) + if i > 10: + if np.all(y_ < np.nanmax(y_arr, 0)) and np.all(y_ > np.nanmin(y_arr, 0)): + inside += 1 + if inside == 5: + #print ("break", i) + #break + pass + y_arr[i, :] = y_ + + return (np.mean(np.std(y_arr, 0)), x_, y_arr[~np.isnan(y_arr[:, 0])]) diff --git a/wetb/signal/fit/__init__.py b/wetb/signal/fit/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..2578ab2dde2c387e6801224980c9c13848c1b548 --- /dev/null +++ b/wetb/signal/fit/__init__.py @@ -0,0 +1,8 @@ +d = None +d = dir() + +from wetb.signal.fit._linear_fit import * +from wetb.signal.fit._bin_fit import * +from wetb.signal.fit._fourier_fit import * + +__all__ = sorted([m for m in set(dir()) - set(d)]) diff --git a/wetb/signal/fit/_bin_fit.py b/wetb/signal/fit/_bin_fit.py new file mode 100644 index 0000000000000000000000000000000000000000..f1124ff6fb0919a22f2ef8a1e45ad14a597ce9eb --- /dev/null +++ b/wetb/signal/fit/_bin_fit.py @@ -0,0 +1,195 @@ + +import numpy as np +from scipy.interpolate.interpolate import interp1d + + + +def bin_fit(x,y, bins=10, kind='linear', bin_func=np.nanmean, bin_min_count=3, lower_upper='discard'): + """Fit observations based on bin statistics + + Parameters + --------- + x : array_like + x observations + y : array_like + y observations + bins : int, array_like or (int, int) + if int: <bins> binx evenly distributed on the x-axis + if (xbins,ybins): <xbins> and <ybins> evenly distributed on the x and y axis respectively\n + Note that ybins only make sense if every y-value maps to a single x-value + kind : int or string + degree of polynomial for fit (argument passed to scipy.interpolate.interpolate.interp1d) + bin_func : function, optional + Statistic function to apply on bins, default is nanmean + bin_min_count : int, optional + Minimum number of observations in bins to include + Default is 3 + lower_upper : str, int, (str,str), (int,int) + How to handle observations below and above first and last bin values. Can be:\n + - "discard": + - "extrapolate": + - int: Set f(max(x)) to mean of first/last int observations + + + Returns + ------- + bin_x, fit_function + + """ + x, y = np.array(x[:]), np.array(y[:]) + if isinstance(bins, int): + bins = np.linspace(np.nanmin(x), np.nanmax(x) + 1e-10, bins + 1) + elif isinstance(bins, tuple) and len(bins)==2 and isinstance(bins[0], int) and isinstance(bins[1], int): + xbins, ybins = bins + if xbins>0: + xbinsx = np.linspace(np.nanmin(x), np.nanmax(x) + 1e-10, xbins + 1) + else: + xbinsx = [] + if ybins>0: + x1, f1 = bin_fit(y,x, kind=1, bins=ybins) + xbinsy = f1(x1) + else: + xbinsy = [] + #x2, f2 = bin_fit(x,y, kind=1, bins=xbins) + bins = sorted(np.r_[xbinsx, xbinsy ]) + + digitized = np.digitize(x, bins) + digitized[np.isnan(x) | np.isnan(y)] = -1 + + masks = [digitized == i for i in range(1, len(bins))] + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + bin_x = np.array([np.nanmean(x[mask]) for mask in masks]) + bin_y = np.array([bin_func(y[mask]) for mask in masks]) + bin_count = np.array([np.sum(mask) for mask in masks]) + #bin_x_fit, bin_y = [b[bin_count >= bin_min_count] for b in [bin_x, bin_y]] + bin_x_fit = bin_x + m = np.isnan(bin_x_fit) + bin_x_fit[m] = ((bins[:-1]+bins[1:])/2)[m] + bin_y_fit = bin_y.copy() + bin_y_fit[bin_count<bin_min_count]= np.nan + + + if isinstance(lower_upper, (str, int)): + lower = upper = lower_upper + else: + lower, upper = lower_upper + + #Add value to min(x) + if bin_x_fit[0] > np.nanmin(x): + if lower =='extrapolate': + + bin_y_fit = np.r_[bin_y_fit[0] - (bin_x_fit[0] - np.nanmin(x)) * (bin_y_fit[1] - bin_y_fit[0]) / (bin_x_fit[1] - bin_x_fit[0]), bin_y_fit] + bin_x_fit = np.r_[np.nanmin(x), bin_x_fit] + elif lower=="discard": + pass + elif isinstance(lower, int): + bin_y_fit = np.r_[np.mean(y[~np.isnan(x)][np.argsort(x[~np.isnan(x)])[:lower]]), bin_y_fit] + bin_x_fit = np.r_[np.nanmin(x), bin_x_fit] + else: + raise NotImplementedError("Argument for handling lower observations, %s, not implemented"%lower) + + #add value to max(x) + if bin_x_fit[-1] < np.nanmax(x): + if upper == 'extrapolate': + bin_y_fit = np.r_[bin_y_fit, bin_y_fit[-1] + (np.nanmax(x) - bin_x_fit[-1]) * (bin_y_fit[-1] - bin_y_fit[-2]) / (bin_x_fit[-1] - bin_x_fit[-2]) ] + bin_x_fit = np.r_[bin_x_fit, np.nanmax(x)] + elif upper=="discard": + pass + elif isinstance(upper, int): + bin_y_fit = np.r_[bin_y_fit, np.mean(y[~np.isnan(x)][np.argsort(x[~np.isnan(x)])[-upper:]])] + bin_x_fit = np.r_[bin_x_fit, np.nanmax(x)] + else: + raise NotImplementedError("Argument for handling upper observations, %s, not implemented"%upper) + + #Create mean function + def fit(x): + x = x[:].copy().astype(np.float) + x[x<bin_x_fit[0]] = np.nan + x[x>bin_x_fit[-1]] = np.nan + return interp1d(bin_x_fit, bin_y_fit, kind)(x[:]) + + return bin_x_fit, fit + +def perpendicular_bin_fit(x, y, bins = 30, fit_func=None, bin_min_count=3, plt=None): + """Fit a curve to the values, (x,y) using bins that are perpendicular to an initial fit + + Parameters + --------- + x : array_like + x observations + y : array_like + y observations + bins : int + Number of perpendicular bins + fit_func : function(x,y) -> (x,y) or None + Initial fit function + If None, bin_fit with same number of bins are used + bin_min_count : int, optional + Minimum number of observations in bins to include + Default is 3 + + plt : pyplot or None + If pyplot the fitting process is plotted on plt + + Returns + ------- + fit_x, fit_y + """ + + if fit_func is None: + fit_func = lambda x,y : bin_fit(x, y, bins, bin_func=np.nanmean) + + x,y = [v[~np.isnan(x)&~np.isnan(y)] for v in [x,y]] + + bfx,f = fit_func(x, y) + bfy = f(bfx) + + + bfx, bfy = [v[~np.isnan(bfx)&~np.isnan(bfy)] for v in [bfx,bfy]] + if plt: + x_range, y_range = [v.max()-v.min() for v in [x,y]] + plt.ylim([y.min()-y_range*.1, y.max()+y_range*.1]) + plt.xlim([x.min()-x_range*.1, x.max()+x_range*.1]) + + # divide curve into N segments of same normalized curve length + xg, xo = np.nanmax(bfx)-np.nanmin(bfx), np.nanmin(bfx) + yg, yo = np.nanmax(bfy)-np.nanmin(bfy), np.nanmin(bfy) + nbfx = (bfx-xo)/xg + nbfy = (bfy-yo)/yg + l = np.cumsum(np.sqrt(np.diff(nbfx)**2+np.diff(nbfy)**2)) + nx, ny = [np.interp(np.linspace(l[0], l[-1], bins), l, (xy[1:]+xy[:-1])/2) for xy in [nbfx,nbfy]] + + + last = (-1,0) + + pc = [] + used = np.zeros_like(x).astype(np.bool) + for i in range(0,len(nx)): + i1,i2 = max(0,i-1), min(len(nx)-1,i+1) + a =-(nx[i2]-nx[i1])/ (ny[i2]-ny[i1]) + b = (ny[i]-(a*nx[i]))*yg+yo + a *=yg/xg + x_ = [np.nanmin(x), np.nanmax(x)] + m1 = np.sign(last[0])*y < np.sign(last[0])*((x-xo)*last[0]+last[1]) + m2 = np.sign(a)*y>np.sign(a)*(a*(x-xo)+b) + m = m1&m2&~used + if plt: + plt.plot(x_, ((a)*(x_-xo))+b) + plt.plot(x[m], y[m],'.') + + if np.sum(m)>=bin_min_count: + pc.append((np.median(x[m]), np.median(y[m]))) + used = used|m + last = (a,b) + #bfx,bfy = zip(*pc) + if plt: + pbfx, pbfy = np.array(pc).T + plt.plot(bfx,bfy, 'orange', label='initial_fit') + plt.plot(pbfx, pbfy, 'gray', label="perpendicular fit") + plt.legend() + #PlotData(None, bfx,bfy) + + return np.array(pc).T + diff --git a/wetb/signal/fit/_fourier_fit.py b/wetb/signal/fit/_fourier_fit.py new file mode 100644 index 0000000000000000000000000000000000000000..eee63657160057f351c262111306fbd0b508454b --- /dev/null +++ b/wetb/signal/fit/_fourier_fit.py @@ -0,0 +1,73 @@ +''' +Created on 07/07/2015 + +@author: MMPE +''' +import numpy as np +from wetb.signal.fit import bin_fit + + +def fourier_fit(y, max_nfft, x=None): + """Approximate a signal, y, with Fourier fit""" + d = np.arange(360) + return d, lambda deg : np.interp(deg%360, d, F2x(x2F(y, max_nfft, x))) + +def fourier_fit_old(y, nfft): + F = np.zeros(len(y), dtype=np.complex) + F[:nfft + 1] = x2F(y, nfft)[:nfft + 1] + return np.fft.ifft(F) * len(F) + +def F2x(F_coefficients): + """Compute signal from Fourier coefficients""" + F = np.zeros(360, dtype=np.complex) + nfft = len(F_coefficients) // 2 + F[:nfft + 1] = np.conj(F_coefficients[:nfft + 1]) + F[1:nfft + 1] += (F_coefficients[-nfft:][::-1]) + return np.real(np.fft.ifft(F) * len(F)) + +def x2F(y, max_nfft, x=None): + """Compute Fourier coefficients from signal (signal may contain NANs)""" + d = np.arange(360) + if x is not None: + x,fit = bin_fit(x,y, d) + y = fit(d) + + nfft = min(max_nfft, len(y) // 2 + 1) + n = len(y) + N = nfft * 2 + 1 + theta = np.linspace(0, 2 * np.pi, n + 1)[:n] + theta[np.isnan(y)] = np.nan + a = np.empty((nfft * 2 + 1, nfft * 2 + 1)) + b = np.empty(nfft * 2 + 1) + A0_lst = lambda dF : 2 * np.nansum(1 * dF) + A_lst = lambda dF : [2 * np.nansum(np.cos(i * theta) * dF) for i in range(1, nfft + 1)] + B_lst = lambda dF : [2 * np.nansum(np.sin(i * theta) * dF) for i in range(1, nfft + 1)] + row = lambda dF : np.r_[A0_lst(dF), A_lst(dF), B_lst(dF)] + + + for i in range(nfft + 1): + a[i, :] = row(np.cos(i * theta)) + b[i] = 2 * np.nansum(y * np.cos(i * theta)) + for i, r in enumerate(range(nfft + 1, nfft * 2 + 1), 1): + a[r, :] = row(np.sin(i * theta)) + b[r] = 2 * np.nansum(y * np.sin(i * theta)) + AB = np.linalg.solve(a, b) + + F = np.zeros(n, dtype=np.complex) + + F = np.r_[AB[0], (AB[1:nfft + 1] + 1j * AB[nfft + 1:]), np.zeros(nfft) ] + return F + +def rx2F(y, max_fft): + """Convert non-complex signal, y, to single sided Fourier components, that satifies x(t) = sum(X(cos(iw)+sin(iw)), i=0..N)""" + F = np.fft.rfft(y) / len(y) + F[1:-1] *= 2 # add negative side + F = np.conj(F) + return F[:max_fft + 1] + +def rF2x(rF): + """Convert single sided Fourier components, that satisfies x(t) = sum(X(cos(iw)+sin(iw)), i=0..N) to non-complex signal""" + rF = np.conj(rF) + rF[1:] /= 2 + rF = np.r_[rF, np.zeros(181 - len(rF), dtype=np.complex)] + return np.fft.irfft(rF) * 360 diff --git a/wetb/signal/fit/_linear_fit.py b/wetb/signal/fit/_linear_fit.py new file mode 100644 index 0000000000000000000000000000000000000000..fb6d15813f101d3015b52cb53ba2a7f2e96dff48 --- /dev/null +++ b/wetb/signal/fit/_linear_fit.py @@ -0,0 +1,10 @@ +''' +Created on 22. mar. 2017 + +@author: mmpe +''' +import numpy as np +def linear_fit(x,y): + from scipy.stats import linregress + slope, intercept, r_value, p_value, std_err = linregress(x,y) + return np.array([x.min(), x.max()]), lambda x : np.array(x)*slope+intercept , (slope, intercept) \ No newline at end of file diff --git a/wetb/signal/tests/test_error_measures.py b/wetb/signal/tests/test_error_measures.py new file mode 100644 index 0000000000000000000000000000000000000000..b83723250f6968d0d5d5400c6fb77167483ccf54 --- /dev/null +++ b/wetb/signal/tests/test_error_measures.py @@ -0,0 +1,54 @@ +''' +Created on 20/07/2016 + +@author: MMPE +''' +import os +import unittest + +import numpy as np +from wetb.signal.error_measures import rms2fit_old, rms2fit +from wetb.signal.fit import bin_fit + + +tfp = os.path.join(os.path.dirname(__file__), 'test_files/') +class Test(unittest.TestCase): + + +# def test_rms2mean(self): +# data = np.load(tfp + "wsp_power.npy") +# print (data.shape) +# wsp = data[:, 1].flatten() +# power = data[:, 0].flatten() +# +# import matplotlib.pyplot as plt +# plt.plot(wsp, power, '.') +# x = np.linspace(wsp.min(), wsp.max(), 100) +# err, f = rms2mean(wsp, power) +# plt.plot(x, f(x), label='rms2mean, err=%.1f' % err) +# err, f = rms2fit(wsp, power, bins=20, kind=3, fit_func=np.median) +# plt.plot(x, f(x), label='rms2median, err=%.1f' % err) +# print (list(x)) +# print (list(f(x))) +# plt.legend() +# plt.show() + + def test_rms2fit(self): + x = np.array([10.234302313156817, 13.98517783627376, 7.7902362498947921, 11.08597865379001, 8.430623529700588, 12.279982848438033, 33.89151260027775, 12.095047111211629, 13.731371675689642, 14.858309846006723, 15.185588405617654]) + y = np.array([28.515665187174477, 46.285328159179684, 17.763652093098958, 32.949007991536462, 20.788106673177083, 38.819226477864589, 96.53278479817709, 38.479684539388025, 46.072654127604167, 51.875484233398439, 53.379342967122398]) + err, fit = rms2fit_old(x, y, kind=1, bins=15) + err2, fit2 = rms2fit(x, y, fit_func=lambda x,y: bin_fit(x,y, kind=1, bins=15, bin_min_count=1, lower_upper='extrapolate')) + self.assertAlmostEqual(err, 0.306,2) + if 0: + import matplotlib.pyplot as plt + plt.plot(x,y, '.') + x_ = np.linspace(x.min(), x.max(), 100) + plt.plot(x_, fit(x_), label='rms2fit, err=%.5f' % err) + plt.plot(x_, fit2(x_), label='rms2fit, err=%.5f' % err2) + plt.legend() + plt.show() + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_rms2mean'] + unittest.main() diff --git a/wetb/signal/tests/test_fit.py b/wetb/signal/tests/test_fit.py new file mode 100644 index 0000000000000000000000000000000000000000..ecdfd2cc69796b268d7351a376552d2d401a23ef --- /dev/null +++ b/wetb/signal/tests/test_fit.py @@ -0,0 +1,148 @@ +''' +Created on 17. okt. 2016 + +@author: mmpe +''' +from wetb import gtsdf +from wetb.signal.fit import perpendicular_bin_fit, linear_fit +import numpy as np +import os +import unittest +from wetb.signal.fit import fourier_fit +from wetb.signal.error_measures import rms +tfp = os.path.join(os.path.dirname(__file__), 'test_files/') + +class TestFit(unittest.TestCase): + pass + +# def testBinFit(self): +# ds = gtsdf.Dataset(tfp+"binfit.hdf5") +# import matplotlib.pyplot as plt +# print (ds.info['attribute_names']) +# x,y = ds('Wsp_metmast'), ds('Power') +# m = ~np.isnan(x) +# x,y = x[m],y[m] +# +# +# x_, f = bin_fit(x,y, bins=(10,10)) +# rms2fit(x,y) +# +# plt.plot(x,y,'.') +# plt.plot(x_, f(x_),'.-r') +# plt.show() + +# +# def testBinFit2(self): +# X = np.r_[np.arange(90), np.arange(90,100,.01)] +# Y = np.r_[np.arange(90)/10, np.arange(90,100,.01)*10-890] +# import matplotlib.pyplot as plt +# x,fit = bin_fit(X,Y, (10,0), 1) +# plt.plot(X,Y,'k') +# plt.plot(x,fit(x),'r.', label='(10,0)') +# x,fit = bin_fit(X,Y, (0,10), 1) +# plt.plot(x,fit(x),'g.', label='(0,10)') +# x,fit = bin_fit(X,Y, (10,10), 1) +# plt.plot(x,fit(x),'bx', label="(10,10)") +# plt.legend() +# plt.show() +# + +# def testBinFit_non_monotomic_y(self): +# X = np.arange(0,10,.01) +# Y = np.sin(X) +# import matplotlib.pyplot as plt +# plt.plot(X,Y,'k') +# # x,fit = bin_fit(X,Y, (10,0), 1) +# # plt.plot(x,fit(x),'r.-', label='(10,0)') +# x,fit = bin_fit(X,Y, (0,10), 1) +# plt.plot(x,fit(x),'g.', label='(0,10)') +# # x,fit = bin_fit(X,Y, (10,10), 1) +# # plt.plot(x,fit(x),'bx', label="(10,10)") +# # plt.legend() +# plt.show() + + +# def testBinFit_missing_segments(self): +# ds = gtsdf.Dataset(tfp+"binfit.hdf5") +# import matplotlib.pyplot as plt +# print (ds.info['attribute_names']) +# x,y = ds('Wsp_metmast'), ds('Power') +# +# m = ~np.isnan(x) +# x,y = [v[(x[m]<8)|(x[m]>15)] for v in [x[m],y[m]]] +# +# +# x_, f = bin_fit(x,y, bins=10) +# x_ = np.linspace(x.min(), x.max(), 100 ) +# +# +# plt.plot(x,y,'.') +# plt.plot(x_, f(x_),'.-r') +# plt.show() + + + def test_perpendicular_fit(self): + ds = gtsdf.Dataset(tfp+"binfit.hdf5") + + + x,y = ds('Wsp_metmast'), ds('Power') + if 0: + import matplotlib.pyplot as plt + fx, fy = perpendicular_bin_fit(x,y,30,plt=plt) + plt.show() + + else: + fx, fy = perpendicular_bin_fit(x,y,30) + self.assertEqual(len(fx), 30) + + + def test_linear_fit(self): + x = np.random.rand(1000)*10 + y = x*.5+3+(np.random.rand(len(x))-.5) + x_, fit, (a,b) = linear_fit(x,y) + self.assertAlmostEqual(a,.5,delta=.1) + self.assertAlmostEqual(b,3,delta=.1) + if 0: + import matplotlib.pyplot as plt + plt.plot(x,y,'.') + + plt.plot(x_, fit(x_), label='y=%fx+%f'%(a,b)) + plt.legend() + plt.show() + + + + + def test_fourier_fit(self): + from numpy import nan + import matplotlib.pyplot as plt + y = [nan, nan, nan, nan, -0.36773350834846497, -0.34342807531356812, -0.3105124831199646, -0.2949407696723938, nan, nan, nan, nan, nan, nan, nan, nan, -0.37076538801193237, -0.35946175456047058, -0.35366204380989075, -0.34812772274017334, -0.32674536108970642, -0.31197881698608398, -0.31780806183815002, -0.31430944800376892, -0.32355087995529175, -0.35628914833068848, -0.39329639077186584, -0.46684062480926514, -0.48477476835250854, -0.50368523597717285, -0.51693356037139893, -0.50966787338256836, -0.49876394867897034, -0.486896812915802, -0.48280572891235352, -0.4708983302116394, -0.46562659740447998, -0.4582551121711731, -0.46219301223754883, -0.46569514274597168, -0.4741971492767334, -0.48431938886642456, -0.49597686529159546, -0.50340372323989868, -0.50065416097640991] + #y = [-0.36884582, -0.36256081, -0.35047901, -0.33841938, -0.3289246, -0.32291612, -0.32149044, -0.32851833, -0.34011644, -0.35467893, -0.36627313, -0.37245053, -0.37924927, -0.39883283, -0.38590872, -0.39833149, -0.40406495, -0.4102158, -0.41886991, -0.42862922, -0.43947089, -0.45299602, -0.46831554, -0.48249167, -0.49108803, -0.500368, -0.50779951, -0.51360059, -0.51370221, -0.50541216, -0.49272588, -0.47430229, -0.45657015, -0.44043627, -0.4286592, -0.41741648, -0.41344571, -0.40986174, -0.40896985, -0.40939313, -0.40635225, -0.40435526, -0.40015101, -0.39243227, -0.38454708] + + x = np.linspace(0, 360, len(y) + 1)[:len(y)] + #plt.plot(, y) + +# x_fit = fourier_fit.fit_old(y, 5) +# plt.plot(x, x_fit[::-1], label='fit2') + x_,fit = fourier_fit(y, 5) + self.assertAlmostEqual(rms(fit(x), y), 0.0056, 3) + if 0: + plt.plot(x, y, label='Observations') + plt.plot(x_,fit(x_), label='fit') + plt.legend() + plt.show() + +# def test_fourier_fit2(self): +# import matplotlib.pyplot as plt +# t = np.linspace(0, 2 * np.pi, 9 + 1)[:-1] +# y = np.cos(t) + np.sin(t) + 1 +# nfft = 4 +# print (fourier_fit.x2F(y, nfft)) +# plt.plot(deg(t), y, label='cos') +# plt.plot(fourier_fit.fit(y, nfft), label='fit') +# plt.plot(fourier_fit.F2x(np.fft.fft(y) / len(y)), label='fft') +# plt.legend() +# plt.show() +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testName'] + unittest.main() \ No newline at end of file diff --git a/wetb/signal/tests/test_fouier_fit.py b/wetb/signal/tests/test_fouier_fit.py new file mode 100644 index 0000000000000000000000000000000000000000..4992afafab0b40e0c86c016a0cc2938c88ddd792 --- /dev/null +++ b/wetb/signal/tests/test_fouier_fit.py @@ -0,0 +1,60 @@ +''' +Created on 07/07/2015 + +@author: MMPE +''' +import unittest + +import numpy as np +from wetb.signal.error_measures import rms +from wetb.signal.fit._fourier_fit import fourier_fit_old, fourier_fit, x2F, rx2F +from wetb.utils.geometry import deg + + +class Test(unittest.TestCase): + + def test_fourier_fit(self): + from numpy import nan + import matplotlib.pyplot as plt + y = [nan, nan, nan, nan, -0.36773350834846497, -0.34342807531356812, -0.3105124831199646, -0.2949407696723938, nan, nan, nan, nan, nan, nan, nan, nan, -0.37076538801193237, -0.35946175456047058, -0.35366204380989075, -0.34812772274017334, -0.32674536108970642, -0.31197881698608398, -0.31780806183815002, -0.31430944800376892, -0.32355087995529175, -0.35628914833068848, -0.39329639077186584, -0.46684062480926514, -0.48477476835250854, -0.50368523597717285, -0.51693356037139893, -0.50966787338256836, -0.49876394867897034, -0.486896812915802, -0.48280572891235352, -0.4708983302116394, -0.46562659740447998, -0.4582551121711731, -0.46219301223754883, -0.46569514274597168, -0.4741971492767334, -0.48431938886642456, -0.49597686529159546, -0.50340372323989868, -0.50065416097640991] + #y = [-0.36884582, -0.36256081, -0.35047901, -0.33841938, -0.3289246, -0.32291612, -0.32149044, -0.32851833, -0.34011644, -0.35467893, -0.36627313, -0.37245053, -0.37924927, -0.39883283, -0.38590872, -0.39833149, -0.40406495, -0.4102158, -0.41886991, -0.42862922, -0.43947089, -0.45299602, -0.46831554, -0.48249167, -0.49108803, -0.500368, -0.50779951, -0.51360059, -0.51370221, -0.50541216, -0.49272588, -0.47430229, -0.45657015, -0.44043627, -0.4286592, -0.41741648, -0.41344571, -0.40986174, -0.40896985, -0.40939313, -0.40635225, -0.40435526, -0.40015101, -0.39243227, -0.38454708] + + x = np.linspace(0, 360, len(y) + 1)[:len(y)] + #plt.plot(, y) + + x_fit = fourier_fit_old(y, 5)[::-1] + x_fit = np.r_[x_fit[-1],x_fit[:-1]] + + x_,fit = fourier_fit(y, 5) + self.assertAlmostEqual(rms(fit(x), y), 0.0056, 3) + if 0: + plt.plot(x, y, label='Observations') + plt.plot(x, x_fit, label='fit_old') + plt.plot(x_,fit(x_), label='fit') + plt.legend() + plt.show() + + def test_x2F(self): + t = np.linspace(0, 2 * np.pi, 9 + 1)[:-1] + y = 6 + 5*np.cos(t) + 4*np.sin(t) + 3*np.cos(2*t)+ 2*np.sin(2*t) + nfft = 4 + np.testing.assert_array_almost_equal(x2F(y, nfft)[:nfft+1], rx2F(y, nfft)) + np.testing.assert_array_almost_equal(rx2F(y, nfft),[6, 5+4j, 3+2j,0,0]) + + def test_fourier_fit2(self): + import matplotlib.pyplot as plt + t = np.linspace(0, 2 * np.pi, 9 + 1)[:-1] + y = np.cos(t) + np.sin(t) + 1 + nfft = 4 + x,fit = fourier_fit(y, nfft) + if 0: + plt.figure() + plt.plot(deg(t), y, label='cos+sin+1') + plt.plot(x,fit(x), label='fit') + plt.legend() + plt.show() + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.testName'] + unittest.main() + diff --git a/wetb/signal/tests/test_subset_mean.py b/wetb/signal/tests/test_subset_mean.py new file mode 100644 index 0000000000000000000000000000000000000000..21f92445cba215d4e75357457e376fa85da1873a --- /dev/null +++ b/wetb/signal/tests/test_subset_mean.py @@ -0,0 +1,86 @@ +''' +Created on 18/07/2016 + +@author: MMPE +''' +import os +import unittest + +import numpy as np +from wetb import gtsdf +from wetb.signal.subset_mean import time_trigger, subset_mean, \ + non_nan_index_trigger, revolution_trigger +from wetb.utils.geometry import rpm2rads + + +tfp = os.path.join(os.path.dirname(__file__), 'test_files/') +class TestSubsetMean(unittest.TestCase): + def test_time_trigger(self): + time = np.arange(0, 99.5, .5) + np.testing.assert_array_equal(time[time_trigger(time, 20)], [0, 20, 40, 60, 80]) + np.testing.assert_array_equal(time[time_trigger(time + .5, 20)], [0, 20, 40, 60, 80]) + np.testing.assert_array_equal(time[time_trigger(time + 100000000.5, 20)], [0, 20, 40, 60, 80]) + np.testing.assert_array_equal(time[time_trigger(time, 20, 20, 60)], [20, 40, 60]) + np.testing.assert_array_equal(time_trigger(np.arange(101), 20), [0, 20, 40, 60, 80, 100]) + time, data, info = gtsdf.load(tfp + "subset_mean_test.hdf5") + np.testing.assert_array_equal(time_trigger(time, 200), [0, 5000, 10000, 15000]) + + def test_subset_mean(self): + + time, data, info = gtsdf.load(tfp + "subset_mean_test.hdf5") + triggers = time_trigger(time, 100) + t, p = subset_mean([time, data[:, 0]], triggers).T + self.assertEqual(t[1], time[2500:5000].mean()) + self.assertEqual(p[1], data[2500:5000, 0].mean()) + + triggers[1] = 2501 + t, p = subset_mean([time, data[:, 0]], triggers).T + self.assertEqual(t[1], time[2501:5000].mean()) + self.assertEqual(p[1], data[2501:5000, 0].mean()) + + def test_non_nan_index_trigger(self): + sensor = np.arange(18).astype(np.float) + sensor[[5, 11]] = np.nan + triggers = non_nan_index_trigger(sensor, 3) + for i1, i2 in triggers: + self.assertFalse(np.any(np.isnan(sensor[i1:i2]))) + self.assertEqual(len(triggers), 4) + + + def test_subset_mean_trigger_tuple(self): + sensor = np.arange(18).astype(np.float) + triggers = non_nan_index_trigger(sensor, 3) + np.testing.assert_array_equal(subset_mean(sensor, triggers), [ 1., 4., 7., 10., 13., 16]) + + #start with nan eq step, eq len + sensor[0] = np.nan + triggers = non_nan_index_trigger(sensor, 3) + np.testing.assert_array_equal(subset_mean(sensor, triggers), [ 2, 5, 8, 11, 14]) + + #nan in the middle, noneq step and len + sensor = np.arange(18).astype(np.float) + sensor[[5, 11]] = np.nan + triggers = non_nan_index_trigger(sensor, 3) + + np.testing.assert_array_equal(subset_mean(sensor, triggers), [1, 7, 13, 16]) + + def test_cycle_trigger(self): + ds = gtsdf.Dataset(tfp+'azi.hdf5') + azi, rpm, time = [ds(x)[8403:8803] for x in ['azi','Rot_cor','Time']] + + trigger = revolution_trigger(azi) + np.testing.assert_array_equal(trigger, [ 17, 128, 241, 354]) + azi[64] = 358 + trigger = revolution_trigger(azi, (ds('Rot_cor'), np.diff(time).mean())) + +# import matplotlib.pyplot as plt +# t = np.arange(len(azi)) +# plt.plot(t, azi) +# for i1,i2 in trigger: +# plt.plot(t[i1:i2],azi[i1:i2],'.--') +# plt.show() + np.testing.assert_array_equal(trigger, [ (128,241),(241,354)]) + + +if __name__ == "__main__": + unittest.main()