jw-pkg/src/python/jw/pkg/App.py
Jan Lindemann aa7275f426 App.distro_xxx: Move properties to Distro.xxx

Commit a19679fec reverted the first attempt to make AsyncSSH reuse one connection during an instance lifetime. That failed because a lot of distribution-specific properties were filled in a new event loop thread started by AsyncRunner, and AsyncSSH didn't like that.

This commit is the first part of the solution: Move those properties from the App class to the Distro class, and load the Distro class in an async loader. As soon as it's instantiated, it can provide all its properties without cluttering the code with async keywords.

Signed-off-by: Jan Lindemann <jan@janware.com>
2026-04-19 21:00:21 +02:00

472 lines
18 KiB
Python

# -*- coding: utf-8 -*-
#
# This source code file is a merge of various build tools and a horrible mess.
#
from __future__ import annotations
from typing import TYPE_CHECKING, Iterable
if TYPE_CHECKING:
from typing import TypeAlias
import os, sys, pwd, re
import os, sys, argparse, pwd, re
from functools import lru_cache, cached_property
from enum import Enum, auto
from .lib.App import App as Base
from .lib.log import *
from .lib.Distro import Distro
from .lib.base import InputMode
# Meaning of pkg.requires.xxx variables
# build: needs to be built and installed before this can be built
# devel: needs to be installed before this-devel can be installed, i.e. before _other_ packages can be built against this
# run: needs to be installed before this-run can be installed, i.e. before this and other packages can run with this
# --------------------------------------------------------------------- Helpers
class ResultCache(object):
def __init__(self):
self.__cache = {}
def run(self, func, args):
d = self.__cache
depth = 0
keys = [ func.__name__ ] + args
l = len(keys)
for k in keys:
if k is None:
k = 'None'
else:
k = str(k)
depth += 1
#log(DEBUG, 'depth = ', depth, 'key = ', k, 'd = ', str(d))
if k in d:
if l == depth:
return d[k]
d = d[k]
continue
if l == depth:
r = func(*args)
d[k] = r
return r
d = d[k] = {}
#d = d[k]
raise Exception("cache algorithm failed for function", func.__name__, "in depth", depth)
class Scope(Enum):
Self = auto()
One = auto()
Subtree = auto()
Graph: TypeAlias = dict[str, set[str]]
# ----------------------------------------------------------------- class App
class App(Base):
def __format_topdir(self, topdir: None|str, fmt: str) -> str:
if topdir is None:
return None
match fmt:
case 'unaltered':
return topdir
case None | 'absolute':
return os.path.abspath(self.__topdir)
case _:
m = re.search(r'^make:(\S+)$', fmt)
if m is None:
raise Exception(f'Can\'t interpret "{fmt}" as valid topdir ' +
'reference, expecting "unaltered", "absolute", or "make:<variable-name>"')
return '$(' + m.group(1) + ')'
def __proj_dir(self, name: str, pretty) -> str:
if name == self.__top_name:
if pretty:
return self.__pretty_topdir
return self.__topdir
for d in [ self.__projs_root, '/opt' ]:
ret = d + '/' + name
if os.path.exists(ret):
return ret
if os.path.exists(f'/usr/share/doc/packages/{name}/VERSION'):
# The package exists but does not have a dedicated project directory
return None
raise Exception('No project path found for module "{}"'.format(name))
def __find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True) -> str|None:
def format_pd(name: str, pd: str, pretty: bool):
if not pretty:
return pd
if self.__topdir_fmt == 'absolute':
return os.path.abspath(pd)
if self.__topdir_fmt == 'unaltered':
return pd
if name == self.__top_name:
return self.__pretty_topdir
raise NotImplementedError(f'Tried to pretty-format directory {pd}, not implemented')
pd = self.__proj_dir(name, False)
if pd is None:
return None
if not search_subdirs and not search_absdirs:
return format_pd(name, pd, pretty)
for sd in search_subdirs:
path = pd + '/' + sd
if os.path.isdir(path):
ret = format_pd(name, pd, pretty)
if sd and sd[0] != '/':
ret += '/'
ret += sd
return ret
for ret in search_absdirs:
if os.path.isdir(ret):
return ret
return None
def __get_project_refs_cached(self, buf, visited, spec, section, key, add_self, scope, names_only):
return self.__res_cache.run(self.__get_project_refs, [buf, visited, spec, section, key, add_self, scope, names_only])
def __get_project_refs(self, buf: list[str], visited: set[str], spec: str,
section: str, key: str, add_self: bool, scope: Scope, names_only: bool) -> None:
name = self.strip_module_from_spec(spec)
if names_only:
spec = name
if spec in buf:
return
if spec in visited:
if add_self:
buf.append(spec)
return
visited.add(spec)
deps = self.get_value(name, section, key)
log(DEBUG, "name = ", name, "section = ", section, "key = ", key, "deps = ", deps, "scope = ", scope.name, "visited = ", visited)
if deps and scope != Scope.Self:
if scope == Scope.One:
subscope = Scope.Self
else:
subscope = Scope.Subtree
deps = deps.split(',')
for dep in deps:
dep = dep.strip()
if not(len(dep)):
continue
self.__get_project_refs_cached(buf, visited, dep,
section, key, add_self=True, scope=subscope,
names_only=names_only)
if add_self:
buf.append(spec)
def __read_dep_graph(self, projects: list[str], section: str, graph: Graph) -> None:
for project in projects:
if project in graph:
continue
deps = self.get_project_refs([ project ], ['pkg.requires.jw'], section,
scope = Scope.One, add_self=False, names_only=True)
if not deps is None:
graph[project] = set(deps)
for dep in deps:
self.__read_dep_graph([ dep ], section, graph)
def __flip_dep_graph(self, graph: Graph):
ret: Graph = {}
for project, deps in graph.items():
for d in deps:
if not d in ret:
ret[d] = set()
ret[d].add(project)
return ret
def __find_circular_deps_recursive(self, project: str, graph: Graph, unvisited: list[str],
temp: set[str], path: str) -> str|None:
if project in temp:
log(DEBUG, 'found circular dependency at project', project)
return project
if not project in unvisited:
return None
temp.add(project)
if project in graph:
for dep in graph[project]:
last = self.__find_circular_deps_recursive(dep, graph, unvisited, temp, path)
if last is not None:
path.insert(0, dep)
return last
unvisited.remove(project)
temp.remove(project)
def __find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool:
graph: Graph = {}
ret: list[str] = []
self.__read_dep_graph(projects, flavours, graph)
unvisited = list(graph.keys())
temp: set[str] = set()
while unvisited:
project = unvisited[0]
log(DEBUG, 'Checking circular dependency of', project)
last = self.__find_circular_deps_recursive(project, self.__flip_dep_graph(graph), unvisited, temp, ret)
if last is not None:
log(DEBUG, f'Found circular dependency below {project}, last is {last}')
return True
return False
def __init__(self, distro: Distro|None=None) -> None:
super().__init__("jw-pkg swiss army knife", modules=["jw.pkg.cmds"])
# -- Members without default values
self.__opt_interactive: bool|None = None
self.__opt_verbose: bool|None = None
self.__top_name: str|None = None
self.__distro = distro
self.__res_cache = ResultCache()
self.__topdir: str|None = None
self.__pretty_topdir: str|None = None
self.__exec_context: ExecContext|None = None
# -- Members with default values
self.__topdir_fmt = 'absolute'
self.__projs_root = pwd.getpwuid(os.getuid()).pw_dir + "/local/src/jw.dev/proj"
self.__pretty_projs_root = None
async def __init_async(self) -> None:
if self.__distro is None:
self.__distro = await Distro.instantiate(ec=self.exec_context, id=self.args.distro_id)
async def __aexit__(self, exc_type, exc, tb) -> None:
if self.__exec_context is not None:
await self.__exec_context.close()
self.__exec_context = None
return super().__aexit__(exc_type, exc, tb)
def _add_arguments(self, parser) -> None:
super()._add_arguments(parser)
parser.add_argument('-t', '--topdir', default = None, help='Project Path')
parser.add_argument('--topdir-format', default = 'absolute', help='Output references to topdir as '
+ 'one of "make:<var-name>", "unaltered", "absolute". Absolute topdir by default')
parser.add_argument('-p', '--prefix', default = None,
help='Parent directory of project source directories')
parser.add_argument('--distro-id', default=None, help='Distribution ID (default is taken from /etc/os-release)')
parser.add_argument('--interactive', choices=['true', 'false', 'auto'], default='true', help="Wait for user input or try to proceed unattended")
parser.add_argument('--verbose', action='store_true', default=False, help="Be verbose on stderr about what's being done on the distro level")
parser.add_argument('--uri', default='local', help="Run commands on this host")
async def _run(self, args: argparse.Namespace) -> None:
self.__topdir = args.topdir
self.__pretty_topdir = self.__format_topdir(self.__topdir, args.topdir_format)
self.__topdir_fmt = args.topdir_format
if self.__topdir is not None:
self.__top_name = self.read_value(self.__topdir + '/make/project.conf', 'build', 'name')
if not self.__top_name:
self.__top_name = re.sub('-[0-9.-]*$', '', os.path.basename(os.path.realpath(self.__topdir)))
if args.prefix is not None:
self.__projs_root = args.prefix
self.__pretty_projs_root = args.prefix
await self.__init_async()
return await super()._run(args)
@property
def interactive(self) -> bool:
if self.__opt_interactive is None:
match self.args.interactive:
case 'true':
self.__opt_interactive = True
case 'false':
self.__opt_interactive = False
case 'auto':
self.__opt_interactive = sys.stdin.isatty()
return self.__opt_interactive
@property
def verbose(self) -> bool:
if self.__opt_verbose is None:
self.__opt_verbose = self.args.verbose
return self.__opt_verbose
@property
def exec_context(self) -> str:
if self.__exec_context is None:
from .lib.ExecContext import ExecContext
self.__exec_context = ExecContext.create(self.args.uri, interactive=self.interactive,
verbose_default=self.verbose)
return self.__exec_context
@property
def top_name(self):
return self.__top_name
@property
def projs_root(self):
return self.__projs_root
@property
def distro(self) -> Distro:
if self.__distro is None:
raise Exception('No distro object')
return self.__distro
def find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True):
return self.__find_dir(name, search_subdirs, search_absdirs, pretty)
# TODO: add support for customizing this in project.conf
def htdocs_dir(self, project: str) -> str:
return self.find_dir(project, ["/src/html/htdocs", "/tools/html/htdocs", "/htdocs"],
["/srv/www/proj/" + project])
# TODO: add support for customizing this in project.conf
def tmpl_dir(self, name: str) -> str:
return self.find_dir(name, ["/tmpl"], ["/opt/" + name + "/share/tmpl"])
def strip_module_from_spec(self, mod):
return re.sub(r'-dev$|-devel$|-run$', '', re.split('([=><]+)', mod)[0].strip())
@lru_cache(maxsize=None)
def get_section(self, path: str, section: str) -> str:
ret = ''
pat = '[' + section + ']'
in_section = False
file = open(path)
for line in file:
if (line.rstrip() == pat):
in_section = True
continue
if in_section:
if len(line) and line[0] == '[':
break
ret += line
file.close()
return ret.rstrip()
@lru_cache(maxsize=None)
def read_value(self, path: str, section: str, key: str) -> str|None:
def scan_section(f, key: str) -> str|None:
if key is None:
ret = ''
for line in f:
if len(line) and line[0] == '[':
break
ret += line
return ret if len(ret) else None
lines: list[str] = []
cont_line = ''
for line in f:
if len(line) and line[0] == '[':
break
cont_line += line.rstrip()
if len(cont_line) and cont_line[-1] == '\\':
cont_line = cont_line[0:-1]
continue
lines.append(cont_line)
cont_line = ''
rx = re.compile(r'^\s*' + key + r'\s*=\s*(.*)\s*$')
for line in lines:
#log(DEBUG, " looking for >%s< in line=>%s<" % (key, line))
m = re.search(rx, line)
if m is not None:
return m.group(1)
return None
def scan_section_debug(f, key: str) -> str|None:
ret = scan_section(f, key)
#log(DEBUG, " returning", rr)
return ret
try:
#log(DEBUG, "looking for {}::[{}].{}".format(path, section, key))
with open(path, 'r') as f:
if not len(section):
rr = scan_section(f, key)
pat = '[' + section + ']'
for line in f:
if line.rstrip() == pat:
return scan_section(f, key)
return None
except:
log(DEBUG, path, "not found")
# TODO: handle this special case cleaner somewhere up the stack
if section == 'build' and key == 'libname':
return 'none'
return None
@lru_cache(maxsize=None)
def get_value(self, project: str, section: str, key: str) -> str:
if self.__top_name and project == self.__top_name:
proj_root = self.__topdir
else:
proj_root = self.__projs_root + '/' + project
if section == 'version':
proj_version_dirs = [ proj_root ]
if proj_root != self.__topdir:
proj_version_dirs.append('/usr/share/doc/packages/' + project)
for d in proj_version_dirs:
version_path = d + '/VERSION'
try:
with open(version_path) as fd:
ret = fd.read().replace('\n', '').replace('-dev', '')
fd.close()
return ret
except EnvironmentError:
log(DEBUG, f'"Ignoring unreadable file "{version_path}"')
continue
raise Exception(f'No version file found for project "{project}"')
path = proj_root + '/make/project.conf'
ret = self.read_value(path, section, key)
log(DEBUG, "Lookup %s -> %s / [%s%s] -> \"%s\"" %
(self.__top_name, project, section, '.' + key if key else '', ret))
return ret
def get_values(self, projects: list[str], sections: list[str], keys: list[str]) -> list[str]:
"""
Collect a list of values from a list of given projects, sections and
keys, maintaining order
"""
ret: list[str] = []
for p in projects:
for section in sections:
for key in keys:
vals = self.get_value(p, section, key)
if vals:
ret += [val.strip() for val in vals.split(",")]
return list(dict.fromkeys(ret)) # Remove duplicates, keep ordering
def get_project_refs(self, projects: list[str], sections: list[str],
keys: str|list[str], add_self: bool, scope: Scope, names_only=True) -> list[str]:
if isinstance(keys, str):
keys = [ keys ]
ret: list[str] = []
for section in sections:
for key in keys:
visited = set()
for name in projects:
rr: list[str] = []
self.__get_project_refs_cached(rr, visited, name, section, key, add_self, scope, names_only)
# TODO: this looks like a performance hogger
for m in rr:
if not m in ret:
ret.append(m)
return ret
def get_libname(self, projects) -> str:
vals = self.get_project_refs(projects, ['build'], 'libname',
scope = Scope.One, add_self=False, names_only=True)
if not vals:
return ' '.join(projects)
if 'none' in vals:
vals.remove('none')
return ' '.join(reversed(vals))
def is_excluded_from_build(self, project: str) -> str|None:
log(DEBUG, "checking if project " + project + " is excluded from build")
exclude = self.get_project_refs([ project ], ['build'], 'exclude',
scope = Scope.One, add_self=False, names_only=True)
cascade = self.distro.os_cascade + [ 'all' ]
for p1 in exclude:
for p2 in cascade:
if p1 == p2:
return p1
return None
def find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool:
return self.__find_circular_deps(projects, flavours)