#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 14 14:01:38 2014

@author: dave
"""
__author__ = "David Verelst <dave@dtu.dk>"
__license__ = "GPL-2+"

#from __future__ import division
#from __future__ import print_function

import sys
import os
import subprocess as sproc
import re
import time
from datetime import datetime, timedelta
import socket
HOSTNAME = socket.gethostname()

# only python 2.7 and up
#from argparse import ArgumentParser
# python 2.4
from optparse import OptionParser

import pbswrap


class Logger:
    """The Logger class can be used to redirect standard output to a log
    file.

    Usage: create a Logger object and redirect standard output to it.
    For example:

        output = Logger(file_handle, True)
        import sys
        sys.stdout = output
    """
    def __init__(self, logFile, echo):
        """Arguments:
        logFile    a file object that is available for writing
        echo       Boolean. If True, output is sent to standard output in
                   addition to the log file.
        """
        import sys
        self.out = sys.stdout
        self.logFile = logFile
        self.echo = echo

    def write(self, s):
        """Required method that replaces stdout. You don't have to call this
        directly--all print statements will be redirected here."""
        self.logFile.write(s)
        if self.echo:
            self.out.write(s)
        self.logFile.flush()


def print_both(f, text, end='\n'):
    """
    Print both to a file and the console.
    """
    print text
    if isinstance(f, file):
        f.write(text + end)


def build_pbsflist(path_pbs_files):
    pbsflist = []
    for root, dirs, files in os.walk(path_pbs_files):
        for fname in files:
            pbsflist.append(os.path.join(root, fname))
    return pbsflist


def load_pbsflist(fname):
    f = open(fname, 'r')
    # drop the newline character
    pbsflist = [line[:-1] for line in f]
    f.close()
    return pbsflist


def save_pbsflist(fname, pbsflist):
    f = open(fname, 'w')
    f.writelines([k+'\n' for k in pbsflist])
    f.flush()
    f.close()


def write_qsub_exe(fname, pbs, tsleep=0.25):
    # no file open context manager in python 2.4
    f = open(fname, 'w')
    f.write(pbs)
    f.close()
    job_id = qsub(fname)
    time.sleep(tsleep)
    return job_id


def write_crontab(every_min=5):
    """Create a crontab script, and submit it to crontab.
    """
    python = '/usr/bin/python'
    launch = '/home/MET/repositories/toolbox/pbsutils/launch.py'
    cwd = os.getcwd()
    fpath = os.path.join(cwd, 'launch_crontab.sh')
    # TODO: parse other relevant arguments as well: --re
    # when using the crontab option, cpu parameters are read from the config
    # file if it exists, but only the first run will build a pbslist file
    rpl = (every_min, cwd, python, launch)
    crontab = '*/%i * * * * cd "%s"; %s %s --crontab --cache ;\n' % rpl
    f = open(fpath, 'w')
    f.write(crontab)
    f.flush()
    f.close()
    # does the current crontab already exist?
    cmd = 'crontab -l'
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()
    crontab_exists = False
    for line in stdout:
        if line.find(cwd) > -1 and line.find(launch) > -1:
            # current crontab job is already running!
            crontab_exists = True
            break
    if not crontab_exists:
        cmd = 'crontab %s' % fpath
        p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT,
                        shell=True)
        stdout = p.stdout.readlines()
        p.wait()
        print "added launch.py to crontab:"
        print crontab


def remove_crontab():
    cmd = 'crontab -r'
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()
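
# For reference, the crontab entry generated by write_crontab() looks like
# this (every_min=5 and an illustrative working directory assumed):
#
#   */5 * * * * cd "/home/user/sims"; /usr/bin/python /home/MET/repositories/toolbox/pbsutils/launch.py --crontab --cache ;
#
# Note that 'crontab launch_crontab.sh' replaces the user's entire crontab,
# which is why write_crontab() first checks 'crontab -l' for an entry that
# already points at the same working directory.
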

def qsub(fname, qsub_cmd='qsub %s'):
    """
    Parameters
    ----------

    fname : str
        path to pbs launch file

    qsub_cmd : str
        Launch command, with %s indicating the pbs launch file name fname.
        When launching from a node on Gorm, ssh to the login node first to
        bypass the fact that you can only launch from the login node.
    """
    cmd = qsub_cmd % fname
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()
    # it should return the job id like this: 387598.g-000.risoe.dk
    if len(stdout) != 1:
        print 'cmd:'
        print cmd
        print 'stdout:'
        print stdout
        raise UserWarning('qsub returned unexpected result')
    job_id = stdout[0].replace('\n', '').strip().split('.')[0]
    # job id needs to be a number, otherwise something went wrong
    try:
        int(job_id)
    except ValueError:
        print 'failing qsub command:'
        print cmd
        print stdout[0]
        raise UserWarning('qsub failed')
    return job_id


def pbs_job_exists(job_id):
    """
    Returns True when the job_id actually exists on the cluster.
    """
    cmd = 'qstat %s' % job_id
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()
    # qstat: Unknown Job Id
    # qstat: Unknown queue destination
    # one line if it doesn't exist, multiple if it does
    if len(stdout) == 1:
        return False
    else:
        return True


def select(pbsflist, regex):
    pbsflist2 = []
    for k in pbsflist:
        # search criteria, jump to next if not compliant
        if regex.search(k):
            pbsflist2.append(k)
    return pbsflist2
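
# A minimal sketch of how build_pbsflist() and select() combine; the file
# names are hypothetical and the function is illustrative only (it is not
# called anywhere in this module):
def _example_select():
    pbsflist = ['pbs_in/case_a.p', 'pbs_in/case_b.p', 'pbs_in/notes.txt']
    regex = re.compile(r'\.p$')
    # keeps only the pbs launch files: ['pbs_in/case_a.p', 'pbs_in/case_b.p']
    return select(pbsflist, regex)
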

def launch_deps(nr_cpus, path_pbs_files=None, search_crit_re=r'\.p$',
                dryrun=False, tsleep=0.30, logfile=None, cache=False):
    """Launch PBS jobs with dependencies.

    Regular expression examples:
    --re .veer-0\\.89_.
    --re=".veer-0\\.89_.|.veer-0\\.74_."

    Parameters
    ----------

    nr_cpus : int

    path_pbs_files : path, default=None

    search_crit_re : string, default=r'\.p$'
        Regular expression applied on the full path of the pbs file that has
        to be launched. Use this to only launch a subset of the created
        simulations. By default it takes all the *.p files it can find.

    dryrun : boolean, default=False

    tsleep : float, default=0.30

    logfile : str, default=None

    cache : boolean, default=False

    Returns
    -------

    None
    """
    regex = re.compile(search_crit_re)
    # when there is no path defined, take the current working directory
    if path_pbs_files is None:
        path_pbs_files = os.getcwd()
    # the logfile
    if isinstance(logfile, str):
        if logfile == '':
            path_log = os.path.join(os.getcwd(), 'launch_log.txt')
        elif not os.path.isabs(logfile):
            path_log = os.path.join(os.getcwd(), logfile)
        else:
            path_log = logfile
        flog = open(path_log, 'w')
        print_both(flog, '       nr_cpus ; %i' % nr_cpus)
        print_both(flog, 'path_pbs_files ; %s' % path_pbs_files)
        print_both(flog, 'search_crit_re ; %s' % search_crit_re)
        print_both(flog, '        dryrun ; %s' % dryrun)
        print_both(flog, '        tsleep ; %s' % tsleep)
        print_both(flog, '       logfile ; %s' % logfile)
        print_both(flog, '      path_log ; %s' % path_log)
        print_both(flog, '         cache ; %s' % cache)
        print_both(flog, '')
        flog.flush()
    else:
        flog = None

    fname = os.path.join(os.getcwd(), 'launch_pbs_filelist.txt')
    if not cache:
        print_both(flog, 'Building file list of to be launched jobs. '
                         'Might take a while...')
        pbsflist = build_pbsflist(path_pbs_files)
        save_pbsflist(fname, pbsflist)
    else:
        print_both(flog, 'Loading file list from cache...')
        pbsflist = load_pbsflist(fname)

    # not the most efficient way, but more generally applicable if we want
    # to be able to give a time estimate on how long it will take to launch
    # all the cases
    pbsflist2 = select(pbsflist, regex)
    nr_jobs = len(pbsflist2)
    print_both(flog, 'Done building/loading. Ready to qsub %i jobs' % nr_jobs)
    tot_seconds = tsleep*nr_jobs
    d = datetime(1,1,1) + timedelta(seconds=tot_seconds)
    rpl = (tsleep, d.hour, d.minute, d.second)
    print_both(flog, 'With tsleep=%1.3f s, this will take %02i:%02i:%02i' % rpl)
    print_both(flog, 'You have 5 seconds to abort if this doesn\'t look ok...')
    print_both(flog, '')
    time.sleep(5)

    # make sure job_id is not set at the start
    job_id = None
    # now we can create the dependency list: a dictionary with the nr_string
    # as key, and the value is the job_id dependency list
    blok_nr = 1
    nr_string = 1
    c0, c1 = 7, 12
    job_id_dep = 'none'
    # dict comprehensions are not available in python 2.4
    # deps = {k:[''] for k in range(nr_cpus+1)}
    deps = {}
    for k in range(nr_cpus+1):
        deps[k] = ['none']
    header = ('chain'.rjust(c0), 'job_id'.rjust(c1), 'job_id_dep'.rjust(c1),
              'job_name'.rjust(c1), 'pbs_in file name')
    print_both(flog, '%s ; %s ; %s ; %s ; %s' % header)
    rpl = ('# blok'.rjust(c0), str(blok_nr).ljust(c1), ' '*c1, ' '*c1)
    print_both(flog, '%s ; %s ; %s ; %s ;' % rpl)
    for i, fname in enumerate(pbsflist2):
        # search criteria, jump to next if not compliant
        if not regex.search(fname):
            continue
        # read the file
        f = open(fname)
        pbs = f.read()
        f.close()
        # the current job will depend on the job launched just before this one
        job_id_dep = deps[nr_string][-1]
        # we cycle over all the nr_cpus. Row per row is filled up. Only for
        # the first row we have an independent job.
        # also check if the job_id_dep actually exists. If not, there should
        # not be a dependency in the pbs launch script
        if i < nr_cpus or not pbs_job_exists(job_id_dep):
            pbs = pbs.replace('[nodeps]', '### ')
            job_id_dep = 'none'
            if not dryrun:
                job_id = write_qsub_exe(fname, pbs, tsleep=tsleep)
            else:
                job_id = '%i' % i
            # create a dependency list
            deps[nr_string] = [job_id]
        else:
            pbs = pbs.replace('[nodeps]', '#')
            if not dryrun:
                pbs = pbs.replace('[job_id]', job_id_dep)
                job_id = write_qsub_exe(fname, pbs, tsleep=tsleep)
            else:
                job_id_dep = str(int(job_id) - nr_cpus + 1)
                job_id = '%i' % i
            # append to the existing list
            deps[nr_string].append(job_id)

        # find the job name, second line
        job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
        fname_short = os.path.basename(fname)
        rpl = (str(nr_string).rjust(c0), job_id.rjust(c1),
               job_id_dep.rjust(c1), job_name.rjust(c1), fname_short)
        print_both(flog, '%s ; %s ; %s ; %s ; %s' % rpl)
        if nr_string >= nr_cpus:
            if isinstance(flog, file):
                flog.flush()
            nr_string = 1
            blok_nr += 1
            progress = '%6i/%i' % (i, nr_jobs)
            rpl = ('# blok'.rjust(c0), str(blok_nr).ljust(c1),
                   progress.rjust(c1), ' '*c1)
            print_both(flog, '%s ; %s ; %s ; %s ;' % rpl)
        else:
            nr_string += 1

    print_both(flog, '%s ; %s ; %s ; %s ; %s' % header)
    if isinstance(flog, file):
        flog.flush()
        flog.close()
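
# To make the dependency bookkeeping in launch_deps() concrete: with
# nr_cpus=2 the jobs are spread over 2 chains, and every job after the first
# row waits for the previous job in its own chain. An illustrative dry-run
# sketch of that bookkeeping (not called anywhere in this module):
def _example_dep_chains(nr_jobs=5, nr_cpus=2):
    deps = {}
    for k in range(nr_cpus+1):
        deps[k] = ['none']
    nr_string = 1
    for i in range(nr_jobs):
        job_id = '%i' % i
        job_id_dep = deps[nr_string][-1]
        # the first nr_cpus jobs are independent: they start each chain
        if i < nr_cpus:
            deps[nr_string] = [job_id]
        else:
            deps[nr_string].append(job_id)
        print 'job %s in chain %i depends on %s' % (job_id, nr_string,
                                                    job_id_dep)
        if nr_string >= nr_cpus:
            nr_string = 1
        else:
            nr_string += 1
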

class Scheduler:
    """
    A simple scheduler that runs on top of PBS. There is a lot of duplicated
    code between this class and the module level launch_deps function.
    """

    def __init__(self, nr_cpus, path_pbs_files=None, search_crit_re=r'\.p$',
                 dryrun=False, tsleep=5.00, logfile=None, cache=False,
                 cpu_free=48, qsub_cmd='qsub %s', sort=False, debug=False,
                 cpu_user_queue=500):
        """
        Regular expression examples:
        --re .veer-0\\.89_.
        --re=".veer-0\\.89_.|.veer-0\\.74_."

        Parameters
        ----------

        nr_cpus : int

        path_pbs_files : path, default=None
            If set to None, the current working directory is used instead.

        search_crit_re : string, default=r'\.p$'
            Regular expression applied on the full path of the pbs file that
            has to be launched. Use this to only launch a subset of the
            created simulations. By default it takes all the *.p files it
            can find.

        dryrun : boolean, default=False

        tsleep : float, default=5.00

        logfile : str, default=None
            Is only used to trigger the confirmation on continue.

        cache : boolean, default=False

        cpu_free : int, default=48
            Minimum number of free cpus the cluster should have before
            another job is launched for the user. This prevents more jobs
            from being launched even if the user occupies fewer than the
            number of cpus defined with nr_cpus.

        cpu_user_queue : int, default=500

        qsub_cmd : str, default='qsub %s'
            When launching from a node on Gorm, ssh to g-000 to bypass the
            lack of permissions to launch jobs from a node.

        sort : boolean, default=False
            If True the file list will be sorted. Applies to both cached and
            freshly built file lists.
        """
        # when there is no path defined, take the current working directory
        if path_pbs_files is None:
            path_pbs_files = os.getcwd()

        self.uid = os.getenv('USER')
        self.nr_cpus = nr_cpus
        self.cpu_free = cpu_free
        self.cpu_user_queue = cpu_user_queue
        self.tsleep = tsleep
        self.dryrun = dryrun
        self.tsleep_short = 1.0
        self.c0, self.c1, self.c2 = 8, 8, 17
        self.logfile = logfile
        self.qsub_cmd = qsub_cmd
        self.debug = debug
        self.search_crit_re = search_crit_re
        self.path_pbs_files = path_pbs_files
        self.pbs_update_deltat = 5.0  # in seconds
        self.fname_config = 'launch_scheduler_config.txt'
        self.fname_cluster_state = 'launch_scheduler_state.txt'
        self.reload_pbsflist = 0
        self.cache = cache
        self.sort = sort
        self.f_pbsflist = os.path.join(os.getcwd(), 'launch_pbs_filelist.txt')

        self.pbsnodes = '/opt/pbs/bin/pbsnodes'
        if HOSTNAME[:1] == 'g':
            self.qstat = '/opt/pbs/bin/qstat'
            self.qsub_cmd = self.qsub_cmd.replace('qsub', '/opt/pbs/bin/qsub')
        else:
            self.qstat = '/usr/bin/qstat'
            self.qsub_cmd = self.qsub_cmd.replace('qsub', '/usr/bin/qsub')

    def __call__(self, depend=False, crontab_mode=False):
        """
        Parameters
        ----------

        depend : boolean, default=False
            If set to True, the dependency launch method will be used.
            If False, the scheduler method is used.
        """
        if not depend:
            if crontab_mode:
                pbsflist = self.get_joblist(verbose=False)
                self.launch(pbsflist, crontab_mode=crontab_mode)
            else:
                self.print_config()
                print '        method ; scheduler'
                pbsflist = self.get_joblist()
                self.print_logheader()
                self.launch(pbsflist, crontab_mode=crontab_mode)
                print ''
        else:
            print '        method ; dependencies'
            self.launch_deps()
            print ''

    def get_joblist(self, verbose=True):
        # build the file list
        if not self.cache:
            if verbose:
                print 'Building file list of to be launched jobs. ',
                print 'Might take a while...'
            # only filter when building the file list. It would be counter
            # intuitive to filter a user generated/controlled list.
            pbsflist = self.filter_joblist(build_pbsflist(self.path_pbs_files))
            if self.sort:
                pbsflist.sort()
            save_pbsflist(self.f_pbsflist, pbsflist)
        else:
            if verbose:
                print 'Loading file list from cache...'
            pbsflist = load_pbsflist(self.f_pbsflist)
        return pbsflist

    def filter_joblist(self, pbsflist):
        # filter out files we will not launch
        regex = re.compile(self.search_crit_re)
        pbsflist = select(pbsflist, regex)
        nr_jobs = len(pbsflist)
        print 'Done building/loading. Ready to qsub %i jobs' % nr_jobs
        return pbsflist

    def print_config(self):
        """Print a header for the log with the current config.
        """
        print '       nr_cpus ; %3i' % self.nr_cpus
        print '      cpu_free ; %3i' % self.cpu_free
        print 'cpu_user_queue ; %3i' % self.cpu_user_queue
        print 'path_pbs_files ; %s' % self.path_pbs_files
        print 'search_crit_re ; %s' % self.search_crit_re
        print '        dryrun ; %s' % self.dryrun
        print '        tsleep ; %s' % self.tsleep
        print '       logfile ; %s' % self.logfile
        print '         cache ; %s' % self.cache

    def print_logheader(self):
        print '      tsleep_short ; %6.2f' % self.tsleep_short
        print 'min cpu to be free ; %3i' % self.cpu_free
        # we can only ask for confirmation when stdout is not going to the log
        if self.logfile is None:
            print 'Is this ok? You have 5 sec to abort (press CTRL-C to abort)'
            time.sleep(5)
#            print 'Press enter to continue, or CTRL-C to abort'
#            dummy = raw_input()
        print ''
        header = ('job nr'.rjust(self.c0), 'job_id'.rjust(self.c1),
                  'progress'.rjust(self.c1), 'time stamp'.rjust(self.c2),
                  'pbs_in file name')
        print '%s ; %s ; %s ; %s ; %s' % header

    def write_config(self):
        """Save a config file with the current tuning parameters.
        """
        f = open(self.fname_config, 'w')
        f.write('          nr_cpus : % 5i\n' % self.nr_cpus)
        f.write('   cpu_user_queue : % 5i\n' % self.cpu_user_queue)
        f.write('         cpu_free : % 5i\n' % self.cpu_free)
        f.write('           tsleep : % 5.02f\n' % self.tsleep)
        f.write('     tsleep_short : % 5.02f\n' % self.tsleep_short)
        f.write('pbs_update_deltat : % 5.02f\n' % self.pbs_update_deltat)
        f.write('       f_pbsflist : %s\n' % self.f_pbsflist)
        f.flush()
        f.close()

    def read_config(self):
        """Update scheduler tuning from the config file.
        """
        f = open(self.fname_config, 'r')
        lines = f.readlines()
        f.close()
        for line in lines:
            items = line.split(':')
            if len(items) == 2:
                try:
                    value = int(items[1].strip())
                # FIXME: there has to be a more elegant solution than this
                # ugly hack...
                except ValueError:
                    try:
                        value = float(items[1].strip())
                    except ValueError:
                        value = items[1].strip()
                setattr(self, items[0].strip(), value)

    def init_state_log(self):
        """For more extensive logging, write down the scheduler and cluster
        states.
        """
        self.header = ['date_time', 'cpu_target', 'cpu_user', 'cpu_free',
                       'free_target', 'queue_user', 'queue_target', 'tsleep',
                       'tsleep_short', 'pbs_update_deltat', 'reload_pbsflist']
        f = open(self.fname_cluster_state, 'w')
        f.write(' ; '.join(self.header) + '\n')
        f.flush()
        f.close()

    def update_state_log(self, ts, cpu_user, cpu_free, cpu_user_queue):
        ts_ = datetime.fromtimestamp(ts).strftime('%m-%d %H:%M:%S')
        ms = '%1.02f' % ts
        ts_ += ms[-3:]
        line = [ts_, self.nr_cpus, cpu_user, cpu_free, self.cpu_free,
                cpu_user_queue, self.cpu_user_queue, self.tsleep,
                self.tsleep_short, self.pbs_update_deltat,
                self.reload_pbsflist]
        line_str = [str(k) for k in line]
        f = open(self.fname_cluster_state, 'a')
        f.write(' ; '.join(line_str) + '\n')
        f.flush()
        f.close()

    def check_nodes(self, uid):
        """
        See how many cpus are free, and how many are used by the current
        user.
        """
        if HOSTNAME[:2] == 'j-':
            output = os.popen('ssh jess "%s -l all"' % self.pbsnodes).readlines()
        else:
            output = os.popen('%s -l all' % self.pbsnodes).readlines()
        # FIXME: slow, maybe do this only a few times??
        pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output)

        output = os.popen('%s -n1' % self.qstat).readlines()
        users, host, nodesload = pbswrap.parse_qstat_n1(output)

        # if the user does not have any jobs, this will not exist
        try:
            cpu_user = users[uid]['cpus']
        except KeyError:
            cpu_user = 0
        # add queued jobs as well, if any
        try:
            cpu_user_queue = users[uid]['Q']
        except KeyError:
            cpu_user_queue = 0

        cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes)

        return cpu_free, cpu_user, cpu_user_queue
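
    # To make the launch criterion in launch() below concrete: with
    # cpu_free=48, cpu_user_queue=500 and an illustrative nr_cpus=100, a new
    # job is only submitted when the user occupies fewer than 100 cpus, the
    # cluster has more than 48 cpus free, and the user has fewer than 500
    # jobs waiting in the queue.
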

    def launch(self, pbsflist, crontab_mode=False):
        """
        Iterate over all the pbs files and launch them if there are cpus
        available, and if the maximum number of predefined cpus is not
        reached.

        This scenario should hopefully work with massive amounts of jobs.

        Queued jobs are also included when counting the number of cpus used
        by the user. This prevents the scheduler from spamming the queueing
        system with jobs that will only queue instead of run.
        """
        ii = 0
        ii_max = len(pbsflist)
        if ii_max < 1:
            print ""
            print "No files matching %s at:" % self.search_crit_re
            print self.path_pbs_files
            print "I have no choice but to abort the launch sequence."
            print ""
            return
        time0 = 0.0
        while True:
            time1 = time.time()
            if (time1 - time0) > self.pbs_update_deltat:
                cpu_free, cpu_user, cpu_user_queue = self.check_nodes(self.uid)
                time0 = time.time()
                self.read_config()
                self.update_state_log(time0, cpu_user, cpu_free,
                                      cpu_user_queue)
            # we only launch a new job when we are not using more than our
            # quota (nr_cpus), and when there is still some room for others
            # to breathe
            # TODO: add another metric: if the total queue is bigger than
            # so many nodes/cpus, don't add anything else.
            if (self.nr_cpus > cpu_user) and (cpu_free > self.cpu_free) and \
               (cpu_user_queue < self.cpu_user_queue):
                fname = pbsflist[ii]
                # fname minus current working dir
                fname_short = fname.replace(os.getcwd(), '')
#                # read the PBS file
#                f = open(fname)
#                pbs = f.read()
#                f.close()
#                # find the job name, second line
#                job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
#                fname_short = os.path.basename(fname)
                # submit the job
                if not self.dryrun:
                    try:
                        job_id = qsub(fname, qsub_cmd=self.qsub_cmd)
                        # in case we are not polling for the current PBS state
                        # assume we are the only one eating up available cpus
                        cpu_user += 1
                        cpu_free -= 1
                    except UserWarning:
                        cmd = self.qsub_cmd % fname
                        print 'launch cmd: %s' % cmd
                        print 'failed, try again in %1.1f sec' % self.tsleep
                        time.sleep(self.tsleep)
                        continue
                else:
                    job_id = '%i' % ii
                # when was the job launched
                time_s = time.time()
                ts = datetime.fromtimestamp(time_s).strftime('%m-%d %H:%M:%S')
                ms = '%1.02f' % time_s
                ts += ms[-3:]
                progress = '%6.2f' % (100.0*(ii+1)/float(ii_max))
                rpl = (('%i' % ii).rjust(self.c0), job_id.rjust(self.c1),
                       (progress+'%').rjust(self.c1), ts.rjust(self.c2),
                       fname_short)
                print '%s ; %s ; %s ; %s ; %s' % rpl
                # to avoid a huge runoff on jobs launching, just a security
                # measure
                time.sleep(self.tsleep_short)
                ii += 1
            elif crontab_mode:
                save_pbsflist(self.f_pbsflist, pbsflist[ii:])
                return
            else:
                # wait a bit before trying to launch a job again
                if self.debug:
                    print 'sleeping because:'
                    rpl = (self.nr_cpus, cpu_user)
                    print ' nr_cpus: %4i, cpu_user: %4i' % rpl
                    rpl = (cpu_free, self.cpu_free)
                    print 'cpu_free: %4i, cpu_free_req: %4i' % rpl
                    rpl = (cpu_user_queue, self.cpu_user_queue)
                    print ' u queue: %4i, u queue max: %4i' % rpl
                time.sleep(self.tsleep)
            # stop when we handled all the files
            if ii >= ii_max:
                print 'All jobs have been launched, stopping the scheduler.'
                if crontab_mode:
                    remove_crontab()
                return
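
    # launch_deps() below relies on two placeholder tokens in the pbs files.
    # An assumed template fragment (the exact PBS directive is illustrative,
    # only the [nodeps] and [job_id] tokens are prescribed by the code):
    #
    #   #PBS -N case_a
    #   [nodeps]PBS -W depend=afterany:[job_id]
    #
    # For an independent job '[nodeps]' is replaced with '### ', commenting
    # the directive out; for a dependent job it becomes '#' and '[job_id]'
    # is replaced with the id of the job to wait for.
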

    def launch_deps(self):
        """Launch PBS jobs with dependencies.

        Regular expression examples:
        --re .veer-0\\.89_.
        --re=".veer-0\\.89_.|.veer-0\\.74_."

        All tuning parameters (nr_cpus, search_crit_re, dryrun, tsleep,
        cache) are taken from the instance attributes set in __init__.
        """
        pbsflist = self.get_joblist()
        nr_jobs = len(pbsflist)
        tot_seconds = self.tsleep*nr_jobs
        d = datetime(1,1,1) + timedelta(seconds=tot_seconds)
        rpl = (self.tsleep, d.hour, d.minute, d.second)
        print 'With tsleep=%1.3f s, this will take %02i:%02i:%02i' % rpl
        # we can only ask for confirmation when stdout is not going to the log
        if self.logfile is None:
            print 'Press enter to continue, or CTRL-C to abort'
            raw_input()
        print ''

        # make sure job_id is not set at the start
        job_id = None
        # now we can create the dependency list: a dictionary with the
        # nr_string as key, and the value is the job_id dependency list
        blok_nr = 1
        nr_string = 1
        job_id_dep = 'none'
        # dict comprehensions are not available in python 2.4
        # deps = {k:[''] for k in range(nr_cpus+1)}
        deps = {}
        for k in range(self.nr_cpus+1):
            deps[k] = ['none']
        header = ('chain'.rjust(self.c0), 'job_id'.rjust(self.c1),
                  'job_id_dep'.rjust(self.c1), 'job_name'.rjust(self.c1),
                  'pbs_in file name')
        print '%s ; %s ; %s ; %s ; %s' % header
        rpl = ('# blok'.rjust(self.c0), str(blok_nr).ljust(self.c1),
               ' '*self.c1, ' '*self.c1)
        print '%s ; %s ; %s ; %s ;' % rpl
        for i, fname in enumerate(pbsflist):
            # read the file
            f = open(fname)
            pbs = f.read()
            f.close()
            # the current job will depend on the job launched just before
            # this one
            job_id_dep = deps[nr_string][-1]
            # we cycle over all the nr_cpus. Row per row is filled up. Only
            # for the first row we have an independent job.
            # also check if the job_id_dep actually exists. If not, there
            # should not be a dependency in the pbs launch script
            if i < self.nr_cpus or not pbs_job_exists(job_id_dep):
                pbs = pbs.replace('[nodeps]', '### ')
                job_id_dep = 'none'
                if not self.dryrun:
                    job_id = write_qsub_exe(fname, pbs, tsleep=self.tsleep)
                else:
                    job_id = '%i' % i
                # create a dependency list
                deps[nr_string] = [job_id]
            else:
                pbs = pbs.replace('[nodeps]', '#')
                if not self.dryrun:
                    pbs = pbs.replace('[job_id]', job_id_dep)
                    job_id = write_qsub_exe(fname, pbs, tsleep=self.tsleep)
                else:
                    job_id_dep = str(int(job_id) - self.nr_cpus + 1)
                    job_id = '%i' % i
                # append to the existing list
                deps[nr_string].append(job_id)

            # find the job name, second line
            job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
            fname_short = os.path.basename(fname)
            rpl = (str(nr_string).rjust(self.c0), job_id.rjust(self.c1),
                   job_id_dep.rjust(self.c1), job_name.rjust(self.c1),
                   fname_short)
            print '%s ; %s ; %s ; %s ; %s' % rpl
            if nr_string >= self.nr_cpus:
                nr_string = 1
                blok_nr += 1
                progress = '%6i/%i' % (i, nr_jobs)
                rpl = ('# blok'.rjust(self.c0), str(blok_nr).ljust(self.c1),
                       progress.rjust(self.c1), ' '*self.c1)
                print '%s ; %s ; %s ; %s ;' % rpl
            else:
                nr_string += 1

        print '%s ; %s ; %s ; %s ; %s' % header
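
# Typical command line invocations (illustrative):
#
#   launch.py -n 100                # scheduler mode, aim for 100 cpus
#   launch.py -n 100 --depend       # chained qsub dependencies
#   launch.py -n 100 --node         # run the scheduler from a compute node
#   launch.py --crontab --cache     # single iteration, driven by crontab
#
# launch_pbs_filelist.txt holds one pbs file path per line; --cache reads
# that file instead of walking the pbs_in/ directory again.
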

if __name__ == '__main__':

    # parse the arguments, only relevant when using as a command line utility
    parser = OptionParser(usage='\n\n%prog -n nr_cpus \n\n'
                          '%prog --crontab when running a single iteration of '
                          '%prog as a crontab job every 5 minutes.\n'
                          'File list is read from "launch_pbs_filelist.txt", '
                          'and the configuration can be changed on the fly\n'
                          'by editing the file "launch_scheduler_config.txt".')
    # optparse's add_option mirrors the argparse add_argument API well enough
    # for the options used here
    parser.add_argument = parser.add_option
    parser.add_argument('--depend', dest='depend', action='store_true',
                        default=False,
                        help='Use the dependency launch method instead of '
                        'the scheduler')
    parser.add_argument('-n', '--nr_cpus', type='int', dest='nr_cpus',
                        action='store', default=None,
                        help='number of cpus to be used')
    parser.add_argument('-p', '--path_pbs_files', type='string',
                        action='store', dest='path_pbs_files',
                        default='pbs_in/',
                        help='optionally specify location of pbs files')
    parser.add_argument('--re', type='string', action='store',
                        dest='search_crit_re', default=r'\.p$',
                        help='regular expression search criterion applied '
                        'on the full pbs file path. Escape backslashes! '
                        'By default it will select all *.p files in pbs_in/.')
    parser.add_argument('--dry', action='store_true', dest='dry',
                        default=False,
                        help='dry run: do not alter pbs files, do not launch')
    parser.add_argument('--tsleep', action='store', dest='tsleep',
                        type='float', default=5.00,
                        help='Sleep time [s] when cluster is too busy to '
                        'launch new jobs. Default=5 seconds')
    parser.add_argument('--tsleep_short', action='store', dest='tsleep_short',
                        type='float', default=0.5,
                        help='Sleep time [s] between successive job '
                        'launches. Default=0.5 seconds.')
    parser.add_argument('--logfile', action='store', dest='logfile',
                        default=None, help='Save output to file.')
    parser.add_argument('-c', '--cache', action='store_true', dest='cache',
                        default=False,
                        help='If on, files are read from cache')
    parser.add_argument('--cpu_free', action='store', dest='cpu_free',
                        type='int', default=48,
                        help='No more jobs will be launched when the cluster '
                        'does not have the specified amount of cpus free. '
                        'This will make sure there is room for others on '
                        'the cluster, but might mean less cpus available '
                        'for you. Default=48')
    parser.add_argument('--cpu_user_queue', action='store',
                        dest='cpu_user_queue', type='int', default=5,
                        help='No more jobs will be launched after having '
                        'cpu_user_queue number of jobs in the queue. This '
                        'prevents users from filling the queue, while still '
                        'allowing to aim for a high cpu_free target. '
                        'Default=5')
    parser.add_argument('--qsub_cmd', action='store', dest='qsub_cmd',
                        default='qsub %s',
                        help='Is set automatically by --node flag')
    parser.add_argument('--node', action='store_true', dest='node',
                        default=False,
                        help='If executed on dedicated node. Although this '
                        'works, consider using --crontab instead. '
                        'Default=False')
    parser.add_argument('--sort', action='store_true', dest='sort',
                        default=False,
                        help='Sort pbs file list. Default=False')
    parser.add_argument('--crontab', action='store_true', dest='crontab',
                        default=False,
                        help='Crontab mode: %prog will check every 5 '
                        '(default) minutes if more jobs can be launched. '
                        'Not compatible with --node. When all jobs are done, '
                        'crontab -r will remove all existing crontab jobs of '
                        'the current user. Use crontab -l to inspect current '
                        'crontab jobs, and edit them with crontab -e. '
                        'Default=False')
    parser.add_argument('--every_min', action='store', dest='every_min',
                        type='int', default=5,
                        help='Crontab update interval in minutes. Default=5')
    parser.add_argument('--debug', action='store_true', dest='debug',
                        default=False,
                        help='Debug print statements. Default=False')
    (options, args) = parser.parse_args()

    if options.crontab and options.node:
        parser.print_usage()
        sys.stderr.write("error: --node and --crontab not compatible"
                         + os.linesep)
        sys.exit(1)
    if options.nr_cpus is None and not options.crontab:
        parser.print_usage()
        sys.stderr.write("error: specify number of cpus with -n" + os.linesep)
        sys.exit(1)

    if options.node:
        # repeat exactly the same command but now wrap a pbs script around it
        # and remove the --node argument
        jobname = 'launch.py'
        args = [arg for arg in sys.argv]
        # remove --node argument
        iqsub_cmd, ilogfile = None, None
        for i, arg in enumerate(args):
            if arg == '--node':
                inode = i
            elif arg[:10] == '--qsub_cmd':
                iqsub_cmd = i
                rpl = (os.getcwd(), '%s')
                if HOSTNAME[:1] == 'g':
                    args[i] = '--qsub_cmd=\'ssh g-000 "cd %s;qsub %s"\'' % rpl
                else:
                    args[i] = '--qsub_cmd="cd %s;qsub %s"' % rpl
            elif arg[:9] == '--logfile':
                ilogfile = i
                args[i] = '--logfile= '
        args.pop(inode)
        # if they were not defined in the original command, add them
        if iqsub_cmd is None:
            rpl = (os.getcwd(), '%s')
            if HOSTNAME[:1] == 'g':
                args.append('--qsub_cmd=\'ssh g-000 "cd %s;qsub %s"\'' % rpl)
            else:
                args.append('--qsub_cmd="cd %s;qsub %s"' % rpl)
        if ilogfile is None:
            args.append('--logfile= ')
        # and force the use of logfile and specify qsub_cmd with ssh
        # first argument is the full path of the script
        args.pop(0)
        com = 'launch.py %s' % ' '.join(args)
        # create a PBS script that contains the launch.py command
        fpath = pbswrap.create_input(walltime='72:00:00', queue='workq',
                                     pbs_in='pbs_in/', ppn=1,
                                     pbs_out='pbs_out/', jobname=jobname,
                                     commands=com, lnodes=1)
        job_id = qsub(fpath)
        sys.exit(0)
    elif isinstance(options.logfile, str) or options.crontab:
        if options.logfile is None:
            options.logfile = ''
        if options.logfile.strip() == '':
            path_log = os.path.join(os.getcwd(), 'launch_scheduler_log.txt')
        elif not os.path.isabs(options.logfile):
            path_log = os.path.join(os.getcwd(), options.logfile)
        else:
            path_log = options.logfile
        # log to file and do not print messages, but in crontab mode, append!
        if os.path.exists(path_log) and options.crontab:
            mode = 'a'
        else:
            mode = 'w'
        output = Logger(open(path_log, mode), True)
        sys.stdout = output
    else:
        path_log = None

    ss = Scheduler(options.nr_cpus, path_pbs_files=options.path_pbs_files,
                   search_crit_re=options.search_crit_re, dryrun=options.dry,
                   tsleep=options.tsleep, logfile=path_log,
                   cache=options.cache, cpu_free=options.cpu_free,
                   qsub_cmd=options.qsub_cmd, sort=options.sort,
                   debug=options.debug,
                   cpu_user_queue=options.cpu_user_queue)
    ss.tsleep_short = options.tsleep_short
    if options.crontab:
        if os.path.exists(ss.fname_config):
            ss.read_config()
        else:
            ss.write_config()
            ss.init_state_log()
    else:
        ss.write_config()
        ss.init_state_log()
    ss(depend=options.depend, crontab_mode=options.crontab)
    # start crontab job after first round of launching to prevent it from
    # running more than once
    if options.crontab:
        write_crontab(every_min=options.every_min)