diff --git a/wetb/hawc2/cluster_simulation.py b/wetb/hawc2/cluster_simulation.py
new file mode 100644
index 0000000000000000000000000000000000000000..c67377f58a723b324b91f1cafe3665520770dcd1
--- /dev/null
+++ b/wetb/hawc2/cluster_simulation.py
@@ -0,0 +1,23 @@
+from wetb.hawc2.simulation import Simulation, SimulationThread
+import os
+import sys
+from threading import Thread
+class ClusterSimulation(Simulation):
+    def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"):
+        Simulation.__init__(self, modelpath, htcfilename, hawc2exe=hawc2exe)
+
+        self.simulation_id = [f for f in os.listdir('.') if f.endswith('.in')][0][:-3]
+        self.host.simulationThread.low_priority = False
+        self.non_blocking_simulation_thread = Thread(target=self.simulate)
+        self.start(1)
+        self.wait()
+        print (self.host.simulationThread.res[1])  # print hawc2 output to stdout
+        sys.exit(self.host.simulationThread.res[0])
+
+    def update_status(self, *args, **kwargs):
+        Simulation.update_status(self, *args, **kwargs)
+        with open("/home/mmpe/.hawc2launcher/status_%s" % self.simulation_id, 'w') as fid:
+            fid.write(";".join([self.status] + [str(getattr(self.logFile, v)) for v in ['status', 'pct', 'remaining_time', 'lastline']]) + "\n")
+
+    def show_status(self):
+        pass
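Review note: ClusterSimulation is launched on the compute node by the PBS script generated in simulation.py (see pbsjobfile() further down in this patch). A minimal sketch of the equivalent call, assuming the model copy and a matching "<simulation_id>.in" job file are already in the current directory (all names here are illustrative):

    # Runs from the scratch copy of the model. The constructor itself calls
    # start(), wait(), prints the HAWC2 output and exits with its return code.
    from wetb.hawc2.cluster_simulation import ClusterSimulation
    ClusterSimulation('.', 'htc/MyHtc.htc', 'HAWC2MB.exe')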
diff --git a/wetb/hawc2/hawc2_pbsjob.py b/wetb/hawc2/hawc2_pbsjob.py
deleted file mode 100755
index 1091786a8f939406c2fac23384111f3da3dd0460..0000000000000000000000000000000000000000
--- a/wetb/hawc2/hawc2_pbsjob.py
+++ /dev/null
@@ -1,63 +0,0 @@
-
-
-from wetb.hawc2.htc_file import HTCFile
-from wetb.utils.cluster_tools.pbsjob import PBSJob
-from wetb.hawc2.log_file import LogInterpreter
-import os
-import time
-from wetb.utils.cluster_tools import pbsjob
-class HAWC2PBSJob(PBSJob):
-
-    def __init__(self, host, username, password):
-        PBSJob.__init__(self, host, username, password)
-
-
-    def submit(self, job, cwd, pbs_out_file):
-        with open (cwd + job) as fid:
-            htcfilename = [l for l in fid if l.startswith('wine')][0].rsplit(" ", 1)[1].strip()
-        print (htcfilename)
-        htcfile = HTCFile(cwd + htcfilename)
-
-        self.log_filename = htcfile.simulation.logfile[0]
-        self.loginterpreter = LogInterpreter(htcfile.simulation.time_stop[0])
-        PBSJob.submit(self, job, cwd, pbs_out_file)
-
-    def status_monitor(self, update=5):
-        i = 0
-        self.loglinenumber = 0
-        while self.in_queue():
-            i += 1
-            print (i, self.status, self.get_nodeid())
-            if self.status is pbsjob.RUNNING:
-                #self.test()
-                scratch_log_filename = "/scratch/%s/%s.g-000.risoe.dk/%s" % (self.client.username, self.jobid, self.log_filename)
-                try:
-                    n, out, err = self.client.execute('tail --lines=+%d %s' % (self.loglinenumber, scratch_log_filename))
-                    self.loginterpreter.update_status(out)
-                    print (self.loginterpreter.status, self.loginterpreter.pct, self.loginterpreter.remaining_time, self.loginterpreter.lastline)
-                    with open("status" + self.jobid, 'w') as fid:
-                        fid.write(";".join([self.loginterpreter.status, str(self.loginterpreter.pct), str(self.loginterpreter.remaining_time), self.loginterpreter.lastline]))
-                    #print (out)
-                    self.loglinenumber += out.count ("\n")
-                    #print (err)
-
-                except Warning as e:
-                    if not "tail: cannot open" in str(e):
-                        print (str(e))
-
-            time.sleep(update)
-        print (i, self.status, self.get_nodeid())
-
-    def test(self):
-        self.log_filename = "logfiles/short.log"
-        scratch_log_filename = "/scratch/%s/%s.g-000.risoe.dk/%s" % (self.client.username, self.jobid, self.log_filename)
-        print (scratch_log_filename)
-        try:
-            n, out, err = self.client.execute('tail --lines=+%d %s' % (self.loglinenumber, scratch_log_filename))
-            print (n)
-            print (out)
-            self.loglinenumber += out.count ("\n")
-            print (err)
-
-        except Warning as e:
-            print (str(e))
diff --git a/wetb/hawc2/htc_file.py b/wetb/hawc2/htc_file.py
index d539794e32d0f8107757e1e8b4e30b5f40d6f4c8..95e3fb7485eca3f670b95e98174977cffb617b31 100644
--- a/wetb/hawc2/htc_file.py
+++ b/wetb/hawc2/htc_file.py
@@ -169,7 +169,7 @@ class HTCFile(HTCContents, HTCDefaults):
         return [f for f in files if f]

     def turbulence_files(self):
-        if self.wind.turb_format[0] == 0:
+        if 'wind' not in self.contents.keys() or self.wind.turb_format[0] == 0:
             return []
         elif self.wind.turb_format[0] == 1:
             files = [self.get('wind.mann.filename_%s' % comp, [None])[0] for comp in ['u', 'v', 'w']]
diff --git a/wetb/hawc2/log_file.py b/wetb/hawc2/log_file.py
index 622bb6fc009df5834ecf6ab9f2ee3bfe26be5748..48a810c8455258c636a9f521835bfbb63ab6c8c8 100644
--- a/wetb/hawc2/log_file.py
+++ b/wetb/hawc2/log_file.py
@@ -169,6 +169,16 @@ class LogFile(LogInterpreter):
         if txt != "":
             LogInterpreter.update_status(self, txt)

+class LogInfo(LogFile):
+    def __init__(self, status, pct, remaining_time, lastline):
+        self.status = status
+        self.pct = int(pct)
+        try:
+            self.remaining_time = float(remaining_time)
+        except (ValueError, TypeError):
+            self.remaining_time = None
+        self.lastline = lastline
+        self.errors = []
-
-
+    def update_status(self):
+        pass
\ No newline at end of file
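Review note: LogInfo mirrors one line of the status_<simulation_id> file written by ClusterSimulation.update_status above (fields joined by ";" in the order: simulation status, log status, pct, remaining_time, lastline). An illustrative parse with made-up values:

    # Illustrative only: split a status line and rebuild a LogInfo from it.
    from wetb.hawc2.log_file import LogInfo

    line = "Simulating;Simulating;42;13.5;Time: 42.0 of 100.0\n"
    fields = line.strip().split(";")
    sim_status = fields[0]
    info = LogInfo(*fields[1:])  # LogInfo(status, pct, remaining_time, lastline)
    print(sim_status, info.pct, info.remaining_time)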
diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py
old mode 100644
new mode 100755
index d1d20e698568c0f91786c47a954c9cdd942955c3..f7d8a444032777b06f4cec184426cac11f17bd64
--- a/wetb/hawc2/simulation.py
+++ b/wetb/hawc2/simulation.py
@@ -1,53 +1,90 @@
-from __future__ import print_function
+from __future__ import absolute_import
 from __future__ import division
+from __future__ import print_function
 from __future__ import unicode_literals
-from __future__ import absolute_import
-from io import open
+
 from builtins import str
-from future import standard_library
-standard_library.install_aliases()
-import os
-from wetb.hawc2.htc_file import HTCFile
-from wetb.hawc2.log_file import LogFile
-from threading import Timer, Thread
-import sys
-from multiprocessing.process import Process
-import psutil
-from wetb.utils.process_exec import process, exec_process
-import subprocess
-import shutil
-import json
 import glob
-from wetb.hawc2 import log_file
+from io import open
+import io
+import json
+import os
 import re
+import shutil
+import subprocess
+import sys
 import threading
+import time
+from wetb.hawc2 import log_file
+from wetb.hawc2.htc_file import HTCFile
+from wetb.hawc2.log_file import LogFile, LogInfo
+from future import standard_library
+import psutil
+from wetb.utils.cluster_tools import pbsjob
+from wetb.utils.cluster_tools.cluster_resource import LocalResource
+from wetb.utils.cluster_tools.pbsjob import SSHPBSJob
+from wetb.utils.cluster_tools.ssh_client import SSHClient
+standard_library.install_aliases()
+from threading import Thread

 QUEUED = "queued"  # until start
 PREPARING = "Copy to host"  # during prepare simulation
 INITIALIZING = "Initializing"  # when starting
 SIMULATING = "Simulating"  # when logfile.status=simulating
-FINISH = "Finish" # when finish
+FINISH = "Finish"  # when HAWC2 finishes
 ERROR = "Error"  # when hawc2 returns error
 ABORTED = "Aborted"  # when stopped and logfile.status != Done
 CLEANED = "Cleaned"  # after copy back

 class Simulation(object):
+    """Class for running HAWC2 simulations
+
+    Examples
+    --------
+    >>> sim = Simulation("<path>/MyModel", "htc/MyHtc")
+
+    Blocking in-place simulation
+    >>> sim.simulate()
+
+    Non-blocking distributed simulation (everything copied to a temporary folder)\n
+    Starts two threads:
+    - non_blocking_simulation_thread:
+      - prepare_simulation() # copy to temporary folder
+      - simulate() # simulate
+      - finish_simulation # copy results back again
+    - updateStatusThread:
+      - update status every second
+    >>> sim.start()
+    >>> while sim.status != CLEANED:
+    >>>     sim.show_status()
+
+    The default host is LocalSimulationHost. To simulate on a PBS cluster:
+    >>> sim.host = PBSClusterSimulationHost(sim, <hostname>, <username>, <password>)
+    >>> sim.start()
+    >>> while sim.status != CLEANED:
+    >>>     sim.show_status()
+    """
+
     is_simulating = False
-    _status = QUEUED
-    def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"):
+    status = QUEUED
+    def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True):
         self.modelpath = os.path.abspath(modelpath) + "/"
         self.folder = os.path.dirname(htcfilename)
         if not os.path.isabs(htcfilename):
             htcfilename = os.path.join(modelpath, htcfilename)
-        self.htcfilename = htcfilename
         self.filename = os.path.basename(htcfilename)
         self.htcFile = HTCFile(htcfilename)
         self.time_stop = self.htcFile.simulation.time_stop[0]
-        self.copy_turbulence = True
-        self.simulation_id = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self)
-        self.stdout_filename = "%s.out" % self.simulation_id
+        self.hawc2exe = hawc2exe
+        self.copy_turbulence = copy_turbulence
+        self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_").replace("/", "_") + "_%d" % id(self)
+        self.stdout_filename = "stdout/%s.out" % self.simulation_id
         if 'logfile' in self.htcFile.simulation:
             self.log_filename = self.htcFile.simulation.logfile[0]
         else:
@@ -60,35 +97,32 @@ class Simulation(object):
         self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop)
         self.logFile.clear()
-        self.last_status = self._status
+        self.last_status = self.status
         self.errors = []
-        self.thread = Thread(target=self.simulate_distributed)
-        self.dist_thread = Thread()
-        self.hawc2exe = hawc2exe
-        self.simulationThread = SimulationThread(self)
-        self.timer = RepeatedTimer(self.update_status)
-
+        self.non_blocking_simulation_thread = Thread(target=self.simulate_distributed)
+        self.updateStatusThread = UpdateStatusThread(self)
+        self.host = LocalSimulationHost(self)

-    def __str__(self):
-        return "Simulation(%s)" % self.filename
-    @property
-    def status(self):
-        return self._status
-
-    @status.setter
-    def status(self, status):
-        self._status = status
-        self.show_status()
+    def start(self, update_interval=1):
+        """Start non blocking distributed simulation"""
+        self.is_simulating = True
+        self.updateStatusThread.interval = update_interval
+        self.updateStatusThread.start()
+        self.non_blocking_simulation_thread.start()

-    def update_status(self, *args, **kwargs):
-        if self.status in [INITIALIZING, SIMULATING]:
-            self.logFile.update_status()
+    def wait(self):
+        self.non_blocking_simulation_thread.join()
+        self.update_status()

-        if self.logFile.status == log_file.SIMULATING:
-            self._status = SIMULATING
-        if self.logFile.status == log_file.DONE:
-            self._status = FINISH
+    def abort(self):
+        self.host.stop()
+        for _ in range(100):
+            if not self.is_simulating:
+                break
+            time.sleep(0.1)
+        if self.logFile.status not in [log_file.DONE]:
+            self.status = ABORTED
+        self.update_status()
+ "|") sys.stdout.flush() print ("\n") + elif self.logFile.status == log_file.UNKNOWN: + print (self.status) else: print (self.logFile.status) if self.logFile.status != log_file.SIMULATING: @@ -111,47 +147,37 @@ class Simulation(object): self.last_status = self.logFile.status - - def additional_files(self): - additional_files_file = os.path.join(self.modelpath, 'additional_files.txt') - additional_files = {} - if os.path.isfile(additional_files_file): - with open(additional_files_file, encoding='utf-8') as fid: - additional_files = json.load(fid) - return additional_files - - def add_additional_input_file(self, file): - additional_files = self.additional_files() - additional_files['input'] = list(set(additional_files.get('input', []) + [file])) - additional_files_file = os.path.join(self.modelpath, 'additional_files.txt') - with open(additional_files_file, 'w', encoding='utf-8') as fid: - json.dump(additional_files, fid) - def prepare_simulation(self): self.status = PREPARING + self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) + self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath) + self.host._prepare_simulation() - self.tmp_modelpath = os.path.join(self.modelpath, "tmp_%s/" % self.simulation_id) - - - for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', []): - if not os.path.isabs(src): - src = os.path.join(self.modelpath, src) - for src_file in glob.glob(src): - dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) - # exist_ok does not exist in Python27 - if not os.path.exists(os.path.dirname(dst)): - os.makedirs(os.path.dirname(dst)) #, exist_ok=True) - shutil.copy(src_file, dst) - if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: - print ("error copy ", dst) - else: - #print (dst) - pass - - - self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename) - self.simulationThread.modelpath = self.tmp_modelpath + def simulate(self): + #starts blocking simulation + self.is_simulating = True + self.errors = [] + self.status = INITIALIZING + self.logFile.clear() + self.host._simulate() + if self.host.returncode or 'error' in self.host.stdout.lower(): + if "error" in self.host.stdout.lower(): + self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()]))) + self.status = ERROR + if 'HAWC2MB version:' not in self.host.stdout: + self.errors.append(self.host.stdout) + self.status = ERROR + self.logFile.update_status() + self.errors.extend(list(set(self.logFile.errors))) + self.update_status() + self.is_simulating = False + if self.host.returncode or self.errors: + raise Exception("Simulation error:\n" + "\n".join(self.errors)) + elif self.logFile.status != log_file.DONE or self.logFile.errors: + raise Warning("Simulation succeded with errors:\nLog status:%s\nErrors:\n%s" % (self.logFile.status, "\n".join(self.logFile.errors))) + else: + self.status = FINISH def finish_simulation(self): @@ -160,56 +186,64 @@ class Simulation(object): if self.status == CLEANED: return if self.status != ERROR: self.status = CLEANED + self.host._finish_simulation() + self.set_id(self.simulation_id) - files = self.htcFile.output_files() - if self.copy_turbulence: - files.extend(self.htcFile.turbulence_files()) - for dst in files: - if not os.path.isabs(dst): - dst = os.path.join(self.modelpath, dst) - src = os.path.join(self.tmp_modelpath, os.path.relpath(dst, self.modelpath)) - for src_file in glob.glob(src): - 
-        files = self.htcFile.output_files()
-        if self.copy_turbulence:
-            files.extend(self.htcFile.turbulence_files())
-        for dst in files:
-            if not os.path.isabs(dst):
-                dst = os.path.join(self.modelpath, dst)
-            src = os.path.join(self.tmp_modelpath, os.path.relpath(dst, self.modelpath))
-            for src_file in glob.glob(src):
-                dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
-                # exist_ok does not exist in Python27
-                if not os.path.exists(os.path.dirname(dst_file)):
-                    os.makedirs(os.path.dirname(dst_file))  #, exist_ok=True)
-                if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file):
-                    shutil.copy(src_file, dst_file)
-        self.logFile.filename = os.path.join(self.modelpath, self.log_filename)
+    def update_status(self, *args, **kwargs):
+        self.host.update_logFile_status()
+        if self.status in [INITIALIZING, SIMULATING]:
+            if self.logFile.status == log_file.SIMULATING:
+                self.status = SIMULATING
+            if self.logFile.status == log_file.DONE and self.is_simulating is False:
+                self.status = FINISH
+
+    def _input_sources(self):
+        def fmt(src):
+            if os.path.isabs(src):
+                src = os.path.relpath(os.path.abspath(src), self.modelpath)
+            else:
+                src = os.path.relpath(src)
+            assert not src.startswith(".."), "%s refers to a file outside the model path\nAll input files must be inside the model path" % src
+            return src
+        return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])]
+
+    def _output_sources(self):
+        def fmt(dst):
+            if os.path.isabs(dst):
+                dst = os.path.relpath(os.path.abspath(dst), self.modelpath)
+            else:
+                dst = os.path.relpath(dst)
+            dst = dst.replace("\\", "/")
+            assert not dst.startswith(".."), "%s refers to a file outside the model path\nAll output files must be inside the model path" % dst
+            return dst
+        return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]]

-        try:
-            shutil.rmtree(self.tmp_modelpath)
-        except (PermissionError, OSError) as e:
-            raise Warning(str(e))
+    def __str__(self):
+        return "Simulation(%s)" % self.filename

-    def simulate(self):
-        #starts blocking simulation
-        self.is_simulating = True
-        self.errors = []
-        self.logFile.clear()
-        self.status = INITIALIZING
-        self.returncode, self.stdout = 1, "Simulation failed"
-        self.simulationThread.start()
-        self.simulationThread.join()
-        self.returncode, self.stdout = self.simulationThread.res
-        if self.returncode or 'error' in self.stdout.lower():
-            self.errors = list(set([l for l in self.stdout.split("\n") if 'error' in l.lower()]))
-            self.status = ERROR
-        self.is_simulating = False
-        self.logFile.update_status()
-        self.errors.extend(list(set(self.logFile.errors)))
-        if self.returncode:
-            raise Exception("Simulation error:\n" + "\n".join(self.errors))
-        elif self.logFile.status != log_file.DONE or self.errors or self.logFile.errors:
-            raise Warning("Simulation succeded with errors:\nLog status:%s\n" % self.logFile.status + "\n".join(self.logFile.errors))
-        else:
-            self.status = FINISH
+
+
+    def additional_files(self):
+        additional_files_file = os.path.join(self.modelpath, 'additional_files.txt')
+        additional_files = {}
+        if os.path.isfile(additional_files_file):
+            with open(additional_files_file, encoding='utf-8') as fid:
+                additional_files = json.load(fid)
+        return additional_files
+
+    def add_additional_input_file(self, file):
+        additional_files = self.additional_files()
+        additional_files['input'] = list(set(additional_files.get('input', []) + [file]))
+        additional_files_file = os.path.join(self.modelpath, 'additional_files.txt')
+        with open(additional_files_file, 'w', encoding='utf-8') as fid:
+            json.dump(additional_files, fid)

     def simulate_distributed(self):
         self.prepare_simulation()
@@ -217,7 +251,6 @@ class Simulation(object):
         self.simulate()
         self.finish_simulation()

-
     def fix_errors(self):
         def confirm_add_additional_file(folder, file):
             if os.path.isfile(os.path.join(self.modelpath, folder, file)):
@@ -243,102 +276,302 @@ class Simulation(object):
                     continue

     def get_confirmation(self, title, msg):
+        """override in subclass"""
         return True
+
     def show_message(self, msg, title="Information"):
         print (msg)

+
+class UpdateStatusThread(Thread):
+    def __init__(self, simulation, interval=1):
+        Thread.__init__(self)
+        self.simulation = simulation
+        self.interval = interval
+
     def start(self):
-        """Start non blocking distributed simulation"""
-        self.timer.start(1000)
-        self.thread.start()
+        Thread.start(self)
+
+    def run(self):
+        while self.simulation.is_simulating:
+            self.simulation.update_status()
+            time.sleep(self.interval)
+
+
+class SimulationResource(object):
+    def __init__(self, simulation):
+        self.sim = simulation
+    logFile = property(lambda self : self.sim.logFile, lambda self, v: setattr(self.sim, "logFile", v))
+    errors = property(lambda self : self.sim.errors)
+    modelpath = property(lambda self : self.sim.modelpath)
+    tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v))
+    simulation_id = property(lambda self : self.sim.simulation_id)
+    stdout_filename = property(lambda self : self.sim.stdout_filename)
+    htcFile = property(lambda self : self.sim.htcFile)
+    additional_files = property(lambda self : self.sim.additional_files)
+    _input_sources = property(lambda self : self.sim._input_sources)
+    _output_sources = property(lambda self : self.sim._output_sources)
+    log_filename = property(lambda self : self.sim.log_filename)
+
+    status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v))
+    is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v))
+
+    def __str__(self):
+        return self.host
+
+
+class LocalSimulationHost(SimulationResource):
+    def __init__(self, simulation):
+        SimulationResource.__init__(self, simulation)
+        LocalResource.__init__(self, "hawc2mb")
+        self.simulationThread = SimulationThread(self.sim)
+
+    def _prepare_simulation(self):
+        # must be called through simulation object
+        self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath)
+        self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath)
+        for src in self._input_sources():
+            for src_file in glob.glob(os.path.join(self.modelpath, src)):
+                dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath))
+                # exist_ok does not exist in Python27
+                if not os.path.exists(os.path.dirname(dst)):
+                    os.makedirs(os.path.dirname(dst))  #, exist_ok=True)
+                shutil.copy(src_file, dst)
+                if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size:
+                    print ("error copy ", dst)
+
+        if not os.path.exists(os.path.join(self.tmp_modelpath, 'stdout')):
+            os.makedirs(os.path.join(self.tmp_modelpath, 'stdout'))  #, exist_ok=True)
+        self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename)
+        self.simulationThread.modelpath = self.tmp_modelpath
+
+    def _simulate(self):
+        #must be called through simulation object
+        self.returncode, self.stdout = 1, "Simulation failed"
+        self.simulationThread.start()
+        self.simulationThread.join()
+        self.returncode, self.stdout = self.simulationThread.res
+        self.logFile.update_status()
+        self.errors.extend(list(set(self.logFile.errors)))
+    def _finish_simulation(self):
+        for dst in self._output_sources():
+            src = os.path.join(self.tmp_modelpath, dst)
+
+            for src_file in glob.glob(src):
+                dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
+                # exist_ok does not exist in Python27
+                if not os.path.isdir(os.path.dirname(dst_file)):
+                    os.makedirs(os.path.dirname(dst_file))  #, exist_ok=True)
+                if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file):
+                    shutil.copy(src_file, dst_file)
+
         self.logFile.filename = os.path.join(self.modelpath, self.log_filename)

-    def stop(self):
-        self.timer.stop()
-        self.simulationThread.process.kill()
         try:
-            self.finish_simulation()
-        except:
-            pass
-        if self.logFile.status not in [log_file.DONE]:
-            self.status = ABORTED
-        self.update_status()
+            shutil.rmtree(self.tmp_modelpath)
+        except (PermissionError, OSError) as e:
+            raise Warning(str(e))
+
+    def update_logFile_status(self):
+        self.logFile.update_status()
+
+    def stop(self):
+        self.simulationThread.stop()
+        self.simulationThread.join()

-#class SimulationProcess(Process):
-#
-#    def __init__(self, modelpath, htcfile, hawc2exe="HAWC2MB.exe"):
-#        Process.__init__(self)
-#        self.modelpath = modelpath
-#        self.htcfile = os.path.abspath(htcfile)
-#        self.hawc2exe = hawc2exe
-#        self.res = [0, "", "", ""]
-#        self.process = process([self.hawc2exe, self.htcfile] , self.modelpath)
-#
-#
-#    def run(self):
-#        p = psutil.Process(os.getpid())
-#        p.nice = psutil.BELOW_NORMAL_PRIORITY_CLASS
-#        exec_process(self.process)

 class SimulationThread(Thread):

-    def __init__(self, simulation):
+    def __init__(self, simulation, low_priority=True):
         Thread.__init__(self)
         self.sim = simulation
         self.modelpath = self.sim.modelpath
         self.res = [0, "", ""]
+        self.low_priority = low_priority

     def start(self):
-        si = subprocess.STARTUPINFO()
-        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
         CREATE_NO_WINDOW = 0x08000000
         modelpath = self.modelpath
         htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath)
         hawc2exe = self.sim.hawc2exe
         stdout = self.sim.stdout_filename
-        self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath, creationflags=CREATE_NO_WINDOW)
-
+        if os.name == "nt":
+            self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath)  #, creationflags=CREATE_NO_WINDOW)
+        else:
+            self.process = subprocess.Popen('wine "%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath)
         Thread.start(self)

     def run(self):
         p = psutil.Process(os.getpid())
-        p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
+        if self.low_priority:
+            p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
         self.process.communicate()
         errorcode = self.process.returncode
+        if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)):
+            os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename))
         with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid:
             stdout = fid.read()
         self.res = errorcode, stdout

+    def stop(self):
+        subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid))
+
+
+class PBSClusterSimulationHost(SimulationResource, SSHClient):
+    def __init__(self, simulation, host, username, password, port=22):
+        SimulationResource.__init__(self, simulation)
+        SSHClient.__init__(self, host, username, password, port=port)
+        self.pbsjob = SSHPBSJob(host, username, password, port)
+
+    hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe))
+
+
+    def _prepare_simulation(self):
+        with self:
+            self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False)
+            self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename)))
+
+            for src in self._input_sources():
+                for src_file in glob.glob(os.path.join(self.modelpath, src)):
+                    dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/")
+                    self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False)
+                    self.upload(src_file, dst, verbose=False)
+                    ##assert self.ssh.file_exists(dst)
+
+            f = io.StringIO(self.pbsjobfile())
+            f.seek(0)
+            self.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id)
+            self.execute("mkdir -p .hawc2launcher/%s/stdout" % self.simulation_id)
+            remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename)
+            self.execute("rm -f %s" % remote_log_filename)
+
+    def _finish_simulation(self):
+        with self:
+            for dst in self._output_sources():
+                src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/")
+                for src_file in self.glob(src):
+                    dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
+                    os.makedirs(os.path.dirname(dst_file), exist_ok=True)
+                    self.download(src_file, dst_file, verbose=False)
+            self.execute('rm -r .hawc2launcher/%s' % self.simulation_id)
+            self.execute('rm .hawc2launcher/status_%s' % self.simulation_id)
+
+    def _simulate(self):
+        """starts blocking simulation"""
+        self.sim.logFile = LogInfo(log_file.MISSING, 0, "None", "")
+
+        self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_modelpath, self.sim.stdout_filename)
+        sleeptime = 1
+        while self.is_simulating:
+            #self.__update_logFile_status()
+            time.sleep(sleeptime)
+
+        local_out_file = self.modelpath + self.sim.stdout_filename
+        with self:
+            try:
+                self.download(self.tmp_modelpath + self.sim.stdout_filename, local_out_file)
+                with open(local_out_file) as fid:
+                    _, self.stdout, returncode_str, _ = fid.read().split("---------------------")
+                self.returncode = returncode_str.strip() != "0"
+            except Exception:
+                self.returncode = 1
+                self.stdout = "error: Could not download and read stdout file"
+            try:
+                self.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename)
+            except Exception:
+                raise Exception("Logfile not found")
+        self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath)
+
+    def update_logFile_status(self):
+        status = self.pbsjob.status
+        if status == pbsjob.NOT_SUBMITTED:
+            pass
+        elif status == pbsjob.DONE:
+            self.is_simulating = False
+            pass
+        else:
+            try:
+                _, out, _ = self.execute("cat .hawc2launcher/status_%s" % self.simulation_id)
+                out = out.split(";")
+                if len(out) == 5:
+                    self.status = out[0]
+                    self.logFile = LogInfo(*out[1:])
+            except Exception as e:
+                if "No such file or directory" in str(e):
+                    pass
+                else:
+                    raise
+
+    def start(self):
+        """Start non blocking distributed simulation"""
+        self.non_blocking_simulation_thread.start()

-
-class RepeatedTimer(object):
-    def __init__(self, function, *args, **kwargs):
-        self._timer = None
-        self.function = function
-        self.args = args
-        self.kwargs = kwargs
-        self.is_running = False
+    def abort(self):
+        self.pbsjob.stop()
+        self.stop()
+        try:
+            self.finish_simulation()
+        except:
+            pass
+        if self.status != ERROR and self.logFile.status not in [log_file.DONE]:
+            self.status = ABORTED

-    def _run(self):
-        self.is_running = False
-        self.start(self.interval)
-        self.function(*self.args, **self.kwargs)
-    def start(self, interval_ms=None):
-        self.interval = interval_ms
-        if not self.is_running:
-            self._timer = Timer(interval_ms / 1000, self._run)
-            self._timer.start()
-            self.is_running = True
+
     def stop(self):
-        self._timer.cancel()
-        self.is_running = False
+        self.is_simulating = False
+        self.pbsjob.stop()
+
+    def pbsjobfile(self):
+        cp_back = ""
+        for folder in set([os.path.relpath(os.path.dirname(f)) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]):
+            cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder
+            cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder)
+        rel_htcfilename = os.path.relpath(self.htcFile.filename, self.modelpath).replace("\\", "/")
+        return """
+### Standard Output
+#PBS -N h2l_%s
+### merge stderr into stdout
+#PBS -j oe
+#PBS -o %s
+### Maximum wallclock time format HOURS:MINUTES:SECONDS
+#PBS -l walltime=01:00:00
+###PBS -a 201547.53
+#PBS -lnodes=1:ppn=1
+### Queue name
+#PBS -q workq
+### Create scratch directory and copy data to it
+cd $PBS_O_WORKDIR
+pwd
+cp -R . /scratch/$USER/$PBS_JOBID
+### Execute commands on scratch nodes
+cd /scratch/$USER/$PBS_JOBID
+pwd
+echo "---------------------"
+python -c "from wetb.hawc2.cluster_simulation import ClusterSimulation;ClusterSimulation('.','%s', '%s')"
+echo "---------------------"
+echo $?
+echo "---------------------"
+### Copy back from scratch directory
+cd /scratch/$USER/$PBS_JOBID
+%s
+echo $PBS_JOBID
+cd /scratch/
+### rm -r $PBS_JOBID
+exit""" % (self.simulation_id, self.stdout_filename, rel_htcfilename, self.hawc2exe, cp_back)
+
+
-if __name__ == "__main__":
-    sim = Simulation('C:\mmpe\HAWC2\Hawc2_model/', 'htc/error.htc')
-    sim.simulate()
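Review note: the repeated `echo "---------------------"` lines in this template are load-bearing: PBSClusterSimulationHost._simulate above downloads the stdout file and splits it on that delimiter, expecting exactly four parts (PBS preamble, HAWC2 output, return code, copy-back output). A sketch of that contract with made-up file content:

    # Illustrative only: how _simulate unpacks the stdout file written by the
    # PBS job script (the content below is invented).
    stdout_file_content = """prologue from cd/pwd
    ---------------------
    HAWC2MB version: 11.6
    ... simulation output ...
    ---------------------
    0
    ---------------------
    copy-back output"""

    _, hawc2_stdout, returncode_str, _ = stdout_file_content.split("---------------------")
    failed = returncode_str.strip() != "0"
    print(failed)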
diff --git a/wetb/prepost/Simulations.py b/wetb/prepost/Simulations.py
index 164aa1866638be0fce68b42f14bddeb9f5db2f35..a8abd120fc7b0bebedb5994d95d327bf82e6833b 100755
--- a/wetb/prepost/Simulations.py
+++ b/wetb/prepost/Simulations.py
@@ -3379,7 +3379,6 @@ class Cases(object):
         self.loadstats = kwargs.get('loadstats', False)
         self.rem_failed = kwargs.get('rem_failed', True)
         self.config = kwargs.get('config', {})
-        print(self.config)
         # determine the input argument scenario
         if len(args) == 1:
             if type(args[0]).__name__ == 'dict':
@@ -5202,5 +5201,4 @@ def eigenstructure(cases, debug=False):
     return cases

 if __name__ == '__main__':
-    pass
-
+    pass
\ No newline at end of file
diff --git a/wetb/utils/cluster_tools/cluster_resource.py b/wetb/utils/cluster_tools/cluster_resource.py
new file mode 100644
index 0000000000000000000000000000000000000000..804c6ed904e531b4e025ba5c8d3d22ee3386d925
--- /dev/null
+++ b/wetb/utils/cluster_tools/cluster_resource.py
@@ -0,0 +1,77 @@
+'''
+Created on 04/04/2016
+
+@author: MMPE
+'''
+from wetb.utils.cluster_tools.ssh_client import SSHClient
+from wetb.utils.cluster_tools import pbswrap
+import multiprocessing
+import psutil
+
+class Resource(object):
+
+    def __init__(self, min_cpu, min_free):
+        self.min_cpu = min_cpu
+        self.min_free = min_free
+
+    def ok2submit(self):
+        """It is always ok to use up to min_cpu cpus; more may be used as long as min_free cpus remain free"""
+        total, free, user = self.check_resources()
+
+        if user < self.min_cpu:
+            return True
+        elif free > self.min_free:
+            return True
+        else:
+            return False
+
+
+
+
+class SSHPBSClusterResource(Resource, SSHClient):
+    def __init__(self, host, username, password, port, min_cpu, min_free):
+        Resource.__init__(self, min_cpu, min_free)
+        SSHClient.__init__(self, host, username, password, port=port)
+
+    def new_ssh_connection(self):
+        return SSHClient(self.host, self.username, self.password, self.port)
+
+    def check_resources(self):
+        with self:
+            _, output, _ = self.execute('pbsnodes -l all')
+            pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output.split("\n"))
+
+            _, output, _ = self.execute('qstat -n1')
+            users, host, nodesload = pbswrap.parse_qstat_n1(output.split("\n"))
+
+
+        # if the user does not have any jobs, this will not exist
+        try:
+            cpu_user = users[self.username]['cpus']
+            cpu_user += users[self.username]['Q']
+        except KeyError:
+            cpu_user = 0
+        cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes)
+
+        return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user
+
+
+
+class LocalResource(Resource):
+    def __init__(self, process_name):
+        N = max(1, multiprocessing.cpu_count() // 4)
+        Resource.__init__(self, N, N)
+        self.process_name = process_name
+        self.host = 'Localhost'
+
+    def check_resources(self):
+        def name(i):
+            try:
+                return psutil.Process(i).name
+            except psutil._error.AccessDenied:
+                return ""
+
+        no_cpu = multiprocessing.cpu_count()
+        cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu
+        no_current_process = len([i for i in psutil.get_pid_list() if name(i).lower().startswith(self.process_name.lower())])
+        return no_cpu, cpu_free, no_current_process
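Review note: the ok2submit() policy above reads as: a user may always occupy up to min_cpu cpus, and may take more only while more than min_free cpus remain free for others. A small illustrative check against the local machine, assuming HAWC2 processes are the load of interest:

    # Illustrative only: gate a new simulation on local resources.
    from wetb.utils.cluster_tools.cluster_resource import LocalResource

    resource = LocalResource("hawc2mb")  # counts processes whose name starts with this
    total, free, mine = resource.check_resources()
    print("total=%s free=%.1f mine=%s" % (total, free, mine))
    if resource.ok2submit():
        print("ok to start another simulation")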
diff --git a/wetb/utils/cluster_tools/pbsjob.py b/wetb/utils/cluster_tools/pbsjob.py
index ae18eaf8ccd00bf8c0129c65d5743900b0ba0dad..7b6d6f8e8827eecad29c19fab91f6e3ca280577c 100644
--- a/wetb/utils/cluster_tools/pbsjob.py
+++ b/wetb/utils/cluster_tools/pbsjob.py
@@ -3,42 +3,35 @@ Created on 04/12/2015

 @author: mmpe
 '''
-
-#import x
-import time
-from wetb.utils.cluster_tools.ssh_client import SSHClient
 import os
-import paramiko
-import subprocess
-
-
+from wetb.utils.cluster_tools.ssh_client import SSHClient

 NOT_SUBMITTED = "Job not submitted"
 PENDING = "Pending"
 RUNNING = "Running"
 DONE = "Done"

-class PBSJob(object):
+
+
+class SSHPBSJob(SSHClient):
     _status = NOT_SUBMITTED
     nodeid = None
-    def __init__(self, host, username, password):
-        self.client = SSHClient(host, username, password, port=22)
+    jobid = None

-    def execute(self, cmd, cwd="./"):
-        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd)
-        stdout, stderr = proc.communicate()
-        errorcode = proc.returncode
-        return errorcode, stdout.decode(), stderr.decode()
+    def __init__(self, host, username, password, port=22):
+        SSHClient.__init__(self, host, username, password, port=port)

     def submit(self, job, cwd, pbs_out_file):
         self.cwd = cwd
         self.pbs_out_file = os.path.relpath(cwd + pbs_out_file).replace("\\", "/")
         self.nodeid = None
-        try:
-            os.remove (self.pbs_out_file)
-        except FileNotFoundError:
-            pass
-        _, out, _ = self.execute("qsub %s" % job, cwd)
+        #self.execute()
+
+        cmds = ['rm -f %s' % self.pbs_out_file]
+        if cwd != "":
+            cmds.append("cd %s" % cwd)
+        cmds.append("qsub %s" % job)
+        _, out, _ = self.execute(";".join(cmds))
         self.jobid = out.split(".")[0]
         self._status = PENDING

@@ -46,44 +39,38 @@ def status(self):
         if self._status in [NOT_SUBMITTED, DONE]:
             return self._status
-
-        if self.nodeid is None:
-            self.nodeid = self.get_nodeid()
-            if self.nodeid is not None:
+        with self:
+            if self.is_executing():
                 self._status = RUNNING
-
-        if self.in_queue() and self.nodeid is None:
-            self._status = PENDING
-        elif os.path.isfile(self.pbs_out_file):
-            self._status = DONE
+            elif self.file_exists(self.pbs_out_file):
+                self._status = DONE
+                self.jobid = None
         return self._status

     def get_nodeid(self):
-        errorcode, out, err = self.execute("qstat -f %s | grep exec_host" % self.jobid)
-        if errorcode == 0:
-            return out.strip().replace("exec_host = ", "").split(".")[0]
-        elif errorcode == 1 and out == "":
-            return None
-        elif errorcode == 153 and 'qstat: Unknown Job Id' in err:
+        try:
+            _, out, _ = self.execute("qstat -f %s | grep exec_host" % self.jobid)
+            return out.strip().replace("exec_host = ", "").split(".")[0]
+        except Warning as e:
+            if 'qstat: Unknown Job Id' in str(e):
                 return None
-        else:
-            raise Exception(str(errorcode) + out + err)
+            #raise e

     def stop(self):
-        try:
-            self.execute("qdel %s" % self.jobid)
-        except Warning as e:
-            if 'qdel: Unknown Job Id' in str(e):
-                return
-            raise e
+        if self.jobid:
+            try:
+                self.execute("qdel %s" % self.jobid)
+            except Warning as e:
+                if 'qdel: Unknown Job Id' in str(e):
+                    return
+                raise e

-    def in_queue(self):
-        errorcode, out, err = self.execute("qstat %s" % self.jobid)
-        if errorcode == 0:
+    def is_executing(self):
+        try:
+            self.execute("qstat %s" % self.jobid)
             return True
-        elif 'qstat: Unknown Job Id' in str(err):
-            return False
-        else:
-            raise Exception(str(errorcode) + out + err)
-
+        except Warning as e:
+            if 'qstat: Unknown Job Id' in str(e):
+                return False
+            raise e
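Review note: SSHPBSJob now submits and polls over a single SSH connection instead of shelling out locally. An illustrative lifecycle (hostname, credentials and file names are made up):

    # Illustrative only: submit a PBS job and wait for it to finish.
    import time
    from wetb.utils.cluster_tools import pbsjob
    from wetb.utils.cluster_tools.pbsjob import SSHPBSJob

    job = SSHPBSJob("cluster.example.org", "user", "password")
    # "myjob.in" must exist in ./myjob/ on the cluster; the job is expected to
    # write its merged stdout/stderr to ./myjob/pbs_out/myjob.out on completion.
    job.submit("myjob.in", "./myjob/", "pbs_out/myjob.out")
    while job.status != pbsjob.DONE:
        time.sleep(5)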
diff --git a/wetb/utils/cluster_tools/pbswrap.py b/wetb/utils/cluster_tools/pbswrap.py
new file mode 100644
index 0000000000000000000000000000000000000000..5eb3f37091f97405676dac4fd4347c12050bbbe2
--- /dev/null
+++ b/wetb/utils/cluster_tools/pbswrap.py
@@ -0,0 +1,389 @@
+#!/usr/bin/python
+#-----------------------------------------------------------------------------
+# Author: David Verelst - david.verelst@gmail.com
+# Version: 0.10 - 15/03/2010
+# Version: 0.11 - 13/04/2010 : adding nodes down, offline to the total
+#                              overview of available nodes
+# version: 0.20 - 17/11/2011 : major rewrite, support added for gorm
+# version: 0.30 - 19/12/2012 : refactoring, added the node overview
+# version: 0.40 - 26/03/2014 : removed Thyra, added support for Jess
+#-----------------------------------------------------------------------------
+
+import os
+import socket
+
+def print_dashboard(users, host, pbsnodes):
+
+    # print nicely
+    # the header
+    # ---------------------------------------------
+    #     User   Running   Queued  Waiting    Other
+    # ---------------------------------------------
+    #     jber         3        0        0        0
+
+    print ()
+    print ('-' * 54)
+    print ('cpus'.rjust(18) + 'nodes'.rjust(9))
+    print ('User'.rjust(9) + 'Running'.rjust(9) + 'Running'.rjust(9) \
+        + 'Queued'.rjust(9) + 'Waiting'.rjust(9) + 'Other'.rjust(9))
+    # nodeSum: overview (summation of all jobs) nodes per user:
+    # nodeSum = [running, queued, waiting, other, cpus]
+    nodeSum = [0, 0, 0, 0, 0]
+    print ('-' * 54)
+    # print all values in the table: the nodes used per user
+    #userlist = users['users'].keys()
+    #userlist.sort()
+    for uid in sorted(users):
+
+        # or get the unique nodes the user is on
+        try:
+            R = len(users[uid]['nodes'])
+        except KeyError:
+            # means it is not running yet but queued, waiting or otherwise
+            R = users[uid]['R']
+        Q = users[uid]['Q']
+        W = users[uid]['W']
+        O = users[uid]['E'] + users[uid]['H'] + users[uid]['T'] \
+          + users[uid]['S'] + users[uid]['O'] + users[uid]['C']
+
+        cpus = users[uid]['cpus']
+        print (uid.rjust(9) + str(cpus).rjust(9) + str(R).rjust(9) \
+            + str(Q).rjust(9) + str(W).rjust(9) + str(O).rjust(9))
+        nodeSum[0] += R
+        nodeSum[1] += Q
+        nodeSum[2] += W
+        nodeSum[3] += O
+        nodeSum[4] += cpus
+
+    nr_nodes = pbsnodes['nr_nodes']
+    down = pbsnodes['down']
+    others = pbsnodes['others']
+    total_cpu = host['cpu_per_node'] * nr_nodes
+
+    # the summed up for each node status (queued, running,...)
+    print ('-' * 54)
+    print ('total'.rjust(9) + str(nodeSum[4]).rjust(9) + str(nodeSum[0]).rjust(9) \
+        + str(nodeSum[1]).rjust(9) + str(nodeSum[2]).rjust(9)\
+        + str(nodeSum[3]).rjust(9))
+    print ('-' * 54)
+    print ('free'.rjust(9) + str(total_cpu - nodeSum[4]).rjust(9) \
+        + str(nr_nodes - nodeSum[0] - others - down).rjust(9))
+    print ('down'.rjust(9) + str(down).rjust(18))
+    print ('-' * 54)
+    print ()
+
+
+def print_node_loading(users, host, nodes, nodesload):
+    """
+    Give an overview of how each node is loaded
+    """
+
+    if len(host) < 1:
+        print ('It is very quiet, nobody is working on the cluster.')
+        return
+
+    hostname = host['name']
+    cpunode = host['cpu_per_node']
+
+    print ()
+    # print a header
+    if hostname == 'gorm':
+        print ('-' * 79)
+        header = '|'.join([str(k).center(5) for k in range(1, 13, 1)]) + '|'
+        print ('id'.center(5), header)
+        print ('-' * 79)
+    elif hostname == 'jess':
+        print ('-' * 126)
+        header = '|'.join([str(k).center(5) for k in range(1, 21, 1)]) + '|'
+        print ('id'.center(5), header)
+        print ('-' * 126)
+
+    # print who is using the nodes
+    for node in sorted(nodes):
+        status = nodes[node]
+        # now we have a list of user on this node
+        try:
+            users = sorted(nodesload[node])
+            for kk in range(len(users), cpunode):
+                users.append('')
+            # limit uid names to 5 characters
+            printlist = '|'.join([k[:5].center(5) for k in users]) + '|'
+        # if it doesn't exist in the nodesload, just print the status
+        except KeyError:
+            printlist = status.center(5)
+
+        print (node, printlist)
+
+    # print a header
+    if hostname == 'gorm':
+        print ('-' * 79)
+        print ('id'.center(5), header)
+        print ('-' * 79)
+    elif hostname == 'jess':
+        print ('-' * 126)
+        print ('id'.center(5), header)
+        print ('-' * 126)
+    #print
+
+
+def parse_pbsnode_lall(output):
+    # read the qstat output
+    frees, exclusives, others, down = 0, 0, 0, 0
+    nr_nodes = 0
+
+    nodes = {}
+
+    for k in output:
+        if len(k) > 2:
+            line = k.split()
+            status = line[1]
+            node = line[0].split('.')[0]
+
+            if node.startswith('v-'):
+                #host = 'valde'
+                # ugly monkey patch: ignore any valde nodes
+                continue
+            #elif node.startswith('g-'):
+                #host = 'gorm'
+            #elif node.startswith('th-'):
+                #host = 'thyra'
+
+            if status == 'free':
+                frees += 1
+                nr_nodes += 1
+            elif status == 'job-exclusive':
+                exclusives += 1
+                nr_nodes += 1
+            elif status == 'down,offline':
+                down += 1
+            elif status == 'offline':
+                down += 1
+            elif status == 'down':
+                down += 1
+            else:
+                others += 1
+
+            nodes[node] = status
+
+    #check = frees + exclusives + down + others
+
+    pbsnodes = {'frees' : frees, 'nr_nodes' : nr_nodes, 'others' : others,
+                'exclusives' : exclusives, 'down' : down}
+
+    return pbsnodes, nodes
+
+
+def parse_qstat_n1(output):
+    """
+    Parse the output of qstat -n1
+    """
+
+    # for global node usage, keep track of how many processes are running on
+    # each of the nodes
+    nodesload = {}
+    # store it all in dictionaries
+    host = {}
+    users = {}
+    # get the hostname
+    hostname = output[1]
+    if hostname[:5] == 'g-000':
+        host['name'] = 'gorm'
+        host['cpu_per_node'] = 12
+    else:
+        # 272 nodes are 2 x 10 core (twenty) processors
+        host['name'] = 'jess'
+        #total_nodes = 80
+        host['cpu_per_node'] = 20
+    # take the available nodes in nr_nodes: it excludes the ones
+    # who are down
+    #queue['_total_cpu_'] = cpu_node*nr_nodes
+
+    for line in output[5:]:  # first 5 are not relevant
+        if line == "":
+            continue
+        items = line.split()
+        queue = items[2]
+
+        # ugly monkey patch: ignore any valde nodes
+        if queue == 'valdeq':
+            continue
+
+        jobid = items[0]
+        # valid line starts with the jobid, which is an int
+        jobid = jobid.split('.')[0]
+        userid = items[1]
+        # nr nodes used by the job
+        job_nodes = int(items[5])
+        # status of the job
+        job_status = items[9]
+        # is the user already in the queue dict?
+        try:
+            users[userid]['jobs'].append(jobid)
+            users[userid][job_status] += job_nodes
+        # if the user wasn't added before, create the sub dictionaries
+        except KeyError:
+            # initialise the users dictionary and job list
+            users[userid] = dict()
+            users[userid]['C'] = 0
+            users[userid]['E'] = 0
+            users[userid]['H'] = 0
+            users[userid]['Q'] = 0
+            users[userid]['R'] = 0
+            users[userid]['T'] = 0
+            users[userid]['W'] = 0
+            users[userid]['S'] = 0
+            users[userid]['O'] = 0
+            users[userid]['cpus'] = 0
+            users[userid]['jobs'] = []
+            users[userid]['nodes'] = set()
+            # set the values
+            users[userid]['jobs'].append(jobid)
+            users[userid][job_status] += job_nodes
+
+        if job_status == 'R':
+            # each occurrence of the node name is separated by a + and
+            # indicates a process running on a CPU of that node
+            nodes = items[11].split('+')
+            # TODO: take into account cpu number for jess: j-304/5
+            # on jess, the cpu number of the node is indicated, ignore for now
+            if host['name'].startswith('jess'):
+                for i, node in enumerate(nodes):
+                    nodes[i] = node.split('/')[0]
+            # keep track of the number of processes the user running
+            users[userid]['cpus'] += len(nodes)
+            # for the number of used nodes, keep track of the unique nodes used
+            users[userid]['nodes'].update(set(nodes))
+            # remember all the nodes the user is on in a dictionary
+            for node in nodes:
+                try:
+                    nodesload[node].append(userid)
+                except KeyError:
+                    nodesload[node] = [userid]
+
+    return users, host, nodesload
+
+
+def count_cpus(users, host, pbsnodes):
+    """
+    See how many cpu's are actually free
+    """
+    nodeSum = {'R':0, 'Q':0, 'W':0, 'O':0, 'used_cpu':0, 'H':0}
+
+    for uid in users:
+
+        # or get the unique nodes the user is on
+        try:
+            nodeSum['R'] = len(users[uid]['nodes'])
+        except KeyError:
+            # means it is not running yet but queued, waiting or otherwise
+            nodeSum['R'] = users[uid]['R']
+
+        nodeSum['Q'] += users[uid]['Q']
+        nodeSum['W'] += users[uid]['W']
+        nodeSum['H'] += users[uid]['H']
+        nodeSum['used_cpu'] += users[uid]['cpus']
+        nodeSum['O'] += users[uid]['E'] + users[uid]['T'] + users[uid]['S'] \
+                      + users[uid]['O'] + users[uid]['C']
+
+    # free cpus
+    down_cpu = host['cpu_per_node'] * pbsnodes['down']
+    total_cpu = host['cpu_per_node'] * pbsnodes['nr_nodes']
+    cpu_free = total_cpu - down_cpu - nodeSum['used_cpu']
+
+    return cpu_free, nodeSum
+
+
+PBS_TEMP = """
+### Standard Output
+#PBS -N [jobname]
+#PBS -o ./[pbs_out_file]
+### Standard Error
+#PBS -e ./[pbs_err_file]
+#PBS -W umask=003
+### Maximum wallclock time format HOURS:MINUTES:SECONDS
+#PBS -l walltime=[walltime]
+#PBS -lnodes=[lnodes]:ppn=[ppn]
+### Queue name
+#PBS -q [queue]
+### Browse to current working dir
+cd $PBS_O_WORKDIR
+pwd
+### ===========================================================================
+### set environment variables, activate python virtual environment, execute
+[commands]
+
+### ===========================================================================
+### wait for jobs to finish
+wait
+exit
+"""
+# TODO: this is very similar compared to what happens in qsub-wrap
+def create_input(walltime='00:59:59', queue='xpresq', pbs_in='pbs_in/', ppn=1,
+                 pbs_out='pbs_out/', jobname=None, commands=None, lnodes=1):
+    """
+    Create a PBS script for a command. Optionally, define a python environment.
+    """
+
+    pbs_err_file = os.path.join(pbs_out, jobname + '.err')
+    pbs_out_file = os.path.join(pbs_out, jobname + '.out')
+    pbs_in_file = os.path.join(pbs_in, jobname + '.pbswrap')
+
+    pbs_script = PBS_TEMP
+    pbs_script = pbs_script.replace('[jobname]', jobname)
+    pbs_script = pbs_script.replace('[pbs_out_file]', pbs_out_file)
+    pbs_script = pbs_script.replace('[pbs_err_file]', pbs_err_file)
+    pbs_script = pbs_script.replace('[walltime]', walltime)
+    pbs_script = pbs_script.replace('[lnodes]', str(lnodes))
+    pbs_script = pbs_script.replace('[ppn]', str(ppn))
+    pbs_script = pbs_script.replace('[queue]', queue)
+    pbs_script = pbs_script.replace('[commands]', commands)
+
+    print ('following commands will be executed on the cluster:')
+    print ('%s' % (commands))
+
+    # make sure a pbs_in and pbs_out directory exists
+    if not os.path.exists(pbs_in):
+        os.makedirs(pbs_in)
+    if not os.path.exists(pbs_out):
+        os.makedirs(pbs_out)
+
+    # write the pbs_script
+    FILE = open(pbs_in_file, 'w')
+    FILE.write(pbs_script)
+    FILE.close()
+
+    return pbs_in_file
+
+
+def test():
+
+    # sample output
+    FILE = open('tests/sampleoutput_pbsnodes', 'rb')
+    output = FILE.readlines()
+    FILE.close()
+    pbsnodes, nodes = parse_pbsnode_lall(output)
+
+    # sample output
+    FILE = open('tests/sampleoutput_qstat', 'rb')
+    output = FILE.readlines()
+    FILE.close()
+    users, host, nodesload = parse_qstat_n1(output)
+
+    print_node_loading(users, host, nodes, nodesload)
+    print_dashboard(users, host, pbsnodes)
+
+
+if __name__ == '__main__':
+
+    #command = 'pbsnodes -l all' # | cut -c 22-35
+
+    output = os.popen('pbsnodes -l all').readlines()
+    pbsnodes, nodes = parse_pbsnode_lall(output)
+
+    output = os.popen('qstat -n1').readlines()
+    users, host, nodesload = parse_qstat_n1(output)
+
+    print_node_loading(users, host, nodes, nodesload)
+    print_dashboard(users, host, pbsnodes)
+
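Review note: create_input() is plain template substitution on PBS_TEMP. A minimal usage sketch (job name and command are made up) that writes pbs_in/h2_case1.pbswrap and returns its path:

    # Illustrative only: generate a PBS job script with pbswrap.
    from wetb.utils.cluster_tools.pbswrap import create_input

    pbs_file = create_input(walltime='00:30:00', queue='workq', ppn=1, lnodes=1,
                            jobname='h2_case1',
                            commands='wine HAWC2MB.exe htc/case1.htc')
    print('submit with: qsub %s' % pbs_file)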
+ """ + + pbs_err_file = os.path.join(pbs_out, jobname + '.err') + pbs_out_file = os.path.join(pbs_out, jobname + '.out') + pbs_in_file = os.path.join(pbs_in, jobname + '.pbswrap') + + pbs_script = PBS_TEMP + pbs_script = pbs_script.replace('[jobname]', jobname) + pbs_script = pbs_script.replace('[pbs_out_file]', pbs_out_file) + pbs_script = pbs_script.replace('[pbs_err_file]', pbs_err_file) + pbs_script = pbs_script.replace('[walltime]', walltime) + pbs_script = pbs_script.replace('[lnodes]', str(lnodes)) + pbs_script = pbs_script.replace('[ppn]', str(ppn)) + pbs_script = pbs_script.replace('[queue]', queue) + pbs_script = pbs_script.replace('[commands]', commands) + + print ('following commands will be executed on the cluster:') + print ('%s' % (commands)) + + # make sure a pbs_in and pbs_out directory exists + if not os.path.exists(pbs_in): + os.makedirs(pbs_in) + if not os.path.exists(pbs_out): + os.makedirs(pbs_out) + + # write the pbs_script + FILE = open(pbs_in_file, 'w') + FILE.write(pbs_script) + FILE.close() + + return pbs_in_file + + +def test(): + + # sample output + FILE = open('tests/sampleoutput_pbsnodes', 'rb') + output = FILE.readlines() + FILE.close() + pbsnodes, nodes = parse_pbsnode_lall(output) + + # sample output + FILE = open('tests/sampleoutput_qstat', 'rb') + output = FILE.readlines() + FILE.close() + users, host, nodesload = parse_qstat_n1(output) + + print_node_loading(users, host, nodes, nodesload) + print_dashboard(users, host, pbsnodes) + + +if __name__ == '__main__': + + #command = 'pbsnodes -l all' # | cut -c 22-35 + + output = os.popen('pbsnodes -l all').readlines() + pbsnodes, nodes = parse_pbsnode_lall(output) + + output = os.popen('qstat -n1').readlines() + users, host, nodesload = parse_qstat_n1(output) + + print_node_loading(users, host, nodes, nodesload) + print_dashboard(users, host, pbsnodes) + diff --git a/wetb/utils/cluster_tools/ssh_client.py b/wetb/utils/cluster_tools/ssh_client.py index 1d70e472b1aed1705b436038bed2d5ff42e4b564..ea125bb77ade4aeeb662d91e9701ce201d552b8e 100644 --- a/wetb/utils/cluster_tools/ssh_client.py +++ b/wetb/utils/cluster_tools/ssh_client.py @@ -19,6 +19,8 @@ class SSHClient(object): self.password = password self.port = port self.key = key + self.disconnect = 0 + self.client = None if key is not None: self.key = paramiko.RSAKey.from_private_key(StringIO(key), password=passphrase) @@ -26,6 +28,11 @@ class SSHClient(object): return self.host, self.username, self.password, self.port def __enter__(self): + self.disconnect += 1 + if self.client is None: + self.connect() + + def connect(self): self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(self.host, self.port, username=self.username, password=self.password, pkey=self.key, timeout=self.TIMEOUT) @@ -36,7 +43,9 @@ class SSHClient(object): return self def __exit__(self, *args): - self.close() + self.disconnect -= 1 + if self.disconnect == 0: + self.close() def download(self, remotefilepath, localfile, verbose=False): @@ -68,6 +77,7 @@ class SSHClient(object): self.client = None self.sftp.close() self.transport.close() + self.disconnect = False def file_exists(self, filename): _, out, _ = (self.execute('[ -f %s ] && echo "File exists" || echo "File does not exists"' % filename.replace("\\", "/"))) @@ -133,7 +143,7 @@ class SSHClient(object): if __name__ == "__main__": from mmpe.ui.qt_ui import QtInputUI q = QtInputUI(None) - import x + x = None username, password = "mmpe", x.password #q.get_login("mmpe")