Skip to content
Snippets Groups Projects
test_Simulations.py 8.64 KiB
Newer Older
'''
Created on 05/11/2015

@author: MMPE
'''
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()

import unittest
import os
import filecmp
import shutil
from zipfile import ZipFile

import numpy as np
import pandas as pd

from wetb.prepost import dlctemplate as tmpl
from wetb.prepost import Simulations as sim
from wetb.prepost.misc import DictDiff


class Template(unittest.TestCase):
    def setUp(self):
        self.basepath = os.path.dirname(__file__)


class TestErrorLogs(Template):

    def test_loganalysis(self):
        # select a few log cases and do the analysis, save to csv and convert
        # saved result to DataFrame
        pass


class TestGenerateInputs(Template):

    def test_launch_dlcs_excel(self):
        # manually configure paths, HAWC2 model root path is then constructed as
        # p_root_remote/PROJECT/sim_id, and p_root_local/PROJECT/sim_id
        # adopt accordingly when you have configured your directories differently
        p_root = os.path.join(self.basepath, 'data/')
        # project name, sim_id, master file name
        tmpl.PROJECT = 'demo_dlc'
        tmpl.MASTERFILE = 'demo_dlc_master_A0001.htc'
        # MODEL SOURCES, exchanche file sources
        tmpl.P_RUN = os.path.join(p_root, tmpl.PROJECT, 'remote/')
        tmpl.P_SOURCE = os.path.join(p_root, tmpl.PROJECT, 'source/')
        # location of the master file
        tmpl.P_MASTERFILE = os.path.join(p_root, tmpl.PROJECT,
                                         'source', 'htc', '_master/')
        # location of the pre and post processing data
        tmpl.POST_DIR = os.path.join(p_root, tmpl.PROJECT, 'remote',
                                     'prepost/')

        # make sure the remote dir is empty so a test does not pass on data
        # generated during a previous cycle
        if os.path.exists(os.path.join(p_root, tmpl.PROJECT, 'remote')):
            shutil.rmtree(os.path.join(p_root, tmpl.PROJECT, 'remote'))

        tmpl.force_dir = tmpl.P_RUN
        tmpl.launch_dlcs_excel('remote', silent=True, runmethod='gorm',
                               pbs_turb=True, zipchunks=True,
                               postpro_node_zipchunks=False,
                               postpro_node=False)

        # we can not check-in empty dirs so we can not compare the complete
        # directory structure withouth manually creating the empty dirs here
        for subdir in ['control', 'data', 'htc', 'pbs_in', 'pbs_in_turb',
                       'htc/_master', 'htc/dlc01_demos', 'pbs_in/dlc01_demos',
                       'zip-chunks-gorm', 'zip-chunks-jess']:
            remote = os.path.join(p_root, tmpl.PROJECT, 'remote', subdir)
            ref = os.path.join(p_root, tmpl.PROJECT, 'ref', subdir)
            # the zipfiles are taken care of separately
            ignore = ['remote_chnk_00000.zip']
            cmp = filecmp.dircmp(remote, ref, ignore=ignore)
            self.assertEqual(len(cmp.diff_files), 0,
                             "{} {}".format(subdir, cmp.diff_files))
            self.assertEqual(len(cmp.right_only), 0,
                             "{} {}".format(subdir, cmp.right_only))
            self.assertEqual(len(cmp.left_only), 0,
                             "{} {}".format(subdir, cmp.left_only))

        # compare the zip files
        for fname in ['demo_dlc_remote.zip',
                      'zip-chunks-gorm/remote_chnk_00000.zip',
                      'zip-chunks-jess/remote_chnk_00000.zip']:
            remote = os.path.join(p_root, tmpl.PROJECT, 'remote', fname)
            ref = os.path.join(p_root, tmpl.PROJECT, 'ref', fname)

            with ZipFile(remote) as zrem, ZipFile(ref) as zref:
                self.assertEqual(len(zrem.infolist()), len(zref.infolist()))
                frem = {f.filename:f.file_size for f in zrem.infolist()}
                fref = {f.filename:f.file_size for f in zref.infolist()}
                dd = DictDiff(frem, fref)
                self.assertEqual(len(dd.added()), 0,
                                 "{} {}".format(fname, dd.added()))
                self.assertEqual(len(dd.removed()), 0,
                                 "{} {}".format(fname, dd.removed()))
                self.assertEqual(len(dd.changed()), 0,
                                 "{} {}".format(fname, dd.changed()))

        # for the pickled file we can just read it
        remote = os.path.join(p_root, tmpl.PROJECT, 'remote', 'prepost')
        ref = os.path.join(p_root, tmpl.PROJECT, 'ref', 'prepost')
        cmp = filecmp.cmp(os.path.join(remote, 'remote_tags.txt'),
                          os.path.join(ref, 'remote_tags.txt'), shallow=False)
        self.assertTrue(cmp)
