Skip to content
Snippets Groups Projects
htc_contents.py 17.24 KiB
'''
Created on 20/01/2014

@author: MMPE

See documentation of HTCFile below

'''
import os
from collections import OrderedDict
import collections


class OrderedDict(collections.OrderedDict):
    pass

    def __str__(self):
        return "\n".join(["%-30s\t %s" % ((str(k) + ":"), str(v)) for k, v in self.items()])


def parse_next_line(lines):
    _3to2list = list(lines.pop(0).split(";"))
    line, comments, = _3to2list[:1] + [_3to2list[1:]]
    comments = ";".join(comments).rstrip()
    while lines and lines[0].lstrip().startswith(";"):
        comments += "\n%s" % lines.pop(0).rstrip()
    return line.strip(), comments


def fmt_value(v):
    try:
        if int(float(v)) == float(v):
            return int(float(v))
        return float(v)
    except ValueError:
        return v.replace("\\", "/")


class HTCContents(object):
    lines = []
    contents = None
    name_ = ""
    parent = None

    def __getitem__(self, key):
        if isinstance(key, str):
            key = key.replace(".", "/")
            if "/" in key:
                keys = key.split('/')
                val = self.contents[keys[0]]
                for k in keys[1:]:
                    val = val[k]
                return val
            return self.contents[key]
        else:
            return self.values[key]

    def __getattr__(self, *args, **kwargs):
        if args[0] in ['__members__', '__methods__']:
            # fix python2 related issue. In py2, dir(self) calls
            # __getattr__(('__members__',)), and this call must fail unhandled to work
            return object.__getattribute__(self, *args, **kwargs)
        try:
            return object.__getattribute__(self, *args, **kwargs)
        except Exception:
            k = args[0]
            if k.endswith("__1"):
                k = k[:-3]
            return self.contents[k]

    def __setattr__(self, *args, **kwargs):
        try:
            object.__getattribute__(self, args[0])
            return object.__setattr__(self, *args, **kwargs)
        except AttributeError:
            pass
        _3to2list1 = list(args)
        k, v, = _3to2list1[:1] + _3to2list1[1:]

        if isinstance(v, str):
            v = [fmt_value(v) for v in v.split()]
        if not isinstance(v, HTCContents):
            if not isinstance(v, (list, tuple)):
                v = [v]
            if k in self.contents:
                self.contents[k].values = list(v)
                return
            v = HTCLine(k, v, "")
        self.contents[k] = v
        v.parent = self

    def __delattr__(self, *args, **kwargs):
        k, = args
        if k in self:
            del self.contents[k]

    def __iter__(self):
        # mainbodies must preceed constraints
        values = ([v for v in self.contents.values() if v.name_ != 'constraints'] +
                  [v for v in self.contents.values() if v.name_ == 'constraints'])
        return iter(values)

    def __contains__(self, key):
        if self.contents is None:
            return False
        return key in self.contents

    def get(self, section, default=None):
        try:
            return self[section]
        except KeyError:
            return default

    def __call__(self, **kwargs):
        """Allow accesing one of multiple subsections with same name, e.g. the main body where name=='shaft'
        > htc.new_htc_structure.main_body(name='shaft')

        or one of multiple lines with same name, e.g. the section in c2_def where value[0]==3
        > htc.new_htc_structure.main_body.c2_def.sec(v0=3)
        """
        lst = [s for s in self.parent if s.name_ == self.name_ and (
            all([k in s and s[k][0] == v for k, v in kwargs.items()]) or
            (all([k[0] == 'v' for k in kwargs]) and all([s[int(k[1:])] == v for k, v in kwargs.items()]))
        )]
        assert len(lst) == 1
        return lst[0]

    def keys(self):
        return list(self.contents.keys())

    def _add_contents(self, contents):
        if contents.name_ not in self:
            self[contents.name_] = contents
        else:
            ending = "__2"
            while contents.name_ + ending in self:
                ending = "__%d" % (1 + float("0%s" % ending.replace("__", "")))
            self[contents.name_ + ending] = contents
        contents.parent = self

    def add_section(self, section_name, members={}, section=None, allow_duplicate=False, **kwargs):
        if isinstance(section_name, HTCSection):
            section = section_name
            section_name = section.name_
        if section_name in self and allow_duplicate is False:
            return self[section_name]
        if section_name == "output":
            section = HTCOutputSection(section_name)
        elif section_name.startswith("output_at_time"):
            section = HTCOutputAtTimeSection(section_name)
        elif section is None:
            section = HTCSection(section_name)
        self._add_contents(section)
        kwargs.update(members)
        for k, v in kwargs.items():
            section[k] = v
        return section

    def delete(self):
        keys = [k for (k, v) in self.parent.contents.items() if v == self]
        for k in keys:
            del self.parent.contents[k]

    def location(self):
        if self.parent is None:
            return os.path.basename(self.filename)
        else:
            name = [k for k in self.parent.keys() if self.parent[k] == self][0]
            return self.parent.location() + "/" + name

    def compare(self, other, compare_order=False):
        my_keys = self.keys()
        other_keys = other.keys()
        s = ""
        while my_keys or other_keys:
            if my_keys:
                if (my_keys[0] in other_keys):
                    if compare_order:
                        other_i = 0
                    else:
                        other_i = other_keys.index(my_keys[0])
                    while other_keys[other_i] != my_keys[0]:
                        s += "\n".join(["+ %s" % l for l in str(other[other_keys.pop(other_i)]
                                                                ).strip().split("\n")]) + "\n\n"

                    s += self[my_keys.pop(0)].compare(other[other_keys.pop(other_i)])

                else:
                    s += "\n".join(["- %s" % l for l in str(self[my_keys.pop(0)]).strip().split("\n")]) + "\n\n"
            else:
                s += "\n".join(["+ %s" % l for l in str(other[other_keys.pop(0)]).strip().split("\n")]) + "\n\n"
        return s


