Newer
Older
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
from __future__ import absolute_import
from builtins import str
from future import standard_library
standard_library.install_aliases()

import glob
import json
import os
import re
import shutil
import subprocess
import sys
from multiprocessing import Process
from threading import Timer, Thread

import psutil

from wetb.hawc2.htc_file import HTCFile
# Status texts assigned to Simulation.status.
# NOTE(review): FINISH is referenced further down in this file but is not
# defined in this group - confirm it is declared elsewhere.
PREPARING = "Copy to host" # set by prepare_simulation while staging the model files
INITIALIZING = "Initializing" # when starting
SIMULATING = "Simulating" # when logfile.status == simulating
ERROR = "Error" # when hawc2 returns error
ABORTED = "Aborted" # when stopped and logfile.status != Done
CLEANED = "Cleaned" # after copy back
class Simulation(object):
is_simulating = False
def __init__(self, modelpath, htcfilename, hawc2exe="HAWC2MB.exe"):
    """Set up the simulation of one htc file inside a model folder.

    Parameters
    ----------
    modelpath : str
        Root folder of the model. Stored as an absolute path ending in "/".
    htcfilename : str
        htc file to simulate. A relative name is resolved against modelpath.
    hawc2exe : str, optional
        Executable that is launched to run the simulation.
    """
    self.modelpath = os.path.abspath(modelpath) + "/"
    self.folder = os.path.dirname(htcfilename)
    if not os.path.isabs(htcfilename):
        htcfilename = os.path.join(modelpath, htcfilename)
    self.filename = os.path.basename(htcfilename)
    self.htcFile = HTCFile(htcfilename)
    self.time_stop = self.htcFile.simulation.time_stop[0]
    self.hawc2exe = hawc2exe

    # The id is built from the resolved local path: no `htcfilename`
    # attribute has been assigned on the instance at this point.
    # id(self) keeps ids of several simulations of the same file apart.
    relative_htc = os.path.relpath(htcfilename, self.modelpath)
    self.simulation_id = relative_htc.replace("\\", "_") + "_%d" % id(self)
    self.stdout_filename = "%s.out" % self.simulation_id

    # Without a logfile entry in the htc file, the redirected console
    # output doubles as the log.
    if 'logfile' in self.htcFile.simulation:
        self.log_filename = self.htcFile.simulation.logfile[0]
    else:
        self.log_filename = self.stdout_filename
    if os.path.isabs(self.log_filename):
        self.log_filename = os.path.relpath(self.log_filename, self.modelpath)
    else:
        self.log_filename = os.path.relpath(self.log_filename)
    self.log_filename = self.log_filename.replace("\\", "/")

    self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop)
    self.logFile.clear()

    # Later code reads self.errors even when no error was recorded, so it
    # must exist from the start.
    self.errors = []
    # NOTE(review): relies on a default `_status` that is not assigned in
    # this method - confirm it is provided at class level.
    self.last_status = self._status

    self.thread = Thread(target=self.simulate_distributed)
    self.simulationThread = SimulationThread(self)
    self.timer = RepeatedTimer(self.update_status)
@property
def status(self):
    """Current status text of the simulation."""
    return self._status

@status.setter
def status(self, new_status):
    """Store the new status and refresh the display through show_status()."""
    self._status = new_status
    self.show_status()
if self.status in [INITIALIZING, SIMULATING]:
if self.logFile.status == log_file.SIMULATING:
self._status = SIMULATING
if self.logFile.status == log_file.DONE and self.is_simulating is False:
#print ("log status:", self.logFile.status)
if self.logFile.status == log_file.SIMULATING:
if self.last_status != log_file.SIMULATING:
print ("|" + ("-"*50) + "|" + ("-"*49) + "|")
sys.stdout.write("|")
sys.stdout.write("."*(self.logFile.pct - getattr(self, 'last_pct', 0)))
self.last_pct = self.logFile.pct
elif self.last_status == log_file.SIMULATING:
sys.stdout.write("."*(100 - self.last_pct) + "|")
else:
print (self.logFile.status)
if self.logFile.status != log_file.SIMULATING:
if self.logFile.errors:
print (self.logFile.errors)
self.last_status = self.logFile.status
additional_files_file = os.path.join(self.modelpath, 'additional_files.txt')
with open(additional_files_file, encoding='utf-8') as fid:
def add_additional_input_file(self, file):
    """Register an extra input file in the model's additional_files.txt.

    The name is merged into the 'input' list returned by
    additional_files() (duplicates removed, sorted so the file content is
    reproducible) and the whole dictionary is written back.

    Parameters
    ----------
    file : str
        File name to add to the 'input' list.
    """
    additional_files = self.additional_files()
    additional_files['input'] = sorted(set(additional_files.get('input', []) + [file]))
    additional_files_file = os.path.join(self.modelpath, 'additional_files.txt')
    with open(additional_files_file, 'w', encoding='utf-8') as fid:
        # NOTE(review): JSON is assumed from the module's json import; the
        # reading side is not fully visible here - confirm the format.
        json.dump(additional_files, fid)