#        with open(os.path.join(remote, 'remote.pkl'), 'rb') as FILE:
#            pkl_remote = pickle.load(FILE)
#        with open(os.path.join(ref, 'remote.pkl'), 'rb') as FILE:
#            pkl_ref = pickle.load(FILE)
#        self.assertTrue(pkl_remote == pkl_ref)


class TestFatigueLifetime(Template):

    def test_leq_life(self):
        """Verify if prepost.Simulation.Cases.fatigue_lifetime() returns
        the expected life time equivalent load.
        """
        # ---------------------------------------------------------------------
        # very simple case
        cases = {'case1':{'[post_dir]':'no-path', '[sim_id]':'A0'},
                 'case2':{'[post_dir]':'no-path', '[sim_id]':'A0'}}
        cc = sim.Cases(cases)

        fh_list = [('case1', 10/3600), ('case2', 20/3600)]
        dfs = pd.DataFrame({'m=1.0' : [2, 3],
                            'channel' : ['channel1', 'channel1'],
                            '[case_id]' : ['case1', 'case2']})
        neq_life = 1.0
        df_Leq = cc.fatigue_lifetime(dfs, neq_life, fh_lst=fh_list,
                                     save=False, update=False, csv=False,
                                     xlsx=False, silent=False)
        np.testing.assert_allclose(df_Leq['m=1.0'].values, 2*10 + 3*20)
        self.assertTrue(df_Leq['channel'].values[0]=='channel1')

        # ---------------------------------------------------------------------
        # slightly more complicated
        neq_life = 3.0
        df_Leq = cc.fatigue_lifetime(dfs, neq_life, fh_lst=fh_list,
                                     save=False, update=False, csv=False,
                                     xlsx=False, silent=False)
        np.testing.assert_allclose(df_Leq['m=1.0'].values,
                                   (2*10 + 3*20)/neq_life)

        # ---------------------------------------------------------------------
        # a bit more complex and also test the sorting of fh_lst and dfs
        cases = {'case1':{'[post_dir]':'no-path', '[sim_id]':'A0'},
                 'case2':{'[post_dir]':'no-path', '[sim_id]':'A0'},
                 'case3':{'[post_dir]':'no-path', '[sim_id]':'A0'},
                 'case4':{'[post_dir]':'no-path', '[sim_id]':'A0'}}
        cc = sim.Cases(cases)

        fh_list = [('case3', 10/3600), ('case2', 20/3600),
                   ('case1', 50/3600), ('case4', 40/3600)]
        dfs = pd.DataFrame({'m=3.0' : [2, 3, 4, 5],
                            'channel' : ['channel1']*4,
                            '[case_id]' : ['case4', 'case2', 'case3', 'case1']})
        neq_life = 5.0
        df_Leq = cc.fatigue_lifetime(dfs, neq_life, fh_lst=fh_list,
                                     save=False, update=False, csv=False,
                                     xlsx=False, silent=False)
        expected = ((2*2*2*40 + 3*3*3*20 + 4*4*4*10 + 5*5*5*50)/5)**(1/3)
        np.testing.assert_allclose(df_Leq['m=3.0'].values, expected)

        # ---------------------------------------------------------------------
        # more cases and with sorting
        base = {'[post_dir]':'no-path', '[sim_id]':'A0'}
        cases = {'case%i' % k : base for k in range(50)}
        cc = sim.Cases(cases)
        # reverse the order of how they appear in dfs and fh_lst
        fh_list = [('case%i' % k, k*10/3600) for k in range(49,-1,-1)]
        dfs = pd.DataFrame({'m=5.2' : np.arange(1,51,1),
                            'channel' : ['channel1']*50,
                            '[case_id]' : ['case%i' % k for k in range(50)]})
        df_Leq = cc.fatigue_lifetime(dfs, neq_life, fh_lst=fh_list,
                                     save=False, update=False, csv=False,
                                     xlsx=False, silent=False)
        expected = np.sum(np.power(np.arange(1,51,1), 5.2)*np.arange(0,50,1)*10)
        expected = np.power(expected/neq_life, 1/5.2)
        np.testing.assert_allclose(df_Leq['m=5.2'].values, expected)


if __name__ == "__main__":
    unittest.main()