From 66e7c1caf9c2e4802fe440345673911a647e23ac Mon Sep 17 00:00:00 2001 From: madsmpedersen <m@madsp.dk> Date: Mon, 4 Apr 2016 11:11:37 +0200 Subject: [PATCH] updating pbs job --- wetb/hawc2/simulation.py | 10 +-- wetb/utils/cluster_tools/pbsjob.py | 85 +++++++++++++------------- wetb/utils/cluster_tools/ssh_client.py | 16 ++++- 3 files changed, 63 insertions(+), 48 deletions(-) diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index d1d20e69..20d25e93 100644 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -28,7 +28,7 @@ QUEUED = "queued" #until start PREPARING = "Copy to host" # during prepare simulation INITIALIZING = "Initializing" #when starting SIMULATING = "Simulating" # when logfile.status=simulating -FINISH = "Finish" # when finish +FINISH = "Finish" # when HAWC2 finish ERROR = "Error" # when hawc2 returns error ABORTED = "Aborted" # when stopped and logfile.status != Done CLEANED = "Cleaned" # after copy back @@ -87,7 +87,7 @@ class Simulation(object): if self.logFile.status == log_file.SIMULATING: self._status = SIMULATING - if self.logFile.status == log_file.DONE: + if self.logFile.status == log_file.DONE and self.is_simulating is False: self._status = FINISH def show_status(self): @@ -282,11 +282,12 @@ class Simulation(object): class SimulationThread(Thread): - def __init__(self, simulation): + def __init__(self, simulation, low_priority=True): Thread.__init__(self) self.sim = simulation self.modelpath = self.sim.modelpath self.res = [0, "", ""] + self.low_priority = low_priority def start(self): @@ -304,7 +305,8 @@ class SimulationThread(Thread): def run(self): p = psutil.Process(os.getpid()) - p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + if self.low_priority: + p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) self.process.communicate() errorcode = self.process.returncode with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid: diff --git a/wetb/utils/cluster_tools/pbsjob.py b/wetb/utils/cluster_tools/pbsjob.py index ae18eaf8..8679e199 100644 --- a/wetb/utils/cluster_tools/pbsjob.py +++ b/wetb/utils/cluster_tools/pbsjob.py @@ -4,41 +4,35 @@ Created on 04/12/2015 @author: mmpe ''' -#import x + import time -from wetb.utils.cluster_tools.ssh_client import SSHClient import os import paramiko -import subprocess - NOT_SUBMITTED = "Job not submitted" PENDING = "Pending" RUNNING = "Running" DONE = "Done" + + class PBSJob(object): _status = NOT_SUBMITTED nodeid = None - def __init__(self, host, username, password): - self.client = SSHClient(host, username, password, port=22) - - - def execute(self, cmd, cwd="./"): - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd) - stdout, stderr = proc.communicate() - errorcode = proc.returncode - return errorcode, stdout.decode(), stderr.decode() + def __init__(self, sshclient): + self.ssh = sshclient def submit(self, job, cwd, pbs_out_file): self.cwd = cwd self.pbs_out_file = os.path.relpath(cwd + pbs_out_file).replace("\\", "/") self.nodeid = None - try: - os.remove (self.pbs_out_file) - except FileNotFoundError: - pass - _, out, _ = self.execute("qsub %s" % job, cwd) + #self.execute() + + cmds = ['rm -f %s' % self.pbs_out_file] + if cwd != "": + cmds.append("cd %s" % cwd) + cmds.append("qsub %s" % job) + _, out, _ = self.ssh.execute(";".join(cmds)) self.jobid = out.split(".")[0] self._status = PENDING @@ -47,43 +41,50 @@ class PBSJob(object): if self._status in [NOT_SUBMITTED, DONE]: return self._status - if self.nodeid is None: - self.nodeid = self.get_nodeid() - if self.nodeid is not None: - self._status = RUNNING - - if self.in_queue() and self.nodeid is None: - self._status = PENDING - elif os.path.isfile(self.pbs_out_file): +# if self.nodeid is None: +# self.nodeid = self.get_nodeid() +# if self.nodeid is not None: +# self._status = RUNNING + if self.is_executing(): + self._status = RUNNING + elif self.ssh.file_exists(self.pbs_out_file): self._status = DONE return self._status def get_nodeid(self): - errorcode, out, err = self.execute("qstat -f %s | grep exec_host" % self.jobid) - if errorcode == 0: - return out.strip().replace("exec_host = ", "").split(".")[0] - elif errorcode == 1 and out == "": - return None - elif errorcode == 153 and 'qstat: Unknown Job Id' in err: + try: + _, out, _ = self.ssh.execute("qstat -f %s | grep exec_host" % self.jobid) + return out.strip().replace("exec_host = ", "").split(".")[0] + except Warning as e: + if 'qstat: Unknown Job Id' in str(e): return None - else: - raise Exception(str(errorcode) + out + err) + #raise e def stop(self): try: - self.execute("qdel %s" % self.jobid) + self.ssh.execute("qdel %s" % self.jobid) except Warning as e: if 'qdel: Unknown Job Id' in str(e): return raise e - def in_queue(self): - errorcode, out, err = self.execute("qstat %s" % self.jobid) - if errorcode == 0: + def is_executing(self): + try: + self.ssh.execute("qstat %s" % self.jobid) return True - elif 'qstat: Unknown Job Id' in str(err): - return False - else: - raise Exception(str(errorcode) + out + err) + except Warning as e: + if 'qstat: Unknown Job Id' in str(e): + return False + raise e + + +if __name__ == "__main__": + x = None + username, password = "mmpe", x.password #q.get_login("mmpe") + pbsjob = PBSJob('gorm', username, password, 22) + #pbsjob.submit("pbsjob", ".hawc2launcher/__1__/", "pbs_out.txt") + pbsjob.nodeid = "g-080" + print (pbsjob.execute_on_node("tail -20 /scratch/mmpe/1996208.g-000.risoe.dk/logfiles/structure_wind.log\n")) + diff --git a/wetb/utils/cluster_tools/ssh_client.py b/wetb/utils/cluster_tools/ssh_client.py index 1d70e472..a3dac380 100644 --- a/wetb/utils/cluster_tools/ssh_client.py +++ b/wetb/utils/cluster_tools/ssh_client.py @@ -19,6 +19,8 @@ class SSHClient(object): self.password = password self.port = port self.key = key + self.disconnect = 0 + self.client = None if key is not None: self.key = paramiko.RSAKey.from_private_key(StringIO(key), password=passphrase) @@ -26,6 +28,12 @@ class SSHClient(object): return self.host, self.username, self.password, self.port def __enter__(self): + self.disconnect += 1 + if self.client is None: + self.connect() + + def connect(self): + print ("start connect") self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(self.host, self.port, username=self.username, password=self.password, pkey=self.key, timeout=self.TIMEOUT) @@ -33,10 +41,13 @@ class SSHClient(object): self.transport = paramiko.Transport((self.host, self.port)) self.transport.connect(username=self.username, password=self.password) self.sftp = paramiko.SFTPClient.from_transport(self.transport) + print ("End connect") return self def __exit__(self, *args): - self.close() + self.disconnect -= 1 + if self.disconnect == 0: + self.close() def download(self, remotefilepath, localfile, verbose=False): @@ -68,6 +79,7 @@ class SSHClient(object): self.client = None self.sftp.close() self.transport.close() + self.disconnect = False def file_exists(self, filename): _, out, _ = (self.execute('[ -f %s ] && echo "File exists" || echo "File does not exists"' % filename.replace("\\", "/"))) @@ -133,7 +145,7 @@ class SSHClient(object): if __name__ == "__main__": from mmpe.ui.qt_ui import QtInputUI q = QtInputUI(None) - import x + x = None username, password = "mmpe", x.password #q.get_login("mmpe") -- GitLab