Skip to content
Snippets Groups Projects
Commit 236a94c7 authored by David Verelst's avatar David Verelst
Browse files

Merge branch 'master' of gitlab.windenergy.dtu.dk:toolbox/WindEnergyToolbox

parents 4cee4bcc 6685fc32
No related branches found
No related tags found
No related merge requests found
Showing
with 881 additions and 21 deletions
......@@ -44,6 +44,36 @@ def Weibull2(u, k, wsp_lst):
edges = np.r_[wsp_lst[0] - (wsp_lst[1] - wsp_lst[0]) / 2, (wsp_lst[1:] + wsp_lst[:-1]) / 2, wsp_lst[-1] + (wsp_lst[-1] - wsp_lst[-2]) / 2]
return [-cdf(e1) + cdf(e2) for wsp, e1, e2 in zip(wsp_lst, edges[:-1], edges[1:])]
def Weibull_IEC(Vref, Vhub_lst):
"""Weibull distribution according to IEC 61400-1:2005, page 24
Parameters
----------
Vref : int or float
Vref of wind turbine class
Vhub_lst : array_like
Wind speed at hub height. Must be equally spaced.
Returns
-------
nd_array : list of probabilities
Examples
--------
>>> Weibull_IEC(50, [4,6,8])
[ 0.11002961 0.14116891 0.15124155]
"""
Vhub_lst = np.array(Vhub_lst)
#Average wind speed
Vave=.2*Vref
#Rayleigh distribution
Pr = lambda x : 1 - np.exp(-np.pi*(x/(2*Vave))**2)
#Wsp bin edges: [4,6,8] -> [3,5,7,9]
wsp_bin_edges = np.r_[Vhub_lst[0] - (Vhub_lst[1] - Vhub_lst[0]) / 2, (Vhub_lst[1:] + Vhub_lst[:-1]) / 2, Vhub_lst[-1] + (Vhub_lst[-1] - Vhub_lst[-2]) / 2]
#probabilities of 3-5, 5-7, 7-9
return np.array([-Pr(e1) + Pr(e2) for e1, e2 in zip(wsp_bin_edges[:-1], wsp_bin_edges[1:])])
class DLCHighLevel(object):
......@@ -159,7 +189,7 @@ class DLCHighLevel(object):
dist = self.dlc_df[dist_key][row]
if str(dist).lower() == "weibull" or str(dist).lower() == "rayleigh":
dist = Weibull2(self.vref * .2, self.shape_k, values)
dist = Weibull_IEC(self.vref, values)
else:
def fmt(v):
if "#" in str(v):
......
......@@ -10,7 +10,7 @@ from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
import unittest
from wetb.dlc.high_level import DLCHighLevel, Weibull
from wetb.dlc.high_level import DLCHighLevel, Weibull, Weibull_IEC
import os
import numpy as np
......@@ -109,7 +109,10 @@ class TestDLCHighLevel(unittest.TestCase):
p_tot = np.array([value for key, value in weibull.items()]).sum()
self.assertTrue(np.allclose(p_tot, 1.0))
def test_weibull_IEC(self):
Vref = 50
np.testing.assert_array_almost_equal(Weibull_IEC(Vref, [4,6,8]), [ 0.11002961, 0.14116891, 0.15124155])
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
......
d = None
d = dir()
from .interpolation import interpolate
__all__ = [m for m in set(dir()) - set(d)]
from ._despike import *
from ._differentiation import *
\ No newline at end of file
......@@ -4,8 +4,7 @@ Created on 13/07/2016
@author: MMPE
'''
import numpy as np
from wetb.signal.filters.first_order import low_pass
from wetb.signal.filters import replacer
from wetb.signal.filters import replacer, frq_filters
replace_by_nan = replacer.replace_by_nan
......@@ -42,15 +41,15 @@ def univeral_thresshold_finder(data, variation='mad', plt=None):
thresshold = np.sqrt(2 * np.log(data.shape[0])) * variation # Universal thresshold (expected maximum of n random variables)
return thresshold_finder(data, thresshold, plt)
def despike(data, dt, spike_finder=univeral_thresshold_finder, spike_replacer=replace_by_nan, plt=None):
def despike(data, spike_length, spike_finder=univeral_thresshold_finder, spike_replacer=replace_by_nan, plt=None):
"""Despike data
Parameters
---------
data : array_like
data
dt : int or float
time step
spike_length : int
Typical spike duration [samples]
spike_finder : function
Function returning indexes of the spikes
spike_replacer : function
......@@ -63,8 +62,10 @@ def despike(data, dt, spike_finder=univeral_thresshold_finder, spike_replacer=re
if plt:
plt.plot(data, label='Input')
data = np.array(data).copy()
lp_data = low_pass(data, dt, 1)
lp_data = low_pass(data, spike_length, 1)
hp_data = data - lp_data
hp_data = frq_filters.high_pass(data, spike_length*4, 1, order=2)
#spike_length, sample_frq/2, 1, order=1)
spike_mask = spike_finder(hp_data, plt=plt)
despike_data = spike_replacer(data, spike_mask)
if plt:
......
'''
Created on 29. mar. 2017
@author: mmpe
'''
from wetb.signal.filters.frq_filters import low_pass
import numpy as np
def differentiation(x,sample_frq=None, cutoff_frq=None):
"""Differentiate the signal
Parameters
----------
x : array_like
The input signal
sample_frq : int, float or None, optional
sample frequency of signal (only required if low pass filer is applied)
cutoff_frq : int, float or None, optional
Low pass filter cut off (frequencies higher than this frequency will be suppressed)
Returns
-------
y : ndarray
differentiated signal
Examples
--------
>>> differentiation([1,2,1,0,1,1])
"""
if cutoff_frq is not None:
assert sample_frq is not None, "Argument sample_frq must be set to apply low pass filter"
x = low_pass(x, sample_frq, cutoff_frq)
else:
x = np.array(x)
dy = np.r_[x[1]-x[0], (x[2:]-x[:-2])/2, x[-1]-x[-2]]
return dy
\ No newline at end of file
'''
Created on 27. mar. 2017
@author: mmpe
'''
import numpy as np
from scipy import signal
def sine_generator(sample_frq, sine_frq, duration):
"""Create a sine signal for filter test
Parameters
----------
sample_frq : int, float
Sample frequency of returned signal [Hz]
sine_frq : int, float
Frequency of sine signal [Hz]
duration : int, float
Duration of returned signal [s]
Returns
-------
t,sig : ndarray, ndarray
time (t) and sine signal (sig)
Examples
--------
>>> sine_generator(10,1,2)
"""
T = duration
nsamples = sample_frq * T
w = 2. * np.pi * sine_frq
t = np.linspace(0, T, nsamples, endpoint=False)
sig = np.sin(w * t)
return t, sig
def low_pass(x, sample_frq, cutoff_frq, order=5):
"""Low pass filter (butterworth)
Parameters
----------
x : array_like
Input signal
sample_frq : int, float
Sample frequency [Hz]
cutoff_frq : int, float
Cut off frequency [Hz]
order : int
Order of the filter (1th order: 20db per decade, 2th order:)
Returns
-------
y : ndarray
Low pass filtered signal
Examples
--------
>>>
"""
nyquist_frq = 0.5 * sample_frq
normal_cutoff = cutoff_frq / nyquist_frq
b,a = signal.butter(order, normal_cutoff, btype='low', analog=False)
return signal.filtfilt(b, a, x)
def high_pass(x, sample_frq, cutoff_frq, order=5):
"""Low pass filter (butterworth)
Parameters
----------
x : array_like
Input signal
sample_frq : int, float
Sample frequency [Hz]
cutoff_frq : int, float
Cut off frequency [Hz]
order : int
Order of the filter (1th order: 20db per decade, 2th order:)
Returns
-------
y : ndarray
Low pass filtered signal
Examples
--------
>>>
"""
nyquist_frq = 0.5 * sample_frq
normal_cutoff = cutoff_frq / nyquist_frq
b,a = signal.butter(order, normal_cutoff, btype='high', analog=False)
return signal.filtfilt(b, a, x)
def frequency_response(sample_frq, cutoff_frq, type, order, plt=None):
"""Frequency response of low/high pass filter (butterworth
Parameters
----------
sample_frq : int, float
Sample frequency [Hz]
cutoff_frq : int, float
Cut off frequency [Hz]
type : {'low','high'}
Low or high pass filter
order : int
Order of the filter (1th order: 20db per decade, 2th order: 40db per decade)
plt : pyplot, optional
If specified, the frequency response is plotted
Returns
-------
w,h : ndarray, ndarray
Frequency (w) in Hz and filter response in db
Examples
--------
>>>
"""
nyquist_frq = 0.5 * sample_frq
normal_cutoff = cutoff_frq / nyquist_frq
assert 0<normal_cutoff<1, "cutoff frequency must be <= nyquist frequency"
b,a = signal.butter(order, cutoff_frq, btype=type, analog=True)
w, h = signal.freqs(b, a)
h_db = 20 * np.log10(abs(h))
if plt:
plt.plot(w, h_db, label='%d order filter response'%order)
plt.legend(loc=0)
title = 'Butterworth filter frequency response'
if plt.axes().get_title()!=title:
plt.title(title)
plt.xlabel('Frequency [Hz]')
plt.ylabel('Amplitude [dB]')
plt.margins(.1, .1)
plt.xscale('log')
plt.grid(which='both', axis='both')
plt.axvline(cutoff_frq, color='green') # cutoff frequency
return w,h_db
\ No newline at end of file
......@@ -4,5 +4,6 @@ d = dir()
from wetb.signal.fit._linear_fit import *
from wetb.signal.fit._bin_fit import *
from wetb.signal.fit._fourier_fit import *
from wetb.signal.fit._spline_fit import *
__all__ = sorted([m for m in set(dir()) - set(d)])
......@@ -103,14 +103,9 @@ def bin_fit(x,y, bins=10, kind='linear', bin_func=np.nanmean, bin_min_count=3, l
else:
raise NotImplementedError("Argument for handling upper observations, %s, not implemented"%upper)
#Create mean function
def fit(x):
x = x[:].copy().astype(np.float)
x[x<bin_x_fit[0]] = np.nan
x[x>bin_x_fit[-1]] = np.nan
return interp1d(bin_x_fit, bin_y_fit, kind)(x[:])
return bin_x_fit, fit
return bin_x_fit, _interpolate_fit(bin_x_fit, bin_y_fit, kind)
def perpendicular_bin_fit(x, y, bins = 30, fit_func=None, bin_min_count=3, plt=None):
"""Fit a curve to the values, (x,y) using bins that are perpendicular to an initial fit
......@@ -190,6 +185,15 @@ def perpendicular_bin_fit(x, y, bins = 30, fit_func=None, bin_min_count=3, plt=N
plt.plot(pbfx, pbfy, 'gray', label="perpendicular fit")
plt.legend()
#PlotData(None, bfx,bfy)
return np.array(pc).T
bin_x_fit, bin_y_fit = np.array(pc).T
return bin_x_fit, _interpolate_fit(bin_x_fit, bin_y_fit, kind="linear")
#Create mean function
def _interpolate_fit(bin_x_fit, bin_y_fit, kind='linear'):
def fit(x):
x = x[:].copy().astype(np.float)
x[x<bin_x_fit[0]] = np.nan
x[x>bin_x_fit[-1]] = np.nan
return interp1d(bin_x_fit, bin_y_fit, kind)(x[:])
return fit
import numpy as np
def spline_fit(xp,yp):
def akima(x, y):
n = len(x)
var = np.zeros((n + 3))
z = np.zeros((n))
co = np.zeros((n, 4))
for i in range(n - 1):
var[i + 2] = (y[i + 1] - y[i]) / (x[i + 1] - x[i])
var[n + 1] = 2 * var[n] - var[n - 1]
var[n + 2] = 2 * var[n + 1] - var[n]
var[1] = 2 * var[2] - var[3]
var[0] = 2 * var[1] - var[2]
for i in range(n):
wi1 = abs(var[i + 3] - var[i + 2])
wi = abs(var[i + 1] - var[i])
if (wi1 + wi) == 0:
z[i] = (var[i + 2] + var[i + 1]) / 2
else:
z[i] = (wi1 * var[i + 1] + wi * var[i + 2]) / (wi1 + wi)
for i in range(n - 1):
dx = x[i + 1] - x[i]
a = (z[i + 1] - z[i]) * dx
b = y[i + 1] - y[i] - z[i] * dx
co[i, 0] = y[i]
co[i, 1] = z[i]
co[i, 2] = (3 * var[i + 2] - 2 * z[i] - z[i + 1]) / dx
co[i, 3] = (z[i] + z[i + 1] - 2 * var[i + 2]) / dx ** 2
co[n - 1, 0] = y[n - 1]
co[n - 1, 1] = z[n - 1]
co[n - 1, 2] = 0
co[n - 1, 3] = 0
return co
p_lst = [lambda x_, c=c, x0=x0: np.poly1d(c[::-1])(x_-x0) for c,x0 in zip(akima(xp,yp), xp)]
def spline(x):
y = np.empty_like(x)+np.nan
segment = np.searchsorted(xp,x, 'right')-1
for i in np.unique(segment):
m = segment==i
y[m] = p_lst[i](x[m])
return y
# def coef2spline(x, xp, co):
#
# print (np.searchsorted(xp,x)-1)
# x, y = [], []
# for i, c in enumerate(co.tolist()[:-1]):
# p = np.poly1d(c[::-1])
# z = np.linspace(0, s[i + 1] - s[i ], 10, endpoint=i >= co.shape[0] - 2)
# x.extend(s[i] + z)
# y.extend(p(z))
# return y
#
return spline
#x, y, z = [coef2spline(curve_z_nd, akima(curve_z_nd, self.c2def[:, i])) for i in range(3)]
#return x, y, z
if __name__=="__main__":
import matplotlib.pyplot as plt
x = np.random.randint(0,100,10)
t = np.arange(0,100,10)
plt.plot(t,x,'.',label='points')
t_ = np.arange(100)
spline = spline_fit(t,x)
print (np.abs(np.diff(np.diff(np.interp(t_, t,x)))).max())
print (np.abs(np.diff(np.diff(spline(t_)))).max())
plt.plot(t_, np.interp(t_, t,x))
plt.plot(t_, spline(t_),label='spline')
plt.show()
from ._rotor_position import *
\ No newline at end of file
'''
Created on 30. mar. 2017
@author: mmpe
'''
import numpy as np
from wetb.signal.fit._spline_fit import spline_fit
def fix_rotor_position(rotor_position, sample_frq, rotor_speed, fix_dt=None, plt=None):
"""Rotor position fitted with spline
Parameters
----------
rotor_position : array_like
Rotor position [deg] (0-360)
sample_frq : int or float
Sample frequency [Hz]
rotor_speed : array_like
Rotor speed [RPM]
fix_dt : int, float or None, optional
Time distance [s] between spline fix points\n
If None (default) a range of seconds is tested and the result that minimize the RMS
between differentiated rotor position fit and rotor speed is used.\n
Note that a significant speed up is achievable by specifying the parameter
plt : PyPlot or None
If PyPlot a visual interpretation is plotted
Returns
-------
y : nd_array
Fitted rotor position
"""
from wetb.signal.subset_mean import revolution_trigger
t = np.arange(len(rotor_position))
indexes = revolution_trigger(rotor_position[:], sample_frq, rotor_speed, max_no_round_diff=4)
rp = rotor_position[:].copy()
for i in indexes:
rp[i:] += 360
if fix_dt is None:
fix_dt = find_fix_dt(rotor_position, sample_frq, rotor_speed)
N = int(np.round(fix_dt*sample_frq))
N2 = N//2
if plt:
a = (rp.max()-rp.min())/t.max()
plt.plot(t/sample_frq,rp-t*a,label='Continus rotor position (detrended)')
points = []
for j, i in enumerate(range(0,len(rp), N)):
#indexes for subsets for overlapping a and b polynomials
i1 = max(0,i-N2)
i2 = min(len(rp),i+N2)
#fit a polynomial
if i1<len(rp):
poly_coef = np.polyfit(t[i1:i2]-t[i1], rp[i1:i2], 1)
points.append((t[i],np.poly1d(poly_coef)(t[i]-t[i1])))
if plt:
plt.plot(t[i1:i2]/sample_frq, np.poly1d(poly_coef)(t[i1:i2]-t[i1])- t[i1:i2]*a, 'mc'[j%2], label=('', "Line fit for points (detrended)")[j<2])
x,y = np.array(points).T
if plt:
plt.plot(x/sample_frq,y-x*a,'.', label='Fit points (detrended)')
plt.plot(t/sample_frq, spline_fit(x,y)(t)-t*a, label='Spline (detrended)')
plt.legend()
plt.show()
return spline_fit(x,y)(t)%360
def find_fix_dt(rotor_position, sample_frq, rotor_speed, plt=None):
"""Find the optimal fix_dt parameter for fix_rotor_position (function above).
Optimal is defined as the value that minimizes the sum of squared differences
between differentiated rotor position and rotor speed
Parameters
----------
rotor_position : array_like
Rotor position [deg] (0-360)
sample_frq : int or float
Sample frequency [Hz]
rotor_speed : array_like
Rotor speed [RPM]
plt : pyplot or None
If pyplot, a visual interpretation is plotted
Returns
-------
y : int
Optimal value for the fix_dt parameter for fix_rotor_position
"""
from wetb.signal.filters import differentiation
def err(i):
rpm_pos = differentiation(fix_rotor_position(rotor_position, sample_frq, rotor_speed, i))%180 / 360 * sample_frq * 60
return np.sum((rpm_pos - rotor_speed)**2)
best = 27
for step in [9,3,1]:
x_lst = np.arange(-2,3)*step + best
res = [err(x) for x in x_lst]
if plt is not None:
plt.plot(x_lst, res,'.-')
best = x_lst[np.argmin(res)]
if plt is not None:
plt.show()
return best
\ No newline at end of file
import numpy as np
import numpy.ma as ma
#from pylab import *
def interpolate(x, xp, yp, max_xp_step=None, max_dydxp=None, cyclic_range=None, max_repeated=None):
"""Interpolation similar to numpy.interp that handles nan and missing values
Parameters
----------
x : array_like
The x-coordinates of the interpolated values.
xp : 1-D sequence of floats
The x-coordinates of the data points, must be increasing.
yp : 1-D sequence of floats
The y-coordinates of the data points, same length as xp.
max_xp_step : int, float or None, optional
Maximum xp-time step that is interpolated to x.\n
If time step > max_xp_step then NAN is returned for intermediate x values
If None, default, then this fix is not applied
max_dydxp : int, float, None, optional
Maximum absolute dydxp (slop of yp) that is interpolated to y.\n
If dydxp > max_dydxp then NAN is returned for intermediate y values
If None, default, then this fix is not applied
cyclick_range : int, float, None, optional
Range of posible values, e.g. 360 for degrees (both 0..360 and -180..180)
If None (default), data not interpreted as cyclic
max_repeated : int, float, None, optional
Maximum xp that yp are allowed to be repeated
if yp[i]==yp[i+1]==..==yp[i+j] and xp[i+j]-xp[i]>max_repeated_yp then
NAN is returned for xp[i]<x<=xp[i+j]
Returns
-------
y : {float, ndarray}
The interpolated values, same shape as x.
Examples
--------
>>> interpolate(x=[1,1.5,2,3], xp=[0.5,1.5,3], yp=[5,15,30])
[10,15,20,30]
>>> interpolate(x=[0, 1, 2, 7, 12], xp=[0, 2, 12], yp=[359, 0, 350], max_dydxp=45)
[359,nan,0,175,350]
"""
xp = np.array(xp, dtype=np.float)
yp = np.array(yp, dtype=np.float)
assert xp.shape[0] == yp.shape[0], "xp and yp must have same length (%d!=%d)" % (xp.shape[0], yp.shape[0])
non_nan = ~(np.isnan(xp) & np.isnan(yp))
yp = yp[non_nan]
xp = xp[non_nan]
y = np.interp(x, xp, yp, np.nan, np.nan)
if cyclic_range is not None:
cr = cyclic_range
y2 = np.interp(x, xp, (yp + cr / 2) % cr - cr / 2, np.nan, np.nan) % cr
y = np.choose(np.r_[0, np.abs(np.diff(y)) > np.abs(np.diff(y2))], np.array([y, y2]))
if max_xp_step:
diff = np.diff(xp)
diff[np.isnan(diff)] = 0
indexes = (np.where(diff > max_xp_step)[0])
for index in indexes:
y[(x > xp[index]) & (x < xp[index + 1])] = np.nan
if max_dydxp:
if cyclic_range is None:
abs_dydxp = np.abs(np.diff(yp) / np.diff(xp))
else:
abs_dydxp = np.min([np.abs(np.diff((yp + cyclic_range / 2) % cyclic_range)) , np.abs(np.diff(yp % cyclic_range)) ], 0) / np.diff(xp)
abs_dydxp[np.isnan(abs_dydxp)] = 0
indexes = (np.where(abs_dydxp > max_dydxp)[0])
for index in indexes:
y[(x > xp[index]) & (x < xp[index + 1])] = np.nan
if max_repeated:
rep = np.r_[False, yp[1:] == yp[:-1], False]
tr = rep[1:] ^ rep[:-1]
itr = np.where(tr)[0]
for start, stop, l in zip (itr[::2] , itr[1::2], xp[itr[1::2]] - xp[itr[::2]]):
if l >= max_repeated:
y[(x > xp[start]) & (x <= xp[stop])] = np.nan
return y
#print (interpolate(x=[0, 1, 2, 3, 4], xp=[0, 1, 2, 4], yp=[5, 5, 6, 6], max_repeated=1))
'''
Created on 02/11/2015
@author: MMPE
'''
import numpy as np
from wetb.signal.filters import replacer
def replace_by_mean(x):
return replacer.replace_by_mean(x, np.isnan(x))
def replace_by_line(x):
return replacer.replace_by_line(x, np.isnan(x))
def replace_by_polynomial(x, deg=3, no_base_points=12):
return replacer.replace_by_polynomial(x, np.isnan(x), deg, no_base_points)
def max_no_nan(x):
return replacer.max_cont_mask_length(np.isnan(x))
......@@ -136,7 +136,53 @@ def cycle_trigger(values, trigger_value=None, step=1, ascending=True, tolerance=
else:
return np.where((values[1:] < trigger_value - tolerance) & (values[:-1] >= trigger_value + tolerance))[0][::step]
def revolution_trigger(values, rpm_dt=None, dmin=5, dmax=10, ):
def revolution_trigger(rotor_position, sample_frq, rotor_speed, max_no_round_diff=1):
"""Returns one index per revolution (minimum rotor position)
Parameters
----------
rotor_position : array_like
Rotor position [deg] (0-360)
sample_frq : int or float
Sample frequency [Hz]
rotor_speed : array_like
Rotor speed [RPM]
Returns
-------
nd_array : Array of indexes
"""
if isinstance(rotor_speed, (float, int)):
rotor_speed = np.ones_like(rotor_position)*rotor_speed
deg_per_sample = rotor_speed*360/60/sample_frq
sample_per_round = 1/(rotor_speed/60/sample_frq)
thresshold = deg_per_sample.max()*2
nround_rotor_speed = np.nansum(rotor_speed/60/sample_frq)
mod = [v for v in [5,10,30,60,90] if v>thresshold][0]
nround_rotor_position = np.nansum(np.diff(rotor_position)%mod)/360
#assert abs(nround_rotor_position-nround_rotor_speed)<max_no_round_diff, "No of rounds from rotor_position (%.2f) mismatch with no_rounds from rotor_speed (%.2f)"%(nround_rotor_position, nround_rotor_speed)
#print (nround_rotor_position, nround_rotor_speed)
rp = np.array(rotor_position).copy()
#filter degree increase > thresshold
rp[np.r_[False, np.diff(rp)>thresshold]] = 180
upper_indexes = np.where((rp[:-1]>(360-thresshold))&(rp[1:]<(360-thresshold)))[0]
lower_indexes = np.where((rp[:-1]>thresshold)&(rp[1:]<thresshold))[0] +1
# Best lower is the first lower after upper
best_lower = lower_indexes[np.searchsorted(lower_indexes, upper_indexes)]
upper2lower = best_lower - upper_indexes
best_lower = best_lower[upper2lower<upper2lower.mean()*2]
#max_dist_error = max([np.abs((i2-i1)- np.mean(sample_per_round[i1:i2])) for i1,i2 in zip(best_lower[:-1], best_lower[1:])])
#assert max_dist_error < sample_frq/5, max_dist_error
return best_lower
def revolution_trigger_old(values, rpm_dt=None, dmin=5, dmax=10, ):
"""Return indexes where values are > max(values)-dmin and decreases more than dmax
If RPM and time step is provided, triggers steps < time of 1rpm is removed
......
'''
Created on 29. mar. 2017
@author: mmpe
'''
import unittest
import numpy as np
from wetb.signal.filters._differentiation import differentiation
class Test(unittest.TestCase):
def testDifferentiation(self):
np.testing.assert_array_equal(differentiation([1,2,1,0,1,1]), [1,0,-1,0,.5,0])
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testDifferentiation']
unittest.main()
\ No newline at end of file
......@@ -10,6 +10,7 @@ import os
import unittest
from wetb.signal.fit import fourier_fit
from wetb.signal.error_measures import rms
from wetb.signal.fit import spline_fit
tfp = os.path.join(os.path.dirname(__file__), 'test_files/')
class TestFit(unittest.TestCase):
......@@ -88,11 +89,11 @@ class TestFit(unittest.TestCase):
x,y = ds('Wsp_metmast'), ds('Power')
if 0:
import matplotlib.pyplot as plt
fx, fy = perpendicular_bin_fit(x,y,30,plt=plt)
fx, fit = perpendicular_bin_fit(x,y,30,plt=plt)
plt.show()
else:
fx, fy = perpendicular_bin_fit(x,y,30)
fx, fit = perpendicular_bin_fit(x,y,30)
self.assertEqual(len(fx), 30)
......@@ -143,6 +144,24 @@ class TestFit(unittest.TestCase):
# plt.plot(fourier_fit.F2x(np.fft.fft(y) / len(y)), label='fft')
# plt.legend()
# plt.show()
def test_spline(self):
x = np.random.randint(0,100,10)
t = np.arange(0,100,10)
t_ = np.arange(100)
spline = spline_fit(t,x)
acc_lin = np.diff(np.diff(np.interp(t_, t,x)))
acc_spline = np.diff(np.diff(spline(t_)))
self.assertLess(np.abs(acc_spline).max(), np.abs(acc_lin).max())
if 0:
import matplotlib.pyplot as plt
plt.plot(t,x,'.',label='points')
plt.plot(t_, spline(t_),label='spline')
plt.legend()
plt.show()
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
\ No newline at end of file
'''
Created on 30. mar. 2017
@author: mmpe
'''
import os
import unittest
from wetb import gtsdf
from wetb.signal.fix._rotor_position import fix_rotor_position, find_fix_dt
from wetb.signal.filters._differentiation import differentiation
tfp = os.path.join(os.path.dirname(__file__), 'test_files/')
import numpy as np
class TestFix(unittest.TestCase):
def testRotorPositionFix(self):
ds = gtsdf.Dataset(tfp+'azi.hdf5')
sample_frq = 25
#import matplotlib.pyplot as plt
#print (find_fix_dt(ds.azi, sample_frq, ds.Rot_cor, plt))
rp_fit = fix_rotor_position(ds.azi, sample_frq, ds.Rot_cor)
rpm_pos = differentiation(rp_fit)%180 / 360 * sample_frq * 60
err_sum = np.sum((rpm_pos - ds.Rot_cor)**2)
self.assertLess(err_sum,40)
if 0:
import matplotlib.pyplot as plt
t = ds.Time-ds.Time[0]
plt.plot(t, differentiation(ds.azi)%180 / 360 * sample_frq * 60, label='fit')
plt.plot(t, ds.Rot_cor)
plt.plot(t, differentiation(rp_fit)%180 / 360 * sample_frq * 60, label='fit')
plt.ylim(10,16)
plt.show()
def test_find_fix_dt(self):
ds = gtsdf.Dataset(tfp+'azi.hdf5')
sample_frq = 25
self.assertEqual(find_fix_dt(ds.azi, sample_frq, ds.Rot_cor), 4)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testRotorPositionFix']
unittest.main()
\ No newline at end of file
'''
Created on 27. mar. 2017
@author: mmpe
'''
import unittest
import numpy as np
from wetb.signal.filters.frq_filters import sine_generator, low_pass, \
frequency_response, high_pass
class Test(unittest.TestCase):
def test_sine_generator(self):
t,y = sine_generator(10,1,2)
self.assertEqual(np.diff(t).mean(),.1)
self.assertAlmostEqual(t.max(),1.9)
if 0:
import matplotlib.pyplot as plt
plt.plot(*sine_generator(10,1,2), label="1Hz sine")
plt.plot(*sine_generator(100,2,2), label="2Hz sine")
plt.legend()
plt.show()
def test_low_pass(self):
sf = 100 # sample frequency
t,y1 = sine_generator(sf,1,5) # 1Hz sine
t,y10 = sine_generator(sf,10,5) # 10Hz sine
sig = y1+y10
y_lp = low_pass(sig, sf, 1, order=1)
# 1 order:
# cut off frq: -3db
# Above cut off: 20db/decade
np.testing.assert_almost_equal(y_lp[100:400], y1[100:400]*.501, 1)
np.testing.assert_almost_equal((y_lp-y1*.501)[100:400], y10[100:400]*.01, 1)
if 0:
import matplotlib.pyplot as plt
plt.plot(t,sig, label='Input signal')
plt.plot(t, y1*0.501, label='1Hz sine (-3db)')
plt.plot(t,y_lp, label='Output signal')
plt.plot(t,y_lp - y1*0.501, label="Output - 1Hz sine(-3db)")
plt.plot(t, y10*0.01, label='10Hz sine (-20db)')
plt.plot(t, y_lp - y1*0.501 - y10*.01, label="(Output - 1Hz sine(-3db)) - 10Hz sine (-20db)")
plt.legend()
plt.show()
def test_frq_response(self):
w,h = frequency_response(100,10,'low',1)
self.assertAlmostEqual(np.interp(10, w, h), -3.01,2) # cut off frq -3.01 db
self.assertAlmostEqual(np.interp(100, w, h), -20,1) # -20 db per decade
if 0:
import matplotlib.pyplot as plt
frequency_response(100,10,'low',1, plt=plt)
frequency_response(100,10,'low',2, plt=plt)
frequency_response(100,10,'low',5, plt=plt)
frequency_response(100,10,'low',8, plt=plt)
frequency_response(100,10,'low',10, plt=plt)
frequency_response(100,10,'high',1, plt=plt)
frequency_response(100,10,'high',2, plt=plt)
plt.show()
def test_high_pass(self):
sf = 100 # sample frequency
t,y1 = sine_generator(sf,1,5) # 1Hz sine
t,y10 = sine_generator(sf,10,5) # 10Hz sine
sig = y1+y10
y_lp = high_pass(sig, sf, 10, order=1)
# 1 order:
# cut off frq: -3db
# Below cut off: 20db/decade
np.testing.assert_almost_equal(y_lp[100:400], y10[100:400]*.501, 1)
np.testing.assert_almost_equal((y_lp-y10*.501)[100:400], y1[100:400]*.01, 1)
if 0:
import matplotlib.pyplot as plt
plt.plot(t,sig, label='Input signal')
plt.plot(t, y10*0.501, label='10Hz sine (-3db)')
plt.plot(t,y_lp, label='Output signal')
plt.plot(t,y_lp - y10*0.501, label="Output - 10Hz sine(-3db)")
plt.plot(t, y1*0.01, label='1Hz sine (-20db)')
plt.plot(t, y_lp - y10*0.501 - y1*.01, label="(Output - 10Hz sine(-3db)) - 1Hz sine (-20db)")
plt.legend()
plt.show()
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.test_sine_generator']
unittest.main()
\ No newline at end of file
'''
Created on 19/12/2014
@author: MMPE
'''
import unittest
from matplotlib.pyplot import *
import numpy as np
from wetb import signal
from wetb import gtsdf
import os
from wetb.gtsdf.unix_time import from_unix
import datetime
class TestInterpolation(unittest.TestCase):
def test_interpolate1(self):
x = [1, 1.5, 2, 3]
xp = [0.5, 1.5, 3]
yp = [5, 15, 30]
np.testing.assert_array_equal(signal.interpolate(x, xp, yp), [10., 15., 20, 30.])
np.testing.assert_array_equal(signal.interpolate(x, xp, yp, 1), [10., 15., np.nan, 30.])
def test_interpolate2(self):
x = np.arange(0, 100, 5, dtype=np.float)
xp = np.arange(0, 100, 10, dtype=np.float)
xp = np.r_[xp[:3], xp[5:]]
yp = np.arange(10, dtype=np.float)
yp[7:8] = np.nan
yp = np.r_[yp[:3], yp[5:]]
#x = [ 0. 5. 10. 15. 20. 25. 30. 35. 40. 45. 50. 55. 60. 65. 70. 75. 80. 85. 90. 95.]
#xp = [0.0, 10.0, 20.0, 50.0, 60.0, 70.0, 80.0, 90.0]
#yp = [0.0, 1.0, 2.0, 5.0, 6.0, nan, 8.0, 9.0]
y = signal.interpolate(x, xp, yp)
np.testing.assert_array_equal(y[~np.isnan(y)], [0., 0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. , 4.5, 5. , 5.5, 8. , 8.5, 9. ])
y = signal.interpolate(x, xp, yp, 10)
np.testing.assert_array_equal(y[~np.isnan(y)], [ 0. , 0.5, 1. , 1.5, 2. , 5. , 5.5, 8. , 8.5, 9. ])
def test_interpolate_max_dydxp(self):
x = np.arange(7)
xp = [0, 2, 4, 6]
yp = [358, 359, 0, 1]
y = signal.interpolate(x, xp, yp, max_dydxp=30)
np.testing.assert_array_equal(y, [ 358., 358.5, 359., np.nan, 0., 0.5, 1. ])
y = signal.interpolate(x, xp, yp, max_dydxp=180)
np.testing.assert_array_equal(y, [ 358., 358.5, 359., 179.5, 0., 0.5, 1. ])
def test_interpolate_max_dydxp_cyclic(self):
x = np.arange(7)
xp = [0, 2, 4, 6]
yp = [358, 359, 0, 1]
y = signal.interpolate(x, xp, [178, 179, -180, -179], max_dydxp=30, cyclic_range=360)
np.testing.assert_array_equal(y, [ 178. , 178.5, 179. , -0.5, -180. , -179.5, -179. ])
y = signal.interpolate(x, xp, yp, max_dydxp=30, cyclic_range=360)
np.testing.assert_array_equal(y, [ 358., 358.5, 359., 359.5, 0., 0.5, 1. ])
y = signal.interpolate(xp, x, [ 358., 358.5, 359., 359.5, 0., 0.5, 1. ], max_dydxp=30, cyclic_range=360)
np.testing.assert_array_equal(y, [ 358., 359., 0., 1. ])
y = signal.interpolate(x, xp, yp, max_dydxp=180)
np.testing.assert_array_equal(y, [ 358., 358.5, 359., 179.5, 0., 0.5, 1. ])
def test_interpolate_max_repeated(self):
x = np.arange(7)
xp = [0, 1, 2, 3, 4, 5, 6]
yp = [4, 5, 5, 5, 6, 6, 7]
y = signal.interpolate(x, xp, yp, max_repeated=2)
np.testing.assert_array_equal(y, [ 4, 5, np.nan, np.nan, 6, 6, 7])
x = np.arange(7)
xp = [0, 3, 4, 5, 6]
yp = [5, 5, 7, 6, 6]
y = signal.interpolate(x, xp, yp, max_repeated=2)
np.testing.assert_array_equal(y, [ 5, np.nan, np.nan, np.nan, 7, 6, 6])
xp = [0, 2, 3, 4, 6]
y = signal.interpolate(x, xp, yp, max_repeated=1)
np.testing.assert_array_equal(y, [ 5, np.nan, np.nan, 7, 6, np.nan, np.nan])
xp = [0, 1, 2, 3, 6]
y = signal.interpolate(x, xp, yp, max_repeated=2)
np.testing.assert_array_equal(y, [ 5, 5, 7, 6, np.nan, np.nan, np.nan])
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment