-
Mads M. Pedersen authoredMads M. Pedersen authored
htc_contents.py 17.24 KiB
'''
Created on 20/01/2014
@author: MMPE
See documentation of HTCFile below
'''
import os
from collections import OrderedDict
import collections
class OrderedDict(collections.OrderedDict):
pass
def __str__(self):
return "\n".join(["%-30s\t %s" % ((str(k) + ":"), str(v)) for k, v in self.items()])
def parse_next_line(lines):
_3to2list = list(lines.pop(0).split(";"))
line, comments, = _3to2list[:1] + [_3to2list[1:]]
comments = ";".join(comments).rstrip()
while lines and lines[0].lstrip().startswith(";"):
comments += "\n%s" % lines.pop(0).rstrip()
return line.strip(), comments
def fmt_value(v):
try:
if int(float(v)) == float(v):
return int(float(v))
return float(v)
except ValueError:
return v.replace("\\", "/")
class HTCContents(object):
lines = []
contents = None
name_ = ""
parent = None
def __getitem__(self, key):
if isinstance(key, str):
key = key.replace(".", "/")
if "/" in key:
keys = key.split('/')
val = self.contents[keys[0]]
for k in keys[1:]:
val = val[k]
return val
return self.contents[key]
else:
return self.values[key]
def __getattr__(self, *args, **kwargs):
if args[0] in ['__members__', '__methods__']:
# fix python2 related issue. In py2, dir(self) calls
# __getattr__(('__members__',)), and this call must fail unhandled to work
return object.__getattribute__(self, *args, **kwargs)
try:
return object.__getattribute__(self, *args, **kwargs)
except Exception:
k = args[0]
if k.endswith("__1"):
k = k[:-3]
return self.contents[k]
def __setattr__(self, *args, **kwargs):
try:
object.__getattribute__(self, args[0])
return object.__setattr__(self, *args, **kwargs)
except AttributeError:
pass
_3to2list1 = list(args)
k, v, = _3to2list1[:1] + _3to2list1[1:]
if isinstance(v, str):
v = [fmt_value(v) for v in v.split()]
if not isinstance(v, HTCContents):
if not isinstance(v, (list, tuple)):
v = [v]
if k in self.contents:
self.contents[k].values = list(v)
return
v = HTCLine(k, v, "")
self.contents[k] = v
v.parent = self
def __delattr__(self, *args, **kwargs):
k, = args
if k in self:
del self.contents[k]
def __iter__(self):
# mainbodies must preceed constraints
values = ([v for v in self.contents.values() if v.name_ != 'constraints'] +
[v for v in self.contents.values() if v.name_ == 'constraints'])
return iter(values)
def __contains__(self, key):
if self.contents is None:
return False
return key in self.contents
def get(self, section, default=None):
try:
return self[section]
except KeyError:
return default
def __call__(self, **kwargs):
"""Allow accesing one of multiple subsections with same name, e.g. the main body where name=='shaft'
> htc.new_htc_structure.main_body(name='shaft')
or one of multiple lines with same name, e.g. the section in c2_def where value[0]==3
> htc.new_htc_structure.main_body.c2_def.sec(v0=3)
"""
lst = [s for s in self.parent if s.name_ == self.name_ and (
all([k in s and s[k][0] == v for k, v in kwargs.items()]) or
(all([k[0] == 'v' for k in kwargs]) and all([s[int(k[1:])] == v for k, v in kwargs.items()]))
)]
assert len(lst) == 1
return lst[0]
def keys(self):
return list(self.contents.keys())
def _add_contents(self, contents):
if contents.name_ not in self:
self[contents.name_] = contents
else:
ending = "__2"
while contents.name_ + ending in self:
ending = "__%d" % (1 + float("0%s" % ending.replace("__", "")))
self[contents.name_ + ending] = contents
contents.parent = self
def add_section(self, section_name, members={}, section=None, allow_duplicate=False, **kwargs):
if isinstance(section_name, HTCSection):
section = section_name
section_name = section.name_
if section_name in self and allow_duplicate is False:
return self[section_name]
if section_name == "output":
section = HTCOutputSection(section_name)
elif section_name.startswith("output_at_time"):
section = HTCOutputAtTimeSection(section_name)
elif section is None:
section = HTCSection(section_name)
self._add_contents(section)
kwargs.update(members)
for k, v in kwargs.items():
section[k] = v
return section
def delete(self):
keys = [k for (k, v) in self.parent.contents.items() if v == self]
for k in keys:
del self.parent.contents[k]
def location(self):
if self.parent is None:
return os.path.basename(self.filename)
else:
name = [k for k in self.parent.keys() if self.parent[k] == self][0]
return self.parent.location() + "/" + name
def compare(self, other, compare_order=False):
my_keys = self.keys()
other_keys = other.keys()
s = ""
while my_keys or other_keys:
if my_keys:
if (my_keys[0] in other_keys):
if compare_order:
other_i = 0
else:
other_i = other_keys.index(my_keys[0])
while other_keys[other_i] != my_keys[0]:
s += "\n".join(["+ %s" % l for l in str(other[other_keys.pop(other_i)]
).strip().split("\n")]) + "\n\n"
s += self[my_keys.pop(0)].compare(other[other_keys.pop(other_i)])
else:
s += "\n".join(["- %s" % l for l in str(self[my_keys.pop(0)]).strip().split("\n")]) + "\n\n"
else:
s += "\n".join(["+ %s" % l for l in str(other[other_keys.pop(0)]).strip().split("\n")]) + "\n\n"
return s
class HTCSection(HTCContents):
end_comments = ""
begin_comments = ""
def __init__(self, name, begin_comments="", end_comments=""):
self.name_ = name
self.begin_comments = begin_comments.strip(" \t")
self.end_comments = end_comments.strip(" \t")
self.contents = OrderedDict()
self.parent = None
@property
def section_name(self):
return self.name_
@section_name.setter
def section_name(self, value):
self.name_ = value
def add_line(self, name, values, comments=""):
line = HTCLine(name, values, comments)
self._add_contents(line)
return line
def __setitem__(self, key, value):
if isinstance(value, HTCContents):
self.contents[key] = value
value.parent = self
elif isinstance(value, (str, int, float)):
self.add_line(key, [value])
else:
self.add_line(key, value)
@staticmethod
def from_lines(lines):
line, begin_comments = parse_next_line(lines)
name = line[6:].lower().strip()
if name == "output":
section = HTCOutputSection(name, begin_comments)
elif name.startswith("output_at_time"):
section = HTCOutputAtTimeSection(name, begin_comments)
else:
section = HTCSection(name, begin_comments)
while lines:
if lines[0].strip() == "":
lines.pop(0)
if len(lines) == 0:
print()
if lines[0].lower().startswith("begin"):
section._add_contents(HTCSection.from_lines(lines))
elif lines[0].lower().startswith("end"):
line, section.end_comments = parse_next_line(lines)
break
elif lines:
section._add_contents(section.line_from_line(lines))
else:
raise Exception("Section '%s' has not end" % section.name_)
return section
def line_from_line(self, lines):
return HTCLine.from_lines(lines)
def __str__(self, level=0):
s = "%sbegin %s;%s\n" % (" " * level, self.name_, (("", "\t" + self.begin_comments)
[bool(self.begin_comments.strip())]).replace("\t\n", "\n"))
s += "".join([c.__str__(level + 1) for c in self])
s += "%send %s;%s\n" % (" " * level, self.name_, (("", "\t" + self.end_comments)
[self.end_comments.strip() != ""]).replace("\t\n", "\n"))
return s
def get_subsection_by_name(self, name, field='name'):
return self.get_section(name, field)
def get_section(self, name, field='name'):
lst = [s for s in self if field in s and s[field][0] == name]
if len(lst) == 1:
return lst[0]
elif len(lst) == 0:
raise ValueError("subsection with %s='%s' not found" % (field, name))
else:
raise ValueError("Multiple subsection with %s='%s' not found" % (field, name))
def get_element(self, key, value):
"""Return subsection where subsection.<key>==value or line where line.values[key]==value"""
if isinstance(key, int):
lst = [s for s in self if s.values[key] == value]
elif isinstance(key, str):
lst = [s for s in self if key in s and s[key][0] == name]
else:
raise ValueError("Key argument must be int or str")
if len(lst) == 1:
return lst[0]
elif len(lst) == 0:
raise ValueError("contents with '%s=%s' not found" % (key, value))
else:
raise ValueError("Multiple contents with '%s=%s' not found" % (key, value))
def copy(self):
copy = self.__class__(name=self.name_, begin_comments=self.begin_comments, end_comments=self.end_comments)
for k, v in self.contents.items():
copy.contents[k] = v.copy()
return copy
class HTCLine(HTCContents):
values = None
comments = ""
def __init__(self, name, values, comments):
if "__" in name:
name = name[:name.index("__")]
self.name_ = name
self.values = list(values)
self.comments = comments.strip(" \t")
self.parent = None
def __repr__(self):
return str(self)
def __str__(self, level=0):
if self.name_ == "":
return ""
return "%s%s%s;%s\n" % (" " * (level), self.name_,
("", "\t" + self.str_values())[bool(self.values)],
("", "\t" + self.comments)[bool(self.comments.strip())])
def str_values(self):
return " ".join([str(v) for v in self.values])
def __getitem__(self, key):
try:
return self.values[key]
except Exception:
raise IndexError("Parameter %s does not exists for %s" % (key + 1, self.location()))
def __setitem__(self, key, value):
if isinstance(key, int):
self.values[key] = value
else:
raise NotImplementedError
@staticmethod
def from_lines(lines):
line, end_comments = parse_next_line(lines)
if len(line.split()) > 0:
_3to2list3 = list(line.split())
name, values, = _3to2list3[:1] + [_3to2list3[1:]]
else:
name = line
values = []
values = [fmt_value(v) for v in values]
return HTCLine(name, values, end_comments)
def compare(self, other):
s = ""
if self.values != other.values:
s += "\n".join(["+ %s" % l for l in str(self).strip().split("\n")]) + "\n"
s += "\n".join(["- %s" % l for l in str(other).strip().split("\n")]) + "\n"
s += "\n"
return s
def copy(self):
return HTCLine(name=self.name_, values=self.values, comments=self.comments)
class HTCOutputSection(HTCSection):
sensors = None
def __init__(self, name, begin_comments="", end_comments=""):
HTCSection.__init__(self, name, begin_comments=begin_comments, end_comments=end_comments)
self.sensors = []
def add_sensor(self, type, sensor, values=[], comment="", nr=None):
self._add_sensor(HTCSensor(type, sensor, values, comment), nr)
def _add_sensor(self, htcSensor, nr=None):
if nr is None:
nr = len(self.sensors)
self.sensors.insert(nr, htcSensor)
htcSensor.parent = self
def line_from_line(self, lines):
name = lines[0].split()[0].strip()
if name in ['filename', 'data_format', 'buffer', 'time']:
return HTCLine.from_lines(lines)
else:
return HTCSensor.from_lines(lines)
def _add_contents(self, contents):
if isinstance(contents, HTCSensor):
self._add_sensor(contents)
else:
return HTCSection._add_contents(self, contents)
def __str__(self, level=0):
s = "%sbegin %s;%s\n" % (" " * level, self.name_, ("", "\t" + self.begin_comments)
[len(self.begin_comments.strip()) > 0])
s += "".join([c.__str__(level + 1) for c in self])
s += "".join([s.__str__(level + 1) for s in self.sensors])
s += "%send %s;%s\n" % (" " * level, self.name_, ("", "\t" + self.end_comments)
[self.end_comments.strip() != ""])
return s
def compare(self, other):
s = HTCContents.compare(self, other)
for s1, s2 in zip(self.sensors, other.sensors):
s += s1.compare(s2)
for s1 in self.sensors[len(other.sensors):]:
s += "\n".join(["- %s" % l for l in str(s1).strip().split("\n")]) + "\n"
for s2 in self.sensors[len(self.sensors):]:
s += "\n".join(["- %s" % l for l in str(s2).strip().split("\n")]) + "\n"
return s
def copy(self):
copy = HTCSection.copy(self)
copy.sensors = list.copy(self.sensors)
return copy
class HTCOutputAtTimeSection(HTCOutputSection):
type = None
time = None
def __init__(self, name, begin_comments="", end_comments=""):
if len(name.split()) < 3:
raise ValueError('"keyword" and "time" arguments required for output_at_time command:\n%s' % name)
name, self.type, time = name.split()
self.time = float(time)
HTCOutputSection.__init__(self, name, begin_comments=begin_comments, end_comments=end_comments)
def __str__(self, level=0):
s = "%sbegin %s %s %s;%s\n" % (" " * level, self.name_, self.type, self.time,
("", "\t" + self.begin_comments)[len(self.begin_comments.strip())])
s += "".join([c.__str__(level + 1) for c in self])
s += "".join([s.__str__(level + 1) for s in self.sensors])
s += "%send %s;%s\n" % (" " * level, self.name_, ("", "\t" + self.end_comments)
[self.end_comments.strip() != ""])
return s
def copy(self):
copy = HTCOutputAtTimeSection(name=f"{self.name_} {self.type} {self.time}",
begin_comments=self.begin_comments, end_comments=self.end_comments)
for k, v in self.contents.items():
copy.contents[k] = v.copy()
copy.sensors = list.copy(self.sensors)
return copy
class HTCSensor(HTCLine):
type = ""
sensor = ""
values = []
def __init__(self, type, sensor, values, comments):
self.type = type
self.sensor = sensor
self.values = list(values)
self.comments = comments.strip(" \t")
@staticmethod
def from_lines(lines):
line, comments = parse_next_line(lines)
if len(line.split()) > 2:
_3to2list5 = list(line.split())
type, sensor, values, = _3to2list5[:2] + [_3to2list5[2:]]
elif len(line.split()) == 2:
type, sensor = line.split()
values = []
else:
type, sensor, values = "", "", []
def fmt(v):
try:
if int(float(v)) == float(v):
return int(float(v))
return float(v)
except ValueError:
return v
values = [fmt(v) for v in values]
return HTCSensor(type, sensor, values, comments)
def __str__(self, level=0):
return "%s%s %s%s;%s\n" % (" " * (level),
self.type,
self.sensor,
("", "\t" + self.str_values())[bool(self.values)],
("", "\t" + self.comments)[bool(self.comments.strip())])
def delete(self):
self.parent.sensors.remove(self)
def compare(self, other):
s = ""
if self.sensor != other.sensor or self.values != other.values:
s += "\n".join(["+ %s" % l for l in str(self).strip().split("\n")]) + "\n"
s += "\n".join(["- %s" % l for l in str(other).strip().split("\n")]) + "\n"
s += "\n"
return s