From 974761495475786fbc882843e9f60cf7a76af30c Mon Sep 17 00:00:00 2001 From: dave <dave@dtu.dk> Date: Mon, 8 Aug 2016 10:28:54 +0200 Subject: [PATCH] prepost: move zip-chunk related code into simchunks module --- wetb/prepost/Simulations.py | 362 --------------------------------- wetb/prepost/simchunks.py | 394 ++++++++++++++++++++++++++++++++++++ 2 files changed, 394 insertions(+), 362 deletions(-) create mode 100644 wetb/prepost/simchunks.py diff --git a/wetb/prepost/Simulations.py b/wetb/prepost/Simulations.py index 76c9934..42d66a9 100755 --- a/wetb/prepost/Simulations.py +++ b/wetb/prepost/Simulations.py @@ -991,368 +991,6 @@ def prepare_launch_cases(cases, runmethod='gorm', verbose=False,write_htc=True, return cases_new -def create_chunks_htc_pbs(cases, sort_by_values=['[Windspeed]'], ppn=20, - nr_procs_series=9, processes=1, queue='workq', - walltime='24:00:00', chunks_dir='zip-chunks-jess', - pyenv='wetb_py3'): - """Group a large number of simulations htc and pbs launch scripts into - different zip files so we can run them with find+xargs on various nodes. - """ - - def chunker(seq, size): - # for DataFrames you can also use groupby, as taken from: - # http://stackoverflow.com/a/25703030/3156685 - # for k,g in df.groupby(np.arange(len(df))//10) - # but this approach is faster, see also: - # http://stackoverflow.com/a/25701576/3156685 - # http://stackoverflow.com/a/434328/3156685 - return (seq[pos:pos + size] for pos in range(0, len(seq), size)) - - def make_zip_chunks(df, ii, sim_id, run_dir, model_zip): - """Create zip cunks and also create an index - """ - - # create a new zip file, give index of the first element. THis is - # quasi random due to the sorting we applied earlier -# ii = df.index[0] - rpl = (sim_id, ii) - fname = os.path.join(run_dir, chunks_dir, '%s_chunk_%05i' % rpl) - zf = zipfile.ZipFile(fname+'.zip', 'w', compression=zipfile.ZIP_STORED) - - # start with appending the base model zip file - fname_model = os.path.join(run_dir, model_zip) - with zipfile.ZipFile(fname_model, 'r') as zf_model: - for n in zf_model.namelist(): - zf.writestr(n, zf_model.open(n).read()) - - # create all necessary directories in the zip file - dirtags = ['[htc_dir]', '[res_dir]','[log_dir]','[animation_dir]', - '[pbs_in_dir]', '[eigenfreq_dir]','[turb_dir]','[wake_dir]', - '[meander_dir]','[hydro_dir]', '[mooring_dir]', - '[pbs_in_dir]', '[pbs_out_dir]'] - dirnames = [] - for tag in dirtags: - for dirname in set(df[tag].unique().tolist()): - if not dirname or dirname.lower() not in ['false', 'none', 0]: - dirnames.append(dirname) - for dirname in set(dirnames): - if dirname != 0: - zf.write('.', os.path.join(dirname, '.')) - - # and the post-processing data - # FIXME: do not use hard coded paths! 
- zf.write('.', 'prepost-data/') - - # HTC files - df_src = df['[run_dir]'] + df['[htc_dir]'] + df['[case_id]'] - df_dst = df['[htc_dir]'] + df['[case_id]'] - # create an index so given the htc file, we can find the chunk nr - df_index = pd.DataFrame(index=df['[case_id]'].copy(), - columns=['chunk_nr'], dtype=np.int32) - df_index['chunk_nr'] = ii - # Since df_src and df_dst are already Series, iterating is fast an it - # is slower to first convert to a list - for src, dst_rel in zip(df_src, df_dst): - zf.write(src+'.htc', dst_rel+'.htc') - - # PBS files - df_src = df['[run_dir]'] + df['[pbs_in_dir]'] + df['[case_id]'] - df_dst = df['[pbs_in_dir]'] + df['[case_id]'] - # Since df_src and df_dst are already Series, iterating is fast an it - # is slower to first convert to a list - for src, dst_rel in zip(df_src, df_dst): - zf.write(src+'.p', dst_rel+'.p') - - # copy and rename input files with given versioned name to the - # all files that will have to be renamed to their non-changeable - # default file name. - # this is a bit more tricky since unique() will not work on list items - copyto_files_tmp = df['[copyto_files]'].astype(str) - copyto_files = [] - # cycle through the unique elements - for k in set(copyto_files_tmp): - # k is of form: "['some/file.txt', 'another/file1.txt']" - if len(k) < 2: - continue - items = [kk[1:-1] for kk in k.split('[')[1].split(']')[0].split(', ')] - copyto_files.extend(items) - # we might still have non unique elements - copyto_files = set(copyto_files) - for copyto_file, dst_rel in zip(copyto_files, df_dst): - src = os.path.join(run_dir, copyto_file) - # make dir if it does not exist - zf.write('.', os.path.dirname(copyto_file), '.') - zf.write(src, copyto_file) - - zf.close() - - return fname, df_index - - pbs_tmplate =""" -### Standard Output -#PBS -N [job_name] -#PBS -o [std_out] -### Standard Error -#PBS -e [std_err] -#PBS -W umask=[umask] -### Maximum wallclock time format HOURS:MINUTES:SECONDS -#PBS -l walltime=[walltime] -#PBS -l nodes=[nodes]:ppn=[ppn] -### Queue name -#PBS -q [queue] - -""" - - def make_pbs_chunks(df, ii, sim_id, run_dir, model_zip): - """Create a PBS that: - * copies all required files (zip chunk) to scratch disk - * copies all required turbulence files to scratch disk - * runs everything with find+xargs - * copies back what's need to mimer - """ -# ii = df.index[0] - cmd_find = '/home/MET/sysalt/bin/find' - cmd_xargs = '/home/MET/sysalt/bin/xargs' - jobid = '%s_chunk_%05i' % (sim_id, ii) - - pbase = os.path.join('/scratch','$USER', '$PBS_JOBID', '') - post_dir_base = post_dir.split(sim_id)[1] - if post_dir_base[0] == os.path.sep: - post_dir_base = post_dir_base[1:] - - pbs_in_base = os.path.commonpath(df['[pbs_in_dir]'].unique().tolist()) - pbs_in_base = os.path.join(pbs_in_base, '') - htc_base = os.path.commonpath(df['[htc_dir]'].unique().tolist()) - htc_base = os.path.join(htc_base, '') - res_base = os.path.commonpath(df['[res_dir]'].unique().tolist()) - res_base = os.path.join(res_base, '') - log_base = os.path.commonpath(df['[log_dir]'].unique().tolist()) - log_base = os.path.join(log_base, '') - - # ===================================================================== - # PBS HEADER - pbs = copy.copy(pbs_tmplate) - pbs = pbs.replace('[job_name]', jobid) - pbs = pbs.replace('[std_out]', './%s.out' % jobid) - pbs = pbs.replace('[std_err]', './%s.err' % jobid) - pbs = pbs.replace('[umask]', '0003') - pbs = pbs.replace('[walltime]', walltime) - pbs = pbs.replace('[nodes]', str(nodes)) - pbs = pbs.replace('[ppn]', str(ppn)) - pbs 
= pbs.replace('[queue]', queue) - pbs += '\necho "%s"\n' % ('-'*70) - - # ===================================================================== - # activate the python environment - pbs += 'echo "activate python environment %s"\n' % pyenv - pbs += 'source activate %s\n' % pyenv - - # ===================================================================== - # create all necessary directories at CPU_NR dirs, turb db dirs, sim_id - # browse to scratch directory - pbs += '\necho "%s"\n' % ('-'*70) - pbs += 'cd %s\n' % pbase - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n\n' - pbs += 'echo "create CPU directories on the scratch disk"\n' - pbs += 'mkdir -p %s\n' % os.path.join(pbase, sim_id, '') - for k in range(ppn): - pbs += 'mkdir -p %s\n' % os.path.join(pbase, '%i' % k, '') - # pretend to be on the scratch sim_id directory to maintain the same - # database turb structure - pbs += '\necho "%s"\n' % ('-'*70) - pbs += 'cd %s\n' % os.path.join(pbase, sim_id, '') - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n' - pbs += 'echo "create turb_db directories"\n' - db_dir_tags = ['[turb_db_dir]', '[meand_db_dir]', '[wake_db_dir]'] - turb_dirs = [] - for tag in db_dir_tags: - for dirname in set(df[tag].unique().tolist()): - if not dirname or dirname.lower() not in ['false', 'none']: - turb_dirs.append(dirname) - turb_dirs = set(turb_dirs) - for dirname in turb_dirs: - pbs += 'mkdir -p %s\n' % os.path.join(dirname, '') - - # ===================================================================== - # get the zip-chunk file from the PBS_O_WORKDIR - pbs += '\n' - pbs += 'echo "%s"\n' % ('-'*70) - pbs += 'cd $PBS_O_WORKDIR\n' - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n' - pbs += 'echo "get the zip-chunk file from the PBS_O_WORKDIR"\n' - # copy the relevant zip chunk file to the scratch main directory - rpl = (os.path.join('./', chunks_dir, jobid), os.path.join(pbase, '')) - pbs += 'cp %s.zip %s\n' % rpl - - # turb_db_dir might not be set, same for turb_base_name, for those - # cases we do not need to copy anything from the database to the node - base_name_tags = ['[turb_base_name]', '[meand_base_name]', - '[wake_base_name]'] - for db, base_name in zip(db_dir_tags, base_name_tags): - turb_db_dirs = df[db] + df[base_name] - # When set to None, the DataFrame will have text as None - turb_db_src = turb_db_dirs[turb_db_dirs.str.find('None')==-1] - pbs += '\n' - pbs += '# copy to scratch db directory for %s, %s\n' % (db, base_name) - for k in turb_db_src.unique(): - dst = os.path.dirname(os.path.join(pbase, sim_id, k)) - pbs += 'cp %s* %s\n' % (k, os.path.join(dst, '.')) - - # ===================================================================== - # browse back to the scratch directory - pbs += '\necho "%s"\n' % ('-'*70) - pbs += 'cd %s\n' % pbase - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n' - pbs += 'echo "unzip chunk, create dirs in cpu and sim_id folders"\n' - # unzip chunk, this contains all relevant folders already, and also - # contains files defined in [copyto_files] - for k in list(range(ppn)) + [sim_id]: - dst = os.path.join('%s' % k, '.') - pbs += '/usr/bin/unzip %s -d %s >> /dev/null\n' % (jobid+'.zip', dst) - - # create hard links for all the turbulence files - turb_dir_base = os.path.join(os.path.commonpath(list(turb_dirs)), '') - pbs += '\necho "%s"\n' % ('-'*70) - pbs += 'cd %s\n' % pbase - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n' - pbs += 'echo "copy all turb files into CPU dirs"\n' - for k in range(ppn): - rpl = 
(os.path.relpath(os.path.join(sim_id, turb_dir_base)), k) - pbs += 'find %s -iname *.bin -exec cp {} %s/{} \\;\n' % rpl - - # ===================================================================== - # finally we can run find+xargs!!! - pbs += '\n' - pbs += 'echo "%s"\n' % ('-'*70) - pbs += 'cd %s\n' % pbase - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n' - pbs += 'echo "START RUNNING JOBS IN find+xargs MODE"\n' - pbs += 'WINEARCH=win32 WINEPREFIX=~/.wine32 winefix\n' - pbs += '# run all the PBS *.p files in find+xargs mode\n' - pbs += 'echo "following cases will be run from following path:"\n' - pbs += 'echo "%s"\n' % (os.path.join(sim_id, pbs_in_base)) - pbs += 'export LAUNCH_PBS_MODE=false\n' - rpl = (cmd_find, os.path.join(sim_id, pbs_in_base)) - pbs += "%s %s -type f -name '*.p' | sort -z\n" % rpl - pbs += '\n' - pbs += 'echo "number of files to be launched: "' - pbs += '`find %s -type f | wc -l`\n' % os.path.join(sim_id, pbs_in_base) - rpl = (cmd_find, os.path.join(sim_id, pbs_in_base), cmd_xargs, ppn) - cmd = ("%s %s -type f -name '*.p' -print0 | sort -z | %s -0 -I{} " - "--process-slot-var=CPU_NR -n 1 -P %i sh {}\n" % rpl) - pbs += cmd - pbs += 'echo "END OF JOBS IN find+xargs MODE"\n' - - # ===================================================================== - # move results back from the node sim_id dir to the origin - pbs += '\n' - pbs += '\necho "%s"\n' % ('-'*70) - pbs += "echo 'total scratch disk usage:'\n" - pbs += 'du -hs %s\n' % pbase - pbs += 'cd %s\n' % os.path.join(pbase, sim_id) - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n' - pbs += 'echo "Results saved at sim_id directory:"\n' - rpl = (os.path.join(pbs_in_base, '*'), os.path.join(htc_base, '*')) - pbs += 'find \n' - - # compress all result files into an archive, first *.sel files - # FIXME: why doesn this work with -name "*.sel" -o -name "*.dat"?? 
- pbs += '\necho "move results into compressed archive"\n' - pbs += 'find %s -name "*.sel" -print0 ' % res_base - fname = os.path.join(res_base, 'resfiles_chunk_%05i' % ii) - pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname - # now add the *.dat files to the archive - pbs += 'find %s -name "*.dat" -print0 ' % res_base - fname = os.path.join(res_base, 'resfiles_chunk_%05i' % ii) - pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname - - pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) - - # compress all logfiles into an archive - pbs += '\necho "move logfiles into compressed archive"\n' - pbs += 'find %s -name "*.log" -print0 ' % log_base - fname = os.path.join(log_base, 'logfiles_chunk_%05i' % ii) - pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname - pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) - - # compress all post-processing results (saved as csv's) into an archive - pbs += '\necho "move statsdel into compressed archive"\n' - pbs += 'find %s -name "*.csv" -print0 ' % res_base - fname = os.path.join(post_dir_base, 'statsdel_chunk_%05i' % ii) - pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname - pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) - - # compress all post-processing results (saved as csv's) into an archive - pbs += '\necho "move log analysis into compressed archive"\n' - pbs += 'find %s -name "*.csv" -print0 ' % log_base - fname = os.path.join(post_dir_base, 'loganalysis_chunk_%05i' % ii) - pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname - pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) - - pbs += '\n' - pbs += '\necho "%s"\n' % ('-'*70) - pbs += 'cd %s\n' % pbase - pbs += "echo 'current working directory:'\n" - pbs += 'pwd\n' - pbs += 'echo "move results back from node scratch/sim_id to origin, ' - pbs += 'but ignore htc, and pbs_in directories."\n' - - tmp = os.path.join(sim_id, '*') - pbs += 'echo "copy from %s to $PBS_O_WORKDIR/"\n' % tmp - pbs += 'time rsync -au --remove-source-files %s $PBS_O_WORKDIR/ \\\n' % tmp - pbs += ' --exclude %s \\\n' % os.path.join(pbs_in_base, '*') - pbs += ' --exclude %s \n' % os.path.join(htc_base, '*') - # when using -u, htc and pbs_in files should be ignored -# pbs += 'time cp -ru %s $PBS_O_WORKDIR/\n' % tmp - pbs += 'source deactivate\n' - pbs += 'echo "DONE !!"\n' - pbs += '\necho "%s"\n' % ('-'*70) - pbs += 'exit\n' - - rpl = (sim_id, ii) - fname = os.path.join(run_dir, chunks_dir, '%s_chunk_%05i' % rpl) - with open(fname+'.p', 'w') as f: - f.write(pbs) - - cc = Cases(cases) - df = cc.cases2df() - # sort on the specified values in the given columns - df.sort_values(by=sort_by_values, inplace=True) - - # create the directory to store all zipped chunks - try: - os.mkdir(os.path.join(df['[run_dir]'].iloc[0], chunks_dir)) - # FIXME: how do you make this work pythonically on both PY2 and PY3? 
- except (FileExistsError, OSError): - pass - - df_iter = chunker(df, nr_procs_series*ppn) - sim_id = df['[sim_id]'].iloc[0] - run_dir = df['[run_dir]'].iloc[0] - model_zip = df['[model_zip]'].iloc[0] - post_dir = df['[post_dir]'].iloc[0] - nodes = 1 - df_ind = pd.DataFrame(columns=['chunk_nr'], dtype=np.int32) - df_ind.index.name = '[case_id]' - for ii, dfi in enumerate(df_iter): - fname, ind = make_zip_chunks(dfi, ii, sim_id, run_dir, model_zip) - make_pbs_chunks(dfi, ii, sim_id, run_dir, model_zip) - df_ind = df_ind.append(ind) - print(fname) - - fname = os.path.join(post_dir, 'case_id-chunk-index') - df_ind['chunk_nr'] = df_ind['chunk_nr'].astype(np.int32) - df_ind.to_hdf(fname+'.h5', 'table', compression=9, complib='zlib') - df_ind.to_csv(fname+'.csv') - - def launch(cases, runmethod='local', verbose=False, copyback_turb=True, silent=False, check_log=True, windows_nr_cpus=2, qsub='time', wine_appendix='', pbs_fname_appendix=True, short_job_names=True, diff --git a/wetb/prepost/simchunks.py b/wetb/prepost/simchunks.py new file mode 100644 index 0000000..0e230d1 --- /dev/null +++ b/wetb/prepost/simchunks.py @@ -0,0 +1,394 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Aug 8 10:22:49 2016 + +@author: dave +""" +from __future__ import print_function +from __future__ import division +from __future__ import unicode_literals +from __future__ import absolute_import +from builtins import dict +from io import open +from builtins import zip +from builtins import range +from builtins import str +from builtins import int +from future import standard_library +standard_library.install_aliases() +from builtins import object + +# standard python library +import os +import zipfile +import copy + +import numpy as np +import pandas as pd + +from wetb.prepost.Simulations import Cases + + +def create_chunks_htc_pbs(cases, sort_by_values=['[Windspeed]'], ppn=20, + nr_procs_series=9, processes=1, queue='workq', + walltime='24:00:00', chunks_dir='zip-chunks-jess', + pyenv='wetb_py3'): + """Group a large number of simulations htc and pbs launch scripts into + different zip files so we can run them with find+xargs on various nodes. + """ + + def chunker(seq, size): + # for DataFrames you can also use groupby, as taken from: + # http://stackoverflow.com/a/25703030/3156685 + # for k,g in df.groupby(np.arange(len(df))//10) + # but this approach is faster, see also: + # http://stackoverflow.com/a/25701576/3156685 + # http://stackoverflow.com/a/434328/3156685 + return (seq[pos:pos + size] for pos in range(0, len(seq), size)) + + def make_zip_chunks(df, ii, sim_id, run_dir, model_zip): + """Create zip cunks and also create an index + """ + + # create a new zip file, give index of the first element. 
This is
+        # quasi random due to the sorting we applied earlier
+#        ii = df.index[0]
+        rpl = (sim_id, ii)
+        fname = os.path.join(run_dir, chunks_dir, '%s_chunk_%05i' % rpl)
+        zf = zipfile.ZipFile(fname+'.zip', 'w', compression=zipfile.ZIP_STORED)
+
+        # start with appending the base model zip file
+        fname_model = os.path.join(run_dir, model_zip)
+        with zipfile.ZipFile(fname_model, 'r') as zf_model:
+            for n in zf_model.namelist():
+                zf.writestr(n, zf_model.open(n).read())
+
+        # create all necessary directories in the zip file
+        dirtags = ['[htc_dir]', '[res_dir]','[log_dir]','[animation_dir]',
+                   '[pbs_in_dir]', '[eigenfreq_dir]','[turb_dir]','[wake_dir]',
+                   '[meander_dir]','[hydro_dir]', '[mooring_dir]',
+                   '[pbs_in_dir]', '[pbs_out_dir]']
+        dirnames = []
+        for tag in dirtags:
+            for dirname in set(df[tag].unique().tolist()):
+                if not dirname or dirname.lower() not in ['false', 'none', 0]:
+                    dirnames.append(dirname)
+        for dirname in set(dirnames):
+            if dirname != 0:
+                zf.write('.', os.path.join(dirname, '.'))
+
+        # and the post-processing data
+        # FIXME: do not use hard coded paths!
+        zf.write('.', 'prepost-data/')
+
+        # HTC files
+        df_src = df['[run_dir]'] + df['[htc_dir]'] + df['[case_id]']
+        df_dst = df['[htc_dir]'] + df['[case_id]']
+        # create an index so given the htc file, we can find the chunk nr
+        df_index = pd.DataFrame(index=df['[case_id]'].copy(),
+                                columns=['chunk_nr'], dtype=np.int32)
+        df_index['chunk_nr'] = ii
+        # Since df_src and df_dst are already Series, iterating is fast and it
+        # is slower to first convert to a list
+        for src, dst_rel in zip(df_src, df_dst):
+            zf.write(src+'.htc', dst_rel+'.htc')
+
+        # PBS files
+        df_src = df['[run_dir]'] + df['[pbs_in_dir]'] + df['[case_id]']
+        df_dst = df['[pbs_in_dir]'] + df['[case_id]']
+        # Since df_src and df_dst are already Series, iterating is fast and it
+        # is slower to first convert to a list
+        for src, dst_rel in zip(df_src, df_dst):
+            zf.write(src+'.p', dst_rel+'.p')
+
+        # copy and rename input files with a given versioned name to their
+        # non-changeable default file name.
+ # this is a bit more tricky since unique() will not work on list items + copyto_files_tmp = df['[copyto_files]'].astype(str) + copyto_files = [] + # cycle through the unique elements + for k in set(copyto_files_tmp): + # k is of form: "['some/file.txt', 'another/file1.txt']" + if len(k) < 2: + continue + items = [kk[1:-1] for kk in k.split('[')[1].split(']')[0].split(', ')] + copyto_files.extend(items) + # we might still have non unique elements + copyto_files = set(copyto_files) + for copyto_file, dst_rel in zip(copyto_files, df_dst): + src = os.path.join(run_dir, copyto_file) + # make dir if it does not exist + zf.write('.', os.path.dirname(copyto_file), '.') + zf.write(src, copyto_file) + + zf.close() + + return fname, df_index + + pbs_tmplate =""" +### Standard Output +#PBS -N [job_name] +#PBS -o [std_out] +### Standard Error +#PBS -e [std_err] +#PBS -W umask=[umask] +### Maximum wallclock time format HOURS:MINUTES:SECONDS +#PBS -l walltime=[walltime] +#PBS -l nodes=[nodes]:ppn=[ppn] +### Queue name +#PBS -q [queue] + +""" + + def make_pbs_chunks(df, ii, sim_id, run_dir, model_zip): + """Create a PBS that: + * copies all required files (zip chunk) to scratch disk + * copies all required turbulence files to scratch disk + * runs everything with find+xargs + * copies back what's need to mimer + """ +# ii = df.index[0] + cmd_find = '/home/MET/sysalt/bin/find' + cmd_xargs = '/home/MET/sysalt/bin/xargs' + jobid = '%s_chunk_%05i' % (sim_id, ii) + + pbase = os.path.join('/scratch','$USER', '$PBS_JOBID', '') + post_dir_base = post_dir.split(sim_id)[1] + if post_dir_base[0] == os.path.sep: + post_dir_base = post_dir_base[1:] + + pbs_in_base = os.path.commonpath(df['[pbs_in_dir]'].unique().tolist()) + pbs_in_base = os.path.join(pbs_in_base, '') + htc_base = os.path.commonpath(df['[htc_dir]'].unique().tolist()) + htc_base = os.path.join(htc_base, '') + res_base = os.path.commonpath(df['[res_dir]'].unique().tolist()) + res_base = os.path.join(res_base, '') + log_base = os.path.commonpath(df['[log_dir]'].unique().tolist()) + log_base = os.path.join(log_base, '') + + # ===================================================================== + # PBS HEADER + pbs = copy.copy(pbs_tmplate) + pbs = pbs.replace('[job_name]', jobid) + pbs = pbs.replace('[std_out]', './%s.out' % jobid) + pbs = pbs.replace('[std_err]', './%s.err' % jobid) + pbs = pbs.replace('[umask]', '0003') + pbs = pbs.replace('[walltime]', walltime) + pbs = pbs.replace('[nodes]', str(nodes)) + pbs = pbs.replace('[ppn]', str(ppn)) + pbs = pbs.replace('[queue]', queue) + pbs += '\necho "%s"\n' % ('-'*70) + + # ===================================================================== + # activate the python environment + pbs += 'echo "activate python environment %s"\n' % pyenv + pbs += 'source activate %s\n' % pyenv + + # ===================================================================== + # create all necessary directories at CPU_NR dirs, turb db dirs, sim_id + # browse to scratch directory + pbs += '\necho "%s"\n' % ('-'*70) + pbs += 'cd %s\n' % pbase + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n\n' + pbs += 'echo "create CPU directories on the scratch disk"\n' + pbs += 'mkdir -p %s\n' % os.path.join(pbase, sim_id, '') + for k in range(ppn): + pbs += 'mkdir -p %s\n' % os.path.join(pbase, '%i' % k, '') + # pretend to be on the scratch sim_id directory to maintain the same + # database turb structure + pbs += '\necho "%s"\n' % ('-'*70) + pbs += 'cd %s\n' % os.path.join(pbase, sim_id, '') + pbs += "echo 'current working 
directory:'\n" + pbs += 'pwd\n' + pbs += 'echo "create turb_db directories"\n' + db_dir_tags = ['[turb_db_dir]', '[meand_db_dir]', '[wake_db_dir]'] + turb_dirs = [] + for tag in db_dir_tags: + for dirname in set(df[tag].unique().tolist()): + if not dirname or dirname.lower() not in ['false', 'none']: + turb_dirs.append(dirname) + turb_dirs = set(turb_dirs) + for dirname in turb_dirs: + pbs += 'mkdir -p %s\n' % os.path.join(dirname, '') + + # ===================================================================== + # get the zip-chunk file from the PBS_O_WORKDIR + pbs += '\n' + pbs += 'echo "%s"\n' % ('-'*70) + pbs += 'cd $PBS_O_WORKDIR\n' + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + pbs += 'echo "get the zip-chunk file from the PBS_O_WORKDIR"\n' + # copy the relevant zip chunk file to the scratch main directory + rpl = (os.path.join('./', chunks_dir, jobid), os.path.join(pbase, '')) + pbs += 'cp %s.zip %s\n' % rpl + + # turb_db_dir might not be set, same for turb_base_name, for those + # cases we do not need to copy anything from the database to the node + base_name_tags = ['[turb_base_name]', '[meand_base_name]', + '[wake_base_name]'] + for db, base_name in zip(db_dir_tags, base_name_tags): + turb_db_dirs = df[db] + df[base_name] + # When set to None, the DataFrame will have text as None + turb_db_src = turb_db_dirs[turb_db_dirs.str.find('None')==-1] + pbs += '\n' + pbs += '# copy to scratch db directory for %s, %s\n' % (db, base_name) + for k in turb_db_src.unique(): + dst = os.path.dirname(os.path.join(pbase, sim_id, k)) + pbs += 'cp %s* %s\n' % (k, os.path.join(dst, '.')) + + # ===================================================================== + # browse back to the scratch directory + pbs += '\necho "%s"\n' % ('-'*70) + pbs += 'cd %s\n' % pbase + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + pbs += 'echo "unzip chunk, create dirs in cpu and sim_id folders"\n' + # unzip chunk, this contains all relevant folders already, and also + # contains files defined in [copyto_files] + for k in list(range(ppn)) + [sim_id]: + dst = os.path.join('%s' % k, '.') + pbs += '/usr/bin/unzip %s -d %s >> /dev/null\n' % (jobid+'.zip', dst) + + # create hard links for all the turbulence files + turb_dir_base = os.path.join(os.path.commonpath(list(turb_dirs)), '') + pbs += '\necho "%s"\n' % ('-'*70) + pbs += 'cd %s\n' % pbase + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + pbs += 'echo "copy all turb files into CPU dirs"\n' + for k in range(ppn): + rpl = (os.path.relpath(os.path.join(sim_id, turb_dir_base)), k) + pbs += 'find %s -iname *.bin -exec cp {} %s/{} \\;\n' % rpl + + # ===================================================================== + # finally we can run find+xargs!!! 
+ pbs += '\n' + pbs += 'echo "%s"\n' % ('-'*70) + pbs += 'cd %s\n' % pbase + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + pbs += 'echo "START RUNNING JOBS IN find+xargs MODE"\n' + pbs += 'WINEARCH=win32 WINEPREFIX=~/.wine32 winefix\n' + pbs += '# run all the PBS *.p files in find+xargs mode\n' + pbs += 'echo "following cases will be run from following path:"\n' + pbs += 'echo "%s"\n' % (os.path.join(sim_id, pbs_in_base)) + pbs += 'export LAUNCH_PBS_MODE=false\n' + rpl = (cmd_find, os.path.join(sim_id, pbs_in_base)) + pbs += "%s %s -type f -name '*.p' | sort -z\n" % rpl + pbs += '\n' + pbs += 'echo "number of files to be launched: "' + pbs += '`find %s -type f | wc -l`\n' % os.path.join(sim_id, pbs_in_base) + rpl = (cmd_find, os.path.join(sim_id, pbs_in_base), cmd_xargs, ppn) + cmd = ("%s %s -type f -name '*.p' -print0 | sort -z | %s -0 -I{} " + "--process-slot-var=CPU_NR -n 1 -P %i sh {}\n" % rpl) + pbs += cmd + pbs += 'echo "END OF JOBS IN find+xargs MODE"\n' + + # ===================================================================== + # move results back from the node sim_id dir to the origin + pbs += '\n' + pbs += '\necho "%s"\n' % ('-'*70) + pbs += "echo 'total scratch disk usage:'\n" + pbs += 'du -hs %s\n' % pbase + pbs += 'cd %s\n' % os.path.join(pbase, sim_id) + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + pbs += 'echo "Results saved at sim_id directory:"\n' + rpl = (os.path.join(pbs_in_base, '*'), os.path.join(htc_base, '*')) + pbs += 'find \n' + + # compress all result files into an archive, first *.sel files + # FIXME: why doesn this work with -name "*.sel" -o -name "*.dat"?? + pbs += '\necho "move results into compressed archive"\n' + pbs += 'find %s -name "*.sel" -print0 ' % res_base + fname = os.path.join(res_base, 'resfiles_chunk_%05i' % ii) + pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname + # now add the *.dat files to the archive + pbs += 'find %s -name "*.dat" -print0 ' % res_base + fname = os.path.join(res_base, 'resfiles_chunk_%05i' % ii) + pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname + + pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) + + # compress all logfiles into an archive + pbs += '\necho "move logfiles into compressed archive"\n' + pbs += 'find %s -name "*.log" -print0 ' % log_base + fname = os.path.join(log_base, 'logfiles_chunk_%05i' % ii) + pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname + pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) + + # compress all post-processing results (saved as csv's) into an archive + pbs += '\necho "move statsdel into compressed archive"\n' + pbs += 'find %s -name "*.csv" -print0 ' % res_base + fname = os.path.join(post_dir_base, 'statsdel_chunk_%05i' % ii) + pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname + pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) + + # compress all post-processing results (saved as csv's) into an archive + pbs += '\necho "move log analysis into compressed archive"\n' + pbs += 'find %s -name "*.csv" -print0 ' % log_base + fname = os.path.join(post_dir_base, 'loganalysis_chunk_%05i' % ii) + pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname + pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname) + + pbs += '\n' + pbs += '\necho "%s"\n' % ('-'*70) + pbs += 'cd %s\n' % pbase + pbs += "echo 'current working directory:'\n" + pbs += 'pwd\n' + pbs += 'echo "move results back from node scratch/sim_id to origin, ' + pbs += 'but ignore htc, and pbs_in directories."\n' + + tmp = os.path.join(sim_id, '*') + pbs += 'echo "copy from %s 
to $PBS_O_WORKDIR/"\n' % tmp + pbs += 'time rsync -au --remove-source-files %s $PBS_O_WORKDIR/ \\\n' % tmp + pbs += ' --exclude %s \\\n' % os.path.join(pbs_in_base, '*') + pbs += ' --exclude %s \n' % os.path.join(htc_base, '*') + # when using -u, htc and pbs_in files should be ignored +# pbs += 'time cp -ru %s $PBS_O_WORKDIR/\n' % tmp + pbs += 'source deactivate\n' + pbs += 'echo "DONE !!"\n' + pbs += '\necho "%s"\n' % ('-'*70) + pbs += 'exit\n' + + rpl = (sim_id, ii) + fname = os.path.join(run_dir, chunks_dir, '%s_chunk_%05i' % rpl) + with open(fname+'.p', 'w') as f: + f.write(pbs) + + cc = Cases(cases) + df = cc.cases2df() + # sort on the specified values in the given columns + df.sort_values(by=sort_by_values, inplace=True) + + # create the directory to store all zipped chunks + try: + os.mkdir(os.path.join(df['[run_dir]'].iloc[0], chunks_dir)) + # FIXME: how do you make this work pythonically on both PY2 and PY3? + except (FileExistsError, OSError): + pass + + df_iter = chunker(df, nr_procs_series*ppn) + sim_id = df['[sim_id]'].iloc[0] + run_dir = df['[run_dir]'].iloc[0] + model_zip = df['[model_zip]'].iloc[0] + post_dir = df['[post_dir]'].iloc[0] + nodes = 1 + df_ind = pd.DataFrame(columns=['chunk_nr'], dtype=np.int32) + df_ind.index.name = '[case_id]' + for ii, dfi in enumerate(df_iter): + fname, ind = make_zip_chunks(dfi, ii, sim_id, run_dir, model_zip) + make_pbs_chunks(dfi, ii, sim_id, run_dir, model_zip) + df_ind = df_ind.append(ind) + print(fname) + + fname = os.path.join(post_dir, 'case_id-chunk-index') + df_ind['chunk_nr'] = df_ind['chunk_nr'].astype(np.int32) + df_ind.to_hdf(fname+'.h5', 'table', compression=9, complib='zlib') + df_ind.to_csv(fname+'.csv') + +if __name__ == '__main__': + pass -- GitLab
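Usage note (not part of the patch): the sketch below illustrates how the relocated function could be called after this change. The pickle file name, the run/post directory layout, and the tag values are assumptions for illustration only; the keyword arguments and their defaults are taken from the create_chunks_htc_pbs signature introduced above.

import pickle
import pandas as pd
from wetb.prepost.simchunks import create_chunks_htc_pbs

# load a previously prepared cases dictionary (path and file name assumed)
with open('simulations/post/sim_id_cases.pkl', 'rb') as fobj:
    cases = pickle.load(fobj)

# group the cases into zip chunks and write one PBS launch script per chunk;
# with these settings each chunk holds nr_procs_series*ppn = 9*20 = 180 cases
create_chunks_htc_pbs(cases,
                      sort_by_values=['[Windspeed]'],
                      ppn=20,
                      nr_procs_series=9,
                      queue='workq',
                      walltime='24:00:00',
                      chunks_dir='zip-chunks-jess',
                      pyenv='wetb_py3')

# the case_id -> chunk_nr index is written to the post-processing directory
# (location assumed here) and can be read back for bookkeeping
df_index = pd.read_hdf('simulations/post/case_id-chunk-index.h5', 'table')
print(df_index.head())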