Skip to content
Snippets Groups Projects
Commit f71df4ae authored by mads's avatar mads
Browse files

updates

parent d39da16f
No related branches found
No related tags found
2 merge requests!6Clustertools,!5Clustertools
Pipeline #
...@@ -44,12 +44,11 @@ class Simulation(object): ...@@ -44,12 +44,11 @@ class Simulation(object):
self.folder = os.path.dirname(htcfilename) self.folder = os.path.dirname(htcfilename)
if not os.path.isabs(htcfilename): if not os.path.isabs(htcfilename):
htcfilename = os.path.join(modelpath, htcfilename) htcfilename = os.path.join(modelpath, htcfilename)
self.htcfilename = htcfilename
self.filename = os.path.basename(htcfilename) self.filename = os.path.basename(htcfilename)
self.htcFile = HTCFile(htcfilename) self.htcFile = HTCFile(htcfilename)
self.time_stop = self.htcFile.simulation.time_stop[0] self.time_stop = self.htcFile.simulation.time_stop[0]
self.copy_turbulence = True self.copy_turbulence = True
self.simulation_id = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self) self.simulation_id = os.path.relpath(htcfilename, self.modelpath).replace("\\", "_") + "_%d" % id(self)
self.stdout_filename = "%s.out" % self.simulation_id self.stdout_filename = "%s.out" % self.simulation_id
if 'logfile' in self.htcFile.simulation: if 'logfile' in self.htcFile.simulation:
self.log_filename = self.htcFile.simulation.logfile[0] self.log_filename = self.htcFile.simulation.logfile[0]
...@@ -283,9 +282,7 @@ class SimulationThread(Thread): ...@@ -283,9 +282,7 @@ class SimulationThread(Thread):
self.res = errorcode, stdout self.res = errorcode, stdout
def stop(self): def stop(self):
print ("stop")
subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid)) subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self.process.pid))
print ("stop2")
class UpdateStatusThread(Thread): class UpdateStatusThread(Thread):
def __init__(self, simulation, interval=1): def __init__(self, simulation, interval=1):
...@@ -305,7 +302,7 @@ class UpdateStatusThread(Thread): ...@@ -305,7 +302,7 @@ class UpdateStatusThread(Thread):
class SimulationResource(object): class SimulationResource(object):
def __init__(self, simulation): def __init__(self, simulation):
self.sim = simulation self.sim = simulation
logFile = property(lambda self : self.sim.logFile) logFile = property(lambda self : self.sim.logFile, lambda self, v: setattr(self.sim, "logFile", v))
errors = property(lambda self : self.sim.errors) errors = property(lambda self : self.sim.errors)
modelpath = property(lambda self : self.sim.modelpath) modelpath = property(lambda self : self.sim.modelpath)
tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v)) tmp_modelpath = property(lambda self : self.sim.tmp_modelpath, lambda self, v: setattr(self.sim, "tmp_modelpath", v))
...@@ -315,7 +312,9 @@ class SimulationResource(object): ...@@ -315,7 +312,9 @@ class SimulationResource(object):
input_sources = property(lambda self : self.sim.input_sources) input_sources = property(lambda self : self.sim.input_sources)
output_sources = property(lambda self : self.sim.output_sources) output_sources = property(lambda self : self.sim.output_sources)
log_filename = property(lambda self : self.sim.log_filename) log_filename = property(lambda self : self.sim.log_filename)
status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v)) status = property(lambda self : self.sim.status, lambda self, v: setattr(self.sim, "status", v))
is_simulating = property(lambda self : self.sim.is_simulating, lambda self, v: setattr(self.sim, "is_simulating", v))
class LocalSimulationResource(SimulationResource, LocalResource): class LocalSimulationResource(SimulationResource, LocalResource):
...@@ -381,30 +380,32 @@ class LocalSimulationResource(SimulationResource, LocalResource): ...@@ -381,30 +380,32 @@ class LocalSimulationResource(SimulationResource, LocalResource):
class PBSClusterSimulationResouce(PBSClusterResource): class PBSClusterSimulationResouce(SimulationResource, PBSClusterResource):
def __init__(self, simulation, htcfilename, host, username, password, port=22): def __init__(self, simulation, host, username, password, port=22):
SimulationResource.__init__(self, simulation)
PBSClusterResource.__init__(self, host, username, password, port) PBSClusterResource.__init__(self, host, username, password, port)
self.htc = htcfilename
self.pbsjob = PBSJob(self) self.pbsjob = PBSJob(self)
hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe))
def _prepare_simulation(self): def _prepare_simulation(self):
with self.ssh: with self:
self.ssh.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False) self.execute(["mkdir -p .hawc2launcher/%s" % self.simulation_id], verbose=False)
self.ssh.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename))) self.execute("mkdir -p %s%s" % (self.tmp_modelpath, os.path.dirname(self.log_filename)))
for src in self.input_sources(): for src in self.input_sources():
for src_file in glob.glob(os.path.join(self.modelpath, src)): for src_file in glob.glob(os.path.join(self.modelpath, src)):
dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/") dst = (self.tmp_modelpath + os.path.relpath(src_file, self.modelpath)).replace("\\", "/")
self.ssh.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False) self.execute("mkdir -p %s" % os.path.dirname(dst), verbose=False)
self.ssh.upload(src_file, dst, verbose=False) self.upload(src_file, dst, verbose=False)
##assert self.ssh.file_exists(dst) ##assert self.ssh.file_exists(dst)
f = io.StringIO(self.pbsjobfile(self.simulation_id)) f = io.StringIO(self.pbsjobfile(self.simulation_id))
f.seek(0) f.seek(0)
self.ssh.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id) self.upload(f, self.tmp_modelpath + "%s.in" % self.simulation_id)
remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename) remote_log_filename = "%s%s" % (self.tmp_modelpath, self.log_filename)
self.ssh.execute("rm -f %s" % remote_log_filename) self.execute("rm -f %s" % remote_log_filename)
...@@ -432,7 +433,7 @@ class PBSClusterSimulationResouce(PBSClusterResource): ...@@ -432,7 +433,7 @@ class PBSClusterSimulationResouce(PBSClusterResource):
sleeptime = 1 sleeptime = 1
print ("simulate2", self.simulation_id) print ("simulate2", self.simulation_id)
while self.is_simulating: while self.is_simulating:
self.update_status() #self.__update_logFile_status()
time.sleep(sleeptime) time.sleep(sleeptime)
print ("simulate3", self.simulation_id) print ("simulate3", self.simulation_id)
...@@ -453,36 +454,27 @@ class PBSClusterSimulationResouce(PBSClusterResource): ...@@ -453,36 +454,27 @@ class PBSClusterSimulationResouce(PBSClusterResource):
self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath) self.sim.logFile = LogFile.from_htcfile(self.htcFile, self.modelpath)
# @property
# def status(self):
# return self._status
#
# @status.setter
# def status(self, status):
# self._status = status
def update_status(self, *args, **kwargs): def update_logFile_status(self):
status = self.pbsjob.status status = self.pbsjob.status
if status == pbsjob.NOT_SUBMITTED: if status == pbsjob.NOT_SUBMITTED:
return self.status pass
elif status == pbsjob.DONE: elif status == pbsjob.DONE:
self.is_simulating = False self.is_simulating = False
return self.status pass
try: else:
_, out, _ = self.ssh.execute("cat .hawc2launcher/status_%s" % self.simulation_id) try:
out = out.split(";") _, out, _ = self.execute("cat .hawc2launcher/status_%s" % self.simulation_id)
if len(out) == 5: out = out.split(";")
self.status = out[0] if len(out) == 5:
self.logFile = LogInfo(*out[1:]) self.status = out[0]
self.logFile = LogInfo(*out[1:])
except Exception as e:
if "No such file or directory" in str(e): except Exception as e:
pass if "No such file or directory" in str(e):
else: pass
raise else:
raise
Simulation.update_status(self)
def start(self): def start(self):
"""Start non blocking distributed simulation""" """Start non blocking distributed simulation"""
...@@ -509,10 +501,10 @@ class PBSClusterSimulationResouce(PBSClusterResource): ...@@ -509,10 +501,10 @@ class PBSClusterSimulationResouce(PBSClusterResource):
for folder in set([os.path.relpath(os.path.dirname(f)) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]): for folder in set([os.path.relpath(os.path.dirname(f)) for f in self.htcFile.output_files() + self.htcFile.turbulence_files()]):
cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder cp_back += "mkdir -p $PBS_O_WORKDIR/%s/. \n" % folder
cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder) cp_back += "cp -R -f %s/. $PBS_O_WORKDIR/%s/.\n" % (folder, folder)
rel_htcfilename = os.path.relpath(self.htcfilename, self.modelpath).replace("\\", "/") rel_htcfilename = os.path.relpath(self.htcFile.filename, self.modelpath).replace("\\", "/")
return """ return """
### Standard Output ### Standard Output
#PBS -N %s #PBS -N h2l_%s
### merge stderr into stdout ### merge stderr into stdout
#PBS -j oe #PBS -j oe
#PBS -o %s.out #PBS -o %s.out
...@@ -545,14 +537,3 @@ exit""" % (simulation_id, simulation_id, rel_htcfilename, self.hawc2exe, cp_back ...@@ -545,14 +537,3 @@ exit""" % (simulation_id, simulation_id, rel_htcfilename, self.hawc2exe, cp_back
if __name__ == "__main__":
sim = Simulation('C:\mmpe\HAWC2\Hawc2_model/', 'htc/long1/long01.htc', hawc2exe="hawc2-123_beta.exe")
sim.start()
for i in range(10):
sim.show_status()
time.sleep(0.1)
sim.abort()
sim.show_status()
sim.show_status()
...@@ -19,6 +19,7 @@ DONE = "Done" ...@@ -19,6 +19,7 @@ DONE = "Done"
class PBSJob(object): class PBSJob(object):
_status = NOT_SUBMITTED _status = NOT_SUBMITTED
nodeid = None nodeid = None
jobid = None
def __init__(self, sshclient): def __init__(self, sshclient):
self.ssh = sshclient self.ssh = sshclient
...@@ -49,6 +50,7 @@ class PBSJob(object): ...@@ -49,6 +50,7 @@ class PBSJob(object):
self._status = RUNNING self._status = RUNNING
elif self.ssh.file_exists(self.pbs_out_file): elif self.ssh.file_exists(self.pbs_out_file):
self._status = DONE self._status = DONE
self.jobid = None
return self._status return self._status
def get_nodeid(self): def get_nodeid(self):
...@@ -61,12 +63,13 @@ class PBSJob(object): ...@@ -61,12 +63,13 @@ class PBSJob(object):
#raise e #raise e
def stop(self): def stop(self):
try: if self.jobid:
self.ssh.execute("qdel %s" % self.jobid) try:
except Warning as e: self.ssh.execute("qdel %s" % self.jobid)
if 'qdel: Unknown Job Id' in str(e): except Warning as e:
return if 'qdel: Unknown Job Id' in str(e):
raise e return
raise e
def is_executing(self): def is_executing(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment