# -*- coding: utf-8 -*- from __future__ import annotations from typing import TYPE_CHECKING from functools import cached_property if TYPE_CHECKING: import Iterable import abc, importlib, re from .ExecContext import ExecContext from .base import Result, InputMode from .Package import Package from .log import * class Distro(abc.ABC): def __init__(self, ec: ExecContext, id: str|None=None, os_release_str: str|None=None): assert ec is not None assert id is not None self.__exec_context = ec self.__os_release_str: str|None = os_release_str self.__id: str|None = None # Names that can be used by code outside this class to retrieve # distribution properties by # getattr(instance, name.replace('-', '_')) macro_names = [ 'os', 'id', 'name', 'codename', 'gnu-triplet', 'os-cascade', 'os-release', 'pkg-ext', ] # == Load @classmethod async def read_os_release_str(cls, ec: ExecContext) -> None: release_file = '/etc/os-release' try: result = await ec.get(release_file, throw=True) ret = result.stdout.decode().strip() except Exception as e: log(INFO, f'Failed to read {release_file} ({str(e)}), falling back to uname') result = await ec.run( ['uname', '-s'], throw=False, cmd_input=InputMode.NonInteractive ) if result.status != 0: log(ERR, f'/etc/os-release and uname both failed, the latter with exit status {result.status}') raise uname = result.decode().stdout.strip().lower() ret = f'ID={uname}\nVERSION_CODENAME=unknown' return ret @classmethod def parse_os_release_field(self, key: str, os_release_str: str, throw: bool=False) -> str: m = re.search(r'^\s*' + key + r'\s*=\s*("?)([^"\n]+)\1\s*$', os_release_str, re.MULTILINE) if m is None: if throw: raise Exception(f'Could not read "{key}=" from /etc/os-release') return None return m.group(2) @classmethod def parse_os_release_field_id(cls, os_release_str: str, throw: bool=False) -> str: ret = cls.parse_os_release_field('ID', os_release_str, throw=throw) match ret: case 'opensuse-tumbleweed': return 'suse' return ret @classmethod async def instantiate(cls, ec: ExecContext, *args, id: str|None=None, os_release_str: str|None=None, **kwargs): if id is None: os_release_str = await cls.read_os_release_str(ec) id = cls.parse_os_release_field_id(os_release_str, throw=True) backend_id = id.lower().replace('-', '_') match backend_id: case 'ubuntu' | 'raspbian' | 'kali': backend_id = 'debian' case 'centos': backend_id = 'redhat' case 'opensuse' | 'suse': backend_id = 'suse' module_path = 'jw.pkg.lib.distros.' + backend_id + '.Distro' try: module = importlib.import_module(module_path) except Exception as e: log(ERR, f'Failed to import Distro module {module_path} ({str(e)})') raise cls = getattr(module, 'Distro') ret = cls(ec, *args, id=id, os_release_str=os_release_str, **kwargs) return ret def os_release_field(self, key: str, throw: bool=False) -> str: return self.parse_os_release_field(key, self.os_release_str, throw) async def cache(self) -> None: if self.__os_release_str is None: self.__os_release_str = await self.read_os_release_str(self.__exec_context) @cached_property def os_cascade(self) -> list[str]: def __append(entry: str): if not entry in ret: ret.append(entry) ret = [ 'os' ] match self.id: case 'centos': __append('linux') __append('pkg-rpm') __append('pm-yum') __append('redhat') __append('rhel') case 'fedora' | 'rhel': __append('linux') __append('pkg-rpm') __append('pm-yum') __append('redhat') case 'suse': __append('linux') __append('pkg-rpm') __append('pm-zypper') case 'kali' | 'raspbian': __append('linux') __append('pkg-debian') __append('pm-apt') __append('debian') case 'ubuntu': __append('linux') __append('pkg-debian') __append('pm-apt') case 'archlinux': __append('linux') __append('pkg-pm') __append('pm-pacman') os = self.os name = re.sub(r'-.*', '', os) series = os rx = re.compile(r'\.[0-9]+$') while True: n = re.sub(rx, '', series) if n == series: break ret.append(n) series = n __append(name) __append(os) __append(self.id) # e.g. os, linux, suse, suse-tumbleweed return ret @cached_property def cascade(self) -> str: return ' '.join(self.os_cascade) @property def os_release_str(self) -> str: if self.__os_release_str is None: raise Exception(f'Tried to access OS release from an incompletely loaded Distro instance. Call reacache() before') return self.__os_release_str @cached_property def name(self) -> str: return self.os_release_field('NAME', throw=True) @cached_property def id(self) -> str: return self.parse_os_release_field_id(self.__os_release_str, throw=True) @cached_property def codename(self) -> str: match self.id: case 'suse': return self.os_release_field('ID', throw=True).split('-')[1] case 'kali': return self.os_release_field('VERSION_CODENAME', throw=True).split('-')[1] case _: return self.os_release_field('VERSION_CODENAME', throw=True) raise NotImplementedError(f'Can\'t determine code name from distribution ID {self.id}') @cached_property def os(self) -> str: return self.id + '-' + self.codename @cached_property def pkg_ext(self) -> str: for entry in self.os_cascade(): ret = entry.replace('pkg-', '') if ret != entry: return ret raise RuntimeError(f'No package extension in found in {self.os_cascade}') @cached_property def gnu_triplet(self) -> str: import sysconfig import shutil import subprocess # Best: GNU host triplet Python was built for for key in ("HOST_GNU_TYPE", "BUILD_GNU_TYPE"): # BUILD_GNU_TYPE can exist too ret = sysconfig.get_config_var(key) if isinstance(ret, str) and ret: return ret # Common on Debian/Ubuntu: multiarch component (often looks like a triplet) ret = sysconfig.get_config_var("MULTIARCH") if isinstance(ret, str) and ret: return ret # Sometimes exposed (privately) by CPython ret = getattr(sys.implementation, "_multiarch", None) if isinstance(ret, str) and ret: return ret # Last resort: ask the system compiler for cc in ("gcc", "cc", "clang"): path = shutil.which(cc) if not path: continue try: ret = subprocess.check_output([path, "-dumpmachine"], text=True, stderr=subprocess.DEVNULL).strip() if ret: return ret except Exception: pass raise RuntimeError('Failed to get GNU triplet from running machine') @classmethod def macros(cls) -> list[str]: return ['%%{' + name + '}' for name in cls.macro_names] def expand_macros(self, fmt: str|Iterable) -> str|Iterable: if not isinstance(fmt, str): ret: list[str] = [] for entry in fmt: ret.append(self.expand_macros(entry)) return ret ret = fmt for macro in re.findall("%{([A-Za-z_-]+)}", fmt): try: name = macro.replace('-', '_') val = getattr(self, name) patt = r'%{' + macro + r'}' if ret.find(patt) == -1: continue ret = ret.replace(patt, val) except Exception as e: log(ERR, f'Failed to expand macro "{macro}" inside "{fmt}": {str(e)}') raise return ret # == Convenience methods @property def ctx(self) -> ExecContext: return self.__exec_context async def run(self, *args, **kwargs) -> Result: return await self.__exec_context.run(*args, **kwargs) async def sudo(self, *args, **kwargs) -> Result: return await self.__exec_context.sudo(*args, **kwargs) @property def interactive(self) -> bool: return self.__exec_context.interactive # == Distribution abstraction methods # -- ref @abc.abstractmethod async def _ref(self) -> None: pass async def ref(self) -> None: return await self._ref() # -- dup @abc.abstractmethod async def _dup(self, download_only: bool) -> None: pass async def dup(self, download_only: bool=False) -> None: return await self._dup(download_only=download_only) # -- reboot_required @abc.abstractmethod async def _reboot_required(self, verbose: bool) -> bool: pass async def reboot_required(self, verbose: bool|None=None) -> bool: if verbose is None: verbose = self.ctx.verbose_default return await self._reboot_required(verbose=verbose) # -- select @abc.abstractmethod async def _select(self, names: Iterable[str]) -> Iterable[Package]: pass async def select(self, names: Iterable[str] = []) -> Iterable[Package]: return await self._select(names) # -- install # Pass names to the package manager @abc.abstractmethod async def _install(self, names: Iterable[str], only_update: bool) -> None: pass # Default implementation assumes package manager can handle local files. # Not true for all distros. Override if Distro knows better. async def _install_local_files(self, paths: Iterable[str], only_update: bool) -> None: await self._install(paths, only_update=only_update) # Download first and then install. Override if Distro knows better. async def _install_urls(self, urls: Iterable[str], only_update: bool) -> None: import tempfile from .util import copy with tempfile.TemporaryDirectory(prefix='jw-pkg-') as tmp: paths: list[str] = [] for url in urls: paths.append(await copy(url, tmp)) await self._install_local_files(paths, only_update=only_update) # Default implementation installs in two steps: # - Download URLs into local directories and install # - Pass names to package manager # Override if Distro knows better. async def _install_urls_and_names(self, packages: Iterable[str], only_update: bool) -> None: urls: list[str] = [] names: list[str] = [] for package in packages: if package[0] == '/': urls.append('file://' + package) continue if package.find('://') != -1: urls.append(package) continue names.append(package) if urls: await self._install_urls(urls, only_update=only_update) if names: await self._install(names, only_update=only_update) async def install(self, names: Iterable[str], only_update: bool=False) -> None: if not names: log(WARNING, f'No packages specified for installation') return await self._install_urls_and_names(names, only_update=only_update) # -- delete @abc.abstractmethod async def _delete(self, names: Iterable[str]) -> None: pass async def delete(self, names: Iterable[str]) -> None: if not names: log(WARNING, f'No packages specified for deletion') return return await self._delete(names) # -- pkg_files @abc.abstractmethod async def _pkg_files(self, name: str) -> Iterable[str]: pass async def pkg_files(self, name: str) -> Iterable[str]: if not name: log(WARNING, f'No package specified for inspection') return [] return await self._pkg_files(name)