From 22a5b66822f1c11b6bd5b8d5cd0a272700683851 Mon Sep 17 00:00:00 2001 From: madsmpedersen <m@madsp.dk> Date: Thu, 7 Apr 2016 11:03:15 +0200 Subject: [PATCH] doc + clean up --- wetb/hawc2/simulation.py | 260 ++++++++++++++++++++++----------------- 1 file changed, 147 insertions(+), 113 deletions(-) diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index 91ddabf0..3d3c114d 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -39,9 +39,41 @@ ABORTED = "Aborted" # when stopped and logfile.status != Done CLEANED = "Cleaned" # after copy back class Simulation(object): + """Class for doing hawc2 simulations + + + + + Examples + -------- + >>> sim = Simulation("<path>/MyModel","htc/MyHtc") + + Blocking inplace simulation + >>> sim.simulate() + + Non-blocking distributed simulation(everything copied to temporary folder)\n + Starts two threads: + - non_blocking_simulation_thread: + - prepare_simulation() # copy to temporary folder + - simulate() # simulate + - finish_simulation # copy results back again + - updateStatusThread: + - update status every second + >>> sim.start() + >>> while sim.status!=CLEANED: + >>> sim.show_status() + + + The default host is LocalSimulationHost. To simulate on pbs featured cluster + >>> sim.host = PBSClusterSimulationHost(sim, <hostname>, <username>, <password>): + >>> sim.start() + >>> while sim.status!=CLEANED: + >>> sim.show_status() + """ + is_simulating = False status = QUEUED - def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"): + def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True): self.modelpath = os.path.abspath(modelpath) + "/" self.folder = os.path.dirname(htcfilename) if not os.path.isabs(htcfilename): @@ -49,7 +81,8 @@ class Simulation(object): self.filename = os.path.basename(htcfilename) self.htcFile = HTCFile(htcfilename) self.time_stop = self.htcFile.simulation.time_stop[0] - self.copy_turbulence = True + self.hawc2exe = hawc2exe + self.copy_turbulence = copy_turbulence self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_").replace("/", "_") + "_%d" % id(self) self.stdout_filename = "stdout/%s.out" % self.simulation_id if 'logfile' in self.htcFile.simulation: @@ -66,31 +99,52 @@ class Simulation(object): self.logFile.clear() self.last_status = self.status self.errors = [] - self.thread = Thread(target=self.simulate_distributed) - self.hawc2exe = hawc2exe + self.non_blocking_simulation_thread = Thread(target=self.simulate_distributed) self.updateStatusThread = UpdateStatusThread(self) self.host = LocalSimulationHost(self) - def input_sources(self): - def fmt(src): - if os.path.isabs(src): - src = os.path.relpath(os.path.abspath(src), self.modelpath) - else: - src = os.path.relpath (src) - assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src - return src - return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])] - def output_sources(self): - def fmt(dst): - if os.path.isabs(dst): - dst = os.path.relpath(os.path.abspath(dst), self.modelpath) - else: - dst = os.path.relpath (dst) - dst = dst.replace("\\", "/") - assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst - return dst - return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]] + def start(self, update_interval=1): + """Start non blocking distributed simulation""" + self.is_simulating = True + self.updateStatusThread.start() + self.non_blocking_simulation_thread.start() + + def wait(self): + self.non_blocking_simulation_thread.join() + self.update_status() + + def abort(self): + self.host.stop() + for _ in range(100): + if self.is_simulating: + break + time.sleep(0.1) + if self.logFile.status not in [log_file.DONE]: + self.status = ABORTED + self.update_status() + + def show_status(self): + #print ("log status:", self.logFile.status) + if self.logFile.status == log_file.SIMULATING: + if self.last_status != log_file.SIMULATING: + print ("|" + ("-"*50) + "|" + ("-"*49) + "|") + sys.stdout.write("|") + sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0))) + sys.stdout.flush() + self.last_pct = self.logFile.pct + elif self.last_status == log_file.SIMULATING: + sys.stdout.write("."*(100 - self.last_pct) + "|") + sys.stdout.flush() + print ("\n") + elif self.logFile.status == log_file.UNKNOWN: + print (self.status) + else: + print (self.logFile.status) + if self.logFile.status != log_file.SIMULATING: + if self.logFile.errors: + print (self.logFile.errors) + self.last_status = self.logFile.status def prepare_simulation(self): @@ -146,31 +200,33 @@ class Simulation(object): if self.logFile.status == log_file.DONE and self.is_simulating is False: self.status = FINISH + def _input_sources(self): + def fmt(src): + if os.path.isabs(src): + src = os.path.relpath(os.path.abspath(src), self.modelpath) + else: + src = os.path.relpath (src) + assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src + return src + return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])] + + def _output_sources(self): + def fmt(dst): + if os.path.isabs(dst): + dst = os.path.relpath(os.path.abspath(dst), self.modelpath) + else: + dst = os.path.relpath (dst) + dst = dst.replace("\\", "/") + assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst + return dst + return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]] + + def __str__(self): return "Simulation(%s)" % self.filename - def show_status(self): - #print ("log status:", self.logFile.status) - if self.logFile.status == log_file.SIMULATING: - if self.last_status != log_file.SIMULATING: - print ("|" + ("-"*50) + "|" + ("-"*49) + "|") - sys.stdout.write("|") - sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0))) - sys.stdout.flush() - self.last_pct = self.logFile.pct - elif self.last_status == log_file.SIMULATING: - sys.stdout.write("."*(100 - self.last_pct) + "|") - sys.stdout.flush() - print ("\n") - elif self.logFile.status == log_file.UNKNOWN: - print (self.status) - else: - print (self.logFile.status) - if self.logFile.status != log_file.SIMULATING: - if self.logFile.errors: - print (self.logFile.errors) - self.last_status = self.logFile.status + def additional_files(self): @@ -220,74 +276,13 @@ class Simulation(object): continue def get_confirmation(self, title, msg): + """override in subclass""" return True def show_message(self, msg, title="Information"): print (msg) - def start(self, update_interval=1): - """Start non blocking distributed simulation""" - self.is_simulating = True - self.updateStatusThread.start() - self.thread.start() - - def wait(self): - self.thread.join() - self.update_status() - - def abort(self): - self.host.stop() - for _ in range(100): - if self.is_simulating: - break - time.sleep(0.1) -# try: -# self.finish_simulation() -# except Exception as e: -# print (str(e)) -# pass - if self.logFile.status not in [log_file.DONE]: - self.status = ABORTED - self.update_status() - -class SimulationThread(Thread): - - def __init__(self, simulation, low_priority=True): - Thread.__init__(self) - self.sim = simulation - self.modelpath = self.sim.modelpath - self.res = [0, "", ""] - self.low_priority = low_priority - - - def start(self): - CREATE_NO_WINDOW = 0x08000000 - modelpath = self.modelpath - htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath) - hawc2exe = self.sim.hawc2exe - stdout = self.sim.stdout_filename - if os.name == "nt": - self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW) - else: - self.process = subprocess.Popen('wine "%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) - Thread.start(self) - - - def run(self): - p = psutil.Process(os.getpid()) - if self.low_priority: - p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) - self.process.communicate() - errorcode = self.process.returncode - if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)): - os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename)) - with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid: - stdout = fid.read() - self.res = errorcode, stdout - - def stop(self): - subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) class UpdateStatusThread(Thread): def __init__(self, simulation, interval=1): @@ -315,8 +310,8 @@ class SimulationResource(object): stdout_filename = property(lambda self : self.sim.stdout_filename) htcFile = property(lambda self : self.sim.htcFile) additional_files = property(lambda self : self.sim.additional_files) - input_sources = property(lambda self : self.sim.input_sources) - output_sources = property(lambda self : self.sim.output_sources) + _input_sources = property(lambda self : self.sim._input_sources) + _output_sources = property(lambda self : self.sim._output_sources) log_filename = property(lambda self : self.sim.log_filename) status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) @@ -334,7 +329,7 @@ class LocalSimulationHost(SimulationResource): # must be called through simulation object self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath) - for src in self.input_sources(): + for src in self._input_sources(): for src_file in glob.glob(os.path.join(self.modelpath, src)): dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) # exist_ok does not exist in Python27 @@ -361,7 +356,7 @@ class LocalSimulationHost(SimulationResource): def _finish_simulation(self): - for dst in self.output_sources(): + for dst in self._output_sources(): src = os.path.join(self.tmp_modelpath, dst) for src_file in glob.glob(src): @@ -388,6 +383,45 @@ class LocalSimulationHost(SimulationResource): +class SimulationThread(Thread): + + def __init__(self, simulation, low_priority=True): + Thread.__init__(self) + self.sim = simulation + self.modelpath = self.sim.modelpath + self.res = [0, "", ""] + self.low_priority = low_priority + + + def start(self): + CREATE_NO_WINDOW = 0x08000000 + modelpath = self.modelpath + htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath) + hawc2exe = self.sim.hawc2exe + stdout = self.sim.stdout_filename + if os.name == "nt": + self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW) + else: + self.process = subprocess.Popen('wine "%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) + Thread.start(self) + + + def run(self): + p = psutil.Process(os.getpid()) + if self.low_priority: + p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + self.process.communicate() + errorcode = self.process.returncode + if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)): + os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename)) + with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid: + stdout = fid.read() + self.res = errorcode, stdout + + def stop(self): + subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) + + class PBSClusterSimulationHost(SimulationResource, SSHClient): def __init__(self, simulation, host, username, password, port=22): SimulationResource.__init__(self, simulation) @@ -402,7 +436,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) - for src in self.input_sources(): + for src in self._input_sources(): for src_file in glob.glob(os.path.join(self.modelpath, src)): dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/") self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) @@ -420,7 +454,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def _finish_simulation(self): with self: - for dst in self.output_sources(): + for dst in self._output_sources(): src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/") for src_file in self.glob(src): @@ -482,7 +516,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def start(self): """Start non blocking distributed simulation""" - self.thread.start() + self.non_blocking_simulation_thread.start() -- GitLab