From 1cdd75262e11c91c3616f97718179d3e24b2fd46 Mon Sep 17 00:00:00 2001 From: mmpe <mmpe@dtu.dk> Date: Fri, 2 Nov 2018 15:30:15 +0100 Subject: [PATCH] Simulation update --- wetb/hawc2/simulation.py | 172 ++++++++++----------- wetb/hawc2/simulation_resources.py | 235 ++++++++++++++--------------- 2 files changed, 201 insertions(+), 206 deletions(-) diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index a5e4edf..94de707 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -6,12 +6,9 @@ from __future__ import unicode_literals from builtins import str import glob from io import open -import io import json import os import re -import shutil -import subprocess import sys from threading import Thread import time @@ -20,13 +17,14 @@ from future import standard_library from wetb.hawc2 import log_file from wetb.hawc2.htc_file import HTCFile, fmt_path from wetb.hawc2.log_file import LogFile +import tempfile standard_library.install_aliases() -QUEUED = "queued" #until start +QUEUED = "queued" # until start PREPARING = "Copy to host" # during prepare simulation -INITIALIZING = "Initializing" #when starting +INITIALIZING = "Initializing" # when starting SIMULATING = "Simulating" # when logfile.status=simulating FINISHING = "Copy from host" # during prepare simulation FINISH = "Simulation finish" # when HAWC2 finish @@ -71,44 +69,49 @@ class Simulation(object): is_simulating = False is_done = False status = QUEUED + def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True): self.modelpath = os.path.abspath(modelpath) + "/" if os.path.isabs(htcfilename): htcfilename = os.path.relpath(htcfilename, modelpath) if htcfilename.startswith("input/"): - htcfilename=htcfilename[6:] - exists = [os.path.isfile(os.path.join(modelpath, htcfilename)), + htcfilename = htcfilename[6:] + exists = [os.path.isfile(os.path.join(modelpath, htcfilename)), os.path.isfile(os.path.join(modelpath, "input/", htcfilename))] if all(exists): - raise Exception("Both standard and input/output file structure available for %s in %s. Delete one of the options"%(htcfilename, modelpath) ) + raise Exception( + "Both standard and input/output file structure available for %s in %s. Delete one of the options" % (htcfilename, modelpath)) if not any(exists): - raise Exception ("%s not found in %s"%(htcfilename, modelpath)) + raise Exception("%s not found in %s" % (htcfilename, modelpath)) else: - self.ios = exists[1] #input/output file structure - + self.ios = exists[1] # input/output file structure + if self.ios: self.exepath = self.modelpath + "input/" else: - self.exepath = self.modelpath + self.exepath = self.modelpath + # model_path: top level path containing all resources + # exepath: parent path for relative paths + htcfilename = fmt_path(htcfilename) - + self.tmp_modelpath = self.exepath self.folder = os.path.dirname(htcfilename) - + self.filename = os.path.basename(htcfilename) self.htcFile = HTCFile(os.path.join(self.exepath, htcfilename), self.exepath) self.time_stop = self.htcFile.simulation.time_stop[0] self.hawc2exe = hawc2exe self.copy_turbulence = copy_turbulence - self.simulation_id = (htcfilename.replace("\\","/").replace("/", "_")[:50]+ "_%d" % id(self)) + self.simulation_id = (htcfilename.replace("\\", "/").replace("/", "_")[:50] + "_%d" % id(self)) if self.simulation_id.startswith("input_"): self.simulation_id = self.simulation_id[6:] - self.stdout_filename = fmt_path(os.path.join(os.path.relpath(self.exepath, self.modelpath), + self.stdout_filename = fmt_path(os.path.join(os.path.relpath(self.exepath, self.modelpath), (os.path.splitext(htcfilename)[0] + ".out").replace('htc', 'stdout', 1))) if self.ios: assert self.stdout_filename.startswith("input/") self.stdout_filename = self.stdout_filename.replace("input/", "../output/") - #self.stdout_filename = "stdout/%s.out" % self.simulation_id + # self.stdout_filename = "stdout/%s.out" % self.simulation_id if 'logfile' in self.htcFile.simulation: self.log_filename = self.htcFile.simulation.logfile[0] else: @@ -122,7 +125,8 @@ class Simulation(object): self.logFile.clear() self.last_status = self.status self.errors = [] - self.non_blocking_simulation_thread = Thread(target=self.simulate_distributed) + self.non_blocking_simulation_thread = Thread( + target=lambda: self.simulate_distributed(raise_simulation_errors=False)) self.updateSimStatusThread = UpdateSimStatusThread(self) from wetb.hawc2.simulation_resources import LocalSimulationHost self.host = LocalSimulationHost(self) @@ -163,55 +167,55 @@ class Simulation(object): # self.update_status() def show_status(self): - #print ("log status:", self.logFile.status) + # print ("log status:", self.logFile.status) if self.logFile.status == log_file.SIMULATING: if self.last_status != log_file.SIMULATING: - print ("|" + ("-"*50) + "|" + ("-"*49) + "|") + print("|" + ("-" * 50) + "|" + ("-" * 49) + "|") sys.stdout.write("|") - sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0))) + sys.stdout.write("." * (self.logFile.pct - getattr(self, 'last_pct', 0))) sys.stdout.flush() self.last_pct = self.logFile.pct elif self.last_status == log_file.SIMULATING: - sys.stdout.write("."*(100 - self.last_pct) + "|") + sys.stdout.write("." * (100 - self.last_pct) + "|") sys.stdout.flush() - print ("\n") + print("\n") elif self.logFile.status == log_file.UNKNOWN: - print (self.status) + print(self.status) else: - print (self.logFile.status) + print(self.logFile.status) if self.logFile.status != log_file.SIMULATING: if self.logFile.errors: - print (self.logFile.errors) + print(self.logFile.errors) self.last_status = self.logFile.status - - - def prepare_simulation(self): self.status = PREPARING - self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) - self.tmp_exepath = os.path.join(self.tmp_modelpath, os.path.relpath(self.exepath, self.modelpath) ) + "/" + # self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) + # self.tmp_exepath = os.path.join(self.tmp_modelpath, os.path.relpath(self.exepath, self.modelpath) ) + "/" self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath) def fmt(src): if os.path.isabs(src): src = os.path.relpath(os.path.abspath(src), self.exepath) else: - src = os.path.relpath (src) - assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src + src = os.path.relpath(src) + assert not src.startswith( + ".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src return src if self.ios: input_folder_files = [] for root, _, filenames in os.walk(os.path.join(self.modelpath, "input/")): for filename in filenames: input_folder_files.append(os.path.join(root, filename)) - - input_patterns = [fmt(src) for src in input_folder_files + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + self.additional_files().get('input', [])] + + input_patterns = [fmt(src) for src in input_folder_files + ([], self.htcFile.turbulence_files()) + [self.copy_turbulence] + self.additional_files().get('input', [])] else: - input_patterns = [fmt(src) for src in self.htcFile.input_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + self.additional_files().get('input', [])] - input_files = set([f for pattern in input_patterns for f in glob.glob(os.path.join(self.exepath, pattern)) if os.path.isfile(f) and ".hawc2launcher" not in f]) - if not os.path.isdir(os.path.dirname(self.exepath + self.stdout_filename)): - os.makedirs(os.path.dirname(self.exepath + self.stdout_filename)) + input_patterns = [fmt(src) for src in self.htcFile.input_files( + ) + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + self.additional_files().get('input', [])] + input_files = set([f for pattern in input_patterns for f in glob.glob( + os.path.join(self.exepath, pattern)) if os.path.isfile(f) and ".hawc2launcher" not in f]) + self.host._prepare_simulation(input_files) @@ -224,8 +228,8 @@ class Simulation(object): # self.host._prepare_simulation() def simulate(self): - #starts blocking simulation - + # starts blocking simulation + self.is_simulating = True self.errors = [] self.status = INITIALIZING @@ -233,12 +237,12 @@ class Simulation(object): self.host._simulate() self.returncode, self.stdout = self.host.returncode, self.host.stdout if self.host.returncode or 'error' in self.host.stdout.lower(): - if self.status==ABORTED: + if self.status == ABORTED: return if "error" in self.host.stdout.lower(): self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()]))) self.status = ERROR - if 'HAWC2MB version:' not in self.host.stdout: + if 'HAWC2MB version:' not in self.host.stdout: self.errors.append(self.host.stdout) self.status = ERROR @@ -249,11 +253,11 @@ class Simulation(object): if self.host.returncode or self.errors: raise Exception("Simulation error:\nReturn code: %d\n%s" % (self.host.returncode, "\n".join(self.errors))) elif self.logFile.status != log_file.DONE or self.logFile.errors: - raise Warning("Simulation succeded with errors:\nLog status:%s\nErrors:\n%s" % (self.logFile.status, "\n".join(self.logFile.errors))) + raise Warning("Simulation succeded with errors:\nLog status:%s\nErrors:\n%s" % + (self.logFile.status, "\n".join(self.logFile.errors))) else: self.status = FINISH - def finish_simulation(self): if self.status == ABORTED: return @@ -264,18 +268,24 @@ class Simulation(object): if os.path.isabs(dst): dst = os.path.relpath(os.path.abspath(dst), self.exepath) else: - dst = os.path.relpath (dst) + dst = os.path.relpath(dst) dst = fmt_path(dst) - assert not os.path.relpath(os.path.join(self.exepath, dst), self.modelpath).startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst + assert not os.path.relpath(os.path.join(self.exepath, dst), self.modelpath).startswith( + ".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst return dst - turb_files = [f for f in self.htcFile.turbulence_files() if self.copy_turbulence and not os.path.isfile(os.path.join(self.exepath, f))] + turb_files = [f for f in self.htcFile.turbulence_files() + if self.copy_turbulence and not os.path.isfile(os.path.join(self.exepath, f))] if self.ios: - output_patterns = ["../output/*", "../output/"] + turb_files + [os.path.join(self.exepath, self.stdout_filename)] + output_patterns = ["../output/*", "../output/"] + turb_files + \ + [os.path.join(self.exepath, self.stdout_filename)] else: - output_patterns = self.htcFile.output_files() + turb_files + [os.path.join(self.exepath, self.stdout_filename)] - output_files = set(self.host.glob([fmt_path(os.path.join(self.tmp_exepath,fmt(p))) for p in output_patterns], recursive=self.ios)) - - + output_patterns = self.htcFile.output_files() + turb_files + \ + [os.path.join(self.exepath, self.stdout_filename)] + output_files = set(self.host.glob([fmt_path(os.path.join(self.tmp_exepath, fmt(p))) + for p in output_patterns], recursive=self.ios)) + if not os.path.isdir(os.path.dirname(self.exepath + self.stdout_filename)): + os.makedirs(os.path.dirname(self.exepath + self.stdout_filename)) + try: self.host._finish_simulation(output_files) if self.status != ERROR: @@ -287,15 +297,12 @@ class Simulation(object): except Exception as e: self.errors.append(str(e)) raise - + finally: self.set_id(self.filename) self.logFile.reset() self.htcFile.reset() - - - def update_status(self, *args, **kwargs): self.host.update_logFile_status() if self.status in [INITIALIZING, SIMULATING]: @@ -304,14 +311,9 @@ class Simulation(object): if self.logFile.status == log_file.DONE and self.is_simulating is False: self.status = FINISH - - def __str__(self): return "Simulation(%s)" % self.filename - - - def additional_files(self): additional_files_file = os.path.join(self.modelpath, 'additional_files.txt') additional_files = {} @@ -325,42 +327,43 @@ class Simulation(object): additional_files['input'] = list(set(additional_files.get('input', []) + [file])) additional_files_file = os.path.join(self.modelpath, 'additional_files.txt') with open(additional_files_file, 'w', encoding='utf-8') as fid: - json.dump(additional_files, fid) + json.dump(additional_files, fid) - - def simulate_distributed(self): + def simulate_distributed(self, raise_simulation_errors=True): try: - self.prepare_simulation() - try: - self.simulate() - except Warning as e: - print ("simulation failed", str(self)) - print ("Trying to finish") - raise - finally: + with tempfile.TemporaryDirectory(prefix="h2launcher_") as tmpdirname: + self.tmp_modelpath = tmpdirname + "/" + self.tmp_exepath = os.path.join(self.tmp_modelpath, os.path.relpath(self.exepath, self.modelpath)) + "/" + self.prepare_simulation() try: - if self.status!=ABORTED: - self.finish_simulation() - except: - print ("finish_simulation failed", str(self)) + self.simulate() + except Warning as e: + print("simulation failed", str(self)) + print("Trying to finish") raise + finally: + try: + if self.status != ABORTED: + self.finish_simulation() + except Exception: + print("finish_simulation failed", str(self)) + raise except Exception as e: self.status = ERROR self.errors.append(str(e)) - raise e + if raise_simulation_errors: + raise e finally: self.is_done = True - - - def fix_errors(self): def confirm_add_additional_file(folder, file): if os.path.isfile(os.path.join(self.modelpath, folder, file)): filename = fmt_path(os.path.join(folder, file)) if self.get_confirmation("File missing", "'%s' seems to be missing in the temporary working directory. \n\nDo you want to add it to additional_files.txt" % filename): self.add_additional_input_file(filename) - self.show_message("'%s' is now added to additional_files.txt.\n\nPlease restart the simulation" % filename) + self.show_message( + "'%s' is now added to additional_files.txt.\n\nPlease restart the simulation" % filename) for error in self.errors: for regex in [r".*\*\*\* ERROR \*\*\* File '(.*)' does not exist in the (.*) folder", r".*\*\*\* ERROR \*\*\* DLL (.*)()"]: @@ -383,14 +386,15 @@ class Simulation(object): return True def show_message(self, msg, title="Information"): - print (msg) + print(msg) def set_id(self, *args, **kwargs): pass - def progress_callback(self,*args, **kwargs): + def progress_callback(self, *args, **kwargs): pass + class UpdateSimStatusThread(Thread): def __init__(self, simulation, interval=1): Thread.__init__(self) diff --git a/wetb/hawc2/simulation_resources.py b/wetb/hawc2/simulation_resources.py index f89a20c..e373a7d 100644 --- a/wetb/hawc2/simulation_resources.py +++ b/wetb/hawc2/simulation_resources.py @@ -24,29 +24,31 @@ from wetb.utils.cluster_tools.pbsjob import SSHPBSJob, NOT_SUBMITTED, DONE from wetb.hawc2.htc_file import fmt_path import numpy as np + class SimulationHost(object): def __init__(self, simulation): self.sim = simulation - logFile = property(lambda self : self.sim.logFile, lambda self, v: setattr(self.sim, "logFile", v)) - errors = property(lambda self : self.sim.errors) - modelpath = property(lambda self : self.sim.modelpath) - exepath = property(lambda self : self.sim.exepath) - tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v)) - tmp_exepath = property(lambda self : self.sim.tmp_exepath, lambda self, v: setattr(self.sim, "tmp_exepath", v)) - simulation_id = property(lambda self : self.sim.simulation_id) - stdout_filename = property(lambda self : self.sim.stdout_filename) - htcFile = property(lambda self : self.sim.htcFile) - additional_files = property(lambda self : self.sim.additional_files) - _input_sources = property(lambda self : self.sim._input_sources) - _output_sources = property(lambda self : self.sim._output_sources) - log_filename = property(lambda self : self.sim.log_filename) - - status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) - is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v)) + logFile = property(lambda self: self.sim.logFile, lambda self, v: setattr(self.sim, "logFile", v)) + errors = property(lambda self: self.sim.errors) + modelpath = property(lambda self: self.sim.modelpath) + exepath = property(lambda self: self.sim.exepath) + tmp_modelpath = property(lambda self: self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v)) + tmp_exepath = property(lambda self: self.sim.tmp_exepath, lambda self, v: setattr(self.sim, "tmp_exepath", v)) + simulation_id = property(lambda self: self.sim.simulation_id) + stdout_filename = property(lambda self: self.sim.stdout_filename) + htcFile = property(lambda self: self.sim.htcFile) + additional_files = property(lambda self: self.sim.additional_files) + _input_sources = property(lambda self: self.sim._input_sources) + _output_sources = property(lambda self: self.sim._output_sources) + log_filename = property(lambda self: self.sim.log_filename) + + status = property(lambda self: self.sim.status, lambda self, v: setattr(self.sim, "status", v)) + is_simulating = property(lambda self: self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v)) def __str__(self): return self.resource.host - + + class LocalSimulationHost(SimulationHost): def __init__(self, simulation, resource=None): SimulationHost.__init__(self, simulation) @@ -68,37 +70,36 @@ class LocalSimulationHost(SimulationHost): def _prepare_simulation(self, input_files): # must be called through simulation object - self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) - self.tmp_exepath = os.path.join(self.tmp_modelpath, os.path.relpath(self.sim.exepath, self.sim.modelpath) ) + "/" + # self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) + # self.tmp_exepath = os.path.join(self.tmp_modelpath, os.path.relpath(self.sim.exepath, self.sim.modelpath) ) + "/" self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath) for src_file in input_files: dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) # exist_ok does not exist in Python27 if not os.path.exists(os.path.dirname(dst)): - os.makedirs(os.path.dirname(dst)) #, exist_ok=True) + os.makedirs(os.path.dirname(dst)) # , exist_ok=True) shutil.copy(src_file, dst) if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: - print ("error copy ", dst) + print("error copy ", dst) stdout_folder = os.path.join(self.tmp_exepath, os.path.dirname(self.sim.stdout_filename)) if not os.path.exists(stdout_folder): - os.makedirs(stdout_folder) #, exist_ok=True) + os.makedirs(stdout_folder) # , exist_ok=True) self.logFile.filename = os.path.join(self.tmp_exepath, self.log_filename) self.simulationThread.modelpath = self.tmp_modelpath self.simulationThread.exepath = self.tmp_exepath - def _simulate(self): - #must be called through simulation object + # must be called through simulation object self.returncode, self.stdout = 1, "Simulation failed" self.simulationThread.start() - self.sim.set_id(self.sim.simulation_id, "Localhost(pid:%d)" % self.simulationThread.process.pid, self.tmp_modelpath) + self.sim.set_id(self.sim.simulation_id, "Localhost(pid:%d)" % + self.simulationThread.process.pid, self.tmp_modelpath) self.simulationThread.join() self.returncode, self.stdout = self.simulationThread.res self.logFile.update_status() self.errors.extend(list(set(self.logFile.errors))) - def _finish_simulation(self, output_files): missing_result_files = [] for src_file in output_files: @@ -106,39 +107,37 @@ class LocalSimulationHost(SimulationHost): # exist_ok does not exist in Python27 try: if not os.path.isdir(os.path.dirname(dst_file)): - os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) + os.makedirs(os.path.dirname(dst_file)) # , exist_ok=True) if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): shutil.copy(src_file, dst_file) - except: + except Exception: missing_result_files.append(dst_file) self.logFile.filename = os.path.join(self.sim.exepath, self.log_filename) if missing_result_files: - raise Warning("Failed to copy %s from %s"%(",".join(missing_result_files), self.resource.host)) - try: - shutil.rmtree(self.tmp_modelpath, ignore_errors=False) - except (PermissionError, OSError) as e: - try: - #change permissions and delete - for root, folders, files in os.walk(self.tmp_modelpath): - for folder in folders: - os.chmod(os.path.join(root, folder), 0o666) - for file in files: - os.chmod(os.path.join(root, file), 0o666) - shutil.rmtree(self.tmp_modelpath) - except (PermissionError, OSError) as e: - raise Warning("Fail to remove temporary files and folders on %s\n%s"%(self.resource.host, str(e))) - + raise Warning("Failed to copy %s from %s" % (",".join(missing_result_files), self.resource.host)) +# try: +# shutil.rmtree(self.tmp_modelpath, ignore_errors=False) +# except (PermissionError, OSError) as e: +# try: +# # change permissions and delete +# for root, folders, files in os.walk(self.tmp_modelpath): +# for folder in folders: +# os.chmod(os.path.join(root, folder), 0o666) +# for file in files: +# os.chmod(os.path.join(root, file), 0o666) +# shutil.rmtree(self.tmp_modelpath) +# except (PermissionError, OSError) as e: +# raise Warning("Fail to remove temporary files and folders on %s\n%s" % (self.resource.host, str(e))) def update_logFile_status(self): self.logFile.update_status() def stop(self): - self.simulationThread.stop() - if self.simulationThread.is_alive(): - self.simulationThread.join() - print ("simulatino_resources.stop joined") - + self.simulationThread.stop() + if self.simulationThread.is_alive(): + self.simulationThread.join() + print("simulatino_resources.stop joined") class SimulationThread(Thread): @@ -151,41 +150,42 @@ class SimulationThread(Thread): self.res = [0, "", ""] self.low_priority = low_priority - def start(self): CREATE_NO_WINDOW = 0x08000000 - exepath = self.exepath #overwritten in _prepare_simulation - modelpath = self.modelpath #overwritten in _prepare_simulation + exepath = self.exepath # overwritten in _prepare_simulation + modelpath = self.modelpath # overwritten in _prepare_simulation htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.exepath) - + hawc2exe = self.sim.hawc2exe stdout = self.sim.stdout_filename if not os.path.isdir(os.path.dirname(exepath + self.sim.stdout_filename)): os.makedirs(os.path.dirname(exepath + self.sim.stdout_filename)) - - with open (os.path.join(exepath, stdout), 'wb') as stdout: + + with open(os.path.join(exepath, stdout), 'wb') as stdout: if isinstance(hawc2exe, tuple): wine, hawc2exe = hawc2exe - self.process = subprocess.Popen(" ".join([wine, hawc2exe, htcfile]), stdout=stdout, stderr=STDOUT, shell=True, cwd=exepath) #shell must be True to inwoke wine + # shell must be True to inwoke wine + self.process = subprocess.Popen(" ".join([wine, hawc2exe, htcfile]), + stdout=stdout, stderr=STDOUT, shell=True, cwd=exepath) else: - self.process = subprocess.Popen([hawc2exe, htcfile], stdout=stdout, stderr=STDOUT, shell=False, cwd=exepath, creationflags=CREATE_NO_WINDOW) - #self.process.communicate() + self.process = subprocess.Popen([hawc2exe, htcfile], stdout=stdout, + stderr=STDOUT, shell=False, cwd=exepath, creationflags=CREATE_NO_WINDOW) + # self.process.communicate() import psutil try: self.sim.host.resource.process_name = psutil.Process(self.process.pid).name() - except: + except Exception: pass Thread.start(self) - def run(self): import psutil p = psutil.Process(os.getpid()) try: if self.low_priority: p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) - except: + except Exception: pass self.process.communicate() errorcode = self.process.returncode @@ -198,9 +198,9 @@ class SimulationThread(Thread): if hasattr(self, 'process'): try: subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) - except: + except Exception: pass - + class PBSClusterSimulationResource(SSHPBSClusterResource): def __init__(self, sshclient, min_cpu, min_free, init_cmd, wine_cmd, python_cmd, queue="workq"): @@ -209,19 +209,18 @@ class PBSClusterSimulationResource(SSHPBSClusterResource): self.wine_cmd = wine_cmd self.python_cmd = python_cmd self.queue = queue - - + def is_clean(self): return self.execute("find .hawc2launcher/ -type f | wc -l")[1] > 0 def clean(self): try: self.execute('rm .hawc2launcher -r -f') - except: + except Exception: pass try: self.shared_ssh.close() - except: + except Exception: pass def update_resource_status(self): @@ -229,31 +228,35 @@ class PBSClusterSimulationResource(SSHPBSClusterResource): _, out, _ = self.ssh.execute("find .hawc2launcher/ -name '*.out'") self.finished = set([f.split("/")[1] for f in out.split("\n") if "/" in f]) except Exception as e: - print ("resource_manager.update_status, out", str(e)) + print("resource_manager.update_status, out", str(e)) pass - + try: _, out, _ = self.ssh.execute("find .hawc2launcher -name 'status*' -exec cat {} \;") - self.loglines = {l.split(";")[0] : l.split(";")[1:] for l in out.split("\n") if ";" in l} + self.loglines = {l.split(";")[0]: l.split(";")[1:] for l in out.split("\n") if ";" in l} except Exception as e: - print ("resource_manager.update_status, status file", str(e)) + print("resource_manager.update_status, status file", str(e)) pass try: _, out, _ = self.ssh.execute("qstat -u %s" % self.username) self.is_executing = set([j.split(".")[0] for j in out.split("\n")[5:] if "." in j]) except Exception as e: - print ("resource_manager.update_status, qstat", str(e)) + print("resource_manager.update_status, qstat", str(e)) pass + class GormSimulationResource(PBSClusterSimulationResource): init_cmd = """export PATH=/home/python/miniconda3/bin:$PATH source activate wetb_py3""" queue = "workq" host = "gorm.risoe.dk" + def __init__(self, username, password, wine_cmd="WINEARCH=win32 WINEPREFIX=~/.wine32 wine"): from wetb.utils.cluster_tools.ssh_client import SSHClient - PBSClusterSimulationResource.__init__(self, SSHClient(self.host, username, password, 22), 25, 100, self.init_cmd, wine_cmd, "python", self.queue) + PBSClusterSimulationResource.__init__(self, SSHClient( + self.host, username, password, 22), 25, 100, self.init_cmd, wine_cmd, "python", self.queue) + class JessSimulationResource(PBSClusterSimulationResource): host = 'jess.dtu.dk' @@ -261,12 +264,12 @@ class JessSimulationResource(PBSClusterSimulationResource): source activate wetb_py3 WINEARCH=win32 WINEPREFIX=~/.wine32 winefix""" queue = "windq" - def __init__(self, username, password, wine_cmd="WINEARCH=win32 WINEPREFIX=~/.wine32 wine"): - - from wetb.utils.cluster_tools.ssh_client import SSHClient - PBSClusterSimulationResource.__init__(self, SSHClient(self.host, username, password, 22), 25, 600, self.init_cmd, wine_cmd, "python", self.queue) + def __init__(self, username, password, wine_cmd="WINEARCH=win32 WINEPREFIX=~/.wine32 wine"): + from wetb.utils.cluster_tools.ssh_client import SSHClient + PBSClusterSimulationResource.__init__(self, SSHClient( + self.host, username, password, 22), 25, 600, self.init_cmd, wine_cmd, "python", self.queue) class PBSClusterSimulationHost(SimulationHost): @@ -276,22 +279,22 @@ class PBSClusterSimulationHost(SimulationHost): self.pbsjob = SSHPBSJob(resource.new_ssh_connection()) self.resource = resource - hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) + hawc2exe = property(lambda self: os.path.basename(self.sim.hawc2exe)) def glob(self, filepattern, cwd="", recursive=False): return self.ssh.glob(filepattern, cwd, recursive) - + def get_datetime(self): v, out, err = self.ssh.execute('date "+%Y,%m,%d,%H,%M,%S"') if v == 0: return datetime.strptime(out.strip(), "%Y,%m,%d,%H,%M,%S") - #@print_time def _prepare_simulation(self, input_files): with self.ssh: self.ssh.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) self.ssh.execute("mkdir -p %s%s" % (self.tmp_exepath, os.path.dirname(self.log_filename))) - self.ssh.upload_files(self.modelpath, self.tmp_modelpath, file_lst = [os.path.relpath(f, self.modelpath) for f in input_files], callback=self.sim.progress_callback("Copy to host")) + self.ssh.upload_files(self.modelpath, self.tmp_modelpath, file_lst=[os.path.relpath( + f, self.modelpath) for f in input_files], callback=self.sim.progress_callback("Copy to host")) # for src_file in input_files: # dst = unix_path(self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)) # self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) @@ -303,39 +306,38 @@ class PBSClusterSimulationHost(SimulationHost): self.ssh.execute("mkdir -p %s%s" % (self.tmp_exepath, os.path.dirname(self.stdout_filename))) remote_log_filename = "%s%s" % (self.tmp_exepath, self.log_filename) self.ssh.execute("rm -f %s" % remote_log_filename) - - #@print_time + def _finish_simulation(self, output_files): with self.ssh: download_failed = [] try: - self.ssh.download_files(self.tmp_modelpath, self.modelpath, file_lst = [os.path.relpath(f, self.tmp_modelpath) for f in output_files], callback=self.sim.progress_callback("Copy from host") ) - except: -# -# for src_file in output_files: -# try: -# dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) -# os.makedirs(os.path.dirname(dst_file), exist_ok=True) -# self.download(src_file, dst_file, retry=10) -# except Exception as e: -# download_failed.append(dst_file) -# if download_failed: - raise Warning("Failed to download %s from %s"%(",".join(download_failed), self.ssh.host)) + self.ssh.download_files(self.tmp_modelpath, self.modelpath, file_lst=[os.path.relpath( + f, self.tmp_modelpath) for f in output_files], callback=self.sim.progress_callback("Copy from host")) + except Exception: + # + # for src_file in output_files: + # try: + # dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) + # os.makedirs(os.path.dirname(dst_file), exist_ok=True) + # self.download(src_file, dst_file, retry=10) + # except Exception as e: + # download_failed.append(dst_file) + # if download_failed: + raise Warning("Failed to download %s from %s" % (",".join(download_failed), self.ssh.host)) else: try: self.ssh.execute('rm -r .hawc2launcher/%s' % self.simulation_id) finally: try: self.ssh.execute('rm .hawc2launcher/status_%s' % self.simulation_id) - except: - raise Warning("Fail to remove temporary files and folders on %s"%self.ssh.host) - + except Exception: + raise Warning("Fail to remove temporary files and folders on %s" % self.ssh.host) def _simulate(self): """starts blocking simulation""" self.sim.logFile = LogInfo(log_file.MISSING, 0, "None", "") - self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_exepath , self.sim.stdout_filename) + self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_exepath, self.sim.stdout_filename) sleeptime = 1 while self.is_simulating: time.sleep(sleeptime) @@ -353,11 +355,9 @@ class PBSClusterSimulationHost(SimulationHost): try: self.ssh.download(self.tmp_exepath + self.log_filename, self.exepath + self.log_filename) except Exception: - raise Warning ("Logfile not found", self.tmp_modelpath + self.log_filename) + raise Warning("Logfile not found", self.tmp_modelpath + self.log_filename) self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.exepath) - - def update_logFile_status(self): def pbsjob_status(): if self.pbsjob._status in [NOT_SUBMITTED, DONE]: @@ -374,7 +374,7 @@ class PBSClusterSimulationHost(SimulationHost): self.logline = self.resource.loglines[self.simulation_id] self.status = self.logline[0] self.logFile = LogInfo(*self.logline[1:]) - + status = pbsjob_status() if status == pbsjob.NOT_SUBMITTED: pass @@ -385,19 +385,16 @@ class PBSClusterSimulationHost(SimulationHost): else: set_status() - def start(self): """Start non blocking distributed simulation""" self.non_blocking_simulation_thread.start() - - def abort(self): self.pbsjob.stop() self.stop() try: self.finish_simulation() - except: + except Exception: pass self.is_simulating = False self.is_done = True @@ -407,9 +404,6 @@ class PBSClusterSimulationHost(SimulationHost): def stop(self): self.is_simulating = False self.pbsjob.stop() - - - def pbsjobfile(self, ios=False): cp_back = "" @@ -419,10 +413,10 @@ class PBSClusterSimulationHost(SimulationHost): rel_htcfilename = fmt_path(os.path.relpath(self.htcFile.filename, self.exepath)) try: steps = self.htcFile.simulation.time_stop[0] / self.htcFile.simulation.newmark.deltat[0] - walltime = "%02d:00:00"%np.ceil(steps/500/60) - except: + walltime = "%02d:00:00" % np.ceil(steps / 500 / 60) + except Exception: walltime = "04:00:00" - init=""" + init = """ ### Standard Output #PBS -N h2l_%s ### merge stderr into stdout @@ -436,21 +430,21 @@ class PBSClusterSimulationHost(SimulationHost): #PBS -q %s ### Create scratch directory and copy data to it cd $PBS_O_WORKDIR -pwd"""% (self.simulation_id, self.stdout_filename, walltime, self.resource.queue) - copy_to=""" +pwd""" % (self.simulation_id, self.stdout_filename, walltime, self.resource.queue) + copy_to = """ cp -R %s /scratch/$USER/$PBS_JOBID ### Execute commands on scratch nodes cd /scratch/$USER/$PBS_JOBID%s -pwd"""%((".","../")[ios], ("", "/input")[ios]) - run=''' +pwd""" % ((".", "../")[ios], ("", "/input")[ios]) + run = ''' %s ### modelpath: %s -### htc: %s +### htc: %s echo "---------------------" %s -c "from wetb.hawc2.cluster_simulation import ClusterSimulation;ClusterSimulation('.','%s', ('%s','%s'))" echo "---------------------" echo $? -echo "---------------------"'''% (self.resource.init_cmd, self.modelpath, self.htcFile.filename, self.resource.python_cmd, rel_htcfilename, self.resource.wine_cmd, self.hawc2exe) +echo "---------------------"''' % (self.resource.init_cmd, self.modelpath, self.htcFile.filename, self.resource.python_cmd, rel_htcfilename, self.resource.wine_cmd, self.hawc2exe) copy_back = """ ### Copy back from scratch directory cd /scratch/$USER/$PBS_JOBID%s @@ -459,7 +453,4 @@ echo $PBS_JOBID cd /scratch/ ### rm -r $PBS_JOBID exit""" % (("", "/input")[ios], cp_back) - return init+copy_to+run+copy_back - - - + return init + copy_to + run + copy_back -- GitLab