# -*- coding: utf-8 -*-
"""
Created on Mon Aug 8 10:22:49 2016

@author: dave
"""
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
from __future__ import absolute_import
from builtins import dict
from io import open
from builtins import zip
from builtins import range
from builtins import str
from builtins import int
from future import standard_library
standard_library.install_aliases()
from builtins import object

# standard python library
import os
import zipfile
import copy
import tarfile
import glob
import shutil
import tempfile

import numpy as np
import pandas as pd
#from tqdm import tqdm

from wetb.prepost.Simulations import Cases


def create_chunks_htc_pbs(cases, sort_by_values=['[Windspeed]'], ppn=20, i0=0,
                          nr_procs_series=9, queue='workq', pyenv='wetb_py3',
                          walltime='24:00:00', chunks_dir='zip-chunks-jess',
                          compress=False, wine_64bit=False):
    """Group a large number of simulation htc and pbs launch scripts into
    different zip files so they can be run with find+xargs on various nodes.
    """

    def chunker(seq, size):
        # for DataFrames you can also use groupby, as taken from:
        # http://stackoverflow.com/a/25703030/3156685
        # for k,g in df.groupby(np.arange(len(df))//10)
        # but this approach is faster, see also:
        # http://stackoverflow.com/a/25701576/3156685
        # http://stackoverflow.com/a/434328/3156685
        return (seq[pos:pos + size] for pos in range(0, len(seq), size))
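
    # Illustrative sketch of chunker (not executed here, hypothetical sizes):
    # for a DataFrame with 25 rows, chunker(df, 10) yields three row slices,
    #
    #     chunks = list(chunker(df, 10))
    #     # -> [df[0:10], df[10:20], df[20:25]]
    #
    # Each such slice becomes one zip chunk and one PBS launch script below.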

    def make_zip_chunks(df, ii, sim_id, run_dir, model_zip):
        """Create zip chunks and also create an index
        """

        # create a new zip file, give index of the first element. This is
        # quasi random due to the sorting we applied earlier
        # ii = df.index[0]
        rpl = (sim_id, ii)
        fname = os.path.join(run_dir, chunks_dir, '%s_chnk_%05i' % rpl)
        zf = zipfile.ZipFile(fname+'.zip', 'w', compression=zipfile.ZIP_STORED)

        # start with appending the base model zip file
        fname_model = os.path.join(run_dir, model_zip)
        with zipfile.ZipFile(fname_model, 'r') as zf_model:
            for n in zf_model.namelist():
                zf.writestr(n, zf_model.open(n).read())

        # create all necessary directories in the zip file
        dirtags = ['[htc_dir]', '[res_dir]', '[log_dir]', '[animation_dir]',
                   '[pbs_in_dir]', '[eigenfreq_dir]', '[turb_dir]', '[wake_dir]',
                   '[meander_dir]', '[hydro_dir]', '[mooring_dir]',
                   '[pbs_in_dir]', '[pbs_out_dir]']
        dirnames = []
        for tag in dirtags:
            for dirname in set(df[tag].unique().tolist()):
                if not dirname or dirname.lower() not in ['false', 'none', 0]:
                    dirnames.append(dirname)
        for dirname in set(dirnames):
            if dirname != 0:
                zf.write('.', os.path.join(dirname, '.'))

        # and the post-processing data
        # FIXME: do not use hard coded paths!
        zf.write('.', 'prepost-data/')

        # HTC files
        df_src = df['[run_dir]'] + df['[htc_dir]'] + df['[case_id]']
        df_dst = df['[htc_dir]'] + df['[case_id]']
        # create an index so given the htc file, we can find the chunk nr
        df_index = pd.DataFrame(index=df['[case_id]'].copy(),
                                columns=['chnk_nr', 'name'])
        df_index['chnk_nr'] = ii
        df_index['name'] = os.path.join(chunks_dir, '%s_chnk_%05i' % rpl)
        # Since df_src and df_dst are already Series, iterating is fast and it
        # is slower to first convert to a list
        for src, dst_rel in zip(df_src, df_dst):
            zf.write(src+'.htc', dst_rel+'.htc')

        # PBS files
        df_src = df['[run_dir]'] + df['[pbs_in_dir]'] + df['[case_id]']
        df_dst = df['[pbs_in_dir]'] + df['[case_id]']
        # Since df_src and df_dst are already Series, iterating is fast and it
        # is slower to first convert to a list
        for src, dst_rel in zip(df_src, df_dst):
            zf.write(src+'.p', dst_rel+'.p')

        # copy input files that carry a given versioned name: these are the
        # files that will have to be renamed to their non-changeable default
        # file name.
        # this is a bit more tricky since unique() will not work on list items
        copyto_files_tmp = df['[copyto_files]'].astype(str)
        copyto_files = []
        # cycle through the unique elements
        for k in set(copyto_files_tmp):
            # k is of form: "['some/file.txt', 'another/file1.txt']"
            if len(k) < 2:
                continue
            items = [kk[1:-1] for kk in k.split('[')[1].split(']')[0].split(', ')]
            copyto_files.extend(items)
        # we might still have non unique elements
        copyto_files = set(copyto_files)
        for copyto_file, dst_rel in zip(copyto_files, df_dst):
            src = os.path.join(run_dir, copyto_file)
            # make dir if it does not exist (ZipFile.write only takes a source
            # and an archive name, so join the '.' into the archive name)
            zf.write('.', os.path.join(os.path.dirname(copyto_file), '.'))
            zf.write(src, copyto_file)

        zf.close()

        return fname, df_index
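
    # Sketch of what one call to make_zip_chunks produces (illustrative names,
    # assuming sim_id='demo' and the default chunks_dir='zip-chunks-jess'):
    #
    #     fname    -> <run_dir>/zip-chunks-jess/demo_chnk_00000   (+ '.zip' on disk)
    #     df_index:
    #         [case_id]      chnk_nr  name
    #         demo_case_a    0        zip-chunks-jess/demo_chnk_00000
    #         demo_case_b    0        zip-chunks-jess/demo_chnk_00000
    #
    # The per-chunk indices are concatenated at the end of create_chunks_htc_pbs
    # and saved as 'case_id-chunk-index.h5' / '.csv' in [post_dir].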

    pbs_tmplate = """
### Standard Output
#PBS -N [job_name]
#PBS -o [std_out]
### Standard Error
#PBS -e [std_err]
#PBS -W umask=[umask]
### Maximum wallclock time format HOURS:MINUTES:SECONDS
#PBS -l walltime=[walltime]
#PBS -l nodes=[nodes]:ppn=[ppn]
### Queue name
#PBS -q [queue]

"""

    def make_pbs_chunks(df, ii, sim_id, run_dir, model_zip,
                        compress=False, wine_64bit=False):
        """Create a PBS script that:
            * copies all required files (zip chunk) to the scratch disk
            * copies all required turbulence files to the scratch disk
            * runs everything with find+xargs
            * copies back what's needed to mimer
        """
        # ii = df.index[0]
        cmd_find = '/home/MET/sysalt/bin/find'
        cmd_xargs = '/home/MET/sysalt/bin/xargs'
        jobid = '%s_chnk_%05i' % (sim_id, ii)
        wineparam = ('win32', '~/.wine32')
        if wine_64bit:
            wineparam = ('win64', '~/.wine')

        pbase = os.path.join('/scratch', '$USER', '$PBS_JOBID', '')
        post_dir_base = post_dir.split(sim_id)[1]
        if post_dir_base[0] == os.path.sep:
            post_dir_base = post_dir_base[1:]

        pbs_in_base = os.path.commonpath(df['[pbs_in_dir]'].unique().tolist())
        pbs_in_base = os.path.join(pbs_in_base, '')
        htc_base = os.path.commonpath(df['[htc_dir]'].unique().tolist())
        htc_base = os.path.join(htc_base, '')
        res_base = os.path.commonpath(df['[res_dir]'].unique().tolist())
        res_base = os.path.join(res_base, '')
        log_base = os.path.commonpath(df['[log_dir]'].unique().tolist())
        log_base = os.path.join(log_base, '')

        # =====================================================================
        # PBS HEADER
        pbs = copy.copy(pbs_tmplate)
        pbs = pbs.replace('[job_name]', jobid)
        pbs = pbs.replace('[std_out]', './pbs_out_chunks/%s.out' % jobid)
        pbs = pbs.replace('[std_err]', './pbs_out_chunks/%s.err' % jobid)
        pbs = pbs.replace('[umask]', '0003')
        pbs = pbs.replace('[walltime]', walltime)
        pbs = pbs.replace('[nodes]', str(nodes))
        pbs = pbs.replace('[ppn]', str(ppn))
        pbs = pbs.replace('[queue]', queue)
        pbs += '\necho "%s"\n' % ('-'*70)

        # =====================================================================
        # activate the python environment
        if pyenv is not None:
            pbs += 'echo "activate python environment %s"\n' % pyenv
            pbs += 'source /home/python/miniconda3/bin/activate %s\n' % pyenv
            # sometimes activating an environment fails due to a FileExistsError
            # is this because it is activated at the same time on another node?
            # check twice if the environment got activated for real
            pbs += 'echo "CHECK 2x IF %s IS ACTIVE, IF NOT TRY AGAIN"\n' % pyenv
            pbs += 'CMD=\"from distutils.sysconfig import get_python_lib;'
            pbs += 'print (get_python_lib().find(\'%s\'))"\n' % pyenv
            pbs += 'ACTIVATED=`python -c "$CMD"`\n'
            pbs += 'if [ $ACTIVATED -eq -1 ]; then source activate %s;fi\n' % pyenv
            pbs += 'ACTIVATED=`python -c "$CMD"`\n'
            pbs += 'if [ $ACTIVATED -eq -1 ]; then source activate %s;fi\n' % pyenv

        # =====================================================================
        # create all necessary directories at CPU_NR dirs, turb db dirs, sim_id
        # browse to scratch directory
        pbs += '\necho "%s"\n' % ('-'*70)
        pbs += 'cd %s\n' % pbase
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n\n'
        pbs += 'echo "create CPU directories on the scratch disk"\n'
        pbs += 'mkdir -p %s\n' % os.path.join(pbase, sim_id, '')
        for k in range(ppn):
            pbs += 'mkdir -p %s\n' % os.path.join(pbase, '%i' % k, '')

        # pretend to be on the scratch sim_id directory to maintain the same
        # database turb structure
        pbs += '\necho "%s"\n' % ('-'*70)
        pbs += 'cd %s\n' % os.path.join(pbase, sim_id, '')
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n'
        pbs += 'echo "create turb_db directories"\n'
        db_dir_tags = ['[turb_db_dir]', '[meand_db_dir]', '[wake_db_dir]']
        turb_dirs = []
        for tag in db_dir_tags:
            for dirname in set(df[tag].unique().tolist()):
                if not dirname or dirname.lower() not in ['false', 'none']:
                    turb_dirs.append(dirname)
        turb_dirs = set(turb_dirs)
        for dirname in turb_dirs:
            pbs += 'mkdir -p %s\n' % os.path.join(dirname, '')

        # =====================================================================
        # get the zip-chunk file from the PBS_O_WORKDIR
        pbs += '\n'
        pbs += 'echo "%s"\n' % ('-'*70)
        pbs += 'cd $PBS_O_WORKDIR\n'
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n'
        pbs += 'echo "get the zip-chunk file from the PBS_O_WORKDIR"\n'
        # copy the relevant zip chunk file to the scratch main directory
        rpl = (os.path.join('./', chunks_dir, jobid), os.path.join(pbase, ''))
        pbs += 'cp %s.zip %s\n' % rpl

        # turb_db_dir might not be set, same for turb_base_name, for those
        # cases we do not need to copy anything from the database to the node
        base_name_tags = ['[turb_base_name]', '[meand_base_name]',
                          '[wake_base_name]']
        for db, base_name in zip(db_dir_tags, base_name_tags):
            turb_db_dirs = df[db] + df[base_name]
            # When set to None, the DataFrame will have text as None
            turb_db_src = turb_db_dirs[turb_db_dirs.str.find('None')==-1]
            pbs += '\n'
            pbs += '# copy to scratch db directory for %s, %s\n' % (db, base_name)
            for k in turb_db_src.unique():
                dst = os.path.dirname(os.path.join(pbase, sim_id, k))
                pbs += 'cp %s* %s\n' % (k, os.path.join(dst, '.'))
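
        # Illustrative scratch-disk layout that the generated script sets up
        # (hypothetical names, assuming ppn=20, sim_id='demo' and a 'turb/'
        # turbulence database directory):
        #
        #     /scratch/$USER/$PBS_JOBID/0/ ... /scratch/$USER/$PBS_JOBID/19/
        #     /scratch/$USER/$PBS_JOBID/demo/turb/   <- filled from [turb_db_dir]
        #     /scratch/$USER/$PBS_JOBID/demo_chnk_00000.zip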

        # =====================================================================
        # browse back to the scratch directory
        pbs += '\necho "%s"\n' % ('-'*70)
        pbs += 'cd %s\n' % pbase
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n'
        pbs += 'echo "unzip chunk, create dirs in cpu and sim_id folders"\n'
        # unzip chunk, this contains all relevant folders already, and also
        # contains files defined in [copyto_files]
        for k in list(range(ppn)) + [sim_id]:
            dst = os.path.join('%s' % k, '.')
            pbs += '/usr/bin/unzip %s -d %s >> /dev/null\n' % (jobid+'.zip', dst)

        # create hard links for all the turbulence files
        turb_dir_base = os.path.join(os.path.commonpath(list(turb_dirs)), '')
        pbs += '\necho "%s"\n' % ('-'*70)
        pbs += 'cd %s\n' % pbase
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n'
        pbs += 'echo "copy all turb files into CPU dirs"\n'
        for k in range(ppn):
            rpl = (os.path.relpath(os.path.join(sim_id, turb_dir_base)), k)
            pbs += 'find %s -iname *.bin -exec cp {} %s/{} \\;\n' % rpl

        # =====================================================================
        # finally we can run find+xargs!!!
        pbs += '\n'
        pbs += 'echo "%s"\n' % ('-'*70)
        pbs += 'cd %s\n' % pbase
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n'
        pbs += 'echo "START RUNNING JOBS IN find+xargs MODE"\n'
        pbs += 'WINEARCH=%s WINEPREFIX=%s winefix\n' % wineparam
        pbs += '# run all the PBS *.p files in find+xargs mode\n'
        pbs += 'echo "following cases will be run from following path:"\n'
        pbs += 'echo "%s"\n' % (os.path.join(sim_id, pbs_in_base))
        pbs += 'export LAUNCH_PBS_MODE=false\n'
        rpl = (cmd_find, os.path.join(sim_id, pbs_in_base))
        pbs += "%s %s -type f -name '*.p' | sort -z\n" % rpl
        pbs += '\n'
        pbs += 'echo "number of files to be launched: "'
        pbs += '`find %s -type f | wc -l`\n' % os.path.join(sim_id, pbs_in_base)
        rpl = (cmd_find, os.path.join(sim_id, pbs_in_base), cmd_xargs, ppn)
        cmd = ("%s %s -type f -name '*.p' -print0 | sort -z | %s -0 -I{} "
               "--process-slot-var=CPU_NR -n 1 -P %i sh {}\n" % rpl)
        pbs += cmd
        pbs += 'echo "END OF JOBS IN find+xargs MODE"\n'
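
        # The generated launch line expands to something like the following
        # (purely illustrative, assuming sim_id='demo', a common pbs_in dir of
        # 'pbs_in/' and ppn=20; the real script uses the full find/xargs paths):
        #
        #     find demo/pbs_in/ -type f -name '*.p' -print0 | sort -z \
        #         | xargs -0 -I{} --process-slot-var=CPU_NR -n 1 -P 20 sh {}
        #
        # GNU xargs exports CPU_NR (0..19) to every launched *.p script, so each
        # case script can select one of the numbered CPU directories created above.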

        # =====================================================================
        # move results back from the node sim_id dir to the origin
        pbs += '\n'
        pbs += '\necho "%s"\n' % ('-'*70)
        pbs += "echo 'total scratch disk usage:'\n"
        pbs += 'du -hs %s\n' % pbase
        pbs += 'cd %s\n' % os.path.join(pbase, sim_id)
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n'
        pbs += 'echo "Results saved at sim_id directory:"\n'
        rpl = (os.path.join(pbs_in_base, '*'), os.path.join(htc_base, '*'))
        pbs += 'find \n'

        if compress:
            # compress all result files into an archive, first *.sel files
            # FIXME: why doesn't this work with -name "*.sel" -o -name "*.dat"??
            pbs += '\necho "move results into compressed archive"\n'
            pbs += 'find %s -name "*.sel" -print0 ' % res_base
            fname = os.path.join(res_base, 'resfiles_chnk_%05i' % ii)
            pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname
            # now add the *.dat files to the archive
            pbs += 'find %s -name "*.dat" -print0 ' % res_base
            fname = os.path.join(res_base, 'resfiles_chnk_%05i' % ii)
            pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname
            pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname)

            # compress all logfiles into an archive
            pbs += '\necho "move logfiles into compressed archive"\n'
            pbs += 'find %s -name "*.log" -print0 ' % log_base
            fname = os.path.join(log_base, 'logfiles_chnk_%05i' % ii)
            pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname
            pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname)

            # compress the statistics post-processing results (saved as csv's)
            # into an archive
            pbs += '\necho "move statsdel into compressed archive"\n'
            pbs += 'find %s -name "*.csv" -print0 ' % res_base
            fname = os.path.join(post_dir_base, 'statsdel_chnk_%05i' % ii)
            pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname
            pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname)

            # compress the log analysis post-processing results (saved as
            # csv's) into an archive
            pbs += '\necho "move log analysis into compressed archive"\n'
            pbs += 'find %s -name "*.csv" -print0 ' % log_base
            fname = os.path.join(post_dir_base, 'loganalysis_chnk_%05i' % ii)
            pbs += '| xargs -0 tar --remove-files -rf %s.tar\n' % fname
            pbs += 'xz -z2 -T %i %s.tar\n' % (ppn, fname)

        pbs += '\n'
        pbs += '\necho "%s"\n' % ('-'*70)
        pbs += 'cd %s\n' % pbase
        pbs += "echo 'current working directory:'\n"
        pbs += 'pwd\n'
        pbs += 'echo "move results back from node scratch/sim_id to origin, '
        pbs += 'but ignore htc, and pbs_in directories."\n'
        tmp = os.path.join(sim_id, '*')
        pbs += 'echo "copy from %s to $PBS_O_WORKDIR/"\n' % tmp
        pbs += 'time rsync -au --remove-source-files %s $PBS_O_WORKDIR/ \\\n' % tmp
        pbs += ' --exclude %s \\\n' % os.path.join(pbs_in_base, '*')
        pbs += ' --exclude *.htc \n'
        # when using -u, htc and pbs_in files should be ignored
        # pbs += 'time cp -ru %s $PBS_O_WORKDIR/\n' % tmp
        if pyenv is not None:
            pbs += 'source deactivate\n'
        pbs += 'echo "DONE !!"\n'
        pbs += '\necho "%s"\n' % ('-'*70)
        pbs += '# in case wine has crashed, kill any remaining wine servers\n'
        pbs += '# caution: ALL the users wineservers will die on this node!\n'
        pbs += 'echo "following wineservers are still running:"\n'
        pbs += 'ps -u $USER -U $USER | grep wineserver\n'
        pbs += 'killall -u $USER wineserver\n'
        pbs += 'exit\n'

        rpl = (sim_id, ii)
        fname = os.path.join(run_dir, chunks_dir, '%s_chnk_%05i' % rpl)
        with open(fname+'.p', 'w') as f:
            f.write(pbs)

    def make_pbs_postpro_chunks():
        """When only the post-processing has to be re-done for a chunk.
        """
        pass
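
    # With compress=True one chunk leaves behind archives such as (illustrative
    # names): resfiles_chnk_00000.tar.xz in [res_dir], logfiles_chnk_00000.tar.xz
    # in [log_dir], and statsdel_chnk_00000.tar.xz / loganalysis_chnk_00000.tar.xz
    # in the post-processing directory. The chunk launch scripts end up in
    # chunks_dir and, in a typical PBS setup, would be submitted from [run_dir],
    # e.g. (hypothetical sim_id 'demo'):
    #
    #     qsub zip-chunks-jess/demo_chnk_00000.p
    #
    # The relative ./pbs_out_chunks/ path in the PBS header assumes submission
    # from [run_dir].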

    cc = Cases(cases)
    df = cc.cases2df()
    # sort on the specified values in the given columns
    df.sort_values(by=sort_by_values, inplace=True)

    # create the directory to store all zipped chunks
    try:
        os.mkdir(os.path.join(df['[run_dir]'].iloc[0], chunks_dir))
    # FIXME: how do you make this work pythonically on both PY2 and PY3?
    except (FileExistsError, OSError):
        pass

    fpath = os.path.join(df['[run_dir]'].iloc[0], 'pbs_out_chunks')
    if not os.path.exists(fpath):
        os.makedirs(fpath)

    df_iter = chunker(df, nr_procs_series*ppn)
    sim_id = df['[sim_id]'].iloc[0]
    run_dir = df['[run_dir]'].iloc[0]
    model_zip = df['[model_zip]'].iloc[0]
    post_dir = df['[post_dir]'].iloc[0]
    nodes = 1
    df_ind = pd.DataFrame(columns=['chnk_nr'], dtype=np.int32)
    df_ind.index.name = '[case_id]'
    for ii, dfi in enumerate(df_iter):
        fname, ind = make_zip_chunks(dfi, i0+ii, sim_id, run_dir, model_zip)
        make_pbs_chunks(dfi, i0+ii, sim_id, run_dir, model_zip,
                        compress=compress, wine_64bit=wine_64bit)
        df_ind = df_ind.append(ind)
        print(fname)

    fname = os.path.join(post_dir, 'case_id-chunk-index')
    df_ind['chnk_nr'] = df_ind['chnk_nr'].astype(np.int32)
    # to_hdf expects complevel/complib (not compression) for compressed output
    df_ind.to_hdf(fname+'.h5', 'table', complevel=9, complib='zlib')
    df_ind.to_csv(fname+'.csv')
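
# For reference, the chunk index written above can later be queried to find in
# which chunk (and hence which archive) a given case ended up, e.g.
# (illustrative path, the actual location is [post_dir]):
#
#     df_ind = pd.read_hdf('prepost-data/case_id-chunk-index.h5', 'table')
#     df_ind.loc['<case_id>', 'chnk_nr']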


def regroup_tarfiles(cc):
    """Re-group all chunks again per [Case folder] compressed file. First all
    chunks are copied to the node scratch disc, then start working on them.
    This only works on a node with PBS stuff.

    Make sure to maintain the same location as defined by the tags!

    [res_dir] and [Case folder] could be multiple directories deep, but the
    final archive will only contain the files (no directory structure), and
    the name of the archive is that of the last directory:
        /[res_dir]/[Case folder]/[Case folder].tar.xz
        /res/dir/case/folder/dlcname/dlcname.tar.xz

    Parameters
    ----------

    path_pattern : str
        /path/to/files/*.tar.xz
    """
    # tqdm is only required here, the module level import is commented out
    from tqdm import tqdm

    USER = os.getenv('USER')
    PBS_JOBID = os.getenv('PBS_JOBID')
    scratch = os.path.join('/scratch', USER, PBS_JOBID)
    src = os.getenv('PBS_O_WORKDIR')

    path_pattern = '/home/dave/SimResults/NREL5MW/D0022/prepost-data/*.xz'
    for ffname in tqdm(glob.glob(path_pattern)):
        appendix = os.path.basename(ffname).split('_')[0]
        with tarfile.open(ffname, mode='r:xz') as tar:
            # create new tar files if necessary for each [Case folder]
            for tarinfo in tar.getmembers():
                t2_name = os.path.basename(os.path.dirname(tarinfo.name))
                t2_dir = os.path.join(os.path.dirname(path_pattern), t2_name)
                if not os.path.isdir(t2_dir):
                    os.makedirs(t2_dir)
                t2_path = os.path.join(t2_dir, t2_name + '_%s.tar' % appendix)
                fileobj = tar.extractfile(tarinfo)
                # change the location of the file in the new archive:
                # the location of the archive is according to the folder
                # structure as defined in the tags, remove any subfolders
                tarinfo.name = os.path.basename(tarinfo.name)
                with tarfile.open(t2_path, mode='a') as t2:
                    t2.addfile(tarinfo, fileobj)


def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', tqdm=False,
                        header='infer', names=None, sep=',', min_itemsize={},
                        verbose=False, dtypes={}):
    """Merge all csv files from various tar archives into a big pd.DataFrame
    store.

    Parameters
    ----------

    df_fname : str
        file name of the pd.DataFrame h5 store in which all chunks will be
        merged. Names usually used are:
            * [sim_id]_ErrorLogs.h5
            * [sim_id]_statistics.h5

    path : str
        Directory in which all chunks are located.

    pattern : str
        Search pattern used to select (using glob.glob) files in path

    tarmode : str, default='r:xz'
        File opening mode for tarfile (used when opening each of the chunks).

    tqdm : boolean, default=False
        If True, an interactive progress bar will be displayed (requires the
        tqdm module). If set to False no progress bar will be displayed.

    header : int, default='infer'
        Argument passed on to pandas.read_csv. Default to 'infer', set to
        None if there is no header, set to 0 if the header is on the first
        row.

    names : list of column names, default=None
        Argument passed on to pandas.read_csv. Default to None. List with
        column names to be used in the DataFrame.

    min_itemsize : dict, default={}
        Argument passed on to pandas.HDFStore.append. Set the minimum length
        for a given column in the DataFrame.

    sep : str, default=','
        Argument passed on to pandas.read_csv. Set to ';' when handling the
        ErrorLogs.
    """

    store = pd.HDFStore(os.path.join(path, df_fname), mode='w', format='table',
                        complevel=9, complib='zlib')

    if tqdm:
        from tqdm import tqdm
    else:
        def tqdm(iterable):
            return iterable

    for tar_fname in tqdm(glob.glob(os.path.join(path, pattern))):
        if verbose:
            print(tar_fname)
        with tarfile.open(tar_fname, mode=tarmode) as tar:
            df = pd.DataFrame()
            for tarinfo in tar.getmembers():
                fileobj = tar.extractfile(tarinfo)
                tmp = pd.read_csv(fileobj, header=header, names=names, sep=sep)
                for col, dtype in dtypes.items():
                    tmp[col] = tmp[col].astype(dtype)
                df = df.append(tmp)
            try:
                if verbose:
                    print('writing...')
                store.append('table', df, min_itemsize=min_itemsize)
            except Exception as e:
                if verbose:
                    print('store columns:')
                    print(store.select('table', start=0, stop=0).columns)
                    print('columns of the DataFrame being added:')
                    print(df.columns)
                storecols = store.select('table', start=0, stop=0).columns
                store.close()
                print(e)
                return df, storecols

    store.close()
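
# Hedged usage sketch (file names, patterns and separators are illustrative and
# depend on how the chunks were post-processed):
#
#     merge_from_tarfiles('demo_statistics.h5', 'prepost-data/',
#                         'statsdel_chnk_*.tar.xz', sep=',', tqdm=True)
#     merge_from_tarfiles('demo_ErrorLogs.h5', 'prepost-data/',
#                         'loganalysis_chnk_*.tar.xz', sep=';', tqdm=True)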
""" # TODO: it seems that with threading you could parallelize this kind # of work: http://stackoverflow.com/q/23598063/3156685 # http://stackoverflow.com/questions/23598063/ # multithreaded-web-scraper-to-store-values-to-pandas-dataframe # http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html df = pd.DataFrame() for fname in self.tqdm(glob.glob(path)): with tarfile.open(fname, mode=tarmode) as tar: df = pd.DataFrame() for tarinfo in tar.getmembers(): fileobj = tar.extractfile(tarinfo) if tarinfo.name[-2:] == 'h5': tmp = pd.read_hdf(fileobj, 'table', columns=columns) elif tarinfo.name[-3:] == 'csv': tmp = pd.read_csv(fileobj, sep=sep, names=colnames, header=header, usecols=columns) else: continue if index2col is not None: # if the index does not have a name we can still set it tmp[index2col] = tmp.index tmp[index2col] = tmp[index2col].astype(str) tmp.reset_index(level=0, drop=True, inplace=True) # add the file name as a column if fname_col: case_id = os.path.basename(tarinfo.name) tmp[fname_col] = '.'.join(case_id.split('.')[:-1]) tmp[fname_col] = tmp[fname_col].astype(str) df = df.append(tmp, ignore_index=ignore_index) store.append('table', df, min_itemsize=min_itemsize) # if len(df) > w_every: # # and merge into the big ass DataFrame # store.append('table', df, min_itemsize=min_itemsize) # df = pd.DataFrame() return store # FIXME: when merging log file analysis (files with header), we are still # skipping over one case def txt2txt(self, fjoined, path, tarmode='r:xz', header=None, sep=';', fname_col=False, header_fjoined=None, recursive=False): """Read as strings, write to another file as strings. Parameters ---------- fjoined path tarmode header : int, default=None Indicate if data files contain a header and on which line it is located. Set to None if data files do not contain header, and in that case the joined file will not contain a header either. All lines above the header are ignored. sep fname_col header_fjoined : str, default=None If the data files do not contain a header write out header_fjoined as the header of the joined file. recursive """ if isinstance(header, int): write_header = True icut = header + 1 else: # when header is None, there is no header icut = 0 write_header = False if isinstance(header_fjoined, str): write_header = True with tempfile.NamedTemporaryFile(mode='w', delete=False) as ft: ftname = ft.name for fname in self.tqdm(glob.glob(path, recursive=recursive)): for lines, case_id in self._open(fname, tarmode=tarmode): # only include the header at the first round if write_header: if header_fjoined is None: header_fjoined = lines[header] # add extra column with the file name if applicable if fname_col: rpl = sep + fname_col + '\n' header_fjoined = header_fjoined.replace('\n', rpl) ft.write(header_fjoined) write_header = False # but cut out the header on all other occurances case_id = '.'.join(case_id.split('.')[:-1]) for line in lines[icut:]: if fname_col: line = line.replace('\n', sep + case_id + '\n') ft.write(line) ft.flush() # and move from temp dir to fjoined shutil.move(ftname, fjoined) def csv2df_chunks(self, store, fcsv, chunksize=100000, min_itemsize={}, colnames=None, dtypes={}, header='infer', sep=';'): """Convert a large csv file to a pandas.DataFrame in chunks using a pandas.HDFStore. 
""" df_iter = pd.read_csv(fcsv, chunksize=chunksize, sep=sep, names=colnames, header=header) for df_chunk in self.tqdm(df_iter): for col, dtype in dtypes.items(): df_chunk[col] = df_chunk[col].astype(dtype) store.append('table', df_chunk, min_itemsize=min_itemsize) return store if __name__ == '__main__': pass