Commit 56fb2c22 authored by David Verelst

prepost.simchunks: fixes and small speed improvements

parent b321b28c
@@ -478,10 +478,11 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', tqdm=False,
         store.close()
         return None, None
 
-class MergeDataFrames(object):
-    """Merge DataFrames, either in h5 or csv format, located in files or
-    (compressed) archives.
+# TODO: make this class more general so you can also just give a list of files
+# to be merged, excluding the tar archives.
+class AppendDataFrames(object):
+    """Merge DataFrames, either in h5 or csv format, located in (compressed)
+    tar archives.
     """
 
     def __init__(self, tqdm=False):
@@ -492,36 +493,95 @@ class MergeDataFrames(object):
             return itereable
         self.tqdm = tqdm
 
-    def txt(self, fjoined, path, tarmode='r:xz', header=None):
-        """Read as strings, write to another file as strings.
+    def df2store(self, store, path, tarmode='r:xz', min_itemsize={},
+                 colnames=None, header='infer', columns=None, sep=';',
+                 index2col=None, ignore_index=True, fname_col=False):
+        """
         """
-        i = None
+        # TODO: it seems that with threading you could parallelize this kind
+        # of work: http://stackoverflow.com/q/23598063/3156685
+        # http://stackoverflow.com/questions/23598063/
+        # multithreaded-web-scraper-to-store-values-to-pandas-dataframe
+        # http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html
+        df = pd.DataFrame()
+        for fname in self.tqdm(glob.glob(path)):
+            with tarfile.open(fname, mode=tarmode) as tar:
+                df = pd.DataFrame()
+                for tarinfo in tar.getmembers():
+                    fileobj = tar.extractfile(tarinfo)
+                    if tarinfo.name[-2:] == 'h5':
+                        tmp = pd.read_hdf(fileobj, 'table', columns=columns)
+                    elif tarinfo.name[-3:] == 'csv':
+                        tmp = pd.read_csv(fileobj, sep=sep, names=colnames,
+                                          header=header, usecols=columns)
+                    else:
+                        continue
+                    if index2col is not None:
+                        # if the index does not have a name we can still set it
+                        tmp[index2col] = tmp.index
+                        tmp[index2col] = tmp[index2col].astype(str)
+                        tmp.reset_index(level=0, drop=True, inplace=True)
+                    # add the file name as a column
+                    if fname_col:
+                        case_id = os.path.basename(tarinfo.name)
+                        tmp[fname_col] = '.'.join(case_id.split('.')[:-1])
+                        tmp[fname_col] = tmp[fname_col].astype(str)
+                    df = df.append(tmp, ignore_index=ignore_index)
+                store.append('table', df, min_itemsize=min_itemsize)
+#                    if len(df) > w_every:
+#                        # and merge into the big ass DataFrame
+#                        store.append('table', df, min_itemsize=min_itemsize)
+#                        df = pd.DataFrame()
+        return store
+
+    def txt2txt(self, fjoined, path, tarmode='r:xz', header=None, sep=';',
+                fname_col=False):
+        """Read as strings, write to another file as strings.
+        """
+        if header is not None:
+            write_header = True
+            icut = header + 1
+        else:
+            # when header is None, there is no header
+            icut = 0
+            write_header = False
         with open(fjoined, 'w') as f:
             for fname in self.tqdm(glob.glob(path)):
                 with tarfile.open(fname, mode=tarmode) as tar:
                     for tarinfo in tar.getmembers():
-                        lines = tar.extractfile(tarinfo).readlines()
+                        linesb = tar.extractfile(tarinfo).readlines()
+                        # convert from bytes to strings
+                        lines = [line.decode() for line in linesb]
                         # only include the header at the first round
-                        if isinstance(header, int):
-                            i = header
-                            header = None
+                        if write_header:
+                            line = lines[header]
+                            # add extra column with the file name if applicable
+                            if fname_col:
+                                rpl = sep + fname_col + '\n'
+                                line = line.replace('\n', rpl)
+                            f.write(line)
+                            write_header = False
                         # but cut out the header on all other occurrences
-                        elif isinstance(i, int):
-                            lines = lines[i:]
-                        # when the header is None, there is never an header
-                        else:
-                            pass
-                        for line in lines:
-                            f.write(line.decode())
+                        for line in lines[icut:]:
+                            if fname_col:
+                                case_id = os.path.basename(tarinfo.name)
+                                case_id = '.'.join(case_id.split('.')[:-1])
+                                line = line.replace('\n', sep + case_id + '\n')
+                            f.write(line)
                 f.flush()
 
-    def csv2df_chunks(self, store, fcsv, chunksize=20000, min_itemsize={},
-                      colnames=None, dtypes={}):
+    def csv2df_chunks(self, store, fcsv, chunksize=100000, min_itemsize={},
+                      colnames=None, dtypes={}, header='infer', sep=';'):
         """Convert a large csv file to a pandas.DataFrame in chunks using
         a pandas.HDFStore.
         """
-        for df_chunk in self.tqdm(pd.read_csv(fcsv, chunksize=chunksize)):
+        df_iter = pd.read_csv(fcsv, chunksize=chunksize, sep=sep,
+                              names=colnames, header=header)
+        for df_chunk in self.tqdm(df_iter):
             for col, dtype in dtypes.items():
                 df_chunk[col] = df_chunk[col].astype(dtype)
             store.append('table', df_chunk, min_itemsize=min_itemsize)
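
For context, a minimal usage sketch of the new df2store method. This is not part of the commit; the import path, the chunk-*.tar.xz glob pattern, the merged.h5 output name, and the case_id column are assumptions for illustration:

import pandas as pd

# import path assumed from the commit message; adjust to your install
from prepost.simchunks import AppendDataFrames

# append every DataFrame found inside the tar archives to one HDF5 table
store = pd.HDFStore('merged.h5', mode='w')  # hypothetical output file
ad = AppendDataFrames(tqdm=False)
ad.df2store(store, 'res/chunk-*.tar.xz', tarmode='r:xz',
            min_itemsize={'case_id': 128},  # reserve room for the string column
            fname_col='case_id')            # add source file name as a column
store.close()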
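
Likewise a sketch for txt2txt, which concatenates the text members of all archives into one flat csv file without parsing them. Here header=0 assumes the first line of each member is a header row; merged.csv and the glob pattern are again hypothetical:

ad = AppendDataFrames(tqdm=False)
# write the header once (from the first file), skip it in all later files,
# and append the source file name as an extra case_id column on every row
ad.txt2txt('merged.csv', 'res/chunk-*.tar.xz', tarmode='r:xz',
           header=0, sep=';', fname_col='case_id')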
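
The merged csv can then be converted to an HDF5 table with csv2df_chunks, which never holds more than one chunk in memory. The dtypes mapping and the 128-character reservation for case_id are assumptions of this sketch:

store = pd.HDFStore('merged.h5', mode='w')
ad.csv2df_chunks(store, 'merged.csv', chunksize=100000, sep=';',
                 min_itemsize={'case_id': 128}, dtypes={'case_id': str})
store.close()

Appending per chunk keeps peak memory bounded by chunksize rows instead of the size of the full csv file.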