From 8b72969dd0669fe2739539a059179871c912c953 Mon Sep 17 00:00:00 2001 From: madsmpedersen <m@madsp.dk> Date: Fri, 22 Jan 2016 10:53:16 +0100 Subject: [PATCH] works without logfile specification + more --- wetb/hawc2/simulation.py | 136 ++++++++++++++++++++++++++------------- 1 file changed, 90 insertions(+), 46 deletions(-) diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index ab067706..eb2ad0b3 100644 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -12,21 +12,24 @@ import json import glob from wetb.hawc2 import log_file import re +import threading QUEUED = "queued" #until start -INITIALIZING = "initializing" #when starting -SIMULATING = "SIMULATING" # when logfile.status=simulating -FINISH = "finish" # when finish -ERROR = "error" # when hawc2 returns error -ABORTED = "aborted" # when stopped and logfile.status != Done -CLEANED = "cleaned" # after copy back +PREPARING = "Copy to host" # during prepare simulation +INITIALIZING = "Initializing" #when starting +SIMULATING = "Simulating" # when logfile.status=simulating +FINISH = "Finish" # when finish +ERROR = "Error" # when hawc2 returns error +ABORTED = "Aborted" # when stopped and logfile.status != Done +CLEANED = "Cleaned" # after copy back class Simulation(object): is_simulating = False + _status = QUEUED def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"): - self.modelpath = modelpath + self.modelpath = os.path.abspath(modelpath) + "/" self.folder = os.path.dirname(htcfilename) if not os.path.isabs(htcfilename): htcfilename = os.path.join(modelpath, htcfilename) @@ -35,20 +38,41 @@ class Simulation(object): self.htcFile = HTCFile(htcfilename) self.time_stop = self.htcFile.simulation.time_stop[0] self.copy_turbulence = True - self.log_filename = self.htcFile.simulation.logfile[0] + self.simulation_id = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self) + self.stdout_filename = "%s.out" % self.simulation_id + if 'logfile' in self.htcFile.simulation: + self.log_filename = self.htcFile.simulation.logfile[0] + else: + self.log_filename = self.stdout_filename if os.path.isabs(self.log_filename): self.log_filename = os.path.relpath(self.log_filename, self.modelpath) + else: + self.log_filename = os.path.relpath(self.log_filename) + self.log_filename = self.log_filename.replace("\\", "/") self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) - self._status = QUEUED + self.logFile.clear() + self.last_status = self._status self.errors = [] - self.thread = Thread(target=self.simulate) - self.simulationThread = SimulationThread(self.modelpath, self.htcFile.filename, hawc2exe) - self.timer = RepeatedTimer(1, self.update_status) + self.thread = Thread(target=self.simulate_distributed) + self.dist_thread = Thread() + self.hawc2exe = hawc2exe + self.simulationThread = SimulationThread(self) + self.timer = RepeatedTimer(self.update_status) + def __str__(self): return "Simulation(%s)" % self.filename + @property + def status(self): + return self._status + + @status.setter + def status(self, status): + self._status = status + self.show_status() + def update_status(self, *args, **kwargs): if self.status in [INITIALIZING, SIMULATING]: self.logFile.update_status() @@ -59,25 +83,25 @@ class Simulation(object): self._status = FINISH def show_status(self): - status = self.logFile.status() - if status[1] == SIMULATING: - if self.last_status[1] != SIMULATING: + #print ("log status:", self.logFile.status) + if self.logFile.status == log_file.SIMULATING: + if self.last_status != log_file.SIMULATING: print ("|" + ("-"*50) + "|" + ("-"*49) + "|") sys.stdout.write("|") - #print (status) - sys.stdout.write("."*(status[0] - self.last_status[0])) + sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0))) sys.stdout.flush() - elif self.last_status[1] == SIMULATING and status[1] != SIMULATING: - sys.stdout.write("."*(status[0] - self.last_status[0]) + "|") + self.last_pct = self.logFile.pct + elif self.last_status == log_file.SIMULATING: + sys.stdout.write("."*(100 - self.last_pct) + "|") sys.stdout.flush() print ("\n") - if status[1] != SIMULATING: + else: + print (self.logFile.status) + if self.logFile.status != log_file.SIMULATING: + if self.logFile.errors: + print (self.logFile.errors) + self.last_status = self.logFile.status - if status[2]: - print (status[1:]) - else: - print (status[1]) - self.last_status = status def additional_files(self): @@ -95,8 +119,10 @@ class Simulation(object): with open(additional_files_file, 'w') as fid: json.dump(additional_files, fid) - def prepare_simulation(self, id): - self.tmp_modelpath = os.path.join(self.modelpath, id + "/") + def prepare_simulation(self): + self.status = PREPARING + + self.tmp_modelpath = os.path.join(self.modelpath, "tmp_%s/" % self.simulation_id) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', []): @@ -119,7 +145,12 @@ class Simulation(object): def finish_simulation(self): - if self.status == CLEANED: return + lock = threading.Lock() + with lock: + if self.status == CLEANED: return + if self.status != ERROR: + self.status = CLEANED + files = self.htcFile.output_files() if self.copy_turbulence: files.extend(self.htcFile.turbulence_files()) @@ -141,17 +172,17 @@ class Simulation(object): shutil.rmtree(self.tmp_modelpath) except (PermissionError, OSError) as e: raise Warning(str(e)) - if self.status != ERROR: - self.status = CLEANED def simulate(self): + #starts blocking simulation self.is_simulating = True self.errors = [] self.logFile.clear() self.status = INITIALIZING + self.returncode, self.stdout = 1, "Simulation failed" self.simulationThread.start() self.simulationThread.join() self.returncode, self.stdout = self.simulationThread.res @@ -160,10 +191,17 @@ class Simulation(object): self.status = ERROR self.is_simulating = False self.logFile.update_status() - if self.returncode or self.errors: + if self.returncode: raise Exception("Simulation error:\n" + "\n".join(self.errors)) - if self.logFile.status != log_file.DONE or self.logFile.errors: + elif self.logFile.status != log_file.DONE or self.errors or self.logFile.errors: raise Warning("Simulation succeded with errors:\nLog status:%s\n" % self.logFile.status + "\n".join(self.logFile.errors)) + else: + self.status = FINISH + + def simulate_distributed(self): + self.prepare_simulation() + self.simulate() + self.finish_simulation() @@ -195,7 +233,8 @@ class Simulation(object): print (msg) def start(self): - """Start non blocking simulation""" + """Start non blocking distributed simulation""" + self.timer.start(1000) self.thread.start() def stop(self): @@ -207,6 +246,7 @@ class Simulation(object): pass if self.logFile.status not in [log_file.DONE]: self.status = ABORTED + self.update_status() #class SimulationProcess(Process): # @@ -227,11 +267,9 @@ class Simulation(object): class SimulationThread(Thread): - def __init__(self, modelpath, htcfile, hawc2exe): + def __init__(self, simulation): Thread.__init__(self) - self.modelpath = modelpath - self.htcfile = os.path.relpath(htcfile, self.modelpath) - self.hawc2exe = hawc2exe + self.sim = simulation self.res = [0, "", ""] @@ -239,24 +277,29 @@ class SimulationThread(Thread): si = subprocess.STARTUPINFO() si.dwFlags |= subprocess.STARTF_USESHOWWINDOW CREATE_NO_WINDOW = 0x08000000 - self.process = subprocess.Popen([self.hawc2exe, self.htcfile], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, cwd=self.modelpath, creationflags=CREATE_NO_WINDOW) + modelpath = self.modelpath + htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath) + hawc2exe = self.sim.hawc2exe + stdout = self.sim.stdout_filename + self.process = subprocess.Popen("%s %s 1> %s 2>&1" % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath, creationflags=CREATE_NO_WINDOW) Thread.start(self) def run(self): p = psutil.Process(os.getpid()) - p.nice = psutil.BELOW_NORMAL_PRIORITY_CLASS - stdout, _ = self.process.communicate() + p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + self.process.communicate() errorcode = self.process.returncode - self.res = errorcode, stdout.decode() + with open(self.modelpath + self.sim.stdout_filename) as fid: + stdout = fid.read() + self.res = errorcode, stdout class RepeatedTimer(object): - def __init__(self, interval, function, *args, **kwargs): + def __init__(self, function, *args, **kwargs): self._timer = None - self.interval = interval self.function = function self.args = args self.kwargs = kwargs @@ -265,12 +308,13 @@ class RepeatedTimer(object): def _run(self): self.is_running = False - self.start() + self.start(self.interval) self.function(*self.args, **self.kwargs) - def start(self): + def start(self, interval_ms=None): + self.interval = interval_ms if not self.is_running: - self._timer = Timer(self.interval, self._run) + self._timer = Timer(interval_ms / 1000, self._run) self._timer.start() self.is_running = True -- GitLab