Skip to content
Snippets Groups Projects
Commit f895a78e authored by mads's avatar mads
Browse files

functions - utils

parent dab01062
No related branches found
No related tags found
No related merge requests found
Showing
with 761 additions and 6 deletions
import sys
sys.path.append("../../../../MMPE/")
from mmpe.cython_compile.cython_compile import cython_import
from wetb.utils.cython_compile.cython_compile import cython_import
cython_import('pair_range')
cython_import('peak_trough')
......
No preview for this file type
No preview for this file type
import numpy as np
from wetb.utils.cython_compile.cython_compile import cython_import
def check_signal(signal):
# check input data validity
if not type(signal).__name__ == 'ndarray':
......@@ -61,13 +64,18 @@ def rainflow_windap(signal, levels=255., thresshold=(255 / 50)):
signal = signal / gain
signal = np.round(signal).astype(np.int)
# If possible the module is compiled using cython otherwise the python implementation is used
cython_import('wetb.fatigue_tools.rainflowcounting.peak_trough')
cython_import('wetb.fatigue_tools.rainflowcounting.pair_range')
from wetb.fatigue_tools.rainflowcounting.peak_trough import peak_trough
from wetb.fatigue_tools.rainflowcounting.pair_range import pair_range_amplitude_mean
#Convert to list of local minima/maxima where difference > thresshold
sig_ext = peak_trough(signal, thresshold)
from wetb.fatigue_tools.rainflowcounting.pair_range import pair_range_amplitude_mean
#rainflow count
ampl_mean = pair_range_amplitude_mean(sig_ext)
......@@ -114,7 +122,7 @@ def rainflow_astm(signal):
# Import find extremes and rainflow.
# If possible the module is compiled using cython otherwise the python implementation is used
#cython_import('fatigue_tools.rainflowcount_astm')
cython_import('wetb.fatigue_tools.rainflowcounting.rainflowcount_astm')
from wetb.fatigue_tools.rainflowcounting.rainflowcount_astm import find_extremes, rainflowcount
# Remove points which is not local minimum/maximum
......@@ -123,4 +131,4 @@ def rainflow_astm(signal):
# rainflow count
ampl_mean = np.array(rainflowcount(sig_ext))
return np.array(ampl_mean).T
\ No newline at end of file
return np.array(ampl_mean).T
No preview for this file type
'''
Created on 03/12/2015
@author: mmpe
'''
import unittest
import numpy as np
import datetime
from wetb.utils.timing import print_time
from wetb.gtsdf.unix_time import to_unix
class Test(unittest.TestCase):
@print_time
def r(self, dt):
return [to_unix(dt) for dt in dt]
def test_to_unix(self):
dt = [datetime.datetime(2000, 1, 1, 12, s % 60) for s in np.arange(1000000)]
self.r(dt)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
'''
Created on 07/02/2014
@author: MMPE
'''
import inspect
def set_cache_property(obj, name, get_func, set_func=None):
"""Create a cached property
Parameters
----------
obj : object
Class to add property to
name : str
Name of property
get_func : func
Getter function
set_func : func, optional
Setter function
Examples
--------
>>> class Example(object):
>>> def __init__(self):
>>> set_cache_property(self, "test", self.slow_function)
>>>
>>> e = Example()
>>> e.test # Call, store and return result of e.slow_function
>>> e.test # Return stored result of e.slow_function
"""
_name = "_" + name
setattr(obj, _name, None)
def get(self):
if getattr(obj, _name) is None:
setattr(obj, _name, get_func())
return getattr(obj, _name)
p = property(lambda self:get(self), set_func)
return setattr(obj.__class__, name, p)
def cache_function(f):
"""Cache function decorator
Example:
>>> class Example(object):
>>> @cache_function
>>> def slow_function(self):
>>> # calculate slow result
>>> return 1
>>>
>>> e = Example()
>>> e.slow_function() # Call, store and return result of e.slow_function
>>> e.slow_function() # Return stored result of e.slow_function
"""
def wrap(*args, **kwargs):
self = args[0]
name = "_" + f.__name__
if not hasattr(self, name) or getattr(self, name) is None or kwargs.get("reload", False):
try:
del kwargs['reload']
except KeyError:
pass
# ======HERE============
setattr(self, name, f(*args, **kwargs))
# ======================
if not hasattr(self, "cache_attr_lst"):
self.cache_attr_lst = set()
def clear_cache():
for attr in self.cache_attr_lst:
delattr(self, attr)
self.cache_attr_lst = set()
self.clear_cache = clear_cache
self.cache_attr_lst.add(name)
return getattr(self, name)
if 'reload' in inspect.getargspec(f)[0]:
raise AttributeError("Functions decorated with cache_function are not allowed to take a parameter called 'reload'")
return wrap
d = None;d = dir()
from .cython_compile import cython_compile, cython_import, cython_compile_autodeclare, is_compiled
__all__ = [m for m in set(dir()) - set(d)]
\ No newline at end of file
'''
Created on 29/03/2013
@author: Mads
'''
import cython
import math
def cycheck(p):
for i in range(10):
for y in range(2, int(math.sqrt(p)) + 1):
if p % y == 0:
return False
return True
@cython.ccall
@cython.locals(y=cython.int, p=cython.ulonglong)
def cycheck_pure(p):
for i in range(10):
for y in range(2, int(math.sqrt(p)) + 1):
if p % y == 0:
return False
return True
def cycheck_cdef(p): #cpdef cycheck_cdef(unsigned long long p):
#cdef int y
for i in range(10):
for y in range(2, int(math.sqrt(p)) + 1):
if p % y == 0:
return False
return True
'''
Created on 11/07/2013
@author: Mads M. Pedersen (mmpe@dtu.dk)
'''
import math
from wetb.utils.cython_compile.cython_compile import cython_compile, \
cython_compile_autodeclare, cython_import
from wetb.utils.cython_compile.examples import cycheck
def pycheck(p):
for i in range(10):
for y in range(2, int(math.sqrt(p)) + 1):
if p % y == 0:
return False
return True
@cython_compile
def cycheck_compile(p):
import math
for i in range(10):
for y in range(2, int(math.sqrt(p)) + 1):
if p % y == 0:
return False
return True
@cython_compile_autodeclare
def cycheck_compile_autodeclare(p):
import math
for i in range(10):
for y in range(2, int(math.sqrt(p)) + 1):
if p % y == 0:
return False
return True
p = 17
print (pycheck(p))
cython_import('cycheck')
print (cycheck.cycheck(p))
print (cycheck.cycheck_pure(p))
print (cycheck.cycheck_cdef(p))
print (cycheck_compile(p))
print (cycheck_compile_autodeclare(p))
import numpy as np
def rad(deg):
return deg * np.pi / 180
def deg(rad):
return rad / np.pi * 180
def sind(dir_deg):
return np.sin(rad(dir_deg))
def cosd(dir_deg):
return np.cos(rad(dir_deg))
def tand(dir_deg):
return np.tan(rad(dir_deg))
def mean_deg(dir, axis=0):
"""Mean of angles in degrees
Parameters
----------
dir : array_like
Angles in degrees
Returns
-------
mean_deg : float
Mean angle
"""
return deg(np.arctan2(np.mean(sind(dir[:]), axis), np.mean(cosd(dir[:]), axis)))
def std_deg(dir):
"""Standard deviation of angles in degrees
Parameters
----------
dir : array_like
Angles in degrees
Returns
-------
std_deg : float
standard deviation
"""
return deg(np.sqrt(1 - (np.mean(sind(dir)) ** 2 + np.mean(cosd(dir)) ** 2)))
def wsp_dir2uv(wsp, dir, dir_ref=None):
"""Convert horizontal wind speed and direction to u,v
Parameters
----------
wsp : array_like
Horizontal wind speed
dir : array_like
Wind direction
dir_ref : int or float, optional
Reference direction\n
If None, default, the mean direction is used as reference
Returns
-------
u : array_like
u wind component
v : array_like
v wind component
"""
if dir_ref is None:
dir = dir[:] - mean_deg(dir[:])
else:
dir = dir[:] - dir_ref
u = np.cos(rad(dir)) * wsp[:]
v = -np.sin(rad(dir)) * wsp[:]
return np.array([u, v])
def wsp_dir_tilt2uvw(wsp, dir, tilt, wsp_horizontal, dir_ref=None):
"""Convert horizontal wind speed and direction to u,v,w
Parameters
----------
wsp : array_like
- if wsp_horizontal is True: Horizontal wind speed, $\sqrt{u^2+v^2}\n
- if wsp_horizontal is False: Wind speed, $\sqrt{u^2+v^2+w^2}
dir : array_like
Wind direction
tilt : array_like
Wind tilt
wsp_horizontal : bool
See wsp
dir_ref : int or float, optional
Reference direction\n
If None, default, the mean direction is used as reference
Returns
-------
u : array_like
u wind component
v : array_like
v wind component
w : array_like
v wind component
"""
wsp, dir, tilt = wsp[:], dir[:], tilt[:]
if wsp_horizontal:
w = tand(tilt) * wsp
u, v = wsp_dir2uv(wsp, dir, dir_ref)
else:
w = sind(tilt) * wsp
u, v = wsp_dir2uv(np.sqrt(wsp ** 2 - w ** 2), dir, dir_ref)
return np.array([u, v, w])
def xyz2uvw(x, y, z, left_handed=True):
"""Convert sonic x,y,z measurements to u,v,w wind components
Parameters
----------
x : array_like
Sonic x component
y : array_like
Sonic x component
z : array_like
Sonic x component
left_handed : boolean
if true (default), xyz are defined in left handed coodinate system (default for some sonics)
if false, xyz are defined in normal right handed coordinate system
Returns
-------
u : array_like
u wind component
v : array_like
v wind component
w : array_like
w wind component
"""
x, y, z = map(np.array, [x, y, z])
if left_handed:
y *= -1
theta = deg(np.arctan2(np.mean(y), np.mean(x)))
SV = cosd(theta) * y - sind(theta) * x
SUW = cosd(theta) * x + sind(theta) * y
#% rotation around y of tilt
tilt = deg(np.arctan2(np.mean(z), np.mean(SUW)))
SU = SUW * cosd(tilt) + z * sind(tilt);
SW = z * cosd(tilt) - SUW * sind(tilt);
return np.array([SU, SV, SW])
def rpm2rads(rpm):
return rpm * 2 * np.pi / 60
def abvrel2xyz(alpha, beta, vrel):
"""Convert pitot tube alpha, beta and relative velocity to local Cartesian wind speed velocities
Parameters
----------
alpha : array_like
Pitot tube angle of attack
beta : array_like
Pitot tube side slip angle
vrel : array_like
Pitot tube relative velocity
Returns
-------
x : array_like
Wind component towards pitot tube (positive for postive vrel and -90<beta<90)
y : array_like
Wind component in alpha plane (positive for positive alpha)
z : array_like
Wind component in beta plane (positive for positive beta)
"""
alpha = np.array(alpha, dtype=np.float)
beta = np.array(beta, dtype=np.float)
vrel = np.array(vrel, dtype=np.float)
sign_vsx = -((np.abs(beta) > np.pi / 2) * 2 - 1) # +1 for |beta| <= 90, -1 for |beta|>90
sign_vsy = np.sign(alpha) #+ for alpha > 0
sign_vsz = -np.sign(beta) #- for beta>0
x = sign_vsx * np.sqrt(vrel ** 2 / (1 + np.tan(alpha) ** 2 + np.tan(beta) ** 2))
m = alpha != 0
y = np.zeros_like(alpha)
y[m] = sign_vsy * np.sqrt(vrel[m] ** 2 / ((1 / np.tan(alpha[m])) ** 2 + 1 + (np.tan(beta[m]) / np.tan(alpha[m])) ** 2))
m = beta != 0
z = np.zeros_like(alpha)
z[m] = sign_vsz * np.sqrt(vrel[m] ** 2 / ((1 / np.tan(beta[m])) ** 2 + 1 + (np.tan(alpha[m]) / np.tan(beta[m])) ** 2))
return x, y, z
'''
Created on 10/03/2014
@author: MMPE
'''
import os
import psutil
DEBUG = False
def pexec(args, cwd=None):
"""
usage: errorcode, stdout, stderr, cmd = pexec("MyProgram.exe arg1, arg2", r"c:\tmp\")
"""
import subprocess
if not isinstance(args, (list, tuple)):
args = [args]
args = [str(arg) for arg in args]
for i in range(len(args)):
if os.path.exists(args[i]):
args[i] = str(args[i]).replace('/', os.path.sep).replace('\\', os.path.sep).replace('"', '')
cmd = "%s" % '{} /c "{}"'.format (os.environ.get("COMSPEC", "cmd.exe"), subprocess.list2cmdline(args))
if os.path.isfile(cwd):
cwd = os.path.dirname(cwd)
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd)
stdout, stderr = proc.communicate()
errorcode = proc.returncode
return errorcode, stdout.decode(), stderr.decode(), cmd
def process(args, cwd=None):
import subprocess
if not isinstance(args, (list, tuple)):
args = [args]
args = [str(arg) for arg in args]
for i in range(len(args)):
if os.path.exists(args[i]):
args[i] = str(args[i]).replace('/', os.path.sep).replace('\\', os.path.sep).replace('"', '')
cmd = "%s" % '{} /c "{}"'.format (os.environ.get("COMSPEC", "cmd.exe"), subprocess.list2cmdline(args))
if cwd is not None and os.path.isfile(cwd):
cwd = os.path.dirname(cwd)
return subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False, cwd=cwd)
def exec_process(process):
stdout, stderr = process.communicate()
errorcode = process.returncode
return errorcode, stdout.decode(), stderr.decode()
'''
Created on 08/11/2013
@author: mmpe
'''
import multiprocessing
import time
import unittest
from wetb.utils.timing import get_time
from wetb.utils.caching import cache_function, set_cache_property
class Example(object):
def __init__(self, *args, **kwargs):
object.__init__(self, *args, **kwargs)
set_cache_property(self, "test", self.slow_function)
set_cache_property(self, 'pool', lambda : multiprocessing.Pool(20))
def slow_function(self):
time.sleep(1)
return 1
@cache_function
def test_cache_function(self):
return self.slow_function()
@get_time
def prop(self, prop):
return getattr(self, prop)
def f(x):
return x ** 2
class TestCacheProperty(unittest.TestCase):
def setUp(self):
pass
def testcache_property_test(self):
e = Example()
self.assertAlmostEqual(e.prop("test")[1], 1, 2)
self.assertAlmostEqual(e.prop("test")[1], 0, 2)
def testcache_property_pool(self):
e = Example()
print (e.prop("pool"))
self.assertAlmostEqual(e.prop("pool")[1], 0, places=4)
print (get_time(e.pool.map)(f, range(10)))
def test_cache_function(self):
e = Example()
self.assertAlmostEqual(get_time(e.test_cache_function)()[1], 1, places=2)
self.assertAlmostEqual(get_time(e.test_cache_function)()[1], 0, places=2)
self.assertAlmostEqual(get_time(e.test_cache_function)(reload=True)[1], 1, places=2)
self.assertAlmostEqual(get_time(e.test_cache_function)()[1], 0, places=2)
e.clear_cache()
self.assertAlmostEqual(get_time(e.test_cache_function)()[1], 1, places=2)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
'''
Created on 15/01/2014
@author: MMPE
'''
import unittest
import wetb.gtsdf
import numpy as np
from wetb.utils.geometry import rad, deg, mean_deg, sind, cosd, std_deg, xyz2uvw, \
wsp_dir2uv, wsp_dir_tilt2uvw, tand
class Test(unittest.TestCase):
def test_rad(self):
self.assertEqual(rad(45), np.pi / 4)
self.assertEqual(rad(135), np.pi * 3 / 4)
def test_deg(self):
self.assertEqual(45, deg(np.pi / 4))
self.assertEqual(135, deg(np.pi * 3 / 4))
def test_rad_deg(self):
for i in [15, 0.5, 355, 400]:
self.assertEqual(i, deg(rad(i)), i)
def test_sind(self):
self.assertAlmostEqual(sind(30), .5)
def test_cosd(self):
self.assertAlmostEqual(cosd(60), .5)
def test_tand(self):
self.assertAlmostEqual(tand(30), 0.5773, 3)
def test_mean_deg(self):
self.assertEqual(mean_deg(np.array([0, 90])), 45)
self.assertAlmostEqual(mean_deg(np.array([350, 10])), 0)
def test_mean_deg_array(self):
a = np.array([[0, 90], [350, 10], [0, -90]])
np.testing.assert_array_almost_equal(mean_deg(a, 1), [45, 0, -45])
np.testing.assert_array_almost_equal(mean_deg(a.T, 0), [45, 0, -45])
def test_std_deg(self):
self.assertEqual(std_deg(np.array([0, 0, 0])), 0)
self.assertAlmostEqual(std_deg(np.array([0, 90, 180, 270])), 57.296, 2)
def test_wspdir2uv(self):
u, v = wsp_dir2uv(np.array([1, 1, 1]), np.array([30, 0, 330]))
np.testing.assert_array_almost_equal(u, [0.8660, 1, 0.8660], 3)
np.testing.assert_array_almost_equal(v, [-0.5, 0, 0.5], 3)
def test_wspdir2uv_dir_ref(self):
u, v = wsp_dir2uv(np.array([1, 1, 1]), np.array([30, 0, 330]), 30)
np.testing.assert_array_almost_equal(u, [1, 0.8660, .5], 3)
np.testing.assert_array_almost_equal(v, [0, 0.5, .8660], 3)
def test_xyz2uvw(self):
u, v, w = xyz2uvw([1, 1, 0], [0, 1, 1], 0, left_handed=False)
np.testing.assert_almost_equal(u, [np.sqrt(1 / 2), np.sqrt(2), np.sqrt(1 / 2)])
np.testing.assert_almost_equal(v, [-np.sqrt(1 / 2), 0, np.sqrt(1 / 2)])
u, v, w = xyz2uvw([1, 1, 0], [0, 1, 1], 0, left_handed=True)
np.testing.assert_almost_equal(u, [np.sqrt(1 / 2), np.sqrt(2), np.sqrt(1 / 2)])
np.testing.assert_almost_equal(v, [np.sqrt(1 / 2), 0, -np.sqrt(1 / 2)])
u, v, w = xyz2uvw(np.array([-1, -1, -1]), np.array([-0.5, 0, .5]), np.array([0, 0, 0]), left_handed=False)
np.testing.assert_array_almost_equal(u, np.array([1, 1, 1]))
np.testing.assert_array_almost_equal(v, np.array([.5, 0, -.5]))
np.testing.assert_array_almost_equal(w, np.array([0, 0, 0]))
u, v, w = xyz2uvw(np.array([.5, cosd(30), 1]), np.array([sind(60), sind(30), 0]), np.array([0, 0, 0]), left_handed=False)
np.testing.assert_array_almost_equal(u, np.array([sind(60), 1, sind(60)]))
np.testing.assert_array_almost_equal(v, np.array([.5, 0, -.5]))
np.testing.assert_array_almost_equal(w, np.array([0, 0, 0]))
u, v, w = xyz2uvw(np.array([.5, cosd(30), 1]), np.array([0, 0, 0]), np.array([sind(60), sind(30), 0]), left_handed=False)
np.testing.assert_array_almost_equal(u, np.array([sind(60), 1, sind(60)]))
np.testing.assert_array_almost_equal(v, np.array([0, 0, 0]))
np.testing.assert_array_almost_equal(w, np.array([.5, 0, -.5]))
def test_wspdir2uv2(self):
time, data, info = wetb.gtsdf.load("test_files/SonicDataset.hdf5")
stat, x, y, z, temp, wsp, dir, tilt = data[2:3].T #xyz is left handed
np.testing.assert_array_almost_equal(xyz2uvw(*wsp_dir2uv(wsp, dir), z=0), xyz2uvw(x, y, 0))
def test_wspdirtil2uvw(self):
time, data, info = wetb.gtsdf.load("test_files/SonicDataset.hdf5")
stat, x, y, z, temp, wsp, dir, tilt = data[3:6].T #xyz is left handed
wsp = np.sqrt(wsp ** 2 + z ** 2)
np.testing.assert_array_almost_equal(xyz2uvw(*wsp_dir_tilt2uvw(wsp, dir, tilt, wsp_horizontal=False), left_handed=False), xyz2uvw(x, y, z))
def test_wspdirtil2uvw_horizontal_wsp(self):
time, data, info = wetb.gtsdf.load("test_files/SonicDataset.hdf5")
stat, x, y, z, temp, wsp, dir, tilt = data[:].T #xyz is left handed
np.testing.assert_array_almost_equal(xyz2uvw(*wsp_dir_tilt2uvw(wsp, dir, tilt, wsp_horizontal=True), left_handed=False), xyz2uvw(x, y, z))
np.testing.assert_array_almost_equal(wsp_dir_tilt2uvw(wsp, dir, tilt, wsp_horizontal=True, dir_ref=180), np.array([x, -y, z]), 5)
np.testing.assert_array_almost_equal(xyz2uvw(*wsp_dir_tilt2uvw(wsp, dir, tilt, wsp_horizontal=True), left_handed=False), xyz2uvw(x, y, z))
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.test_rad']
unittest.main()
from six import exec_
import time
import inspect
def get_time(f):
"""Get time decorator
returns (return_values, time_of_execution)
>>> @get_time
>>> def test():
>>> time.sleep(1)
>>> return "end"
>>>
>>> test()
('end', 0.999833492421551)
"""
def wrap(*args, **kwargs):
t = time.clock()
res = f(*args, **kwargs)
return res, time.clock() - t
w = wrap
w.__name__ = f.__name__
return w
def print_time(f):
"""Print time decorator
prints name of method and time of execution
>>> @print_time
>>> def test():
>>> time.sleep(1)
>>>
>>> test()
test 1.000s
"""
def wrap(*args, **kwargs):
t = time.time()
res = f(*args, **kwargs)
print ("%-12s\t%.3fs" % (f.__name__, time.time() - t))
return res
w = wrap
w.__name__ = f.__name__
return w
cum_time = {}
def print_cum_time(f):
"""Print cumulated time decorator
prints name of method and cumulated time of execution
>>> @print_cum_time
>>> def test():
>>> time.sleep(1)
>>>
>>> test()
test 0001 calls, 1.000000s, 1.000000s pr. call'
>>> test()
test 0002 calls, 2.000000s, 1.000000s pr. call'
"""
if f not in cum_time:
cum_time[f] = (0, 0)
def wrap(*args, **kwargs):
t = time.time()
res = f(*args, **kwargs)
ct = cum_time[f][1] + time.time() - t
n = cum_time[f][0] + 1
cum_time[f] = (n, ct)
print ("%-12s\t%.4d calls, %03fs, %fs pr. call'" % (f.__name__, n, ct, ct / n))
return res
w = wrap
w.__name__ = f.__name__
return w
def print_line_time(f):
"""Execute one line at the time and prints the time of execution.
Only for non-branching and non-looping code
prints: time_of_line, cumulated_time, code_line
>>> @print_line_time
>>> def test():
>>> time.sleep(.3)
>>> time.sleep(.5)
>>>
>>> test()
0.300s 0.300s time.sleep(.3)
0.510s 0.810s time.sleep(.51)
"""
def wrap(*args, **kwargs):
arg_names, varargs, varkw, defaults = inspect.getargspec(f)
kwargs[varargs] = args[len(arg_names):]
kwargs[varkw] = {}
for k, v in kwargs.items():
if k not in tuple(arg_names) + (varargs, varkw):
kwargs.pop(k)
kwargs[varkw][k] = v
if defaults:
kwargs.update(dict(zip(arg_names[::-1], defaults[::-1])))
kwargs.update(dict(zip(arg_names, args)))
lines = inspect.getsourcelines(f)[0][2:]
tcum = time.clock()
locals = kwargs
gl = f.__globals__
for l in lines:
tline = time.clock()
exec(l.strip(), locals, gl) #res = f(*args, **kwargs)
print ("%.3fs\t%.3fs\t%s" % (time.clock() - tline, time.clock() - tcum, l.strip()))
w = wrap
w.__name__ = f.__name__
return w
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment