From b321b28cd8a5334eb1788171e3b0bb10a0660690 Mon Sep 17 00:00:00 2001 From: dave <dave@dtu.dk> Date: Tue, 9 Aug 2016 18:54:59 +0200 Subject: [PATCH] prepost.simchunks: add class to handle more general merger of df's --- wetb/prepost/simchunks.py | 92 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 85 insertions(+), 7 deletions(-) diff --git a/wetb/prepost/simchunks.py b/wetb/prepost/simchunks.py index 0535cc2..2624e86 100644 --- a/wetb/prepost/simchunks.py +++ b/wetb/prepost/simchunks.py @@ -394,8 +394,9 @@ def create_chunks_htc_pbs(cases, sort_by_values=['[Windspeed]'], ppn=20, df_ind.to_csv(fname+'.csv') -def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', - tqdm=False, header='infer', sep=','): +def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', tqdm=False, + header='infer', names=None, sep=',', min_itemsize={}, + verbose=False, dtypes={}): """Merge all csv files from various tar archives into a big pd.DataFrame store. @@ -421,9 +422,17 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', If True, an interactive progress bar will be displayed (requires the tqdm module). If set to False no progress bar will be displayed. - header : str, default='infer' - Argument passed on to pandas.read_csv. Set to None if the compressed - chunks do not contain any headers. + header : int, default='infer' + Argument passed on to pandas.read_csv. Defaults to 'infer', set to + None if there is no header, set to 0 if header is on first row. + + names : list of column names, default=None + Argument passed on to pandas.read_csv. Defaults to None. List with + column names to be used in the DataFrame. + + min_itemsize : dict, default={} + Argument passed on to pandas.HDFStore.append. Set the minimum length + for a given column in the DataFrame. sep : str, default=',' Argument passed on to pandas.read_csv. 
Set to ';' when handling the @@ -441,13 +450,82 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', return itereable for tar_fname in tqdm(glob.glob(os.path.join(path, pattern))): + if verbose: + print(tar_fname) with tarfile.open(tar_fname, mode=tarmode) as tar: df = pd.DataFrame() for tarinfo in tar.getmembers(): fileobj = tar.extractfile(tarinfo) - df = df.append(pd.read_csv(fileobj, header=header, sep=sep)) - store.append('table', df, min_itemsize={}) + tmp = pd.read_csv(fileobj, header=header, names=names, sep=sep) + for col, dtype in dtypes.items(): + tmp[col] = tmp[col].astype(dtype) + df = df.append(tmp) + try: + if verbose: + print('writing...') + store.append('table', df, min_itemsize=min_itemsize) + except Exception as e: + if verbose: + print('store columns:') + print(store.select('table', start=0, stop=0).columns) + print('columns of the DataFrame being added:') + print(df.columns) + storecols = store.select('table', start=0, stop=0).columns + store.close() + print(e) + return df, storecols + store.close() + return None, None + + +class MergeDataFrames(object): + """Merge DataFrames, either in h5 or csv format, located in files or + (compressed) archives. + """ + + def __init__(self, tqdm=False): + if tqdm: + from tqdm import tqdm + else: + def tqdm(itereable): + return itereable + self.tqdm = tqdm + + def txt(self, fjoined, path, tarmode='r:xz', header=None): + """Read as strings, write to another file as strings. 
+ """ + + i = None + with open(fjoined, 'w') as f: + for fname in self.tqdm(glob.glob(path)): + with tarfile.open(fname, mode=tarmode) as tar: + for tarinfo in tar.getmembers(): + lines = tar.extractfile(tarinfo).readlines() + # only include the header at the first round + if isinstance(header, int): + i = header + header = None + # but cut out the header on all other occurances + elif isinstance(i, int): + lines = lines[i:] + # when the header is None, there is never an header + else: + pass + for line in lines: + f.write(line.decode()) + f.flush() + + def csv2df_chunks(self, store, fcsv, chunksize=20000, min_itemsize={}, + colnames=None, dtypes={}): + """Convert a large csv file to a pandas.DataFrame in chunks using + a pandas.HDFStore. + """ + for df_chunk in self.tqdm(pd.read_csv(fcsv, chunksize=chunksize)): + for col, dtype in dtypes.items(): + df_chunk[col] = df_chunk[col].astype(dtype) + store.append('table', df_chunk, min_itemsize=min_itemsize) + return store if __name__ == '__main__': -- GitLab