class HTCSection(HTCContents):
    end_comments = ""
    begin_comments = ""

    def __init__(self, name, begin_comments="", end_comments=""):
        self.name_ = name
        self.begin_comments = begin_comments.strip(" \t")
        self.end_comments = end_comments.strip(" \t")
        self.contents = OrderedDict()
        self.parent = None

    @property
    def section_name(self):
        return self.name_

    @section_name.setter
    def section_name(self, value):
        self.name_ = value

    def add_line(self, name, values, comments=""):
        line = HTCLine(name, values, comments)
        self._add_contents(line)
        return line

    def __setitem__(self, key, value):
        if isinstance(value, HTCContents):
            self.contents[key] = value
            value.parent = self
        elif isinstance(value, (str, int, float)):
            self.add_line(key, [value])
        else:
            self.add_line(key, value)

    @staticmethod
    def from_lines(lines):
        line, begin_comments = parse_next_line(lines)
        name = line[6:].lower().strip()
        if name == "output":
            section = HTCOutputSection(name, begin_comments)
        elif name.startswith("output_at_time"):
            section = HTCOutputAtTimeSection(name, begin_comments)
        else:
            section = HTCSection(name, begin_comments)
        while lines:
            if lines[0].strip() == "":
                lines.pop(0)
            if len(lines) == 0:
                print()
            if lines[0].lower().startswith("begin"):
                section._add_contents(HTCSection.from_lines(lines))
            elif lines[0].lower().startswith("end"):
                line, section.end_comments = parse_next_line(lines)
                break
            elif lines:
                section._add_contents(section.line_from_line(lines))
        else:
            raise Exception("Section '%s' has not end" % section.name_)
        return section

    def line_from_line(self, lines):
        return HTCLine.from_lines(lines)

    def __str__(self, level=0):
        s = "%sbegin %s;%s\n" % ("  " * level, self.name_, (("", "\t" + self.begin_comments)
                                                            [bool(self.begin_comments.strip())]).replace("\t\n", "\n"))
        s += "".join([c.__str__(level + 1) for c in self])
        s += "%send %s;%s\n" % ("  " * level, self.name_, (("", "\t" + self.end_comments)
                                                           [self.end_comments.strip() != ""]).replace("\t\n", "\n"))
        return s

    def get_subsection_by_name(self, name, field='name'):
        return self.get_section(name, field)

    def get_section(self, name, field='name'):
        lst = [s for s in self if field in s and s[field][0] == name]
        if len(lst) == 1:
            return lst[0]
        elif len(lst) == 0:
            raise ValueError("subsection with %s='%s' not found" % (field, name))
        else:
            raise ValueError("Multiple subsection with %s='%s' not found" % (field, name))

    def get_element(self, key, value):
        """Return subsection where subsection.<key>==value or line where line.values[key]==value"""
        if isinstance(key, int):
            lst = [s for s in self if s.values[key] == value]
        elif isinstance(key, str):
            lst = [s for s in self if key in s and s[key][0] == name]
        else:
            raise ValueError("Key argument must be int or str")
        if len(lst) == 1:
            return lst[0]
        elif len(lst) == 0:
            raise ValueError("contents with '%s=%s' not found" % (key, value))
        else:
            raise ValueError("Multiple contents with '%s=%s' not found" % (key, value))

    def copy(self):
        copy = self.__class__(name=self.name_, begin_comments=self.begin_comments, end_comments=self.end_comments)
        for k, v in self.contents.items():
            copy.contents[k] = v.copy()
        return copy


