From 420c27b2c74750ec1ac6064def543529efd49855 Mon Sep 17 00:00:00 2001 From: David Robert Verelst <dave@dtu.dk> Date: Fri, 5 Aug 2016 09:53:12 +0200 Subject: [PATCH] launch.py: modularize, move log/config printing away from main logic --- launch.py | 128 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 75 insertions(+), 53 deletions(-) diff --git a/launch.py b/launch.py index bb42afb..383c2e4 100755 --- a/launch.py +++ b/launch.py @@ -410,39 +410,6 @@ class Scheduler: if path_pbs_files is None: path_pbs_files = os.getcwd() - print ' nr_cpus ; %3i' % nr_cpus - print ' cpu_free ; %3i' % cpu_free - print 'cpu_user_queue ; %3i' % cpu_user_queue - print 'path_pbs_files ; %s' % path_pbs_files - print 'search_crit_re ; %s' % search_crit_re - print ' dryrun ; %s' % dryrun - print ' tsleep ; %s' % tsleep - print ' logfile ; %s' % logfile - print ' path_log ; %s' % path_log - print ' cache ; %s' % cache - - # build the file list - fname = os.path.join(os.getcwd(), 'pbs_in_file_cache.txt') - if not cache: - print 'Building file list of to be launched jobs. ', - print 'Might take a while...' - pbsflist = build_pbsflist(path_pbs_files) - if sort: - pbsflist.sort() - save_pbsflist(fname, pbsflist) - else: - print 'Loading file list from cache...' - pbsflist = load_pbsflist(fname) - if sort: - pbsflist.sort() - - # filter out files we will not launch - regex = re.compile(search_crit_re) - pbsflist2 = select(pbsflist, regex) - self.pbsflist = pbsflist2 - nr_jobs = len(pbsflist2) - print 'Done building/loading. Ready to qsub %i jobs' % nr_jobs - self.uid = os.getenv('USER') self.nr_cpus = nr_cpus self.cpu_free = cpu_free @@ -460,6 +427,7 @@ class Scheduler: self.fname_config = 'launch_scheduler_config.txt' self.fname_cluster_state = 'launch_scheduler_state.txt' self.reload_pbsflist = 0 + self.cache = cache self.pbsnodes = '/opt/pbs/bin/pbsnodes' if HOSTNAME[:1] == 'g': @@ -479,13 +447,80 @@ class Scheduler: the scheduler method is used. """ if not depend: - print ' method ; scheduler' - self.launch(crontab_mode=crontab_mode) + if crontab_mode: + pbsflist = self.get_joblist(verbose=False) + self.launch(pbsflist, crontab_mode=crontab_mode) + else: + self.print_config() + print ' method ; scheduler' + pbsflist = self.filter_joblist(self.get_joblist()) + self.print_logheader() + self.launch(pbsflist, crontab_mode=crontab_mode) + print '' else: print ' method ; dependencies' self.launch_deps() + print '' + + def get_joblist(self, verbose=True): + + # build the file list + self.f_pbsflist = os.path.join(os.getcwd(), 'pbs_in_file_cache.txt') + if not self.cache: + if verbose: + print 'Building file list of to be launched jobs. ', + print 'Might take a while...' + pbsflist = build_pbsflist(self.path_pbs_files) + if self.sort: + pbsflist.sort() + save_pbsflist(self.f_pbsflist, pbsflist) + else: + if verbose: + print 'Loading file list from cache...' + pbsflist = load_pbsflist(self.f_pbsflist) + if self.sort: + pbsflist.sort() + return pbsflist + + def filter_joblist(self, pbsflist): + # filter out files we will not launch + regex = re.compile(self.search_crit_re) + pbsflist = select(pbsflist, regex) + nr_jobs = len(pbsflist) + print 'Done building/loading. Ready to qsub %i jobs' % nr_jobs + return pbsflist + + def print_config(self): + """Print a header for the log with current config. + """ + + print ' nr_cpus ; %3i' % self.nr_cpus + print ' cpu_free ; %3i' % self.cpu_free + print 'cpu_user_queue ; %3i' % self.cpu_user_queue + print 'path_pbs_files ; %s' % self.path_pbs_files + print 'search_crit_re ; %s' % self.search_crit_re + print ' dryrun ; %s' % self.dryrun + print ' tsleep ; %s' % self.tsleep + print ' logfile ; %s' % self.logfile + print ' path_log ; %s' % path_log + print ' cache ; %s' % self.cache + + def print_logheader(self): + print ' tsleep_short ; %6.2f' % self.tsleep_short + print 'min cpu to be free ; %3i' % self.cpu_free + # we can only ask for confirmation when stdout is not going to the log + if self.logfile is None: + print 'Is this ok? You have 5 sec to abort (press CTRL-C to abort)' + time.sleep(5) +# print 'Press enter to continue, or CTRL-C to abort' +# dummy = raw_input() print '' + header = ('job nr'.rjust(self.c0), 'job_id'.rjust(self.c1), + 'progress'.rjust(self.c1), 'time stamp'.rjust(self.c2), + 'pbs_in file name') + print '%s ; %s ; %s ; %s ; %s' % header + def write_config(self): """Save a config file with the current tuning parameters. """ @@ -499,6 +534,7 @@ class Scheduler: f.write(' tsleep_short : % 5.02f\n' % self.tsleep_short) f.write('pbs_update_deltat : % 5.02f\n' % self.pbs_update_deltat) f.write(' reload_pbsflist : % 5i\n' % self.reload_pbsflist) + f.write(' fname pbsflist : %s\n' % self.f_pbsflist) f.flush() f.close() @@ -569,7 +605,7 @@ class Scheduler: return cpu_free, cpu_user, cpu_user_queue - def launch(self, crontab_mode=False): + def launch(self, pbsflist, crontab_mode=False): """ Iterate over all the pbs files and launch them if there are cpus available, and if the maximum number of predifined cpus is not reached. @@ -579,23 +615,9 @@ class Scheduler: used by the user. This will prevent the scheduler from spamming the queueing system with jobs that will only queue intstead of run. """ - print ' tsleep_short ; %6.2f' % self.tsleep_short - print 'min cpu to be free ; %3i' % self.cpu_free - # we can only ask for confirmation when stdout is not going to the log - if self.logfile is None: - print 'Is this ok? You have 5 sec to abort (press CTRL-C to abort)' - time.sleep(5) -# print 'Press enter to continue, or CTRL-C to abort' -# dummy = raw_input() - print '' - - header = ('job nr'.rjust(self.c0), 'job_id'.rjust(self.c1), - 'progress'.rjust(self.c1), 'time stamp'.rjust(self.c2), - 'pbs_in file name') - print '%s ; %s ; %s ; %s ; %s' % header ii = 0 - ii_max = len(self.pbsflist) + ii_max = len(pbsflist) if ii_max < 1: print "" print "No files matching %s at:" % self.search_crit_re @@ -620,7 +642,7 @@ class Scheduler: if (self.nr_cpus > cpu_user) and (cpu_free > self.cpu_free) and \ (cpu_user_queue < self.cpu_user_queue): - fname = self.pbsflist[ii] + fname = pbsflist[ii] # fname minus current working dir fname_short = fname.replace(os.getcwd(), '') # # read the PBS file @@ -664,7 +686,7 @@ class Scheduler: ii += 1 elif crontab_mode: fname = os.path.join(os.getcwd(), 'pbs_in_file_cache.txt') - save_pbsflist(fname, self.pbsflist[ii:]) + save_pbsflist(fname, pbsflist[ii:]) return else: # wait a bit before trying to launch a job again -- GitLab