From 8b8f21f5692bd79aca29f165eec47b7d0b8e064d Mon Sep 17 00:00:00 2001
From: dave <dave@dtu.dk>
Date: Wed, 20 Jul 2016 17:04:09 +0200
Subject: [PATCH] refactor prepost.Simulations.PBS, add zip-chunk find+xargs method

---
 wetb/prepost/Simulations.py | 598 +++++++++++++++++++++++++++++-------
 1 file changed, 481 insertions(+), 117 deletions(-)

diff --git a/wetb/prepost/Simulations.py b/wetb/prepost/Simulations.py
index 88b7b9a3..a68a4061 100755
--- a/wetb/prepost/Simulations.py
+++ b/wetb/prepost/Simulations.py
@@ -6,6 +6,7 @@ Created on Tue Nov 1 15:16:34 2011
 __author__ = "David Verelst <dave@dtu.dk>"
 __license__ = "GPL-2+"
 """
+
 from __future__ import print_function
 from __future__ import division
 from __future__ import unicode_literals
@@ -20,10 +21,6 @@ from future import standard_library
 standard_library.install_aliases()
 from builtins import object
 
-
-
-#print(*objects, sep=' ', end='\n', file=sys.stdout)
-
 # standard python library
 import os
 import subprocess as sproc
@@ -42,6 +39,7 @@ from operator import itemgetter
 from time import time
 #import Queue
 #import threading
+#from multiprocessing import Pool
 
 # numpy and scipy only used in HtcMaster._all_in_one_blade_tag
 import numpy as np
@@ -993,6 +991,301 @@ def prepare_launch_cases(cases, runmethod='gorm', verbose=False,write_htc=True,
     return cases_new
 
 
+def create_chunks_htc_pbs(cases, sort_by_values=['[Windspeed]'], ppn=20,
+                          nr_procs_series=10, processes=1, queue='workq',
+                          walltime='24:00:00', chunks_dir='zip-chunks',
+                          pyenv='wetb_py3'):
+    """Group the htc and pbs launch scripts of a large number of simulations
+    into different zip files so they can be run with find+xargs on various
+    nodes.
+    """
+
+    def chunker(seq, size):
+        # for DataFrames you can also use groupby, as taken from:
+        # http://stackoverflow.com/a/25703030/3156685
+        # for k,g in df.groupby(np.arange(len(df))//10)
+        # but this approach is faster, see also:
+        # http://stackoverflow.com/a/25701576/3156685
+        # http://stackoverflow.com/a/434328/3156685
+        return (seq[pos:pos + size] for pos in range(0, len(seq), size))
+
+    def make_zip_chunks(df, ii, sim_id, run_dir, model_zip):
+
+        # create a new zip file, named after the chunk number ii. (Naming
+        # it after the index of the first element, as in the commented-out
+        # line below, would be quasi-random due to the sorting applied
+        # earlier.)
+#        ii = df.index[0]
+        rpl = (sim_id, ii)
+        fname = os.path.join(run_dir, chunks_dir, '%s_chunk_%05i' % rpl)
+        zf = zipfile.ZipFile(fname+'.zip', 'w', compression=zipfile.ZIP_STORED)
+
+        # start with appending the base model zip file
+        fname_model = os.path.join(run_dir, model_zip)
+        with zipfile.ZipFile(fname_model, 'r') as zf_model:
+            for n in zf_model.namelist():
+                zf.writestr(n, zf_model.open(n).read())
+
+        # create all necessary directories in the zip file
+        dirtags = ['[htc_dir]', '[res_dir]', '[log_dir]', '[animation_dir]',
+                   '[pbs_in_dir]', '[eigenfreq_dir]', '[turb_dir]', '[wake_dir]',
+                   '[meander_dir]', '[hydro_dir]', '[mooring_dir]',
+                   '[pbs_out_dir]']
+        dirnames = []
+        for tag in dirtags:
+            for dirname in set(df[tag].unique().tolist()):
+                if dirname and str(dirname).lower() not in ['false', 'none', '0']:
+                    dirnames.append(dirname)
+        for dirname in set(dirnames):
+            zf.write('.', os.path.join(dirname, '.'))
+
+        # HTC files
+        df_src = df['[run_dir]'] + df['[htc_dir]'] + df['[case_id]']
+        df_dst = df['[htc_dir]'] + df['[case_id]']
+        # Since df_src and df_dst are already Series, iterating is fast and it
+        # is slower to first convert to a list
+        for src, dst_rel in zip(df_src, df_dst):
+            zf.write(src+'.htc', dst_rel+'.htc')
+
+        # PBS files
+        df_src = df['[run_dir]'] + df['[pbs_in_dir]'] + df['[case_id]']
+        df_dst = df['[pbs_in_dir]'] + df['[case_id]']
+        # Since df_src and df_dst are already Series, iterating is fast and it
+        # is slower to first convert to a list
+        for src, dst_rel in zip(df_src, df_dst):
+            zf.write(src+'.p', dst_rel+'.p')
+
+        # copy the input files that carry a versioned name and will have to
+        # be renamed to their non-changeable default file name.
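+        # An illustrative (not actual) example of what arrives here: a
+        # [copyto_files] tag holds the *string* "['data/wind.dat',
+        # 'control/ctrl.txt']", which has to be parsed back into a list.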
+        # this is a bit more tricky since unique() will not work on list items
+        copyto_files_tmp = df['[copyto_files]'].astype(str)
+        copyto_files = []
+        # cycle through the unique elements
+        for k in set(copyto_files_tmp):
+            # k is of form: "['some/file.txt', 'another/file1.txt']"
+            if len(k) <= 2 or '[' not in k:
+                continue
+            items = [kk[1:-1] for kk in k.split('[')[1].split(']')[0].split(', ')]
+            copyto_files.extend(items)
+        # we might still have non-unique elements
+        copyto_files = set(copyto_files)
+        for copyto_file in copyto_files:
+            src = os.path.join(run_dir, copyto_file)
+            # make dir if it does not exist
+            zf.write('.', os.path.join(os.path.dirname(copyto_file), '.'))
+            zf.write(src, copyto_file)
+
+        zf.close()
+
+        return fname
+
+    pbs_template = """
+### Standard Output
+#PBS -N [job_name]
+#PBS -o [std_out]
+### Standard Error
+#PBS -e [std_err]
+#PBS -W umask=[umask]
+### Maximum wallclock time format HOURS:MINUTES:SECONDS
+#PBS -l walltime=[walltime]
+#PBS -l nodes=[nodes]:ppn=[ppn]
+### Queue name
+#PBS -q [queue]
+
+"""
+
+    def make_pbs_chunks(df, ii, sim_id, run_dir, model_zip):
+        """Create a PBS script that:
+        * copies all required files (zip chunk) to scratch disk
+        * copies all required turbulence files to scratch disk
+        * runs everything with find+xargs
+        * copies back what's needed to mimer
+        """
+#        ii = df.index[0]
+        cmd_find = '/home/MET/sysalt/bin/find'
+        cmd_xargs = '/home/MET/sysalt/bin/xargs'
+        jobid = '%s_chunk_%05i' % (sim_id, ii)
+
+        pbase = os.path.join('/scratch', '$USER', '$PBS_JOBID', '')
+
+        # =====================================================================
+        # PBS HEADER
+        pbs = copy.copy(pbs_template)
+        pbs = pbs.replace('[job_name]', jobid)
+        pbs = pbs.replace('[std_out]', './%s.out' % jobid)
+        pbs = pbs.replace('[std_err]', './%s.err' % jobid)
+        pbs = pbs.replace('[umask]', '0003')
+        pbs = pbs.replace('[walltime]', walltime)
+        pbs = pbs.replace('[nodes]', str(nodes))
+        pbs = pbs.replace('[ppn]', str(ppn))
+        pbs = pbs.replace('[queue]', queue)
+        pbs += '\necho "%s"\n' % ('-'*70)
+
+        # =====================================================================
+        # activate the python environment
+        pbs += 'echo "activate python environment %s"\n' % pyenv
+        pbs += 'source activate %s\n' % pyenv
+
+        # =====================================================================
+        # create all necessary directories: sim_id, CPU dirs, turb db dirs
+        # browse to scratch directory
+        pbs += 'cd %s\n' % pbase
+        pbs += "echo 'current working directory:'\n"
+        pbs += 'pwd\n\n'
+        # initialize the sim_id and CPU directories on the scratch disk
+        pbs += 'mkdir -p %s\n' % os.path.join(pbase, sim_id, '')
+        for k in range(ppn):
+            pbs += 'mkdir -p %s\n' % os.path.join(pbase, '%i' % k, '')
+        # pretend to be on the scratch sim_id directory to maintain the same
+        # database turb structure
+        pbs += '\necho "%s"\n' % ('-'*70)
+        pbs += 'cd %s\n' % os.path.join(pbase, sim_id, '')
+        pbs += "echo 'current working directory:'\n"
+        pbs += 'pwd\n'
+        # initialize turb_db directories
+        pbs += '\necho "%s"\n' % ('-'*70)
+        pbs += "echo 'create turb db directories:'\n"
+        db_dir_tags = ['[turb_db_dir]', '[meand_db_dir]', '[wake_db_dir]']
+        dirnames = []
+        for tag in db_dir_tags:
+            for dirname in set(df[tag].unique().tolist()):
+                if dirname and str(dirname).lower() not in ['false', 'none']:
+                    dirnames.append(dirname)
+        for dirname in set(dirnames):
+            pbs += 'mkdir -p %s\n' % os.path.join(dirname, '')
+        # all results will be collected in the sim_id directory, so create all
+        # necessary directories
+        pbs += '\necho "%s"\n' % ('-'*70)
pbs += 'echo "create all model dirs in the sim_id folder"\n' + dirtags = ['[htc_dir]', '[res_dir]','[log_dir]','[animation_dir]', + '[pbs_in_dir]', '[eigenfreq_dir]','[turb_dir]','[wake_dir]', + '[meander_dir]','[hydro_dir]', '[mooring_dir]', '[pbs_out_dir]'] + dirnames = [] + for tag in dirtags: + for dirname in set(df[tag].unique().tolist()): + if not dirname or dirname.lower() not in ['false', 'none', 0]: + dirnames.append(dirname) + dirname_unique = set(dirnames) + try: + dirname_unique.remove(0) + except KeyError: + pass + for dirname in sorted(dirname_unique): + pbs += 'mkdir -p %s\n' % dirname + + # ===================================================================== + # get the zip-chunk file from the PBS_O_WORKDIR + pbs += '\n' + pbs += 'echo "%s"\n' % ('-'*70) + pbs += '# get the zip-chunk file from the PBS_O_WORKDIR\n' + pbs += 'cd $PBS_O_WORKDIR\n' + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + # copy the relevant zip chunk file to the scratch main directory + rpl = (os.path.join('./', chunks_dir, jobid), os.path.join(pbase, '')) + pbs += 'cp %s.zip %s\n' % rpl + + # turb_db_dir might not be set, same for turb_base_name, for those + # cases we do not need to copy anything from the database to the node + base_name_tags = ['[turb_base_name]', '[meand_base_name]', + '[wake_base_name]'] + for db, base_name in zip(db_dir_tags, base_name_tags): + turb_db_dirs = df[db] + df[base_name] + # When set to None, the DataFrame will have text as None + turb_db_src = turb_db_dirs[turb_db_dirs.str.find('None')==-1] + pbs += '\n' + pbs += '# copy to scratch db directory for %s, %s\n' % (db, base_name) + for k in turb_db_src.unique(): + dst = os.path.dirname(os.path.join(pbase, sim_id, k)) + pbs += 'cp %s* %s\n' % (k, os.path.join(dst, '.')) + + # ===================================================================== + # browse back to the scratch directory + pbs += '\necho "%s"\n' % ('-'*70) + pbs += 'cd %s\n' % pbase + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + # unzip chunk, this contains all relevant folders already, and also + # contains files defined in [copyto_files] + for k in range(ppn): + dst = os.path.join('%i' % k, '.') + pbs += '/usr/bin/unzip %s -d %s >> /dev/null\n' % (jobid+'.zip', dst) + pbs += '# copy pbs_in from the first CPU to sim_id/pbs_in\n' + pbs += 'cp -r %s %s' % (os.path.join('0', pbs_in_dir, '*'), + os.path.join(sim_id, pbs_in_dir)) + + # ===================================================================== + # finally we can run find+xargs!!! 
+        pbs += '\n'
+        pbs += 'echo "%s"\n' % ('-'*70)
+        pbs += 'echo "START RUNNING JOBS IN find+xargs MODE"\n'
+        pbs += 'WINEARCH=win32 WINEPREFIX=~/.wine32 winefix\n'
+        pbs += '# run all the PBS *.p files in find+xargs mode\n'
+        pbs += 'echo "following cases will be run:"\n'
+        pbs += 'export LAUNCH_PBS_MODE=false\n'
+        rpl = (cmd_find, os.path.join(sim_id, pbs_in_dir))
+        pbs += "%s %s -type f -name '*.p' | sort\n" % rpl
+        pbs += '\n'
+        pbs += 'echo "number of files to be launched: "'
+        pbs += '`find %s -type f | wc -l`\n' % os.path.join(sim_id, pbs_in_dir)
+        rpl = (cmd_find, os.path.join(sim_id, pbs_in_dir), cmd_xargs, ppn)
+        cmd = ("%s %s -type f -name '*.p' -print0 | sort -z | %s -0 -I{} "
+               "--process-slot-var=CPU_NR -n 1 -P %i sh {}\n" % rpl)
+        pbs += cmd
+        pbs += 'wait\n'
+        pbs += 'echo "END OF JOBS IN find+xargs MODE"\n'
+
+        # =====================================================================
+        # move results back from the node sim_id dir to the origin
+        pbs += '\n'
+        pbs += '\necho "%s"\n' % ('-'*70)
+        pbs += 'echo "Results saved at sim_id directory:"\n'
+        # FIXME: DO NOT USE HARDCODED PATHS !!!!
+        rpl = (sim_id, 'pbs_in/*', 'htc/*')
+        pbs += 'find %s/. -not -iname "%s" -not -iname "%s"\n' % rpl
+        pbs += 'echo "move results back from node scratch/sim_id to origin, '
+        pbs += 'but ignore the htc and pbs_in directories."\n'
+
+        tmp = os.path.join(sim_id, '*')
+        pbs += 'echo "copy from %s to $PBS_O_WORKDIR/"\n' % tmp
+        # NB: no space after the backslash, otherwise the shell sees an
+        # escaped space instead of an escaped newline
+        pbs += 'time rsync -au --remove-source-files %s $PBS_O_WORKDIR/ \\\n' % tmp
+        # FIXME: DO NOT USE HARDCODED PATHS !!!!
+        pbs += '    --exclude pbs_in/* \\\n'
+        pbs += '    --exclude htc/*\n'
+        # when using -u, htc and pbs_in files should be ignored
+#        pbs += 'time cp -ru %s $PBS_O_WORKDIR/\n' % tmp
+        pbs += 'source deactivate\n'
+        pbs += 'echo "DONE !!"\n'
+        pbs += '\necho "%s"\n' % ('-'*70)
+        pbs += 'exit\n'
+
+        rpl = (sim_id, ii)
+        fname = os.path.join(run_dir, chunks_dir, '%s_chunk_%05i' % rpl)
+        with open(fname+'.p', 'w') as f:
+            f.write(pbs)
+
+    cc = Cases(cases)
+    df = cc.cases2df()
+    # sort on the specified values in the given columns
+    df.sort_values(by=sort_by_values, inplace=True)
+
+    # create the directory to store all zipped chunks
+    try:
+        os.mkdir(os.path.join(df['[run_dir]'].iloc[0], chunks_dir))
+    # FileExistsError is PY3-only, but it is a subclass of OSError, so
+    # catching OSError works on both PY2 and PY3
+    except OSError:
+        pass
+
+    df_iter = chunker(df, nr_procs_series*ppn)
+    sim_id = df['[sim_id]'].iloc[0]
+    run_dir = df['[run_dir]'].iloc[0]
+    model_zip = df['[model_zip]'].iloc[0]
+    pbs_in_dir = df['[pbs_in_dir]'].iloc[0]
+    nodes = 1
+    for ii, dfi in enumerate(df_iter):
+        fname = make_zip_chunks(dfi, ii, sim_id, run_dir, model_zip)
+        # each chunk's PBS launch script only concerns the cases in that chunk
+        make_pbs_chunks(dfi, ii, sim_id, run_dir, model_zip)
+        print(fname)
+
+
 def launch(cases, runmethod='local', verbose=False, copyback_turb=True,
            silent=False, check_log=True, windows_nr_cpus=2, qsub='time',
@@ -1049,6 +1342,7 @@ def launch(cases, runmethod='local', verbose=False, copyback_turb=True,
            'linux-script, windows-script, local-ram, none'
         raise ValueError(msg)
 
+
 def post_launch(cases, save_iter=False, silent=False, suffix=None,
                 path_errorlog=None):
     """
@@ -1932,8 +2226,8 @@ class PBS(object):
         elif server == 'jess':
             self.maxcpu = 1
             self.secperiter = 0.012
-            self.wine = 'WINEARCH=win32 WINEPREFIX=~/.wine32 winefix\n'
-            self.wine += 'time WINEARCH=win32 WINEPREFIX=~/.wine32 wine'
+            self.winefix = 'WINEARCH=win32 WINEPREFIX=~/.wine32 winefix\n'
+            self.wine = 'time WINEARCH=win32 WINEPREFIX=~/.wine32 wine'
         else:
             raise UserWarning('server support only for jess or gorm')
 
@@ -2042,6 +2336,8 @@ class PBS(object):
         # load all relevant dir settings: the result/logfile/turbulence/zip
         # they are now also available for starting() and ending() parts
         hawc2_exe = tag_dict['[hawc2_exe]']
+        self.case = case.replace('.htc', '')
+        self.sim_id = tag_dict['[sim_id]']
         self.results_dir = tag_dict['[res_dir]']
         self.eigenfreq_dir = tag_dict['[eigenfreq_dir]']
         self.logs_dir = tag_dict['[log_dir]']
@@ -2063,6 +2359,7 @@ class PBS(object):
         self.pbs_queue_command = tag_dict['[pbs_queue_command]']
         self.walltime = tag_dict['[walltime]']
         self.dyn_walltime = tag_dict['[auto_walltime]']
+        self.case_duration = tag_dict['[duration]']
 
         # create the pbs_out_dir if necesary
         try:
@@ -2139,12 +2436,13 @@ class PBS(object):
         self.pbs += "\n\n"
         self.pbs += '# ' + '-'*60 + '\n'
         # evaluates to true if LAUNCH_PBS_MODE is NOT set
+        self.pbs += "# evaluates to true if LAUNCH_PBS_MODE is NOT set\n"
         self.pbs += "if [ -z ${LAUNCH_PBS_MODE+x} ] ; then\n"
         self.pbs += "  echo \n"
         self.pbs += "  echo 'Execute commands on scratch nodes'\n"
         self.pbs += "  cd %s/$USER/$PBS_JOBID\n" % self.node_run_root
         self.pbs += "  # create unique dir for each CPU\n"
-        self.pbs += "  mkdir %02i; cd %02i\n" % (count1, count1)
+        self.pbs += '  mkdir "%i"; cd "%i"\n' % (count1, count1)
         # output the current scratch directory
         self.pbs += "  pwd\n"
@@ -2216,7 +2514,17 @@ class PBS(object):
             for fname, fgen in zip(self.copyto_files, self.copyto_generic):
                 self.pbs += "  cp -R $PBS_O_WORKDIR/%s ./%s \n" % (fname, fgen)
 
+        # only apply the wine fix in PBS mode
+        self.pbs += '  ' + self.winefix
+        # TODO: activate python env, calculate post-processing
+#        self.pbs += 'echo `python -c "import wetb; print(wetb.__version__)"`\n'
+        # end of the file copying in PBS mode
+        self.pbs += '# ' + '-'*60 + '\n'
+        self.pbs += "else\n"
+        # when in find+xargs mode, browse to the relevant CPU
+        self.pbs += '  # with find+xargs we first browse to CPU folder\n'
+        self.pbs += '  cd "$CPU_NR"\n'
         self.pbs += "fi\n"
         self.pbs += '# ' + '-'*60 + '\n'
@@ -2224,6 +2532,12 @@
 
         param = (self.wine, hawc2_exe, self.htc_dir+case, self.wine_appendix)
         self.pbs += "%s %s ./%s %s &\n" % param
+
+        self.pbs += '# POST-PROCESSING\n'
+        self.pbs += "if [ -z ${LAUNCH_PBS_MODE+x} ] ; then\n"
+        self.pbs += "  "
+        self.postprocessing()
+        self.pbs += "fi\n"
+
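+        # NB: "[ -z ${LAUNCH_PBS_MODE+x} ]" tests whether the variable is
+        # unset: ${VAR+x} expands to x only when VAR is set, so even the
+        # value exported by the chunk scripts (LAUNCH_PBS_MODE=false) counts
+        # as set and selects the find+xargs branches.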
#self.pbs += "wine get_mac_adresses" + '\n' # self.pbs += "cp -R ./*.mac $PBS_O_WORKDIR/." + '\n' # ----------------------------------------------------------------- @@ -2274,7 +2588,7 @@ class PBS(object): self.pbs += "### Standard Error" + ' \n' self.pbs += "#PBS -e ./" + self.pbs_out_dir + case_id + ".err" + '\n' # self.pbs += "#PBS -e ./pbs_out/" + jobid + ".err" + '\n' - self.pbs += '#PBS -W umask=003\n' + self.pbs += '#PBS -W umask=0003\n' self.pbs += "### Maximum wallclock time format HOURS:MINUTES:SECONDS\n" # self.pbs += "#PBS -l walltime=" + self.walltime + '\n' self.pbs += "#PBS -l walltime=[walltime]\n" @@ -2310,6 +2624,7 @@ class PBS(object): # short walltime queue (shorter than an hour): '#PBS -q xpresq' # or otherwise for longer jobs: '#PBS -q workq' self.pbs += self.pbs_queue_command + '\n' + self.pbs += '\n' + '# ' + '='*78 + '\n' # ignore all the file copying when running in xargs mode: # when varibale LAUNCH_PBS_MODE is set, file copying is ignored @@ -2336,7 +2651,7 @@ class PBS(object): self.pbs += " cp -R ./" + self.ModelZipFile + \ ' %s/$USER/$PBS_JOBID\n' % (self.node_run_root) self.pbs += "fi\n" - self.pbs += '# ' + '-'*60 + '\n' + self.pbs += '# ' + '-'*78 + '\n' def ending(self, pbs_path): """ @@ -2344,13 +2659,14 @@ class PBS(object): COPY BACK: from node to """ self.pbs += "\n\n" - self.pbs += '# ' + "="*79 + "\n" + self.pbs += '# ' + "="*78 + "\n" self.pbs += "### Epilogue\n" self.pbs += "### wait for jobs to finish \n" self.pbs += "wait\n" self.pbs += 'echo ""\n' - self.pbs += '# ' + '-'*60 + '\n' + self.pbs += '# ' + '-'*78 + '\n' # evaluates to true if LAUNCH_PBS_MODE is NOT set + self.pbs += '# evaluates to true if LAUNCH_PBS_MODE is NOT set\n' self.pbs += "if [ -z ${LAUNCH_PBS_MODE+x} ] ; then\n" self.pbs += ' echo "Copy back from scratch directory" \n' for i in range(1, self.maxcpu+1, 1): @@ -2359,110 +2675,12 @@ class PBS(object): # The batch system on Gorm allows more than one job per node. # Because of this the scratch directory name includes both the # user name and the job ID, that is /scratch/$USER/$PBS_JOBID - # NB! This is different from Thyra! - self.pbs += " cd %s/$USER/$PBS_JOBID/%02i\n" % (self.node_run_root, i) - - # create the log, res etc dirs in case they do not exist - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.results_dir + "\n" - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.logs_dir + "\n" - if self.animation_dir: - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.animation_dir + "\n" - if self.copyback_turb and self.TurbDb: - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.TurbDb + "\n" - elif self.copyback_turb: - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.TurbDirName + "\n" - if self.copyback_turb and self.wakeDb: - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.wakeDb + "\n" - elif self.WakeDirName and self.WakeDirName != self.TurbDirName: - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.WakeDirName + "\n" - if self.copyback_turb and self.meandDb: - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.meandDb + "\n" - elif self.MeanderDirName and self.MeanderDirName != self.TurbDirName: - self.pbs += " mkdir -p $PBS_O_WORKDIR/" + self.MeanderDirName + "\n" - - # and copy the results and log files frome the node to the - # thyra home dir - self.pbs += " cp -R " + self.results_dir + \ - ". $PBS_O_WORKDIR/" + self.results_dir + ".\n" - self.pbs += " cp -R " + self.logs_dir + \ - ". $PBS_O_WORKDIR/" + self.logs_dir + ".\n" - if self.animation_dir: - self.pbs += " cp -R " + self.animation_dir + \ - ". 
$PBS_O_WORKDIR/" + self.animation_dir + ".\n" - - if self.eigenfreq_dir: - # just in case the eig dir has subdirs for the results, only - # select the base path and cp -r will take care of the rest - p1 = self.eigenfreq_dir.split('/')[0] - self.pbs += " cp -R %s/. $PBS_O_WORKDIR/%s/. \n" % (p1, p1) - # for eigen analysis with floater, modes are in root - eig_dir_sys = '%ssystem/' % self.eigenfreq_dir - self.pbs += ' mkdir -p $PBS_O_WORKDIR/%s \n' % eig_dir_sys - self.pbs += " cp -R mode* $PBS_O_WORKDIR/%s. \n" % eig_dir_sys - - # only copy the turbulence files back if they do not exist - # for all *.bin files on the node - cmd = ' for i in `ls *.bin`; do if [ -e $PBS_O_WORKDIR/%s$i ]; ' - cmd += 'then echo "$i exists no copyback"; else echo "$i copyback"; ' - cmd += 'cp $i $PBS_O_WORKDIR/%s; fi; done\n' - # copy back turbulence file? - # browse to the node turb dir - self.pbs += '\n echo ""\n' - self.pbs += ' echo "COPY BACK TURB IF APPLICABLE"\n' - if self.copyback_turb and self.TurbDb: - self.pbs += ' cd %s\n' % self.TurbDirName - tmp = (self.TurbDb, self.TurbDb) - self.pbs += cmd % tmp - # and back to normal model root - self.pbs += " cd %s/$USER/$PBS_JOBID/%02i\n" % (self.node_run_root, i) - elif self.copyback_turb: - self.pbs += ' cd %s\n' % self.TurbDirName - tmp = (self.TurbDirName, self.TurbDirName) - self.pbs += cmd % tmp - # and back to normal model root - self.pbs += " cd %s/$USER/$PBS_JOBID/%02i\n" % (self.node_run_root, i) - - if self.copyback_turb and self.wakeDb: - self.pbs += ' cd %s\n' % self.WakeDirName - tmp = (self.wakeDb, self.wakeDb) - self.pbs += cmd % tmp - # and back to normal model root - self.pbs += " cd %s/$USER/$PBS_JOBID/%02i\n" % (self.node_run_root, i) - elif self.copyback_turb and self.WakeDirName: - self.pbs += ' cd %s\n' % self.WakeDirName - tmp = (self.WakeDirName, self.WakeDirName) - self.pbs += cmd % tmp - # and back to normal model root - self.pbs += " cd %s/$USER/$PBS_JOBID/%02i\n" % (self.node_run_root, i) - - if self.copyback_turb and self.meandDb: - self.pbs += ' cd %s\n' % self.MeanderDirName - tmp = (self.meandDb, self.meandDb) - self.pbs += cmd % tmp - # and back to normal model root - self.pbs += " cd %s/$USER/$PBS_JOBID/%02i\n" % (self.node_run_root, i) - elif self.copyback_turb and self.MeanderDirName: - self.pbs += ' cd %s\n' % self.MeanderDirName - tmp = (self.MeanderDirName, self.MeanderDirName) - self.pbs += cmd % tmp - # and back to normal model root - self.pbs += " cd %s/$USER/$PBS_JOBID/%02i\n" % (self.node_run_root, i) - - self.pbs += ' echo "END COPY BACK TURB"\n' - self.pbs += ' echo ""\n\n' - - # copy back any other kind of file specified - if len(self.copyback_frename) == 0: - self.copyback_frename = self.copyback_files - for fname, fnew in zip(self.copyback_files, self.copyback_frename): - self.pbs += " cp -R %s $PBS_O_WORKDIR/%s \n" % (fname, fnew) - - # check what is left - self.pbs += ' echo ""\n' - self.pbs += ' echo "following files are on the node/cpu%02i ' % i - self.pbs += '(find .):"\n' - self.pbs += ' find .\n' - self.pbs += '# ' + '-'*60 + '\n' + self.copyback_all_files("pbs_mode", i) + # find+xargs mode only makes sense when maxcpu==1, cpu handling + # for this mode is handled elsewhere + if self.maxcpu == 1: + self.pbs += 'else\n' + self.copyback_all_files("find+xargs", None) # # and delete it all (but that is not allowed) # self.pbs += 'cd ..\n' @@ -2474,7 +2692,7 @@ class PBS(object): # the batch file is still open at this point???? 
# self.pbs += "rm " - # end of PBS mode + # end of PBS/find+xargs mode switching if/else self.pbs += 'fi\n' # base walltime on the longest simulation in the batch @@ -2512,6 +2730,151 @@ class PBS(object): # make the string empty again, for memory self.pbs = '' + def copyback_all_files(self, mode, cpu_nr): + """Copy back all the files from either scratch to run_dir (PBS mode), + or from CPU sub-directory back to main directory in find+xargs mode. + """ + if mode=="find+xargs": + foper = "rsync -a --remove-source-files" # move files instead of copy + dst = os.path.join('..', self.sim_id, '') + dst_db = '../' + cd2model = " cd %s\n" % os.path.join(self.node_run_root, '$USER', + '$PBS_JOBID', '$CPU_NR', '') + pbs_mode = False + else: + foper = "cp -R" + dst = "$PBS_O_WORKDIR/" + dst_db = dst + pbs_mode = True + cd2model = " cd %s\n" % os.path.join(self.node_run_root, '$USER', + '$PBS_JOBID', '%i' % cpu_nr, '') + + # navigate to the cpu dir on the node + # The batch system on Gorm/Jess allows more than one job per node. + # Because of this the scratch directory name includes both the + # user name and the job ID, that is /scratch/$USER/$PBS_JOBID/CPU_NR + self.pbs += cd2model + + # create the log, res etc dirs in case they do not exist. Only relevant + # for pbs_mode, they are created in advance in find+xargs + if pbs_mode: + mk = ' mkdir -p' + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.results_dir)) + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.logs_dir)) + if self.animation_dir: + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.animation_dir)) + if self.copyback_turb and self.TurbDb: + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.TurbDb)) + elif self.copyback_turb: + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.TurbDirName)) + if self.copyback_turb and self.wakeDb: + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.wakeDb)) + elif self.WakeDirName and self.WakeDirName != self.TurbDirName: + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.WakeDirName)) + if self.copyback_turb and self.meandDb: + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.meandDb)) + elif self.MeanderDirName and self.MeanderDirName != self.TurbDirName: + self.pbs += "%s %s\n" % (mk, os.path.join(dst, self.MeanderDirName)) + + # and copy the results and log files frome the scratch to dst + res_dst = os.path.join(dst, self.results_dir, ".") + self.pbs += " %s %s. %s\n" % (foper, self.results_dir, res_dst) + log_dst = os.path.join(dst, self.logs_dir, ".") + self.pbs += " %s %s. %s\n" % (foper, self.logs_dir, log_dst) + if self.animation_dir: + ani_dst = os.path.join(dst, self.animation_dir, ".") + self.pbs += " %s %s. %s\n" % (foper, self.animation_dir, ani_dst) + + if self.eigenfreq_dir: + # just in case the eig dir has subdirs for the results, only + # select the base path and cp -r will take care of the rest + p1 = self.eigenfreq_dir.split('/')[0] + p2 = os.path.join(dst, p1, ".") + self.pbs += " cp -R %s/. %s \n" % (p1, p2) + # for eigen analysis with floater, modes are in root + eig_dir_sys = os.path.join(dst, self.eigenfreq_dir, 'system/', '.') + self.pbs += ' mkdir -p %s \n' % eig_dir_sys + self.pbs += " cp -R mode* %s \n" % eig_dir_sys + self.pbs += " %s mode* %s \n" % (foper, eig_dir_sys) + + # only copy the turbulence files back if they do not exist + # for all *.bin files on the node + cmd = ' for i in `ls *.bin`; do if [ -e %s$i ]; ' + cmd += 'then echo "$i exists no copyback"; else echo "$i copyback"; ' + cmd += 'cp $i %s; fi; done\n' + # copy back turbulence file? 
+        # browse to the node turb dir
+        self.pbs += '\n  echo ""\n'
+        self.pbs += '  echo "COPY BACK TURB IF APPLICABLE"\n'
+        if self.copyback_turb and self.TurbDb:
+            self.pbs += '  cd %s\n' % self.TurbDirName
+            tmp = (os.path.join(dst_db, self.TurbDb, ''),)*2
+            self.pbs += cmd % tmp
+            # and back to normal model root
+            self.pbs += cd2model
+        elif self.copyback_turb:
+            self.pbs += '  cd %s\n' % self.TurbDirName
+            tmp = (os.path.join(dst, self.TurbDirName, ''),)*2
+            self.pbs += cmd % tmp
+            # and back to normal model root
+            self.pbs += cd2model
+
+        if self.copyback_turb and self.wakeDb:
+            self.pbs += '  cd %s\n' % self.WakeDirName
+            tmp = (os.path.join(dst_db, self.wakeDb, ''),)*2
+            self.pbs += cmd % tmp
+            # and back to normal model root
+            self.pbs += cd2model
+        elif self.copyback_turb and self.WakeDirName:
+            self.pbs += '  cd %s\n' % self.WakeDirName
+            tmp = (os.path.join(dst, self.WakeDirName, ''),)*2
+            self.pbs += cmd % tmp
+            # and back to normal model root
+            self.pbs += cd2model
+
+        if self.copyback_turb and self.meandDb:
+            self.pbs += '  cd %s\n' % self.MeanderDirName
+            tmp = (os.path.join(dst_db, self.meandDb, ''),)*2
+            self.pbs += cmd % tmp
+            # and back to normal model root
+            self.pbs += cd2model
+        elif self.copyback_turb and self.MeanderDirName:
+            self.pbs += '  cd %s\n' % self.MeanderDirName
+            tmp = (os.path.join(dst, self.MeanderDirName, ''),)*2
+            self.pbs += cmd % tmp
+            # and back to normal model root
+            self.pbs += cd2model
+
+        self.pbs += '  echo "END COPY BACK TURB"\n'
+        self.pbs += '  echo ""\n\n'
+
+        # copy back any other kind of files, as specified in copyback_files
+        self.pbs += '  echo "COPYBACK [copyback_files]/[copyback_frename]"\n'
+        if len(self.copyback_frename) == 0:
+            self.copyback_frename = self.copyback_files
+        for fname, fnew in zip(self.copyback_files, self.copyback_frename):
+            dst_fnew = os.path.join(dst, fnew)
+            self.pbs += "  %s %s %s \n" % (foper, fname, dst_fnew)
+        self.pbs += '  echo "END COPYBACK"\n'
+        self.pbs += '  echo ""\n\n'
+
+        if pbs_mode:
+            # check what is left
+            self.pbs += '  echo ""\n'
+            self.pbs += '  echo "following files are on '
+            self.pbs += 'node/cpu %i (find .):"\n' % cpu_nr
+            self.pbs += '  find .\n'
+            self.pbs += '# ' + '-'*78 + '\n'
+
+    def postprocessing(self):
+        """Run the post-processing just after HAWC2 has finished.
+        """
+        self.pbs += 'wait; python -c "from wetb.prepost import statsdel; '
+        fsrc = os.path.join(self.results_dir, self.case)
+        rpl = (fsrc, str(self.case_duration), '.csv')
+        self.pbs += ('statsdel.calc(\'%s\', no_bins=46, m=[3, 4, 6, 8, 10, 12], '
+                     'neq=%s, i0=0, i1=None, ftype=\'%s\')"\n' % rpl)
+
     def check_results(self, cases):
         """
         Cross-check if all simulations on the list have returned a simulation.
@@ -2549,6 +2912,7 @@
         # length will be zero if there are no failures
         return cases_fail
 
+
 # TODO: rewrite the error log analysis to something better. Take different
 # approach: start from the case and see if the results are present. Than we
 # also have the tags_dict available when log-checking a certain case
@@ -5472,7 +5836,7 @@ class MannTurb64(prepost.PBSScript):
         super(MannTurb64, self).__init__()
         self.exe = 'time wine mann_turb_x64.exe'
         # PBS configuration
-        self.umask = '003'
+        self.umask = '0003'
         self.walltime = '00:59:59'
         self.queue = 'workq'
         self.lnodes = '1'
--
GitLab
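A minimal usage sketch of the new entry point (tag values are illustrative,
and the cases dict is assumed to come from the existing prepare_launch
machinery):

    from wetb.prepost import Simulations as sim

    # cases: dict of case_id -> tags_dict, as used throughout prepost
    sim.create_chunks_htc_pbs(cases, sort_by_values=['[Windspeed]'], ppn=20,
                              nr_procs_series=10, queue='workq',
                              walltime='24:00:00', chunks_dir='zip-chunks',
                              pyenv='wetb_py3')
    # then qsub the generated zip-chunks/<sim_id>_chunk_*.p files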