Skip to content
Snippets Groups Projects
Commit 16a13e8c authored by David Verelst's avatar David Verelst
Browse files

launch.py: add crontab mode

parent 571975e2
No related branches found
No related tags found
No related merge requests found
...@@ -77,6 +77,7 @@ def load_pbsflist(fname): ...@@ -77,6 +77,7 @@ def load_pbsflist(fname):
f = open(fname, 'r') f = open(fname, 'r')
# drop the newline character # drop the newline character
pbsflist = [line[:-1] for line in f] pbsflist = [line[:-1] for line in f]
f.close()
return pbsflist return pbsflist
def save_pbsflist(fname, pbsflist): def save_pbsflist(fname, pbsflist):
...@@ -95,6 +96,32 @@ def write_qsub_exe(fname, pbs, tsleep=0.25): ...@@ -95,6 +96,32 @@ def write_qsub_exe(fname, pbs, tsleep=0.25):
return job_id return job_id
def write_crontab():
"""Create a crontab script, and submit to crontab.
"""
python = '/usr/bin/python'
launch = '/home/MET/repositories/toolbox/pbsutils/launch.py'
cwd = os.getcwd()
fpath = os.path.join(cwd, 'launch_crontab.sh')
# TODO: parse other relevant arguments as well: --re
# but not the free cpu's etc, they are read from the config file
crontab = '*/5 * * * * cd "%s"; %s %s --crontab' % (cwd, python, launch)
f = open(fpath, 'w')
f.write(crontab)
f.flush()
f.close()
cmd = 'crontab %s' % fpath
p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
stdout = p.stdout.readlines()
p.wait()
def remove_crontab():
cmd = 'crontab -r'
p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
stdout = p.stdout.readlines()
p.wait()
def qsub(fname, qsub_cmd='qsub %s'): def qsub(fname, qsub_cmd='qsub %s'):
""" """
Parameters Parameters
...@@ -432,10 +459,7 @@ class Scheduler: ...@@ -432,10 +459,7 @@ class Scheduler:
self.fname_cluster_state = 'launch_scheduler_state.txt' self.fname_cluster_state = 'launch_scheduler_state.txt'
self.reload_pbsflist = 0 self.reload_pbsflist = 0
self.write_config() def __call__(self, depend=False, crontab_mode=False):
self.init_state_log()
def __call__(self, depend=False):
""" """
Parameters Parameters
---------- ----------
...@@ -446,7 +470,7 @@ class Scheduler: ...@@ -446,7 +470,7 @@ class Scheduler:
""" """
if not depend: if not depend:
print ' method ; scheduler' print ' method ; scheduler'
self.launch() self.launch(crontab_mode=crontab_mode)
else: else:
print ' method ; dependencies' print ' method ; dependencies'
self.launch_deps() self.launch_deps()
...@@ -535,7 +559,7 @@ class Scheduler: ...@@ -535,7 +559,7 @@ class Scheduler:
return cpu_free, cpu_user, cpu_user_queue return cpu_free, cpu_user, cpu_user_queue
def launch(self): def launch(self, crontab_mode=False):
""" """
Iterate over all the pbs files and launch them if there are cpus Iterate over all the pbs files and launch them if there are cpus
available, and if the maximum number of predifined cpus is not reached. available, and if the maximum number of predifined cpus is not reached.
...@@ -628,6 +652,10 @@ class Scheduler: ...@@ -628,6 +652,10 @@ class Scheduler:
# measure # measure
time.sleep(self.tsleep_short) time.sleep(self.tsleep_short)
ii += 1 ii += 1
elif crontab_mode:
fname = os.path.join(os.getcwd(), 'pbs_in_file_cache.txt')
save_pbsflist(fname, self.pbsflist[ii:])
break
else: else:
# wait a bit before trying to launch a job again # wait a bit before trying to launch a job again
if self.debug: if self.debug:
...@@ -643,6 +671,8 @@ class Scheduler: ...@@ -643,6 +671,8 @@ class Scheduler:
# stop when we handled all the files # stop when we handled all the files
if ii >= ii_max: if ii >= ii_max:
print 'All jobs have been launched, stopping the scheduler.' print 'All jobs have been launched, stopping the scheduler.'
if crontab_mode:
remove_crontab()
break break
def launch_deps(self): def launch_deps(self):
...@@ -796,10 +826,10 @@ if __name__ == '__main__': ...@@ -796,10 +826,10 @@ if __name__ == '__main__':
'cpus available for you. Default=48') 'cpus available for you. Default=48')
parser.add_argument('--cpu_user_queue', action='store', dest='cpu_user_queue', parser.add_argument('--cpu_user_queue', action='store', dest='cpu_user_queue',
type='int', default=500, help='No more jobs will be ' type='int', default=500, help='No more jobs will be '
'launched after having cpu_user_queue number of jobs' 'launched after having cpu_user_queue number of jobs '
'in the queue. This prevents users from filling the' 'in the queue. This prevents users from filling the '
'queue, while still allowing to aim for a high cpu_free' 'queue, while still allowing to aim for a high cpu_free '
'target.') 'target. Default=500')
parser.add_argument('--qsub_cmd', action='store', dest='qsub_cmd', parser.add_argument('--qsub_cmd', action='store', dest='qsub_cmd',
default='qsub %s', default='qsub %s',
help='Is set automatically by --node flag') help='Is set automatically by --node flag')
...@@ -807,6 +837,9 @@ if __name__ == '__main__': ...@@ -807,6 +837,9 @@ if __name__ == '__main__':
default=False, help='If executed on dedicated node.') default=False, help='If executed on dedicated node.')
parser.add_argument('--sort', action='store_true', dest='sort', parser.add_argument('--sort', action='store_true', dest='sort',
default=False, help='Sort pbs file list. Default=False') default=False, help='Sort pbs file list. Default=False')
parser.add_argument('--crontab', action='store_true', dest='crontab',
default=False, help='Crontab mode. Implies --cache, '
'and not compatible with --node. Default=False')
parser.add_argument('--debug', action='store_true', dest='debug', parser.add_argument('--debug', action='store_true', dest='debug',
default=False, help='Debug print statements. Default=False') default=False, help='Debug print statements. Default=False')
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
...@@ -866,10 +899,23 @@ if __name__ == '__main__': ...@@ -866,10 +899,23 @@ if __name__ == '__main__':
else: else:
path_log = None path_log = None
if options.crontab:
options.cache = True
ss = Scheduler(options.nr_cpus, path_pbs_files=options.path_pbs_files, ss = Scheduler(options.nr_cpus, path_pbs_files=options.path_pbs_files,
search_crit_re=options.search_crit_re, dryrun=options.dry, search_crit_re=options.search_crit_re, dryrun=options.dry,
tsleep=options.tsleep, logfile=path_log, tsleep=options.tsleep, logfile=path_log,
cache=options.cache, cpu_free=options.cpu_free, cache=options.cache, cpu_free=options.cpu_free,
qsub_cmd=options.qsub_cmd, sort=options.sort, qsub_cmd=options.qsub_cmd, sort=options.sort,
debug=options.debug, cpu_user_queue=options.cpu_user_queue) debug=options.debug, cpu_user_queue=options.cpu_user_queue)
ss(options.depend) if options.crontab:
if os.path.exists(os.path.join(os.getcwd(), 'launch_crontab.sh')):
ss.read_config()
else:
ss.write_config()
ss.init_state_log()
write_crontab()
else:
ss.write_config()
ss.init_state_log()
ss(depend=options.depend, crontab_mode=options.crontab)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment