Skip to content
Snippets Groups Projects
Commit 8b72969d authored by mads's avatar mads
Browse files

works without logfile specification + more

parent 31ebd4ae
No related branches found
No related tags found
No related merge requests found
......@@ -12,21 +12,24 @@ import json
import glob
from wetb.hawc2 import log_file
import re
import threading
QUEUED = "queued" #until start
INITIALIZING = "initializing" #when starting
SIMULATING = "SIMULATING" # when logfile.status=simulating
FINISH = "finish" # when finish
ERROR = "error" # when hawc2 returns error
ABORTED = "aborted" # when stopped and logfile.status != Done
CLEANED = "cleaned" # after copy back
PREPARING = "Copy to host" # during prepare simulation
INITIALIZING = "Initializing" #when starting
SIMULATING = "Simulating" # when logfile.status=simulating
FINISH = "Finish" # when finish
ERROR = "Error" # when hawc2 returns error
ABORTED = "Aborted" # when stopped and logfile.status != Done
CLEANED = "Cleaned" # after copy back
class Simulation(object):
is_simulating = False
_status = QUEUED
def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"):
self.modelpath = modelpath
self.modelpath = os.path.abspath(modelpath) + "/"
self.folder = os.path.dirname(htcfilename)
if not os.path.isabs(htcfilename):
htcfilename = os.path.join(modelpath, htcfilename)
......@@ -35,20 +38,41 @@ class Simulation(object):
self.htcFile = HTCFile(htcfilename)
self.time_stop = self.htcFile.simulation.time_stop[0]
self.copy_turbulence = True
self.log_filename = self.htcFile.simulation.logfile[0]
self.simulation_id = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self)
self.stdout_filename = "%s.out" % self.simulation_id
if 'logfile' in self.htcFile.simulation:
self.log_filename = self.htcFile.simulation.logfile[0]
else:
self.log_filename = self.stdout_filename
if os.path.isabs(self.log_filename):
self.log_filename = os.path.relpath(self.log_filename, self.modelpath)
else:
self.log_filename = os.path.relpath(self.log_filename)
self.log_filename = self.log_filename.replace("\\", "/")
self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop)
self._status = QUEUED
self.logFile.clear()
self.last_status = self._status
self.errors = []
self.thread = Thread(target=self.simulate)
self.simulationThread = SimulationThread(self.modelpath, self.htcFile.filename, hawc2exe)
self.timer = RepeatedTimer(1, self.update_status)
self.thread = Thread(target=self.simulate_distributed)
self.dist_thread = Thread()
self.hawc2exe = hawc2exe
self.simulationThread = SimulationThread(self)
self.timer = RepeatedTimer(self.update_status)
def __str__(self):
return "Simulation(%s)" % self.filename
@property
def status(self):
return self._status
@status.setter
def status(self, status):
self._status = status
self.show_status()
def update_status(self, *args, **kwargs):
if self.status in [INITIALIZING, SIMULATING]:
self.logFile.update_status()
......@@ -59,25 +83,25 @@ class Simulation(object):
self._status = FINISH
def show_status(self):
status = self.logFile.status()
if status[1] == SIMULATING:
if self.last_status[1] != SIMULATING:
#print ("log status:", self.logFile.status)
if self.logFile.status == log_file.SIMULATING:
if self.last_status != log_file.SIMULATING:
print ("|" + ("-"*50) + "|" + ("-"*49) + "|")
sys.stdout.write("|")
#print (status)
sys.stdout.write("."*(status[0] - self.last_status[0]))
sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0)))
sys.stdout.flush()
elif self.last_status[1] == SIMULATING and status[1] != SIMULATING:
sys.stdout.write("."*(status[0] - self.last_status[0]) + "|")
self.last_pct = self.logFile.pct
elif self.last_status == log_file.SIMULATING:
sys.stdout.write("."*(100 - self.last_pct) + "|")
sys.stdout.flush()
print ("\n")
if status[1] != SIMULATING:
else:
print (self.logFile.status)
if self.logFile.status != log_file.SIMULATING:
if self.logFile.errors:
print (self.logFile.errors)
self.last_status = self.logFile.status
if status[2]:
print (status[1:])
else:
print (status[1])
self.last_status = status
def additional_files(self):
......@@ -95,8 +119,10 @@ class Simulation(object):
with open(additional_files_file, 'w') as fid:
json.dump(additional_files, fid)
def prepare_simulation(self, id):
self.tmp_modelpath = os.path.join(self.modelpath, id + "/")
def prepare_simulation(self):
self.status = PREPARING
self.tmp_modelpath = os.path.join(self.modelpath, "tmp_%s/" % self.simulation_id)
for src in self.htcFile.input_files() + self.htcFile.turbulence_files() + self.additional_files().get('input', []):
......@@ -119,7 +145,12 @@ class Simulation(object):
def finish_simulation(self):
if self.status == CLEANED: return
lock = threading.Lock()
with lock:
if self.status == CLEANED: return
if self.status != ERROR:
self.status = CLEANED
files = self.htcFile.output_files()
if self.copy_turbulence:
files.extend(self.htcFile.turbulence_files())
......@@ -141,17 +172,17 @@ class Simulation(object):
shutil.rmtree(self.tmp_modelpath)
except (PermissionError, OSError) as e:
raise Warning(str(e))
if self.status != ERROR:
self.status = CLEANED
def simulate(self):
#starts blocking simulation
self.is_simulating = True
self.errors = []
self.logFile.clear()
self.status = INITIALIZING
self.returncode, self.stdout = 1, "Simulation failed"
self.simulationThread.start()
self.simulationThread.join()
self.returncode, self.stdout = self.simulationThread.res
......@@ -160,10 +191,17 @@ class Simulation(object):
self.status = ERROR
self.is_simulating = False
self.logFile.update_status()
if self.returncode or self.errors:
if self.returncode:
raise Exception("Simulation error:\n" + "\n".join(self.errors))
if self.logFile.status != log_file.DONE or self.logFile.errors:
elif self.logFile.status != log_file.DONE or self.errors or self.logFile.errors:
raise Warning("Simulation succeded with errors:\nLog status:%s\n" % self.logFile.status + "\n".join(self.logFile.errors))
else:
self.status = FINISH
def simulate_distributed(self):
self.prepare_simulation()
self.simulate()
self.finish_simulation()
......@@ -195,7 +233,8 @@ class Simulation(object):
print (msg)
def start(self):
"""Start non blocking simulation"""
"""Start non blocking distributed simulation"""
self.timer.start(1000)
self.thread.start()
def stop(self):
......@@ -207,6 +246,7 @@ class Simulation(object):
pass
if self.logFile.status not in [log_file.DONE]:
self.status = ABORTED
self.update_status()
#class SimulationProcess(Process):
#
......@@ -227,11 +267,9 @@ class Simulation(object):
class SimulationThread(Thread):
def __init__(self, modelpath, htcfile, hawc2exe):
def __init__(self, simulation):
Thread.__init__(self)
self.modelpath = modelpath
self.htcfile = os.path.relpath(htcfile, self.modelpath)
self.hawc2exe = hawc2exe
self.sim = simulation
self.res = [0, "", ""]
......@@ -239,24 +277,29 @@ class SimulationThread(Thread):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
CREATE_NO_WINDOW = 0x08000000
self.process = subprocess.Popen([self.hawc2exe, self.htcfile], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, cwd=self.modelpath, creationflags=CREATE_NO_WINDOW)
modelpath = self.modelpath
htcfile = os.path.relpath(self.sim.htcFile.filename, self.sim.modelpath)
hawc2exe = self.sim.hawc2exe
stdout = self.sim.stdout_filename
self.process = subprocess.Popen("%s %s 1> %s 2>&1" % (hawc2exe, htcfile, stdout), stdout=None, stderr=None, shell=True, cwd=modelpath, creationflags=CREATE_NO_WINDOW)
Thread.start(self)
def run(self):
p = psutil.Process(os.getpid())
p.nice = psutil.BELOW_NORMAL_PRIORITY_CLASS
stdout, _ = self.process.communicate()
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
self.process.communicate()
errorcode = self.process.returncode
self.res = errorcode, stdout.decode()
with open(self.modelpath + self.sim.stdout_filename) as fid:
stdout = fid.read()
self.res = errorcode, stdout
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
def __init__(self, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
......@@ -265,12 +308,13 @@ class RepeatedTimer(object):
def _run(self):
self.is_running = False
self.start()
self.start(self.interval)
self.function(*self.args, **self.kwargs)
def start(self):
def start(self, interval_ms=None):
self.interval = interval_ms
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer = Timer(interval_ms / 1000, self._run)
self._timer.start()
self.is_running = True
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment