From 56fb2c221e5e88883d850ce3f409ddbadb602039 Mon Sep 17 00:00:00 2001
From: dave <dave@dtu.dk>
Date: Wed, 10 Aug 2016 11:13:31 +0200
Subject: [PATCH] prepost.simchunks: fixes and small speed improvements

---
 wetb/prepost/simchunks.py | 102 ++++++++++++++++++++++++++++++--------
 1 file changed, 81 insertions(+), 21 deletions(-)

diff --git a/wetb/prepost/simchunks.py b/wetb/prepost/simchunks.py
index 2624e86..b510cd3 100644
--- a/wetb/prepost/simchunks.py
+++ b/wetb/prepost/simchunks.py
@@ -478,10 +478,11 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', tqdm=False,
         store.close()
     return None, None
 
-
-class MergeDataFrames(object):
-    """Merge DataFrames, either in h5 or csv format, located in files or
-    (compressed) archives.
+# TODO: make this class more general so you can also just give a list of files
+# to be merged, excluding the tar archives.
+class AppendDataFrames(object):
+    """Merge DataFrames, either in h5 or csv format, located in (compressed)
+    tar archives.
     """
 
     def __init__(self, tqdm=False):
@@ -492,36 +493,95 @@ class MergeDataFrames(object):
             return itereable
         self.tqdm = tqdm
 
-    def txt(self, fjoined, path, tarmode='r:xz', header=None):
-        """Read as strings, write to another file as strings.
+    def df2store(self, store, path, tarmode='r:xz', min_itemsize={},
+                 colnames=None, header='infer', columns=None, sep=';',
+                 index2col=None, ignore_index=True, fname_col=False):
+        """Append h5 or csv files from tar archives to a pandas.HDFStore.
         """
-        i = None
+        # TODO: it seems that with threading you could parallelize this kind
+        # of work: http://stackoverflow.com/q/23598063/3156685
+        # http://stackoverflow.com/questions/23598063/
+        # multithreaded-web-scraper-to-store-values-to-pandas-dataframe
+
+        # http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html
+
+        df = pd.DataFrame()
+        for fname in self.tqdm(glob.glob(path)):
+            with tarfile.open(fname, mode=tarmode) as tar:
+                df = pd.DataFrame()
+                for tarinfo in tar.getmembers():
+                    fileobj = tar.extractfile(tarinfo)
+                    if tarinfo.name[-2:] == 'h5':
+                        tmp = pd.read_hdf(fileobj, 'table', columns=columns)
+                    elif tarinfo.name[-3:] == 'csv':
+                        tmp = pd.read_csv(fileobj, sep=sep, names=colnames,
+                                          header=header, usecols=columns)
+                    else:
+                        continue
+                    if index2col is not None:
+                        # if the index does not have a name we can still set it
+                        tmp[index2col] = tmp.index
+                        tmp[index2col] = tmp[index2col].astype(str)
+                        tmp.reset_index(level=0, drop=True, inplace=True)
+                    # add the file name as a column
+                    if fname_col:
+                        case_id = os.path.basename(tarinfo.name)
+                        tmp[fname_col] = '.'.join(case_id.split('.')[:-1])
+                        tmp[fname_col] = tmp[fname_col].astype(str)
+                    df = df.append(tmp, ignore_index=ignore_index)
+
+                store.append('table', df, min_itemsize=min_itemsize)
+#                if len(df) > w_every:
+#                    # and merge into the big ass DataFrame
+#                    store.append('table', df, min_itemsize=min_itemsize)
+#                    df = pd.DataFrame()
+        return store
+
+    def txt2txt(self, fjoined, path, tarmode='r:xz', header=None, sep=';',
+                fname_col=False):
+        """Read as strings, write to another file as strings.
+ """ + if header is not None: + write_header = True + icut = header + 1 + else: + # when header is None, there is no header + icut = 0 + write_header = False with open(fjoined, 'w') as f: for fname in self.tqdm(glob.glob(path)): with tarfile.open(fname, mode=tarmode) as tar: for tarinfo in tar.getmembers(): - lines = tar.extractfile(tarinfo).readlines() + linesb = tar.extractfile(tarinfo).readlines() + # convert from bytes to strings + lines = [line.decode() for line in linesb] # only include the header at the first round - if isinstance(header, int): - i = header - header = None + if write_header: + line = lines[header] + # add extra column with the file name if applicable + if fname_col: + rpl = sep + fname_col + '\n' + line = line.replace('\n', rpl) + f.write(line) + write_header = False # but cut out the header on all other occurances - elif isinstance(i, int): - lines = lines[i:] - # when the header is None, there is never an header - else: - pass - for line in lines: - f.write(line.decode()) + for line in lines[icut:]: + if fname_col: + case_id = os.path.basename(tarinfo.name) + case_id = '.'.join(case_id.split('.')[:-1]) + line = line.replace('\n', sep + case_id + '\n') + f.write(line) f.flush() - def csv2df_chunks(self, store, fcsv, chunksize=20000, min_itemsize={}, - colnames=None, dtypes={}): + def csv2df_chunks(self, store, fcsv, chunksize=100000, min_itemsize={}, + colnames=None, dtypes={}, header='infer', sep=';'): """Convert a large csv file to a pandas.DataFrame in chunks using a pandas.HDFStore. """ - for df_chunk in self.tqdm(pd.read_csv(fcsv, chunksize=chunksize)): + df_iter = pd.read_csv(fcsv, chunksize=chunksize, sep=sep, + names=colnames, header=header) + for df_chunk in self.tqdm(df_iter): for col, dtype in dtypes.items(): df_chunk[col] = df_chunk[col].astype(dtype) store.append('table', df_chunk, min_itemsize=min_itemsize) -- GitLab