class HTCLine(HTCContents):
    values = None
    comments = ""

    def __init__(self, name, values, comments):
        if "__" in name:
            name = name[:name.index("__")]
        self.name_ = name
        self.values = list(values)
        self.comments = comments.strip(" \t")
        self.parent = None

    def __repr__(self):
        return str(self)

    def __str__(self, level=0):
        if self.name_ == "":
            return ""
        return "%s%s%s;%s\n" % ("  " * (level), self.name_,
                                ("", "\t" + self.str_values())[bool(self.values)],
                                ("", "\t" + self.comments)[bool(self.comments.strip())])

    def str_values(self):
        return " ".join([str(v) for v in self.values])

    def __getitem__(self, key):
        try:
            return self.values[key]
        except Exception:
            raise IndexError("Parameter %s does not exists for %s" % (key + 1, self.location()))

    def __setitem__(self, key, value):
        if isinstance(key, int):
            self.values[key] = value
        else:
            raise NotImplementedError

    @staticmethod
    def from_lines(lines):
        line, end_comments = parse_next_line(lines)
        if len(line.split()) > 0:
            _3to2list3 = list(line.split())
            name, values, = _3to2list3[:1] + [_3to2list3[1:]]
        else:
            name = line
            values = []

        values = [fmt_value(v) for v in values]
        return HTCLine(name, values, end_comments)

    def compare(self, other):
        s = ""
        if self.values != other.values:
            s += "\n".join(["+ %s" % l for l in str(self).strip().split("\n")]) + "\n"
            s += "\n".join(["- %s" % l for l in str(other).strip().split("\n")]) + "\n"
            s += "\n"
        return s

    def copy(self):
        return HTCLine(name=self.name_, values=self.values, comments=self.comments)