def prepare_simulation(self):
    """Stage the files needed by the simulation in a temporary folder.

    Every entry from htcFile.input_files(), htcFile.turbulence_files() and
    the 'input' list of additional_files() (glob patterns are expanded) is
    copied from modelpath to "tmp_<simulation_id>/" below modelpath, keeping
    its relative path. The log file and the simulation thread are then
    pointed at the temporary folder.
    """
    self.status = PREPARING
    self.tmp_modelpath = os.path.join(self.modelpath, "tmp_%s/" % self.simulation_id)

    sources = (self.htcFile.input_files()
               + self.htcFile.turbulence_files()
               + self.additional_files().get('input', []))
    for src in sources:
        if not os.path.isabs(src):
            src = os.path.join(self.modelpath, src)
        for src_file in glob.glob(src):
            dst = os.path.join(self.tmp_modelpath, os.path.relpath(src_file, self.modelpath))
            dst_dir = os.path.dirname(dst)
            # exist_ok does not exist in Python27
            if not os.path.exists(dst_dir):
                os.makedirs(dst_dir)
            # Copy only when the destination is missing or differs in size.
            if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size:
                shutil.copy(src_file, dst)
            # Re-check after copying so a failed copy is reported.
            if not os.path.isfile(dst) or os.stat(dst).st_size != os.stat(src_file).st_size:
                print("error copy ", dst)

    self.logFile.filename = os.path.join(self.tmp_modelpath, self.log_filename)
    self.simulationThread.modelpath = self.tmp_modelpath
lock = threading.Lock()
with lock:
if self.status == CLEANED: return
if self.status != ERROR:
self.status = CLEANED
files.extend(self.htcFile.turbulence_files())
for dst in files:
if not os.path.isabs(dst):
dst = os.path.join(self.modelpath, dst)
src = os.path.join(self.tmp_modelpath, os.path.relpath(dst, self.modelpath))
for src_file in glob.glob(src):
dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
# exist_ok does not exist in Python27
if not os.path.exists(os.path.dirname(dst_file)):
os.makedirs(os.path.dirname(dst_file)) #, exist_ok=True)
if not os.path.isfile(dst_file) or os.path.getmtime(dst_file) != os.path.getmtime(src_file):
shutil.copy(src_file, dst_file)
self.logFile.filename = os.path.join(self.modelpath, self.log_filename)
try:
shutil.rmtree(self.tmp_modelpath)
except (PermissionError, OSError) as e:
raise Warning(str(e))
self.returncode, self.stdout = 1, "Simulation failed"
self.simulationThread.start()
self.simulationThread.join()
self.returncode, self.stdout = self.simulationThread.res
if self.returncode or 'error' in self.stdout.lower():
self.errors = list(set([l for l in self.stdout.split("\n") if 'error' in l.lower()]))
self.errors.extend(list(set(self.logFile.errors)))
elif self.logFile.status != log_file.DONE or self.errors or self.logFile.errors:
raise Warning("Simulation succeded with errors:\nLog status:%s\n" % self.logFile.status + "\n".join(self.logFile.errors))
else:
self.status = FINISH
def simulate_distributed(self):
    """Run the complete cycle: prepare, simulate, finish.

    finish_simulation() is called even when simulate() raises, so the
    finishing step is not skipped after a failed run. The exception raised
    by simulate() still propagates to the caller.
    """
    self.prepare_simulation()
    try:
        self.simulate()
    finally:
        self.finish_simulation()
def fix_errors(self):
    """Offer to register files that the error messages report as missing.

    Each entry of self.errors is matched against the known "file does not
    exist" / DLL error patterns. When the named file exists below
    modelpath, get_confirmation() is asked and, if confirmed, the file is
    added through add_additional_input_file() and the user is told to
    restart the simulation.
    """
    def confirm_add_additional_file(folder, file):
        # Only offer files that really exist in the model folder.
        if os.path.isfile(os.path.join(self.modelpath, folder, file)):
            filename = os.path.join(folder, file).replace(os.path.sep, "/")
            if self.get_confirmation("File missing", "'%s' seems to be missing in the temporary working directory. \n\nDo you want to add it to additional_files.txt" % filename):
                self.add_additional_input_file(filename)
                self.show_message("'%s' is now added to additional_files.txt.\n\nPlease restart the simulation" % filename)

    # Patterns that capture (file, folder); the DLL pattern yields an empty folder.
    folder_patterns = [re.compile(r".*\*\*\* ERROR \*\*\* File '(.*)' does not exist in the (.*) folder"),
                       re.compile(r".*\*\*\* ERROR \*\*\* DLL (.*)()")]
    workdir_pattern = re.compile(r".*\*\*\* ERROR \*\*\* File '(.*)' does not exist in the working directory")

    # getattr: errors may not have been recorded yet.
    for error in getattr(self, 'errors', []):
        line = error.strip()
        for pattern in folder_patterns:
            m = pattern.match(line)
            if m is not None:
                file, folder = m.groups()
                confirm_add_additional_file(folder, file)
        m = workdir_pattern.match(line)
        if m is not None:
            file = m.groups()[0]
            # The message gives no folder, so search the model tree for it.
            for root, _, files in os.walk(self.modelpath):
                if "__Thread" not in root and file in files:
                    confirm_add_additional_file(os.path.relpath(root, self.modelpath), file)
