Skip to content
Snippets Groups Projects
Commit 591d1222 authored by mads's avatar mads
Browse files

some changes

parent dd1bd353
No related branches found
No related tags found
No related merge requests found
...@@ -171,7 +171,7 @@ class HTCLine(HTCContents): ...@@ -171,7 +171,7 @@ class HTCLine(HTCContents):
("", "\t" + self.str_values())[bool(self.values)], ("", "\t" + self.str_values())[bool(self.values)],
("", "\t" + self.comments)[bool(self.comments.strip())]) ("", "\t" + self.comments)[bool(self.comments.strip())])
def str_values(self): def str_values(self):
return " ".join([str(v) for v in self.values]) return " ".join([str(v).lower() for v in self.values])
def __getitem__(self, key): def __getitem__(self, key):
return self.values[key] return self.values[key]
...@@ -351,7 +351,10 @@ class HTCDefaults(object): ...@@ -351,7 +351,10 @@ class HTCDefaults(object):
else: else:
mann.add_line('create_turb_parameters', [L, ae23, Gamma, seed, int(high_frq_compensation)], "L, alfaeps, gamma, seed, highfrq compensation") mann.add_line('create_turb_parameters', [L, ae23, Gamma, seed, int(high_frq_compensation)], "L, alfaeps, gamma, seed, highfrq compensation")
if filenames is None: if filenames is None:
filenames = ["./turb/turb_wsp%d_s%04d%s.bin" % (self.wind.wsp[0], seed, c) for c in ['u', 'v', 'w']] fmt = "l%.1f_ae%.2f_g%.1f_h%d_%dx%dx%d_%.3fx%.2fx%.2f_s%04d%c.turb"
import numpy as np
dxyz = tuple(np.array(box_dimension) / no_grid_points)
filenames = ["./turb/" + fmt % ((L, ae23, Gamma, high_frq_compensation) + no_grid_points + dxyz + (seed, uvw)) for uvw in ['u', 'v', 'w']]
if isinstance(filenames, str): if isinstance(filenames, str):
filenames = ["./turb/%s_s%04d%s.bin" % (filenames, seed, c) for c in ['u', 'v', 'w']] filenames = ["./turb/%s_s%04d%s.bin" % (filenames, seed, c) for c in ['u', 'v', 'w']]
for filename, c in zip(filenames, ['u', 'v', 'w']): for filename, c in zip(filenames, ['u', 'v', 'w']):
......
...@@ -24,6 +24,14 @@ from copy import copy ...@@ -24,6 +24,14 @@ from copy import copy
class HTCFile(HTCContents, HTCDefaults): class HTCFile(HTCContents, HTCDefaults):
"""Wrapper for HTC files
Examples:
---------
>>> htcfile = HTCFile('htc/test.htc')
>>> htcfile.wind.wsp = 10
>>> htcfile.save()
"""
filename = None filename = None
htc_inputfiles = [] htc_inputfiles = []
...@@ -95,13 +103,16 @@ class HTCFile(HTCContents, HTCDefaults): ...@@ -95,13 +103,16 @@ class HTCFile(HTCContents, HTCDefaults):
with open(filename, 'w', encoding='utf-8') as fid: with open(filename, 'w', encoding='utf-8') as fid:
fid.write(str(self)) fid.write(str(self))
def set_name(self, name, folder="htc"): def set_name(self, name, folder="htc/"):
if os.path.isabs(folder) is False and os.path.relpath(folder).startswith("htc" + os.path.sep):
folder = "./" + os.path.relpath(folder).replace("\\", "/")
self.filename = os.path.join(self.modelpath, folder, "%s.htc" % name).replace("\\", "/") self.filename = os.path.join(self.modelpath, folder, "%s.htc" % name).replace("\\", "/")
if 'simulation' in self and 'logfile' in self.simulation: if 'simulation' in self and 'logfile' in self.simulation:
self.simulation.logfile = "./log/%s.log" % name self.simulation.logfile = os.path.join(folder.replace("htc", "log", 1), "%s.log" % name).replace("\\", "/")
elif 'test_structure' in self and 'logfile' in self.test_structure: elif 'test_structure' in self and 'logfile' in self.test_structure:
self.test_structure.logfile = "./log/%s.log" % name self.test_structure.logfile = os.path.join(folder.replace("htc", "log", 1), "%s.log" % name).replace("\\", "/")
self.output.filename = "./res/%s" % name self.output.filename = os.path.join(folder.replace("htc", "res", 1), "%s" % name).replace("\\", "/")
def input_files(self): def input_files(self):
files = self.htc_inputfiles files = self.htc_inputfiles
......
...@@ -181,4 +181,4 @@ class LogInfo(LogFile): ...@@ -181,4 +181,4 @@ class LogInfo(LogFile):
self.errors = [] self.errors = []
def update_status(self): def update_status(self):
pass pass
\ No newline at end of file
...@@ -13,14 +13,13 @@ import re ...@@ -13,14 +13,13 @@ import re
import shutil import shutil
import subprocess import subprocess
import sys import sys
import threading
import time import time
from wetb.hawc2 import log_file from wetb.hawc2 import log_file
from wetb.hawc2.htc_file import HTCFile from wetb.hawc2.htc_file import HTCFile
from wetb.hawc2.log_file import LogFile, LogInfo from wetb.hawc2.log_file import LogFile, LogInfo
from future import standard_library from future import standard_library
import psutil
from wetb.utils.cluster_tools import pbsjob from wetb.utils.cluster_tools import pbsjob
from wetb.utils.cluster_tools.cluster_resource import LocalResource from wetb.utils.cluster_tools.cluster_resource import LocalResource
...@@ -33,10 +32,13 @@ QUEUED = "queued" #until start ...@@ -33,10 +32,13 @@ QUEUED = "queued" #until start
PREPARING = "Copy to host" # during prepare simulation PREPARING = "Copy to host" # during prepare simulation
INITIALIZING = "Initializing" #when starting INITIALIZING = "Initializing" #when starting
SIMULATING = "Simulating" # when logfile.status=simulating SIMULATING = "Simulating" # when logfile.status=simulating
FINISH = "Finish" # when HAWC2 finish FINISHING = "Copy from host" # during prepare simulation
FINISH = "Simulation finish" # when HAWC2 finish
ERROR = "Error" # when hawc2 returns error ERROR = "Error" # when hawc2 returns error
ABORTED = "Aborted" # when stopped and logfile.status != Done ABORTED = "Aborted" # when stopped and logfile.status != Done
CLEANED = "Cleaned" # after copy back CLEANED = "Cleaned" # after copy back
def unix_path(path):
return path.replace("\\", "/").lower()
class Simulation(object): class Simulation(object):
"""Class for doing hawc2 simulations """Class for doing hawc2 simulations
...@@ -72,6 +74,7 @@ class Simulation(object): ...@@ -72,6 +74,7 @@ class Simulation(object):
""" """
is_simulating = False is_simulating = False
is_done = False
status = QUEUED status = QUEUED
def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True): def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True):
self.modelpath = os.path.abspath(modelpath) + "/" self.modelpath = os.path.abspath(modelpath) + "/"
...@@ -83,8 +86,9 @@ class Simulation(object): ...@@ -83,8 +86,9 @@ class Simulation(object):
self.time_stop = self.htcFile.simulation.time_stop[0] self.time_stop = self.htcFile.simulation.time_stop[0]
self.hawc2exe = hawc2exe self.hawc2exe = hawc2exe
self.copy_turbulence = copy_turbulence self.copy_turbulence = copy_turbulence
self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_").replace("/", "_") + "_%d" % id(self) self.simulation_id = unix_path(os.path.relpath(htcfilename, self.modelpath) + "_%d" % id(self)).replace("/", "_")
self.stdout_filename = "stdout/%s.out" % self.simulation_id self.stdout_filename = os.path.splitext(unix_path(os.path.relpath(htcfilename, self.modelpath)).replace('htc', 'stdout', 1))[0] + ".out"
#self.stdout_filename = "stdout/%s.out" % self.simulation_id
if 'logfile' in self.htcFile.simulation: if 'logfile' in self.htcFile.simulation:
self.log_filename = self.htcFile.simulation.logfile[0] self.log_filename = self.htcFile.simulation.logfile[0]
else: else:
...@@ -93,7 +97,7 @@ class Simulation(object): ...@@ -93,7 +97,7 @@ class Simulation(object):
self.log_filename = os.path.relpath(self.log_filename, self.modelpath) self.log_filename = os.path.relpath(self.log_filename, self.modelpath)
else: else:
self.log_filename = os.path.relpath(self.log_filename) self.log_filename = os.path.relpath(self.log_filename)
self.log_filename = self.log_filename.replace("\\", "/") self.log_filename = unix_path(self.log_filename)
self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop)
self.logFile.clear() self.logFile.clear()
...@@ -104,25 +108,28 @@ class Simulation(object): ...@@ -104,25 +108,28 @@ class Simulation(object):
self.host = LocalSimulationHost(self) self.host = LocalSimulationHost(self)
def start(self, update_interval=1): def start(self, auto_status_update=True):
"""Start non blocking distributed simulation""" """Start non blocking distributed simulation"""
self.is_simulating = True self.is_simulating = True
self.updateStatusThread.start() if auto_status_update:
self.updateStatusThread.start()
self.non_blocking_simulation_thread.start() self.non_blocking_simulation_thread.start()
def wait(self): def wait(self):
self.non_blocking_simulation_thread.join() self.non_blocking_simulation_thread.join()
self.update_status() self.update_status()
def abort(self): def abort(self, update_status=True):
self.host.stop() if self.status != QUEUED:
for _ in range(100): self.host.stop()
if self.is_simulating: for _ in range(100):
break if self.is_simulating is False:
time.sleep(0.1) break
time.sleep(0.1)
if self.logFile.status not in [log_file.DONE]: if self.logFile.status not in [log_file.DONE]:
self.status = ABORTED self.status = ABORTED
self.update_status() if update_status:
self.update_status()
def show_status(self): def show_status(self):
#print ("log status:", self.logFile.status) #print ("log status:", self.logFile.status)
...@@ -147,11 +154,34 @@ class Simulation(object): ...@@ -147,11 +154,34 @@ class Simulation(object):
self.last_status = self.logFile.status self.last_status = self.logFile.status
def prepare_simulation(self): def prepare_simulation(self):
self.status = PREPARING self.status = PREPARING
self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id) self.tmp_modelpath = os.path.join(".hawc2launcher/%s/" % self.simulation_id)
self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath) self.set_id(self.simulation_id, str(self.host), self.tmp_modelpath)
self.host._prepare_simulation()
def fmt(src):
if os.path.isabs(src):
src = os.path.relpath(os.path.abspath(src), self.modelpath)
else:
src = os.path.relpath (src)
assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src
return src
input_patterns = [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])]
input_files = set([f for pattern in input_patterns for f in glob.glob(os.path.join(self.modelpath, pattern))])
if not os.path.isdir(os.path.dirname(self.modelpath + self.stdout_filename)):
os.makedirs(os.path.dirname(self.modelpath + self.stdout_filename))
self.host._prepare_simulation(input_files)
# return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])]
#
# for src in self._input_sources():
# for src_file in glob.glob(os.path.join(self.modelpath, src)):
#
#
# self.host._prepare_simulation()
def simulate(self): def simulate(self):
#starts blocking simulation #starts blocking simulation
...@@ -181,13 +211,25 @@ class Simulation(object): ...@@ -181,13 +211,25 @@ class Simulation(object):
def finish_simulation(self): def finish_simulation(self):
lock = threading.Lock() if self.status == ABORTED:
with lock: return
if self.status == CLEANED: return if self.status != ERROR:
if self.status != ERROR: self.status = FINISHING
self.status = CLEANED
self.host._finish_simulation() def fmt(dst):
self.set_id(self.simulation_id) if os.path.isabs(dst):
dst = os.path.relpath(os.path.abspath(dst), self.modelpath)
else:
dst = os.path.relpath (dst)
dst = unix_path(dst)
assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst
return dst
output_patterns = [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]]
output_files = set([f for pattern in output_patterns for f in self.host.glob(unix_path(os.path.join(self.tmp_modelpath, pattern)))])
self.host._finish_simulation(output_files)
self.set_id(self.filename)
if self.status != ERROR:
self.status = CLEANED
...@@ -200,28 +242,6 @@ class Simulation(object): ...@@ -200,28 +242,6 @@ class Simulation(object):
if self.logFile.status == log_file.DONE and self.is_simulating is False: if self.logFile.status == log_file.DONE and self.is_simulating is False:
self.status = FINISH self.status = FINISH
def _input_sources(self):
def fmt(src):
if os.path.isabs(src):
src = os.path.relpath(os.path.abspath(src), self.modelpath)
else:
src = os.path.relpath (src)
assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src
return src
return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])]
def _output_sources(self):
def fmt(dst):
if os.path.isabs(dst):
dst = os.path.relpath(os.path.abspath(dst), self.modelpath)
else:
dst = os.path.relpath (dst)
dst = dst.replace("\\", "/")
assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst
return dst
return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]]
def __str__(self): def __str__(self):
return "Simulation(%s)" % self.filename return "Simulation(%s)" % self.filename
...@@ -234,7 +254,7 @@ class Simulation(object): ...@@ -234,7 +254,7 @@ class Simulation(object):
additional_files = {} additional_files = {}
if os.path.isfile(additional_files_file): if os.path.isfile(additional_files_file):
with open(additional_files_file, encoding='utf-8') as fid: with open(additional_files_file, encoding='utf-8') as fid:
additional_files = json.load(fid) additional_files = json.loads(fid.read().replace("\\", "/"))
return additional_files return additional_files
def add_additional_input_file(self, file): def add_additional_input_file(self, file):
...@@ -246,15 +266,33 @@ class Simulation(object): ...@@ -246,15 +266,33 @@ class Simulation(object):
def simulate_distributed(self): def simulate_distributed(self):
self.prepare_simulation() try:
self.simulate() self.prepare_simulation()
self.finish_simulation() try:
self.simulate()
except Warning as e:
print ("simulation failed", str(self))
print ("Trying to finish")
raise
finally:
try:
self.finish_simulation()
except:
print ("finish_simulation failed", str(self))
raise
except:
self.status = ERROR
raise
finally:
self.is_done = True
def fix_errors(self): def fix_errors(self):
def confirm_add_additional_file(folder, file): def confirm_add_additional_file(folder, file):
if os.path.isfile(os.path.join(self.modelpath, folder, file)): if os.path.isfile(os.path.join(self.modelpath, folder, file)):
filename = os.path.join(folder, file).replace(os.path.sep, "/") filename = unix_path(os.path.join(folder, file))
if self.get_confirmation("File missing", "'%s' seems to be missing in the temporary working directory. \n\nDo you want to add it to additional_files.txt" % filename): if self.get_confirmation("File missing", "'%s' seems to be missing in the temporary working directory. \n\nDo you want to add it to additional_files.txt" % filename):
self.add_additional_input_file(filename) self.add_additional_input_file(filename)
self.show_message("'%s' is now added to additional_files.txt.\n\nPlease restart the simulation" % filename) self.show_message("'%s' is now added to additional_files.txt.\n\nPlease restart the simulation" % filename)
...@@ -282,6 +320,8 @@ class Simulation(object): ...@@ -282,6 +320,8 @@ class Simulation(object):
def show_message(self, msg, title="Information"): def show_message(self, msg, title="Information"):
print (msg) print (msg)
def set_id(self):
pass
class UpdateStatusThread(Thread): class UpdateStatusThread(Thread):
...@@ -294,7 +334,8 @@ class UpdateStatusThread(Thread): ...@@ -294,7 +334,8 @@ class UpdateStatusThread(Thread):
Thread.start(self) Thread.start(self)
def run(self): def run(self):
while self.simulation.is_simulating: print ("Wrong updatestatus")
while self.simulation.is_done is False:
self.simulation.update_status() self.simulation.update_status()
time.sleep(self.interval) time.sleep(self.interval)
...@@ -320,24 +361,28 @@ class SimulationResource(object): ...@@ -320,24 +361,28 @@ class SimulationResource(object):
def __str__(self): def __str__(self):
return self.host return self.host
class LocalSimulationHost(SimulationResource): class LocalSimulationHost(SimulationResource):
def __init__(self, simulation): def __init__(self, simulation, resource=None):
SimulationResource.__init__(self, simulation) SimulationResource.__init__(self, simulation)
LocalResource.__init__(self, "hawc2mb") LocalResource.__init__(self, "hawc2mb")
self.resource = resource
self.simulationThread = SimulationThread(self.sim) self.simulationThread = SimulationThread(self.sim)
def _prepare_simulation(self):
def glob(self, path):
return glob.glob(path)
def _prepare_simulation(self, input_files):
# must be called through simulation object # must be called through simulation object
self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath) self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath)
self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath) self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath)
for src in self._input_sources(): for src_file in input_files:
for src_file in glob.glob(os.path.join(self.modelpath, src)): dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath))
dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath)) # exist_ok does not exist in Python27
# exist_ok does not exist in Python27 if not os.path.exists(os.path.dirname(dst)):
if not os.path.exists(os.path.dirname(dst)): os.makedirs(os.path.dirname(dst)) #, exist_ok=True)
os.makedirs(os.path.dirname(dst)) #, exist_ok=True) shutil.copy(src_file, dst)
shutil.copy(src_file, dst) if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size:
if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size: print ("error copy ", dst)
print ("error copy ", dst)
if not os.path.exists(os.path.join(self.tmp_modelpath, 'stdout')): if not os.path.exists(os.path.join(self.tmp_modelpath, 'stdout')):
os.makedirs(os.path.join(self.tmp_modelpath, 'stdout')) #, exist_ok=True) os.makedirs(os.path.join(self.tmp_modelpath, 'stdout')) #, exist_ok=True)
...@@ -355,17 +400,14 @@ class LocalSimulationHost(SimulationResource): ...@@ -355,17 +400,14 @@ class LocalSimulationHost(SimulationResource):
self.errors.extend(list(set(self.logFile.errors))) self.errors.extend(list(set(self.logFile.errors)))
def _finish_simulation(self): def _finish_simulation(self, output_files):
for dst in self._output_sources(): for src_file in output_files:
src = os.path.join(self.tmp_modelpath, dst) dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
# exist_ok does not exist in Python27
for src_file in glob.glob(src): if not os.path.isdir(os.path.dirname(dst_file)):
dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True)
# exist_ok does not exist in Python27 if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file):
if not os.path.isdir(os.path.dirname(dst_file)): shutil.copy(src_file, dst_file)
os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True)
if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file):
shutil.copy(src_file, dst_file)
self.logFile.filename = os.path.join(self.modelpath, self.log_filename) self.logFile.filename = os.path.join(self.modelpath, self.log_filename)
...@@ -378,8 +420,9 @@ class LocalSimulationHost(SimulationResource): ...@@ -378,8 +420,9 @@ class LocalSimulationHost(SimulationResource):
self.logFile.update_status() self.logFile.update_status()
def stop(self): def stop(self):
self.simulationThread.stop() if self.simulationThread.is_alive():
self.simulationThread.join() self.simulationThread.stop()
self.simulationThread.join()
...@@ -394,11 +437,13 @@ class SimulationThread(Thread): ...@@ -394,11 +437,13 @@ class SimulationThread(Thread):
def start(self): def start(self):
CREATE_NO_WINDOW = 0x08000000 #CREATE_NO_WINDOW = 0x08000000
modelpath = self.modelpath modelpath = self.modelpath
htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath) htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath)
hawc2exe = self.sim.hawc2exe hawc2exe = self.sim.hawc2exe
stdout = self.sim.stdout_filename stdout = self.sim.stdout_filename
if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)):
os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename))
if os.name == "nt": if os.name == "nt":
self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW) self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW)
else: else:
...@@ -407,38 +452,39 @@ class SimulationThread(Thread): ...@@ -407,38 +452,39 @@ class SimulationThread(Thread):
def run(self): def run(self):
import psutil
p = psutil.Process(os.getpid()) p = psutil.Process(os.getpid())
if self.low_priority: if self.low_priority:
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
self.process.communicate() self.process.communicate()
errorcode = self.process.returncode errorcode = self.process.returncode
if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)):
os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename))
with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid: with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid:
stdout = fid.read() stdout = fid.read()
self.res = errorcode, stdout self.res = errorcode, stdout
def stop(self): def stop(self):
subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) if hasattr(self, 'process'):
subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid))
class PBSClusterSimulationHost(SimulationResource, SSHClient): class PBSClusterSimulationHost(SimulationResource, SSHClient):
def __init__(self, simulation, host, username, password, port=22): def __init__(self, simulation, resource, host, username, password, port=22):
SimulationResource.__init__(self, simulation) SimulationResource.__init__(self, simulation)
SSHClient.__init__(self, host, username, password, port=port) SSHClient.__init__(self, host, username, password, port=port)
self.pbsjob = SSHPBSJob(host, username, password, port) self.pbsjob = SSHPBSJob(host, username, password, port)
self.resource = resource
hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe))
def _prepare_simulation(self): def _prepare_simulation(self, input_files):
with self: with self:
self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False)
self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename)))
for src in self._input_sources(): for src_file in input_files:
for src_file in glob.glob(os.path.join(self.modelpath, src)): dst = unix_path(self.tmp_modelpath + os.path.relpath(src_file, self.modelpath))
dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/")
self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False)
self.upload(src_file, dst, verbose=False) self.upload(src_file, dst, verbose=False)
##assert self.ssh.file_exists(dst) ##assert self.ssh.file_exists(dst)
...@@ -446,23 +492,27 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): ...@@ -446,23 +492,27 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
f = io.StringIO(self.pbsjobfile()) f = io.StringIO(self.pbsjobfile())
f.seek(0) f.seek(0)
self.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id) self.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id)
self.execute("mkdir -p .hawc2launcher/%s/stdout" % self.simulation_id) self.execute("mkdir -p .hawc2launcher/%s/%s" % (self.simulation_id, os.path.dirname(self.stdout_filename)))
remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename) remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename)
self.execute("rm -f %s" % remote_log_filename) self.execute("rm -f %s" % remote_log_filename)
def _finish_simulation(self): def _finish_simulation(self, output_files):
with self: with self:
for dst in self._output_sources(): for src_file in output_files:
try:
src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/")
for src_file in self.glob(src):
dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
os.makedirs(os.path.dirname(dst_file), exist_ok=True) os.makedirs(os.path.dirname(dst_file), exist_ok=True)
self.download(src_file, dst_file, verbose=False) self.download(src_file, dst_file, verbose=False)
self.execute('rm -r .hawc2launcher/%s' % self.simulation_id) except ValueError as e:
self.execute('rm .hawc2launcher/status_%s' % self.simulation_id) print (self.modelpath, src_file, self.tmp_modelpath)
raise e
try:
self.execute('rm -r .hawc2launcher/%s' % self.simulation_id)
self.execute('rm .hawc2launcher/status_%s' % self.simulation_id)
except:
pass
def _simulate(self): def _simulate(self):
...@@ -475,10 +525,10 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): ...@@ -475,10 +525,10 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
#self.__update_logFile_status() #self.__update_logFile_status()
time.sleep(sleeptime) time.sleep(sleeptime)
local_out_file = self.modelpath + self.sim.stdout_filename local_out_file = self.modelpath + self.stdout_filename
with self: with self:
try: try:
self.download(self.tmp_modelpath + self.sim.stdout_filename, local_out_file) self.download(self.tmp_modelpath + self.stdout_filename, local_out_file)
with open(local_out_file) as fid: with open(local_out_file) as fid:
_, self.stdout, returncode_str, _ = fid.read().split("---------------------") _, self.stdout, returncode_str, _ = fid.read().split("---------------------")
self.returncode = returncode_str.strip() != "0" self.returncode = returncode_str.strip() != "0"
...@@ -488,7 +538,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): ...@@ -488,7 +538,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
try: try:
self.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename) self.download(self.tmp_modelpath + self.log_filename, self.modelpath + self.log_filename)
except Exception: except Exception:
raise Exception ("Logfile not found") raise Warning ("Logfile not found", self.tmp_modelpath + self.log_filename)
self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath)
...@@ -536,10 +586,10 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): ...@@ -536,10 +586,10 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
def pbsjobfile(self): def pbsjobfile(self):
cp_back = "" cp_back = ""
for folder in set([os.path.relpath(os.path.dirname(f)) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]): for folder in set([unix_path(os.path.relpath(os.path.dirname(f))) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]):
cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder
cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder) cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder)
rel_htcfilename = os.path.relpath(self.htcFile.filename, self.modelpath).replace("\\", "/") rel_htcfilename = unix_path(os.path.relpath(self.htcFile.filename, self.modelpath))
return """ return """
### Standard Output ### Standard Output
#PBS -N h2l_%s #PBS -N h2l_%s
......
...@@ -3,16 +3,22 @@ Created on 04/04/2016 ...@@ -3,16 +3,22 @@ Created on 04/04/2016
@author: MMPE @author: MMPE
''' '''
from wetb.utils.cluster_tools.ssh_client import SSHClient
from wetb.utils.cluster_tools import pbswrap
import multiprocessing import multiprocessing
import threading
import psutil import psutil
from wetb.utils.cluster_tools import pbswrap
from wetb.utils.cluster_tools.ssh_client import SSHClient
class Resource(object): class Resource(object):
def __init__(self, min_cpu, min_free): def __init__(self, min_cpu, min_free):
self.min_cpu = min_cpu self.min_cpu = min_cpu
self.min_free = min_free self.min_free = min_free
self.acquired = 0
self.lock = threading.Lock()
def ok2submit(self): def ok2submit(self):
"""Always ok to have min_cpu cpus and ok to have more if there are min_free free cpus""" """Always ok to have min_cpu cpus and ok to have more if there are min_free free cpus"""
...@@ -25,6 +31,13 @@ class Resource(object): ...@@ -25,6 +31,13 @@ class Resource(object):
else: else:
return False return False
def acquire(self):
with self.lock:
self.acquired += 1
def release(self):
with self.lock:
self.acquired -= 1
...@@ -32,35 +45,52 @@ class SSHPBSClusterResource(Resource, SSHClient): ...@@ -32,35 +45,52 @@ class SSHPBSClusterResource(Resource, SSHClient):
def __init__(self, host, username, password, port, min_cpu, min_free): def __init__(self, host, username, password, port, min_cpu, min_free):
Resource.__init__(self, min_cpu, min_free) Resource.__init__(self, min_cpu, min_free)
SSHClient.__init__(self, host, username, password, port=port) SSHClient.__init__(self, host, username, password, port=port)
self.lock = threading.Lock()
def new_ssh_connection(self): def new_ssh_connection(self):
return SSHClient(self.host, self.username, self.password, self.port) return SSHClient(self.host, self.username, self.password, self.port)
def check_resources(self): def check_resources(self):
with self: with self.lock:
_, output, _ = self.execute('pbsnodes -l all') try:
pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output.split("\n")) with self:
_, output, _ = self.execute('pbsnodes -l all')
pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output.split("\n"))
_, output, _ = self.execute('qstat -n1')
users, host, nodesload = pbswrap.parse_qstat_n1(output.split("\n"))
_, output, _ = self.execute('qstat -n1') # if the user does not have any jobs, this will not exist
users, host, nodesload = pbswrap.parse_qstat_n1(output.split("\n")) try:
cpu_user = users[self.username]['cpus']
cpu_user += users[self.username]['Q']
except KeyError:
cpu_user = 0
cpu_user = max(cpu_user, self.acquired)
cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes)
return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user
except IOError as e:
raise e
except:
raise EnvironmentError("check resources failed")
# if the user does not have any jobs, this will not exist def jobids(self, jobname_prefix):
try: _, output, _ = self.execute('qstat -u %s' % self.username)
cpu_user = users[self.username]['cpus'] return [l.split()[0].split(".")[0] for l in output.split("\n")[5:] if l.strip() != "" and l.split()[3].startswith("h2l")]
cpu_user += users[self.username]['Q']
except KeyError:
cpu_user = 0
cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes)
return nodeSum['used_cpu'] + cpu_free, cpu_free, cpu_user def stop_pbsjobs(self, jobids):
if not hasattr(jobids, "len"):
jobids = list(jobids)
self.execute("qdel %s" % (" ".join(jobids)))
class LocalResource(Resource): class LocalResource(Resource):
def __init__(self, process_name): def __init__(self, process_name):
N = max(1, multiprocessing.cpu_count() / 4) N = max(1, multiprocessing.cpu_count() / 2)
Resource.__init__(self, N, N) Resource.__init__(self, N, multiprocessing.cpu_count())
self.process_name = process_name self.process_name = process_name
self.host = 'Localhost' self.host = 'Localhost'
...@@ -68,10 +98,10 @@ class LocalResource(Resource): ...@@ -68,10 +98,10 @@ class LocalResource(Resource):
def name(i): def name(i):
try: try:
return psutil.Process(i).name return psutil.Process(i).name
except psutil._error.AccessDenied: except (psutil._error.AccessDenied, psutil._error.NoSuchProcess):
return "" return ""
no_cpu = multiprocessing.cpu_count() no_cpu = multiprocessing.cpu_count()
cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu cpu_free = no_cpu - self.acquired #(1 - psutil.cpu_percent(.5) / 100) * no_cpu
no_current_process = len([i for i in psutil.get_pid_list() if name(i).lower().startswith(self.process_name.lower())]) no_current_process = len([i for i in psutil.get_pid_list() if name(i).lower().startswith(self.process_name.lower())])
return no_cpu, cpu_free, no_current_process return no_cpu, cpu_free, no_current_process
...@@ -13,7 +13,7 @@ class SSHClient(object): ...@@ -13,7 +13,7 @@ class SSHClient(object):
"A wrapper of paramiko.SSHClient" "A wrapper of paramiko.SSHClient"
TIMEOUT = 4 TIMEOUT = 4
def __init__(self, host, username, password, port=22, key=None, passphrase=None): def __init__(self, host, username, password=None, port=22, key=None, passphrase=None):
self.host = host self.host = host
self.username = username self.username = username
self.password = password self.password = password
...@@ -33,6 +33,8 @@ class SSHClient(object): ...@@ -33,6 +33,8 @@ class SSHClient(object):
self.connect() self.connect()
def connect(self): def connect(self):
if self.password is None:
raise IOError("Password not set")
self.client = paramiko.SSHClient() self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.host, self.port, username=self.username, password=self.password, pkey=self.key, timeout=self.TIMEOUT) self.client.connect(self.host, self.port, username=self.username, password=self.password, pkey=self.key, timeout=self.TIMEOUT)
...@@ -136,7 +138,8 @@ class SSHClient(object): ...@@ -136,7 +138,8 @@ class SSHClient(object):
_, out, _ = self.execute(r'find %s -maxdepth 1 -type f -name "%s"' % (cwd, filepattern)) _, out, _ = self.execute(r'find %s -maxdepth 1 -type f -name "%s"' % (cwd, filepattern))
files = [] files = []
for file in out.strip().split("\n"): for file in out.strip().split("\n"):
files.append(file.strip()) if file != "":
files.append(file.strip())
return files return files
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment