#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 14 14:01:38 2014
@author: dave
"""
__author__ = "David Verelst <dave@dtu.dk>"
__license__ = "GPL-2+"
#from __future__ import division
#from __future__ import print_function
import sys
import os
import subprocess as sproc
import re
import time
from datetime import datetime, timedelta
import socket
HOSTNAME = socket.gethostname()
# only python 2.7 and up
#from argparse import ArgumentParser
# python 2.4
from optparse import OptionParser
import pbswrap
class Logger:
"""The Logger class can be used to redirect standard output to a log file.
Usage: Create a Logger object and redirect standard output to the Logger
object. For example:
output = Logger(file_handle, True)
import sys
sys.stdout = output
"""
def __init__(self, logFile, echo):
"""Arguments:
logFile a file object that is available for writing
echo Boolean. If True, output is sent to standard output in
addition to the log file.
"""
import sys
self.out = sys.stdout
self.logFile = logFile
self.echo = echo
def write(self, s):
"""Required method that replaces stdout. You don't have to call this
directly--all print statements will be redirected here."""
self.logFile.write(s)
if self.echo:
self.out.write(s)
self.logFile.flush()
def print_both(f, text, end='\n'):
"""
Print both to a file and the console
"""
print text
if isinstance(f, file):
f.write(text + end)
def build_pbsflist(path_pbs_files):
pbsflist = []
for root, dirs, files in os.walk(path_pbs_files):
for fname in files:
pbsflist.append(os.path.join(root, fname))
return pbsflist
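# The file list cache (launch_pbs_filelist.txt) handled by the two helpers
# below is a plain text file with one pbs file path per line, for example
# (illustrative paths):
#   pbs_in/case_000.p
#   pbs_in/case_001.p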
def load_pbsflist(fname):
f = open(fname, 'r')
# drop the newline character
    pbsflist = [line.rstrip('\n') for line in f]
f.close()
return pbsflist
def save_pbsflist(fname, pbsflist):
f = open(fname, 'w')
f.writelines([k+'\n' for k in pbsflist])
f.flush()
f.close()
def write_qsub_exe(fname, pbs, tsleep=0.25):
# no file open context manager in python 2.4
f = open(fname, 'w')
f.write(pbs)
f.close()
job_id = qsub(fname)
time.sleep(tsleep)
return job_id
def write_crontab(every_min=5):
"""Create a crontab script, and submit to crontab.
"""
python = '/usr/bin/python'
launch = '/home/MET/repositories/toolbox/pbsutils/launch.py'
cwd = os.getcwd()
fpath = os.path.join(cwd, 'launch_crontab.sh')
# TODO: parse other relevant arguments as well: --re
# when using the crontab option, cpu parameters are read from the config
# file if it exists, but only the first run will build a pbslist file
rpl = (every_min, cwd, python, launch)
crontab = '*/%i * * * * cd "%s"; %s %s --crontab --cache ;\n' % rpl
f = open(fpath, 'w')
f.write(crontab)
f.flush()
f.close()
# does the current crontab already exist?
cmd = 'crontab -l'
p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
stdout = p.stdout.readlines()
p.wait()
crontab_exists = False
for line in stdout:
if line.find(cwd) > -1 and line.find(launch) > -1:
# current crontab job is already running!
crontab_exists = True
break
if not crontab_exists:
cmd = 'crontab %s' % fpath
p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
stdout = p.stdout.readlines()
p.wait()
print "added launch.py to crontab:"
print crontab
def remove_crontab():
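    """Remove the current user's crontab (note: crontab -r removes all of
    the user's crontab entries, not only the launch.py one).
    """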
cmd = 'crontab -r'
p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
stdout = p.stdout.readlines()
p.wait()
def qsub(fname, qsub_cmd='qsub %s'):
"""
Parameters
----------
fname : str
path to pbs launch file
cmd : str
Launch command, with %s indicating the pbs launch file name fname.
When launching from a node on Gorm, ssh to the login node first
to bypass the fact that you can only launch from the login node.
"""
cmd = qsub_cmd % fname
p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
stdout = p.stdout.readlines()
p.wait()
# it should return the job id like this: 387598.g-000.risoe.dk
if len(stdout) > 1:
print 'cmd:'
print cmd
print 'stdout:'
print stdout
        raise UserWarning('qsub returned unexpected result')
job_id = stdout[0].replace('\n', '').strip().split('.')[0]
# job id needs to be a number, otherwise something went wrong
try:
int(job_id)
except ValueError:
print('failing qsub command:')
print(cmd)
print(stdout[0])
        raise UserWarning('qsub failed')
return job_id
def pbs_job_exists(job_id):
"""
Returns True when the job_id actually exists on the cluster
"""
cmd = 'qstat %s' % job_id
p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
stdout = p.stdout.readlines()
p.wait()
# qstat: Unknown Job Id
# qstat: Unknown queue destination
# one line if it doesn't exist, multiple if it does
if len(stdout) == 1:
return False
else:
return True
def select(pbsflist, regex):
pbsflist2 = []
for k in pbsflist:
# search criteria, jump to next if not compliant
if regex.search(k):
pbsflist2.append(k)
return pbsflist2
def launch_deps(nr_cpus, path_pbs_files=None, search_crit_re=r'\.p$',
dryrun=False, tsleep=0.30, logfile=None, cache=False):
"""Launch
Launch PBS jobs with dependencies.
Regular expression examples:
--re .veer-0\\.89_.
--re=".veer-0\\.89_.|.veer-0\\.74_."
Parameters
----------
nr_cpus : int
path_pbs_files : path, default=None
search_crit_re : string, default=r'\.p$'
Regular expression applied on the full path of the pbs file that has
to be launched. Use this to only launch a subset of the created
simulations. By default it takes all the *.p files it can find.
dryrun : boolean, default=False
tsleep : float, default=0.30
logfile : str, default=None
cache : boolean, default=False
Returns
-------
None
"""
regex = re.compile(search_crit_re)
# when there is no path defined, take the current working directory
if path_pbs_files is None:
path_pbs_files = os.getcwd()
# the logfile
    if isinstance(logfile, str):
        if logfile == '':
            path_log = os.path.join(os.getcwd(), 'launch_log.txt')
        elif not os.path.isabs(logfile):
            path_log = os.path.join(os.getcwd(), logfile)
        else:
            path_log = logfile
        flog = open(path_log, 'w')
print_both(flog, ' nr_cpus ; %i' % nr_cpus)
print_both(flog, 'path_pbs_files ; %s' % path_pbs_files)
print_both(flog, 'search_crit_re ; %s' % search_crit_re)
print_both(flog, ' dryrun ; %s' % dryrun)
print_both(flog, ' tsleep ; %s' % tsleep)
print_both(flog, ' logfile ; %s' % logfile)
print_both(flog, ' path_log ; %s' % path_log)
print_both(flog, ' cache ; %s' % cache)
print_both(flog, '')
flog.flush()
else:
flog = None
fname = os.path.join(os.getcwd(), 'launch_pbs_filelist.txt')
if not cache:
        print_both(flog, 'Building the list of pbs files to launch. '
                         'Might take a while...')
pbsflist = build_pbsflist(path_pbs_files)
save_pbsflist(fname, pbsflist)
else:
print_both(flog, 'Loading file list from cache...')
pbsflist = load_pbsflist(fname)
    # not the most efficient way, but more generally applicable if we want
    # to be able to give a time estimate on how long it will take to launch
    # all the cases
pbsflist2 = select(pbsflist, regex)
nr_jobs = len(pbsflist2)
print_both(flog, 'Done building/loading. Ready to qsub %i jobs' % nr_jobs)
tot_seconds = tsleep*nr_jobs
d = datetime(1,1,1) + timedelta(seconds=tot_seconds)
rpl = (tsleep, d.hour, d.minute, d.second)
print_both(flog, 'With tsleep=%1.3f s, this will take %02i:%02i:%02i' % rpl)
print_both(flog, 'You have 5 seconds to abort if this doesn\'t look ok...')
print_both(flog, '')
time.sleep(5)
# make sure job_id is not set at the start
job_id = None
# now we can create the dependency list: a dictionary with the string_nr
# as key, and the value is the job_id dependency list
deps = {}
blok_nr = 1
nr_string = 1
c0, c1 = 7, 12
job_id_dep = 'none'
# list comprehension doesn't work on python 2.4
# deps = {k:[''] for k in range(nr_cpus+1)}
deps = {}
for k in range(nr_cpus+1):
deps[k] = ['none']
header = ('chain'.rjust(c0), 'job_id'.rjust(c1), 'job_id_dep'.rjust(c1),
'job_name'.rjust(c1), 'pbs_in file name')
print_both(flog, '%s ; %s ; %s ; %s ; %s' % header)
rpl = ('# blok'.rjust(c0), str(blok_nr).ljust(c1), ' '*c1, ' '*c1)
print_both(flog, '%s ; %s ; %s ; %s ;' % rpl)
for i, fname in enumerate(pbsflist2):
# search criteria, jump to next if not compliant
if not regex.search(fname):
continue
# read the file
f = open(fname)
pbs = f.read()
f.close()
# the current job will depend on the job launched just before this one
job_id_dep = deps[nr_string][-1]
# we cycle over all the nr_cpus. Row per row is filled up. Only for the
# first row we have an independent job
# also check if the job_id_dep actually exists. If not, there should
# not be a dependency in the pbs launch script
if i < nr_cpus or not pbs_job_exists(job_id_dep):
pbs = pbs.replace('[nodeps]', '### ')
job_id_dep = 'none'
if not dryrun:
job_id = write_qsub_exe(fname, pbs, tsleep=tsleep)
else:
job_id = '%i' % i
# create a dependency list
deps[nr_string] = [job_id]
else:
pbs = pbs.replace('[nodeps]', '#')
if not dryrun:
pbs = pbs.replace('[job_id]', job_id_dep)
job_id = write_qsub_exe(fname, pbs, tsleep=tsleep)
else:
job_id_dep = str(int(job_id) - nr_cpus + 1)
job_id = '%i' % i
# append to the existing list
deps[nr_string].append(job_id)
# find the job name, second line
job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
fname_short = os.path.basename(fname)
rpl = (str(nr_string).rjust(c0), job_id.rjust(c1), job_id_dep.rjust(c1),
job_name.rjust(c1), fname_short)
print_both(flog, '%s ; %s ; %s ; %s ; %s' % rpl)
if nr_string >= nr_cpus:
if isinstance(flog, file):
flog.flush()
nr_string = 1
blok_nr += 1
progress = '%6i/%i' % (i, nr_jobs)
rpl = ('# blok'.rjust(c0), str(blok_nr).ljust(c1),
progress.rjust(c1), ' '*c1)
print_both(flog, '%s ; %s ; %s ; %s ;' % rpl)
else:
nr_string += 1
print_both(flog, '%s ; %s ; %s ; %s ; %s' % header)
    if isinstance(flog, file):
        flog.flush()
        flog.close()
class Scheduler:
"""
A simple scheduler that runs on top of PBS.
    There is a lot of code duplicated between this class and the module-level
    launch_deps() function.
"""
def __init__(self, nr_cpus, path_pbs_files=None, search_crit_re=r'\.p$',
dryrun=False, tsleep=5.00, logfile=None, cache=False,
cpu_free=48, qsub_cmd='qsub %s', sort=False, debug=False,
cpu_user_queue=500):
"""
Regular expression examples:
--re .veer-0\\.89_.
--re=".veer-0\\.89_.|.veer-0\\.74_."
Parameters
----------
nr_cpus : int
path_pbs_files : path, default=None
If set to None, the current working directory is used instead.
search_crit_re : string, default=r'\.p$'
Regular expression applied on the full path of the pbs file that
has to be launched. Use this to only launch a subset of the created
simulations. By default it takes all the *.p files it can find.
dryrun : boolean, default=False
tsleep : float, default=5.00
        logfile : str, default=None
            Only used to decide whether to ask for an interactive
            confirmation before launching.
        cache : boolean, default=False
        cpu_free : int, default=48
            Minimum number of cpus that should remain free on the cluster
            before another job is launched for the user. This prevents
            more jobs from being launched even when the user occupies
            fewer cpus than defined with nr_cpus.
        cpu_user_queue : int, default=500
            No more jobs are launched once the user already has this many
            jobs waiting in the queue.
        qsub_cmd : str, default='qsub %s'
            When launching from a node on Gorm, ssh to g-000 to bypass the
            lack of permissions to launch jobs from a node.
        sort : boolean, default=False
            If True the file list will be sorted. Applies to both cached
            and freshly built file lists.
"""
# when there is no path defined, take the current working directory
if path_pbs_files is None:
path_pbs_files = os.getcwd()
self.uid = os.getenv('USER')
self.nr_cpus = nr_cpus
self.cpu_free = cpu_free
self.cpu_user_queue = cpu_user_queue
self.tsleep = tsleep
self.dryrun = dryrun
self.tsleep_short = 1.0
self.c0, self.c1, self.c2 = 8, 8, 17
self.logfile = logfile
self.qsub_cmd = qsub_cmd
self.debug = debug
self.search_crit_re = search_crit_re
self.path_pbs_files = path_pbs_files
self.pbs_update_deltat = 5.0 # in seconds
self.fname_config = 'launch_scheduler_config.txt'
self.fname_cluster_state = 'launch_scheduler_state.txt'
self.reload_pbsflist = 0
self.cache = cache
self.sort = sort
self.f_pbsflist = os.path.join(os.getcwd(), 'launch_pbs_filelist.txt')
self.pbsnodes = '/opt/pbs/bin/pbsnodes'
if HOSTNAME[:1] == 'g':
self.qstat = '/opt/pbs/bin/qstat'
self.qsub_cmd = self.qsub_cmd.replace('qsub', '/opt/pbs/bin/qsub')
else:
self.qstat = '/usr/bin/qstat'
self.qsub_cmd = self.qsub_cmd.replace('qsub', '/usr/bin/qsub')
def __call__(self, depend=False, crontab_mode=False):
"""
Parameters
----------
        depend : boolean, default=False
            If set to True, the dependency launch method will be used. If
            False, the scheduler method is used.
        crontab_mode : boolean, default=False
            If True, launch as many jobs as the cluster currently allows and
            then return instead of sleeping and polling; the remaining file
            list is saved for the next crontab run, and the configuration
            print-out and interactive confirmation are skipped.
        """
if not depend:
if crontab_mode:
pbsflist = self.get_joblist(verbose=False)
self.launch(pbsflist, crontab_mode=crontab_mode)
else:
self.print_config()
print ' method ; scheduler'
pbsflist = self.get_joblist()
self.print_logheader()
self.launch(pbsflist, crontab_mode=crontab_mode)
print ''
else:
print ' method ; dependencies'
            # launch_deps() iterates over self.pbsflist, so build it first
            self.pbsflist = self.get_joblist()
            self.launch_deps()
print ''
def get_joblist(self, verbose=True):
# build the file list
if not self.cache:
if verbose:
                print 'Building the list of pbs files to launch. ',
                print 'Might take a while...'
# only filter when building the file list. It would be counter
# intuitive to filter a user generated/controlled list.
pbsflist = self.filter_joblist(build_pbsflist(self.path_pbs_files))
if self.sort:
pbsflist.sort()
save_pbsflist(self.f_pbsflist, pbsflist)
else:
if verbose:
print 'Loading file list from cache...'
pbsflist = load_pbsflist(self.f_pbsflist)
return pbsflist
def filter_joblist(self, pbsflist):
# filter out files we will not launch
regex = re.compile(self.search_crit_re)
pbsflist = select(pbsflist, regex)
nr_jobs = len(pbsflist)
print 'Done building/loading. Ready to qsub %i jobs' % nr_jobs
return pbsflist
def print_config(self):
"""Print a header for the log with current config.
"""
print ' nr_cpus ; %3i' % self.nr_cpus
print ' cpu_free ; %3i' % self.cpu_free
print 'cpu_user_queue ; %3i' % self.cpu_user_queue
print 'path_pbs_files ; %s' % self.path_pbs_files
print 'search_crit_re ; %s' % self.search_crit_re
print ' dryrun ; %s' % self.dryrun
print ' tsleep ; %s' % self.tsleep
print ' logfile ; %s' % self.logfile
        print '      path_log ; %s' % self.logfile
print ' cache ; %s' % self.cache
def print_logheader(self):
print ' tsleep_short ; %6.2f' % self.tsleep_short
print 'min cpu to be free ; %3i' % self.cpu_free
# we can only ask for confirmation when stdout is not going to the log
if self.logfile is None:
print 'Is this ok? You have 5 sec to abort (press CTRL-C to abort)'
time.sleep(5)
# print 'Press enter to continue, or CTRL-C to abort'
# dummy = raw_input()
print ''
header = ('job nr'.rjust(self.c0), 'job_id'.rjust(self.c1),
'progress'.rjust(self.c1), 'time stamp'.rjust(self.c2),
'pbs_in file name')
print '%s ; %s ; %s ; %s ; %s' % header
def write_config(self):
"""Save a config file with the current tuning parameters.
"""
        # write the config file: one "key : value" pair per line
f = open(self.fname_config, 'w')
f.write(' nr_cpus : % 5i\n' % self.nr_cpus)
f.write('cpu_user_queue : % 5i\n' % self.cpu_user_queue)
f.write(' cpu_free : % 5i\n' % self.cpu_free)
f.write('')
f.write(' tsleep : % 5.02f\n' % self.tsleep)
f.write(' tsleep_short : % 5.02f\n' % self.tsleep_short)
f.write('pbs_update_deltat : % 5.02f\n' % self.pbs_update_deltat)
f.write(' f_pbsflist : %s\n' % self.f_pbsflist)
f.flush()
f.close()
def read_config(self):
"""Update scheduler tuning from config file.
"""
f = open(self.fname_config, 'r')
lines = f.readlines()
f.close()
for line in lines:
items = line.split(':')
if len(items) == 2:
try:
value = int(items[1].strip())
# FIXME: there has to be a more elegant solution than this ugly
# hack...
except ValueError:
try:
value = float(items[1].strip())
except ValueError:
value = items[1].strip()
setattr(self, items[0].strip(), value)
def init_state_log(self):
"""For more extensive logging, write down the scheduler and cluster
states.
"""
        self.header = ['date_time', 'cpu_target', 'cpu_user', 'cpu_free',
                       'free_target', 'queue_user', 'queue_target', 'tsleep',
                       'tsleep_short', 'pbs_update_deltat', 'reload_pbsflist']
f = open(self.fname_cluster_state, 'w')
f.write(' ; '.join(self.header) + '\n')
f.flush()
f.close()
def update_state_log(self, ts, cpu_user, cpu_free, cpu_user_queue):
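        """Append one line with the current scheduler settings and cluster
        state to the state log (same column order as the header written by
        init_state_log).
        """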
ts_ = datetime.fromtimestamp(ts).strftime('%m-%d %H:%M:%S')
ms = '%1.02f' % ts
ts_ += ms[-3:]
line = [ts_, self.nr_cpus, cpu_user, cpu_free, self.cpu_free,
cpu_user_queue, self.cpu_user_queue, self.tsleep,
self.tsleep_short, self.pbs_update_deltat,
self.reload_pbsflist]
line_str = [str(k) for k in line]
f = open(self.fname_cluster_state, 'a')
f.write(' ; '.join(line_str) + '\n')
f.flush()
f.close()
def check_nodes(self, uid):
"""
See how many cpu's are free, and how many are used by the current user
"""
if HOSTNAME[:2] == 'j-':
output = os.popen('ssh jess "%s -l all"' % self.pbsnodes).readlines()
else:
output = os.popen('%s -l all' % self.pbsnodes).readlines()
# FIXME: slow, maybe do this only a few times??
pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output)
output = os.popen('%s -n1' % self.qstat).readlines()
users, host, nodesload = pbswrap.parse_qstat_n1(output)
# if the user does not have any jobs, this will not exist
try:
cpu_user = users[uid]['cpus']
except KeyError:
cpu_user = 0
# add queued jobs as well, if any
try:
cpu_user_queue = users[uid]['Q']
except KeyError:
cpu_user_queue = 0
cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes)
return cpu_free, cpu_user, cpu_user_queue
def launch(self, pbsflist, crontab_mode=False):
"""
Iterate over all the pbs files and launch them if there are cpus
available, and if the maximum number of predifined cpus is not reached.
This scenario should hopefully work with massive amounts of jobs.
Queued jobs are also included when the counting the number of cpu's
used by the user. This will prevent the scheduler from spamming the
queueing system with jobs that will only queue intstead of run.
"""
ii = 0
ii_max = len(pbsflist)
if ii_max < 1:
print ""
print "No files matching %s at:" % self.search_crit_re
print self.path_pbs_files
print "I have no choice but to abort the launch sequence."
print ""
return
time0 = 0.0
while True:
time1 = time.time()
if (time1 - time0) > self.pbs_update_deltat:
cpu_free, cpu_user, cpu_user_queue = self.check_nodes(self.uid)
time0 = time.time()
self.read_config()
self.update_state_log(time0, cpu_user, cpu_free, cpu_user_queue)
            # we only launch a new job when we are not using more than our
            # quota (nr_cpus), when there is still some room left for others
            # to breathe, and when we do not have too many of our own jobs
            # waiting in the queue
# TODO: add another metric: if the total queue is bigger than
# so many nodes/cpus, don't add anything else.
if (self.nr_cpus > cpu_user) and (cpu_free > self.cpu_free) and \
(cpu_user_queue < self.cpu_user_queue):
fname = pbsflist[ii]
# fname minus current working dir
fname_short = fname.replace(os.getcwd(), '')
# # read the PBS file
# f = open(fname)
# pbs = f.read()
# f.close()
# find the job name, second line
# job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
# fname_short = os.path.basename(fname)
# submit the job
if not self.dryrun:
try:
job_id = qsub(fname, qsub_cmd=self.qsub_cmd)
# in case we are not polling for the current PBS state
# assume we are the only one eating up available CPU's
cpu_user += 1
cpu_free -= 1
except UserWarning:
cmd = self.qsub_cmd % fname
print 'launch cmd: %s' % cmd
print 'failed, try again in %1.1f sec' % self.tsleep
time.sleep(self.tsleep)
continue
else:
job_id = '%i' % ii
# when was the job launched
time_s = time.time()
ts = datetime.fromtimestamp(time_s).strftime('%m-%d %H:%M:%S')
ms = '%1.02f' % time_s
ts += ms[-3:]
progress = '%6.2f' % (100.0*(ii+1)/float(ii_max))
rpl = (('%i' % ii).rjust(self.c0), job_id.rjust(self.c1),
(progress+'%').rjust(self.c1), ts.rjust(self.c2),
fname_short)
print '%s ; %s ; %s ; %s ; %s' % rpl
                # short pause between submissions so we do not flood the
                # queueing system in one burst
time.sleep(self.tsleep_short)
ii += 1
elif crontab_mode:
save_pbsflist(self.f_pbsflist, pbsflist[ii:])
return
else:
# wait a bit before trying to launch a job again
if self.debug:
print 'sleeping because:'
rpl = (self.nr_cpus, cpu_user)
print ' nr_cpus: %4i, cpu_user: %4i' % rpl
rpl = (cpu_free, self.cpu_free)
print 'cpu_free: %4i, cpu_free_req: %4i' % rpl
rpl = (cpu_user_queue, self.cpu_user_queue)
print ' u queue: %4i, u queue max: %4i' % rpl
time.sleep(self.tsleep)
# stop when we handled all the files
if ii >= ii_max:
print 'All jobs have been launched, stopping the scheduler.'
if crontab_mode:
remove_crontab()
return
def launch_deps(self):
"""Launch
Launch PBS jobs with dependencies.
Regular expression examples:
--re .veer-0\\.89_.
--re=".veer-0\\.89_.|.veer-0\\.74_."
Parameters
----------
nr_cpus : int
path_pbs_files : path, default=None
search_crit_re : string, default=r'\.p$'
Regular expression applied on the full path of the pbs file that
has to be launched. Use this to only launch a subset of the created
simulations. By default it takes all the *.p files it can find.
dryrun : boolean, default=False
tsleep : float, default=0.30
cache : boolean, default=False
Returns
-------
None
"""
nr_jobs = len(self.pbsflist)
tot_seconds = self.tsleep*nr_jobs
d = datetime(1,1,1) + timedelta(seconds=tot_seconds)
rpl = (self.tsleep, d.hour, d.minute, d.second)
print 'With tsleep=%1.3f s, this will take %02i:%02i:%02i' % rpl
# we can only ask for confirmation when stdout is not going to the log
if self.logfile is None:
print 'Press enter to continue, or CTRL-C to abort'
raw_input()
print ''
# make sure job_id is not set at the start
job_id = None
# now we can create the dependency list: a dictionary with the string_nr
# as key, and the value is the job_id dependency list
deps = {}
blok_nr = 1
nr_string = 1
job_id_dep = 'none'
# list comprehension doesn't work on python 2.4
# deps = {k:[''] for k in range(nr_cpus+1)}
deps = {}
for k in range(self.nr_cpus+1):
deps[k] = ['none']
header = ('chain'.rjust(self.c0), 'job_id'.rjust(self.c1),
'job_id_dep'.rjust(self.c1),
'job_name'.rjust(self.c1), 'pbs_in file name')
print '%s ; %s ; %s ; %s ; %s' % header
rpl = ('# blok'.rjust(self.c0), str(blok_nr).ljust(self.c1),
' '*self.c1, ' '*self.c1)
print '%s ; %s ; %s ; %s ;' % rpl
for i, fname in enumerate(self.pbsflist):
# read the file
f = open(fname)
pbs = f.read()
f.close()
# the current job will depend on the job launched just before this one
job_id_dep = deps[nr_string][-1]
# we cycle over all the nr_cpus. Row per row is filled up. Only for the
# first row we have an independent job
# also check if the job_id_dep actually exists. If not, there should
# not be a dependency in the pbs launch script
if i < self.nr_cpus or not pbs_job_exists(job_id_dep):
pbs = pbs.replace('[nodeps]', '### ')
job_id_dep = 'none'
if not self.dryrun:
job_id = write_qsub_exe(fname, pbs, tsleep=self.tsleep)
else:
job_id = '%i' % i
# create a dependency list
deps[nr_string] = [job_id]
else:
pbs = pbs.replace('[nodeps]', '#')
if not self.dryrun:
pbs = pbs.replace('[job_id]', job_id_dep)
job_id = write_qsub_exe(fname, pbs, tsleep=self.tsleep)
else:
job_id_dep = str(int(job_id) - self.nr_cpus + 1)
job_id = '%i' % i
# append to the existing list
deps[nr_string].append(job_id)
# find the job name, second line
job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
fname_short = os.path.basename(fname)
rpl = (str(nr_string).rjust(self.c0), job_id.rjust(self.c1),
job_id_dep.rjust(self.c1),
job_name.rjust(self.c1), fname_short)
print '%s ; %s ; %s ; %s ; %s' % rpl
if nr_string >= self.nr_cpus:
nr_string = 1
blok_nr += 1
progress = '%6i/%i' % (i, nr_jobs)
rpl = ('# blok'.rjust(self.c0), str(blok_nr).ljust(self.c1),
progress.rjust(self.c1), ' '*self.c1)
print '%s ; %s ; %s ; %s ;' % rpl
else:
nr_string += 1
print '%s ; %s ; %s ; %s ; %s' % header
if __name__ == '__main__':
# parse the arguments, only relevant when using as a command line utility
parser = OptionParser(usage='\n\n%prog -n nr_cpus \n\n'
'%prog --crontab when running a single iteration of '
'%prog as a crontab job every 5 minutes.\n'
'File list is read from "launch_pbs_filelist.txt", '
'and the configuration can be changed on the fly\n'
'by editing the file "launch_scheduler_config.txt".')
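    # optparse (python 2.4) has no add_argument, so alias it to add_option
    # and keep the argparse-like call style below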
parser.add_argument = parser.add_option
parser.add_argument('--depend', dest='depend', action='store_true',
default=False, help='Switch on for launch depend method')
parser.add_argument('-n', '--nr_cpus', type='int', dest='nr_cpus',
action='store', default=None,
help='number of cpus to be used')
parser.add_argument('-p', '--path_pbs_files', type='string', action='store',
dest='path_pbs_files', default='pbs_in/',
help='optionally specify location of pbs files')
parser.add_argument('--re', type='string', action='store',
dest='search_crit_re', default=r'\.p$',
                        help='regular expression search criterion applied on '
'the full pbs file path. Escape backslashes! '
'By default it will select all *.p files in pbs_in/.')
parser.add_argument('--dry', action='store_true', dest='dry', default=False,
help='dry run: do not alter pbs files, do not launch')
parser.add_argument('--tsleep', action='store', dest='tsleep', type='float',
default=5.00, help='Sleep time [s] when cluster is too '
                        'busy to launch new jobs. Default=5 seconds')
parser.add_argument('--tsleep_short', action='store', dest='tsleep_short',
type='float', default=0.5, help='Sleep time [s] '
                        'between successive job launches. Default=0.5 '
'seconds.')
parser.add_argument('--logfile', action='store', dest='logfile',
default=None, help='Save output to file.')
parser.add_argument('-c', '--cache', action='store_true', dest='cache',
default=False, help='If on, files are read from cache')
parser.add_argument('--cpu_free', action='store', dest='cpu_free',
type='int', default=48, help='No more jobs will be '
'launched when the cluster does not have the specified '
'amount of cpus free. This will make sure there is '
'room for others on the cluster, but might mean less '
'cpus available for you. Default=48')
parser.add_argument('--cpu_user_queue', action='store', dest='cpu_user_queue',
type='int', default=5, help='No more jobs will be '
'launched after having cpu_user_queue number of jobs '
'in the queue. This prevents users from filling the '
'queue, while still allowing to aim for a high cpu_free '
'target. Default=5')
parser.add_argument('--qsub_cmd', action='store', dest='qsub_cmd',
default='qsub %s',
help='Is set automatically by --node flag')
parser.add_argument('--node', action='store_true', dest='node',
default=False, help='If executed on dedicated node. '
'Although this works, consider using --crontab instead.'
' Default=False')
parser.add_argument('--sort', action='store_true', dest='sort',
default=False, help='Sort pbs file list. Default=False')
parser.add_argument('--crontab', action='store_true', dest='crontab',
default=False, help='Crontab mode: %prog will check '
'every 5 (default) minutes if more jobs can be launched. '
'Not compatible with --node. When all jobs are done, '
'crontab -r will remove all existing crontab jobs of '
'the current user. Use crontab -l to inspect current '
'crontab jobs, and edit them with crontab -e. '
'Default=False')
parser.add_argument('--every_min', action='store', dest='every_min',
type='int', default=5, help='Crontab update interval '
'in minutes. Default=5')
parser.add_argument('--debug', action='store_true', dest='debug',
default=False, help='Debug print statements. Default=False')
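    # Typical invocations (illustrative):
    #   launch.py -n 100
    #   launch.py -n 100 --re=".veer-0\.89_.|.veer-0\.74_." --dry
    #   launch.py -n 100 --node
    #   launch.py -n 100 --crontab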
(options, args) = parser.parse_args()
if options.crontab and options.node:
parser.print_usage()
sys.stderr.write("error: --node and --crontab not compatible" + os.linesep)
sys.exit(1)
if options.nr_cpus is None and not options.crontab:
parser.print_usage()
sys.stderr.write("error: specify number of cpus with -n" + os.linesep)
sys.exit(1)
if options.node:
# repeat exactly the same command but now wrap a pbs script around it
# and remove the --node argument
jobname = 'launch.py'
args = [arg for arg in sys.argv]
# remove --node argument
iqsub_cmd, ilogfile = None, None
for i, arg in enumerate(args):
if arg == '--node':
inode = i
elif arg[:10] == '--qsub_cmd':
iqsub_cmd = i
rpl = (os.getcwd(), '%s')
if HOSTNAME[:1] == 'g':
args[i] = '--qsub_cmd=\'ssh g-000 "cd %s;qsub %s"\'' % rpl
else:
args[i] = '--qsub_cmd="cd %s;qsub %s"' % rpl
elif arg[:9] == '--logfile':
ilogfile = i
args[i] = '--logfile= '
args.pop(inode)
# if they were not defined in the original command, add them
if iqsub_cmd is None:
rpl = (os.getcwd(), '%s')
if HOSTNAME[:1] == 'g':
args.append('--qsub_cmd=\'ssh g-000 "cd %s;qsub %s"\'' % rpl)
else:
args.append('--qsub_cmd="cd %s;qsub %s"' % rpl)
if ilogfile is None:
args.append('--logfile= ')
# and force the use of logfile and specify qsub_com with ssh
# first argument is the full path of the script
args.pop(0)
com = 'launch.py %s' % ' '.join(args)
# create a PBS script that contains the launch.py command
fpath = pbswrap.create_input(walltime='72:00:00', queue='workq',
pbs_in='pbs_in/', ppn=1, pbs_out='pbs_out/',
jobname=jobname, commands=com, lnodes=1)
job_id = qsub(fpath)
        # the wrapper pbs job has been submitted; nothing left to do here
        sys.exit(0)
elif isinstance(options.logfile, str) or options.crontab:
if options.logfile is None:
options.logfile = ''
        if options.logfile.strip() == '':
            path_log = os.path.join(os.getcwd(), 'launch_scheduler_log.txt')
        elif not os.path.isabs(options.logfile):
            path_log = os.path.join(os.getcwd(), options.logfile)
        else:
            path_log = options.logfile
# log to file and do not print messages, but in crontab mode, append!
if os.path.exists(path_log) and options.crontab:
mode = 'a'
else:
mode = 'w'
output = Logger(open(path_log, mode), True)
sys.stdout = output
else:
path_log = None
ss = Scheduler(options.nr_cpus, path_pbs_files=options.path_pbs_files,
search_crit_re=options.search_crit_re, dryrun=options.dry,
tsleep=options.tsleep, logfile=path_log,
cache=options.cache, cpu_free=options.cpu_free,
qsub_cmd=options.qsub_cmd, sort=options.sort,
debug=options.debug, cpu_user_queue=options.cpu_user_queue)
ss.tsleep_short = options.tsleep_short
if options.crontab:
if os.path.exists(ss.fname_config):
ss.read_config()
else:
ss.write_config()
ss.init_state_log()
else:
ss.write_config()
ss.init_state_log()
ss(depend=options.depend, crontab_mode=options.crontab)
# start crontab job after first round of launching to prevent it from
# running more than once
if options.crontab:
write_crontab(every_min=options.every_min)