From d39da16f918c3f84e83d12c8059bd1d397633ed7 Mon Sep 17 00:00:00 2001 From: madsmpedersen <m@madsp.dk> Date: Tue, 5 Apr 2016 14:37:52 +0200 Subject: [PATCH] stuff --- wetb/hawc2/htc_file.py | 2 +- wetb/hawc2/log_file.py | 14 +- wetb/hawc2/simulation.py | 491 ++++++++++++++----- wetb/utils/cluster_tools/cluster_resource.py | 57 +-- 4 files changed, 401 insertions(+), 163 deletions(-) diff --git a/wetb/hawc2/htc_file.py b/wetb/hawc2/htc_file.py index d539794e..95e3fb74 100644 --- a/wetb/hawc2/htc_file.py +++ b/wetb/hawc2/htc_file.py @@ -169,7 +169,7 @@ class HTCFile(HTCContents, HTCDefaults): return [f for f in files if f] def turbulence_files(self): - if self.wind.turb_format[0] == 0: + if 'wind' not in self.contents.keys() or self.wind.turb_format[0] == 0: return [] elif self.wind.turb_format[0] == 1: files = [self.get('wind.mann.filename_%s' % comp, [None])[0] for comp in ['u', 'v', 'w']] diff --git a/wetb/hawc2/log_file.py b/wetb/hawc2/log_file.py index 622bb6fc..48a810c8 100644 --- a/wetb/hawc2/log_file.py +++ b/wetb/hawc2/log_file.py @@ -169,6 +169,16 @@ class LogFile(LogInterpreter): if txt != "": LogInterpreter.update_status(self, txt) +class LogInfo(LogFile): + def __init__(self, status, pct, remaining_time, lastline): + self.status = status + self.pct = int(pct) + try: + self.remaining_time = float(remaining_time) + except: + self.remaining_time = None + self.lastline = lastline + self.errors = [] - - + def update_status(self): + pass \ No newline at end of file diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index 6e47b2ab..39410ab1 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -5,10 +5,16 @@ from __future__ import absolute_import from io import open from builtins import str from future import standard_library +from wetb.utils.cluster_tools.cluster_resource import LocalResource, \ + PBSClusterResource +from wetb.utils.cluster_tools.pbsjob import PBSJob +import io +import time +from wetb.utils.cluster_tools import pbsjob standard_library.install_aliases() import os from wetb.hawc2.htc_file import HTCFile -from wetb.hawc2.log_file import LogFile +from wetb.hawc2.log_file import LogFile, LogInfo from threading import Timer, Thread import sys from multiprocessing import Process @@ -21,8 +27,6 @@ from wetb.hawc2 import log_file import re import threading - - QUEUED = "queued" #until start PREPARING = "Copy to host" # during prepare simulation INITIALIZING = "Initializing" #when starting @@ -34,7 +38,7 @@ CLEANED = "Cleaned" # after copy back class Simulation(object): is_simulating = False - _status = QUEUED + status = QUEUED def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"): self.modelpath = os.path.abspath(modelpath) + "/" self.folder = os.path.dirname(htcfilename) @@ -59,33 +63,90 @@ class Simulation(object): self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) self.logFile.clear() - self.last_status = self._status + self.last_status = self.status self.errors = [] self.thread = Thread(target=self.simulate_distributed) self.hawc2exe = hawc2exe - self.simulationThread = SimulationThread(self) - self.timer = RepeatedTimer(self.update_status) + self.updateStatusThread = UpdateStatusThread(self) + self.resource = LocalSimulationResource(self) + + def input_sources(self): + def fmt(src): + if os.path.isabs(src): + src = os.path.relpath(os.path.abspath(src), self.modelpath) + else: + src = os.path.relpath (src) + assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src + return src + return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])] + + def output_sources(self): + def fmt(dst): + if os.path.isabs(dst): + dst = os.path.relpath(os.path.abspath(dst), self.modelpath) + else: + dst = os.path.relpath (dst) + dst = dst.replace("\\", "/") + assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst + return dst + return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence]] - def __str__(self): - return "Simulation(%s)" % self.filename + def prepare_simulation(self): + self.status = PREPARING + self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) + self.resource._prepare_simulation() - @property - def status(self): - return self._status + def simulate(self): + #starts blocking simulation + self.is_simulating = True + self.errors = [] + self.status = INITIALIZING + self.logFile.clear() - @status.setter - def status(self, status): - self._status = status - self.show_status() + self.resource._simulate() + + if self.resource.returncode or 'error' in self.resource.stdout.lower(): + self.errors = (list(set([l for l in self.resource.stdout.split("\n") if 'error' in l.lower()]))) + self.status = ERROR + if 'HAWC2MB version:' not in self.resource.stdout: + self.errors.append(self.stdout) + self.status = ERROR - def update_status(self, *args, **kwargs): self.logFile.update_status() + self.errors.extend(list(set(self.logFile.errors))) + self.update_status() + self.is_simulating = False + if self.resource.returncode or self.errors: + raise Exception("Simulation error:\n" + "\n".join(self.errors)) + elif self.logFile.status != log_file.DONE or self.logFile.errors: + raise Warning("Simulation succeded with errors:\nLog status:%s\nErrors:\n%s" % (self.logFile.status, "\n".join(self.logFile.errors))) + else: + self.status = FINISH + + + def finish_simulation(self): + lock = threading.Lock() + with lock: + if self.status == CLEANED: return + if self.status != ERROR: + self.status = CLEANED + self.resource._finish_simulation() + + + + + def update_status(self, *args, **kwargs): + self.resource.update_logFile_status() if self.status in [INITIALIZING, SIMULATING]: if self.logFile.status == log_file.SIMULATING: - self._status = SIMULATING + self.status = SIMULATING if self.logFile.status == log_file.DONE and self.is_simulating is False: - self._status = FINISH + self.status = FINISH + + + def __str__(self): + return "Simulation(%s)" % self.filename def show_status(self): #print ("log status:", self.logFile.status) @@ -100,6 +161,8 @@ class Simulation(object): sys.stdout.write("."*(100 - self.last_pct) + "|") sys.stdout.flush() print ("\n") + elif self.logFile.status == log_file.UNKNOWN: + print (self.status) else: print (self.logFile.status) if self.logFile.status != log_file.SIMULATING: @@ -123,89 +186,6 @@ class Simulation(object): with open(additional_files_file, 'w', encoding='utf-8') as fid: json.dump(additional_files, fid) - def prepare_simulation(self): - self.status = PREPARING - - self.tmp_modelpath = os.path.join(self.modelpath, "tmp_%s/" % self.simulation_id) - - - for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', []): - if not os.path.isabs(src): - src = os.path.join(self.modelpath, src) - for src_file in glob.glob(src): - dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) - # exist_ok does not exist in Python27 - if not os.path.exists(os.path.dirname(dst)): - os.makedirs(os.path.dirname(dst)) #, exist_ok=True) - shutil.copy(src_file, dst) - if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: - print ("error copy ", dst) - else: - #print (dst) - pass - - - self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename) - self.simulationThread.modelpath = self.tmp_modelpath - - - - def finish_simulation(self): - lock = threading.Lock() - with lock: - if self.status == CLEANED: return - if self.status != ERROR: - self.status = CLEANED - - files = self.htcFile.output_files() - if self.copy_turbulence: - files.extend(self.htcFile.turbulence_files()) - for dst in files: - if not os.path.isabs(dst): - dst = os.path.join(self.modelpath, dst) - src = os.path.join(self.tmp_modelpath, os.path.relpath(dst, self.modelpath)) - - for src_file in glob.glob(src): - dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) - # exist_ok does not exist in Python27 - if not os.path.exists(os.path.dirname(dst_file)): - os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) - if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): - shutil.copy(src_file, dst_file) - - self.logFile.filename = os.path.join(self.modelpath, self.log_filename) - - - try: - shutil.rmtree(self.tmp_modelpath) - except (PermissionError, OSError) as e: - raise Warning(str(e)) - - - - def simulate(self): - #starts blocking simulation - self.is_simulating = True - self.errors = [] - self.logFile.clear() - self.status = INITIALIZING - - self.returncode, self.stdout = 1, "Simulation failed" - self.simulationThread.start() - self.simulationThread.join() - self.returncode, self.stdout = self.simulationThread.res - if self.returncode or 'error' in self.stdout.lower(): - self.errors = list(set([l for l in self.stdout.split("\n") if 'error' in l.lower()])) - self.status = ERROR - self.is_simulating = False - self.logFile.update_status() - self.errors.extend(list(set(self.logFile.errors))) - if self.returncode: - raise Exception("Simulation error:\n" + "\n".join(self.errors)) - elif self.logFile.status != log_file.DONE or self.errors or self.logFile.errors: - raise Warning("Simulation succeded with errors:\nLog status:%s\n" % self.logFile.status + "\n".join(self.logFile.errors)) - else: - self.status = FINISH def simulate_distributed(self): self.prepare_simulation() @@ -213,7 +193,6 @@ class Simulation(object): self.finish_simulation() - def fix_errors(self): def confirm_add_additional_file(folder, file): if os.path.isfile(os.path.join(self.modelpath, folder, file)): @@ -240,25 +219,30 @@ class Simulation(object): def get_confirmation(self, title, msg): return True + def show_message(self, msg, title="Information"): print (msg) def start(self, update_interval=1): """Start non blocking distributed simulation""" - self.timer.start(update_interval*1000) + self.is_simulating = True + self.updateStatusThread.start() self.thread.start() def wait(self): self.thread.join() - self.timer.stop() self.update_status() - def stop(self): - self.timer.stop() - self.simulationThread.process.kill() + def abort(self): + self.resource.stop() + for _ in range(100): + if self.is_simulating: + break + time.sleep(0.1) try: self.finish_simulation() - except: + except Exception as e: + print (str(e)) pass if self.logFile.status not in [log_file.DONE]: self.status = ABORTED @@ -281,8 +265,8 @@ class SimulationThread(Thread): htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath) hawc2exe = self.sim.hawc2exe stdout = self.sim.stdout_filename - if os.name=="nt": - self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath, creationflags=CREATE_NO_WINDOW) + if os.name == "nt": + self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW) else: self.process = subprocess.Popen('wine "%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) Thread.start(self) @@ -298,34 +282,277 @@ class SimulationThread(Thread): stdout = fid.read() self.res = errorcode, stdout + def stop(self): + print ("stop") + subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) + print ("stop2") + +class UpdateStatusThread(Thread): + def __init__(self, simulation, interval=1): + Thread.__init__(self) + self.simulation = simulation + self.interval = interval + + def start(self): + Thread.start(self) + + def run(self): + while self.simulation.is_simulating: + self.simulation.update_status() + time.sleep(self.interval) + + +class SimulationResource(object): + def __init__(self, simulation): + self.sim = simulation + logFile = property(lambda self : self.sim.logFile) + errors = property(lambda self : self.sim.errors) + modelpath = property(lambda self : self.sim.modelpath) + tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v)) + simulation_id = property(lambda self : self.sim.simulation_id) + htcFile = property(lambda self : self.sim.htcFile) + additional_files = property(lambda self : self.sim.additional_files) + input_sources = property(lambda self : self.sim.input_sources) + output_sources = property(lambda self : self.sim.output_sources) + log_filename = property(lambda self : self.sim.log_filename) + status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) + + +class LocalSimulationResource(SimulationResource, LocalResource): + def __init__(self, simulation): + SimulationResource.__init__(self, simulation) + LocalResource.__init__(self, "hawc2mb") + self.simulationThread = SimulationThread(self.sim) + + def _prepare_simulation(self): + # must be called through simulation object + self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) + for src in self.input_sources(): + for src_file in glob.glob(os.path.join(self.modelpath, src)): + dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) + # exist_ok does not exist in Python27 + if not os.path.exists(os.path.dirname(dst)): + os.makedirs(os.path.dirname(dst)) #, exist_ok=True) + shutil.copy(src_file, dst) + if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: + print ("error copy ", dst) + + + self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename) + self.simulationThread.modelpath = self.tmp_modelpath + + + def _simulate(self): + #must be called through simulation object + self.returncode, self.stdout = 1, "Simulation failed" + self.simulationThread.start() + self.simulationThread.join() + self.returncode, self.stdout = self.simulationThread.res + self.logFile.update_status() + self.errors.extend(list(set(self.logFile.errors))) + + def _finish_simulation(self): + for dst in self.output_sources(): + src = os.path.join(self.tmp_modelpath, dst) -class RepeatedTimer(object): - def __init__(self, function, *args, **kwargs): - self._timer = None - self.function = function - self.args = args - self.kwargs = kwargs - self.is_running = False + for src_file in glob.glob(src): + dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) + # exist_ok does not exist in Python27 + if not os.path.exists(os.path.dirname(dst_file)): + os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) + if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): + shutil.copy(src_file, dst_file) + self.logFile.filename = os.path.join(self.modelpath, self.log_filename) - def _run(self): - self.is_running = False - self.start(self.interval) - self.function(*self.args, **self.kwargs) - def start(self, interval_ms=None): - self.interval = interval_ms - if not self.is_running: - self._timer = Timer(interval_ms / 1000, self._run) - self._timer.start() - self.is_running = True + try: + shutil.rmtree(self.tmp_modelpath) + except (PermissionError, OSError) as e: + raise Warning(str(e)) + + def update_logFile_status(self): + self.logFile.update_status() def stop(self): - self._timer.cancel() - self.is_running = False + self.simulationThread.stop() + self.simulationThread.join() + + + +class PBSClusterSimulationResouce(PBSClusterResource): + def __init__(self, simulation, htcfilename, host, username, password, port=22): + PBSClusterResource.__init__(self, host, username, password, port) + self.htc = htcfilename + self.pbsjob = PBSJob(self) + + + def _prepare_simulation(self): + with self.ssh: + self.ssh.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) + self.ssh.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) + + for src in self.input_sources(): + for src_file in glob.glob(os.path.join(self.modelpath, src)): + dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/") + self.ssh.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) + self.ssh.upload(src_file, dst, verbose=False) + ##assert self.ssh.file_exists(dst) + + f = io.StringIO(self.pbsjobfile(self.simulation_id)) + f.seek(0) + self.ssh.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id) + remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename) + self.ssh.execute("rm -f %s" % remote_log_filename) + + + + def _finish_simulation(self): + with self.ssh: + for dst in self.output_sources(): + + src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/") + for src_file in self.ssh.glob(src): + dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) + os.makedirs(os.path.dirname(dst_file), exist_ok=True) + self.ssh.download(src_file, dst_file, verbose=False) + self.ssh.execute('rm -r .hawc2launcher/%s' % self.simulation_id) + self.ssh.execute('rm .hawc2launcher/status_%s' % self.simulation_id) + print ("finish3", self.simulation_id) + + + def _simulate(self): + """starts blocking simulation""" + print ("simulate1", self.simulation_id) + self.sim.logFile = LogInfo(log_file.MISSING, 0, "None", "") + + pbs_out_file = "%s.out" % self.simulation_id + self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_modelpath , pbs_out_file) + sleeptime = 1 + print ("simulate2", self.simulation_id) + while self.is_simulating: + self.update_status() + time.sleep(sleeptime) + + print ("simulate3", self.simulation_id) + local_out_file = self.modelpath + os.path.splitext(self.log_filename)[0] + ".out" + with self.ssh: + try: + self.ssh.download(self.tmp_modelpath + pbs_out_file, local_out_file) + with open(local_out_file) as fid: + _, self.stdout, returncode_str, _ = fid.read().split("---------------------") + self.returncode = returncode_str.strip() != "0" + except Exception: + self.returncode = 1 + self.stdout = "error: Could not download and read stdout file" + try: + self.ssh.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename) + except Exception: + raise Exception ("Logfile not found") + self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) + + +# @property +# def status(self): +# return self._status +# +# @status.setter +# def status(self, status): +# self._status = status + + + def update_status(self, *args, **kwargs): + status = self.pbsjob.status + if status == pbsjob.NOT_SUBMITTED: + return self.status + elif status == pbsjob.DONE: + self.is_simulating = False + return self.status + try: + _, out, _ = self.ssh.execute("cat .hawc2launcher/status_%s" % self.simulation_id) + out = out.split(";") + if len(out) == 5: + self.status = out[0] + self.logFile = LogInfo(*out[1:]) + + except Exception as e: + if "No such file or directory" in str(e): + pass + else: + raise + + Simulation.update_status(self) + + def start(self): + """Start non blocking distributed simulation""" + self.thread.start() + + + + def abort(self): + self.pbsjob.stop() + self.stop() + try: + self.finish_simulation() + except: + pass + if self.status != ERROR and self.logFile.status not in [log_file.DONE]: + self.status = ABORTED + + def stop(self): + self.is_simulating = False + self.pbsjob.stop() + + def pbsjobfile(self, simulation_id): + cp_back = "" + for folder in set([os.path.relpath(os.path.dirname(f)) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]): + cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder + cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder) + rel_htcfilename = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "/") + return """ +### Standard Output +#PBS -N %s +### merge stderr into stdout +#PBS -j oe +#PBS -o %s.out +### Maximum wallclock time format HOURS:MINUTES:SECONDS +#PBS -l walltime=01:00:00 +###PBS -a 201547.53 +#PBS -lnodes=1:ppn=1 +### Queue name +#PBS -q workq +### Create scratch directory and copy data to it +cd $PBS_O_WORKDIR +pwd +cp -R . /scratch/$USER/$PBS_JOBID +### Execute commands on scratch nodes +cd /scratch/$USER/$PBS_JOBID +pwd +echo "---------------------" +python -c "from wetb.hawc2.cluster_simulation import ClusterSimulation;ClusterSimulation('.','%s', '%s')" +echo "---------------------" +echo $? +echo "---------------------" +### Copy back from scratch directory +cd /scratch/$USER/$PBS_JOBID +%s +echo $PBS_JOBID +cd /scratch/ +### rm -r $PBS_JOBID +exit""" % (simulation_id, simulation_id, rel_htcfilename, self.hawc2exe, cp_back) + + + if __name__ == "__main__": - sim = Simulation('C:\mmpe\HAWC2\Hawc2_model/', 'htc/error.htc') - sim.simulate() + sim = Simulation('C:\mmpe\HAWC2\Hawc2_model/', 'htc/long1/long01.htc', hawc2exe="hawc2-123_beta.exe") + sim.start() + + for i in range(10): + sim.show_status() + time.sleep(0.1) + sim.abort() + sim.show_status() + sim.show_status() diff --git a/wetb/utils/cluster_tools/cluster_resource.py b/wetb/utils/cluster_tools/cluster_resource.py index eba30db5..dac06a1c 100644 --- a/wetb/utils/cluster_tools/cluster_resource.py +++ b/wetb/utils/cluster_tools/cluster_resource.py @@ -11,33 +11,6 @@ import psutil class Resource(object): pass -class LocalResource(Resource): - def __init__(self, process_name): - self.process_name = process_name - self.no_users = 1 - self.host = 'Localhost' - - def check_resources(self): - def name(i): - try: - return psutil.Process(i).name - except psutil._error.AccessDenied: - pass - - - no_cpu = multiprocessing.cpu_count() - cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu - no_current_process = len([i for i in psutil.get_pid_list() if name(i) == self.process_name]) - return no_cpu, cpu_free, no_current_process - - def ok2submit(self): - - total, free, user = self.check_resources() - minimum_cpus = total * 1 / self.no_users - if user < minimum_cpus and free > 2: - return True - else: - return False @@ -78,5 +51,33 @@ class PBSClusterResource(Resource, SSHClient): return True else: return False - pass + +class LocalResource(Resource): + def __init__(self, process_name): + self.process_name = process_name + self.no_users = 1 + self.host = 'Localhost' + + def check_resources(self): + def name(i): + try: + return psutil.Process(i).name + except psutil._error.AccessDenied: + pass + + + no_cpu = multiprocessing.cpu_count() + cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu + no_current_process = len([i for i in psutil.get_pid_list() if name(i) == self.process_name]) + return no_cpu, cpu_free, no_current_process + + def ok2submit(self): + + total, free, user = self.check_resources() + minimum_cpus = total * 1 / self.no_users + if user < minimum_cpus and free > 2: + return True + else: + return False + -- GitLab