diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index 8ab8af925d8bdb9e786ce83f5eb731b9a8b394dd..371cd514a0f077e1cbe46b95eca8bce59bcbf9a2 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -11,6 +11,7 @@ from wetb.utils.cluster_tools.pbsjob import PBSJob import io import time from wetb.utils.cluster_tools import pbsjob +from wetb.utils.cluster_tools.ssh_client import SSHClient standard_library.install_aliases() import os from wetb.hawc2.htc_file import HTCFile @@ -49,7 +50,7 @@ class Simulation(object): self.time_stop = self.htcFile.simulation.time_stop[0] self.copy_turbulence = True self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self) - self.stdout_filename = "%s.out" % self.simulation_id + self.stdout_filename = "stdout/%s.out" % self.simulation_id if 'logfile' in self.htcFile.simulation: self.log_filename = self.htcFile.simulation.logfile[0] else: @@ -67,7 +68,7 @@ class Simulation(object): self.thread = Thread(target=self.simulate_distributed) self.hawc2exe = hawc2exe self.updateStatusThread = UpdateStatusThread(self) - self.resource = LocalSimulationResource(self) + self.host = LocalSimulationHost(self) def input_sources(self): def fmt(src): @@ -88,13 +89,14 @@ class Simulation(object): dst = dst.replace("\\", "/") assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst return dst - return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence]] + return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]] def prepare_simulation(self): self.status = PREPARING self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) - self.resource._prepare_simulation() + self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath) + self.host._prepare_simulation() def simulate(self): #starts blocking simulation @@ -103,12 +105,12 @@ class Simulation(object): self.status = INITIALIZING self.logFile.clear() - self.resource._simulate() + self.host._simulate() - if self.resource.returncode or 'error' in self.resource.stdout.lower(): - self.errors = (list(set([l for l in self.resource.stdout.split("\n") if 'error' in l.lower()]))) + if self.host.returncode or 'error' in self.host.stdout.lower(): + self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()]))) self.status = ERROR - if 'HAWC2MB version:' not in self.resource.stdout: + if 'HAWC2MB version:' not in self.host.stdout: self.errors.append(self.stdout) self.status = ERROR @@ -116,7 +118,7 @@ class Simulation(object): self.errors.extend(list(set(self.logFile.errors))) self.update_status() self.is_simulating = False - if self.resource.returncode or self.errors: + if self.host.returncode or self.errors: raise Exception("Simulation error:\n" + "\n".join(self.errors)) elif self.logFile.status != log_file.DONE or self.logFile.errors: raise Warning("Simulation succeded with errors:\nLog status:%s\nErrors:\n%s" % (self.logFile.status, "\n".join(self.logFile.errors))) @@ -130,13 +132,14 @@ class Simulation(object): if self.status == CLEANED: return if self.status != ERROR: self.status = CLEANED - self.resource._finish_simulation() + self.host._finish_simulation() + self.set_id(self.simulation_id) def update_status(self, *args, **kwargs): - self.resource.update_logFile_status() + self.host.update_logFile_status() if self.status in [INITIALIZING, SIMULATING]: if self.logFile.status == log_file.SIMULATING: self.status = SIMULATING @@ -233,7 +236,7 @@ class Simulation(object): self.update_status() def abort(self): - self.resource.stop() + self.host.stop() for _ in range(100): if self.is_simulating: break @@ -316,8 +319,9 @@ class SimulationResource(object): status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v)) - -class LocalSimulationResource(SimulationResource, LocalResource): + def __str__(self): + return self.host +class LocalSimulationHost(SimulationResource): def __init__(self, simulation): SimulationResource.__init__(self, simulation) LocalResource.__init__(self, "hawc2mb") @@ -326,6 +330,7 @@ class LocalSimulationResource(SimulationResource, LocalResource): def _prepare_simulation(self): # must be called through simulation object self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) + self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath) for src in self.input_sources(): for src_file in glob.glob(os.path.join(self.modelpath, src)): dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) @@ -336,7 +341,8 @@ class LocalSimulationResource(SimulationResource, LocalResource): if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: print ("error copy ", dst) - + if not os.path.exists(os.path.join(self.tmp_modelpath, 'stdout')): + os.makedirs(os.path.join(self.tmp_modelpath, 'stdout')) #, exist_ok=True) self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename) self.simulationThread.modelpath = self.tmp_modelpath @@ -358,14 +364,13 @@ class LocalSimulationResource(SimulationResource, LocalResource): for src_file in glob.glob(src): dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) # exist_ok does not exist in Python27 - if not os.path.exists(os.path.dirname(dst_file)): + if not os.path.isdir(os.path.dirname(dst_file)): os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): shutil.copy(src_file, dst_file) self.logFile.filename = os.path.join(self.modelpath, self.log_filename) - try: shutil.rmtree(self.tmp_modelpath) except (PermissionError, OSError) as e: @@ -380,10 +385,10 @@ class LocalSimulationResource(SimulationResource, LocalResource): -class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource): +class PBSClusterSimulationHost(SimulationResource, SSHClient): def __init__(self, simulation, host, username, password, port=22): SimulationResource.__init__(self, simulation) - PBSClusterResource.__init__(self, host, username, password, port) + SSHClient.__init__(self, host, username, password, port=port) self.pbsjob = PBSJob(self) hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) @@ -410,37 +415,32 @@ class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource): def _finish_simulation(self): - with self.ssh: + with self: for dst in self.output_sources(): src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/") - for src_file in self.ssh.glob(src): + for src_file in self.glob(src): dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) os.makedirs(os.path.dirname(dst_file), exist_ok=True) - self.ssh.download(src_file, dst_file, verbose=False) - self.ssh.execute('rm -r .hawc2launcher/%s' % self.simulation_id) - self.ssh.execute('rm .hawc2launcher/status_%s' % self.simulation_id) - print ("finish3", self.simulation_id) + self.download(src_file, dst_file, verbose=False) + self.execute('rm -r .hawc2launcher/%s' % self.simulation_id) + self.execute('rm .hawc2launcher/status_%s' % self.simulation_id) def _simulate(self): """starts blocking simulation""" - print ("simulate1", self.simulation_id) self.sim.logFile = LogInfo(log_file.MISSING, 0, "None", "") - pbs_out_file = "%s.out" % self.simulation_id - self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_modelpath , pbs_out_file) + self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_modelpath , self.sim.stdout_filename) sleeptime = 1 - print ("simulate2", self.simulation_id) while self.is_simulating: #self.__update_logFile_status() time.sleep(sleeptime) - print ("simulate3", self.simulation_id) - local_out_file = self.modelpath + os.path.splitext(self.log_filename)[0] + ".out" - with self.ssh: + local_out_file = self.modelpath + self.sim.stdout_filename + with self: try: - self.ssh.download(self.tmp_modelpath + pbs_out_file, local_out_file) + self.download(self.tmp_modelpath + self.sim.stdout_filename, local_out_file) with open(local_out_file) as fid: _, self.stdout, returncode_str, _ = fid.read().split("---------------------") self.returncode = returncode_str.strip() != "0" @@ -448,7 +448,7 @@ class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource): self.returncode = 1 self.stdout = "error: Could not download and read stdout file" try: - self.ssh.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename) + self.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename) except Exception: raise Exception ("Logfile not found") self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) diff --git a/wetb/utils/cluster_tools/cluster_resource.py b/wetb/utils/cluster_tools/cluster_resource.py index dac06a1c8fcffe5a8350ebe5226f718da574e77f..78bc9eb9605f33e2133108500b98044201da71a9 100644 --- a/wetb/utils/cluster_tools/cluster_resource.py +++ b/wetb/utils/cluster_tools/cluster_resource.py @@ -9,15 +9,29 @@ import multiprocessing import psutil class Resource(object): - pass + + def __init__(self, min_cpu, min_free): + self.min_cpu = min_cpu + self.min_free = min_free + + def ok2submit(self): + """Always ok to have min_cpu cpus and ok to have more if there are min_free free cpus""" + total, free, user = self.check_resources() + + if user < self.min_cpu: + return True + elif free > self.min_free: + return True + else: + return False class PBSClusterResource(Resource, SSHClient): - def __init__(self, host, username, password, port=22): + def __init__(self, host, username, password, port, min_cpu, min_free): + Resource.__init__(self, min_cpu, min_free) SSHClient.__init__(self, host, username, password, port=port) - self.no_users = 20 def new_ssh_connection(self): return SSHClient(self.host, self.username, self.password, self.port) @@ -41,21 +55,13 @@ class PBSClusterResource(Resource, SSHClient): return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user - def ok2submit(self): - total, free, user = self.check_resources() - minimum_cpus = total * 1 / self.no_users - if user < minimum_cpus: - return True - elif free > minimum_cpus * 4: - return True - else: - return False class LocalResource(Resource): def __init__(self, process_name): + N = max(1, multiprocessing.cpu_count() / 4) + Resource.__init__(self, N, N) self.process_name = process_name - self.no_users = 1 self.host = 'Localhost' def check_resources(self): @@ -65,19 +71,7 @@ class LocalResource(Resource): except psutil._error.AccessDenied: pass - no_cpu = multiprocessing.cpu_count() cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu no_current_process = len([i for i in psutil.get_pid_list() if name(i) == self.process_name]) return no_cpu, cpu_free, no_current_process - - def ok2submit(self): - - total, free, user = self.check_resources() - minimum_cpus = total * 1 / self.no_users - if user < minimum_cpus and free > 2: - return True - else: - return False - -