def get_confirmation(self, title, msg):
    """Answer a yes/no question; this implementation always answers yes."""
    return True
def show_message(self, msg, title="Information"):
    """Print msg to standard output; title is accepted but not used here."""
    print(msg)
def start(self, update_interval=1):
    """Start non blocking distributed simulation

    Starts the worker thread created in __init__ and then the status timer.

    Parameters
    ----------
    update_interval : number, optional
        Seconds between status updates (passed to the timer in ms).
    """
    # wait() joins this thread, so it has to be running.
    self.thread.start()
    self.timer.start(update_interval * 1000)
def wait(self):
    """Block until the worker thread ends, then stop the status timer and
    refresh the status one last time."""
    self.thread.join()
    self.timer.stop()
    self.update_status()
self.timer.stop()
self.simulationThread.process.kill()
try:
self.finish_simulation()
except:
pass
def __init__(self, simulation, low_priority=True):
def start(self):
    """Launch the HAWC2 process and then start this thread.

    The executable is run through the shell from self.modelpath with
    stdout and stderr redirected to the simulation's stdout file. On
    non-Windows systems the command is prefixed with "wine".
    """
    sim = self.sim
    workdir = self.modelpath
    htc_relative = os.path.relpath(sim.htcFile.filename, sim.modelpath)
    command = '"%s" %s 1> %s 2>&1' % (sim.hawc2exe, htc_relative, sim.stdout_filename)
    popen_kwargs = dict(stdout=None, stderr=None, shell=True, cwd=workdir)
    if os.name == "nt":
        # CREATE_NO_WINDOW process creation flag
        popen_kwargs["creationflags"] = 0x08000000
    else:
        command = 'wine ' + command
    self.process = subprocess.Popen(command, **popen_kwargs)
    Thread.start(self)
def run(self):
    """Wait for the launched process and collect its result.

    Stores (return code, content of the redirected stdout file) in
    self.res.
    """
    p = psutil.Process(os.getpid())
    if self.low_priority:
        # BELOW_NORMAL_PRIORITY_CLASS is only defined by psutil on Windows;
        # skip the priority change where the constant is missing.
        # NOTE(review): newer psutil releases replace set_nice() by nice()
        # - confirm against the installed version.
        priority = getattr(psutil, "BELOW_NORMAL_PRIORITY_CLASS", None)
        if priority is not None:
            p.set_nice(priority)

    # The output file is complete only after the process has ended.
    errorcode = self.process.wait()
    with open(os.path.join(self.modelpath, self.sim.stdout_filename), encoding='utf-8') as fid:
        stdout = fid.read()
    self.res = errorcode, stdout
def __init__(self, function, *args, **kwargs):
    """Remember the callable and its arguments; no timer is armed yet."""
    self.function, self.args, self.kwargs = function, args, kwargs
    self._timer = None
    self.is_running = False
def _run(self):
    """Timer callback: re-arm the timer, then call the stored function."""
    self.is_running = False
    # Re-arm first so the timer keeps repeating even if the call raises.
    self.start(self.interval)
    self.function(*self.args, **self.kwargs)
def start(self, interval_ms=None):
    """Arm the timer so that _run is called after interval_ms milliseconds.

    Parameters
    ----------
    interval_ms : number, optional
        Delay in milliseconds. None reuses the interval of the previous
        start() call.

    Raises
    ------
    TypeError
        If no interval is given and none has been set before.
    """
    if interval_ms is None:
        interval_ms = getattr(self, "interval", None)
        if interval_ms is None:
            raise TypeError("interval_ms is required the first time the timer is started")
    # Cancel a timer that is still pending so only one is ever armed.
    if self._timer is not None:
        self._timer.cancel()
    self.interval = interval_ms
    self._timer = Timer(interval_ms / 1000.0, self._run)
    self._timer.start()
    self.is_running = True
def stop(self):
    """Cancel the pending timer, if any, and mark the timer as stopped.

    Safe to call before start(): _timer is None until the first start().
    """
    if self._timer is not None:
        self._timer.cancel()
    self.is_running = False
if __name__ == "__main__":
    # Manual run against a local model folder.
    # Raw string: "\m" and "\H" are not valid escape sequences.
    sim = Simulation(r'C:\mmpe\HAWC2\Hawc2_model/', 'htc/error.htc')
    sim.simulate()