Skip to content
Snippets Groups Projects
Commit 66e7c1ca authored by mads's avatar mads
Browse files

updating pbs job

parent 01c9b86a
No related branches found
No related tags found
2 merge requests!6Clustertools,!5Clustertools
......@@ -28,7 +28,7 @@ QUEUED = "queued" #until start
PREPARING = "Copy to host" # during prepare simulation
INITIALIZING = "Initializing" #when starting
SIMULATING = "Simulating" # when logfile.status=simulating
FINISH = "Finish" # when finish
FINISH = "Finish" # when HAWC2 finish
ERROR = "Error" # when hawc2 returns error
ABORTED = "Aborted" # when stopped and logfile.status != Done
CLEANED = "Cleaned" # after copy back
......@@ -87,7 +87,7 @@ class Simulation(object):
if self.logFile.status == log_file.SIMULATING:
self._status = SIMULATING
if self.logFile.status == log_file.DONE:
if self.logFile.status == log_file.DONE and self.is_simulating is False:
self._status = FINISH
def show_status(self):
......@@ -282,11 +282,12 @@ class Simulation(object):
class SimulationThread(Thread):
def __init__(self, simulation):
def __init__(self, simulation, low_priority=True):
Thread.__init__(self)
self.sim = simulation
self.modelpath = self.sim.modelpath
self.res = [0, "", ""]
self.low_priority = low_priority
def start(self):
......@@ -304,7 +305,8 @@ class SimulationThread(Thread):
def run(self):
p = psutil.Process(os.getpid())
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
if self.low_priority:
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
self.process.communicate()
errorcode = self.process.returncode
with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid:
......
......@@ -4,41 +4,35 @@ Created on 04/12/2015
@author: mmpe
'''
#import x
import time
from wetb.utils.cluster_tools.ssh_client import SSHClient
import os
import paramiko
import subprocess
NOT_SUBMITTED = "Job not submitted"
PENDING = "Pending"
RUNNING = "Running"
DONE = "Done"
class PBSJob(object):
_status = NOT_SUBMITTED
nodeid = None
def __init__(self, host, username, password):
self.client = SSHClient(host, username, password, port=22)
def execute(self, cmd, cwd="./"):
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd)
stdout, stderr = proc.communicate()
errorcode = proc.returncode
return errorcode, stdout.decode(), stderr.decode()
def __init__(self, sshclient):
self.ssh = sshclient
def submit(self, job, cwd, pbs_out_file):
self.cwd = cwd
self.pbs_out_file = os.path.relpath(cwd + pbs_out_file).replace("\\", "/")
self.nodeid = None
try:
os.remove (self.pbs_out_file)
except FileNotFoundError:
pass
_, out, _ = self.execute("qsub %s" % job, cwd)
#self.execute()
cmds = ['rm -f %s' % self.pbs_out_file]
if cwd != "":
cmds.append("cd %s" % cwd)
cmds.append("qsub %s" % job)
_, out, _ = self.ssh.execute(";".join(cmds))
self.jobid = out.split(".")[0]
self._status = PENDING
......@@ -47,43 +41,50 @@ class PBSJob(object):
if self._status in [NOT_SUBMITTED, DONE]:
return self._status
if self.nodeid is None:
self.nodeid = self.get_nodeid()
if self.nodeid is not None:
self._status = RUNNING
if self.in_queue() and self.nodeid is None:
self._status = PENDING
elif os.path.isfile(self.pbs_out_file):
# if self.nodeid is None:
# self.nodeid = self.get_nodeid()
# if self.nodeid is not None:
# self._status = RUNNING
if self.is_executing():
self._status = RUNNING
elif self.ssh.file_exists(self.pbs_out_file):
self._status = DONE
return self._status
def get_nodeid(self):
errorcode, out, err = self.execute("qstat -f %s | grep exec_host" % self.jobid)
if errorcode == 0:
return out.strip().replace("exec_host = ", "").split(".")[0]
elif errorcode == 1 and out == "":
return None
elif errorcode == 153 and 'qstat: Unknown Job Id' in err:
try:
_, out, _ = self.ssh.execute("qstat -f %s | grep exec_host" % self.jobid)
return out.strip().replace("exec_host = ", "").split(".")[0]
except Warning as e:
if 'qstat: Unknown Job Id' in str(e):
return None
else:
raise Exception(str(errorcode) + out + err)
#raise e
def stop(self):
try:
self.execute("qdel %s" % self.jobid)
self.ssh.execute("qdel %s" % self.jobid)
except Warning as e:
if 'qdel: Unknown Job Id' in str(e):
return
raise e
def in_queue(self):
errorcode, out, err = self.execute("qstat %s" % self.jobid)
if errorcode == 0:
def is_executing(self):
try:
self.ssh.execute("qstat %s" % self.jobid)
return True
elif 'qstat: Unknown Job Id' in str(err):
return False
else:
raise Exception(str(errorcode) + out + err)
except Warning as e:
if 'qstat: Unknown Job Id' in str(e):
return False
raise e
if __name__ == "__main__":
x = None
username, password = "mmpe", x.password #q.get_login("mmpe")
pbsjob = PBSJob('gorm', username, password, 22)
#pbsjob.submit("pbsjob", ".hawc2launcher/__1__/", "pbs_out.txt")
pbsjob.nodeid = "g-080"
print (pbsjob.execute_on_node("tail -20 /scratch/mmpe/1996208.g-000.risoe.dk/logfiles/structure_wind.log\n"))
......@@ -19,6 +19,8 @@ class SSHClient(object):
self.password = password
self.port = port
self.key = key
self.disconnect = 0
self.client = None
if key is not None:
self.key = paramiko.RSAKey.from_private_key(StringIO(key), password=passphrase)
......@@ -26,6 +28,12 @@ class SSHClient(object):
return self.host, self.username, self.password, self.port
def __enter__(self):
self.disconnect += 1
if self.client is None:
self.connect()
def connect(self):
print ("start connect")
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.host, self.port, username=self.username, password=self.password, pkey=self.key, timeout=self.TIMEOUT)
......@@ -33,10 +41,13 @@ class SSHClient(object):
self.transport = paramiko.Transport((self.host, self.port))
self.transport.connect(username=self.username, password=self.password)
self.sftp = paramiko.SFTPClient.from_transport(self.transport)
print ("End connect")
return self
def __exit__(self, *args):
self.close()
self.disconnect -= 1
if self.disconnect == 0:
self.close()
def download(self, remotefilepath, localfile, verbose=False):
......@@ -68,6 +79,7 @@ class SSHClient(object):
self.client = None
self.sftp.close()
self.transport.close()
self.disconnect = False
def file_exists(self, filename):
_, out, _ = (self.execute('[ -f %s ] && echo "File exists" || echo "File does not exists"' % filename.replace("\\", "/")))
......@@ -133,7 +145,7 @@ class SSHClient(object):
if __name__ == "__main__":
from mmpe.ui.qt_ui import QtInputUI
q = QtInputUI(None)
import x
x = None
username, password = "mmpe", x.password #q.get_login("mmpe")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment