diff --git a/wetb/hawc2/htc_contents.py b/wetb/hawc2/htc_contents.py index 54c338b47826883c96a64e3db4fa055c6593e2ba..7bbe2c4548aec27ad8c6ca1f6fd633f83465ed5f 100644 --- a/wetb/hawc2/htc_contents.py +++ b/wetb/hawc2/htc_contents.py @@ -351,7 +351,7 @@ class HTCDefaults(object): else: mann.add_line('create_turb_parameters', [L, ae23, Gamma, seed, int(high_frq_compensation)], "L, alfaeps, gamma, seed, highfrq compensation") if filenames is None: - fmt = "l%.1f_ae%.2f_g%.1f_h%d_%dx%dx%d_%.3fx%.2fx%.2f_s%04d%c.turb" + fmt = "mann_l%.1f_ae%.2f_g%.1f_h%d_%dx%dx%d_%.3fx%.2fx%.2f_s%04d%c.turb" import numpy as np dxyz = tuple(np.array(box_dimension) / no_grid_points) filenames = ["./turb/" + fmt % ((L, ae23, Gamma, high_frq_compensation) + no_grid_points + dxyz + (seed, uvw)) for uvw in ['u', 'v', 'w']] diff --git a/wetb/hawc2/htc_file.py b/wetb/hawc2/htc_file.py index 10eb514ce850b58647367475c58d960550e30bd7..0a58d861b490e8f8865517e9de4ea8a653891e12 100644 --- a/wetb/hawc2/htc_file.py +++ b/wetb/hawc2/htc_file.py @@ -146,8 +146,10 @@ class HTCFile(HTCContents, HTCDefaults): if 'soil' in self: if 'soil_element' in self.soil: files.append(self.soil.soil_element.get('datafile', [None])[0]) - if 'force' in self: - files.append(self.force.get('dll', [None])[0]) + try: + files.append(self.force.dll.dll[0]) + except: + pass return [f for f in set(files) if f] diff --git a/wetb/hawc2/log_file.py b/wetb/hawc2/log_file.py index 12dac0ab0be8b0bdda5acd154a733ca999fd9ff1..0316f709232bf56a7f97e26dbd1a01bc17ba57e2 100644 --- a/wetb/hawc2/log_file.py +++ b/wetb/hawc2/log_file.py @@ -101,7 +101,7 @@ class LogInterpreter(object): i1 = simulation_txt.rfind("Global time") if i1 > -1: self.current_time = self.extract_time(simulation_txt[i1:]) - if self.time_stop > 0: + if self.current_time is not None and self.time_stop > 0: self.pct = int(100 * self.current_time // self.time_stop) try: self.remaining_time = (time.time() - self.start_time[1]) / (self.current_time - self.start_time[0]) * (self.time_stop - self.current_time) @@ -123,11 +123,11 @@ class LogInterpreter(object): if self.remaining_time: if self.remaining_time < 3600: m, s = divmod(self.remaining_time, 60) - return "%02d:%02d" % (m, math.ceil(s)) + return "%02d:%02d" % (m, int(s)) else: h, ms = divmod(self.remaining_time, 3600) m, s = divmod(ms, 60) - return "%d:%02d:%02d" % (h, m, math.ceil(s)) + return "%d:%02d:%02d" % (h, m, int(s)) else: return "--:--" diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index aae4a60932023bf471ce231ad0806dc34101fc11..9874f3e81f7e950ed6ae5a995dcfd2cdb9d727f2 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -23,8 +23,9 @@ from future import standard_library from wetb.utils.cluster_tools import pbsjob from wetb.utils.cluster_tools.cluster_resource import LocalResource -from wetb.utils.cluster_tools.pbsjob import SSHPBSJob +from wetb.utils.cluster_tools.pbsjob import SSHPBSJob, DONE, NOT_SUBMITTED from wetb.utils.cluster_tools.ssh_client import SSHClient +from wetb.utils.timing import print_time standard_library.install_aliases() from threading import Thread @@ -98,7 +99,6 @@ class Simulation(object): else: self.log_filename = os.path.relpath(self.log_filename) self.log_filename = unix_path(self.log_filename) - self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) self.logFile.clear() self.last_status = self.status @@ -129,6 +129,8 @@ class Simulation(object): time.sleep(0.1) if self.logFile.status not in [log_file.DONE]: self.status = ABORTED + self.is_simulating = False + self.is_done = True if update_status: self.update_status() @@ -191,6 +193,7 @@ class Simulation(object): self.status = INITIALIZING self.logFile.clear() self.host._simulate() + self.returncode, self.stdout = self.host.returncode, self.host.stdout if self.host.returncode or 'error' in self.host.stdout.lower(): if "error" in self.host.stdout.lower(): self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()]))) @@ -244,6 +247,7 @@ class Simulation(object): self.status = FINISH + def __str__(self): return "Simulation(%s)" % self.filename @@ -335,10 +339,12 @@ class UpdateStatusThread(Thread): Thread.start(self) def run(self): - print ("Wrong updatestatus") while self.simulation.is_done is False: self.simulation.update_status() - time.sleep(self.interval) + time.sleep(0.5) + t = time.time() + while self.simulation.is_simulating and time.time() < t + self.interval: + time.sleep(1) class SimulationResource(object): @@ -455,8 +461,11 @@ class SimulationThread(Thread): def run(self): import psutil p = psutil.Process(os.getpid()) - if self.low_priority: - p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + try: + if self.low_priority: + p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + except: + pass self.process.communicate() errorcode = self.process.returncode @@ -473,7 +482,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def __init__(self, simulation, resource, host, username, password, port=22): SimulationResource.__init__(self, simulation) SSHClient.__init__(self, host, username, password, port=port) - self.pbsjob = SSHPBSJob(host, username, password, port) + self.pbsjob = SSHPBSJob(resource.shared_ssh) self.resource = resource hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) @@ -505,8 +514,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): try: dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) os.makedirs(os.path.dirname(dst_file), exist_ok=True) - self.download(src_file, dst_file, verbose=False) - except ValueError as e: + self.download(src_file, dst_file, retry=3) + except Exception as e: print (self.modelpath, src_file, self.tmp_modelpath) raise e try: @@ -545,25 +554,29 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def update_logFile_status(self): - status = self.pbsjob.status + def pbsjob_status(): + if self.pbsjob._status in [NOT_SUBMITTED, DONE]: + return self.pbsjob._status + if self.pbsjob.jobid in self.resource.is_executing: + self.pbsjob._status = pbsjob.RUNNING + elif self.simulation_id in self.resource.finished: + self.pbsjob._status = DONE + self.pbsjob.jobid = None + return self.pbsjob._status + + + status = pbsjob_status() if status == pbsjob.NOT_SUBMITTED: pass elif status == pbsjob.DONE: self.is_simulating = False pass else: - try: - _, out, _ = self.execute("cat .hawc2launcher/status_%s" % self.simulation_id) - out = out.split(";") - if len(out) == 5: - self.status = out[0] - self.logFile = LogInfo(*out[1:]) - - except Exception as e: - if "No such file or directory" in str(e): - pass - else: - raise + if self.simulation_id in self.resource.loglines: + self.logline = self.resource.loglines[self.simulation_id] + self.status = self.logline[0] + self.logFile = LogInfo(*self.logline[1:]) + def start(self): """Start non blocking distributed simulation""" @@ -578,6 +591,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): self.finish_simulation() except: pass + self.is_simulating = False + self.is_done = True if self.status != ERROR and self.logFile.status not in [log_file.DONE]: self.status = ABORTED diff --git a/wetb/hawc2/tests/test_htc_file.py b/wetb/hawc2/tests/test_htc_file.py index f0ecc3d0611190d02287d5cd8cf53f6dfa18aee6..e852d6256d2292c577ccd68a9b9c0f617a6834de 100644 --- a/wetb/hawc2/tests/test_htc_file.py +++ b/wetb/hawc2/tests/test_htc_file.py @@ -118,9 +118,9 @@ class TestHtcFile(unittest.TestCase): htcfile.add_mann_turbulence(30.1, 1.1, 3.3, 102, False) s = """begin mann; create_turb_parameters\t30.1 1.1 3.3 102 0;\tL, alfaeps, gamma, seed, highfrq compensation - filename_u\t./turb/l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102u.turb; - filename_v\t./turb/l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102v.turb; - filename_w\t./turb/l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102w.turb; + filename_u\t./turb/mann_l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102u.turb; + filename_v\t./turb/mann_l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102v.turb; + filename_w\t./turb/mann_l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102w.turb; box_dim_u\t4096 1.4652; box_dim_v\t32 3.2258; box_dim_w\t32 3.2258; diff --git a/wetb/utils/cluster_tools/cluster_resource.py b/wetb/utils/cluster_tools/cluster_resource.py index 162036fd95edafba7bd1e7e45a1d961b9fbefca1..ebdf6f4aece80b05aa114b39ab516ea5bb76e989 100644 --- a/wetb/utils/cluster_tools/cluster_resource.py +++ b/wetb/utils/cluster_tools/cluster_resource.py @@ -9,7 +9,9 @@ import threading import psutil from wetb.utils.cluster_tools import pbswrap -from wetb.utils.cluster_tools.ssh_client import SSHClient +from wetb.utils.cluster_tools.ssh_client import SSHClient, SharedSSHClient +from _collections import deque +import time class Resource(object): @@ -44,9 +46,11 @@ class Resource(object): class SSHPBSClusterResource(Resource, SSHClient): def __init__(self, host, username, password, port, min_cpu, min_free): Resource.__init__(self, min_cpu, min_free) + self.shared_ssh = SharedSSHClient(host, username, password, port) SSHClient.__init__(self, host, username, password, port=port) self.lock = threading.Lock() + def new_ssh_connection(self): return SSHClient(self.host, self.username, self.password, self.port) @@ -87,6 +91,8 @@ class SSHPBSClusterResource(Resource, SSHClient): + + class LocalResource(Resource): def __init__(self, process_name): N = max(1, multiprocessing.cpu_count() / 2) @@ -97,11 +103,11 @@ class LocalResource(Resource): def check_resources(self): def name(i): try: - return psutil.Process(i).name - except (psutil._error.AccessDenied, psutil._error.NoSuchProcess): + return psutil.Process(i).name() + except (psutil.AccessDenied, psutil.NoSuchProcess): return "" no_cpu = multiprocessing.cpu_count() - cpu_free = no_cpu - self.acquired #(1 - psutil.cpu_percent(.5) / 100) * no_cpu - no_current_process = len([i for i in psutil.get_pid_list() if name(i).lower().startswith(self.process_name.lower())]) - return no_cpu, cpu_free, no_current_process + cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu + no_current_process = len([i for i in psutil.pids() if name(i).lower().startswith(self.process_name.lower())]) + return no_cpu, cpu_free, self.acquired diff --git a/wetb/utils/cluster_tools/pbsjob.py b/wetb/utils/cluster_tools/pbsjob.py index 7b6d6f8e8827eecad29c19fab91f6e3ca280577c..901c00bc62eea095b078c05d4356c269cbf4a63b 100644 --- a/wetb/utils/cluster_tools/pbsjob.py +++ b/wetb/utils/cluster_tools/pbsjob.py @@ -12,14 +12,15 @@ RUNNING = "Running" DONE = "Done" -class SSHPBSJob(SSHClient): +class SSHPBSJob(object): _status = NOT_SUBMITTED nodeid = None jobid = None - def __init__(self, host, username, password, port=22): - SSHClient.__init__(self, host, username, password, port=port) + def __init__(self, sshClient): + self.ssh = sshClient + def submit(self, job, cwd, pbs_out_file): self.cwd = cwd @@ -31,25 +32,27 @@ class SSHPBSJob(SSHClient): if cwd != "": cmds.append("cd %s" % cwd) cmds.append("qsub %s" % job) - _, out, _ = self.execute(";".join(cmds)) + ssh = SSHClient(self.ssh.host, self.ssh.username, self.ssh.password, self.ssh.port) + _, out, _ = ssh.execute(";".join(cmds)) self.jobid = out.split(".")[0] self._status = PENDING @property def status(self): + if self._status in [NOT_SUBMITTED, DONE]: return self._status - with self: + with self.ssh: if self.is_executing(): self._status = RUNNING - elif self.file_exists(self.pbs_out_file): + elif self.ssh.file_exists(self.pbs_out_file): self._status = DONE self.jobid = None return self._status def get_nodeid(self): try: - _, out, _ = self.execute("qstat -f %s | grep exec_host" % self.jobid) + _, out, _ = self.ssh.execute("qstat -f %s | grep exec_host" % self.jobid) return out.strip().replace("exec_host = ", "").split(".")[0] except Warning as e: if 'qstat: Unknown Job Id' in str(e): @@ -59,7 +62,7 @@ class SSHPBSJob(SSHClient): def stop(self): if self.jobid: try: - self.execute("qdel %s" % self.jobid) + self.ssh.execute("qdel %s" % self.jobid) except Warning as e: if 'qdel: Unknown Job Id' in str(e): return @@ -68,7 +71,7 @@ class SSHPBSJob(SSHClient): def is_executing(self): try: - self.execute("qstat %s" % self.jobid) + self.ssh.execute("qstat %s" % self.jobid) return True except Warning as e: if 'qstat: Unknown Job Id' in str(e): diff --git a/wetb/utils/cluster_tools/ssh_client.py b/wetb/utils/cluster_tools/ssh_client.py index cdda5b515e55fb9a73279d7dc122f93a091ed0c5..97d6ed8d8e5e968c5205c9eb0d3d849cfe412046 100644 --- a/wetb/utils/cluster_tools/ssh_client.py +++ b/wetb/utils/cluster_tools/ssh_client.py @@ -8,6 +8,10 @@ from io import StringIO import paramiko import os import sys +import threading +from _collections import deque +import time +import traceback class SSHClient(object): "A wrapper of paramiko.SSHClient" @@ -31,6 +35,7 @@ class SSHClient(object): self.disconnect += 1 if self.client is None: self.connect() + return self.client def connect(self): if self.password is None: @@ -50,14 +55,20 @@ class SSHClient(object): self.close() - def download(self, remotefilepath, localfile, verbose=False): + def download(self, remotefilepath, localfile, verbose=False, retry=1): if verbose: print ("Download %s > %s" % (remotefilepath, str(localfile))) with self: - if isinstance(localfile, (str, bytes, int)): - ret = self.sftp.get(remotefilepath, localfile) - elif hasattr(localfile, 'write'): - ret = self.sftp.putfo(remotefilepath, localfile) + for i in range(retry): + try: + if isinstance(localfile, (str, bytes, int)): + ret = self.sftp.get(remotefilepath, localfile) + elif hasattr(localfile, 'write'): + ret = self.sftp.putfo(remotefilepath, localfile) + break + except: + pass + print ("retry", i) if verbose: print (ret) @@ -96,8 +107,12 @@ class SSHClient(object): if verbose: print (">>> " + command) - with self: - stdin, stdout, stderr = self.client.exec_command(command) + with self as ssh: + if ssh is None: + exc_info = sys.exc_info() + traceback.print_exception(*exc_info) + raise Exception("ssh_client exe ssh is NOne") + stdin, stdout, stderr = ssh.exec_command(command) if feed_password: stdin.write(self.password + "\n") stdin.flush() @@ -143,6 +158,39 @@ class SSHClient(object): return files +class SharedSSHClient(SSHClient): + def __init__(self, host, username, password=None, port=22, key=None, passphrase=None): + SSHClient.__init__(self, host, username, password=password, port=port, key=key, passphrase=passphrase) + self.shared_ssh_lock = threading.RLock() + self.shared_ssh_queue = deque() + self.next = None + + + def execute(self, command, sudo=False, verbose=False): + res = SSHClient.execute(self, command, sudo=sudo, verbose=verbose) + return res + + def __enter__(self): + with self.shared_ssh_lock: + if self.next == threading.currentThread(): + return self.client + self.shared_ssh_queue.append(threading.current_thread()) + if self.next is None: + self.next = self.shared_ssh_queue.popleft() + + while self.next != threading.currentThread(): + time.sleep(1) + return self.client + + def __exit__(self, *args): + with self.shared_ssh_lock: + if next != threading.current_thread(): + with self.shared_ssh_lock: + if len(self.shared_ssh_queue) > 0: + self.next = self.shared_ssh_queue.popleft() + else: + self.next = None + if __name__ == "__main__": from mmpe.ui.qt_ui import QtInputUI q = QtInputUI(None)