diff --git a/wetb/prepost/simchunks.py b/wetb/prepost/simchunks.py
index 0fc2f8eff7501e3ccc26cb14de71518a6c2df8f1..3207c50e2b623f62f1ad4f8c4ce4d9dbb9bda462 100644
--- a/wetb/prepost/simchunks.py
+++ b/wetb/prepost/simchunks.py
@@ -24,6 +24,8 @@ import zipfile
 import copy
 import tarfile
 import glob
+import shutil
+import tempfile

 import numpy as np
 import pandas as pd
@@ -548,7 +550,6 @@ def merge_from_tarfiles(df_fname, path, pattern, tarmode='r:xz', tqdm=False,

     return df, storecols
     store.close()
-    return None, None


 # TODO: make this class more general so you can also just give a list of files
@@ -566,10 +567,27 @@ class AppendDataFrames(object):
                 return itereable
         self.tqdm = tqdm

+    def _open(self, fname, tarmode='r:xz'):
+        """Open a text file directly or from a tar archive. Yield lines and
+        the file name, since a tar archive might contain several text files.
+        """
+
+        if fname.find('.tar') > -1:
+            with tarfile.open(fname, mode=tarmode) as tar:
+                for tarinfo in tar.getmembers():
+                    linesb = tar.extractfile(tarinfo).readlines()
+                    # convert from bytes to strings
+                    lines = [line.decode() for line in linesb]
+                    yield lines, tarinfo.name
+        else:
+            with open(fname, 'r') as f:
+                lines = f.readlines()
+            yield lines, os.path.basename(fname)
+

     def df2store(self, store, path, tarmode='r:xz', min_itemsize={},
                  colnames=None, header='infer', columns=None, sep=';',
                  index2col=None, ignore_index=True, fname_col=False):
-        """
+        """This is very slow, use txt2txt instead.
         """
         # TODO: it seems that with threading you could parallelize this kind
@@ -614,40 +632,78 @@ class AppendDataFrames(object):
     # FIXME: when merging log file analysis (files with header), we are still
     # skipping over one case
     def txt2txt(self, fjoined, path, tarmode='r:xz', header=None, sep=';',
-                fname_col=False):
+                fname_col=False, header_fjoined=None, recursive=False):
         """Read as strings, write to another file as strings.
+
+        Parameters
+        ----------
+
+        fjoined : str
+            File name of the joined output file.
+
+        path : str
+            Search pattern passed on to glob.glob to find the files that are
+            to be joined.
+
+        tarmode : str, default='r:xz'
+            Mode with which the tar archives are opened.
+
+        header : int, default=None
+            Indicate if the data files contain a header and on which line it
+            is located. Set to None if the data files do not contain a
+            header, in which case the joined file will not contain a header
+            either. All lines above the header are ignored.
+
+        sep : str, default=';'
+            Column separator.
+
+        fname_col : str, default=False
+            If set, add an extra column named fname_col holding the file name
+            (without extension) the data originates from.
+
+        header_fjoined : str, default=None
+            If the data files do not contain a header, write out
+            header_fjoined as the header of the joined file.
+
+        recursive : boolean, default=False
+            Passed on to glob.glob. If True, the pattern '**' matches any
+            number of directories and subdirectories.
+
         """
-        if header is not None:
+        if isinstance(header, int):
             write_header = True
             icut = header + 1
         else:
             # when header is None, there is no header
             icut = 0
             write_header = False
-        with open(fjoined, 'w') as f:
-            for fname in self.tqdm(glob.glob(path)):
-                with tarfile.open(fname, mode=tarmode) as tar:
-                    for tarinfo in tar.getmembers():
-                        linesb = tar.extractfile(tarinfo).readlines()
-                        # convert from bytes to strings
-                        lines = [line.decode() for line in linesb]
-                        # only include the header at the first round
-                        if write_header:
-                            line = lines[header]
-                            # add extra column with the file name if applicable
-                            if fname_col:
-                                rpl = sep + fname_col + '\n'
-                                line = line.replace('\n', rpl)
-                            f.write(line)
-                            write_header = False
-                        # but cut out the header on all other occurances
-                        for line in lines[icut:]:
-                            if fname_col:
-                                case_id = os.path.basename(tarinfo.name)
-                                case_id = '.'.join(case_id.split('.')[:-1])
-                                line = line.replace('\n', sep + case_id + '\n')
-                            f.write(line)
-                        f.flush()
+        if isinstance(header_fjoined, str):
+            write_header = True
+
+        with tempfile.NamedTemporaryFile(mode='w', delete=False) as ft:
+            ftname = ft.name
+            for fname in self.tqdm(glob.glob(path, recursive=recursive)):
+                for lines, case_id in self._open(fname, tarmode=tarmode):
+                    # only include the header at the first round
+                    if write_header:
+                        if header_fjoined is None:
+                            header_fjoined = lines[header]
+                        # add extra column with the file name if applicable
+                        if fname_col:
+                            rpl = sep + fname_col + '\n'
+                            header_fjoined = header_fjoined.replace('\n', rpl)
+                        ft.write(header_fjoined)
+                        write_header = False
+                    # but cut out the header on all other occurrences
+                    case_id = '.'.join(case_id.split('.')[:-1])
+                    for line in lines[icut:]:
+                        if fname_col:
+                            line = line.replace('\n', sep + case_id + '\n')
+                        ft.write(line)
+                    ft.flush()
+
+        # and move the temporary file to its final destination fjoined
+        shutil.move(ftname, fjoined)

     def csv2df_chunks(self, store, fcsv, chunksize=100000, min_itemsize={},
                       colnames=None, dtypes={}, header='infer', sep=';'):
diff --git a/wetb/prepost/windIO.py b/wetb/prepost/windIO.py
index c0ae4ba4b0a385ece0c452febc02838e70b91b79..2e4c71b7e4b3713eccfefd567f03f8900008bb9b 100755
--- a/wetb/prepost/windIO.py
+++ b/wetb/prepost/windIO.py
@@ -380,11 +380,11 @@ class LogFile(object):
             contents = contents + '\n'
         return contents

-    def csv2df(self, fname):
+    def csv2df(self, fname, header=0):
         """Read a csv log file analysis and convert to a pandas.DataFrame
         """
         colnames, min_itemsize, dtypes = self.headers4df()
-        df = pd.read_csv(fname, header=0, names=colnames, sep=';', )
+        df = pd.read_csv(fname, header=header, names=colnames, sep=';', )
         for col, dtype in dtypes.items():
             df[col] = df[col].astype(dtype)
         # replace nan with empty for str columns
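
A minimal usage sketch of the reworked txt2txt (the paths, glob pattern, and
column names below are hypothetical; tqdm is left at its pass-through
default):

    from wetb.prepost.simchunks import AppendDataFrames

    ad = AppendDataFrames()
    # the data files carry no header themselves (header=None), so one is
    # supplied via header_fjoined; fname_col='case_id' appends a column
    # holding each originating file name without its extension
    ad.txt2txt('logs_joined.csv', 'logfiles/**/*.tar.xz', tarmode='r:xz',
               header=None, header_fjoined='channel;status;msg\n',
               fname_col='case_id', recursive=True)

Note that header_fjoined has to end with a newline: both the extra fname_col
header entry and the per-row case_id are spliced in by replacing the trailing
'\n'. The joined file can then be read back with the updated
LogFile.csv2df(fname, header=0), whose new header argument accommodates files
written with or without a header line.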