class HTCOutputSection(HTCSection):
    sensors = None

    def __init__(self, name, begin_comments="", end_comments=""):
        HTCSection.__init__(self, name, begin_comments=begin_comments, end_comments=end_comments)
        self.sensors = []

    def add_sensor(self, type, sensor, values=[], comment="", nr=None):
        self._add_sensor(HTCSensor(type, sensor, values, comment), nr)

    def _add_sensor(self, htcSensor, nr=None):
        if nr is None:
            nr = len(self.sensors)
        self.sensors.insert(nr, htcSensor)
        htcSensor.parent = self

    def line_from_line(self, lines):
        name = lines[0].split()[0].strip()

        if name in ['filename', 'data_format', 'buffer', 'time']:
            return HTCLine.from_lines(lines)
        else:
            return HTCSensor.from_lines(lines)

    def _add_contents(self, contents):
        if isinstance(contents, HTCSensor):
            self._add_sensor(contents)
        else:
            return HTCSection._add_contents(self, contents)

    def __str__(self, level=0):
        s = "%sbegin %s;%s\n" % ("  " * level, self.name_, ("", "\t" + self.begin_comments)
                                 [len(self.begin_comments.strip()) > 0])
        s += "".join([c.__str__(level + 1) for c in self])
        s += "".join([s.__str__(level + 1) for s in self.sensors])
        s += "%send %s;%s\n" % ("  " * level, self.name_, ("", "\t" + self.end_comments)
                                [self.end_comments.strip() != ""])
        return s

    def compare(self, other):
        s = HTCContents.compare(self, other)
        for s1, s2 in zip(self.sensors, other.sensors):
            s += s1.compare(s2)
        for s1 in self.sensors[len(other.sensors):]:
            s += "\n".join(["- %s" % l for l in str(s1).strip().split("\n")]) + "\n"
        for s2 in self.sensors[len(self.sensors):]:
            s += "\n".join(["- %s" % l for l in str(s2).strip().split("\n")]) + "\n"

        return s

    def copy(self):
        copy = HTCSection.copy(self)
        copy.sensors = list.copy(self.sensors)
        return copy


class HTCOutputAtTimeSection(HTCOutputSection):
    type = None
    time = None

    def __init__(self, name, begin_comments="", end_comments=""):
        if len(name.split()) < 3:
            raise ValueError('"keyword" and "time" arguments required for output_at_time command:\n%s' % name)
        name, self.type, time = name.split()
        self.time = float(time)
        HTCOutputSection.__init__(self, name, begin_comments=begin_comments, end_comments=end_comments)

    def __str__(self, level=0):
        s = "%sbegin %s %s %s;%s\n" % ("  " * level, self.name_, self.type, self.time,
                                       ("", "\t" + self.begin_comments)[len(self.begin_comments.strip())])
        s += "".join([c.__str__(level + 1) for c in self])
        s += "".join([s.__str__(level + 1) for s in self.sensors])
        s += "%send %s;%s\n" % ("  " * level, self.name_, ("", "\t" + self.end_comments)
                                [self.end_comments.strip() != ""])
        return s

    def copy(self):
        copy = HTCOutputAtTimeSection(name=f"{self.name_} {self.type} {self.time}",
                                           begin_comments=self.begin_comments, end_comments=self.end_comments)
        for k, v in self.contents.items():
            copy.contents[k] = v.copy()
        copy.sensors = list.copy(self.sensors)
        return copy


class HTCSensor(HTCLine):
    type = ""
    sensor = ""
    values = []

    def __init__(self, type, sensor, values, comments):
        self.type = type
        self.sensor = sensor
        self.values = list(values)
        self.comments = comments.strip(" \t")

    @staticmethod
    def from_lines(lines):
        line, comments = parse_next_line(lines)
        if len(line.split()) > 2:
            _3to2list5 = list(line.split())
            type, sensor, values, = _3to2list5[:2] + [_3to2list5[2:]]
        elif len(line.split()) == 2:
            type, sensor = line.split()
            values = []
        else:
            type, sensor, values = "", "", []

        def fmt(v):
            try:
                if int(float(v)) == float(v):
                    return int(float(v))
                return float(v)
            except ValueError:
                return v
        values = [fmt(v) for v in values]
        return HTCSensor(type, sensor, values, comments)

    def __str__(self, level=0):
        return "%s%s %s%s;%s\n" % ("  " * (level),
                                   self.type,
                                   self.sensor,
                                   ("", "\t" + self.str_values())[bool(self.values)],
                                   ("", "\t" + self.comments)[bool(self.comments.strip())])

    def delete(self):
        self.parent.sensors.remove(self)

    def compare(self, other):
        s = ""
        if self.sensor != other.sensor or self.values != other.values:
            s += "\n".join(["+ %s" % l for l in str(self).strip().split("\n")]) + "\n"
            s += "\n".join(["- %s" % l for l in str(other).strip().split("\n")]) + "\n"
            s += "\n"
        return s