Skip to content
Snippets Groups Projects
Commit 34ce42ad authored by mads's avatar mads
Browse files

add gtsdf

parent 0affcf69
No related branches found
No related tags found
No related merge requests found
File added
"""
The 'General Time Series Data Format', gtsdf, is a binary hdf5 data format for storing time series data,\n
specified by \n
Mads M. Pedersen (mmpe@dtu.dk), DTU-Wind Energy, Aeroelastic design (AED)
Features:
- Single file
- Optional data type, e.g. 16bit integer (compact) or 64 bit floating point (high precision)
- Precise time representation (including absolute times)
- Additional data blocks can be appended continuously
- Optional specification of name and description of dataset
- Optional specification of name, unit and description of attributes
- NaN support
This module contains three methods:
- load_
- save_
- append_block_
.. _load: gtsdf.html#gtsdf.load
.. _save: gtsdf.html#gtsdf.save
.. _append_block: gtsdf.html#gtsdf.append_block
"""
d = None
d = dir()
from .gtsdf import save
from .gtsdf import load
from .gtsdf import append_block
from .gtsdf import load_pandas
__all__ = sorted([m for m in set(dir()) - set(d)])
from __future__ import division, print_function, absolute_import, unicode_literals
import warnings
from wetb.gtsdf.unix_time import from_unix
try:
import h5py
except ImportError as e:
raise ImportError("HDF5 library cannot be loaded. Windows XP is a known cause of this problem\n%s" % e)
import os
import numpy as np
import numpy.ma as ma
block_name_fmt = "block%04d"
def load(filename, dtype=None):
"""Load a 'General Time Series Data Format'-hdf5 datafile
Parameters
----------
filename : str or h5py.File
filename or open file object
dtype: data type, optional
type of returned data array, e.g. float16, float32 or float64.
If None(default) the type of the returned data depends on the type of the file data
Returns
-------
time : ndarray(dtype=float64), shape (no_observations,)
time
data : ndarray(dtype=dtype), shape (no_observations, no_attributes)
data
info : dict
info containing:
- type: "General Time Series Data Format"
- name: name of dataset or filename if not present in file
- no_attributes: Number of attributes
- no_blocks: Number of datablocks
- [description]: description of dataset or "" if not present in file
- [attribute_names]: list of attribute names
- [attribute_units]: list of attribute units
- [attribute_descriptions]: list of attribute descriptions
See Also
--------
gtsdf, save
Examples
--------
>>> import gtsdf
>>> data = np.arange(6).reshape(3,2)
>>> gtsdf.save('test.hdf5', data)
>>> time, data, info = gtsdf.load('test.hdf5')
>>> print time
[ 0. 1. 2.]
>>> print data
[[ 0. 1.]
[ 2. 3.]
[ 4. 5.]]
>>> print info
{'no_blocks': 1, 'type': 'General time series data format', 'name': 'test', 'no_attributes': 2, 'description': ''}
>>> gtsdf.save('test.hdf5', data, name='MyDataset',
description='MyDatasetDescription',
attribute_names=['Att1', 'Att2'],
attribute_units=['m', "m/s"],
attribute_descriptions=['Att1Desc', 'Att2Desc'],
time = np.array([0,1,4]),
time_start = 10,
time_step=2,
dtype=np.float64)
>>> time, data, info = gtsdf.load('test.hdf5')
>>> print time
[ 10. 12. 18.]
>>> print data
[[ 0. 1.]
[ 2. 3.]
[ 4. 5.]]
>>> print info
{'attribute_names': array(['Att1', 'Att2'], dtype='|S4'),
'attribute_units': array(['m', 'm/s'], dtype='|S3'),
'attribute_descriptions': array(['Att1Desc', 'Att2Desc'], dtype='|S8'),
'name': 'MyDataset',
'no_attributes': 2,
'no_blocks': 1,
'type': 'General time series data format',
'description': 'MyDatasetDescription'}
"""
if isinstance(filename, h5py.File):
f = filename
filename = f.filename
else:
assert os.path.isfile(filename), "File, %s, does not exists" % filename
f = h5py.File(filename, 'r')
try:
def decode(v):
if isinstance(v, bytes):
return v.decode('latin1')
return v
info = {k: decode(v) for k, v in f.attrs.items()}
check_type(f)
if (block_name_fmt % 0) not in f:
raise ValueError("HDF5 file must contain a group named '%s'" % (block_name_fmt % 0))
block0 = f[block_name_fmt % 0]
if 'data' not in block0:
raise ValueError("group %s must contain a dataset called 'data'" % (block_name_fmt % 0))
_, no_attributes = block0['data'].shape
if 'name' not in info:
info['name'] = os.path.splitext(os.path.basename(filename))[0]
if 'attribute_names' in f:
info['attribute_names'] = [v.decode('latin1') for v in f['attribute_names']]
if 'attribute_units' in f:
info['attribute_units'] = [v.decode('latin1') for v in f['attribute_units']]
if 'attribute_descriptions' in f:
info['attribute_descriptions'] = [v.decode('latin1') for v in f['attribute_descriptions']]
no_blocks = f.attrs['no_blocks']
if dtype is None:
file_dtype = f[block_name_fmt % 0]['data'].dtype
if "float" in str(file_dtype):
dtype = file_dtype
elif file_dtype in [np.int8, np.uint8, np.int16, np.uint16]:
dtype = np.float32
else:
dtype = np.float64
data = np.empty((0, no_attributes), dtype=dtype)
time = np.empty((0), dtype=np.float64)
for i in range(no_blocks):
block = f[block_name_fmt % i]
no_observations, no_attributes = block['data'].shape
block_time = (block.get('time', np.arange(no_observations))[:]).astype(np.float64)
if 'time_step' in block.attrs:
block_time *= block.attrs['time_step']
if 'time_start' in block.attrs:
block_time += block.attrs['time_start']
time = np.append(time, block_time)
block_data = block['data'][:].astype(dtype)
if "int" in str(block['data'].dtype):
block_data[block_data == np.iinfo(block['data'].dtype).max] = np.nan
if 'gains' in block:
block_data *= block['gains'][:]
if 'offsets' in block:
block_data += block['offsets'][:]
data = np.append(data, block_data, 0)
f.close()
return time, data.astype(dtype), info
except (ValueError, AssertionError):
f.close()
raise
def save(filename, data, **kwargs):
"""Save a 'General Time Series Data Format'-hdf5 datafile
Additional datablocks can be appended later using gtsdf.append_block
Parameters
----------
filename : str
data : array_like, shape (no_observations, no_attributes)
name : str, optional
Name of dataset
description : str, optional
Description of dataset
attribute_names : array_like, shape (no_attributes,), optional
Names of attributes
attribute_units : array_like, shape (no_attributes,), optinoal
Units of attributes
attribute_descriptions : array_like, shape(no_attributes,), optional
Descriptions of attributes
time : array_like, shape (no_observations, ), optional
Time, default is [0..no_observations-1]
time_start : int or float, optional
Time offset (e.g. start time in seconds since 1/1/1970), default is 0, see notes
time_step : int or float, optional
Time scale factor (e.g. 1/sample frequency), default is 1, see notes
dtype : data-type, optional
Data type of saved data array, default uint16.\n
Recommended choices:
- uint16: Data is compressed into 2 byte integers using a gain and offset factor for each attribute
- float64: Data is stored with high precision using 8 byte floats
Notes
-----
Time can be specified by either
- time (one value for each observation). Required inhomogeneous time distributions
- time_start and/or time_step (one or two values), Recommended for homogeneous time distributions
- time and time_start and/or time_step (one value for each observation + one or two values)
When reading the file, the returned time-array is calculated as time * time_step + time_start
See Also
--------
gtsdf, append_block, load
Examples
--------
>>> import gtsdf
>>> data = np.arange(12).reshape(6,2)
>>> gtsdf.save('test.hdf5', data)
>>> gtsdf.save('test.hdf5', data, name='MyDataset',
description='MyDatasetDescription',
attribute_names=['Att1', 'Att2'],
attribute_units=['m', "m/s"],
attribute_descriptions=['Att1Desc', 'Att2Desc'],
time = np.array([0,1,2,6,7,8]),
time_start = 10,
time_step=2,
dtype=np.float64)
"""
if not filename.lower().endswith('.hdf5'):
filename += ".hdf5"
os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True)
f = h5py.File(filename, "w")
try:
f.attrs["type"] = "General time series data format"
no_observations, no_attributes = data.shape
if 'name' in kwargs:
f.attrs['name'] = kwargs['name']
if 'description' in kwargs:
f.attrs['description'] = kwargs['description']
f.attrs['no_attributes'] = no_attributes
if 'attribute_names' in kwargs:
assert len(kwargs['attribute_names']) == no_attributes, "len(attribute_names)=%d but data shape is %s" % (len(kwargs['attribute_names']), data.shape)
f.create_dataset("attribute_names", data=np.array([v.encode('utf-8') for v in kwargs['attribute_names']]))
if 'attribute_units' in kwargs:
assert(len(kwargs['attribute_units']) == no_attributes)
f.create_dataset("attribute_units", data=np.array([v.encode('utf-8') for v in kwargs['attribute_units']]))
if 'attribute_descriptions' in kwargs:
assert(len(kwargs['attribute_descriptions']) == no_attributes)
f.create_dataset("attribute_descriptions", data=np.array([v.encode('utf-8') for v in kwargs['attribute_descriptions']]))
f.attrs['no_blocks'] = 0
except Exception:
raise
finally:
f.close()
append_block(filename, data, **kwargs)
def append_block(filename, data, **kwargs):
"""Append a data block and corresponding time data to already existing file
Parameters
----------
filename : str
data : array_like, shape (no_observations, no_attributes)
time : array_like, shape (no_observations, ), optional
Time, default is [0..no_observations-1]
time_start : int or float, optional
Time offset (e.g. start time in seconds since 1/1/1970), default is 0, see notes
time_step : int or float, optional
Time scale factor (e.g. 1/sample frequency), default is 1, see notes
dtype : data-type, optional
Data type of saved data array, default uint16.\n
Recommended choices:
- uint16: Data is compressed into 2 byte integers using a gain and offset factor for each attribute
- float64: Data is stored with high precision using 8 byte floats
Notes
-----
Time can be specified by either
- time (one value for each observation). Required inhomogeneous time distributions
- time_start and/or time_step (one or two values), Recommended for homogeneous time distributions
- time and time_start and/or time_step (one value for each observation + one or two values)
When reading the file, the returned time-array is calculated as time * time_step + time_start
See Also
--------
gtsdf, save
Examples
--------
>>> import gtsdf
>>> data = np.arange(12).reshape(6,2)
>>> gtsdf.save('test.hdf5', data)
>>> gtsdf.append_block('test.hdf5', data+6)
>>> time, data, info = gtsdf.load('test.hdf5')
>>> print time
[ 0. 1. 2. 3. 4. 5.]
>>> print data
[[ 0. 1.]
[ 2. 3.]
[ 4. 5.]
[ 6. 7.]
[ 8. 9.]
[ 10. 11.]]
>>> print info
{'no_blocks': 2, 'type': 'General time series data format', 'name': 'test', 'no_attributes': 2}
"""
try:
f = h5py.File(filename, "a")
check_type(f)
no_observations, no_attributes = data.shape
assert(no_attributes == f.attrs['no_attributes'])
blocknr = f.attrs['no_blocks']
if blocknr == 0:
dtype = kwargs.get('dtype', np.uint16)
else:
dtype = f[block_name_fmt % 0]['data'].dtype
block = f.create_group(block_name_fmt % blocknr)
if 'time' in kwargs:
assert(len(kwargs['time']) == no_observations)
block.create_dataset('time', data=kwargs['time'])
if 'time_step' in kwargs:
time_step = kwargs['time_step']
block.attrs['time_step'] = time_step
if 'time_start' in kwargs:
block.attrs['time_start'] = kwargs['time_start']
pct_res = np.array([1])
if "int" in str(dtype):
if np.any(np.isinf(data)):
f.close()
raise ValueError ("Int compression does not support 'inf'\nConsider removing outliers or use float datatype")
nan = np.isnan(data)
non_nan_data = ma.masked_array(data, nan)
offsets = np.min(non_nan_data, 0)
try:
data = np.copy(data).astype(np.float64)
except MemoryError:
data = np.copy(data)
data -= offsets
with warnings.catch_warnings():
warnings.simplefilter("ignore") # ignore warning caused by abs(nan) and np.nanmax(nan)
pct_res = (np.percentile(data[~np.isnan(data)], 75, 0) - np.percentile(data[~np.isnan(data)], 25, 0)) / np.nanmax(np.abs(data), 0) # percent of resolution for middle half of data
gains = np.max(non_nan_data - offsets, 0).astype(np.float64) / (np.iinfo(dtype).max - 1) #-1 to save value for NaN
not0 = np.where(gains != 0)
data[:, not0] /= gains[not0]
data = data.astype(dtype)
data[nan] = np.iinfo(dtype).max
block.create_dataset('gains', data=gains)
block.create_dataset('offsets', data=offsets)
block.create_dataset("data", data=data.astype(dtype))
f.attrs['no_blocks'] = blocknr + 1
f.close()
if "int" in str(dtype):
int_res = (np.iinfo(dtype).max - np.iinfo(dtype).min)
with np.errstate(invalid='ignore'):
if min(pct_res[pct_res > 0]) * int_res < 256:
raise Warning("Less than 256 values are used to represent 50%% of the values in column(s): %s\nConsider removing outliers or use float datatype" % np.where(pct_res[pct_res > 0] * int_res < 256)[0])
except Exception:
try:
f.close()
except:
pass
raise
def load_pandas(filename, dtype=None):
import pandas as pd
time, data, info = load(filename, dtype)
df = pd.DataFrame()
df["Time"] = time
df["Date"] = [from_unix(t) for t in time]
for n, d in zip(info['attribute_names'], data.T):
df[n] = d
return df
def check_type(f):
if 'type' not in f.attrs or \
(f.attrs['type'].lower() != "general time series data format" and f.attrs['type'].lower() != b"general time series data format"):
raise ValueError("HDF5 file must contain a 'type'-attribute with the value 'General time series data format'")
if 'no_blocks' not in f.attrs:
raise ValueError("HDF5 file must contain an attribute named 'no_blocks'")
function [time, data, info] = gtsdf_load(filename)
if nargin==0
filename = 'examples/all.hdf5';
end
%h5disp('examples/minimum.hdf5');
%info = h5info(filename);
function value = att_value(name, addr, default)
try
value = h5readatt(filename, addr,name);
catch
if nargin==3
value = default;
else
value = '';
end
end
end
function r = read_dataset(name, addr, default)
try
r = h5read(filename, strcat(addr,name));
catch
r = default;
end
end
if not (strcmpi(att_value('type','/'), 'general time series data format'))
error('HDF5 file must contain a ''type''-attribute with the value ''General time series data format''')
end
if strcmp(att_value('no_blocks','/'),'')
error('HDF5 file must contain an attribute named ''no_blocks''')
end
hdf5info = h5info(filename);
if not (strcmp(hdf5info.Groups(1).Name,'/block0000'))
error('HDF5 file must contain a group named ''block0000''')
end
datainfo = h5info(filename,'/block0000/data');
no_attributes = datainfo.Dataspace.Size(1);
type = att_value('type','/');
name = att_value('name', '/','no_name');
description = att_value('description', '/');
attribute_names = read_dataset('attribute_names','/', {});
attribute_units = read_dataset('attribute_units','/', {});
attribute_descriptions = read_dataset('attribute_descriptions','/', {});
info = struct('type',type, 'name', name, 'description', description, 'attribute_names', {attribute_names}, 'attribute_units', {attribute_units}, 'attribute_descriptions',{attribute_descriptions});
no_blocks = att_value('no_blocks','/');
time = [];
data = [];
for i=0:no_blocks-1
blockname = num2str(i,'/block%04d/');
blokdatainfo = h5info(filename,strcat(blockname,'data'));
no_observations = datainfo.Dataspace.Size(2);
blocktime = double(read_dataset('time', blockname, [0:no_observations-1]'));
blocktime_start = double(att_value('time_start',blockname,0));
blocktime_step = double(att_value('time_step',blockname,1));
time = [time;(blocktime*blocktime_step) + blocktime_start];
block_data = read_dataset('data', blockname)';
if isinteger(block_data)
nan_pos = block_data==intmax(class(block_data));
block_data = double(block_data);
block_data(nan_pos) = nan;
gains = double(read_dataset('gains',blockname,1.));
offsets = double(read_dataset('offsets', blockname,0));
for c = 1:no_attributes
block_data(:,c) = block_data(:,c)*gains(c)+offsets(c);
end
end
data = [data;block_data];
end
end
'''
Created on 12/09/2013
@author: mmpe
'''
import h5py
import numpy as np
from wetb import gtsdf
import unittest
import os
import shutil
class Test(unittest.TestCase):
f = "tmp/"
def setUp(self):
unittest.TestCase.setUp(self)
if not os.path.isdir(self.f):
os.makedirs(self.f)
def test_minimum_requirements (self):
fn = self.f + "minimum.hdf5"
f = h5py.File(fn, "w")
#no type
self.assertRaises(ValueError, gtsdf.load, fn)
f.attrs["type"] = "General time series data format"
#no no_blocks
self.assertRaises(ValueError, gtsdf.load, fn)
f.attrs["no_blocks"] = 0
#no block0000
self.assertRaises(ValueError, gtsdf.load, fn)
b = f.create_group("block0000")
#no data
self.assertRaises(ValueError, gtsdf.load, fn)
b.create_dataset("data", data=np.empty((0, 0)))
gtsdf.load(fn)
def test_save_no_hdf5_ext(self):
fn = self.f + "no_hdf5_ext"
gtsdf.save(fn, np.arange(12).reshape(4, 3))
_, _, info = gtsdf.load(fn + ".hdf5")
self.assertEqual(info['name'], 'no_hdf5_ext')
def test_load_filename(self):
fn = self.f + "filename.hdf5"
gtsdf.save(fn, np.arange(12).reshape(4, 3))
_, _, info = gtsdf.load(fn)
self.assertEqual(info['name'], 'filename')
def test_load_fileobject(self):
fn = self.f + "fileobject.hdf5"
gtsdf.save(fn, np.arange(12).reshape(4, 3))
_, _, info = gtsdf.load(fn)
self.assertEqual(info['name'], 'fileobject')
def test_save_wrong_no_attr_info(self):
fn = self.f + "wrong_no_attr_info.hdf5"
self.assertRaises(AssertionError, gtsdf.save, fn, np.arange(12).reshape(4, 3), attribute_names=['Att1'])
self.assertRaises(AssertionError, gtsdf.save, fn, np.arange(12).reshape(4, 3), attribute_units=['s'])
self.assertRaises(AssertionError, gtsdf.save, fn, np.arange(12).reshape(4, 3), attribute_descriptions=['desc'])
def test_info(self):
fn = self.f + "info.hdf5"
gtsdf.save(fn, np.arange(12).reshape(6, 2),
name='datasetname',
description='datasetdescription',
attribute_names=['att1', 'att2'],
attribute_units=['s', 'm/s'],
attribute_descriptions=['d1', 'd2'])
_, _, info = gtsdf.load(fn)
self.assertEqual(info['name'], "datasetname")
self.assertEqual(info['type'], "General time series data format")
self.assertEqual(info['description'], "datasetdescription")
self.assertEqual(list(info['attribute_names']), ['att1', 'att2'])
self.assertEqual(list(info['attribute_units']), ['s', 'm/s'])
self.assertEqual(list(info['attribute_descriptions']), ['d1', 'd2'])
def test_no_time(self):
fn = self.f + 'time.hdf5'
gtsdf.save(fn, np.arange(12).reshape(6, 2))
time, _, _ = gtsdf.load(fn)
np.testing.assert_array_equal(time, np.arange(6))
def test_int_time(self):
fn = self.f + 'time.hdf5'
gtsdf.save(fn, np.arange(12).reshape(6, 2), time=range(4, 10))
time, _, _ = gtsdf.load(fn)
np.testing.assert_array_equal(time, range(4, 10))
def test_time_offset(self):
fn = self.f + 'time.hdf5'
gtsdf.save(fn, np.arange(12).reshape(6, 2), time=range(6), time_start=4)
time, _, _ = gtsdf.load(fn)
np.testing.assert_array_equal(time, range(4, 10))
def test_time_gain_offset(self):
fn = self.f + 'time.hdf5'
gtsdf.save(fn, np.arange(12).reshape(6, 2), time=range(6), time_step=1 / 4, time_start=4)
time, _, _ = gtsdf.load(fn)
np.testing.assert_array_equal(time, np.arange(4, 5.5, .25))
def test_float_time(self):
fn = self.f + 'time.hdf5'
gtsdf.save(fn, np.arange(12).reshape(6, 2), time=np.arange(4, 5.5, .25))
time, _, _ = gtsdf.load(fn)
np.testing.assert_array_equal(time, np.arange(4, 5.5, .25))
def test_data(self):
fn = self.f + 'data.hdf5'
d = np.arange(12).reshape(6, 2)
gtsdf.save(fn, d)
f = h5py.File(fn)
self.assertEqual(f['block0000']['data'].dtype, np.uint16)
f.close()
_, data, _ = gtsdf.load(fn)
np.testing.assert_array_almost_equal(data, np.arange(12).reshape(6, 2), 4)
def test_data_float(self):
fn = self.f + 'time.hdf5'
d = np.arange(12).reshape(6, 2)
gtsdf.save(fn, d, dtype=np.float32)
f = h5py.File(fn)
self.assertEqual(f['block0000']['data'].dtype, np.float32)
f.close()
_, data, _ = gtsdf.load(fn)
np.testing.assert_array_equal(data, np.arange(12).reshape(6, 2))
def test_all(self):
fn = self.f + "all.hdf5"
gtsdf.save(fn, np.arange(12).reshape(6, 2),
name='datasetname',
time=range(6), time_step=1 / 4, time_start=4,
description='datasetdescription',
attribute_names=['att1', 'att2'],
attribute_units=['s', 'm/s'],
attribute_descriptions=['d1', 'd2'])
time, data, info = gtsdf.load(fn)
self.assertEqual(info['name'], "datasetname")
self.assertEqual(info['type'], "General time series data format")
self.assertEqual(info['description'], "datasetdescription")
self.assertEqual(list(info['attribute_names']), ['att1', 'att2'])
self.assertEqual(list(info['attribute_units']), ['s', 'm/s'])
self.assertEqual(list(info['attribute_descriptions']), ['d1', 'd2'])
np.testing.assert_array_equal(time, np.arange(4, 5.5, .25))
np.testing.assert_array_almost_equal(data, np.arange(12).reshape(6, 2), 4)
def test_append(self):
fn = self.f + 'append.hdf5'
d = np.arange(12, dtype=np.float32).reshape(6, 2)
d[2, 0] = np.nan
gtsdf.save(fn, d)
_, data, _ = gtsdf.load(fn)
np.testing.assert_array_almost_equal(data, d, 4)
gtsdf.append_block(fn, d)
_, data, _ = gtsdf.load(fn)
self.assertEqual(data.shape, (12, 2))
np.testing.assert_array_almost_equal(data, np.append(d, d, 0), 4)
def test_nan_float(self):
fn = self.f + 'nan.hdf5'
d = np.arange(12, dtype=np.float32).reshape(6, 2)
d[2, 0] = np.nan
gtsdf.save(fn, d)
_, data, _ = gtsdf.load(fn)
np.testing.assert_array_almost_equal(data, d, 4)
def test_outlier(self):
fn = self.f + 'outlier.hdf5'
d = np.arange(12, dtype=np.float32).reshape(6, 2)
d[2, 0] = 10 ** 4
d[3, 1] = 10 ** 4
self.assertRaises(Warning, gtsdf.save, fn, d)
_, data, _ = gtsdf.load(fn)
def test_inf(self):
fn = self.f + 'outlier.hdf5'
d = np.arange(12, dtype=np.float32).reshape(6, 2)
d[2, 0] = np.inf
d[3, 1] = 10 ** 3
self.assertRaises(ValueError, gtsdf.save, fn, d)
def test_loadpandas(self):
fn = self.f + "all.hdf5"
gtsdf.save(fn, np.arange(12).reshape(6, 2),
name='datasetname',
time=range(6), time_step=1 / 4, time_start=4,
description='datasetdescription',
attribute_names=['att1', 'att2'],
attribute_units=['s', 'm/s'],
attribute_descriptions=['d1', 'd2'])
df = gtsdf.load_pandas(fn)
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()
from datetime import datetime, date
import numpy as np
def to_unix(dateTime):
return (dateTime - datetime.utcfromtimestamp(0)).total_seconds()
def from_unix_old(sec):
if np.isnan(sec):
return datetime.utcfromtimestamp(0)
return datetime.utcfromtimestamp(sec)
day_dict = {}
def from_unix(sec):
global day_dict
if isinstance(sec, (float, int)):
if np.isnan(sec):
return datetime.utcfromtimestamp(0)
return datetime.utcfromtimestamp(sec)
else:
sec = np.array(sec)
ms = np.atleast_1d((sec * 1000000 % 1000000).astype(np.int))
sec = sec.astype(np.int)
S = np.atleast_1d(sec % 60)
M = np.atleast_1d(sec % 3600 // 60)
H = np.atleast_1d(sec % 86400 // 3600)
d = np.atleast_1d(sec // 86400)
for du in np.unique(d):
if du not in day_dict:
day_dict[du] = date.fromordinal(719163 + du).timetuple()[:3]
y, m, d = zip(*[day_dict[d_] for d_ in d])
return ([datetime(*ymdhmsu) for ymdhmsu in zip(y, m, d, H.tolist(), M.tolist(), S.tolist(), ms.tolist())])
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment