Skip to content
Snippets Groups Projects
Commit 3489b268 authored by Mads M. Pedersen's avatar Mads M. Pedersen
Browse files

add first_order and despiking filters

parent f1036883
No related branches found
No related tags found
No related merge requests found
'''
Created on 29/05/2013
@author: Mads M. Pedersen (mmpe@dtu.dk)
'''
import cython
import numpy as np
#cimport numpy as np
@cython.ccall
@cython.locals(alpha=cython.float, i=cython.int)
def cy_low_pass_filter(inp, delta_t, tau): #cpdef cy_low_pass_filter(np.ndarray[double,ndim=1] inp, double delta_t, double tau):
#cdef np.ndarray[double,ndim=1] output
output = np.empty_like(inp, dtype=np.float)
output[0] = inp[0]
alpha = delta_t / (tau + delta_t)
for i in range(1, inp.shape[0]):
output[i] = output[i - 1] + alpha * (inp[i] - output[i - 1]) # Same as output[i] = alpha*inp[i]+(1-alpha)*output[i-1]
return output
def cy_dynamic_low_pass_filter(inp, delta_t, tau, method): #cpdef cy_dynamic_low_pass_filter(np.ndarray[double,ndim=1] inp, double delta_t, np.ndarray[double,ndim=1] tau, int method):
#cdef np.ndarray[double,ndim=1] output, alpha
#cdef int i
output = np.empty_like(inp, dtype=np.float)
output[0] = inp[0]
if method == 1:
alpha = delta_t / (tau + delta_t)
for i in range(1, inp.shape[0]):
output[i] = output[i - 1] + alpha[i] * (inp[i] - output[i - 1]) # Same as output[i] = alpha*inp[i]+(1-alpha)*output[i-1]
elif method == 2:
for i in range(1, inp.shape[0]):
output[i] = (delta_t * (inp[i] + inp[i - 1] - output[i - 1]) + 2 * tau[i] * output[i - 1]) / (delta_t + 2 * tau[i])
elif method == 3:
for i in range(1, inp.shape[0]):
output[i] = output[i - 1] * np.exp(-delta_t / tau[i]) + inp[i] * (1 - np.exp(-delta_t / tau[i]))
return output
@cython.ccall
@cython.locals(alpha=cython.float, i=cython.int)
def cy_high_pass_filter(inp, delta_t, tau): #cpdef cy_high_pass_filter(np.ndarray[double,ndim=1] inp, double delta_t, double tau):
#cdef np.ndarray[double,ndim=1] output
output = np.empty_like(inp, dtype=np.float)
output[0] = inp[0]
alpha = tau / (tau + delta_t)
for i in range(1, inp.shape[0]):
output[i] = alpha * (output[i - 1] + inp[i] - inp[i - 1])
return output
'''
Created on 13/07/2016
@author: MMPE
'''
import numpy as np
from wetb.signal_tools.filters.first_order import low_pass
from wetb.signal_tools.filters import replacer
from wetb.signal_tools.sonic.despike_sonic import thresshold_finder
replace_by_nan = replacer.replace_by_nan
replace_by_line = replacer.replace_by_line
replace_by_mean = replacer.replace_by_mean
replace_by_polynomial = replacer.replace_by_polynomial
def nanmedian(x):
"""Median ignoring nan (similar to numpy.nanmax"""
return np.median(x[~np.isnan(x)])
def thresshold_finder(data, thresshold, plt=None):
spike_mask = np.abs(data) > thresshold
if plt:
plt.plot(data, label='Fluctuations')
plt.plot ([0, data.shape[0]], [thresshold, thresshold], 'r', label='Thresshold')
plt.plot ([0, data.shape[0]], [-thresshold, -thresshold], 'r')
return spike_mask
def univeral_thresshold_finder(data, variation='mad', plt=None):
## Three variation measures in decreasing order of sensitivity to outliers
variation = {'std': np.sqrt(np.mean((data - np.mean(data)) ** 2)), # standard deviation
'abs': np.mean(np.abs(data - np.mean(data))), # mean abs deviation
'mad': nanmedian(np.abs(data - nanmedian(data))) # median abs deviation (mad)
}.get(variation, variation)
thresshold = np.sqrt(2 * np.log(data.shape[0])) * variation # Universal thresshold (expected maximum of n random variables)
return thresshold_finder(data, thresshold, plt)
def despike(data, dt, spike_finder=univeral_thresshold_finder, spike_replacer=replace_by_nan, plt=None):
"""Despike data
Parameters
---------
data : array_like
data
dt : int or float
time step
spike_finder : function
Function returning indexes of the spikes
spike_replacer : function
function that replaces spikes
Returns
-------
Despiked data
"""
if plt:
plt.plot(data, label='Input')
data = np.array(data).copy()
lp_data = low_pass(data, dt, 1)
hp_data = data - lp_data
spike_mask = spike_finder(hp_data, plt=plt)
despike_data = spike_replacer(data, spike_mask)
if plt:
plt.plot(despike_data, 'k.', label='Output')
plt.legend()
return despike_data
if __name__ == '__main__':
from wetb import gtsdf
time, data, info = gtsdf.load(r'C:\data\SWT36Hovsore\metdata/const_10hz.hdf5')
wsp = data[:, 45]
import matplotlib.pyplot as plt
despike(wsp, .1, lambda data, plt : univeral_thresshold_finder(data, 'std', plt), plt=plt)
plt.show()
'''
Created on 10/01/2015
@author: mmpe
'''
import numpy as np
from wetb.signal_tools.filters import cy_filters
def low_pass(input, delta_t, tau):
if isinstance(tau, (int, float)):
return cy_filters.cy_low_pass_filter(input.astype(np.float64), delta_t, tau)
else:
return cy_filters.cy_dynamic_low_pass_filter(input.astype(np.float64), delta_t, tau)
def high_pass(input, delta_t, tau):
return cy_filters.cy_high_pass_filter(input.astype(np.float64), delta_t, tau)
'''
Created on 19/07/2016
@author: MMPE
'''
'''
Created on 02/11/2015
@author: MMPE
'''
import numpy as np
def _begin_end(x, mask):
begin = np.where(~mask[:-1] & mask[1:])[0]
end = np.where(mask[:-1] & ~mask[1:])[0] + 1
if begin[0] > end[0]:
x[:end[0]] = x[end[0]]
end = end[1:]
if begin[-1] > end[-1]:
x[begin[-1]:] = x[begin[-1]]
begin = begin[:-1]
return begin, end
def replace_by_nan(data, spike_mask):
data[spike_mask] = np.nan
return data
def replace_by_mean(x, mask):
x = x.copy()
for b, e in zip(*_begin_end(x, mask)):
x[b + 1:e] = np.mean([x[b], x[e]])
return x
def replace_by_line(x, mask):
x = x.copy()
for b, e in zip(*_begin_end(x, mask)):
x[b + 1:e] = np.interp(np.arange(b + 1, e), [b, e], [x[b], x[e]])
return x
def replace_by_polynomial(x, mask, deg=3, no_base_points=12):
if not np.any(mask):
return x
x = x.copy()
begin, end = _begin_end(x, mask)
for last_e, b, e, next_b in zip(np.r_[0, end[:-1]], begin, end, np.r_[begin[1:], len(x)]):
if b - last_e >= no_base_points and next_b - e >= no_base_points:
pbegin, pend = max(b - no_base_points, last_e), min(e + no_base_points, next_b)
inter_points = np.r_[np.arange(pbegin + 1, b + 1), np.arange(e, pend)]
pol = np.poly1d(np.polyfit(inter_points , x[inter_points], deg))(np.arange(b + 1, e))
if pol.max() - pol.min() > 2 * (x[inter_points].max() - x[inter_points].min()):
# use linear interpolation if range is too big
x[b + 1:e] = np.interp(np.arange(b + 1, e), [b, e], [x[b], x[e]])
else:
x[b + 1:e] = pol
else:
x[b + 1:e] = np.interp(np.arange(b + 1, e), [b, e], [x[b], x[e]])
return x
def max_cont_mask_length(mask):
if not np.any(mask):
return 0
begin = np.where(~mask[:-1] & mask[1:])[0]
end = np.where(mask[:-1] & ~mask[1:])[0] + 1
no_nan = []
if begin[0] > end[0]:
no_nan.append(end[0])
end = end[1:]
if begin[-1] > end[-1]:
no_nan.append(len(mask) - begin[-1] - 1)
begin = begin[:-1]
return max(np.r_[no_nan, end - 1 - begin])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment