diff --git a/wetb/prepost/tests/test_windIO.py b/wetb/prepost/tests/test_windIO.py index 3c7cd6da76b463fa66306acadfb6de38e5c1000e..67239c76e00e8dbf4684830bcc34f1e5ab8ff089 100644 --- a/wetb/prepost/tests/test_windIO.py +++ b/wetb/prepost/tests/test_windIO.py @@ -11,176 +11,53 @@ from future import standard_library standard_library.install_aliases() import unittest -import os -import struct import numpy as np -import scipy.io as sio -from wetb.prepost.windIO import Turbulence, LoadResults +from wetb.prepost.windIO import LoadResults -# path for test data files -fpath = os.path.join(os.path.dirname(__file__), 'data/') class TestsLoadResults(unittest.TestCase): def setUp(self): - pass - - def test_load(self): - - respath = '../../hawc2/tests/test_files/hawc2io/' - resfile = 'Hawc2ascii' - res = LoadResults(respath, resfile) - - - -class TestsTurbulence(unittest.TestCase): - - def setUp(self): - pass - - def print_test_info(self): - pass - - def test_reshaped(self): - """ - Make sure we correctly reshape the array instead of the manual - index reassignments - """ - fpath = 'data/turb_s100_3.00w.bin' - fid = open(fpath, 'rb') - turb = np.fromfile(fid, 'float32', 32*32*8192) - turb.shape - fid.close() - u = np.zeros((8192,32,32)) - - for i in range(8192): - for j in range(32): - for k in range(32): - u[i,j,k] = turb[ i*1024 + j*32 + k] - - u2 = np.reshape(turb, (8192, 32, 32)) - - self.assertTrue(np.alltrue(np.equal(u, u2))) - - def test_headers(self): - - fpath = 'data/' - - basename = 'turb_s100_3.00_refoctave_header' - fid = open(fpath + basename + '.wnd', 'rb') - R1 = struct.unpack("h",fid.read(2))[0] - R2 = struct.unpack("h",fid.read(2))[0] - turb = struct.unpack("i",fid.read(4))[0] - lat = struct.unpack("f",fid.read(4))[0] - # last line - fid.seek(100) - LongVertComp = struct.unpack("f",fid.read(4))[0] - fid.close() - - basename = 'turb_s100_3.00_python_header' - fid = open(fpath + basename + '.wnd', 'rb') - R1_p = struct.unpack("h",fid.read(2))[0] - R2_p = struct.unpack("h",fid.read(2))[0] - turb_p = struct.unpack("i",fid.read(4))[0] - lat_p = struct.unpack("f",fid.read(4))[0] - # last line - fid.seek(100) - LongVertComp_p = struct.unpack("f",fid.read(4))[0] - fid.close() - - self.assertEqual(R1, R1_p) - self.assertEqual(R2, R2_p) - self.assertEqual(turb, turb_p) - self.assertEqual(lat, lat_p) - self.assertEqual(LongVertComp, LongVertComp_p) - - def test_write_bladed(self): - - fpath = 'data/' - turb = Turbulence() - # write with Python - basename = 'turb_s100_3.00' - turb.write_bladed(fpath, basename, shape=(8192,32,32)) - python = turb.read_bladed(fpath, basename) - - # load octave - basename = 'turb_s100_3.00_refoctave' - octave = turb.read_bladed(fpath, basename) - - # float versions of octave - basename = 'turb_s100_3.00_refoctave_float' - fid = open(fpath + basename + '.wnd', 'rb') - octave32 = np.fromfile(fid, 'float32', 8192*32*32*3) - - # find the differences - nr_diff = (python-octave).__ne__(0).sum() - print(nr_diff) - print(nr_diff/len(python)) - - self.assertTrue(np.alltrue(python == octave)) - - def test_turbdata(self): - - shape = (8192,32,32) - - fpath = 'data/' - basename = 'turb_s100_3.00_refoctave' - fid = open(fpath + basename + '.wnd', 'rb') - - # check the last element of the header - fid.seek(100) - print(struct.unpack("f",fid.read(4))[0]) - # save in a list using struct - items = (os.path.getsize(fpath + basename + '.wnd')-104)/2 - data_list = [struct.unpack("h",fid.read(2))[0] for k in range(items)] - - - fid.seek(104) - data_16 = np.fromfile(fid, 'int16', shape[0]*shape[1]*shape[2]*3) - - fid.seek(104) - data_8 = np.fromfile(fid, 'int8', shape[0]*shape[1]*shape[2]*3) - - self.assertTrue(np.alltrue( data_16 == data_list )) - self.assertFalse(np.alltrue( data_8 == data_list )) - - def test_compare_octave(self): - """ - Compare the results from the original script run via octave - """ - - turb = Turbulence() - iu, iv, iw = turb.convert2bladed('data/', 'turb_s100_3.00', - shape=(8192,32,32)) - res = sio.loadmat('data/workspace.mat') - # increase tolerances, values have a range up to 5000-10000 - # and these values will be written to an int16 format for BLADED! - self.assertTrue(np.allclose(res['iu'], iu, rtol=1e-03, atol=1e-2)) - self.assertTrue(np.allclose(res['iv'], iv, rtol=1e-03, atol=1e-2)) - self.assertTrue(np.allclose(res['iw'], iw, rtol=1e-03, atol=1e-2)) - - def test_allindices(self): - """ - Verify that all indices are called - """ - fpath = 'data/turb_s100_3.00w.bin' - fid = open(fpath, 'rb') - turb = np.fromfile(fid, 'float32', 32*32*8192) - turb.shape - fid.close() - - check = [] - for i in range(8192): - for j in range(32): - for k in range(32): - check.append(i*1024 + j*32 + k) - - qq = np.array(check) - qdiff = np.diff(qq) - - self.assertTrue(np.alltrue(np.equal(qdiff, np.ones(qdiff.shape)))) + self.respath = '../../hawc2/tests/test_files/hawc2io/' + self.fascii = 'Hawc2ascii' + self.fbin = 'Hawc2bin' + + def loadresfile(self, resfile): + res = LoadResults(self.respath, resfile) + self.assertTrue(hasattr(res, 'sig')) + self.assertEqual(res.Freq, 40.0) + self.assertEqual(res.N, 800) + self.assertEqual(res.Nch, 28) + self.assertEqual(res.Time, 20.0) + self.assertEqual(res.sig.shape, (800, 28)) + return res + + def test_load_ascii(self): + res = self.loadresfile(self.fascii) + self.assertEqual(res.FileType, 'ASCII') + + def test_load_binary(self): + res = self.loadresfile(self.fbin) + self.assertEqual(res.FileType, 'BINARY') + + def test_compare_ascii_bin(self): + res_ascii = LoadResults(self.respath, self.fascii) + res_bin = LoadResults(self.respath, self.fbin) + + for k in range(res_ascii.sig.shape[1]): + np.testing.assert_allclose(res_ascii.sig[:,k], res_bin.sig[:,k], + rtol=1e-02, atol=0.001) + + def test_unified_chan_names(self): + res = LoadResults(self.respath, self.fascii, readdata=False) + self.assertFalse(hasattr(res, 'sig')) + + np.testing.assert_array_equal(res.ch_df.index.values, np.arange(0,28)) + self.assertEqual(res.ch_df.ch_name.values[0], 'Time') + self.assertEqual(res.ch_df.ch_name.values[27], + 'windspeed-global-Vy--2.50-1.00--52.50') if __name__ == "__main__":