Add global --pkg-filter argument, defaulting to JW_DEFAULT_PKG_FILTER. If it's specified, instantiate a PackageFilterString from it, and initialize App's Distro instance with it.
Signed-off-by: Jan Lindemann <jan@janware.com>
484 lines
19 KiB
Python
484 lines
19 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# This source code file is a merge of various build tools and a horrible mess.
|
|
#
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Iterable
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import TypeAlias
|
|
import os, sys, pwd, re
|
|
|
|
import os, sys, argparse, pwd, re
|
|
from functools import lru_cache, cached_property
|
|
from enum import Enum, auto
|
|
|
|
from .lib.App import App as Base
|
|
from .lib.log import *
|
|
from .lib.Distro import Distro
|
|
from .lib.base import InputMode
|
|
|
|
# Meaning of pkg.requires.xxx variables
|
|
# build: needs to be built and installed before this can be built
|
|
# devel: needs to be installed before this-devel can be installed, i.e. before _other_ packages can be built against this
|
|
# run: needs to be installed before this-run can be installed, i.e. before this and other packages can run with this
|
|
|
|
# --------------------------------------------------------------------- Helpers
|
|
|
|
class ResultCache(object):
|
|
|
|
def __init__(self):
|
|
self.__cache = {}
|
|
|
|
def run(self, func, args):
|
|
d = self.__cache
|
|
depth = 0
|
|
keys = [ func.__name__ ] + args
|
|
l = len(keys)
|
|
for k in keys:
|
|
if k is None:
|
|
k = 'None'
|
|
else:
|
|
k = str(k)
|
|
depth += 1
|
|
#log(DEBUG, 'depth = ', depth, 'key = ', k, 'd = ', str(d))
|
|
if k in d:
|
|
if l == depth:
|
|
return d[k]
|
|
d = d[k]
|
|
continue
|
|
if l == depth:
|
|
r = func(*args)
|
|
d[k] = r
|
|
return r
|
|
d = d[k] = {}
|
|
#d = d[k]
|
|
raise Exception('cache algorithm failed for function', func.__name__, 'in depth', depth)
|
|
|
|
class Scope(Enum):
|
|
Self = auto()
|
|
One = auto()
|
|
Subtree = auto()
|
|
|
|
Graph: TypeAlias = dict[str, set[str]]
|
|
|
|
# ----------------------------------------------------------------- class App
|
|
|
|
class App(Base):
|
|
|
|
def __format_topdir(self, topdir: None|str, fmt: str) -> str:
|
|
if topdir is None:
|
|
return None
|
|
match fmt:
|
|
case 'unaltered':
|
|
return topdir
|
|
case None | 'absolute':
|
|
return os.path.abspath(self.__topdir)
|
|
case _:
|
|
m = re.search(r'^make:(\S+)$', fmt)
|
|
if m is None:
|
|
raise Exception(f'Can\'t interpret "{fmt}" as valid topdir ' +
|
|
'reference, expecting "unaltered", "absolute", or "make:<variable-name>"')
|
|
return '$(' + m.group(1) + ')'
|
|
|
|
def __proj_dir(self, name: str, pretty) -> str:
|
|
if name == self.__top_name:
|
|
if pretty:
|
|
return self.__pretty_topdir
|
|
return self.__topdir
|
|
for d in [ self.__projs_root, '/opt' ]:
|
|
ret = d + '/' + name
|
|
if os.path.exists(ret):
|
|
return ret
|
|
if os.path.exists(f'/usr/share/doc/packages/{name}/VERSION'):
|
|
# The package exists but does not have a dedicated project directory
|
|
return None
|
|
raise Exception('No project path found for module "{}"'.format(name))
|
|
|
|
def __find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True) -> str|None:
|
|
def format_pd(name: str, pd: str, pretty: bool):
|
|
if not pretty:
|
|
return pd
|
|
if self.__topdir_fmt == 'absolute':
|
|
return os.path.abspath(pd)
|
|
if self.__topdir_fmt == 'unaltered':
|
|
return pd
|
|
if name == self.__top_name:
|
|
return self.__pretty_topdir
|
|
raise NotImplementedError(f'Tried to pretty-format directory {pd}, not implemented')
|
|
pd = self.__proj_dir(name, False)
|
|
if pd is None:
|
|
return None
|
|
if not search_subdirs and not search_absdirs:
|
|
return format_pd(name, pd, pretty)
|
|
for sd in search_subdirs:
|
|
path = pd + '/' + sd
|
|
if os.path.isdir(path):
|
|
ret = format_pd(name, pd, pretty)
|
|
if sd and sd[0] != '/':
|
|
ret += '/'
|
|
ret += sd
|
|
return ret
|
|
for ret in search_absdirs:
|
|
if os.path.isdir(ret):
|
|
return ret
|
|
return None
|
|
|
|
def __get_project_refs_cached(self, buf, visited, spec, section, key, add_self, scope, names_only):
|
|
return self.__res_cache.run(self.__get_project_refs, [buf, visited, spec, section, key, add_self, scope, names_only])
|
|
|
|
def __get_project_refs(self, buf: list[str], visited: set[str], spec: str,
|
|
section: str, key: str, add_self: bool, scope: Scope, names_only: bool) -> None:
|
|
name = self.strip_module_from_spec(spec)
|
|
if names_only:
|
|
spec = name
|
|
if spec in buf:
|
|
return
|
|
if spec in visited:
|
|
if add_self:
|
|
buf.append(spec)
|
|
return
|
|
visited.add(spec)
|
|
deps = self.get_value(name, section, key)
|
|
log(DEBUG, 'name = ', name, 'section = ', section, 'key = ', key, 'deps = ', deps, 'scope = ', scope.name, 'visited = ', visited)
|
|
if deps and scope != Scope.Self:
|
|
if scope == Scope.One:
|
|
subscope = Scope.Self
|
|
else:
|
|
subscope = Scope.Subtree
|
|
deps = deps.split(',')
|
|
for dep in deps:
|
|
dep = dep.strip()
|
|
if not(len(dep)):
|
|
continue
|
|
self.__get_project_refs_cached(buf, visited, dep,
|
|
section, key, add_self=True, scope=subscope,
|
|
names_only=names_only)
|
|
if add_self:
|
|
buf.append(spec)
|
|
|
|
def __read_dep_graph(self, projects: list[str], section: str, graph: Graph) -> None:
|
|
for project in projects:
|
|
if project in graph:
|
|
continue
|
|
deps = self.get_project_refs([ project ], ['pkg.requires.jw'], section,
|
|
scope = Scope.One, add_self=False, names_only=True)
|
|
if not deps is None:
|
|
graph[project] = set(deps)
|
|
for dep in deps:
|
|
self.__read_dep_graph([ dep ], section, graph)
|
|
|
|
def __flip_dep_graph(self, graph: Graph):
|
|
ret: Graph = {}
|
|
for project, deps in graph.items():
|
|
for d in deps:
|
|
if not d in ret:
|
|
ret[d] = set()
|
|
ret[d].add(project)
|
|
return ret
|
|
|
|
def __find_circular_deps_recursive(self, project: str, graph: Graph, unvisited: list[str],
|
|
temp: set[str], path: str) -> str|None:
|
|
if project in temp:
|
|
log(DEBUG, 'found circular dependency at project', project)
|
|
return project
|
|
if not project in unvisited:
|
|
return None
|
|
temp.add(project)
|
|
if project in graph:
|
|
for dep in graph[project]:
|
|
last = self.__find_circular_deps_recursive(dep, graph, unvisited, temp, path)
|
|
if last is not None:
|
|
path.insert(0, dep)
|
|
return last
|
|
unvisited.remove(project)
|
|
temp.remove(project)
|
|
|
|
def __find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool:
|
|
graph: Graph = {}
|
|
ret: list[str] = []
|
|
self.__read_dep_graph(projects, flavours, graph)
|
|
unvisited = list(graph.keys())
|
|
temp: set[str] = set()
|
|
while unvisited:
|
|
project = unvisited[0]
|
|
log(DEBUG, 'Checking circular dependency of', project)
|
|
last = self.__find_circular_deps_recursive(project, self.__flip_dep_graph(graph), unvisited, temp, ret)
|
|
if last is not None:
|
|
log(DEBUG, f'Found circular dependency below {project}, last is {last}')
|
|
return True
|
|
return False
|
|
|
|
def __init__(self, distro: Distro|None=None) -> None:
|
|
|
|
super().__init__('jw-pkg swiss army knife', modules=['jw.pkg.cmds'])
|
|
|
|
# -- Members without default values
|
|
self.__opt_interactive: bool|None = None
|
|
self.__opt_verbose: bool|None = None
|
|
self.__top_name: str|None = None
|
|
self.__distro = distro
|
|
self.__res_cache = ResultCache()
|
|
self.__topdir: str|None = None
|
|
self.__pretty_topdir: str|None = None
|
|
self.__exec_context: ExecContext|None = None
|
|
|
|
# -- Members with default values
|
|
self.__topdir_fmt = 'absolute'
|
|
self.__projs_root = pwd.getpwuid(os.getuid()).pw_dir + '/local/src/jw.dev/proj'
|
|
self.__pretty_projs_root = None
|
|
|
|
async def __init_async(self) -> None:
|
|
if self.__distro is None:
|
|
pkg_filter_str = self.args.pkg_filter
|
|
if pkg_filter_str is None:
|
|
pkg_filter_str = os.getenv('JW_DEFAULT_PKG_FILTER')
|
|
pkg_filter: PackageFilter|None = None
|
|
if pkg_filter_str is not None:
|
|
from .lib.PackageFilter import PackageFilterString
|
|
pkg_filter = PackageFilterString(pkg_filter_str)
|
|
self.__distro = await Distro.instantiate(
|
|
ec = self.exec_context,
|
|
id = self.args.distro_id,
|
|
default_pkg_filter = pkg_filter,
|
|
)
|
|
|
|
async def __aexit__(self, exc_type, exc, tb) -> None:
|
|
if self.__exec_context is not None:
|
|
await self.__exec_context.close()
|
|
self.__exec_context = None
|
|
return super().__aexit__(exc_type, exc, tb)
|
|
|
|
def _add_arguments(self, parser) -> None:
|
|
super()._add_arguments(parser)
|
|
parser.add_argument('-t', '--topdir', default = None, help='Project Path')
|
|
parser.add_argument('--topdir-format', default = 'absolute', help='Output references to topdir as '
|
|
+ 'one of "make:<var-name>", "unaltered", "absolute". Absolute topdir by default')
|
|
parser.add_argument('-p', '--prefix', default = None,
|
|
help='Parent directory of project source directories')
|
|
parser.add_argument('--distro-id', default=None, help='Distribution ID (default is taken from /etc/os-release)')
|
|
parser.add_argument('--interactive', choices=['true', 'false', 'auto'], default='true', help='Wait for user input or try to proceed unattended')
|
|
parser.add_argument('--verbose', action='store_true', default=False, help='Be verbose on stderr about what\'s being done on the distro level')
|
|
parser.add_argument('--uri', default='local', help='Run commands on this host')
|
|
parser.add_argument('--pkg-filter', help='Default filter for all distribution package-related operations')
|
|
|
|
async def _run(self, args: argparse.Namespace) -> None:
|
|
self.__topdir = args.topdir
|
|
self.__pretty_topdir = self.__format_topdir(self.__topdir, args.topdir_format)
|
|
self.__topdir_fmt = args.topdir_format
|
|
if self.__topdir is not None:
|
|
self.__top_name = self.read_value(self.__topdir + '/make/project.conf', 'build', 'name')
|
|
if not self.__top_name:
|
|
self.__top_name = re.sub('-[0-9.-]*$', '', os.path.basename(os.path.realpath(self.__topdir)))
|
|
if args.prefix is not None:
|
|
self.__projs_root = args.prefix
|
|
self.__pretty_projs_root = args.prefix
|
|
await self.__init_async()
|
|
return await super()._run(args)
|
|
|
|
@property
|
|
def interactive(self) -> bool:
|
|
if self.__opt_interactive is None:
|
|
match self.args.interactive:
|
|
case 'true':
|
|
self.__opt_interactive = True
|
|
case 'false':
|
|
self.__opt_interactive = False
|
|
case 'auto':
|
|
self.__opt_interactive = sys.stdin.isatty()
|
|
return self.__opt_interactive
|
|
|
|
@property
|
|
def verbose(self) -> bool:
|
|
if self.__opt_verbose is None:
|
|
self.__opt_verbose = self.args.verbose
|
|
return self.__opt_verbose
|
|
|
|
@property
|
|
def exec_context(self) -> str:
|
|
if self.__exec_context is None:
|
|
from .lib.ExecContext import ExecContext
|
|
self.__exec_context = ExecContext.create(self.args.uri, interactive=self.interactive,
|
|
verbose_default=self.verbose)
|
|
return self.__exec_context
|
|
|
|
@property
|
|
def top_name(self):
|
|
return self.__top_name
|
|
|
|
@property
|
|
def projs_root(self):
|
|
return self.__projs_root
|
|
|
|
@property
|
|
def distro(self) -> Distro:
|
|
if self.__distro is None:
|
|
raise Exception('No distro object')
|
|
return self.__distro
|
|
|
|
def find_dir(self, name: str, search_subdirs: list[str]=[], search_absdirs: list[str]=[], pretty: bool=True):
|
|
return self.__find_dir(name, search_subdirs, search_absdirs, pretty)
|
|
|
|
# TODO: add support for customizing this in project.conf
|
|
def htdocs_dir(self, project: str) -> str:
|
|
return self.find_dir(project, ['/src/html/htdocs', '/tools/html/htdocs', '/htdocs'],
|
|
['/srv/www/proj/' + project])
|
|
|
|
# TODO: add support for customizing this in project.conf
|
|
def tmpl_dir(self, name: str) -> str:
|
|
return self.find_dir(name, ['/tmpl'], ['/opt/' + name + '/share/tmpl'])
|
|
|
|
def strip_module_from_spec(self, mod):
|
|
return re.sub(r'-dev$|-devel$|-run$', '', re.split('([=><]+)', mod)[0].strip())
|
|
|
|
@lru_cache(maxsize=None)
|
|
def get_section(self, path: str, section: str) -> str:
|
|
ret = ''
|
|
pat = '[' + section + ']'
|
|
in_section = False
|
|
file = open(path)
|
|
for line in file:
|
|
if (line.rstrip() == pat):
|
|
in_section = True
|
|
continue
|
|
if in_section:
|
|
if len(line) and line[0] == '[':
|
|
break
|
|
ret += line
|
|
file.close()
|
|
return ret.rstrip()
|
|
|
|
@lru_cache(maxsize=None)
|
|
def read_value(self, path: str, section: str, key: str) -> str|None:
|
|
|
|
def scan_section(f, key: str) -> str|None:
|
|
if key is None:
|
|
ret = ''
|
|
for line in f:
|
|
if len(line) and line[0] == '[':
|
|
break
|
|
ret += line
|
|
return ret if len(ret) else None
|
|
lines: list[str] = []
|
|
cont_line = ''
|
|
for line in f:
|
|
if len(line) and line[0] == '[':
|
|
break
|
|
cont_line += line.rstrip()
|
|
if len(cont_line) and cont_line[-1] == '\\':
|
|
cont_line = cont_line[0:-1]
|
|
continue
|
|
lines.append(cont_line)
|
|
cont_line = ''
|
|
rx = re.compile(r'^\s*' + key + r'\s*=\s*(.*)\s*$')
|
|
for line in lines:
|
|
#log(DEBUG, ' looking for "%s" in line="%s"' % (key, line))
|
|
m = re.search(rx, line)
|
|
if m is not None:
|
|
return m.group(1)
|
|
return None
|
|
|
|
def scan_section_debug(f, key: str) -> str|None:
|
|
ret = scan_section(f, key)
|
|
#log(DEBUG, ' returning', rr)
|
|
return ret
|
|
|
|
try:
|
|
#log(DEBUG, 'looking for {}::[{}].{}'.format(path, section, key))
|
|
with open(path, 'r') as f:
|
|
if not len(section):
|
|
rr = scan_section(f, key)
|
|
pat = '[' + section + ']'
|
|
for line in f:
|
|
if line.rstrip() == pat:
|
|
return scan_section(f, key)
|
|
return None
|
|
except:
|
|
log(DEBUG, path, 'not found')
|
|
# TODO: handle this special case cleaner somewhere up the stack
|
|
if section == 'build' and key == 'libname':
|
|
return 'none'
|
|
return None
|
|
|
|
@lru_cache(maxsize=None)
|
|
def get_value(self, project: str, section: str, key: str) -> str:
|
|
if self.__top_name and project == self.__top_name:
|
|
proj_root = self.__topdir
|
|
else:
|
|
proj_root = self.__projs_root + '/' + project
|
|
if section == 'version':
|
|
proj_version_dirs = [ proj_root ]
|
|
if proj_root != self.__topdir:
|
|
proj_version_dirs.append('/usr/share/doc/packages/' + project)
|
|
for d in proj_version_dirs:
|
|
version_path = d + '/VERSION'
|
|
try:
|
|
with open(version_path) as fd:
|
|
ret = fd.read().replace('\n', '').replace('-dev', '')
|
|
fd.close()
|
|
return ret
|
|
except EnvironmentError:
|
|
log(DEBUG, f'Ignoring unreadable file "{version_path}"')
|
|
continue
|
|
raise Exception(f'No version file found for project "{project}"')
|
|
path = proj_root + '/make/project.conf'
|
|
ret = self.read_value(path, section, key)
|
|
log(DEBUG, 'Lookup %s -> %s / [%s%s] -> "%s"' %
|
|
(self.__top_name, project, section, '.' + key if key else '', ret))
|
|
return ret
|
|
|
|
def get_values(self, projects: list[str], sections: list[str], keys: list[str]) -> list[str]:
|
|
"""
|
|
Collect a list of values from a list of given projects, sections and
|
|
keys, maintaining order
|
|
"""
|
|
ret: list[str] = []
|
|
for p in projects:
|
|
for section in sections:
|
|
for key in keys:
|
|
vals = self.get_value(p, section, key)
|
|
if vals:
|
|
ret += [val.strip() for val in vals.split(',')]
|
|
return list(dict.fromkeys(ret)) # Remove duplicates, keep ordering
|
|
|
|
def get_project_refs(self, projects: list[str], sections: list[str],
|
|
keys: str|list[str], add_self: bool, scope: Scope, names_only=True) -> list[str]:
|
|
if isinstance(keys, str):
|
|
keys = [ keys ]
|
|
ret: list[str] = []
|
|
for section in sections:
|
|
for key in keys:
|
|
visited = set()
|
|
for name in projects:
|
|
rr: list[str] = []
|
|
self.__get_project_refs_cached(rr, visited, name, section, key, add_self, scope, names_only)
|
|
# TODO: this looks like a performance hogger
|
|
for m in rr:
|
|
if not m in ret:
|
|
ret.append(m)
|
|
return ret
|
|
|
|
def get_libname(self, projects) -> str:
|
|
vals = self.get_project_refs(projects, ['build'], 'libname',
|
|
scope = Scope.One, add_self=False, names_only=True)
|
|
if not vals:
|
|
return ' '.join(projects)
|
|
if 'none' in vals:
|
|
vals.remove('none')
|
|
return ' '.join(reversed(vals))
|
|
|
|
def is_excluded_from_build(self, project: str) -> str|None:
|
|
log(DEBUG, 'checking if project ' + project + ' is excluded from build')
|
|
exclude = self.get_project_refs([ project ], ['build'], 'exclude',
|
|
scope = Scope.One, add_self=False, names_only=True)
|
|
cascade = self.distro.os_cascade + [ 'all' ]
|
|
for p1 in exclude:
|
|
for p2 in cascade:
|
|
if p1 == p2:
|
|
return p1
|
|
return None
|
|
|
|
def find_circular_deps(self, projects: list[str], flavours: list[str]) -> bool:
|
|
return self.__find_circular_deps(projects, flavours)
|