From 27ccd0814c20606a34886a65e2600af5bd738978 Mon Sep 17 00:00:00 2001 From: "Mads M. Pedersen" <mmpe@dtu.dk> Date: Mon, 2 Jan 2017 09:44:13 +0100 Subject: [PATCH] features to support input/output htc file structure --- wetb/hawc2/htc_contents.py | 2 + wetb/hawc2/htc_file.py | 2 +- wetb/hawc2/log_file.py | 2 +- wetb/hawc2/simulation.py | 432 ++++--------------- wetb/hawc2/simulation_resources.py | 425 ++++++++++++++++++ wetb/utils/cluster_tools/cluster_resource.py | 23 +- wetb/utils/cluster_tools/ssh_client.py | 57 ++- 7 files changed, 573 insertions(+), 370 deletions(-) create mode 100644 wetb/hawc2/simulation_resources.py diff --git a/wetb/hawc2/htc_contents.py b/wetb/hawc2/htc_contents.py index 8d48f28..5337613 100644 --- a/wetb/hawc2/htc_contents.py +++ b/wetb/hawc2/htc_contents.py @@ -91,6 +91,8 @@ class HTCContents(object): def __contains__(self, key): + if self.contents is None: + return False return key in self.contents def get(self, section, default=None): diff --git a/wetb/hawc2/htc_file.py b/wetb/hawc2/htc_file.py index 6bc3b7e..56adcc3 100644 --- a/wetb/hawc2/htc_file.py +++ b/wetb/hawc2/htc_file.py @@ -177,7 +177,7 @@ class HTCFile(HTCContents, HTCDefaults): files.append(self.aero.dynstall_ateflap.get('flap', [None] * 3)[2]) if 'bemwake_method' in self.aero: files.append(self.aero.bemwake_method.get('a-ct-filename', [None] * 3)[0]) - for dll in [self.dll[dll] for dll in self.get('dll', {}).keys()]: + for dll in [self.dll[dll] for dll in self.get('dll', {}).keys() if 'filename' in self.dll[dll]]: files.append(dll.filename[0]) if 'wind' in self: files.append(self.wind.get('user_defined_shear', [None])[0]) diff --git a/wetb/hawc2/log_file.py b/wetb/hawc2/log_file.py index cf42b5d..1bf59af 100644 --- a/wetb/hawc2/log_file.py +++ b/wetb/hawc2/log_file.py @@ -17,7 +17,7 @@ from collections import OrderedDict import time import math UNKNOWN = "Unknown" -MISSING = "Log file cannot be found" +MISSING = "Log file not found (May be waiting for PBS allocation)" 
PENDING = "Simulation not started yet" INITIALIZATION = 'Initializing simulation' SIMULATING = "Simulating" diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index fce2cc1..f54b26e 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -13,23 +13,17 @@ import re import shutil import subprocess import sys +from threading import Thread import time -from wetb.hawc2 import log_file -from wetb.hawc2.htc_file import HTCFile, H2aeroHTCFile -from wetb.hawc2.log_file import LogFile, LogInfo from future import standard_library +from wetb.hawc2 import log_file +from wetb.hawc2.htc_file import HTCFile +from wetb.hawc2.log_file import LogFile +from wetb.utils.cluster_tools.cluster_resource import unix_path -from wetb.utils.cluster_tools import pbsjob -from wetb.utils.cluster_tools.cluster_resource import LocalResource -from wetb.utils.cluster_tools.pbsjob import SSHPBSJob, DONE, NOT_SUBMITTED -from wetb.utils.cluster_tools.ssh_client import SSHClient -from wetb.utils.timing import print_time -from _datetime import datetime -from subprocess import STDOUT standard_library.install_aliases() -from threading import Thread QUEUED = "queued" #until start PREPARING = "Copy to host" # during prepare simulation @@ -40,8 +34,7 @@ FINISH = "Simulation finish" # when HAWC2 finish ERROR = "Error" # when hawc2 returns error ABORTED = "Aborted" # when stopped and logfile.status != Done CLEANED = "Cleaned" # after copy back -def unix_path(path): - return path.replace("\\", "/").lower() + class Simulation(object): """Class for doing hawc2 simulations @@ -62,7 +55,7 @@ class Simulation(object): - prepare_simulation() # copy to temporary folder - simulate() # simulate - finish_simulation # copy results back again - - updateStatusThread: + - updateSimStatusThread: - update status every second >>> sim.start() >>> while sim.status!=CLEANED: @@ -70,7 +63,7 @@ class Simulation(object): The default host is LocalSimulationHost. 
To simulate on pbs featured cluster - >>> sim.host = PBSClusterSimulationHost(sim, <hostname>, <username>, <password>): + >>> sim.host = PBSClusterSimulationHost(sim, <hostname>, <username>, <password>) >>> sim.start() >>> while sim.status!=CLEANED: >>> sim.show_status() @@ -81,17 +74,41 @@ class Simulation(object): status = QUEUED def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True): self.modelpath = os.path.abspath(modelpath) + "/" - self.tmp_modelpath = self.modelpath + if os.path.isabs(htcfilename): + htcfilename = os.path.relpath(htcfilename, modelpath).replace("\\","/") + if htcfilename.startswith("input/"): + htcfilename=htcfilename[6:] + exists = [os.path.isfile(os.path.join(modelpath, htcfilename)), + os.path.isfile(os.path.join(modelpath, "input/", htcfilename))] + if all(exists): + raise Exception("Both standard and input/output file structure available for %s in %s. Delete one of the options"%(htcfilename, modelpath) ) + if not any(exists): + raise Exception ("%s not found in %s"%(htcfilename, modelpath)) + else: + self.ios = exists[1] #input/output file structure + + if self.ios: + self.exepath = self.modelpath + "input/" + else: + self.exepath = self.modelpath + + self.tmp_modelpath = self.exepath self.folder = os.path.dirname(htcfilename) if not os.path.isabs(htcfilename): - htcfilename = os.path.join(modelpath, htcfilename) + htcfilename = os.path.join(self.exepath, htcfilename) self.filename = os.path.basename(htcfilename) - self.htcFile = HTCFile(htcfilename, os.path.relpath(self.modelpath,os.path.dirname(htcfilename))) + self.htcFile = HTCFile(htcfilename, self.exepath) self.time_stop = self.htcFile.simulation.time_stop[0] self.hawc2exe = hawc2exe self.copy_turbulence = copy_turbulence self.simulation_id = unix_path(os.path.relpath(htcfilename, self.modelpath) + "_%d" % id(self)).replace("/", "_") + if self.simulation_id.startswith("input_"): + self.simulation_id = self.simulation_id[6:] self.stdout_filename = 
os.path.splitext(unix_path(os.path.relpath(htcfilename, self.modelpath)).replace('htc', 'stdout', 1))[0] + ".out" + if self.ios: + + assert self.stdout_filename.startswith("input/") + self.stdout_filename = self.stdout_filename.replace("input/", "../output/") #self.stdout_filename = "stdout/%s.out" % self.simulation_id if 'logfile' in self.htcFile.simulation: self.log_filename = self.htcFile.simulation.logfile[0] @@ -102,12 +119,13 @@ class Simulation(object): else: self.log_filename = os.path.relpath(self.log_filename) self.log_filename = unix_path(self.log_filename) - self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) + self.logFile = LogFile(os.path.join(self.exepath, self.log_filename), self.time_stop) self.logFile.clear() self.last_status = self.status self.errors = [] self.non_blocking_simulation_thread = Thread(target=self.simulate_distributed) - self.updateStatusThread = UpdateStatusThread(self) + self.updateSimStatusThread = UpdateSimStatusThread(self) + from wetb.hawc2.simulation_resources import LocalSimulationHost self.host = LocalSimulationHost(self) self.stdout = "" self.returncode = 0 @@ -116,8 +134,8 @@ class Simulation(object): """Start non blocking distributed simulation""" self.is_simulating = True if update_interval > 0: - self.updateStatusThread.interval = update_interval - self.updateStatusThread.start() + self.updateSimStatusThread.interval = update_interval + self.updateSimStatusThread.start() self.non_blocking_simulation_thread.start() def wait(self): @@ -166,19 +184,28 @@ class Simulation(object): def prepare_simulation(self): self.status = PREPARING self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) + self.tmp_exepath = os.path.join(self.tmp_modelpath, os.path.relpath(self.exepath, self.modelpath) ) + "/" self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath) def fmt(src): if os.path.isabs(src): - src = os.path.relpath(os.path.abspath(src), self.modelpath) + src = 
os.path.relpath(os.path.abspath(src), self.exepath) else: src = os.path.relpath (src) assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src return src - input_patterns = [fmt(src) for src in self.htcFile.input_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + self.additional_files().get('input', [])] - input_files = set([f for pattern in input_patterns for f in glob.glob(os.path.join(self.modelpath, pattern)) if os.path.isfile(f) and ".hawc2launcher" not in f]) - if not os.path.isdir(os.path.dirname(self.modelpath + self.stdout_filename)): - os.makedirs(os.path.dirname(self.modelpath + self.stdout_filename)) + if self.ios: + input_folder_files = [] + for root, _, filenames in os.walk(os.path.join(self.modelpath, "input/")): + for filename in filenames: + input_folder_files.append(os.path.join(root, filename)) + + input_patterns = [fmt(src) for src in input_folder_files + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + self.additional_files().get('input', [])] + else: + input_patterns = [fmt(src) for src in self.htcFile.input_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + self.additional_files().get('input', [])] + input_files = set([f for pattern in input_patterns for f in glob.glob(os.path.join(self.exepath, pattern)) if os.path.isfile(f) and ".hawc2launcher" not in f]) + if not os.path.isdir(os.path.dirname(self.exepath + self.stdout_filename)): + os.makedirs(os.path.dirname(self.exepath + self.stdout_filename)) self.host._prepare_simulation(input_files) @@ -192,7 +219,7 @@ class Simulation(object): def simulate(self): #starts blocking simulation - #self.simulation_start_time = self.host.get_datetime() + self.is_simulating = True self.errors = [] self.status = INITIALIZING @@ -227,18 +254,30 @@ class Simulation(object): def fmt(dst): if os.path.isabs(dst): - dst = os.path.relpath(os.path.abspath(dst), self.modelpath) + dst = 
os.path.relpath(os.path.abspath(dst), self.exepath) else: dst = os.path.relpath (dst) dst = unix_path(dst) - assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst + assert not os.path.relpath(os.path.join(self.exepath, dst), self.modelpath).startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst return dst - output_patterns = [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]] - output_files = set([f for pattern in output_patterns for f in self.host.glob(unix_path(os.path.join(self.tmp_modelpath, pattern)))]) + if self.ios: + output_patterns = [fmt(dst) for dst in (["../output/*", "../output/"] + + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + + [os.path.join(self.exepath, self.stdout_filename)])] + output_files = set([f for pattern in output_patterns for f in self.host.glob(unix_path(os.path.join(self.tmp_exepath, pattern)), recursive=True)]) + else: + output_patterns = [fmt(dst) for dst in (self.htcFile.output_files() + + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + + [os.path.join(self.exepath, self.stdout_filename)])] + output_files = set([f for pattern in output_patterns for f in self.host.glob(unix_path(os.path.join(self.tmp_exepath, pattern)))]) try: self.host._finish_simulation(output_files) if self.status != ERROR: self.status = CLEANED + except Warning as e: + self.errors.append(str(e)) + if self.status != ERROR: + self.status = CLEANED except Exception as e: self.errors.append(str(e)) raise @@ -342,7 +381,7 @@ class Simulation(object): pass -class UpdateStatusThread(Thread): +class UpdateSimStatusThread(Thread): def __init__(self, simulation, interval=1): Thread.__init__(self) self.simulation = simulation @@ -358,330 +397,3 @@ class UpdateStatusThread(Thread): t = time.time() while self.simulation.is_simulating and 
time.time() < t + self.interval: time.sleep(1) - - -class SimulationResource(object): - def __init__(self, simulation): - self.sim = simulation - logFile = property(lambda self : self.sim.logFile, lambda self, v: setattr(self.sim, "logFile", v)) - errors = property(lambda self : self.sim.errors) - modelpath = property(lambda self : self.sim.modelpath) - tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v)) - simulation_id = property(lambda self : self.sim.simulation_id) - stdout_filename = property(lambda self : self.sim.stdout_filename) - htcFile = property(lambda self : self.sim.htcFile) - additional_files = property(lambda self : self.sim.additional_files) - _input_sources = property(lambda self : self.sim._input_sources) - _output_sources = property(lambda self : self.sim._output_sources) - log_filename = property(lambda self : self.sim.log_filename) - - status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) - is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v)) - - def __str__(self): - return self.host -class LocalSimulationHost(SimulationResource): - def __init__(self, simulation, resource=None): - SimulationResource.__init__(self, simulation) - LocalResource.__init__(self, "hawc2mb") - self.resource = resource - self.simulationThread = SimulationThread(self.sim) - - def get_datetime(self): - return datetime.now() - - def glob(self, path): - return glob.glob(path) - - def _prepare_simulation(self, input_files): - # must be called through simulation object - self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) - self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath) - for src_file in input_files: - dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) - # exist_ok does not exist in Python27 - if not 
os.path.exists(os.path.dirname(dst)): - os.makedirs(os.path.dirname(dst)) #, exist_ok=True) - shutil.copy(src_file, dst) - if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: - print ("error copy ", dst) - - if not os.path.exists(os.path.join(self.tmp_modelpath, 'stdout')): - os.makedirs(os.path.join(self.tmp_modelpath, 'stdout')) #, exist_ok=True) - self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename) - self.simulationThread.modelpath = self.tmp_modelpath - - - def _simulate(self): - #must be called through simulation object - self.returncode, self.stdout = 1, "Simulation failed" - self.simulationThread.start() - self.sim.set_id(self.sim.simulation_id, "Localhost(pid:%d)" % self.simulationThread.process.pid, self.tmp_modelpath) - self.simulationThread.join() - self.returncode, self.stdout = self.simulationThread.res - self.logFile.update_status() - self.errors.extend(list(set(self.logFile.errors))) - - - def _finish_simulation(self, output_files): - missing_result_files = [] - for src_file in output_files: - dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) - # exist_ok does not exist in Python27 - try: - if not os.path.isdir(os.path.dirname(dst_file)): - os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) - if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): - shutil.copy(src_file, dst_file) - except: - missing_result_files.append(dst_file) - - self.logFile.filename = os.path.join(self.modelpath, self.log_filename) - if missing_result_files: - raise Warning("Failed to copy %s from %s"%(",".join(missing_result_files), self.host)) - try: - shutil.rmtree(self.tmp_modelpath) - except (PermissionError, OSError) as e: - raise Warning("Fail to remove temporary files and folders on %s\n%s"%(self.host, str(e))) - - - def update_logFile_status(self): - self.logFile.update_status() - - def stop(self): - self.simulationThread.stop() - 
self.simulationThread.join() - - - -class SimulationThread(Thread): - - def __init__(self, simulation, low_priority=True): - Thread.__init__(self) - self.sim = simulation - self.modelpath = self.sim.modelpath - self.res = [0, "", ""] - self.low_priority = low_priority - - - def start(self): - #CREATE_NO_WINDOW = 0x08000000 - modelpath = self.modelpath - htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath) - hawc2exe = self.sim.hawc2exe - stdout = self.sim.stdout_filename - if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)): - os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename)) - - with open (os.path.join(self.modelpath, stdout), 'wb') as stdout: - if isinstance(hawc2exe, tuple): - wine, hawc2exe = hawc2exe - self.process = subprocess.Popen(" ".join([wine, hawc2exe, htcfile]), stdout=stdout, stderr=STDOUT, shell=True, cwd=modelpath) #shell must be True to inwoke wine - else: - self.process = subprocess.Popen([hawc2exe, htcfile], stdout=stdout, stderr=STDOUT, shell=False, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW) - self.process.communicate() - - import psutil - try: - self.sim.host.resource.process_name = psutil.Process(self.process.pid).name() - except: - pass - Thread.start(self) - - - def run(self): - import psutil - p = psutil.Process(os.getpid()) - try: - if self.low_priority: - p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) - except: - pass - self.process.communicate() - errorcode = self.process.returncode - - with open(self.modelpath + self.sim.stdout_filename, encoding='cp1252') as fid: - stdout = fid.read() - self.res = errorcode, stdout - - def stop(self): - if hasattr(self, 'process'): - subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) - - -class PBSClusterSimulationHost(SimulationResource, SSHClient): - def __init__(self, simulation, resource, host, username, password, port=22): - SimulationResource.__init__(self, simulation) - SSHClient.__init__(self, 
host, username, password, port=port) - self.pbsjob = SSHPBSJob(resource.shared_ssh) - self.resource = resource - - hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) - - - def get_datetime(self): - v, out, err = self.execute('date "+%Y,%m,%d,%H,%M,%S"') - if v == 0: - return datetime.strptime(out.strip(), "%Y,%m,%d,%H,%M,%S") - - def _prepare_simulation(self, input_files): - with self: - self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) - self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) - - for src_file in input_files: - dst = unix_path(self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)) - self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) - self.upload(src_file, dst, verbose=False) - ##assert self.ssh.file_exists(dst) - - f = io.StringIO(self.pbsjobfile()) - f.seek(0) - self.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id) - self.execute("mkdir -p .hawc2launcher/%s/%s" % (self.simulation_id, os.path.dirname(self.stdout_filename))) - remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename) - self.execute("rm -f %s" % remote_log_filename) - - - - - def _finish_simulation(self, output_files): - with self: - download_failed = [] - for src_file in output_files: - try: - dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) - os.makedirs(os.path.dirname(dst_file), exist_ok=True) - self.download(src_file, dst_file, retry=10) - except Exception as e: - download_failed.append(dst_file) - if download_failed: - raise Warning("Failed to download %s from %s"%(",".join(download_failed), self.host)) - else: - try: - self.execute('rm -r .hawc2launcher/%s' % self.simulation_id) - finally: - try: - self.execute('rm .hawc2launcher/status_%s' % self.simulation_id) - except: - raise Warning("Fail to remove temporary files and folders on %s"%self.host) - - - def _simulate(self): - """starts blocking 
simulation""" - self.sim.logFile = LogInfo(log_file.MISSING, 0, "None", "") - - self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_modelpath , self.sim.stdout_filename) - sleeptime = 1 - while self.is_simulating: - #self.__update_logFile_status() - time.sleep(sleeptime) - - local_out_file = self.modelpath + self.stdout_filename - with self: - try: - self.download(self.tmp_modelpath + self.stdout_filename, local_out_file) - with open(local_out_file) as fid: - _, self.stdout, returncode_str, _ = fid.read().split("---------------------") - self.returncode = returncode_str.strip() != "0" - except Exception: - self.returncode = 1 - self.stdout = "error: Could not download and read stdout file" - try: - self.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename) - except Exception: - raise Warning ("Logfile not found", self.tmp_modelpath + self.log_filename) - self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) - - - - def update_logFile_status(self): - def pbsjob_status(): - if self.pbsjob._status in [NOT_SUBMITTED, DONE]: - return self.pbsjob._status - if self.pbsjob.jobid in self.resource.is_executing: - self.pbsjob._status = pbsjob.RUNNING - elif self.simulation_id in self.resource.finished: - self.pbsjob._status = DONE - self.pbsjob.jobid = None - return self.pbsjob._status - - - status = pbsjob_status() - if status == pbsjob.NOT_SUBMITTED: - pass - elif status == pbsjob.DONE: - self.is_simulating = False - pass - else: - if self.simulation_id in self.resource.loglines: - self.logline = self.resource.loglines[self.simulation_id] - self.status = self.logline[0] - self.logFile = LogInfo(*self.logline[1:]) - - - def start(self): - """Start non blocking distributed simulation""" - self.non_blocking_simulation_thread.start() - - - - def abort(self): - self.pbsjob.stop() - self.stop() - try: - self.finish_simulation() - except: - pass - self.is_simulating = False - self.is_done = True - if self.status != ERROR and 
self.logFile.status not in [log_file.DONE]: - self.status = ABORTED - - def stop(self): - self.is_simulating = False - self.pbsjob.stop() - - def pbsjobfile(self): - cp_back = "" - for folder in set([unix_path(os.path.relpath(os.path.dirname(f))) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]): - cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder - cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder) - rel_htcfilename = unix_path(os.path.relpath(self.htcFile.filename, self.modelpath)) - return """ -### Standard Output -#PBS -N h2l_%s -### merge stderr into stdout -#PBS -j oe -#PBS -o %s -### Maximum wallclock time format HOURS:MINUTES:SECONDS -#PBS -l walltime=04:00:00 -###PBS -a 201547.53 -#PBS -lnodes=1:ppn=1 -### Queue name -#PBS -q workq -### Create scratch directory and copy data to it -cd $PBS_O_WORKDIR -pwd -cp -R . /scratch/$USER/$PBS_JOBID -### Execute commands on scratch nodes -cd /scratch/$USER/$PBS_JOBID -pwd -export PATH=/home/python/miniconda3/bin:$PATH -source activate wetb_py3 -WINEARCH=win32 WINEPREFIX=~/.wine32 winefix -### modelpath: %s -### htc: %s -echo "---------------------" -%s -c "from wetb.hawc2.cluster_simulation import ClusterSimulation;ClusterSimulation('.','%s', ('%s','%s'))" -echo "---------------------" -echo $? -echo "---------------------" -### Copy back from scratch directory -cd /scratch/$USER/$PBS_JOBID -%s -echo $PBS_JOBID -cd /scratch/ -### rm -r $PBS_JOBID -exit""" % (self.simulation_id, self.stdout_filename, self.modelpath, self.htcFile.filename, self.resource.python_cmd, rel_htcfilename, self.resource.wine_cmd, self.hawc2exe, cp_back) - diff --git a/wetb/hawc2/simulation_resources.py b/wetb/hawc2/simulation_resources.py new file mode 100644 index 0000000..973cb12 --- /dev/null +++ b/wetb/hawc2/simulation_resources.py @@ -0,0 +1,425 @@ +''' +Created on 22. dec. 
2016 + +@author: mmpe +''' +from datetime import datetime +import glob +import io +import os +import shutil +from subprocess import STDOUT +import subprocess +from threading import Thread +import time + +from wetb.hawc2 import log_file +from wetb.hawc2.log_file import LogInfo, LogFile +from wetb.hawc2.simulation import ERROR, ABORTED +from wetb.utils.cluster_tools import pbsjob +from wetb.utils.cluster_tools.cluster_resource import LocalResource, \ + SSHPBSClusterResource, unix_path +from wetb.utils.cluster_tools.pbsjob import SSHPBSJob, NOT_SUBMITTED, DONE +from wetb.utils.cluster_tools.ssh_client import SSHClient +from wetb.utils.timing import print_time + + +class SimulationHost(object): + def __init__(self, simulation): + self.sim = simulation + logFile = property(lambda self : self.sim.logFile, lambda self, v: setattr(self.sim, "logFile", v)) + errors = property(lambda self : self.sim.errors) + modelpath = property(lambda self : self.sim.modelpath) + exepath = property(lambda self : self.sim.exepath) + tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v)) + tmp_exepath = property(lambda self : self.sim.tmp_exepath, lambda self, v: setattr(self.sim, "tmp_exepath", v)) + simulation_id = property(lambda self : self.sim.simulation_id) + stdout_filename = property(lambda self : self.sim.stdout_filename) + htcFile = property(lambda self : self.sim.htcFile) + additional_files = property(lambda self : self.sim.additional_files) + _input_sources = property(lambda self : self.sim._input_sources) + _output_sources = property(lambda self : self.sim._output_sources) + log_filename = property(lambda self : self.sim.log_filename) + + status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) + is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v)) + + def __str__(self): + return self.resource.host + +class 
LocalSimulationHost(SimulationHost): + def __init__(self, simulation, resource=None): + SimulationHost.__init__(self, simulation) + if resource is None: + resource = LocalResource(1) + self.resource = resource + self.simulationThread = SimulationThread(self.sim) + + def get_datetime(self): + return datetime.now() + + def glob(self, path, recursive=False): + if recursive: + return [os.path.join(root, f) for root, _, files in os.walk(path) for f in files] + else: + return glob.glob(path) + + def _prepare_simulation(self, input_files): + # must be called through simulation object + self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) + self.tmp_exepath = os.path.join(self.tmp_modelpath, os.path.relpath(self.sim.exepath, self.sim.modelpath) ) + "/" + self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath) + for src_file in input_files: + dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) + # exist_ok does not exist in Python27 + if not os.path.exists(os.path.dirname(dst)): + os.makedirs(os.path.dirname(dst)) #, exist_ok=True) + shutil.copy(src_file, dst) + if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: + print ("error copy ", dst) + + stdout_folder = os.path.join(self.tmp_exepath, os.path.dirname(self.sim.stdout_filename)) + if not os.path.exists(stdout_folder): + os.makedirs(stdout_folder) #, exist_ok=True) + self.logFile.filename = os.path.join(self.tmp_exepath, self.log_filename) + self.simulationThread.modelpath = self.tmp_modelpath + self.simulationThread.exepath = self.tmp_exepath + + + def _simulate(self): + #must be called through simulation object + self.returncode, self.stdout = 1, "Simulation failed" + self.simulationThread.start() + self.sim.set_id(self.sim.simulation_id, "Localhost(pid:%d)" % self.simulationThread.process.pid, self.tmp_modelpath) + self.simulationThread.join() + self.returncode, self.stdout = self.simulationThread.res + 
self.logFile.update_status() + self.errors.extend(list(set(self.logFile.errors))) + + + def _finish_simulation(self, output_files): + missing_result_files = [] + for src_file in output_files: + dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) + # exist_ok does not exist in Python27 + try: + if not os.path.isdir(os.path.dirname(dst_file)): + os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) + if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): + shutil.copy(src_file, dst_file) + except: + missing_result_files.append(dst_file) + + self.logFile.filename = os.path.join(self.sim.exepath, self.log_filename) + if missing_result_files: + raise Warning("Failed to copy %s from %s"%(",".join(missing_result_files), self.resource.host)) + try: + shutil.rmtree(self.tmp_modelpath, ignore_errors=False) + except (PermissionError, OSError) as e: + raise Warning("Fail to remove temporary files and folders on %s\n%s"%(self.resource.host, str(e))) + + + def update_logFile_status(self): + self.logFile.update_status() + + def stop(self): + self.simulationThread.stop() + self.simulationThread.join() + + + +class SimulationThread(Thread): + + def __init__(self, simulation, low_priority=True): + Thread.__init__(self) + self.sim = simulation + self.modelpath = self.sim.modelpath + self.exepath = self.sim.exepath + self.res = [0, "", ""] + self.low_priority = low_priority + + + def start(self): + CREATE_NO_WINDOW = 0x08000000 + exepath = self.exepath #overwritten in _prepare_simulation + modelpath = self.modelpath #overwritten in _prepare_simulation + htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.exepath) + + hawc2exe = self.sim.hawc2exe + stdout = self.sim.stdout_filename + if not os.path.isdir(os.path.dirname(exepath + self.sim.stdout_filename)): + os.makedirs(os.path.dirname(exepath + self.sim.stdout_filename)) + + with open (os.path.join(exepath, stdout), 'wb') as stdout: + if 
isinstance(hawc2exe, tuple): + wine, hawc2exe = hawc2exe + self.process = subprocess.Popen(" ".join([wine, hawc2exe, htcfile]), stdout=stdout, stderr=STDOUT, shell=True, cwd=exepath) #shell must be True to inwoke wine + else: + self.process = subprocess.Popen([hawc2exe, htcfile], stdout=stdout, stderr=STDOUT, shell=False, cwd=exepath, creationflags=CREATE_NO_WINDOW) + self.process.communicate() + + import psutil + try: + self.sim.host.resource.process_name = psutil.Process(self.process.pid).name() + except: + pass + Thread.start(self) + + + def run(self): + import psutil + p = psutil.Process(os.getpid()) + try: + if self.low_priority: + p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + except: + pass + self.process.communicate() + errorcode = self.process.returncode + + with open(self.exepath + self.sim.stdout_filename, encoding='cp1252') as fid: + stdout = fid.read() + self.res = errorcode, stdout + + def stop(self): + if hasattr(self, 'process'): + subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) + + +class PBSClusterSimulationResource(SSHPBSClusterResource): + def __init__(self, host, username, password, port, min_cpu, min_free, init_cmd, wine_cmd, python_cmd): + SSHPBSClusterResource.__init__(self, host, username, password, port, min_cpu, min_free, init_cmd, wine_cmd, python_cmd) + + def is_clean(self): + return self.execute("find .hawc2launcher/ -type f | wc -l")[1] > 0 + + def clean(self): + try: + self.execute('rm .hawc2launcher -r -f') + except: + pass + try: + self.shared_ssh.close() + except: + pass + + def update_status(self): + try: + _, out, _ = self.execute("find .hawc2launcher/ -name '*.out'") + self.finished = set([f.split("/")[1] for f in out.split("\n") if "/" in f]) + except Exception: + #print ("resource_manager.update_status, out", str(e)) + pass + + try: + _, out, _ = self.execute("find .hawc2launcher -name 'status*' -exec cat {} \;") + self.loglines = {l.split(";")[0] : l.split(";")[1:] for l in out.split("\n") if ";" 
in l} + except Exception: + #print ("resource_manager.update_status, status file", str(e)) + pass + try: + _, out, _ = self.execute("qstat -u %s" % self.username) + self.is_executing = set([j.split(".")[0] for j in out.split("\n")[5:] if "." in j]) + except Exception: + #print ("resource_manager.update_status, qstat", str(e)) + pass + +class GormSimulationResource(PBSClusterSimulationResource): + def __init__(self, username, password, wine_cmd="WINEARCH=win32 WINEPREFIX=~/.wine32 wine"): + init_cmd = """export PATH=/home/python/miniconda3/bin:$PATH +source activate wetb_py3""" + PBSClusterSimulationResource.__init__(self, "gorm.risoe.dk", username, password, 22, 25, 100, init_cmd, wine_cmd, "python") + + +class PBSClusterSimulationHost(SimulationHost, SSHClient): + def __init__(self, simulation, resource): + SimulationHost.__init__(self, simulation) + SSHClient.__init__(self, resource.host, resource.username, resource.password, resource.port) + self.pbsjob = SSHPBSJob(resource.shared_ssh) + self.resource = resource + + hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) + + + def get_datetime(self): + v, out, err = self.execute('date "+%Y,%m,%d,%H,%M,%S"') + if v == 0: + return datetime.strptime(out.strip(), "%Y,%m,%d,%H,%M,%S") + + @print_time + def _prepare_simulation(self, input_files): + with self: + self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) + self.execute("mkdir -p %s%s" % (self.tmp_exepath, os.path.dirname(self.log_filename))) + + self.upload_files(self.modelpath, self.tmp_modelpath, file_lst = [os.path.relpath(f, self.modelpath) for f in input_files]) +# for src_file in input_files: +# dst = unix_path(self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)) +# self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) +# self.upload(src_file, dst, verbose=False) +# ##assert self.ssh.file_exists(dst) + + f = io.StringIO(self.pbsjobfile(self.sim.ios)) + f.seek(0) + self.upload(f, 
@print_time
def _finish_simulation(self, output_files):
    """Fetch the output files and remove the remote temporary data.

    The files in ``output_files`` are downloaded from ``self.tmp_modelpath``
    into ``self.modelpath``. Only when the download succeeds are the remote
    folder ``.hawc2launcher/<simulation_id>`` and the status file
    ``.hawc2launcher/status_<simulation_id>`` removed.

    Raises:
        Warning: if the download fails (the message lists the requested
            files), or if the status file cannot be removed.
    """
    with self:
        try:
            file_lst = [os.path.relpath(f, self.tmp_modelpath) for f in output_files]
            self.download_files(self.tmp_modelpath, self.modelpath, file_lst=file_lst)
        except Exception:
            # Name the files that were requested; the previous message joined
            # a list that was never filled and therefore named nothing.
            requested = ",".join(str(f) for f in output_files)
            raise Warning("Failed to download %s from %s" % (requested, self.host))
        else:
            try:
                self.execute('rm -r .hawc2launcher/%s' % self.simulation_id)
            finally:
                # The status file is removed even if removing the folder failed.
                try:
                    self.execute('rm .hawc2launcher/status_%s' % self.simulation_id)
                except Exception:
                    raise Warning("Fail to remove temporary files and folders on %s" % self.host)
def pbsjobfile(self, ios=False):
    """Build and return the text of the PBS job script for this simulation.

    The script has four parts: the PBS header (job name, merged output file,
    a fixed 04:00:00 walltime, one node, queue ``workq``), the copy of the
    submit directory to ``/scratch/$USER/$PBS_JOBID``, the ClusterSimulation
    call, and the copy of the result folders back to ``$PBS_O_WORKDIR``.

    ``ios`` indexes two-element tuples: when true, ``../`` instead of ``.``
    is copied to scratch and the job works in the ``/input`` subfolder.
    """
    result_files = self.htcFile.output_files() + self.htcFile.turbulence_files()
    folders = set(unix_path(os.path.relpath(os.path.dirname(f))) for f in result_files)

    # One "mkdir" and one "cp" line per folder that holds results
    copy_lines = []
    for folder in folders:
        copy_lines.append("mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder)
        copy_lines.append("cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder))
    cp_back = "".join(copy_lines)

    rel_htcfilename = unix_path(os.path.relpath(self.htcFile.filename, self.exepath))

    header_args = (self.simulation_id, self.stdout_filename)
    init = """
### Standard Output
#PBS -N h2l_%s
### merge stderr into stdout
#PBS -j oe
#PBS -o %s
### Maximum wallclock time format HOURS:MINUTES:SECONDS
#PBS -l walltime=04:00:00
###PBS -a 201547.53
#PBS -lnodes=1:ppn=1
### Queue name
#PBS -q workq
### Create scratch directory and copy data to it
cd $PBS_O_WORKDIR
pwd""" % header_args

    copy_source = (".", "../")[ios]
    work_subdir = ("", "/input")[ios]
    copy_to = """
cp -R %s /scratch/$USER/$PBS_JOBID
### Execute commands on scratch nodes
cd /scratch/$USER/$PBS_JOBID%s
pwd""" % (copy_source, work_subdir)

    run_args = (self.resource.init_cmd, self.modelpath, self.htcFile.filename,
                self.resource.python_cmd, rel_htcfilename, self.resource.wine_cmd, self.hawc2exe)
    run = '''
