From f71df4ae2e6e88e0e8cf64f8d066903bbe900500 Mon Sep 17 00:00:00 2001 From: madsmpedersen <m@madsp.dk> Date: Tue, 5 Apr 2016 15:23:38 +0200 Subject: [PATCH] updates --- wetb/hawc2/simulation.py | 89 ++++++++++++------------------ wetb/utils/cluster_tools/pbsjob.py | 15 +++-- 2 files changed, 44 insertions(+), 60 deletions(-) diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index 39410ab1..8ab8af92 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -44,12 +44,11 @@ class Simulation(object): self.folder = os.path.dirname(htcfilename) if not os.path.isabs(htcfilename): htcfilename = os.path.join(modelpath, htcfilename) - self.htcfilename = htcfilename self.filename = os.path.basename(htcfilename) self.htcFile = HTCFile(htcfilename) self.time_stop = self.htcFile.simulation.time_stop[0] self.copy_turbulence = True - self.simulation_id = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self) + self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self) self.stdout_filename = "%s.out" % self.simulation_id if 'logfile' in self.htcFile.simulation: self.log_filename = self.htcFile.simulation.logfile[0] @@ -283,9 +282,7 @@ class SimulationThread(Thread): self.res = errorcode, stdout def stop(self): - print ("stop") subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) - print ("stop2") class UpdateStatusThread(Thread): def __init__(self, simulation, interval=1): @@ -305,7 +302,7 @@ class UpdateStatusThread(Thread): class SimulationResource(object): def __init__(self, simulation): self.sim = simulation - logFile = property(lambda self : self.sim.logFile) + logFile = property(lambda self : self.sim.logFile, lambda self, v: setattr(self.sim, "logFile", v)) errors = property(lambda self : self.sim.errors) modelpath = property(lambda self : self.sim.modelpath) tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v)) @@ -315,7 +312,9 @@ class SimulationResource(object): input_sources = property(lambda self : self.sim.input_sources) output_sources = property(lambda self : self.sim.output_sources) log_filename = property(lambda self : self.sim.log_filename) + status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) + is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v)) class LocalSimulationResource(SimulationResource, LocalResource): @@ -381,30 +380,32 @@ class LocalSimulationResource(SimulationResource, LocalResource): -class PBSClusterSimulationResouce(PBSClusterResource): - def __init__(self, simulation, htcfilename, host, username, password, port=22): +class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource): + def __init__(self, simulation, host, username, password, port=22): + SimulationResource.__init__(self, simulation) PBSClusterResource.__init__(self, host, username, password, port) - self.htc = htcfilename self.pbsjob = PBSJob(self) + hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) + def _prepare_simulation(self): - with self.ssh: - self.ssh.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) - self.ssh.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) + with self: + self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) + self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) for src in self.input_sources(): for src_file in glob.glob(os.path.join(self.modelpath, src)): dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/") - self.ssh.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) - self.ssh.upload(src_file, dst, verbose=False) + self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) + self.upload(src_file, dst, verbose=False) ##assert self.ssh.file_exists(dst) f = io.StringIO(self.pbsjobfile(self.simulation_id)) f.seek(0) - self.ssh.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id) + self.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id) remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename) - self.ssh.execute("rm -f %s" % remote_log_filename) + self.execute("rm -f %s" % remote_log_filename) @@ -432,7 +433,7 @@ class PBSClusterSimulationResouce(PBSClusterResource): sleeptime = 1 print ("simulate2", self.simulation_id) while self.is_simulating: - self.update_status() + #self.__update_logFile_status() time.sleep(sleeptime) print ("simulate3", self.simulation_id) @@ -453,36 +454,27 @@ class PBSClusterSimulationResouce(PBSClusterResource): self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) -# @property -# def status(self): -# return self._status -# -# @status.setter -# def status(self, status): -# self._status = status - - def update_status(self, *args, **kwargs): + def update_logFile_status(self): status = self.pbsjob.status if status == pbsjob.NOT_SUBMITTED: - return self.status + pass elif status == pbsjob.DONE: self.is_simulating = False - return self.status - try: - _, out, _ = self.ssh.execute("cat .hawc2launcher/status_%s" % self.simulation_id) - out = out.split(";") - if len(out) == 5: - self.status = out[0] - self.logFile = LogInfo(*out[1:]) - - except Exception as e: - if "No such file or directory" in str(e): - pass - else: - raise - - Simulation.update_status(self) + pass + else: + try: + _, out, _ = self.execute("cat .hawc2launcher/status_%s" % self.simulation_id) + out = out.split(";") + if len(out) == 5: + self.status = out[0] + self.logFile = LogInfo(*out[1:]) + + except Exception as e: + if "No such file or directory" in str(e): + pass + else: + raise def start(self): """Start non blocking distributed simulation""" @@ -509,10 +501,10 @@ class PBSClusterSimulationResouce(PBSClusterResource): for folder in set([os.path.relpath(os.path.dirname(f)) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]): cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder) - rel_htcfilename = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "/") + rel_htcfilename = os.path.relpath(self.htcFile.filename, self.modelpath).replace("\\", "/") return """ ### Standard Output -#PBS -N %s +#PBS -N h2l_%s ### merge stderr into stdout #PBS -j oe #PBS -o %s.out @@ -545,14 +537,3 @@ exit""" % (simulation_id, simulation_id, rel_htcfilename, self.hawc2exe, cp_back - -if __name__ == "__main__": - sim = Simulation('C:\mmpe\HAWC2\Hawc2_model/', 'htc/long1/long01.htc', hawc2exe="hawc2-123_beta.exe") - sim.start() - - for i in range(10): - sim.show_status() - time.sleep(0.1) - sim.abort() - sim.show_status() - sim.show_status() diff --git a/wetb/utils/cluster_tools/pbsjob.py b/wetb/utils/cluster_tools/pbsjob.py index 8679e199..f4e5cfdc 100644 --- a/wetb/utils/cluster_tools/pbsjob.py +++ b/wetb/utils/cluster_tools/pbsjob.py @@ -19,6 +19,7 @@ DONE = "Done" class PBSJob(object): _status = NOT_SUBMITTED nodeid = None + jobid = None def __init__(self, sshclient): self.ssh = sshclient @@ -49,6 +50,7 @@ class PBSJob(object): self._status = RUNNING elif self.ssh.file_exists(self.pbs_out_file): self._status = DONE + self.jobid = None return self._status def get_nodeid(self): @@ -61,12 +63,13 @@ class PBSJob(object): #raise e def stop(self): - try: - self.ssh.execute("qdel %s" % self.jobid) - except Warning as e: - if 'qdel: Unknown Job Id' in str(e): - return - raise e + if self.jobid: + try: + self.ssh.execute("qdel %s" % self.jobid) + except Warning as e: + if 'qdel: Unknown Job Id' in str(e): + return + raise e def is_executing(self): -- GitLab