#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 14 14:01:38 2014

@author: dave
"""

__author__ = "David Verelst <dave@dtu.dk>"
__license__ = "GPL-2+"

#from __future__ import division
#from __future__ import print_function

import sys
import os
import subprocess as sproc
import re
import time
from datetime import datetime, timedelta
import socket
HOSTNAME = socket.gethostname()

# only python 2.7 and up
#from argparse import ArgumentParser
# python 2.4
from optparse import OptionParser

import pbswrap


class Logger:
    """The Logger class can be used to redirect standard output to a log file.
    Usage: Create a Logger object and redirect standard output to the Logger
    object.  For example:
    output = Logger(file_handle, True)
    import sys
    sys.stdout = output
    """

    def __init__(self, logFile, echo):
        """Arguments:
        logFile     a file object that is available for writing
        echo        Boolean.  If True, output is sent to standard output in
                    addition to the log file.
        """
        import sys
        self.out = sys.stdout
        self.logFile = logFile
        self.echo = echo

    def write(self, s):
        """Required method that replaces stdout. You don't have to call this
        directly--all print statements will be redirected here."""
        self.logFile.write(s)
        if self.echo:
            self.out.write(s)
        self.logFile.flush()


def print_both(f, text, end='\n'):
    """
    Print both to a file and the console
    """
    print text
    if isinstance(f, file):
        f.write(text + end)

def build_pbsflist(path_pbs_files):
    pbsflist = []
    for root, dirs, files in os.walk(path_pbs_files):
        for fname in files:
            pbsflist.append(os.path.join(root, fname))
    return pbsflist

def load_pbsflist(fname):
    f = open(fname, 'r')
    # drop the newline character
    pbsflist = [line[:-1] for line in f]
    f.close()
    return pbsflist

def save_pbsflist(fname, pbsflist):
    f = open(fname, 'w')
    f.writelines([k+'\n' for k in pbsflist])
    f.flush()
    f.close()

def write_qsub_exe(fname, pbs, tsleep=0.25):
    # no file open context manager in python 2.4
    f = open(fname, 'w')
    f.write(pbs)
    f.close()
    job_id = qsub(fname)
    time.sleep(tsleep)

    return job_id

def write_crontab(every_min=5):
    """Create a crontab script, and submit to crontab.
    """
    python = '/usr/bin/python'
    launch = '/home/MET/repositories/toolbox/pbsutils/launch.py'

    cwd = os.getcwd()
    fpath = os.path.join(cwd, 'launch_crontab.sh')
    # TODO: parse other relevant arguments as well: --re
    # when using the crontab option, cpu parameters are read from the config
    # file if it exists, but only the first run will build a pbslist file
    rpl = (every_min, cwd, python, launch)
    crontab = '*/%i * * * * cd "%s"; %s %s --crontab --cache ;\n' % rpl
    f = open(fpath, 'w')
    f.write(crontab)
    f.flush()
    f.close()

    # does the current crontab already exist?
    cmd = 'crontab -l'
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()
    crontab_exists = False
    for line in stdout:
        if line.find(cwd) > -1 and line.find(launch) > -1:
            # current crontab job is already running!
            crontab_exists = True
            break

    if not crontab_exists:
        cmd = 'crontab %s' % fpath
        p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
        stdout = p.stdout.readlines()
        p.wait()
        print "added launch.py to crontab:"
        print crontab

def remove_crontab():
    cmd = 'crontab -r'
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()

def qsub(fname, qsub_cmd='qsub %s'):
    """
    Parameters
    ----------

    fname : str
        path to pbs launch file

    qsub_cmd : str, default='qsub %s'
        Launch command, with %s indicating the pbs launch file name fname.
        When launching from a node on Gorm, ssh to the login node first
        to bypass the fact that you can only launch from the login node.
    """

    cmd = qsub_cmd % fname
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()
    # it should return the job id like this: 387598.g-000.risoe.dk
    if len(stdout) > 1:
        print 'cmd:'
        print cmd
        print 'stdout:'
        print stdout
        raise UserWarning('qsub returned unexpected result')
    job_id = stdout[0].replace('\n', '').strip().split('.')[0]
    # job id needs to be a number, otherwise something went wrong
    try:
        int(job_id)
    except ValueError:
        print('failing qsub command:')
        print(cmd)
        print(stdout[0])
        raise UserWarning('qsub failed')

    return job_id

def pbs_job_exists(job_id):
    """
    Returns True when the job_id actually exists on the cluster
    """
    cmd = 'qstat %s' % job_id
    p = sproc.Popen(cmd, stdout=sproc.PIPE, stderr=sproc.STDOUT, shell=True)
    stdout = p.stdout.readlines()
    p.wait()
    # qstat: Unknown Job Id
    # qstat: Unknown queue destination
    # one line if it doesn't exist, multiple if it does
    if len(stdout) == 1:
        return False
    else:
        return True

def select(pbsflist, regex):
    pbsflist2 = []
    for k in pbsflist:
        # search criteria, jump to next if not compliant
        if regex.search(k):
            pbsflist2.append(k)
    return pbsflist2

def launch_deps(nr_cpus, path_pbs_files=None, search_crit_re=r'\.p$',
                dryrun=False, tsleep=0.30, logfile=None, cache=False):
    """Launch

    Launch PBS jobs with dependencies.

    Regular expression examples:
        --re .veer-0\\.89_.
        --re=".veer-0\\.89_.|.veer-0\\.74_."

    Parameters
    ----------

    nr_cpus : int
        Number of parallel dependency chains that will be created. Jobs in
        the same chain run one after the other.

    path_pbs_files : path, default=None
        Directory that is searched recursively for pbs files. If set to None,
        the current working directory is used instead.

    search_crit_re : string, default=r'\.p$'
        Regular expression applied on the full path of the pbs file that has
        to be launched. Use this to only launch a subset of the created
        simulations. By default it takes all the *.p files it can find.

    dryrun : boolean, default=False
        If True, do not modify or submit any pbs file; fake job id's are used
        instead.

    tsleep : float, default=0.30
        Time in seconds to wait between two successive qsub calls.

    logfile : str, default=None
        If a string, all output is also written to this file. An empty string
        results in launch_log.txt in the current working directory, and a
        relative path is taken relative to the current working directory.

    cache : boolean, default=False
        If True, the file list is loaded from launch_pbs_filelist.txt instead
        of being rebuilt by walking path_pbs_files.

    Returns
    -------

    None

    """
    regex = re.compile(search_crit_re)

    # when there is no path defined, take the current working directory
    if path_pbs_files is None:
        path_pbs_files = os.getcwd()

    # the logfile
    if isinstance(logfile, str):
        if logfile == '':
            path_log = os.path.join(os.getcwd(), 'launch_log.txt')
        elif not os.path.isabs(logfile):
            path_log = os.path.join(os.getcwd(), logfile)
        else:
            path_log = logfile
        flog = open(path_log, 'w')
        print_both(flog, '       nr_cpus ; %i' % nr_cpus)
        print_both(flog, 'path_pbs_files ; %s' % path_pbs_files)
        print_both(flog, 'search_crit_re ; %s' % search_crit_re)
        print_both(flog, '        dryrun ; %s' % dryrun)
        print_both(flog, '        tsleep ; %s' % tsleep)
        print_both(flog, '       logfile ; %s' % logfile)
        print_both(flog, '      path_log ; %s' % path_log)
        print_both(flog, '         cache ; %s' % cache)
        print_both(flog, '')
        flog.flush()
    else:
        flog = None

    fname = os.path.join(os.getcwd(), 'launch_pbs_filelist.txt')
    if not cache:
        print_both(flog, 'Building file list of jobs to be launched. '
                   'Might take a while...')
        pbsflist = build_pbsflist(path_pbs_files)
        save_pbsflist(fname, pbsflist)
    else:
        print_both(flog, 'Loading file list from cache...')
        pbsflist = load_pbsflist(fname)

    # not the most efficient way, but more generally applicable if we want
    # to be able to give a time estimate on how long it will take to launch
    # all the cases
    pbsflist2 = select(pbsflist, regex)

    nr_jobs = len(pbsflist2)
    print_both(flog, 'Done building/loading. Ready to qsub %i jobs' % nr_jobs)
    tot_seconds = tsleep*nr_jobs
    d = datetime(1,1,1) + timedelta(seconds=tot_seconds)
    rpl = (tsleep, d.hour, d.minute, d.second)
    print_both(flog, 'With tsleep=%1.3f s, this will take %02i:%02i:%02i' % rpl)
    print_both(flog, 'You have 5 seconds to abort if this doesn\'t look ok...')
    print_both(flog, '')
    time.sleep(5)

    # make sure job_id is not set at the start
    job_id = None
    # now we can create the dependency list: a dictionary with nr_string as
    # the key, and the value is the job_id dependency list for that chain
    deps = {}
    blok_nr = 1
    nr_string = 1
    c0, c1 = 7, 12
    job_id_dep = 'none'
    # list comprehension doesn't work on python 2.4
#    deps = {k:[''] for k in range(nr_cpus+1)}
    deps = {}
    for k in range(nr_cpus+1):
        deps[k] = ['none']
    header = ('chain'.rjust(c0), 'job_id'.rjust(c1), 'job_id_dep'.rjust(c1),
              'job_name'.rjust(c1), 'pbs_in file name')
    print_both(flog, '%s ; %s ; %s ; %s ; %s' % header)
    rpl = ('# blok'.rjust(c0), str(blok_nr).ljust(c1), ' '*c1, ' '*c1)
    print_both(flog, '%s ; %s ; %s ; %s ;' % rpl)
    for i, fname in enumerate(pbsflist2):
        # search criteria, jump to next if not compliant
        if not regex.search(fname):
            continue
        # read the file
        f = open(fname)
        pbs = f.read()
        f.close()
        # the current job will depend on the job launched just before this one
        job_id_dep = deps[nr_string][-1]
        # we cycle over the nr_cpus chains; the chains are filled up row by
        # row, and only the jobs in the first row are independent.
        # also check if the job_id_dep actually exists. If not, there should
        # not be a dependency in the pbs launch script
        if i < nr_cpus or not pbs_job_exists(job_id_dep):
            pbs = pbs.replace('[nodeps]', '### ')
            job_id_dep = 'none'
            if not dryrun:
                job_id = write_qsub_exe(fname, pbs, tsleep=tsleep)
            else:
                job_id = '%i' % i
            # create a dependency list
            deps[nr_string] = [job_id]
        else:
            pbs = pbs.replace('[nodeps]', '#')
            if not dryrun:
                pbs = pbs.replace('[job_id]', job_id_dep)
                job_id = write_qsub_exe(fname, pbs, tsleep=tsleep)
            else:
                job_id_dep = str(int(job_id) - nr_cpus + 1)
                job_id = '%i' % i
            # append to the existing list
            deps[nr_string].append(job_id)

        # find the job name, second line
        job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
        fname_short = os.path.basename(fname)
        rpl = (str(nr_string).rjust(c0), job_id.rjust(c1), job_id_dep.rjust(c1),
               job_name.rjust(c1), fname_short)
        print_both(flog, '%s ; %s ; %s ; %s ; %s' % rpl)

        if nr_string >= nr_cpus:
            if isinstance(flog, file):
                flog.flush()
            nr_string = 1
            blok_nr += 1
            progress = '%6i/%i' % (i, nr_jobs)
            rpl = ('# blok'.rjust(c0), str(blok_nr).ljust(c1),
                   progress.rjust(c1), ' '*c1)
            print_both(flog, '%s ; %s ; %s ; %s ;' % rpl)
        else:
            nr_string += 1

    print_both(flog, '%s ; %s ; %s ; %s ; %s' % header)
    if isinstance(flog, file):
        flog.flush()
        flog.close()


class Scheduler:
    """
    A simple scheduler that runs on top of PBS.

    Note that there is a lot of duplicate code between this class and the
    module level launch_deps function.
    """

    def __init__(self, nr_cpus, path_pbs_files=None, search_crit_re=r'\.p$',
                 dryrun=False, tsleep=5.00, logfile=None, cache=False,
                 cpu_free=48, qsub_cmd='qsub %s', sort=False, debug=False,
                 cpu_user_queue=500):
        """
        Regular expression examples:
            --re .veer-0\\.89_.
            --re=".veer-0\\.89_.|.veer-0\\.74_."

        Parameters
        ----------

        nr_cpus : int

        path_pbs_files : path, default=None
            If set to None, the current working directory is used instead.

        search_crit_re : string, default=r'\.p$'
            Regular expression applied on the full path of the pbs file that
            has to be launched. Use this to only launch a subset of the created
            simulations. By default it takes all the *.p files it can find.

        dryrun : boolean, default=False

        tsleep : float, default=5.00

        logfile : str, default=None
            Only used to decide whether to ask for confirmation before
            continuing: no prompt is shown when output goes to a log file.

        cache : boolean, default=False

        cpu_free : int, default=48
            Minimum number of free cpu's that the cluster should have
            before another job is launched for the user. This will prevent
            more jobs from being launched even if the user occupies less than
            the number of cpu's defined with nr_cpus.

        cpu_user_queue : int, default=500
            No more jobs will be launched once the user has this many jobs
            waiting in the queue, while still allowing a high cpu_free target.

        qsub_cmd : str, default='qsub %s'
            When launching from a node on Gorm, ssh to g-000 to bypass the
            lack of permissions to launch jobs from a node.

        sort : boolean, default=False
            If True the file list will be sorted. Applies to both cached and
            freshly built file lists.

        debug : boolean, default=False
            If True, print extra diagnostics while the scheduler is waiting
            to launch the next job.
        """
        # when there is no path defined, take the current working directory
        if path_pbs_files is None:
            path_pbs_files = os.getcwd()

        self.uid = os.getenv('USER')
        self.nr_cpus = nr_cpus
        self.cpu_free = cpu_free
        self.cpu_user_queue = cpu_user_queue
        self.tsleep = tsleep
        self.dryrun = dryrun
        self.tsleep_short = 1.0
        self.c0, self.c1, self.c2 = 8, 8, 17
        self.logfile = logfile
        self.qsub_cmd = qsub_cmd
        self.debug = debug
        self.search_crit_re = search_crit_re
        self.path_pbs_files = path_pbs_files
        self.pbs_update_deltat = 5.0 # in seconds
        self.fname_config = 'launch_scheduler_config.txt'
        self.fname_cluster_state = 'launch_scheduler_state.txt'
        self.reload_pbsflist = 0
        self.cache = cache
        self.sort = sort
        self.f_pbsflist = os.path.join(os.getcwd(), 'launch_pbs_filelist.txt')

        self.pbsnodes = '/opt/pbs/bin/pbsnodes'
        if HOSTNAME[:1] == 'g':
            self.qstat = '/opt/pbs/bin/qstat'
            self.qsub_cmd = self.qsub_cmd.replace('qsub', '/opt/pbs/bin/qsub')
        else:
            self.qstat = '/usr/bin/qstat'
            self.qsub_cmd = self.qsub_cmd.replace('qsub', '/usr/bin/qsub')

    def __call__(self, depend=False, crontab_mode=False):
        """
        Parameters
        ----------

        depend : boolean, default=False
            If set to True, the dependency launch method will be used. If False,
            the scheduler method is used.

        crontab_mode : boolean, default=False
            If set to True, only a single launch iteration is carried out and
            the not yet launched pbs files are saved back to the file list, so
            a following crontab invocation can continue from there.
        """
        if not depend:
            if crontab_mode:
                pbsflist = self.get_joblist(verbose=False)
                self.launch(pbsflist, crontab_mode=crontab_mode)
            else:
                self.print_config()
                print '            method ; scheduler'
                pbsflist = self.get_joblist()
                self.print_logheader()
                self.launch(pbsflist, crontab_mode=crontab_mode)
                print ''
        else:
            print '        method ; dependencies'
            # the dependency method expects the file list in self.pbsflist
            self.pbsflist = self.get_joblist()
            self.launch_deps()
            print ''

    def get_joblist(self, verbose=True):

        # build the file list
        if not self.cache:
            if verbose:
                print 'Building file list of jobs to be launched. ',
                print 'Might take a while...'
            # only filter when building the file list. It would be counter-
            # intuitive to filter a user-generated/controlled list.
            pbsflist = self.filter_joblist(build_pbsflist(self.path_pbs_files))
            if self.sort:
                pbsflist.sort()
            save_pbsflist(self.f_pbsflist, pbsflist)
        else:
            if verbose:
                print 'Loading file list from cache...'
            pbsflist = load_pbsflist(self.f_pbsflist)
        return pbsflist

    def filter_joblist(self, pbsflist):
        # filter out files we will not launch
        regex = re.compile(self.search_crit_re)
        pbsflist = select(pbsflist, regex)
        nr_jobs = len(pbsflist)
        print 'Done building/loading. Ready to qsub %i jobs' % nr_jobs
        return pbsflist

    def print_config(self):
        """Print a header for the log with current config.
        """

        print '       nr_cpus ; %3i' % self.nr_cpus
        print '      cpu_free ; %3i' % self.cpu_free
        print 'cpu_user_queue ; %3i' % self.cpu_user_queue
        print 'path_pbs_files ; %s' % self.path_pbs_files
        print 'search_crit_re ; %s' % self.search_crit_re
        print '        dryrun ; %s' % self.dryrun
        print '        tsleep ; %s' % self.tsleep
        print '       logfile ; %s' % self.logfile
        print '      path_log ; %s' % self.logfile
        print '         cache ; %s' % self.cache

    def print_logheader(self):
        print '      tsleep_short ; %6.2f' % self.tsleep_short
        print 'min cpu to be free ; %3i' % self.cpu_free
        # we can only ask for confirmation when stdout is not going to the log
        if self.logfile is None:
            print 'Is this ok? You have 5 sec to abort (press CTRL-C to abort)'
            time.sleep(5)
#            print 'Press enter to continue, or CTRL-C to abort'
#            dummy = raw_input()
        print ''

        header = ('job nr'.rjust(self.c0), 'job_id'.rjust(self.c1),
                  'progress'.rjust(self.c1), 'time stamp'.rjust(self.c2),
                  'pbs_in file name')
        print '%s ; %s ; %s ; %s ; %s' % header

    def write_config(self):
        """Save a config file with the current tuning parameters.
        """
        # read the PBS file
        f = open(self.fname_config, 'w')
        f.write('       nr_cpus : % 5i\n' % self.nr_cpus)
        f.write('cpu_user_queue : % 5i\n' % self.cpu_user_queue)
        f.write('      cpu_free : % 5i\n' % self.cpu_free)
        f.write('\n')
        f.write('           tsleep : % 5.02f\n' % self.tsleep)
        f.write('     tsleep_short : % 5.02f\n' % self.tsleep_short)
        f.write('pbs_update_deltat : % 5.02f\n' % self.pbs_update_deltat)
        f.write('       f_pbsflist : %s\n' % self.f_pbsflist)
        f.flush()
        f.close()
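
    # The resulting launch_scheduler_config.txt looks roughly like the
    # following (all values are illustrative); read_config below parses it
    # again by splitting each line on ':':
    #        nr_cpus :   100
    # cpu_user_queue :   500
    #       cpu_free :    48
    #
    #            tsleep :  5.00
    #      tsleep_short :  1.00
    # pbs_update_deltat :  5.00
    #        f_pbsflist : /some/path/launch_pbs_filelist.txt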

    def read_config(self):
        """Update scheduler tuning from config file.
        """
        f = open(self.fname_config, 'r')
        lines = f.readlines()
        f.close()
        for line in lines:
            items = line.split(':')
            if len(items) == 2:
                try:
                    value = int(items[1].strip())
                # FIXME: there has to be a more elegant solution than this ugly
                # hack...
                except ValueError:
                    try:
                        value = float(items[1].strip())
                    except ValueError:
                        value = items[1].strip()
                setattr(self, items[0].strip(), value)

    def init_state_log(self):
        """For more extensive logging, write down the scheduler and cluster
        states.
        """
        self.header = ['date_time', 'cpu_target', 'cpu_user', 'cpu_free',
                       'free_target', 'queue_user', 'queue_target', 'tsleep',
                       'tsleep_short', 'pbs_update_deltat', 'reload_pbsflist']
        f = open(self.fname_cluster_state, 'w')
        f.write(' ; '.join(self.header) + '\n')
        f.flush()
        f.close()

    def update_state_log(self, ts, cpu_user, cpu_free, cpu_user_queue):
        ts_ = datetime.fromtimestamp(ts).strftime('%m-%d %H:%M:%S')
        ms = '%1.02f' % ts
        ts_ += ms[-3:]
        line = [ts_, self.nr_cpus, cpu_user, cpu_free, self.cpu_free,
                cpu_user_queue, self.cpu_user_queue, self.tsleep,
                self.tsleep_short, self.pbs_update_deltat,
                self.reload_pbsflist]
        line_str = [str(k) for k in line]
        f = open(self.fname_cluster_state, 'a')
        f.write(' ; '.join(line_str) + '\n')
        f.flush()
        f.close()
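
    # With the header written by init_state_log, a logged row then looks like
    # this (all values are illustrative):
    #   11-14 14:01:38.25 ; 100 ; 96 ; 312 ; 48 ; 0 ; 500 ; 5.0 ; 1.0 ; 5.0 ; 0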

    def check_nodes(self, uid):
        """
        See how many cpu's are free, and how many are used by the current user
        """
        if HOSTNAME[:2] == 'j-':
            output = os.popen('ssh jess "%s -l all"' % self.pbsnodes).readlines()
        else:
            output = os.popen('%s -l all' % self.pbsnodes).readlines()
        # FIXME: slow, maybe do this only a few times??
        pbsnodes, nodes = pbswrap.parse_pbsnode_lall(output)
        output = os.popen('%s -n1' % self.qstat).readlines()
        users, host, nodesload = pbswrap.parse_qstat_n1(output)
        # if the user does not have any jobs, this will not exist
        try:
            cpu_user = users[uid]['cpus']
        except KeyError:
            cpu_user = 0
        # add queued jobs as well, if any
        try:
            cpu_user_queue = users[uid]['Q']
        except KeyError:
            cpu_user_queue = 0
        cpu_free, nodeSum = pbswrap.count_cpus(users, host, pbsnodes)

        return cpu_free, cpu_user, cpu_user_queue

    def launch(self, pbsflist, crontab_mode=False):
        """
        Iterate over all the pbs files and launch them if there are cpus
        available, and if the maximum number of predefined cpus is not reached.
        This scenario should hopefully work with massive amounts of jobs.

        Queued jobs are also included when counting the number of cpu's
        used by the user. This prevents the scheduler from spamming the
        queueing system with jobs that would only queue instead of run.
        """

        ii = 0
        ii_max = len(pbsflist)
        if ii_max < 1:
            print ""
            print "No files matching %s at:" % self.search_crit_re
            print self.path_pbs_files
            print "I have no choice but to abort the launch sequence."
            print ""
            return

        time0 = 0.0
        while True:
            time1 = time.time()
            if (time1 - time0) > self.pbs_update_deltat:
                cpu_free, cpu_user, cpu_user_queue = self.check_nodes(self.uid)
                time0 = time.time()
                self.read_config()
                self.update_state_log(time0, cpu_user, cpu_free, cpu_user_queue)
            # we only launch a new job when we are not using more than our
            # quota (nr_cpus), when there is still some room left for others
            # to breathe, and when we do not have too many jobs queued already
            # TODO: add another metric: if the total queue is bigger than
            # so many nodes/cpus, don't add anything else.
            if (self.nr_cpus > cpu_user) and (cpu_free > self.cpu_free) and \
                (cpu_user_queue < self.cpu_user_queue):

                fname = pbsflist[ii]
                # fname minus current working dir
                fname_short = fname.replace(os.getcwd(), '')
#                # read the PBS file
#                f = open(fname)
#                pbs = f.read()
#                f.close()
                # find the job name, second line
#                job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
#                fname_short = os.path.basename(fname)
                # submit the job
                if not self.dryrun:
                    try:
                        job_id = qsub(fname, qsub_cmd=self.qsub_cmd)
                        # in case we are not polling for the current PBS state
                        # assume we are the only one eating up available CPU's
                        cpu_user += 1
                        cpu_free -= 1
                    except UserWarning:
                        cmd = self.qsub_cmd % fname
                        print 'launch cmd: %s' % cmd
                        print 'failed, try again in %1.1f sec' % self.tsleep
                        time.sleep(self.tsleep)
                        continue
                else:
                    job_id = '%i' % ii

                # when was the job launched
                time_s = time.time()
                ts = datetime.fromtimestamp(time_s).strftime('%m-%d %H:%M:%S')
                ms = '%1.02f' % time_s
                ts += ms[-3:]

                progress = '%6.2f' % (100.0*(ii+1)/float(ii_max))
                rpl = (('%i' % ii).rjust(self.c0), job_id.rjust(self.c1),
                       (progress+'%').rjust(self.c1), ts.rjust(self.c2),
                       fname_short)
                print '%s ; %s ; %s ; %s ; %s' % rpl
                # avoid a runaway burst of job launches, just a safety
                # measure
                time.sleep(self.tsleep_short)
                ii += 1
            elif crontab_mode:
                save_pbsflist(self.f_pbsflist, pbsflist[ii:])
                return
            else:
                # wait a bit before trying to launch a job again
                if self.debug:
                    print 'sleeping because:'
                    rpl = (self.nr_cpus, cpu_user)
                    print ' nr_cpus: %4i,     cpu_user: %4i' % rpl
                    rpl = (cpu_free, self.cpu_free)
                    print 'cpu_free: %4i, cpu_free_req: %4i' % rpl
                    rpl = (cpu_user_queue, self.cpu_user_queue)
                    print ' u queue: %4i,  u queue max: %4i' % rpl
                time.sleep(self.tsleep)

            # stop when we handled all the files
            if ii >= ii_max:
                print 'All jobs have been launched, stopping the scheduler.'
                if crontab_mode:
                    remove_crontab()
                return

    def launch_deps(self):
        """Launch

        Launch PBS jobs with dependencies.

        Regular expression examples:
            --re .veer-0\\.89_.
            --re=".veer-0\\.89_.|.veer-0\\.74_."

        Unlike the module level launch_deps function this method takes no
        arguments: nr_cpus, dryrun, tsleep and the log settings are read from
        the instance attributes set in __init__, and the list of pbs files is
        expected in self.pbsflist (see get_joblist).

        Returns
        -------

        None
        """

        nr_jobs = len(self.pbsflist)
        tot_seconds = self.tsleep*nr_jobs
        d = datetime(1,1,1) + timedelta(seconds=tot_seconds)
        rpl = (self.tsleep, d.hour, d.minute, d.second)
        print 'With tsleep=%1.3f s, this will take %02i:%02i:%02i' % rpl
        # we can only ask for confirmation when stdout is not going to the log
        if self.logfile is None:
            print 'Press enter to continue, or CTRL-C to abort'
            raw_input()
        print ''

        # make sure job_id is not set at the start
        job_id = None
        # now we can create the dependency list: a dictionary with nr_string
        # as the key, and the value is the job_id dependency list for that chain
        deps = {}
        blok_nr = 1
        nr_string = 1
        job_id_dep = 'none'
        # list comprehension doesn't work on python 2.4
#        deps = {k:[''] for k in range(nr_cpus+1)}
        deps = {}
        for k in range(self.nr_cpus+1):
            deps[k] = ['none']
        header = ('chain'.rjust(self.c0), 'job_id'.rjust(self.c1),
                  'job_id_dep'.rjust(self.c1),
                  'job_name'.rjust(self.c1), 'pbs_in file name')
        print '%s ; %s ; %s ; %s ; %s' % header
        rpl = ('# blok'.rjust(self.c0), str(blok_nr).ljust(self.c1),
               ' '*self.c1, ' '*self.c1)
        print '%s ; %s ; %s ; %s ;' % rpl
        for i, fname in enumerate(self.pbsflist):
            # read the file
            f = open(fname)
            pbs = f.read()
            f.close()
            # the current job will depend on the job launched just before this one
            job_id_dep = deps[nr_string][-1]
            # we cycle over the nr_cpus chains; the chains are filled up row
            # by row, and only the jobs in the first row are independent.
            # also check if the job_id_dep actually exists. If not, there
            # should not be a dependency in the pbs launch script
            if i < self.nr_cpus or not pbs_job_exists(job_id_dep):
                pbs = pbs.replace('[nodeps]', '### ')
                job_id_dep = 'none'
                if not self.dryrun:
                    job_id = write_qsub_exe(fname, pbs, tsleep=self.tsleep)
                else:
                    job_id = '%i' % i
                # create a dependency list
                deps[nr_string] = [job_id]
            else:
                pbs = pbs.replace('[nodeps]', '#')
                if not self.dryrun:
                    pbs = pbs.replace('[job_id]', job_id_dep)
                    job_id = write_qsub_exe(fname, pbs, tsleep=self.tsleep)
                else:
                    job_id_dep = str(int(job_id) - self.nr_cpus + 1)
                    job_id = '%i' % i
                # append to the existing list
                deps[nr_string].append(job_id)

            # find the job name, second line
            job_name = pbs.split('#PBS')[1].split('-N')[1].strip()
            fname_short = os.path.basename(fname)
            rpl = (str(nr_string).rjust(self.c0), job_id.rjust(self.c1),
                   job_id_dep.rjust(self.c1),
                   job_name.rjust(self.c1), fname_short)
            print '%s ; %s ; %s ; %s ; %s' % rpl

            if nr_string >= self.nr_cpus:
                nr_string = 1
                blok_nr += 1
                progress = '%6i/%i' % (i, nr_jobs)
                rpl = ('# blok'.rjust(self.c0), str(blok_nr).ljust(self.c1),
                       progress.rjust(self.c1), ' '*self.c1)
                print '%s ; %s ; %s ; %s ;' % rpl
            else:
                nr_string += 1

        print '%s ; %s ; %s ; %s ; %s' % header


if __name__ == '__main__':

    # parse the arguments, only relevant when using as a command line utility
    parser = OptionParser(usage='\n\n%prog -n nr_cpus \n\n'
                          'Use %prog --crontab to run a single iteration of '
                          '%prog as a crontab job every 5 minutes.\n'
                          'The file list is read from "launch_pbs_filelist.txt", '
                          'and the configuration can be changed on the fly\n'
                          'by editing the file "launch_scheduler_config.txt".')
    # optparse (python 2.4 compatible) only provides add_option; alias it so
    # the calls below can keep the argparse-style add_argument name
    parser.add_argument = parser.add_option
    parser.add_argument('--depend', dest='depend', action='store_true',
                        default=False, help='Switch on for launch depend method')
    parser.add_argument('-n', '--nr_cpus', type='int', dest='nr_cpus',
                        action='store', default=None,
                        help='number of cpus to be used')
    parser.add_argument('-p', '--path_pbs_files', type='string', action='store',
                        dest='path_pbs_files', default='pbs_in/',
                        help='optionally specify location of pbs files')
    parser.add_argument('--re', type='string', action='store',
                        dest='search_crit_re', default=r'\.p$',
                        help='regular expression search criterion applied on '
                        'the full pbs file path. Escape backslashes! '
                        'By default it will select all *.p files in pbs_in/.')
    parser.add_argument('--dry', action='store_true', dest='dry', default=False,
                        help='dry run: do not alter pbs files, do not launch')
    parser.add_argument('--tsleep', action='store', dest='tsleep', type='float',
                        default=5.00, help='Sleep time [s] when cluster is too '
                        'busy to launch new jobs. Default=5 seconds')
    parser.add_argument('--tsleep_short', action='store', dest='tsleep_short',
                        type='float', default=0.5, help='Sleep time [s] '
                        'between successive job launches. Default=0.5 '
                        'seconds.')
    parser.add_argument('--logfile', action='store', dest='logfile',
                        default=None, help='Save output to file.')
    parser.add_argument('-c', '--cache', action='store_true', dest='cache',
                        default=False, help='If on, files are read from cache')
    parser.add_argument('--cpu_free', action='store', dest='cpu_free',
                        type='int', default=48, help='No more jobs will be '
                        'launched when the cluster does not have the specified '
                        'number of cpus free. This will make sure there is '
                        'room for others on the cluster, but might mean fewer '
                        'cpus available for you. Default=48')
    parser.add_argument('--cpu_user_queue', action='store', dest='cpu_user_queue',
                        type='int', default=5, help='No more jobs will be '
                        'launched after having cpu_user_queue number of jobs '
                        'in the queue. This prevents users from filling up the '
                        'queue, while still allowing them to aim for a high '
                        'cpu_free target. Default=5')
    parser.add_argument('--qsub_cmd', action='store', dest='qsub_cmd',
                        default='qsub %s',
                        help='Is set automatically by --node flag')
    parser.add_argument('--node', action='store_true', dest='node',
                        default=False, help='If executed on dedicated node. '
                        'Although this works, consider using --crontab instead.'
                        ' Default=False')
    parser.add_argument('--sort', action='store_true', dest='sort',
                        default=False, help='Sort pbs file list. Default=False')
    parser.add_argument('--crontab', action='store_true', dest='crontab',
                        default=False, help='Crontab mode: %prog will check '
                        'every 5 (default) minutes if more jobs can be launched. '
                        'Not compatible with --node. When all jobs are done, '
                        'crontab -r will remove all existing crontab jobs of '
                        'the current user. Use crontab -l to inspect current '
                        'crontab jobs, and edit them with crontab -e. '
                        'Default=False')
    parser.add_argument('--every_min', action='store', dest='every_min',
                        type='int', default=5, help='Crontab update interval '
                        'in minutes. Default=5')
    parser.add_argument('--debug', action='store_true', dest='debug',
                        default=False, help='Debug print statements. Default=False')
    (options, args) = parser.parse_args()
    if options.crontab and options.node:
        parser.print_usage()
        sys.stderr.write("error: --node and --crontab not compatible" + os.linesep)
        sys.exit(1)
    if options.nr_cpus is None and not options.crontab:
        parser.print_usage()
        sys.stderr.write("error: specify number of cpus with -n" + os.linesep)
        sys.exit(1)
    if options.node:
        # repeat exactly the same command but now wrap a pbs script around it
        # and remove the --node argument
        jobname = 'launch.py'
        args = [arg for arg in sys.argv]
        # remove --node argument
        iqsub_cmd, ilogfile = None, None
        for i, arg in enumerate(args):
            if arg == '--node':
                inode = i
            elif arg[:10] == '--qsub_cmd':
                iqsub_cmd = i
                rpl = (os.getcwd(), '%s')
                if HOSTNAME[:1] == 'g':
                    args[i] = '--qsub_cmd=\'ssh g-000 "cd %s;qsub %s"\'' % rpl
                else:
                    args[i] = '--qsub_cmd="cd %s;qsub %s"' % rpl
            elif arg[:9] == '--logfile':
                ilogfile = i
                args[i] = '--logfile= '
        args.pop(inode)
        # if they were not defined in the original command, add them
        if iqsub_cmd is None:
            rpl = (os.getcwd(), '%s')
            if HOSTNAME[:1] == 'g':
                args.append('--qsub_cmd=\'ssh g-000 "cd %s;qsub %s"\'' % rpl)
            else:
                args.append('--qsub_cmd="cd %s;qsub %s"' % rpl)
        if ilogfile is None:
            args.append('--logfile= ')
        # and force the use of logfile and specify qsub_cmd with ssh

        # first argument is the full path of the script
        args.pop(0)
        com = 'launch.py %s' % ' '.join(args)
        # create a PBS script that contains the launch.py command
        fpath = pbswrap.create_input(walltime='72:00:00', queue='workq',
                                     pbs_in='pbs_in/', ppn=1, pbs_out='pbs_out/',
                                     jobname=jobname, commands=com, lnodes=1)
        job_id = qsub(fpath)
        sys.exit(0)
    elif isinstance(options.logfile, str) or options.crontab:
        if options.logfile is None:
            options.logfile = ''
        if options.logfile.strip() == '':
            path_log = os.path.join(os.getcwd(), 'launch_scheduler_log.txt')
        elif not os.path.isabs(options.logfile):
            path_log = os.path.join(os.getcwd(), options.logfile)
        else:
            path_log = options.logfile
        # log to file (stdout is echoed to the console as well); in crontab
        # mode, append to the existing log!
        if os.path.exists(path_log) and options.crontab:
            mode = 'a'
        else:
            mode = 'w'
        output = Logger(open(path_log, mode), True)
        sys.stdout = output
    else:
        path_log = None

    ss = Scheduler(options.nr_cpus, path_pbs_files=options.path_pbs_files,
                   search_crit_re=options.search_crit_re, dryrun=options.dry,
                   tsleep=options.tsleep, logfile=path_log,
                   cache=options.cache, cpu_free=options.cpu_free,
                   qsub_cmd=options.qsub_cmd, sort=options.sort,
                   debug=options.debug, cpu_user_queue=options.cpu_user_queue)
    ss.tsleep_short = options.tsleep_short
    if options.crontab:
        if os.path.exists(ss.fname_config):
            ss.read_config()
        else:
            ss.write_config()
            ss.init_state_log()
    else:
        ss.write_config()
        ss.init_state_log()
    ss(depend=options.depend, crontab_mode=options.crontab)
    # start crontab job after first round of launching to prevent it from
    # running more than once
    if options.crontab:
        write_crontab(every_min=options.every_min)