%s
### modelpath: %s
### htc: %s
echo "---------------------"
%s -c "from wetb.hawc2.cluster_simulation import ClusterSimulation;ClusterSimulation('.','%s', ('%s','%s'))"
echo "---------------------"
echo $?
echo "---------------------"''' % run_args

    copy_back = """
### Copy back from scratch directory
cd /scratch/$USER/$PBS_JOBID%s
%s
echo $PBS_JOBID
cd /scratch/
### rm -r $PBS_JOBID
exit""" % (("", "/input")[ios], cp_back)

    return "".join([init, copy_to, run, copy_back])
class SSHPBSClusterResource(Resource, SSHClient): - def __init__(self, host, username, password, port, min_cpu, min_free): + finished = [] + loglines = {} + is_executing = [] + + def __init__(self, host, username, password, port, min_cpu, min_free, init_cmd, wine_cmd, python_cmd): Resource.__init__(self, min_cpu, min_free) + self.init_cmd = init_cmd + self.wine_cmd = wine_cmd + self.python_cmd = python_cmd self.shared_ssh = SharedSSHClient(host, username, password, port) SSHClient.__init__(self, host, username, password, port=port) self.lock = threading.Lock() @@ -87,9 +102,9 @@ class SSHPBSClusterResource(Resource, SSHClient): if not hasattr(jobids, "len"): jobids = list(jobids) self.execute("qdel %s" % (" ".join(jobids))) - - - + + + class LocalResource(Resource): diff --git a/wetb/utils/cluster_tools/ssh_client.py b/wetb/utils/cluster_tools/ssh_client.py index 015d75c..d3480e7 100644 --- a/wetb/utils/cluster_tools/ssh_client.py +++ b/wetb/utils/cluster_tools/ssh_client.py @@ -12,6 +12,9 @@ import threading from _collections import deque import time import traceback +import zipfile +from wetb.utils.timing import print_time +import glob class SSHClient(object): "A wrapper of paramiko.SSHClient" @@ -64,9 +67,12 @@ class SSHClient(object): def download(self, remotefilepath, localfile, verbose=False, retry=1): if verbose: + ret = None print ("Download %s > %s" % (remotefilepath, str(localfile))) with self: for i in range(retry): + if i>0: + print ("Retry download %s, #%d"%(remotefilepath, i)) try: if isinstance(localfile, (str, bytes, int)): ret = self.sftp.get(remotefilepath, localfile) @@ -75,11 +81,10 @@ class SSHClient(object): break except: pass - print ("retry", i) + print ("Download %s failed from %s"%(remotefilepath, self.host)) if verbose: print (ret) - def upload(self, localfile, filepath, verbose=False): if verbose: print ("Upload %s > %s" % (localfile, filepath)) @@ -90,6 +95,45 @@ class SSHClient(object): ret = self.sftp.putfo(localfile, filepath) if 
def upload_files(self, localpath, remotepath, file_lst=(".",), compression_level=1):
    """Zip local files, upload the archive and unpack it on the remote host.

    Args:
        localpath: existing local folder; archive member names are relative
            to it.
        remotepath: remote folder (created with ``mkdir -p``) into which the
            archive is unpacked.
        file_lst: a path/glob pattern or a list/tuple of them, relative to
            ``localpath``. Folders are walked recursively and patterns are
            expanded with ``glob``.
        compression_level: 0 stored, 1 deflated, 2 bzip2, 3 lzma.

    The archive is built in a private temporary folder rather than as
    ``tmp.zip`` in the current working directory, so concurrent uploads from
    different threads cannot overwrite each other's archive, and the local
    archive is always closed and removed, also on errors.
    """
    import tempfile  # local import: only the zip transfer helpers need it

    assert os.path.isdir(localpath)
    if not isinstance(file_lst, (tuple, list)):
        file_lst = [file_lst]

    files = set()
    for fp in file_lst:
        pattern = os.path.join(localpath, fp)
        for root, _, names in os.walk(pattern):
            files.update(os.path.abspath(os.path.join(root, name)) for name in names)
        files.update(os.path.abspath(f) for f in glob.glob(pattern))

    compression_levels = {0: zipfile.ZIP_STORED, 1: zipfile.ZIP_DEFLATED,
                          2: zipfile.ZIP_BZIP2, 3: zipfile.ZIP_LZMA}
    zip_name = 'tmp.zip'
    remote_zn = os.path.join(remotepath, zip_name).replace("\\", "/")
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_zn = os.path.join(tmp_dir, zip_name)
        with zipfile.ZipFile(local_zn, 'w', compression_levels[compression_level]) as zipf:
            for f in sorted(files):  # sorted only to make the archive order reproducible
                zipf.write(f, os.path.relpath(f, localpath))
        self.execute("mkdir -p %s" % (remotepath))
        self.upload(local_zn, remote_zn)
    self.execute("unzip %s -d %s && rm %s" % (remote_zn, remotepath, remote_zn))


def download_files(self, remote_path, localpath, file_lst=(".",), compression_level=1):
    """Zip remote files, download the archive and unpack it locally.

    Args:
        remote_path: remote folder in which ``zip -r tmp.zip`` is executed.
        localpath: local destination folder (created if missing).
        file_lst: a path or a list/tuple of paths relative to
            ``remote_path``; backslashes are converted to ``/``.
        compression_level: accepted for symmetry with ``upload_files``; it is
            not used.

    The archive is downloaded into a private temporary folder instead of
    ``localpath/tmp.zip``, so parallel downloads into the same model folder do
    not clash. The remote archive is removed even when the download raises;
    otherwise a later ``zip -r`` could add to the stale archive.
    """
    import tempfile  # local import: only the zip transfer helpers need it

    if not isinstance(file_lst, (tuple, list)):
        file_lst = [file_lst]
    file_lst = [f.replace("\\", "/") for f in file_lst]
    remote_zip = os.path.join(remote_path, "tmp.zip").replace("\\", "/")
    self.execute("cd %s && zip -r tmp.zip %s" % (remote_path, " ".join(file_lst)))

    if not os.path.isdir(localpath):
        os.makedirs(localpath)
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_zip = os.path.join(tmp_dir, "tmp.zip")
        try:
            self.download(remote_zip, local_zip)
        finally:
            self.execute("rm -f %s" % remote_zip)
        with zipfile.ZipFile(local_zip, "r") as z:
            z.extractall(localpath)
os.path.split(filepattern)[1] - _, out, _ = self.execute(r'find %s -maxdepth 1 -type f -name "%s"' % (cwd, filepattern)) + if recursive: + _, out, _ = self.execute(r'find %s -type f -name "%s"' % (cwd, filepattern)) + else: + _, out, _ = self.execute(r'find %s -maxdepth 1 -type f -name "%s"' % (cwd, filepattern)) return [file for file in out.strip().split("\n") if file != ""] + + class SharedSSHClient(SSHClient): def __init__(self, host, username, password=None, port=22, key=None, passphrase=None): SSHClient.__init__(self, host, username, password=password, port=port, key=key, passphrase=passphrase) -- GitLab