From e7e88be2b732ca8ca297e242d4c9e81d2a76f1dd Mon Sep 17 00:00:00 2001 From: madsmpedersen <m@madsp.dk> Date: Wed, 11 May 2016 12:40:26 +0200 Subject: [PATCH] different things --- wetb/hawc2/simulation.py | 59 +++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index aae4a609..9874f3e8 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -23,8 +23,9 @@ from future import standard_library from wetb.utils.cluster_tools import pbsjob from wetb.utils.cluster_tools.cluster_resource import LocalResource -from wetb.utils.cluster_tools.pbsjob import SSHPBSJob +from wetb.utils.cluster_tools.pbsjob import SSHPBSJob, DONE, NOT_SUBMITTED from wetb.utils.cluster_tools.ssh_client import SSHClient +from wetb.utils.timing import print_time standard_library.install_aliases() from threading import Thread @@ -98,7 +99,6 @@ class Simulation(object): else: self.log_filename = os.path.relpath(self.log_filename) self.log_filename = unix_path(self.log_filename) - self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) self.logFile.clear() self.last_status = self.status @@ -129,6 +129,8 @@ class Simulation(object): time.sleep(0.1) if self.logFile.status not in [log_file.DONE]: self.status = ABORTED + self.is_simulating = False + self.is_done = True if update_status: self.update_status() @@ -191,6 +193,7 @@ class Simulation(object): self.status = INITIALIZING self.logFile.clear() self.host._simulate() + self.returncode, self.stdout = self.host.returncode, self.host.stdout if self.host.returncode or 'error' in self.host.stdout.lower(): if "error" in self.host.stdout.lower(): self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()]))) @@ -244,6 +247,7 @@ class Simulation(object): self.status = FINISH + def __str__(self): return "Simulation(%s)" % self.filename @@ -335,10 +339,12 @@ class UpdateStatusThread(Thread): Thread.start(self) def run(self): - print ("Wrong updatestatus") while self.simulation.is_done is False: self.simulation.update_status() - time.sleep(self.interval) + time.sleep(0.5) + t = time.time() + while self.simulation.is_simulating and time.time() < t + self.interval: + time.sleep(1) class SimulationResource(object): @@ -455,8 +461,11 @@ class SimulationThread(Thread): def run(self): import psutil p = psutil.Process(os.getpid()) - if self.low_priority: - p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + try: + if self.low_priority: + p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + except: + pass self.process.communicate() errorcode = self.process.returncode @@ -473,7 +482,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def __init__(self, simulation, resource, host, username, password, port=22): SimulationResource.__init__(self, simulation) SSHClient.__init__(self, host, username, password, port=port) - self.pbsjob = SSHPBSJob(host, username, password, port) + self.pbsjob = SSHPBSJob(resource.shared_ssh) self.resource = resource hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) @@ -505,8 +514,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): try: dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) os.makedirs(os.path.dirname(dst_file), exist_ok=True) - self.download(src_file, dst_file, verbose=False) - except ValueError as e: + self.download(src_file, dst_file, retry=3) + except Exception as e: print (self.modelpath, src_file, self.tmp_modelpath) raise e try: @@ -545,25 +554,29 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def update_logFile_status(self): - status = self.pbsjob.status + def pbsjob_status(): + if self.pbsjob._status in [NOT_SUBMITTED, DONE]: + return self.pbsjob._status + if self.pbsjob.jobid in self.resource.is_executing: + self.pbsjob._status = pbsjob.RUNNING + elif self.simulation_id in self.resource.finished: + self.pbsjob._status = DONE + self.pbsjob.jobid = None + return self.pbsjob._status + + + status = pbsjob_status() if status == pbsjob.NOT_SUBMITTED: pass elif status == pbsjob.DONE: self.is_simulating = False pass else: - try: - _, out, _ = self.execute("cat .hawc2launcher/status_%s" % self.simulation_id) - out = out.split(";") - if len(out) == 5: - self.status = out[0] - self.logFile = LogInfo(*out[1:]) - - except Exception as e: - if "No such file or directory" in str(e): - pass - else: - raise + if self.simulation_id in self.resource.loglines: + self.logline = self.resource.loglines[self.simulation_id] + self.status = self.logline[0] + self.logFile = LogInfo(*self.logline[1:]) + def start(self): """Start non blocking distributed simulation""" @@ -578,6 +591,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): self.finish_simulation() except: pass + self.is_simulating = False + self.is_done = True if self.status != ERROR and self.logFile.status not in [log_file.DONE]: self.status = ABORTED -- GitLab