diff --git a/docs/install-anaconda.md b/docs/install-anaconda.md index 4ba6915c47ce027619eb075d9cf8f10f9ac7bc8f..f22d59b2745060de601de88838310ef74010ac69 100644 --- a/docs/install-anaconda.md +++ b/docs/install-anaconda.md @@ -8,8 +8,8 @@ conda update --all conda create -n wetb_py3 python=3.5 source activate wetb_py3 -conda install setuptools_scm future h5py pytables pytest nose sphinx +conda install setuptools_scm future h5py pytables pytest nose sphinx blosc conda install scipy pandas matplotlib cython xlrd coverage xlwt openpyxl -pip install pyscaffold pytest-cov +pip install pyscaffold pytest-cov --no-deps ``` diff --git a/docs/install-manual-detailed.md b/docs/install-manual-detailed.md index 81acbbbf1f66f1df6eca725f17ec283dc09f15e6..c18162998e5c0cbfd87564b62b6dd4689875e9ef 100644 --- a/docs/install-manual-detailed.md +++ b/docs/install-manual-detailed.md @@ -165,7 +165,7 @@ activate py27 * Install the necessary Python dependencies using the conda package manager: ``` -conda install setuptools_scm future h5py pytables pytest nose sphinx +conda install setuptools_scm future h5py pytables pytest nose sphinx blosc conda install scipy pandas matplotlib cython xlrd coverage xlwt openpyxl ``` @@ -173,7 +173,7 @@ conda install scipy pandas matplotlib cython xlrd coverage xlwt openpyxl easily installed with pip: ``` -pip install pyscaffold pytest-cov +pip install pyscaffold pytest-cov --no-deps ``` diff --git a/wetb/hawc2/htc_contents.py b/wetb/hawc2/htc_contents.py index 54c338b47826883c96a64e3db4fa055c6593e2ba..7bbe2c4548aec27ad8c6ca1f6fd633f83465ed5f 100644 --- a/wetb/hawc2/htc_contents.py +++ b/wetb/hawc2/htc_contents.py @@ -351,7 +351,7 @@ class HTCDefaults(object): else: mann.add_line('create_turb_parameters', [L, ae23, Gamma, seed, int(high_frq_compensation)], "L, alfaeps, gamma, seed, highfrq compensation") if filenames is None: - fmt = "l%.1f_ae%.2f_g%.1f_h%d_%dx%dx%d_%.3fx%.2fx%.2f_s%04d%c.turb" + fmt = "mann_l%.1f_ae%.2f_g%.1f_h%d_%dx%dx%d_%.3fx%.2fx%.2f_s%04d%c.turb" import numpy as np dxyz = tuple(np.array(box_dimension) / no_grid_points) filenames = ["./turb/" + fmt % ((L, ae23, Gamma, high_frq_compensation) + no_grid_points + dxyz + (seed, uvw)) for uvw in ['u', 'v', 'w']] diff --git a/wetb/hawc2/htc_file.py b/wetb/hawc2/htc_file.py index 10eb514ce850b58647367475c58d960550e30bd7..0a58d861b490e8f8865517e9de4ea8a653891e12 100644 --- a/wetb/hawc2/htc_file.py +++ b/wetb/hawc2/htc_file.py @@ -146,8 +146,10 @@ class HTCFile(HTCContents, HTCDefaults): if 'soil' in self: if 'soil_element' in self.soil: files.append(self.soil.soil_element.get('datafile', [None])[0]) - if 'force' in self: - files.append(self.force.get('dll', [None])[0]) + try: + files.append(self.force.dll.dll[0]) + except: + pass return [f for f in set(files) if f] diff --git a/wetb/hawc2/log_file.py b/wetb/hawc2/log_file.py index 12dac0ab0be8b0bdda5acd154a733ca999fd9ff1..0316f709232bf56a7f97e26dbd1a01bc17ba57e2 100644 --- a/wetb/hawc2/log_file.py +++ b/wetb/hawc2/log_file.py @@ -101,7 +101,7 @@ class LogInterpreter(object): i1 = simulation_txt.rfind("Global time") if i1 > -1: self.current_time = self.extract_time(simulation_txt[i1:]) - if self.time_stop > 0: + if self.current_time is not None and self.time_stop > 0: self.pct = int(100 * self.current_time // self.time_stop) try: self.remaining_time = (time.time() - self.start_time[1]) / (self.current_time - self.start_time[0]) * (self.time_stop - self.current_time) @@ -123,11 +123,11 @@ class LogInterpreter(object): if self.remaining_time: if self.remaining_time < 3600: 
m, s = divmod(self.remaining_time, 60) - return "%02d:%02d" % (m, math.ceil(s)) + return "%02d:%02d" % (m, int(s)) else: h, ms = divmod(self.remaining_time, 3600) m, s = divmod(ms, 60) - return "%d:%02d:%02d" % (h, m, math.ceil(s)) + return "%d:%02d:%02d" % (h, m, int(s)) else: return "--:--" diff --git a/wetb/hawc2/simulation.py b/wetb/hawc2/simulation.py index aae4a60932023bf471ce231ad0806dc34101fc11..9874f3e81f7e950ed6ae5a995dcfd2cdb9d727f2 100755 --- a/wetb/hawc2/simulation.py +++ b/wetb/hawc2/simulation.py @@ -23,8 +23,9 @@ from future import standard_library from wetb.utils.cluster_tools import pbsjob from wetb.utils.cluster_tools.cluster_resource import LocalResource -from wetb.utils.cluster_tools.pbsjob import SSHPBSJob +from wetb.utils.cluster_tools.pbsjob import SSHPBSJob, DONE, NOT_SUBMITTED from wetb.utils.cluster_tools.ssh_client import SSHClient +from wetb.utils.timing import print_time standard_library.install_aliases() from threading import Thread @@ -98,7 +99,6 @@ class Simulation(object): else: self.log_filename = os.path.relpath(self.log_filename) self.log_filename = unix_path(self.log_filename) - self.logFile = LogFile(os.path.join(self.modelpath, self.log_filename), self.time_stop) self.logFile.clear() self.last_status = self.status @@ -129,6 +129,8 @@ class Simulation(object): time.sleep(0.1) if self.logFile.status not in [log_file.DONE]: self.status = ABORTED + self.is_simulating = False + self.is_done = True if update_status: self.update_status() @@ -191,6 +193,7 @@ class Simulation(object): self.status = INITIALIZING self.logFile.clear() self.host._simulate() + self.returncode, self.stdout = self.host.returncode, self.host.stdout if self.host.returncode or 'error' in self.host.stdout.lower(): if "error" in self.host.stdout.lower(): self.errors = (list(set([l for l in self.host.stdout.split("\n") if 'error' in l.lower()]))) @@ -244,6 +247,7 @@ class Simulation(object): self.status = FINISH + def __str__(self): return "Simulation(%s)" % self.filename @@ -335,10 +339,12 @@ class UpdateStatusThread(Thread): Thread.start(self) def run(self): - print ("Wrong updatestatus") while self.simulation.is_done is False: self.simulation.update_status() - time.sleep(self.interval) + time.sleep(0.5) + t = time.time() + while self.simulation.is_simulating and time.time() < t + self.interval: + time.sleep(1) class SimulationResource(object): @@ -455,8 +461,11 @@ class SimulationThread(Thread): def run(self): import psutil p = psutil.Process(os.getpid()) - if self.low_priority: - p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + try: + if self.low_priority: + p.set_nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + except: + pass self.process.communicate() errorcode = self.process.returncode @@ -473,7 +482,7 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def __init__(self, simulation, resource, host, username, password, port=22): SimulationResource.__init__(self, simulation) SSHClient.__init__(self, host, username, password, port=port) - self.pbsjob = SSHPBSJob(host, username, password, port) + self.pbsjob = SSHPBSJob(resource.shared_ssh) self.resource = resource hawc2exe = property(lambda self : os.path.basename(self.sim.hawc2exe)) @@ -505,8 +514,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): try: dst_file = os.path.join(self.modelpath, os.path.relpath(src_file, self.tmp_modelpath)) os.makedirs(os.path.dirname(dst_file), exist_ok=True) - self.download(src_file, dst_file, verbose=False) - except ValueError as e: + self.download(src_file, dst_file, 
retry=3) + except Exception as e: print (self.modelpath, src_file, self.tmp_modelpath) raise e try: @@ -545,25 +554,29 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): def update_logFile_status(self): - status = self.pbsjob.status + def pbsjob_status(): + if self.pbsjob._status in [NOT_SUBMITTED, DONE]: + return self.pbsjob._status + if self.pbsjob.jobid in self.resource.is_executing: + self.pbsjob._status = pbsjob.RUNNING + elif self.simulation_id in self.resource.finished: + self.pbsjob._status = DONE + self.pbsjob.jobid = None + return self.pbsjob._status + + + status = pbsjob_status() if status == pbsjob.NOT_SUBMITTED: pass elif status == pbsjob.DONE: self.is_simulating = False pass else: - try: - _, out, _ = self.execute("cat .hawc2launcher/status_%s" % self.simulation_id) - out = out.split(";") - if len(out) == 5: - self.status = out[0] - self.logFile = LogInfo(*out[1:]) - - except Exception as e: - if "No such file or directory" in str(e): - pass - else: - raise + if self.simulation_id in self.resource.loglines: + self.logline = self.resource.loglines[self.simulation_id] + self.status = self.logline[0] + self.logFile = LogInfo(*self.logline[1:]) + def start(self): """Start non blocking distributed simulation""" @@ -578,6 +591,8 @@ class PBSClusterSimulationHost(SimulationResource, SSHClient): self.finish_simulation() except: pass + self.is_simulating = False + self.is_done = True if self.status != ERROR and self.logFile.status not in [log_file.DONE]: self.status = ABORTED diff --git a/wetb/hawc2/tests/test_htc_file.py b/wetb/hawc2/tests/test_htc_file.py index f0ecc3d0611190d02287d5cd8cf53f6dfa18aee6..e852d6256d2292c577ccd68a9b9c0f617a6834de 100644 --- a/wetb/hawc2/tests/test_htc_file.py +++ b/wetb/hawc2/tests/test_htc_file.py @@ -118,9 +118,9 @@ class TestHtcFile(unittest.TestCase): htcfile.add_mann_turbulence(30.1, 1.1, 3.3, 102, False) s = """begin mann; create_turb_parameters\t30.1 1.1 3.3 102 0;\tL, alfaeps, gamma, seed, highfrq compensation - filename_u\t./turb/l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102u.turb; - filename_v\t./turb/l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102v.turb; - filename_w\t./turb/l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102w.turb; + filename_u\t./turb/mann_l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102u.turb; + filename_v\t./turb/mann_l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102v.turb; + filename_w\t./turb/mann_l30.1_ae1.10_g3.3_h0_4096x32x32_1.465x3.12x3.12_s0102w.turb; box_dim_u\t4096 1.4652; box_dim_v\t32 3.2258; box_dim_w\t32 3.2258; diff --git a/wetb/prepost/Simulations.py b/wetb/prepost/Simulations.py index 4c5d7bd2c6da30fd153caa1714763a2603d3b88a..751149b931c73cb65da32805d9f7748ffc459f6b 100755 --- a/wetb/prepost/Simulations.py +++ b/wetb/prepost/Simulations.py @@ -1047,7 +1047,8 @@ def launch(cases, runmethod='local', verbose=False, copyback_turb=True, 'linux-script, windows-script, local-ram, none' raise ValueError(msg) -def post_launch(cases, save_iter=False, silent=False): +def post_launch(cases, save_iter=False, silent=False, suffix=None, + path_errorlog=None): """ Do some basics checks: do all launched cases have a result and LOG file and are there any errors in the LOG files? 
@@ -1153,8 +1154,11 @@ def post_launch(cases, save_iter=False, silent=False): # but put in one level up, so in the logfiles folder directly errorlogs.ResultFile = sim_id + '_ErrorLog.csv' # save the log file analysis in the run_dir instead of the log_dir - errorlogs.PathToLogs = run_dir# + log_dir - errorlogs.save() + if path_errorlog is None: + errorlogs.PathToLogs = run_dir# + log_dir + else: + errorlogs.PathToLogs = path_errorlog + errorlogs.save(suffix=suffix) # save the error LOG list, this is redundant, since it already exists in # the general LOG file (but only as a print, not the python variable) @@ -1167,7 +1171,7 @@ def post_launch(cases, save_iter=False, silent=False): return cases_fail -def copy_pbs_in_failedcases(cases_fail, pbs_fail='pbs_in_fail', silent=True): +def copy_pbs_in_failedcases(cases_fail, path='pbs_in_fail', silent=True): """ Copy all the pbs_in files from failed cases to a new directory so it is easy to re-launch them @@ -1181,7 +1185,7 @@ def copy_pbs_in_failedcases(cases_fail, pbs_fail='pbs_in_fail', silent=True): src = os.path.join(run_dir, case['[pbs_in_dir]'], pbs_in_fname) - pbs_in_dir_fail = case['[pbs_in_dir]'].replace('pbs_in', pbs_fail) + pbs_in_dir_fail = case['[pbs_in_dir]'].replace('pbs_in', path) dst = os.path.join(run_dir, pbs_in_dir_fail, pbs_in_fname) if not silent: @@ -2543,12 +2547,12 @@ class ErrorLogs(object): # TODO: move to the HAWC2 plugin for cases - def __init__(self, silent=False, cases=None): + def __init__(self, silent=False, cases=None, resultfile='ErrorLog.csv'): self.silent = silent # specify folder which contains the log files self.PathToLogs = '' - self.ResultFile = 'ErrorLog.csv' + self.ResultFile = resultfile self.cases = cases @@ -2681,11 +2685,22 @@ class ErrorLogs(object): tempLog = [] tempLog.append(fname) exit_correct, found_error = False, False + + subcols_sim = 4 + subcols_init = 2 # create empty list item for the different messages and line # number. Include one column for non identified messages - for j in range(self.init_cols + self.sim_cols + 1): - tempLog.append('') - tempLog.append('') + for j in range(self.init_cols): + # 2 sub-columns per message: nr, msg + for k in range(subcols_init): + tempLog.append('') + for j in range(self.sim_cols): + # 4 sub-columns per message: first, last, nr, msg + for k in range(subcols_sim): + tempLog.append('') + # and two more columns at the end for messages of unknown origin + tempLog.append('') + tempLog.append('') # if there is a cases object, see how many time steps we expect if self.cases is not None: @@ -2729,32 +2744,51 @@ class ErrorLogs(object): # if string is shorter, we just get a shorter string. 
                     # checking presence in dict is faster compared to checking
                     # the length of the string
+                    # first, last, nr, msg
                     if msg in self.err_init:
-                        col_nr = self.err_init[msg]
-                        # 2nd item is the column position of the message
-                        tempLog[2*(col_nr+1)] = line
-                        # line number of the message
-                        tempLog[2*col_nr+1] += '%i, ' % j
+                        # icol=0 -> fname
+                        icol = subcols_init*self.err_init[msg] + 1
+                        # 0: number of occurrences
+                        if tempLog[icol] == '':
+                            tempLog[icol] = '1'
+                        else:
+                            tempLog[icol] = str(int(tempLog[icol]) + 1)
+                        # 1: the error message itself
+                        tempLog[icol+1] = line
                         found_error = True
                     # find errors that can occur during simulation
                     elif msg in self.err_sim:
-                        col_nr = self.err_sim[msg] + self.init_cols
-                        # 2nd item is the column position of the message
-                        tempLog[2*(col_nr+1)] = line
+                        icol = subcols_sim*self.err_sim[msg]
+                        icol += subcols_init*self.init_cols + 1
                         # in case stuff already goes wrong on the first time step
                         if time_step == -1:
                             time_step = 0
-                        # line number of the message
-                        tempLog[2*col_nr+1] += '%i, ' % time_step
+
+                        # 1: time step of first occurrence
+                        if tempLog[icol] == '':
+                            tempLog[icol] = '%i' % time_step
+                        # 2: time step of last occurrence
+                        tempLog[icol+1] = '%i' % time_step
+                        # 3: number of occurrences
+                        if tempLog[icol+2] == '':
+                            tempLog[icol+2] = '1'
+                        else:
+                            tempLog[icol+2] = str(int(tempLog[icol+2]) + 1)
+                        # 4: the error message itself
+                        tempLog[icol+3] = line
+
                         found_error = True
                         iterations[time_step,2] = 1
 
                     # method of last resort, we have no idea what message
                     elif line[:10] == ' *** ERROR' or line[:10]==' ** WARNING':
-                        tempLog[-2] = line
+                        icol = subcols_sim*self.sim_cols
+                        icol += subcols_init*self.init_cols + 1
                         # line number of the message
-                        tempLog[-1] = j
+                        tempLog[icol] = j
+                        # and message
+                        tempLog[icol+1] = line
                         found_error = True
                         # in case stuff already goes wrong on the first time step
                         if time_step == -1:
@@ -2863,11 +2897,11 @@ class ErrorLogs(object):
             else:
                 self.save(appendlog=appendlog)
 
-    def save(self, appendlog=False):
+    def save(self, appendlog=False, suffix=None):
         # write the results in a file, start with a header
-        contents = 'file name;' + 'lnr;msg;'*(self.init_cols)
-        contents += 'iter_nr;msg;'*(self.sim_cols)
+        contents = 'file name;' + 'nr;msg;'*(self.init_cols)
+        contents += 'first_tstep;last_tstep;nr;msg;'*(self.sim_cols)
         contents += 'lnr;msg;'
         # and add headers for elapsed time, nr of iterations, and sec/iteration
         contents += 'Elapsted time;last time step;Simulation time;'
@@ -2882,7 +2916,11 @@ class ErrorLogs(object):
             contents = contents + '\n'
 
         # write csv file to disk, append to facilitate more logfile analysis
-        fname = os.path.join(self.PathToLogs, str(self.ResultFile))
+        if isinstance(suffix, str):
+            tmp = self.ResultFile.replace('.csv', '_%s.csv' % suffix)
+            fname = os.path.join(self.PathToLogs, tmp)
+        else:
+            fname = os.path.join(self.PathToLogs, str(self.ResultFile))
         if not self.silent:
             print('Error log analysis saved at:')
             print(fname)
@@ -3318,7 +3356,7 @@ class WeibullParameters(object):
 
 def compute_env_of_env(envelope, dlc_list, Nx=300, Nsectors=12, Ntheta=181):
    """
-    The function computes load envelopes for given channels and a groups of
+    The function computes load envelopes for given channels and groups of
     load cases starting from the envelopes computed for single simulations.
     The output is the envelope of the envelopes of the single simulations.
     This total envelope is projected on defined polar directions.
@@ -3326,7 +3364,7 @@ def compute_env_of_env(envelope, dlc_list, Nx=300, Nsectors=12, Ntheta=181): Parameters ---------- - envelope : dict, dictionaries of interpolated envelopes of a given + envelope : dict, dictionaries of interpolated envelopes of a given channel (it's important that each entry of the dictonary contains a matrix of the same dimensions). The dictonary is organized by load case @@ -3335,28 +3373,28 @@ def compute_env_of_env(envelope, dlc_list, Nx=300, Nsectors=12, Ntheta=181): Nx : int, default=300 Number of points for the envelope interpolation - + Nsectors: int, default=12 Number of sectors in which the total envelope will be divided. The default is every 30deg - + Ntheta; int, default=181 Number of angles in which the envelope is interpolated in polar coordinates. - + Returns ------- - envelope : array (Nsectors x 6), + envelope : array (Nsectors x 6), Total envelope projected on the number of angles defined in Nsectors. The envelope is projected in Mx and My and the other cross-sectional moments and forces are fetched accordingly (at the same time step where the corresponding Mx and My are occuring) """ - + # Group all the single DLCs - cloud = np.zeros(((Nx+1)*len(envelope),6)) + cloud = np.zeros(((Nx+1)*len(envelope),6)) for i in range(len(envelope)): cloud[(Nx+1)*i:(Nx+1)*(i+1),:] = envelope[dlc_list[i]] # Compute total Hull of all the envelopes @@ -3367,10 +3405,10 @@ def compute_env_of_env(envelope, dlc_list, Nx=300, Nsectors=12, Ntheta=181): cc_x,cc_up,cc_low,cc_int= int_envelope(cc[:,0], cc[:,1], Nx=Nx) # Project full envelope on given direction cc_proj = proj_envelope(cc_x, cc_up, cc_low, cc_int, Nx, Nsectors, Ntheta) - + env_proj = np.zeros([len(cc_proj),6]) env_proj[:,:2] = cc_proj - + # Based on Mx and My, gather the remaining cross-sectional forces and # moments for ich in range(2, 6): @@ -3378,18 +3416,18 @@ def compute_env_of_env(envelope, dlc_list, Nx=300, Nsectors=12, Ntheta=181): s1 = np.array(cloud[hull.vertices[0], ich]).reshape(-1, 1) s0 = np.append(s0, s1, axis=0) cc = np.append(cc, s0, axis=1) - + _,_,_,extra_sensor = int_envelope(cc[:,0],cc[:,ich],Nx) - es = np.atleast_2d(np.array(extra_sensor[:,1])).T + es = np.atleast_2d(np.array(extra_sensor[:,1])).T cc_int = np.append(cc_int,es,axis=1) - + for isec in range(Nsectors): ids = (np.abs(cc_int[:,0]-cc_proj[isec,0])).argmin() env_proj[isec,ich] = (cc_int[ids-1,ich]+cc_int[ids,ich]+\ cc_int[ids+1,ich])/3 - + return env_proj - + def int_envelope(ch1,ch2,Nx): # Function to interpolate envelopes and output arrays of same length @@ -3411,8 +3449,8 @@ def int_envelope(ch1,ch2,Nx): lower = np.concatenate((np.array([ch1[indmin:],ch2[indmin:]]).T, np.array([ch1[:indmax+1],ch2[:indmax+1]]).T), axis=0) - - + + int_1 = np.linspace(min(upper[:,0].min(),lower[:,0].min()), max(upper[:,0].max(),lower[:,0].max()),Nx/2+1) upper = np.flipud(upper) @@ -3435,20 +3473,20 @@ def proj_envelope(env_x, env_up, env_low, env, Nx, Nsectors, Ntheta): theta_int = np.linspace(-np.pi,np.pi,Ntheta) sectors = np.linspace(-np.pi,np.pi,Nsectors+1) proj = np.zeros([Nsectors,2]) - + R_up = np.sqrt(env_x**2+env_up**2) theta_up = np.arctan2(env_up,env_x) - + R_low = np.sqrt(env_x**2+env_low**2) theta_low = np.arctan2(env_low,env_x) - + R = np.concatenate((R_up,R_low)) theta = np.concatenate((theta_up,theta_low)) R = R[np.argsort(theta)] theta = np.sort(theta) - + R_int = np.interp(theta_int,theta,R,period=2*np.pi) - + for i in range(Nsectors): if sectors[i]>=-np.pi and sectors[i+1]<-np.pi/2: indices = np.where(np.logical_and(theta_int >= 
sectors[i], @@ -3478,17 +3516,17 @@ def proj_envelope(env_x, env_up, env_low, env, Nx, Nsectors, Ntheta): maxR = R_int[indices].max() proj[i,0] = maxR*np.cos(sectors[i]) proj[i,1] = maxR*np.sin(sectors[i]) - - ind = np.where(sectors==0) + + ind = np.where(sectors==0) proj[ind,0] = env[:,0].max() - ind = np.where(sectors==np.pi/2) + ind = np.where(sectors==np.pi/2) proj[ind,1] = env[:,1].max() - ind = np.where(sectors==-np.pi) + ind = np.where(sectors==-np.pi) proj[ind,0] = env[:,0].min() - ind = np.where(sectors==-np.pi/2) + ind = np.where(sectors==-np.pi/2) proj[ind,1] = env[:,1].min() return proj @@ -3652,7 +3690,7 @@ class Cases(object): self.cases.pop(k) def launch(self, runmethod='local', verbose=False, copyback_turb=True, - silent=False, check_log=True): + silent=False, check_log=True): """ Launch all cases """ @@ -3660,7 +3698,8 @@ class Cases(object): launch(self.cases, runmethod=runmethod, verbose=verbose, silent=silent, check_log=check_log, copyback_turb=copyback_turb) - def post_launch(self, save_iter=False, copy_pbs_failed=True): + def post_launch(self, save_iter=False, copy_pbs_failed=True, suffix=None, + path_errorlog=None, silent=False): """ Post Launching Maintenance @@ -3668,11 +3707,12 @@ class Cases(object): accounted for. """ # TODO: integrate global post_launch in here - self.cases_fail = post_launch(self.cases, save_iter=save_iter) + self.cases_fail = post_launch(self.cases, save_iter=save_iter, + suffix=suffix, path_errorlog=path_errorlog) if copy_pbs_failed: - copy_pbs_in_failedcases(self.cases_fail, pbs_in_fail='pbs_in_fail', - silent=self.silent) + copy_pbs_in_failedcases(self.cases_fail, path='pbs_in_fail', + silent=silent) if self.rem_failed: self.remove_failed() @@ -5094,8 +5134,8 @@ class Cases(object): def compute_envelope(self, sig, ch_list, int_env=False, Nx=300): """ - The function computes load envelopes for given signals and a single - load case. Starting from Mx and My moments, the other cross-sectional + The function computes load envelopes for given signals and a single + load case. Starting from Mx and My moments, the other cross-sectional forces are identified. Parameters @@ -5104,20 +5144,20 @@ class Cases(object): sig : list, time-series signal ch_list : list, list of channels for enevelope computation - + int_env : boolean, default=False - If the logic parameter is True, the function will interpolate the + If the logic parameter is True, the function will interpolate the envelope on a given number of points Nx : int, default=300 - Number of points for the envelope interpolation - + Number of points for the envelope interpolation + Returns ------- - envelope : dictionary, + envelope : dictionary, The dictionary has entries refered to the channels selected. 
-        Inside the dictonary under each entry there is a matrix with 6
+        Inside the dictionary under each entry there is a matrix with 6
         columns, each for the sectional forces and moments
 
         """
 
@@ -5135,13 +5175,13 @@ class Cases(object):
             closed_contour = np.append(cloud[hull.vertices,:],
                                        cloud[hull.vertices[0],:].reshape(1,2),
                                        axis=0)
-
+
             # Interpolate envelope for a given number of points
             if int_env:
                 _,_,_,closed_contour_int = int_envelope(closed_contour[:,0],
-                                                       closed_contour[:,1],Nx)
-
-
+                                                        closed_contour[:,1],Nx)
+
+
             # Based on Mx and My envelope, the other cross-sectional moments
             # and forces components are identified and appended to the initial
             # envelope
@@ -5154,16 +5194,16 @@ class Cases(object):
                 if int_env:
                     _,_,_,extra_sensor = int_envelope(closed_contour[:,0],
                                                       closed_contour[:,ich],Nx)
-                    es = np.atleast_2d(np.array(extra_sensor[:,1])).T
+                    es = np.atleast_2d(np.array(extra_sensor[:,1])).T
                     closed_contour_int = np.append(closed_contour_int,es,axis=1)
 
             if int_env:
                 envelope[ch[0]] = closed_contour_int
             else:
                 envelope[ch[0]] = closed_contour
-
+
         return envelope
-
+
 def int_envelope(ch1,ch2,Nx):
     # Function to interpolate envelopes and output arrays of same length
 
diff --git a/wetb/prepost/dlcdefs.py b/wetb/prepost/dlcdefs.py
index e7ef0a6a43b75347d75473d136007016c6f3032c..df704758291dded2e75773f9b6c47a709e473113 100644
--- a/wetb/prepost/dlcdefs.py
+++ b/wetb/prepost/dlcdefs.py
@@ -240,8 +240,8 @@ def tags_defaults(master):
 
     return master
 
-def excel_stabcon(proot, fext='xlsx', pignore=None, sheet=0,
-                  pinclude=None, silent=False):
+def excel_stabcon(proot, fext='xlsx', pignore=None, pinclude=None, sheet=0,
+                  silent=False):
     """
     Read all MS Excel files that hold load case definitions according to
     the team STABCON definitions. Save each case in a list according to the
@@ -251,6 +251,9 @@ def excel_stabcon(proot, fext='xlsx', pignore=None, sheet=0,
     are added to be compatible with the tag convention in the Simulations
     module.
 
+    The opt_tags case list is sorted according to the Excel file names, and
+    follows the same ordering as in each of the different Excel files.
+
     Parameters
     ----------
 
@@ -272,25 +275,32 @@ def excel_stabcon(proot, fext='xlsx', pignore=None, sheet=0,
         Name or index of the Excel sheet to be considered. By default, the
         first sheet (index=0) is taken.
 
+    Returns
+    -------
+
+    opt_tags : list of dicts
+        A list of case dictionaries, where each case dictionary holds all
+        the tag/value key pairs for a single given case.
+
     """
     if not silent:
         print('looking for DLC spreadsheet definitions at:')
         print(proot)
-    df_list = misc.read_excel_files(proot, fext=fext, pignore=pignore,
+    dict_dfs = misc.read_excel_files(proot, fext=fext, pignore=pignore,
                                      sheet=sheet, pinclude=pinclude,
                                      silent=silent)
     if not silent:
-        print('found %i Excel file(s), ' % len(df_list), end='')
+        print('found %i Excel file(s), ' % len(dict_dfs), end='')
     k = 0
-    for df in df_list:
+    for df in dict_dfs.values():
         k += len(df)
     if not silent:
         print('in which a total of %s cases are defined.'
              % k)
 
     opt_tags = []
-    for (dlc, df) in viewitems(df_list):
+    for (dlc, df) in sorted(viewitems(dict_dfs)):
         # replace ';' with False, and Nan(='') with True
         # this is more easy when testing for the presence of stuff compared
         # to checking if a value is either True/False or ''/';'
diff --git a/wetb/prepost/misc.py b/wetb/prepost/misc.py
index 7deb74f99f6cb227a3f517ae25f366a38c596548..f5cd5cf93466285c110c6ad70b99d5e7ffaf96de 100644
--- a/wetb/prepost/misc.py
+++ b/wetb/prepost/misc.py
@@ -716,17 +716,16 @@ def read_excel_files(proot, fext='xlsx', pignore=None, sheet=0,
 
     Returns
     -------
 
-    df_list : list
-        A list of pandas DataFrames. Each DataFrame corresponds to the
-        contents of a single Excel file that was found in proot or one of
-        its sub-directories
+    df_list : dictionary
+        A dictionary with the Excel file name (excluding 'fext') as key, and
+        the corresponding pandas DataFrame as value.
 
     """
 
     df_list = {}
     # find all dlc defintions in the subfolders
     for root, dirs, files in os.walk(proot):
-        for file_name in sorted(files):
+        for file_name in files:
             if not file_name.split('.')[-1] == fext:
                 continue
             f_target = os.path.join(root, file_name)
diff --git a/wetb/utils/cluster_tools/cluster_resource.py b/wetb/utils/cluster_tools/cluster_resource.py
index 162036fd95edafba7bd1e7e45a1d961b9fbefca1..ebdf6f4aece80b05aa114b39ab516ea5bb76e989 100644
--- a/wetb/utils/cluster_tools/cluster_resource.py
+++ b/wetb/utils/cluster_tools/cluster_resource.py
@@ -9,7 +9,9 @@ import threading
 
 import psutil
 from wetb.utils.cluster_tools import pbswrap
-from wetb.utils.cluster_tools.ssh_client import SSHClient
+from wetb.utils.cluster_tools.ssh_client import SSHClient, SharedSSHClient
+from collections import deque
+import time
 
 
 class Resource(object):
@@ -44,9 +46,11 @@ class Resource(object):
 class SSHPBSClusterResource(Resource, SSHClient):
     def __init__(self, host, username, password, port, min_cpu, min_free):
         Resource.__init__(self, min_cpu, min_free)
+        self.shared_ssh = SharedSSHClient(host, username, password, port)
         SSHClient.__init__(self, host, username, password, port=port)
         self.lock = threading.Lock()
 
+
     def new_ssh_connection(self):
         return SSHClient(self.host, self.username, self.password, self.port)
 
@@ -87,6 +91,8 @@ class SSHPBSClusterResource(Resource, SSHClient):
 
 
 
+
+
 class LocalResource(Resource):
     def __init__(self, process_name):
         N = max(1, multiprocessing.cpu_count() / 2)
@@ -97,11 +103,11 @@ class LocalResource(Resource):
     def check_resources(self):
         def name(i):
             try:
-                return psutil.Process(i).name
-            except (psutil._error.AccessDenied, psutil._error.NoSuchProcess):
+                return psutil.Process(i).name()
+            except (psutil.AccessDenied, psutil.NoSuchProcess):
                 return ""
         no_cpu = multiprocessing.cpu_count()
-        cpu_free = no_cpu - self.acquired #(1 - psutil.cpu_percent(.5) / 100) * no_cpu
-        no_current_process = len([i for i in psutil.get_pid_list() if name(i).lower().startswith(self.process_name.lower())])
-        return no_cpu, cpu_free, no_current_process
+        cpu_free = (1 - psutil.cpu_percent(.5) / 100) * no_cpu
+        no_current_process = len([i for i in psutil.pids() if name(i).lower().startswith(self.process_name.lower())])
+        return no_cpu, cpu_free, self.acquired
diff --git a/wetb/utils/cluster_tools/pbsjob.py b/wetb/utils/cluster_tools/pbsjob.py
index 7b6d6f8e8827eecad29c19fab91f6e3ca280577c..901c00bc62eea095b078c05d4356c269cbf4a63b 100644
--- a/wetb/utils/cluster_tools/pbsjob.py
+++ b/wetb/utils/cluster_tools/pbsjob.py
@@ -12,14 +12,15 @@ RUNNING = "Running"
 DONE = "Done"
 
 
-class SSHPBSJob(SSHClient):
+class SSHPBSJob(object):
     _status = NOT_SUBMITTED
     nodeid = None
     jobid = None
 
-    def __init__(self, host, username, password, port=22):
-        SSHClient.__init__(self, host, username, password, port=port)
+    def __init__(self, sshClient):
+        self.ssh = sshClient
+
 
     def submit(self, job, cwd, pbs_out_file):
         self.cwd = cwd
@@ -31,25 +32,27 @@ class SSHPBSJob(SSHClient):
         if cwd != "":
             cmds.append("cd %s" % cwd)
         cmds.append("qsub %s" % job)
-        _, out, _ = self.execute(";".join(cmds))
+        ssh = SSHClient(self.ssh.host, self.ssh.username, self.ssh.password, self.ssh.port)
+        _, out, _ = ssh.execute(";".join(cmds))
         self.jobid = out.split(".")[0]
         self._status = PENDING
 
     @property
     def status(self):
+
         if self._status in [NOT_SUBMITTED, DONE]:
             return self._status
-        with self:
+        with self.ssh:
             if self.is_executing():
                 self._status = RUNNING
-            elif self.file_exists(self.pbs_out_file):
+            elif self.ssh.file_exists(self.pbs_out_file):
                 self._status = DONE
                 self.jobid = None
         return self._status
 
     def get_nodeid(self):
         try:
-            _, out, _ = self.execute("qstat -f %s | grep exec_host" % self.jobid)
+            _, out, _ = self.ssh.execute("qstat -f %s | grep exec_host" % self.jobid)
             return out.strip().replace("exec_host = ", "").split(".")[0]
         except Warning as e:
             if 'qstat: Unknown Job Id' in str(e):
@@ -59,7 +62,7 @@ class SSHPBSJob(SSHClient):
     def stop(self):
         if self.jobid:
             try:
-                self.execute("qdel %s" % self.jobid)
+                self.ssh.execute("qdel %s" % self.jobid)
             except Warning as e:
                 if 'qdel: Unknown Job Id' in str(e):
                     return
@@ -68,7 +71,7 @@ class SSHPBSJob(SSHClient):
 
     def is_executing(self):
         try:
-            self.execute("qstat %s" % self.jobid)
+            self.ssh.execute("qstat %s" % self.jobid)
             return True
         except Warning as e:
             if 'qstat: Unknown Job Id' in str(e):
diff --git a/wetb/utils/cluster_tools/ssh_client.py b/wetb/utils/cluster_tools/ssh_client.py
index cdda5b515e55fb9a73279d7dc122f93a091ed0c5..97d6ed8d8e5e968c5205c9eb0d3d849cfe412046 100644
--- a/wetb/utils/cluster_tools/ssh_client.py
+++ b/wetb/utils/cluster_tools/ssh_client.py
@@ -8,6 +8,10 @@ from io import StringIO
 import paramiko
 import os
 import sys
+import threading
+from collections import deque
+import time
+import traceback
 
 class SSHClient(object):
     "A wrapper of paramiko.SSHClient"
@@ -31,6 +35,7 @@ class SSHClient(object):
         self.disconnect += 1
         if self.client is None:
             self.connect()
+        return self.client
 
     def connect(self):
         if self.password is None:
@@ -50,14 +55,21 @@ class SSHClient(object):
 
         self.close()
 
 
-    def download(self, remotefilepath, localfile, verbose=False):
+    def download(self, remotefilepath, localfile, verbose=False, retry=1):
         if verbose:
             print ("Download %s > %s" % (remotefilepath, str(localfile)))
         with self:
-            if isinstance(localfile, (str, bytes, int)):
-                ret = self.sftp.get(remotefilepath, localfile)
-            elif hasattr(localfile, 'write'):
-                ret = self.sftp.putfo(remotefilepath, localfile)
+            for i in range(retry):
+                try:
+                    if isinstance(localfile, (str, bytes, int)):
+                        ret = self.sftp.get(remotefilepath, localfile)
+                    elif hasattr(localfile, 'write'):
+                        ret = self.sftp.getfo(remotefilepath, localfile)
+                    break
+                except Exception:
+                    print ("retry", i)
+                    if i == retry - 1:
+                        raise
         if verbose:
             print (ret)
@@ -96,8 +107,12 @@ class SSHClient(object):
 
         if verbose:
             print (">>> " + command)
-        with self:
-            stdin, stdout, stderr = self.client.exec_command(command)
+        with self as ssh:
+            if ssh is None:
+                exc_info = sys.exc_info()
+                traceback.print_exception(*exc_info)
+                raise Exception("ssh_client execute: ssh is None")
+            stdin, stdout, stderr = ssh.exec_command(command)
             if feed_password:
                 stdin.write(self.password + "\n")
                 stdin.flush()
@@ -143,6 +158,38 @@ class SSHClient(object):
         return files
 
 
+class SharedSSHClient(SSHClient):
+    def __init__(self, host, username, password=None, port=22, key=None, passphrase=None):
+        SSHClient.__init__(self, host, username, password=password, port=port, key=key, passphrase=passphrase)
+        self.shared_ssh_lock = threading.RLock()
+        self.shared_ssh_queue = deque()
+        self.next = None
+
+
+    def execute(self, command, sudo=False, verbose=False):
+        res = SSHClient.execute(self, command, sudo=sudo, verbose=verbose)
+        return res
+
+    def __enter__(self):
+        with self.shared_ssh_lock:
+            if self.next == threading.current_thread():
+                return self.client
+            self.shared_ssh_queue.append(threading.current_thread())
+            if self.next is None:
+                self.next = self.shared_ssh_queue.popleft()
+
+        while self.next != threading.current_thread():
+            time.sleep(1)
+        return self.client
+
+    def __exit__(self, *args):
+        # hand the connection over to the next thread waiting in the queue
+        with self.shared_ssh_lock:
+            if len(self.shared_ssh_queue) > 0:
+                self.next = self.shared_ssh_queue.popleft()
+            else:
+                self.next = None
+
 if __name__ == "__main__":
     from mmpe.ui.qt_ui import QtInputUI
     q = QtInputUI(None)
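
The `SharedSSHClient` introduced above is the core of this change: worker threads now funnel their commands through a single shared paramiko connection instead of opening one SSH connection per simulation. `__enter__` appends the calling thread to a queue and blocks until the connection is handed to it, and `__exit__` passes the connection on to the next queued thread. A minimal usage sketch follows, assuming a reachable host; the hostname, credentials, and command are placeholders, not part of the patch:

```python
import threading

from wetb.utils.cluster_tools.ssh_client import SharedSSHClient

# A single connection shared by all threads; host and credentials are
# placeholders for a real cluster login node.
ssh = SharedSSHClient("cluster.example.org", "username", "password", port=22)

def worker(i):
    # execute() enters the shared context manager, so concurrent calls from
    # several threads are queued and run one at a time on the one connection.
    _, out, _ = ssh.execute("echo worker %d" % i)
    print(out.strip())

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```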