Newer
Older
'rainflow_windap' and 'rainflow_astm' are two different methods to for rainflow counting
'eq_load' calculate equivalent loads using one of the two rain flow counting methods
'cycle_matrix' calculates a matrix of cycles (binned on amplitude and mean value)
'eq_load_and_cycles' is used to calculate eq_loads of multiple time series (e.g. life time equivalent load)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
'''
#try:
# """
# The cython_import function compiles modules using cython.
# It is found at: https://github.com/madsmpedersen/MMPE/blob/master/cython_compile/cython_compile.py
# """
# from mmpe.cython_compile.cython_compile import cython_import
#except ImportError:
# cython_import = __import__
import numpy as np
def rfc_hist(sig_rf, nrbins=46):
"""
Histogram of rainflow counted cycles
====================================
hist, bin_edges, bin_avg = rfc_hist(sig, nrbins=46)
Divide the rainflow counted cycles of a signal into equally spaced bins.
Created on Wed Feb 16 16:53:18 2011
@author: David Verelst
Modified 10.10.2011 by Mads M Pedersen to elimintate __copy__ and __eq__
Parameters
----------
sig_rf : array-like
As output by rfc_astm or rainflow
nrbins : int, optional
Divide the rainflow counted amplitudes in a number of equally spaced
bins.
Returns
-------
hist : array-like
Counted rainflow cycles per bin, has nrbins elements
bin_edges : array-like
Edges of the bins, has nrbins+1 elements.
bin_avg : array-like
Average rainflow cycle amplitude per bin, has nrbins elements.
"""
rf_half = sig_rf
# the Matlab approach is to divide into 46 bins
bin_edges = np.linspace(0, 1, num=nrbins + 1) * rf_half.max()
hist = np.histogram(rf_half, bins=bin_edges)[0]
# calculate the average per bin
hist_sum = np.histogram(rf_half, weights=rf_half, bins=bin_edges)[0]
# replace zeros with one, to avoid 0/0
hist_ = hist.copy()
hist_[(hist == 0).nonzero()] = 1.0
# since the sum is also 0, the avg remains zero for those whos hist is zero
bin_avg = hist_sum / hist_
return hist, bin_edges, bin_avg
def check_signal(signal):
# check input data validity
if not type(signal).__name__ == 'ndarray':
raise TypeError('signal must be ndarray, not: ' + type(signal).__name__)
elif len(signal.shape) not in (1, 2):
raise TypeError('signal must be 1D or 2D, not: ' + str(len(signal.shape)))
if len(signal.shape) == 2:
if signal.shape[1] > 1:
raise TypeError('signal must have one column only, not: ' + str(signal.shape[1]))
if np.min(signal) == np.max(signal):
raise TypeError("Signal contains no variation")
def rainflow_windap(signal, levels=255., thresshold=(255 / 50)):
"""Windap equivalent rainflow counting
Calculate the amplitude and mean values of half cycles in signal
This algorithms used by this routine is implemented directly as described in
"Recommended Practices for Wind Turbine Testing - 3. Fatigue Loads", 2. edition 1990, Appendix A
Parameters
----------
Signal : array-like
The raw signal
levels : int, optional
The signal is discretize into this number of levels.
255 is equivalent to the implementation in Windap
thresshold : int, optional
Cycles smaller than this thresshold are ignored
255/50 is equivalent to the implementation in Windap
Returns
-------
ampl : array-like
Peak to peak amplitudes of the half cycles
mean : array-like
Mean values of the half cycles
Example
-------
>>> signal = np.array([-2.0, 0.0, 1.0, 0.0, -3.0, 0.0, 5.0, 0.0, -1.0, 0.0, 3.0, 0.0, -4.0, 0.0, 4.0, 0.0, -2.0])
>>> ampl, mean = rainflow_windap(signal)
"""
check_signal(signal)
#type <double> is required by <find_extreme> and <rainflow>
signal = signal.astype(np.double)
offset = np.nanmin(signal)
signal -= offset
if np.nanmax(signal) > 0:
gain = np.nanmax(signal) / levels
signal = signal / gain
signal = np.round(signal).astype(np.int)
from wetb.fatigue_tools.peak_trough import peak_trough
#Convert to list of local minima/maxima where difference > thresshold
sig_ext = peak_trough(signal, thresshold)
from wetb.fatigue_tools.pair_range import pair_range_amplitude_mean
#rainflow count
ampl_mean = pair_range_amplitude_mean(sig_ext)
ampl_mean = np.array(ampl_mean)
ampl_mean = np.round(ampl_mean / thresshold) * gain * thresshold
ampl_mean[:, 1] += offset
return ampl_mean.T
def rainflow_astm(signal):
"""Matlab equivalent rainflow counting
Calculate the amplitude and mean values of half cycles in signal
This implemementation is based on the c-implementation by Adam Nieslony found at
the MATLAB Central File Exchange http://www.mathworks.com/matlabcentral/fileexchange/3026
Parameters
----------
Signal : array-like
The raw signal
Returns
-------
ampl : array-like
peak to peak amplitudes of the half cycles (note that the matlab implementation
uses peak amplitude instead of peak to peak)
mean : array-like
Mean values of the half cycles
Examples
--------
>>> signal = np.array([-2.0, 0.0, 1.0, 0.0, -3.0, 0.0, 5.0, 0.0, -1.0, 0.0, 3.0, 0.0, -4.0, 0.0, 4.0, 0.0, -2.0])
>>> ampl, mean = rainflow_astm(signal)
"""
check_signal(signal)
# type <double> is reuqired by <find_extreme> and <rainflow>
signal = signal.astype(np.double)
# Import find extremes and rainflow.
# If possible the module is compiled using cython otherwise the python implementation is used
#cython_import('fatigue_tools.rainflowcount_astm')
from wetb.fatigue_tools.rainflowcount_astm import find_extremes, rainflowcount
# Remove points which is not local minimum/maximum
sig_ext = find_extremes(signal)
# rainflow count
ampl_mean = np.array(rainflowcount(sig_ext))
return np.array(ampl_mean).T
def eq_load(signals, no_bins=46, m=[3, 4, 6, 8, 10, 12], neq=1, rainflow_func=rainflow_windap):
"""Equivalent load calculation
Calculate the equivalent loads for a list of Wohler exponent and number of equivalent loads
Parameters
----------
signals : list of tuples or array_like
- if list of tuples: list must have format [(sig1_weight, sig1),(sig2_weight, sig1),...] where\n
- sigx_weight is the weight of signal x\n
- sigx is signal x\n
- if array_like: The signal
no_bins : int, optional
Number of bins in rainflow count histogram
m : int, float or array-like, optional
Wohler exponent (default is [3, 4, 6, 8, 10, 12])
neq : int, float or array-like, optional
The equivalent number of load cycles (default is 1, but normally the time duration in seconds is used)
rainflow_func : {rainflow_windap, rainflow_astm}, optional
The rainflow counting function to use (default is rainflow_windap)
Returns
-------
eq_loads : array-like
List of lists of equivalent loads for the corresponding equivalent number(s) and Wohler exponents
Examples
--------
>>> signal = np.array([-2.0, 0.0, 1.0, 0.0, -3.0, 0.0, 5.0, 0.0, -1.0, 0.0, 3.0, 0.0, -4.0, 0.0, 4.0, 0.0, -2.0])
>>> eq_load(signal, no_bins=50, neq=[1, 17], m=[3, 4, 6], rainflow_func=rainflow_windap)
[[10.311095426959747, 9.5942535021382174, 9.0789213365013932], # neq = 1, m=[3,4,6]
[4.010099657859783, 4.7249689509841746, 5.6618639965313005]], # neq = 17, m=[3,4,6]
eq_load([(.4, signal), (.6, signal)], no_bins=50, neq=[1, 17], m=[3, 4, 6], rainflow_func=rainflow_windap)
[[10.311095426959747, 9.5942535021382174, 9.0789213365013932], # neq = 1, m=[3,4,6]
[4.010099657859783, 4.7249689509841746, 5.6618639965313005]], # neq = 17, m=[3,4,6]
"""
try:
return eq_load_and_cycles(signals, no_bins, m, neq, rainflow_func)[0]
except TypeError:
return [[np.nan] * len(np.atleast_1d(m))] * len(np.atleast_1d(neq))
# ampl, _ = rainflow_func(signals)
# if ampl is None:
# return []
# hist_data, x, bin_avg = rfc_hist(ampl, no_bins)
#
# m = np.atleast_1d(m)
#
# return np.array([np.power(np.sum(0.5 * hist_data * np.power(bin_avg, m[i])) / neq, 1. / m[i]) for i in range(len(m))])
def eq_load_and_cycles(signals, no_bins=46, m=[3, 4, 6, 8, 10, 12], neq=[10 ** 6, 10 ** 7, 10 ** 8], rainflow_func=rainflow_windap):
"""Calculate combined fatigue equivalent load
Parameters
----------
signals : list of tuples or array_like
- if list of tuples: list must have format [(sig1_weight, sig1),(sig2_weight, sig1),...] where\n
- sigx_weight is the weight of signal x\n
- sigx is signal x\n
- if array_like: The signal
no_bins : int, optional
Number of bins for rainflow counting
m : int, float or array-like, optional
Wohler exponent (default is [3, 4, 6, 8, 10, 12])
neq : int or array-like, optional
Equivalent number, default is [10^6, 10^7, 10^8]
rainflow_func : {rainflow_windap, rainflow_astm}, optional
The rainflow counting function to use (default is rainflow_windap)
Returns
-------
eq_loads : array-like
List of lists of equivalent loads for the corresponding equivalent number(s) and Wohler exponents
cycles : array_like
2d array with shape = (no_ampl_bins, no_mean_bins)
ampl_bin_mean : array_like
mean amplitude of the bins
ampl_bin_edges
Edges of the amplitude bins
"""
cycles, ampl_bin_mean, ampl_bin_edges, _, _ = cycle_matrix(signals, no_bins, 1, rainflow_func)
if 0: #to be similar to windap
ampl_bin_mean = (ampl_bin_edges[:-1] + ampl_bin_edges[1:]) / 2
cycles, ampl_bin_mean = cycles.flatten(), ampl_bin_mean.flatten()
eq_loads = [[((np.sum(cycles * ampl_bin_mean ** _m) / _neq) ** (1. / _m)) for _m in np.atleast_1d(m)] for _neq in np.atleast_1d(neq)]
return eq_loads, cycles, ampl_bin_mean, ampl_bin_edges
def cycle_matrix(signals, ampl_bins=10, mean_bins=10, rainflow_func=rainflow_windap):
"""Markow load cycle matrix
Calculate the Markow load cycle matrix
Parameters
----------
Signals : array-like or list of tuples
if array-like, the raw signal
if list of tuples, list of (weight, signal), e.g. [(0.1,sig1), (0.8,sig2), (.1,sig3)]
ampl_bins : int or array-like, optional
if int, Number of amplitude value bins (default is 10)
if array-like, the bin edges for amplitude
mean_bins : int or array-like, optional
if int, Number of mean value bins (default is 10)
if array-like, the bin edges for mea
rainflow_func : {rainflow_windap, rainflow_astm}, optional
The rainflow counting function to use (default is rainflow_windap)
Returns
-------
cycles : ndarray, shape(ampl_bins, mean_bins)
A bi-dimensional histogram of load cycles(full cycles). Amplitudes are histogrammed along the first dimension and mean values are histogrammed along the second dimension.
ampl_bin_mean : ndarray, shape(ampl_bins,)
The average cycle amplitude of the bins
ampl_edges : ndarray, shape(ampl_bins+1,)
The amplitude bin edges
mean_bin_mean : ndarray, shape(ampl_bins,)
The average cycle mean of the bins
mean_edges : ndarray, shape(mean_bins+1,)
The mean bin edges
Examples
--------
>>> signal = np.array([-2.0, 0.0, 1.0, 0.0, -3.0, 0.0, 5.0, 0.0, -1.0, 0.0, 3.0, 0.0, -4.0, 0.0, 4.0, 0.0, -2.0])
>>> cycles, ampl_bin_mean, ampl_edges, mean_bin_mean, mean_edges = cycle_matrix(signal)
>>> cycles, ampl_bin_mean, ampl_edges, mean_bin_mean, mean_edges = cycle_matrix([(.4, signal), (.6,signal)])
"""
if isinstance(signals[0], tuple):
ampls = np.empty((0,), dtype=np.float64)
means = np.empty((0,), dtype=np.float64)
weights = np.empty((0,), dtype=np.float64)
for w, signal in signals:
a, m = rainflow_func(signal[:])
ampls = np.r_[ampls, a]
means = np.r_[means, m]
weights = np.r_[weights, (np.zeros_like(a) + w)]
else:
ampls, means = rainflow_func(signals[:])
weights = np.ones_like(ampls)
if isinstance(ampl_bins, int):
ampl_bins = np.linspace(0, 1, num=ampl_bins + 1) * ampls.max()
cycles, ampl_edges, mean_edges = np.histogram2d(ampls, means, [ampl_bins, mean_bins], weights=weights)
ampl_bin_sum = np.histogram2d(ampls, means, [ampl_bins, mean_bins], weights=weights * ampls)[0]
ampl_bin_mean = np.zeros_like(cycles)
mask = (cycles > 0)
ampl_bin_mean[mask] = ampl_bin_sum[mask] / cycles[mask]
mean_bin_sum = np.histogram2d(ampls, means, [ampl_bins, mean_bins], weights=weights * means)[0]
mean_bin_mean = np.zeros_like(cycles)
mean_bin_mean[cycles > 0] = mean_bin_sum[cycles > 0] / cycles[cycles > 0]
cycles = cycles / 2 # to get full cycles
return cycles, ampl_bin_mean, ampl_edges, mean_bin_mean, mean_edges