mirror of
ssh://devgit.janware.com/janware/proj/jw-python
synced 2026-06-18 13:36:38 +02:00
jwutils: Move to jwutils -> jw.util
Move all implementation source code from the jwutils module to jw.util. For compatibility with existing Python modules, keep a thin, autogenerated compatibility shim under jwutils.
Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
bc7652fdf9
commit
a2684dd601
129 changed files with 678 additions and 52 deletions
4
src/python/jw/util/stree/Makefile
Normal file
4
src/python/jw/util/stree/Makefile
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
TOPDIR = ../../../../..
|
||||
|
||||
include $(TOPDIR)/make/proj.mk
|
||||
include $(JWBDIR)/make/py-mod.mk
|
||||
302
src/python/jw/util/stree/StringTree.py
Normal file
302
src/python/jw/util/stree/StringTree.py
Normal file
|
|
@ -0,0 +1,302 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, List, Optional, Union
|
||||
|
||||
import re, fnmatch
|
||||
from collections import OrderedDict
|
||||
from enum import Enum, auto
|
||||
|
||||
from ..log import *
|
||||
|
||||
def quote(s):
|
||||
if is_quoted(s):
|
||||
return s
|
||||
s = s.strip()
|
||||
if len(s) > 0:
|
||||
if s[0] == '"':
|
||||
return "'" + s + "'"
|
||||
return '"' + s + '"'
|
||||
|
||||
def is_quoted(s: str) -> bool:
|
||||
if isinstance(s, StringTree):
|
||||
return False
|
||||
s = s.strip()
|
||||
if len(s) < 2:
|
||||
return False
|
||||
d = s[0]
|
||||
if d == s[-1] and d in [ '"', "'" ]:
|
||||
return True
|
||||
return False
|
||||
|
||||
def cleanup_string(s: str) -> str:
|
||||
if isinstance(s, StringTree):
|
||||
return s
|
||||
s = s.strip()
|
||||
if is_quoted(s):
|
||||
return s[1:-1].replace('\\' + s[0], s[0])
|
||||
return s
|
||||
|
||||
class StringTree: # export
|
||||
|
||||
def __init__(self, path: str, content: str, parent: StringTree|None=None) -> None:
|
||||
slog(DEBUG, f'Constructing StringTree(path="{path}", content="{content}")')
|
||||
self.__parent = parent
|
||||
self.children: OrderedDict[str, StringTree] = OrderedDict()
|
||||
self.content: Optional[str] = None
|
||||
self.__set(path, content)
|
||||
|
||||
assert(hasattr(self, "content"))
|
||||
#assert self.content is not None
|
||||
|
||||
# root (content = [ symbols ])
|
||||
# symbols (content = [ regex ])
|
||||
# regex ( content ='[ \n\t\r]+' )
|
||||
|
||||
# root (content = root, children = [ symbols ])
|
||||
# symbols (content = symbols, children = [ regex ])
|
||||
# regex ( content = regex, children = [ '[ \n\t\r]+' ] )
|
||||
# '[ \n\t\r]+)' ( content = '\n\t\r]+)', children = [] )
|
||||
|
||||
def __adopt_children(self, parent):
|
||||
assert isinstance(parent, StringTree)
|
||||
slog(DEBUG, f'At {self.content}: Adopting children of {parent}')
|
||||
#parent.dump(INFO, "These children are added")
|
||||
self.content = parent.content
|
||||
for name, c in parent.children.items():
|
||||
if not name in self.children.keys():
|
||||
slog(DEBUG, f'At {self.content}: Adding new child {c}')
|
||||
self.children[name] = c
|
||||
else:
|
||||
self.children[name].__adopt_children(c)
|
||||
|
||||
def __set(self, path_, content, split=True):
|
||||
slog(DEBUG, ('At "{}": '.format(str(self.content)) if hasattr(self, "content") else "") + f'Setting "{path_}" -> "{content}"')
|
||||
#assert self.content != str(content) # Not sure what the idea behind this was. It often goes off, and all works fine without.
|
||||
if content is not None and not type(content) in [str, StringTree]:
|
||||
raise Exception("Tried to add content of unsupported type {}".format(type(content).__name__))
|
||||
if path_ is None:
|
||||
if isinstance(content, str):
|
||||
self.content = cleanup_string(content)
|
||||
elif isinstance(content, StringTree):
|
||||
self.__adopt_children(content)
|
||||
else:
|
||||
raise Exception("Tried to add content of unsupported type {}".format(type(content).__name__))
|
||||
slog(DEBUG, " -- content = >" + str(content) + "<, self.content = >" + str(self.content) + "<")
|
||||
return self
|
||||
path = cleanup_string(path_)
|
||||
components = path.split('.') if split else [ path ]
|
||||
l = len(components)
|
||||
if len(path) == 0 or l == 0:
|
||||
#assert self.content is None or (isinstance(content, StringTree) and content.content == self.content)
|
||||
if isinstance(content, StringTree):
|
||||
#assert isinstance(content, StringTree), "Type: " + type(content).__name__
|
||||
self.__adopt_children(content)
|
||||
else:
|
||||
if self.content != content:
|
||||
#self.content = cleanup_string(content)
|
||||
slog(DEBUG, f'Changing content: "{self.content}" ->"{content}"')
|
||||
assert(content != '"[a-zA-Z0-9+_*/-]"')
|
||||
self.content = content
|
||||
#assert(content != "'antlr_doesnt_understand_vertical_tab'")
|
||||
#self.children[content] = StringTree(None, content)
|
||||
return self
|
||||
|
||||
#assert self.content is not None, "tried to set empty content to {}".format(path_)
|
||||
|
||||
nibble = components[0]
|
||||
rest = '.'.join(components[1:])
|
||||
if nibble not in self.children:
|
||||
self.children[nibble] = StringTree('', content=nibble, parent=self)
|
||||
if l > 1:
|
||||
assert len(rest) > 0
|
||||
return self.children[nibble].__set(rest, content=content)
|
||||
# last component, a.k.a. leaf
|
||||
if content is not None:
|
||||
gc = content if isinstance(content, StringTree) else StringTree('', content=content, parent=self.children[nibble])
|
||||
# Make sure no existing grand child is updated. It would reside too
|
||||
# far up in the grand child OrderedDict, we need it last
|
||||
if gc.content in self.children[nibble].children:
|
||||
del self.children[nibble].children[gc.content]
|
||||
self.children[nibble].children[gc.content] = gc
|
||||
return self.children[nibble]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return 'st:"{}"'.format(self.content)
|
||||
|
||||
def __getitem__(self, path: str) -> str:
|
||||
r = self.get(path)
|
||||
if r is None:
|
||||
raise KeyError(path)
|
||||
return r.value() # type: ignore
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
return self.__set(key, value)
|
||||
|
||||
def __dump(self, prio, indent=0, **kwargs):
|
||||
caller = kwargs['caller'] if 'caller' in kwargs.keys() else get_caller_pos(1)
|
||||
slog(prio, '|' + (' ' * indent) + str(self.content), caller=caller)
|
||||
indent += 2
|
||||
for name, child in self.children.items():
|
||||
child.__dump(prio, indent=indent, caller=caller)
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
if self.__parent is None:
|
||||
return ''
|
||||
prefix = self.__parent.path
|
||||
if len(prefix):
|
||||
prefix += '.'
|
||||
return prefix + self.content
|
||||
|
||||
def keys(self):
|
||||
return self.children.keys()
|
||||
|
||||
def items(self):
|
||||
return self.children.items()
|
||||
|
||||
def set_content(self, content):
|
||||
if content is None:
|
||||
raise Exception("Tried to set none content")
|
||||
content = cleanup_string(content)
|
||||
if len(content) == 0:
|
||||
raise Exception("Tried to set empty content")
|
||||
self.content = content
|
||||
|
||||
def add(self, path: str, content: Optional[Union[str, StringTree]] = None, split: bool = True) -> StringTree:
|
||||
slog(DEBUG, f'-- At "{self.content}": Adding "{path}" -> "{content}"')
|
||||
return self.__set(path, content, split)
|
||||
|
||||
def get(self, path_: str) -> Optional[StringTree]:
|
||||
slog(DEBUG, 'looking for "{}" in "{}"'.format(path_, self.content))
|
||||
assert not isinstance(path_, int)
|
||||
path = cleanup_string(path_)
|
||||
if len(path) == 0:
|
||||
slog(DEBUG, "returning myself")
|
||||
return self
|
||||
if is_quoted(path_):
|
||||
if not path in self.children.keys():
|
||||
return None
|
||||
return self.children[path]
|
||||
components = path.split('.')
|
||||
if len(components) == 0:
|
||||
return self
|
||||
name = cleanup_string(components[0])
|
||||
if not hasattr(self, "children"):
|
||||
return None
|
||||
if not name in self.children.keys():
|
||||
slog(DEBUG, "Name \"" + name + "\" is not in children of", self.content)
|
||||
for child in self.children:
|
||||
slog(DEBUG, "child = ", child)
|
||||
return None
|
||||
relpath = '.'.join(components[1:])
|
||||
return self.children[name].get(relpath)
|
||||
|
||||
def value(self, path = None, default=None) -> Optional[str]:
|
||||
if path:
|
||||
child = self.get(path)
|
||||
if child is None:
|
||||
if default:
|
||||
return default
|
||||
return None
|
||||
return child.value()
|
||||
if len(self.children) == 0:
|
||||
raise Exception('tried to get value from leaf "{}"'.format(self.content))
|
||||
slog(DEBUG, f'Returning value from children {self.children}')
|
||||
return self.children[next(reversed(self.children))].content # type: ignore
|
||||
|
||||
@property
|
||||
def parent(self):
|
||||
return self.__parent
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
if self.__parent is None:
|
||||
return self
|
||||
return self.__parent.root
|
||||
|
||||
def child_list(self, depth_first: bool=True) -> List[StringTree]:
|
||||
if depth_first == False:
|
||||
raise Exception("tried to retrieve child list with breadth-first search, not yet implemented")
|
||||
r = []
|
||||
for name, c in self.children.items():
|
||||
r.append(c)
|
||||
r.extend(c.child_list())
|
||||
return r
|
||||
|
||||
def dump(self, prio: int, *args, **kwargs) -> None:
|
||||
caller = kwargs['caller'] if 'caller' in kwargs.keys() else get_caller_pos(1)
|
||||
msg = ''
|
||||
if args is not None:
|
||||
msg = ' ' + ' '.join(args) + ' '
|
||||
slog(prio, ",------------" + msg + "----------- >", caller=caller)
|
||||
self.__dump(prio, indent=0, caller=caller)
|
||||
slog(prio, "`------------" + msg + "----------- <", caller=caller)
|
||||
|
||||
class Match(Enum):
|
||||
Equal = auto()
|
||||
RegExArg = auto()
|
||||
RegExConf = auto()
|
||||
GlobArg = auto()
|
||||
GlobConf = auto()
|
||||
|
||||
def __find(self, key: str|None, val: str|None, match: Match, depth_first: bool):
|
||||
|
||||
def __children():
|
||||
for name, child in self.children.items():
|
||||
ret.extend(child.__find(key, val, match, depth_first))
|
||||
|
||||
def __self():
|
||||
_val = self.value()
|
||||
_content = self.content
|
||||
try:
|
||||
if (
|
||||
(key == _content and matcher(val, _val))
|
||||
or (key is None and matcher(val, _val))
|
||||
or (key == _content and val is None)
|
||||
):
|
||||
ret.append(self)
|
||||
except Exception as e:
|
||||
if isinstance(e, re.PatternError):
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
def __debug_matcher(matcher, log_level=DEBUG):
|
||||
def __matcher(x, y):
|
||||
slog(log_level, f'Comparing "{x}" ~ "{y}"')
|
||||
return matcher(x, y)
|
||||
return __matcher
|
||||
|
||||
if not self.children:
|
||||
return []
|
||||
|
||||
matcher = lambda x, y: x == y
|
||||
match match:
|
||||
case self.Match.Equal:
|
||||
pass
|
||||
case self.Match.RegExArg:
|
||||
matcher = lambda x, y: re.search(x, y) is not None
|
||||
case self.Match.RegExConf:
|
||||
matcher = lambda x, y: re.search(y, x) is not None
|
||||
case self.Match.GlobArg:
|
||||
matcher = lambda x, y: fnmatch.fnmatch(y, x)
|
||||
case self.Match.GlobConf:
|
||||
matcher = lambda x, y: fnmatch.fnmatch(x, y)
|
||||
case _:
|
||||
raise NotImplementedError(f'Matcher {match} is not yet implemented')
|
||||
|
||||
ret: list[StringTree] = []
|
||||
|
||||
if depth_first:
|
||||
__children()
|
||||
__self()
|
||||
else:
|
||||
__self()
|
||||
__children()
|
||||
|
||||
return ret
|
||||
|
||||
def find(self, key: str|None=None, val: str|None=None, match: Match=Match.Equal, depth_first: bool=False):
|
||||
return [ node.parent.path for node in self.__find(key, val, match=match, depth_first=depth_first)]
|
||||
116
src/python/jw/util/stree/serdes.py
Normal file
116
src/python/jw/util/stree/serdes.py
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os, glob
|
||||
|
||||
from .StringTree import *
|
||||
from ..log import *
|
||||
|
||||
def _cleanup_line(line: str) -> str:
|
||||
line = line.strip()
|
||||
r = ''
|
||||
in_quote = None
|
||||
for c in line:
|
||||
# slog(DEBUG, "in_quote = >" + str(in_quote) + "<")
|
||||
if in_quote is not None:
|
||||
if c == in_quote:
|
||||
in_quote = None
|
||||
else:
|
||||
if c in [ '"', "'" ]:
|
||||
in_quote = c
|
||||
elif in_quote is None and c == '#':
|
||||
return r.strip()
|
||||
r += c
|
||||
if len(r) >= 2 and r[0] in [ '"', "'" ] and r[-1] == r[0]:
|
||||
return r[1:-1]
|
||||
return r
|
||||
|
||||
def parse(s: str, allow_full_lines: bool=True, root_content: str='root') -> StringTree: # export
|
||||
slog_m(DEBUG, "--->--- parsing --->---\n" + s + "\n---<--- parsing ---<---\n")
|
||||
root = StringTree('', content=root_content)
|
||||
sec = ''
|
||||
for line in s.splitlines():
|
||||
slog(DEBUG, f'Parsing: "{line}"')
|
||||
line = _cleanup_line(line)
|
||||
#slog(DEBUG, "cleaned line=", line)
|
||||
if not len(line):
|
||||
continue
|
||||
if line[0] == '[':
|
||||
if line[-1] == ']':
|
||||
sec = line[1:-1]
|
||||
elif line[-1] == '[':
|
||||
if len(sec):
|
||||
sec += '.'
|
||||
sec += line[1:-1]
|
||||
else:
|
||||
raise Exception("failed to parse section line", line)
|
||||
if root.get(sec) is None:
|
||||
root.add(sec)
|
||||
continue
|
||||
elif line[0] == ']':
|
||||
assert(len(sec) > 0)
|
||||
sec = '.'.join(sec.split('.')[0:-1])
|
||||
continue
|
||||
lhs = ''
|
||||
rhs = None
|
||||
for c in line:
|
||||
if rhs is None:
|
||||
if c == '=':
|
||||
rhs = ''
|
||||
continue
|
||||
lhs += c
|
||||
continue
|
||||
rhs += c
|
||||
|
||||
split = True
|
||||
if rhs is None:
|
||||
if not allow_full_lines:
|
||||
raise Exception("failed to parse assignment", line)
|
||||
rhs = 'empty'
|
||||
split = False
|
||||
root.add(sec + '.' + cleanup_string(lhs), cleanup_string(rhs), split=split)
|
||||
return root
|
||||
|
||||
def _read_lines_from_one_path(path: str, throw=True, level=0, log_prio=INFO, paths_buf=None):
|
||||
try:
|
||||
with open(path, 'r') as infile:
|
||||
slog(log_prio, 'Reading {}"{}"'.format(' ' * level * 2, path))
|
||||
if paths_buf is not None:
|
||||
paths_buf.append(path)
|
||||
ret = []
|
||||
for line in infile: # lines are all trailed by \n
|
||||
m = re.search(r'^\s*(-)*include\s+(\S+)', line)
|
||||
if m:
|
||||
optional = m.group(1) == '-'
|
||||
include_path = m.group(2)
|
||||
if include_path[0] != '/':
|
||||
dir_name = os.path.dirname(path)
|
||||
if len(dir_name):
|
||||
include_path = dir_name + '/' + include_path
|
||||
include_lines = _read_lines(include_path, throw=(not optional), level=level+1, paths_buf=paths_buf)
|
||||
if include_lines is None:
|
||||
slog(DEBUG, f'{path}: Failed to process "{line}"')
|
||||
continue
|
||||
ret.extend(include_lines)
|
||||
continue
|
||||
ret.append(line)
|
||||
return ret
|
||||
except Exception as e:
|
||||
slog(ERR, f'Failed to read file "{path}": {str(e)}')
|
||||
if throw or not isinstance(e, FileNotFoundError):
|
||||
raise
|
||||
return None
|
||||
|
||||
def _read_lines(path: str, throw=True, level=0, log_prio=INFO, paths_buf=None):
|
||||
paths = glob.glob(path)
|
||||
ret = []
|
||||
for p in paths:
|
||||
rr = _read_lines_from_one_path(p, throw=throw, level=level, log_prio=log_prio, paths_buf=paths_buf)
|
||||
if rr is None:
|
||||
return None
|
||||
ret.extend(rr)
|
||||
return ret
|
||||
|
||||
def read(path: str, root_content: str='root', log_prio=INFO, paths_buf=None) -> StringTree: # export
|
||||
lines = _read_lines_from_one_path(path, log_prio=log_prio, paths_buf=paths_buf)
|
||||
s = ''.join(lines)
|
||||
return parse(s, root_content=root_content)
|
||||
Loading…
Add table
Add a link
Reference in a new issue