''' Created on 11. apr. 2017 @author: mmpe ''' import struct import numpy as np import os def load(filename, dtype=np.float): if isinstance(filename, str): fid = open(filename,'rb') elif hasattr(filename, "name"): fid = filename filename = fid.name try: _ = struct.unpack('i', fid.read(4)) _ = struct.unpack('i', fid.read(4)) title = fid.read(60).strip() _ = struct.unpack('i', fid.read(4)) _ = struct.unpack('i', fid.read(4)) no_sensors = struct.unpack('i', fid.read(4))[0] sensor_numbers = [struct.unpack('i', fid.read(4))[0] for _ in range(no_sensors)] _ = struct.unpack('i', fid.read(4)) _ = struct.unpack('i', fid.read(4)) time_start = struct.unpack('f', fid.read(4))[0] time_step = struct.unpack('f', fid.read(4))[0] scale_factors = np.array([struct.unpack('f', fid.read(4))[0] for _ in range(no_sensors)], dtype=np.double) data = np.fromstring(fid.read(), 'int16').astype(dtype) finally: fid.close() # if title.isalnum(): # self.set_info(title, "", self.filename) # else: # self.set_info(os.path.basename(self.name), "", self.name) try: data = data.reshape(len(data) // no_sensors, no_sensors) except ValueError: raise ValueError("The number of data values (%d) is not divisible by the number of sensors (%d)" % (len(data), no_sensors)) for i in range(data.shape[1]): data[:, i] *= scale_factors[i] no_scans = data.shape[0] # Load sensor file if exists sensor_filename = os.path.join(os.path.dirname(filename), "sensor") sensor_info = {info[0]:info[1:] for info in read_sensor_info(sensor_filename) } # set gain and offset for "Time" gains = [] offsets = [] names, units, descriptions = [], [], [] for sensor_nr in sensor_numbers: name, unit, description, gain, offset = sensor_info.get(sensor_nr, ["Attribute %d"%sensor_nr, '-','',1,0]) if sensor_nr==1 and name=="Time" and unit=="s": data = data[:,1:] continue names.append(name) units.append(unit) descriptions.append(description) gains.append(gain) offsets.append(offset) time = np.arange(time_start, time_start + data.shape[0] * time_step, time_step).reshape((no_scans, 1)) #data = np.concatenate((time, data), axis=1) #gains = np.r_[1,gains] #offsets = np.r_[0,offsets] # self[:]*=self.gains # self[:]+=self.offsets info = {"name": title, "description": filename, "attribute_names": names, "attribute_units": units, "attribute_descriptions": descriptions} return time, data, info def read_sensor_info(sensor_file): if hasattr(sensor_file, 'readlines'): sensor_info_lines = sensor_file.readlines()[2:] else: with open(sensor_file, encoding="utf-8") as fid: sensor_info_lines = fid.readlines()[2:] sensor_info = [] for line in sensor_info_lines: # while " " in line: # line = line.replace(" "," ") line = line.strip().split() nr = int(line[0]) gain = float(line[1]) offset = float(line[2]) unit = line[5] name_desc = " ".join(line[6:]) name = name_desc[:8].split()[0] description = name_desc[8:] # name = line[6] # description = " ".join(line[7:]) sensor_info.append((nr, name, unit, description, gain, offset)) return sensor_info # def save(dataset, filename): # ds = dataset # # Write int data file # f = open(filename, 'wb') # time_att = ds.basis_attribute() # sensors = [s for s in ds.attributes() if s is not time_att] # # if isinstance(ds, FLEX4Dataset): # data = ds[:] # (ds[:]-ds.offsets)/ds.gains # else: # data = ds[:] # if time_att.index != -1: # time_att may be "Index" with index=-1 if "Time" not exists # data = np.delete(data, time_att.index, axis=1) # f.write(struct.pack('ii', 0, 0)) # 2x empty int # title = ("%-60s" % str(ds.name)).encode() # f.write(struct.pack('60s', title)) # title # f.write(struct.pack('ii', 0, 0)) # 2x empty int # ns = len(sensors) # f.write(struct.pack('i', ns)) # f.write(struct.pack('i' * ns, *range(1, ns + 1))) # sensor number # f.write(struct.pack('ii', 0, 0)) # 2x empty int # time = ds.basis_attribute() # f.write(struct.pack('ff', time[0], time[1] - time[0])) # start time and time step # # scale_factors = np.max(np.abs(data), 0) / 32000 # f.write(struct.pack('f' * len(scale_factors), *scale_factors)) # # avoid dividing by zero # not0 = np.where(scale_factors != 0) # data[:, not0] /= scale_factors[not0] # #flatten and round # data = np.round(data.flatten()).astype(np.int16) # f.write(struct.pack('h' * len(data), *data.tolist())) # f.close() # # # write sensor file # f = open(os.path.join(os.path.dirname(filename), 'sensor'), 'w') # f.write("Sensor list for %s\n" % filename) # f.write(" No forst offset korr. c Volt Unit Navn Beskrivelse------------\n") # sensorlineformat = "%3s %.3f %.3f 1.00 0.00 %7s %-8s %s\n" # # if isinstance(ds, FLEX4Dataset): # gains = np.r_[ds.gains[1:], np.ones(ds.shape[1] - len(ds.gains))] # offsets = np.r_[ds.offsets[1:], np.zeros(ds.shape[1] - len(ds.offsets))] # sensorlines = [sensorlineformat % ((nr + 1), gain, offset, att.unit[:7], att.name.replace(" ", "_")[:8], att.description[:512]) for nr, att, gain, offset in zip(range(ns), sensors, gains, offsets)] # else: # sensorlines = [sensorlineformat % ((nr + 1), 1, 0, att.unit[:7], att.name.replace(" ", "_")[:8], att.description[:512]) for nr, att in enumerate(sensors)] # f.writelines(sensorlines) # f.close()