Skip to content
Snippets Groups Projects
Commit 677d5205 authored by mads's avatar mads
Browse files

more updates

parent 3424a914
No related branches found
No related tags found
No related merge requests found
...@@ -11,6 +11,7 @@ from wetb.utils.cluster_tools.pbsjob import PBSJob ...@@ -11,6 +11,7 @@ from wetb.utils.cluster_tools.pbsjob import PBSJob
import io import io
import time import time
from wetb.utils.cluster_tools import pbsjob from wetb.utils.cluster_tools import pbsjob
from wetb.utils.cluster_tools.ssh_client import SSHClient
standard_library.install_aliases() standard_library.install_aliases()
import os import os
from wetb.hawc2.htc_file import HTCFile from wetb.hawc2.htc_file import HTCFile
...@@ -49,7 +50,7 @@ class Simulation(object): ...@@ -49,7 +50,7 @@ class Simulation(object):
self.time_stop = self.htcFile.simulation.time_stop[0] self.time_stop = self.htcFile.simulation.time_stop[0]
self.copy_turbulence = True self.copy_turbulence = True
self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self) self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self)
self.stdout_filename = "%s.out" % self.simulation_id self.stdout_filename = "stdout/%s.out" % self.simulation_id
if 'logfile' in self.htcFile.simulation: if 'logfile' in self.htcFile.simulation:
self.log_filename = self.htcFile.simulation.logfile[0] self.log_filename = self.htcFile.simulation.logfile[0]
else: else:
...@@ -67,7 +68,7 @@ class Simulation(object): ...@@ -67,7 +68,7 @@ class Simulation(object):
self.thread = Thread(target=self.simulate_distributed) self.thread = Thread(target=self.simulate_distributed)
self.hawc2exe = hawc2exe self.hawc2exe = hawc2exe
self.updateStatusThread = UpdateStatusThread(self) self.updateStatusThread = UpdateStatusThread(self)
self.resource = LocalSimulationResource(self) self.host = LocalSimulationHost(self)
def input_sources(self): def input_sources(self):
def fmt(src): def fmt(src):
...@@ -88,13 +89,14 @@ class Simulation(object): ...@@ -88,13 +89,14 @@ class Simulation(object):
dst = dst.replace("\\", "/") dst = dst.replace("\\", "/")
assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst
return dst return dst
return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence]] return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]]
def prepare_simulation(self): def prepare_simulation(self):
self.status = PREPARING self.status = PREPARING
self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id)
self.resource._prepare_simulation() self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath)
self.host._prepare_simulation()
def simulate(self): def simulate(self):
#starts blocking simulation #starts blocking simulation
...@@ -103,12 +105,12 @@ class Simulation(object): ...@@ -103,12 +105,12 @@ class Simulation(object):
self.status = INITIALIZING self.status = INITIALIZING
self.logFile.clear() self.logFile.clear()
self.resource._simulate() self.host._simulate()
if self.resource.returncode or 'error' in self.resource.stdout.lower(): if self.host.returncode or 'error' in self.host.stdout.lower():
self.errors = (list(set([l for l in self.resource.stdout.split("\n") if 'error' in l.lower()]))) self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()])))
self.status = ERROR self.status = ERROR
if 'HAWC2MB version:' not in self.resource.stdout: if 'HAWC2MB version:' not in self.host.stdout:
self.errors.append(self.stdout) self.errors.append(self.stdout)
self.status = ERROR self.status = ERROR
...@@ -116,7 +118,7 @@ class Simulation(object): ...@@ -116,7 +118,7 @@ class Simulation(object):
self.errors.extend(list(set(self.logFile.errors))) self.errors.extend(list(set(self.logFile.errors)))
self.update_status() self.update_status()
self.is_simulating = False self.is_simulating = False
if self.resource.returncode or self.errors: if self.host.returncode or self.errors:
raise Exception("Simulation error:\n" + "\n".join(self.errors)) raise Exception("Simulation error:\n" + "\n".join(self.errors))
elif self.logFile.status != log_file.DONE or self.logFile.errors: elif self.logFile.status != log_file.DONE or self.logFile.errors:
raise Warning("Simulation succeded with errors:\nLog status:%s\nErrors:\n%s" % (self.logFile.status, "\n".join(self.logFile.errors))) raise Warning("Simulation succeded with errors:\nLog status:%s\nErrors:\n%s" % (self.logFile.status, "\n".join(self.logFile.errors)))
...@@ -130,13 +132,14 @@ class Simulation(object): ...@@ -130,13 +132,14 @@ class Simulation(object):
if self.status == CLEANED: return if self.status == CLEANED: return
if self.status != ERROR: if self.status != ERROR:
self.status = CLEANED self.status = CLEANED
self.resource._finish_simulation() self.host._finish_simulation()
self.set_id(self.simulation_id)
def update_status(self, *args, **kwargs): def update_status(self, *args, **kwargs):
self.resource.update_logFile_status() self.host.update_logFile_status()
if self.status in [INITIALIZING, SIMULATING]: if self.status in [INITIALIZING, SIMULATING]:
if self.logFile.status == log_file.SIMULATING: if self.logFile.status == log_file.SIMULATING:
self.status = SIMULATING self.status = SIMULATING
...@@ -233,7 +236,7 @@ class Simulation(object): ...@@ -233,7 +236,7 @@ class Simulation(object):
self.update_status() self.update_status()
def abort(self): def abort(self):
self.resource.stop() self.host.stop()
for _ in range(100): for _ in range(100):
if self.is_simulating: if self.is_simulating:
break break
...@@ -316,8 +319,9 @@ class SimulationResource(object): ...@@ -316,8 +319,9 @@ class SimulationResource(object):
status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v))
is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v)) is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v))
def __str__(self):
class LocalSimulationResource(SimulationResource, LocalResource): return self.host
class LocalSimulationHost(SimulationResource):
def __init__(self, simulation): def __init__(self, simulation):
SimulationResource.__init__(self, simulation) SimulationResource.__init__(self, simulation)
LocalResource.__init__(self, "hawc2mb") LocalResource.__init__(self, "hawc2mb")
...@@ -326,6 +330,7 @@ class LocalSimulationResource(SimulationResource, LocalResource): ...@@ -326,6 +330,7 @@ class LocalSimulationResource(SimulationResource, LocalResource):
def _prepare_simulation(self): def _prepare_simulation(self):
# must be called through simulation object # must be called through simulation object
self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath)
self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath)
for src in self.input_sources(): for src in self.input_sources():
for src_file in glob.glob(os.path.join(self.modelpath, src)): for src_file in glob.glob(os.path.join(self.modelpath, src)):
dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath))
...@@ -336,7 +341,8 @@ class LocalSimulationResource(SimulationResource, LocalResource): ...@@ -336,7 +341,8 @@ class LocalSimulationResource(SimulationResource, LocalResource):
if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size:
print ("error copy ", dst) print ("error copy ", dst)
if not os.path.exists(os.path.join(self.tmp_modelpath, 'stdout')):
os.makedirs(os.path.join(self.tmp_modelpath, 'stdout')) #, exist_ok=True)
self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename) self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename)
self.simulationThread.modelpath = self.tmp_modelpath self.simulationThread.modelpath = self.tmp_modelpath
...@@ -358,14 +364,13 @@ class LocalSimulationResource(SimulationResource, LocalResource): ...@@ -358,14 +364,13 @@ class LocalSimulationResource(SimulationResource, LocalResource):
for src_file in glob.glob(src): for src_file in glob.glob(src):
dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
# exist_ok does not exist in Python27 # exist_ok does not exist in Python27
if not os.path.exists(os.path.dirname(dst_file)): if not os.path.isdir(os.path.dirname(dst_file)):
os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True) os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True)
if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file): if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file):
shutil.copy(src_file, dst_file) shutil.copy(src_file, dst_file)
self.logFile.filename = os.path.join(self.modelpath, self.log_filename) self.logFile.filename = os.path.join(self.modelpath, self.log_filename)
try: try:
shutil.rmtree(self.tmp_modelpath) shutil.rmtree(self.tmp_modelpath)
except (PermissionError, OSError) as e: except (PermissionError, OSError) as e:
...@@ -380,10 +385,10 @@ class LocalSimulationResource(SimulationResource, LocalResource): ...@@ -380,10 +385,10 @@ class LocalSimulationResource(SimulationResource, LocalResource):
class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource): class PBSClusterSimulationHost(SimulationResource, SSHClient):
def __init__(self, simulation, host, username, password, port=22): def __init__(self, simulation, host, username, password, port=22):
SimulationResource.__init__(self, simulation) SimulationResource.__init__(self, simulation)
PBSClusterResource.__init__(self, host, username, password, port) SSHClient.__init__(self, host, username, password, port=port)
self.pbsjob = PBSJob(self) self.pbsjob = PBSJob(self)
hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe))
...@@ -410,37 +415,32 @@ class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource): ...@@ -410,37 +415,32 @@ class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource):
def _finish_simulation(self): def _finish_simulation(self):
with self.ssh: with self:
for dst in self.output_sources(): for dst in self.output_sources():
src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/") src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/")
for src_file in self.ssh.glob(src): for src_file in self.glob(src):
dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
os.makedirs(os.path.dirname(dst_file), exist_ok=True) os.makedirs(os.path.dirname(dst_file), exist_ok=True)
self.ssh.download(src_file, dst_file, verbose=False) self.download(src_file, dst_file, verbose=False)
self.ssh.execute('rm -r .hawc2launcher/%s' % self.simulation_id) self.execute('rm -r .hawc2launcher/%s' % self.simulation_id)
self.ssh.execute('rm .hawc2launcher/status_%s' % self.simulation_id) self.execute('rm .hawc2launcher/status_%s' % self.simulation_id)
print ("finish3", self.simulation_id)
def _simulate(self): def _simulate(self):
"""starts blocking simulation""" """starts blocking simulation"""
print ("simulate1", self.simulation_id)
self.sim.logFile = LogInfo(log_file.MISSING, 0, "None", "") self.sim.logFile = LogInfo(log_file.MISSING, 0, "None", "")
pbs_out_file = "%s.out" % self.simulation_id self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_modelpath , self.sim.stdout_filename)
self.pbsjob.submit("%s.in" % self.simulation_id, self.tmp_modelpath , pbs_out_file)
sleeptime = 1 sleeptime = 1
print ("simulate2", self.simulation_id)
while self.is_simulating: while self.is_simulating:
#self.__update_logFile_status() #self.__update_logFile_status()
time.sleep(sleeptime) time.sleep(sleeptime)
print ("simulate3", self.simulation_id) local_out_file = self.modelpath + self.sim.stdout_filename
local_out_file = self.modelpath + os.path.splitext(self.log_filename)[0] + ".out" with self:
with self.ssh:
try: try:
self.ssh.download(self.tmp_modelpath + pbs_out_file, local_out_file) self.download(self.tmp_modelpath + self.sim.stdout_filename, local_out_file)
with open(local_out_file) as fid: with open(local_out_file) as fid:
_, self.stdout, returncode_str, _ = fid.read().split("---------------------") _, self.stdout, returncode_str, _ = fid.read().split("---------------------")
self.returncode = returncode_str.strip() != "0" self.returncode = returncode_str.strip() != "0"
...@@ -448,7 +448,7 @@ class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource): ...@@ -448,7 +448,7 @@ class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource):
self.returncode = 1 self.returncode = 1
self.stdout = "error: Could not download and read stdout file" self.stdout = "error: Could not download and read stdout file"
try: try:
self.ssh.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename) self.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename)
except Exception: except Exception:
raise Exception ("Logfile not found") raise Exception ("Logfile not found")
self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath)
......
...@@ -9,15 +9,29 @@ import multiprocessing ...@@ -9,15 +9,29 @@ import multiprocessing
import psutil import psutil
class Resource(object): class Resource(object):
pass
def __init__(self, min_cpu, min_free):
self.min_cpu = min_cpu
self.min_free = min_free
def ok2submit(self):
"""Always ok to have min_cpu cpus and ok to have more if there are min_free free cpus"""
total, free, user = self.check_resources()
if user < self.min_cpu:
return True
elif free > self.min_free:
return True
else:
return False
class PBSClusterResource(Resource, SSHClient): class PBSClusterResource(Resource, SSHClient):
def __init__(self, host, username, password, port=22): def __init__(self, host, username, password, port, min_cpu, min_free):
Resource.__init__(self, min_cpu, min_free)
SSHClient.__init__(self, host, username, password, port=port) SSHClient.__init__(self, host, username, password, port=port)
self.no_users = 20
def new_ssh_connection(self): def new_ssh_connection(self):
return SSHClient(self.host, self.username, self.password, self.port) return SSHClient(self.host, self.username, self.password, self.port)
...@@ -41,21 +55,13 @@ class PBSClusterResource(Resource, SSHClient): ...@@ -41,21 +55,13 @@ class PBSClusterResource(Resource, SSHClient):
return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user
def ok2submit(self):
total, free, user = self.check_resources()
minimum_cpus = total * 1 / self.no_users
if user < minimum_cpus:
return True
elif free > minimum_cpus * 4:
return True
else:
return False
class LocalResource(Resource): class LocalResource(Resource):
def __init__(self, process_name): def __init__(self, process_name):
N = max(1, multiprocessing.cpu_count() / 4)
Resource.__init__(self, N, N)
self.process_name = process_name self.process_name = process_name
self.no_users = 1
self.host = 'Localhost' self.host = 'Localhost'
def check_resources(self): def check_resources(self):
...@@ -65,19 +71,7 @@ class LocalResource(Resource): ...@@ -65,19 +71,7 @@ class LocalResource(Resource):
except psutil._error.AccessDenied: except psutil._error.AccessDenied:
pass pass
no_cpu = multiprocessing.cpu_count() no_cpu = multiprocessing.cpu_count()
cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu
no_current_process = len([i for i in psutil.get_pid_list() if name(i) == self.process_name]) no_current_process = len([i for i in psutil.get_pid_list() if name(i) == self.process_name])
return no_cpu, cpu_free, no_current_process return no_cpu, cpu_free, no_current_process
def ok2submit(self):
total, free, user = self.check_resources()
minimum_cpus = total * 1 / self.no_users
if user < minimum_cpus and free > 2:
return True
else:
return False
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment