Skip to content
Snippets Groups Projects
Commit 22a5b668 authored by mads's avatar mads
Browse files

doc + clean up

parent 1a08f099
No related branches found
No related tags found
2 merge requests!6Clustertools,!5Clustertools
......@@ -39,9 +39,41 @@ ABORTED = "Aborted" # when stopped and logfile.status != Done
CLEANED = "Cleaned" # after copy back
class Simulation(object):
"""Class for doing hawc2 simulations
Examples
--------
>>> sim = Simulation("<path>/MyModel","htc/MyHtc")
Blocking inplace simulation
>>> sim.simulate()
Non-blocking distributed simulation(everything copied to temporary folder)\n
Starts two threads:
- non_blocking_simulation_thread:
- prepare_simulation() # copy to temporary folder
- simulate() # simulate
- finish_simulation # copy results back again
- updateStatusThread:
- update status every second
>>> sim.start()
>>> while sim.status!=CLEANED:
>>> sim.show_status()
The default host is LocalSimulationHost. To simulate on pbs featured cluster
>>> sim.host = PBSClusterSimulationHost(sim, <hostname>, <username>, <password>):
>>> sim.start()
>>> while sim.status!=CLEANED:
>>> sim.show_status()
"""
is_simulating = False
status = QUEUED
def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"):
def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe", copy_turbulence=True):
self.modelpath = os.path.abspath(modelpath) + "/"
self.folder = os.path.dirname(htcfilename)
if not os.path.isabs(htcfilename):
......@@ -49,7 +81,8 @@ class Simulation(object):
self.filename = os.path.basename(htcfilename)
self.htcFile = HTCFile(htcfilename)
self.time_stop = self.htcFile.simulation.time_stop[0]
self.copy_turbulence = True
self.hawc2exe = hawc2exe
self.copy_turbulence = copy_turbulence
self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_").replace("/", "_") + "_%d" % id(self)
self.stdout_filename = "stdout/%s.out" % self.simulation_id
if 'logfile' in self.htcFile.simulation:
......@@ -66,31 +99,52 @@ class Simulation(object):
self.logFile.clear()
self.last_status = self.status
self.errors = []
self.thread = Thread(target=self.simulate_distributed)
self.hawc2exe = hawc2exe
self.non_blocking_simulation_thread = Thread(target=self.simulate_distributed)
self.updateStatusThread = UpdateStatusThread(self)
self.host = LocalSimulationHost(self)
def input_sources(self):
def fmt(src):
if os.path.isabs(src):
src = os.path.relpath(os.path.abspath(src), self.modelpath)
else:
src = os.path.relpath (src)
assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src
return src
return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])]
def output_sources(self):
def fmt(dst):
if os.path.isabs(dst):
dst = os.path.relpath(os.path.abspath(dst), self.modelpath)
else:
dst = os.path.relpath (dst)
dst = dst.replace("\\", "/")
assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst
return dst
return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]]
def start(self, update_interval=1):
"""Start non blocking distributed simulation"""
self.is_simulating = True
self.updateStatusThread.start()
self.non_blocking_simulation_thread.start()
def wait(self):
self.non_blocking_simulation_thread.join()
self.update_status()
def abort(self):
self.host.stop()
for _ in range(100):
if self.is_simulating:
break
time.sleep(0.1)
if self.logFile.status not in [log_file.DONE]:
self.status = ABORTED
self.update_status()
def show_status(self):
#print ("log status:", self.logFile.status)
if self.logFile.status == log_file.SIMULATING:
if self.last_status != log_file.SIMULATING:
print ("|" + ("-"*50) + "|" + ("-"*49) + "|")
sys.stdout.write("|")
sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0)))
sys.stdout.flush()
self.last_pct = self.logFile.pct
elif self.last_status == log_file.SIMULATING:
sys.stdout.write("."*(100 - self.last_pct) + "|")
sys.stdout.flush()
print ("\n")
elif self.logFile.status == log_file.UNKNOWN:
print (self.status)
else:
print (self.logFile.status)
if self.logFile.status != log_file.SIMULATING:
if self.logFile.errors:
print (self.logFile.errors)
self.last_status = self.logFile.status
def prepare_simulation(self):
......@@ -146,31 +200,33 @@ class Simulation(object):
if self.logFile.status == log_file.DONE and self.is_simulating is False:
self.status = FINISH
def _input_sources(self):
def fmt(src):
if os.path.isabs(src):
src = os.path.relpath(os.path.abspath(src), self.modelpath)
else:
src = os.path.relpath (src)
assert not src.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % src
return src
return [fmt(src) for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', [])]
def _output_sources(self):
def fmt(dst):
if os.path.isabs(dst):
dst = os.path.relpath(os.path.abspath(dst), self.modelpath)
else:
dst = os.path.relpath (dst)
dst = dst.replace("\\", "/")
assert not dst.startswith(".."), "%s referes to a file outside the model path\nAll input files be inside model path" % dst
return dst
return [fmt(dst) for dst in self.htcFile.output_files() + ([], self.htcFile.turbulence_files())[self.copy_turbulence] + [self.stdout_filename]]
def __str__(self):
return "Simulation(%s)" % self.filename
def show_status(self):
#print ("log status:", self.logFile.status)
if self.logFile.status == log_file.SIMULATING:
if self.last_status != log_file.SIMULATING:
print ("|" + ("-"*50) + "|" + ("-"*49) + "|")
sys.stdout.write("|")
sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0)))
sys.stdout.flush()
self.last_pct = self.logFile.pct
elif self.last_status == log_file.SIMULATING:
sys.stdout.write("."*(100 - self.last_pct) + "|")
sys.stdout.flush()
print ("\n")
elif self.logFile.status == log_file.UNKNOWN:
print (self.status)
else:
print (self.logFile.status)
if self.logFile.status != log_file.SIMULATING:
if self.logFile.errors:
print (self.logFile.errors)
self.last_status = self.logFile.status
def additional_files(self):
......@@ -220,74 +276,13 @@ class Simulation(object):
continue
def get_confirmation(self, title, msg):
"""override in subclass"""
return True
def show_message(self, msg, title="Information"):
print (msg)
def start(self, update_interval=1):
"""Start non blocking distributed simulation"""
self.is_simulating = True
self.updateStatusThread.start()
self.thread.start()
def wait(self):
self.thread.join()
self.update_status()
def abort(self):
self.host.stop()
for _ in range(100):
if self.is_simulating:
break
time.sleep(0.1)
# try:
# self.finish_simulation()
# except Exception as e:
# print (str(e))
# pass
if self.logFile.status not in [log_file.DONE]:
self.status = ABORTED
self.update_status()
class SimulationThread(Thread):
def __init__(self, simulation, low_priority=True):
Thread.__init__(self)
self.sim = simulation
self.modelpath = self.sim.modelpath
self.res = [0, "", ""]
self.low_priority = low_priority
def start(self):
CREATE_NO_WINDOW = 0x08000000
modelpath = self.modelpath
htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath)
hawc2exe = self.sim.hawc2exe
stdout = self.sim.stdout_filename
if os.name == "nt":
self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW)
else:
self.process = subprocess.Popen('wine "%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath)
Thread.start(self)
def run(self):
p = psutil.Process(os.getpid())
if self.low_priority:
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
self.process.communicate()
errorcode = self.process.returncode
if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)):
os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename))
with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid:
stdout = fid.read()
self.res = errorcode, stdout
def stop(self):
subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid))
class UpdateStatusThread(Thread):
def __init__(self, simulation, interval=1):
......@@ -315,8 +310,8 @@ class SimulationResource(object):
stdout_filename = property(lambda self : self.sim.stdout_filename)
htcFile = property(lambda self : self.sim.htcFile)
additional_files = property(lambda self : self.sim.additional_files)
input_sources = property(lambda self : self.sim.input_sources)
output_sources = property(lambda self : self.sim.output_sources)
_input_sources = property(lambda self : self.sim._input_sources)
_output_sources = property(lambda self : self.sim._output_sources)
log_filename = property(lambda self : self.sim.log_filename)
status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v))
......@@ -334,7 +329,7 @@ class LocalSimulationHost(SimulationResource):
# must be called through simulation object
self.tmp_modelpath = os.path.join(self.modelpath, self.tmp_modelpath)
self.sim.set_id(self.simulation_id, 'Localhost', self.tmp_modelpath)
for src in self.input_sources():
for src in self._input_sources():
for src_file in glob.glob(os.path.join(self.modelpath, src)):
dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath))
# exist_ok does not exist in Python27
......@@ -361,7 +356,7 @@ class LocalSimulationHost(SimulationResource):
def _finish_simulation(self):
for dst in self.output_sources():
for dst in self._output_sources():
src = os.path.join(self.tmp_modelpath, dst)
for src_file in glob.glob(src):
......@@ -388,6 +383,45 @@ class LocalSimulationHost(SimulationResource):
class SimulationThread(Thread):
def __init__(self, simulation, low_priority=True):
Thread.__init__(self)
self.sim = simulation
self.modelpath = self.sim.modelpath
self.res = [0, "", ""]
self.low_priority = low_priority
def start(self):
CREATE_NO_WINDOW = 0x08000000
modelpath = self.modelpath
htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath)
hawc2exe = self.sim.hawc2exe
stdout = self.sim.stdout_filename
if os.name == "nt":
self.process = subprocess.Popen('"%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath) #, creationflags=CREATE_NO_WINDOW)
else:
self.process = subprocess.Popen('wine "%s" %s 1> %s 2>&1' % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath)
Thread.start(self)
def run(self):
p = psutil.Process(os.getpid())
if self.low_priority:
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
self.process.communicate()
errorcode = self.process.returncode
if not os.path.isdir(os.path.dirname(self.modelpath + self.sim.stdout_filename)):
os.makedirs(os.path.dirname(self.modelpath + self.sim.stdout_filename))
with open(self.modelpath + self.sim.stdout_filename, encoding='utf-8') as fid:
stdout = fid.read()
self.res = errorcode, stdout
def stop(self):
subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid))
class PBSClusterSimulationHost(SimulationResource, SSHClient):
def __init__(self, simulation, host, username, password, port=22):
SimulationResource.__init__(self, simulation)
......@@ -402,7 +436,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False)
self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename)))
for src in self.input_sources():
for src in self._input_sources():
for src_file in glob.glob(os.path.join(self.modelpath, src)):
dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/")
self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False)
......@@ -420,7 +454,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
def _finish_simulation(self):
with self:
for dst in self.output_sources():
for dst in self._output_sources():
src = os.path.join(self.tmp_modelpath, dst).replace("\\", "/")
for src_file in self.glob(src):
......@@ -482,7 +516,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
def start(self):
"""Start non blocking distributed simulation"""
self.thread.start()
self.non_blocking_simulation_thread.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment