Commit b321b28c authored by David Verelst

prepost.simchunks: add class to handle more general merger of df's

parent a4e47070
@@ -394,8 +394,9 @@ def create_chunks_htc_pbs(cases, sort_by_values=['[Windspeed]'], ppn=20,
     df_ind.to_csv(fname+'.csv')
 
 
-def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz',
-                        tqdm=False, header='infer', sep=','):
+def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', tqdm=False,
+                        header='infer', names=None, sep=',', min_itemsize={},
+                        verbose=False, dtypes={}):
     """Merge all csv files from various tar archives into a big pd.DataFrame
     store.
...@@ -421,9 +422,17 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', ...@@ -421,9 +422,17 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz',
If True, an interactive progress bar will be displayed (requires the If True, an interactive progress bar will be displayed (requires the
tqdm module). If set to False no progress bar will be displayed. tqdm module). If set to False no progress bar will be displayed.
header : str, default='infer' header : int, default='infer'
Argument passed on to pandas.read_csv. Set to None if the compressed Argument passed on to pandas.read_csv. Default to 'infer', set to
chunks do not contain any headers. None if there is no header, set to 0 if header is on first row.
names : list of column names, default=None
Argument passed on to pandas.read_csv. Default to None. List with
column names to be used in the DataFrame.
min_itemsize : dict, default={}
Argument passed on to pandas.HDFStore.append. Set the minimum lenght
for a given column in the DataFrame.
sep : str, default=',' sep : str, default=','
Argument passed on to pandas.read_csv. Set to ';' when handling the Argument passed on to pandas.read_csv. Set to ';' when handling the
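
The min_itemsize mapping exists because HDFStore fixes the width of a string column from the first chunk that is appended; longer strings written later then fail unless a larger width is reserved up front. A minimal sketch of that pandas behaviour (file and column names here are illustrative, not taken from the commit):

    import pandas as pd

    store = pd.HDFStore('demo.h5', mode='w')
    df = pd.DataFrame({'case_id': ['short_name'], 'windspeed': [8.0]})
    # reserve 64 characters for the string column so that longer values
    # appended later still fit into the fixed-width HDF5 table
    store.append('table', df, min_itemsize={'case_id': 64})
    store.append('table', pd.DataFrame({'case_id': ['a_much_longer_case_name'],
                                        'windspeed': [10.0]}))
    store.close()
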
@@ -441,13 +450,82 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz',
             return itereable
 
     for tar_fname in tqdm(glob.glob(os.path.join(path, pattern))):
+        if verbose:
+            print(tar_fname)
         with tarfile.open(tar_fname, mode=tarmode) as tar:
             df = pd.DataFrame()
             for tarinfo in tar.getmembers():
                 fileobj = tar.extractfile(tarinfo)
-                df = df.append(pd.read_csv(fileobj, header=header, sep=sep))
-            store.append('table', df, min_itemsize={})
+                tmp = pd.read_csv(fileobj, header=header, names=names, sep=sep)
+                for col, dtype in dtypes.items():
+                    tmp[col] = tmp[col].astype(dtype)
+                df = df.append(tmp)
+            try:
+                if verbose:
+                    print('writing...')
+                store.append('table', df, min_itemsize=min_itemsize)
+            except Exception as e:
+                if verbose:
+                    print('store columns:')
+                    print(store.select('table', start=0, stop=0).columns)
+                    print('columns of the DataFrame being added:')
+                    print(df.columns)
+                storecols = store.select('table', start=0, stop=0).columns
+                store.close()
+                print(e)
+                return df, storecols
     store.close()
+    return None, None
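
Before the diff continues with the new MergeDataFrames class, a hedged usage sketch of the extended signature; the wetb import path, file names, and column names below are assumptions, not part of the commit. As the diff shows, the function returns (None, None) on success and (df, storecols) when appending a merged DataFrame to the store fails:

    from wetb.prepost import simchunks

    # merge all csv chunks found in the xz-compressed tar archives under
    # res_chunks/ into a single HDF5 table store
    df, storecols = simchunks.merge_from_tarfiles(
        'merged_stats.h5',              # df_fname: the pandas.HDFStore file
        'res_chunks/',                  # path holding the tar archives
        '*.tar.xz',                     # pattern matched with glob
        tqdm=True,                      # show a progress bar
        header=0,                       # csv chunks have a header on row 0
        min_itemsize={'case_id': 128},  # reserve width for a string column
        dtypes={'case_id': str},        # force dtypes before appending
        verbose=True)
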
+
+
+class MergeDataFrames(object):
+    """Merge DataFrames, either in h5 or csv format, located in files or
+    (compressed) archives.
+    """
+
+    def __init__(self, tqdm=False):
+        if tqdm:
+            from tqdm import tqdm
+        else:
+            def tqdm(itereable):
+                return itereable
+        self.tqdm = tqdm
+
+    def txt(self, fjoined, path, tarmode='r:xz', header=None):
+        """Read as strings, write to another file as strings.
+        """
+        i = None
+        with open(fjoined, 'w') as f:
+            for fname in self.tqdm(glob.glob(path)):
+                with tarfile.open(fname, mode=tarmode) as tar:
+                    for tarinfo in tar.getmembers():
+                        lines = tar.extractfile(tarinfo).readlines()
+                        # only include the header on the first round
+                        if isinstance(header, int):
+                            i = header
+                            header = None
+                        # but cut out the header on all other occurrences
+                        elif isinstance(i, int):
+                            lines = lines[i:]
+                        # when header is None, there never is a header
+                        else:
+                            pass
+                        for line in lines:
+                            f.write(line.decode())
+            f.flush()
+
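
In other words, txt concatenates the raw text of every archive member into one joined file and, when header is given as an int, writes the header block only once. A usage sketch (the glob pattern and file names are illustrative):

    from wetb.prepost.simchunks import MergeDataFrames

    merger = MergeDataFrames(tqdm=True)
    # join every csv inside the matched tar.xz archives into one text file,
    # keeping the single header line only from the very first member
    merger.txt('joined_stats.csv', 'res_chunks/*.tar.xz', tarmode='r:xz',
               header=1)
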
+    def csv2df_chunks(self, store, fcsv, chunksize=20000, min_itemsize={},
+                      colnames=None, dtypes={}):
+        """Convert a large csv file to a pandas.DataFrame in chunks using
+        a pandas.HDFStore.
+        """
+        for df_chunk in self.tqdm(pd.read_csv(fcsv, chunksize=chunksize)):
+            for col, dtype in dtypes.items():
+                df_chunk[col] = df_chunk[col].astype(dtype)
+            store.append('table', df_chunk, min_itemsize=min_itemsize)
+        return store
+
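
csv2df_chunks can then stream such a joined csv into an HDFStore table without holding the whole file in memory. A sketch continuing the previous example (store settings and column names are again assumptions):

    import pandas as pd
    from wetb.prepost.simchunks import MergeDataFrames

    merger = MergeDataFrames(tqdm=True)
    store = pd.HDFStore('joined_stats.h5', mode='w', complib='blosc',
                        complevel=9)
    # append the csv in 20000-row chunks to the 'table' node of the store
    store = merger.csv2df_chunks(store, 'joined_stats.csv', chunksize=20000,
                                 min_itemsize={'case_id': 128},
                                 dtypes={'case_id': str})
    store.close()
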
 
 if __name__ == '__main__':