From 591d12223b1915a826d2ec193b83a7b92c82a179 Mon Sep 17 00:00:00 2001 From: madsmpedersen <m@madsp.dk> Date: Thu, 14 Apr 2016 15:36:18 +0200 Subject: [PATCH] some changes --- wetb/hawc2/htc_contents.py | 7 +- wetb/hawc2/htc_file.py | 19 +- wetb/hawc2/log_file.py | 2 +- wetb/hawc2/simulation.py | 244 +++++++++++-------- wetb/utils/cluster_tools/cluster_resource.py | 68 ++++-- wetb/utils/cluster_tools/ssh_client.py | 7 +- 6 files changed, 222 insertions(+), 125 deletions(-) diff --git a/wetb/hawc2/htc_contents.py b/wetb/hawc2/htc_contents.py index ee1f06d..54c338b 100644 --- a/wetb/hawc2/htc_contents.py +++ b/wetb/hawc2/htc_contents.py @@ -171,7 +171,7 @@ class HTCLine(HTCContents): ("", "\t" + self.str_values())[bool(self.values)], ("", "\t" + self.comments)[bool(self.comments.strip())]) def str_values(self): - return " ".join([str(v) for v in self.values]) + return " ".join([str(v).lower() for v in self.values]) def __getitem__(self, key): return self.values[key] @@ -351,7 +351,10 @@ class HTCDefaults(object): else: mann.add_line('create_turb_parameters', [L, ae23, Gamma, seed, int(high_frq_compensation)], "L, alfaeps, gamma, seed, highfrq compensation") if filenames is None: - filenames = ["./turb/turb_wsp%d_s%04d%s.bin" % (self.wind.wsp[0], seed, c) for c in ['u', 'v', 'w']] + fmt = "l%.1f_ae%.2f_g%.1f_h%d_%dx%dx%d_%.3fx%.2fx%.2f_s%04d%c.turb" + import numpy as np + dxyz = tuple(np.array(box_dimension) / no_grid_points) + filenames = ["./turb/" + fmt % ((L, ae23, Gamma, high_frq_compensation) + no_grid_points + dxyz + (seed, uvw)) for uvw in ['u', 'v', 'w']] if isinstance(filenames, str): filenames = ["./turb/%s_s%04d%s.bin" % (filenames, seed, c) for c in ['u', 'v', 'w']] for filename, c in zip(filenames, ['u', 'v', 'w']): diff --git a/wetb/hawc2/htc_file.py b/wetb/hawc2/htc_file.py index 95e3fb7..84492c8 100644 --- a/wetb/hawc2/htc_file.py +++ b/wetb/hawc2/htc_file.py @@ -24,6 +24,14 @@ from copy import copy class HTCFile(HTCContents, HTCDefaults): + """Wrapper for HTC files + + Examples: + --------- + >>> htcfile = HTCFile('htc/test.htc') + >>> htcfile.wind.wsp = 10 + >>> htcfile.save() + """ filename = None htc_inputfiles = [] @@ -95,13 +103,16 @@ class HTCFile(HTCContents, HTCDefaults): with open(filename, 'w', encoding='utf-8') as fid: fid.write(str(self)) - def set_name(self, name, folder="htc"): + def set_name(self, name, folder="htc/"): + if os.path.isabs(folder) is False and os.path.relpath(folder).startswith("htc" + os.path.sep): + folder = "./" + os.path.relpath(folder).replace("\\", "/") + self.filename = os.path.join(self.modelpath, folder, "%s.htc" % name).replace("\\", "/") if 'simulation' in self and 'logfile' in self.simulation: - self.simulation.logfile = "./log/%s.log" % name + self.simulation.logfile = os.path.join(folder.replace("htc", "log", 1), "%s.log" % name).replace("\\", "/") elif 'test_structure' in self and 'logfile' in self.test_structure: - self.test_structure.logfile = "./log/%s.log" % name - self.output.filename = "./res/%s" % name + self.test_structure.logfile = os.path.join(folder.replace("htc", "log", 1), "%s.log" % name).replace("\\", "/") + self.output.filename = os.path.join(folder.replace("htc", "res", 1), "%s" % name).replace("\\", "/") def input_files(self): files = self.htc_inputfiles diff --git a/wetb/hawc2/log_file.py b/wetb/hawc2/log_file.py index 48a810c..12dac0a 100644 --- a/wetb/hawc2/log_file.py +++ b/wetb/hawc2/log_file.py @@ -181,4 +181,4 @@ class LogInfo(LogFile): self.errors = [] def update_status(self): - pass \ No newline at end of file + pass diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index f7d8a44..c45a87d 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -13,14 +13,13 @@ import re import shutil import subprocess import sys -import threading import time from wetb.hawc2 import log_file from wetb.hawc2.htc_file import HTCFile from wetb.hawc2.log_file import LogFile, LogInfo from future import standard_library -import psutil + from wetb.utils.cluster_tools import pbsjob from wetb.utils.cluster_tools.cluster_resource import LocalResource @@ -33,10 +32,13 @@ QUEUED = "queued" #until start PREPARING = "Copy to host" # during prepare simulation INITIALIZING = "Initializing" #when starting SIMULATING = "Simulating" # when logfile.status=simulating -FINISH = "Finish" # when HAWC2 finish +FINISHING = "Copy from host" # during prepare simulation +FINISH = "Simulation finish" # when HAWC2 finish ERROR = "Error" # when hawc2 returns error ABORTED = "Aborted" # when stopped and logfile.status != Done CLEANED = "Cleaned" # after copy back +def unix_path(path): + return path.replace("\\", "/").lower() class Simulation(object): """Class for doing hawc2 simulations @@ -72,6 +74,7 @@ class Simulation(object): """ is_simulating = False + is_done = False status = QUEUED def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True): self.modelpath = os.path.abspath(modelpath) + "/" @@ -83,8 +86,9 @@ class Simulation(object): self.time_stop = self.htcFile.simulation.time_stop[0] self.hawc2exe = hawc2exe self.copy_turbulence = copy_turbulence - self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_").replace("/", "_") + "_%d" % id(self) - self.stdout_filename = "stdout/%s.out" % self.simulation_id + self.simulation_id = unix_path(os.path.relpath(htcfilename, self.modelpath) + "_%d" % id(self)).replace("/", "_") + self.stdout_filename = os.path.splitext(unix_path(os.path.relpath(htcfilename, self.modelpath)).replace('htc', 'stdout', 1))[0] + ".out" + #self.stdout_filename = "stdout/%s.out" % self.simulation_id if 'logfile' in self.htcFile.simulation: self.log_filename = self.htcFile.simulation.logfile[0] else: @@ -93,7 +97,7 @@ class Simulation(object): self.log_filename = os.path.relpath(self.log_filename, self.modelpath) else: self.log_filename = os.path.relpath(self.log_filename) - self.log_filename = self.log_filename.replace("\\", "/") + self.log_filename = unix_path(self.log_filename) self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) self.logFile.clear() @@ -104,25 +108,28 @@ class Simulation(object): self.host = LocalSimulationHost(self) - def start(self, update_interval=1): + def start(self, auto_status_update=True): """Start non blocking distributed simulation""" self.is_simulating = True - self.updateStatusThread.start() + if auto_status_update: + self.updateStatusThread.start() self.non_blocking_simulation_thread.start() def wait(self): self.non_blocking_simulation_thread.join() self.update_status() - def abort(self): - self.host.stop() - for _ in range(100): - if self.is_simulating: - break - time.sleep(0.1) + def abort(self, update_status=True): + if self.status != QUEUED: + self.host.stop() + for _ in range(100): + if self.is_simulating is False: + break + time.sleep(0.1) if self.logFile.status not in [log_file.DONE]: self.status = ABORTED - self.update_status() + if update_status: + self.update_status() def show_status(self): #print ("log status:", self.logFile.status) @@ -147,11 +154,34 @@ class Simulation(object): self.last_status = self.logFile.status + + def prepare_simulation(self): self.status = PREPARING self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath) - self.host._prepare_simulation() + + def fmt(src): + if os.path.isabs(src): + src = os.path.relpath(os.path.abspath(src), self.modelpath) + else: + src = os.path.relpath (src) + assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src + return src + input_patterns = [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])] + input_files = set([f for pattern in input_patterns for f in glob.glob(os.path.join(self.modelpath, pattern))]) + if not os.path.isdir(os.path.dirname(self.modelpath + self.stdout_filename)): + os.makedirs(os.path.dirname(self.modelpath + self.stdout_filename)) + self.host._prepare_simulation(input_files) + + +# return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])] +# +# for src in self._input_sources(): +# for src_file in glob.glob(os.path.join(self.modelpath, src)): +# +# +# self.host._prepare_simulation() def simulate(self): #starts blocking simulation @@ -181,13 +211,25 @@ class Simulation(object): def finish_simulation(self): - lock = threading.Lock() - with lock: - if self.status == CLEANED: return - if self.status != ERROR: - self.status = CLEANED - self.host._finish_simulation() - self.set_id(self.simulation_id) + if self.status == ABORTED: + return + if self.status != ERROR: + self.status = FINISHING + + def fmt(dst): + if os.path.isabs(dst): + dst = os.path.relpath(os.path.abspath(dst), self.modelpath) + else: + dst = os.path.relpath (dst) + dst = unix_path(dst) + assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst + return dst + output_patterns = [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]] + output_files = set([f for pattern in output_patterns for f in self.host.glob(unix_path(os.path.join(self.tmp_modelpath, pattern)))]) + self.host._finish_simulation(output_files) + self.set_id(self.filename) + if self.status != ERROR: + self.status = CLEANED @@ -200,28 +242,6 @@ class Simulation(object): if self.logFile.status == log_file.DONE and self.is_simulating is False: self.status = FINISH - def _input_sources(self): - def fmt(src): - if os.path.isabs(src): - src = os.path.relpath(os.path.abspath(src), self.modelpath) - else: - src = os.path.relpath (src) - assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src - return src - return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])] - - def _output_sources(self): - def fmt(dst): - if os.path.isabs(dst): - dst = os.path.relpath(os.path.abspath(dst), self.modelpath) - else: - dst = os.path.relpath (dst) - dst = dst.replace("\\", "/") - assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst - return dst - return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]] - - def __str__(self): return "Simulation(%s)" % self.filename @@ -234,7 +254,7 @@ class Simulation(object): additional_files = {} if os.path.isfile(additional_files_file): with open(additional_files_file, encoding='utf-8') as fid: - additional_files = json.load(fid) + additional_files = json.loads(fid.read().replace("\\", "/")) return additional_files def add_additional_input_file(self, file): @@ -246,15 +266,33 @@ class Simulation(object): def simulate_distributed(self): - self.prepare_simulation() - self.simulate() - self.finish_simulation() + try: + self.prepare_simulation() + try: + self.simulate() + except Warning as e: + print ("simulation failed", str(self)) + print ("Trying to finish") + raise + finally: + try: + self.finish_simulation() + except: + print ("finish_simulation failed", str(self)) + raise + except: + self.status = ERROR + raise + finally: + self.is_done = True + + def fix_errors(self): def confirm_add_additional_file(folder, file): if os.path.isfile(os.path.join(self.modelpath, folder, file)): - filename = os.path.join(folder, file).replace(os.path.sep, "/") + filename = unix_path(os.path.join(folder, file)) if self.get_confirmation("File missing", "'%s' seems to be missing in the temporary working directory. \n\nDo you want to add it to additional_files.txt" % filename): self.add_additional_input_file(filename) self.show_message("'%s' is now added to additional_files.txt.\n\nPlease restart the simulation" % filename) @@ -282,6 +320,8 @@ class Simulation(object): def show_message(self, msg, title="Information"): print (msg) + def set_id(self): + pass class UpdateStatusThread(Thread): @@ -294,7 +334,8 @@ class UpdateStatusThread(Thread): Thread.start(self) def run(self): - while self.simulation.is_simulating: + print ("Wrong updatestatus") + while self.simulation.is_done is False: self.simulation.update_status() time.sleep(self.interval) @@ -320,24 +361,28 @@ class SimulationResource(object): def __str__(self): return self.host class LocalSimulationHost(SimulationResource): - def __init__(self, simulation): + def __init__(self, simulation, resource=None): SimulationResource.__init__(self, simulation) LocalResource.__init__(self, "hawc2mb") + self.resource = resource self.simulationThread = SimulationThread(self.sim) - def _prepare_simulation(self): + + def glob(self, path): + return glob.glob(path) + + def _prepare_simulation(self, input_files): # must be called through simulation object self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath) - for src in self._input_sources(): - for src_file in glob.glob(os.path.join(self.modelpath, src)): - dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) - # exist_ok does not exist in Python27 - if not os.path.exists(os.path.dirname(dst)): - os.makedirs(os.path.dirname(dst)) #, exist_ok=True) - shutil.copy(src_file, dst) - if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: - print ("error copy ", dst) + for src_file in input_files: + dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) + # exist_ok does not exist in Python27 + if not os.path.exists(os.path.dirname(dst)): + os.makedirs(os.path.dirname(dst)) #, exist_ok=True) + shutil.copy(src_file, dst) + if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: + print ("error copy ", dst) if not os.path.exists(os.path.join(self.tmp_modelpath, 'stdout')): os.makedirs(os.path.join(self.tmp_modelpath, 'stdout')) #, exist_ok=True) @@ -355,17 +400,14 @@ class LocalSimulationHost(SimulationResource): self.errors.extend(list(set(self.logFile.errors))) - def _finish_simulation(self): - for dst in self._output_sources(): - src = os.path.join(self.tmp_modelpath, dst) - - for src_file in glob.glob(src): - dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) - # exist_ok does not exist in Python27 - if not os.path.isdir(os.path.dirname(dst_file)): - os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) - if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): - shutil.copy(src_file, dst_file) + def _finish_simulation(self, output_files): + for src_file in output_files: + dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) + # exist_ok does not exist in Python27 + if not os.path.isdir(os.path.dirname(dst_file)): + os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) + if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): + shutil.copy(src_file, dst_file) self.logFile.filename = os.path.join(self.modelpath, self.log_filename) @@ -378,8 +420,9 @@ class LocalSimulationHost(SimulationResource): self.logFile.update_status() def stop(self): - self.simulationThread.stop() - self.simulationThread.join() + if self.simulationThread.is_alive(): + self.simulationThread.stop() + self.simulationThread.join() @@ -394,11 +437,13 @@ class SimulationThread(Thread): def start(self): - CREATE_NO_WINDOW = 0x08000000 + #CREATE_NO_WINDOW = 0x08000000 modelpath = self.modelpath htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath) hawc2exe = self.sim.hawc2exe stdout = self.sim.stdout_filename + if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)): + os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename)) if os.name == "nt": self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW) else: @@ -407,38 +452,39 @@ class SimulationThread(Thread): def run(self): + import psutil p = psutil.Process(os.getpid()) if self.low_priority: p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) self.process.communicate() errorcode = self.process.returncode - if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)): - os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename)) + with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid: stdout = fid.read() self.res = errorcode, stdout def stop(self): - subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) + if hasattr(self, 'process'): + subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) class PBSClusterSimulationHost(SimulationResource, SSHClient): - def __init__(self, simulation, host, username, password, port=22): + def __init__(self, simulation, resource, host, username, password, port=22): SimulationResource.__init__(self, simulation) SSHClient.__init__(self, host, username, password, port=port) self.pbsjob = SSHPBSJob(host, username, password, port) + self.resource = resource hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) - def _prepare_simulation(self): + def _prepare_simulation(self, input_files): with self: self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) - for src in self._input_sources(): - for src_file in glob.glob(os.path.join(self.modelpath, src)): - dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/") + for src_file in input_files: + dst = unix_path(self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)) self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) self.upload(src_file, dst, verbose=False) ##assert self.ssh.file_exists(dst) @@ -446,23 +492,27 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): f = io.StringIO(self.pbsjobfile()) f.seek(0) self.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id) - self.execute("mkdir -p .hawc2launcher/%s/stdout" % self.simulation_id) + self.execute("mkdir -p .hawc2launcher/%s/%s" % (self.simulation_id, os.path.dirname(self.stdout_filename))) remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename) self.execute("rm -f %s" % remote_log_filename) - def _finish_simulation(self): + def _finish_simulation(self, output_files): with self: - for dst in self._output_sources(): - - src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/") - for src_file in self.glob(src): + for src_file in output_files: + try: dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) os.makedirs(os.path.dirname(dst_file), exist_ok=True) self.download(src_file, dst_file, verbose=False) - self.execute('rm -r .hawc2launcher/%s' % self.simulation_id) - self.execute('rm .hawc2launcher/status_%s' % self.simulation_id) + except ValueError as e: + print (self.modelpath, src_file, self.tmp_modelpath) + raise e + try: + self.execute('rm -r .hawc2launcher/%s' % self.simulation_id) + self.execute('rm .hawc2launcher/status_%s' % self.simulation_id) + except: + pass def _simulate(self): @@ -475,10 +525,10 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): #self.__update_logFile_status() time.sleep(sleeptime) - local_out_file = self.modelpath + self.sim.stdout_filename + local_out_file = self.modelpath + self.stdout_filename with self: try: - self.download(self.tmp_modelpath + self.sim.stdout_filename, local_out_file) + self.download(self.tmp_modelpath + self.stdout_filename, local_out_file) with open(local_out_file) as fid: _, self.stdout, returncode_str, _ = fid.read().split("---------------------") self.returncode = returncode_str.strip() != "0" @@ -488,7 +538,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): try: self.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename) except Exception: - raise Exception ("Logfile not found") + raise Warning ("Logfile not found", self.tmp_modelpath + self.log_filename) self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) @@ -536,10 +586,10 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def pbsjobfile(self): cp_back = "" - for folder in set([os.path.relpath(os.path.dirname(f)) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]): + for folder in set([unix_path(os.path.relpath(os.path.dirname(f))) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]): cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder) - rel_htcfilename = os.path.relpath(self.htcFile.filename, self.modelpath).replace("\\", "/") + rel_htcfilename = unix_path(os.path.relpath(self.htcFile.filename, self.modelpath)) return """ ### Standard Output #PBS -N h2l_%s diff --git a/wetb/utils/cluster_tools/cluster_resource.py b/wetb/utils/cluster_tools/cluster_resource.py index 804c6ed..162036f 100644 --- a/wetb/utils/cluster_tools/cluster_resource.py +++ b/wetb/utils/cluster_tools/cluster_resource.py @@ -3,16 +3,22 @@ Created on 04/04/2016 @author: MMPE ''' -from wetb.utils.cluster_tools.ssh_client import SSHClient -from wetb.utils.cluster_tools import pbswrap import multiprocessing +import threading + import psutil +from wetb.utils.cluster_tools import pbswrap +from wetb.utils.cluster_tools.ssh_client import SSHClient + + class Resource(object): def __init__(self, min_cpu, min_free): self.min_cpu = min_cpu self.min_free = min_free + self.acquired = 0 + self.lock = threading.Lock() def ok2submit(self): """Always ok to have min_cpu cpus and ok to have more if there are min_free free cpus""" @@ -25,6 +31,13 @@ class Resource(object): else: return False + def acquire(self): + with self.lock: + self.acquired += 1 + + def release(self): + with self.lock: + self.acquired -= 1 @@ -32,35 +45,52 @@ class SSHPBSClusterResource(Resource, SSHClient): def __init__(self, host, username, password, port, min_cpu, min_free): Resource.__init__(self, min_cpu, min_free) SSHClient.__init__(self, host, username, password, port=port) + self.lock = threading.Lock() def new_ssh_connection(self): return SSHClient(self.host, self.username, self.password, self.port) def check_resources(self): - with self: - _, output, _ = self.execute('pbsnodes -l all') - pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output.split("\n")) + with self.lock: + try: + with self: + _, output, _ = self.execute('pbsnodes -l all') + pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output.split("\n")) + + _, output, _ = self.execute('qstat -n1') + users, host, nodesload = pbswrap.parse_qstat_n1(output.split("\n")) + - _, output, _ = self.execute('qstat -n1') - users, host, nodesload = pbswrap.parse_qstat_n1(output.split("\n")) + # if the user does not have any jobs, this will not exist + try: + cpu_user = users[self.username]['cpus'] + cpu_user += users[self.username]['Q'] + except KeyError: + cpu_user = 0 + cpu_user = max(cpu_user, self.acquired) + cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes) + return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user + except IOError as e: + raise e + except: + raise EnvironmentError("check resources failed") - # if the user does not have any jobs, this will not exist - try: - cpu_user = users[self.username]['cpus'] - cpu_user += users[self.username]['Q'] - except KeyError: - cpu_user = 0 - cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes) + def jobids(self, jobname_prefix): + _, output, _ = self.execute('qstat -u %s' % self.username) + return [l.split()[0].split(".")[0] for l in output.split("\n")[5:] if l.strip() != "" and l.split()[3].startswith("h2l")] - return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user + def stop_pbsjobs(self, jobids): + if not hasattr(jobids, "len"): + jobids = list(jobids) + self.execute("qdel %s" % (" ".join(jobids))) class LocalResource(Resource): def __init__(self, process_name): - N = max(1, multiprocessing.cpu_count() / 4) - Resource.__init__(self, N, N) + N = max(1, multiprocessing.cpu_count() / 2) + Resource.__init__(self, N, multiprocessing.cpu_count()) self.process_name = process_name self.host = 'Localhost' @@ -68,10 +98,10 @@ class LocalResource(Resource): def name(i): try: return psutil.Process(i).name - except psutil._error.AccessDenied: + except (psutil._error.AccessDenied, psutil._error.NoSuchProcess): return "" no_cpu = multiprocessing.cpu_count() - cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu + cpu_free = no_cpu - self.acquired #(1 - psutil.cpu_percent(.5) / 100) * no_cpu no_current_process = len([i for i in psutil.get_pid_list() if name(i).lower().startswith(self.process_name.lower())]) return no_cpu, cpu_free, no_current_process diff --git a/wetb/utils/cluster_tools/ssh_client.py b/wetb/utils/cluster_tools/ssh_client.py index ea125bb..cdda5b5 100644 --- a/wetb/utils/cluster_tools/ssh_client.py +++ b/wetb/utils/cluster_tools/ssh_client.py @@ -13,7 +13,7 @@ class SSHClient(object): "A wrapper of paramiko.SSHClient" TIMEOUT = 4 - def __init__(self, host, username, password, port=22, key=None, passphrase=None): + def __init__(self, host, username, password=None, port=22, key=None, passphrase=None): self.host = host self.username = username self.password = password @@ -33,6 +33,8 @@ class SSHClient(object): self.connect() def connect(self): + if self.password is None: + raise IOError("Password not set") self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(self.host, self.port, username=self.username, password=self.password, pkey=self.key, timeout=self.TIMEOUT) @@ -136,7 +138,8 @@ class SSHClient(object): _, out, _ = self.execute(r'find %s -maxdepth 1 -type f -name "%s"' % (cwd, filepattern)) files = [] for file in out.strip().split("\n"): - files.append(file.strip()) + if file != "": + files.append(file.strip()) return files -- GitLab