Skip to content
Snippets Groups Projects
Commit e7e88be2 authored by mads's avatar mads
Browse files

different things

parent 081c895c
No related branches found
No related tags found
No related merge requests found
......@@ -23,8 +23,9 @@ from future import standard_library
from wetb.utils.cluster_tools import pbsjob
from wetb.utils.cluster_tools.cluster_resource import LocalResource
from wetb.utils.cluster_tools.pbsjob import SSHPBSJob
from wetb.utils.cluster_tools.pbsjob import SSHPBSJob, DONE, NOT_SUBMITTED
from wetb.utils.cluster_tools.ssh_client import SSHClient
from wetb.utils.timing import print_time
standard_library.install_aliases()
from threading import Thread
......@@ -98,7 +99,6 @@ class Simulation(object):
else:
self.log_filename = os.path.relpath(self.log_filename)
self.log_filename = unix_path(self.log_filename)
self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop)
self.logFile.clear()
self.last_status = self.status
......@@ -129,6 +129,8 @@ class Simulation(object):
time.sleep(0.1)
if self.logFile.status not in [log_file.DONE]:
self.status = ABORTED
self.is_simulating = False
self.is_done = True
if update_status:
self.update_status()
......@@ -191,6 +193,7 @@ class Simulation(object):
self.status = INITIALIZING
self.logFile.clear()
self.host._simulate()
self.returncode, self.stdout = self.host.returncode, self.host.stdout
if self.host.returncode or 'error' in self.host.stdout.lower():
if "error" in self.host.stdout.lower():
self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()])))
......@@ -244,6 +247,7 @@ class Simulation(object):
self.status = FINISH
def __str__(self):
return "Simulation(%s)" % self.filename
......@@ -335,10 +339,12 @@ class UpdateStatusThread(Thread):
Thread.start(self)
def run(self):
print ("Wrong updatestatus")
while self.simulation.is_done is False:
self.simulation.update_status()
time.sleep(self.interval)
time.sleep(0.5)
t = time.time()
while self.simulation.is_simulating and time.time() < t + self.interval:
time.sleep(1)
class SimulationResource(object):
......@@ -455,8 +461,11 @@ class SimulationThread(Thread):
def run(self):
import psutil
p = psutil.Process(os.getpid())
if self.low_priority:
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
try:
if self.low_priority:
p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
except:
pass
self.process.communicate()
errorcode = self.process.returncode
......@@ -473,7 +482,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
def __init__(self, simulation, resource, host, username, password, port=22):
SimulationResource.__init__(self, simulation)
SSHClient.__init__(self, host, username, password, port=port)
self.pbsjob = SSHPBSJob(host, username, password, port)
self.pbsjob = SSHPBSJob(resource.shared_ssh)
self.resource = resource
hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe))
......@@ -505,8 +514,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
try:
dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath))
os.makedirs(os.path.dirname(dst_file), exist_ok=True)
self.download(src_file, dst_file, verbose=False)
except ValueError as e:
self.download(src_file, dst_file, retry=3)
except Exception as e:
print (self.modelpath, src_file, self.tmp_modelpath)
raise e
try:
......@@ -545,25 +554,29 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
def update_logFile_status(self):
status = self.pbsjob.status
def pbsjob_status():
if self.pbsjob._status in [NOT_SUBMITTED, DONE]:
return self.pbsjob._status
if self.pbsjob.jobid in self.resource.is_executing:
self.pbsjob._status = pbsjob.RUNNING
elif self.simulation_id in self.resource.finished:
self.pbsjob._status = DONE
self.pbsjob.jobid = None
return self.pbsjob._status
status = pbsjob_status()
if status == pbsjob.NOT_SUBMITTED:
pass
elif status == pbsjob.DONE:
self.is_simulating = False
pass
else:
try:
_, out, _ = self.execute("cat .hawc2launcher/status_%s" % self.simulation_id)
out = out.split(";")
if len(out) == 5:
self.status = out[0]
self.logFile = LogInfo(*out[1:])
except Exception as e:
if "No such file or directory" in str(e):
pass
else:
raise
if self.simulation_id in self.resource.loglines:
self.logline = self.resource.loglines[self.simulation_id]
self.status = self.logline[0]
self.logFile = LogInfo(*self.logline[1:])
def start(self):
"""Start non blocking distributed simulation"""
......@@ -578,6 +591,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient):
self.finish_simulation()
except:
pass
self.is_simulating = False
self.is_done = True
if self.status != ERROR and self.logFile.status not in [log_file.DONE]:
self.status = ABORTED
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment