jw-pkg/src/python/jw/pkg/lib/ExecContext.py
Jan Lindemann 238cc05d3b
All checks were successful
CI / Packaging - Kali Linux (pull_request) Successful in 3m18s
CI / Packaging - OpenSUSE Tumbleweed (pull_request) Successful in 3m24s
CI / Packaging test (pull_request) Successful in 0s
CI / Packaging - Kali Linux (push) Successful in 3m17s
CI / Packaging - OpenSUSE Tumbleweed (push) Successful in 3m21s
CI / Packaging test (push) Successful in 0s
lib.ExecContext.CallContext.log_delim: Beautify

Make the log delimiter look more consistent: Whether a CallContext was constructed with a title parameter or without, prefix its .log_delimiter property with a "----".

Signed-off-by: Jan Lindemann <jan@janware.com>
2026-06-11 14:09:58 +02:00

679 lines
22 KiB
Python

from __future__ import annotations
import abc
import errno
import sys
from decimal import ROUND_FLOOR, Decimal
from typing import TYPE_CHECKING, NamedTuple
if TYPE_CHECKING:
from typing import Type
from types import TracebackType
from .base import Input, InputMode, Result, StatResult
from .FileContext import FileContext as Base
from .log import DEBUG, ERR, NOTICE, log
_US = '\x1f' # unlikely to appear in numeric output
_BILLION = Decimal(1_000_000_000)
def _looks_like_option_error(stderr: str | None) -> bool:
if stderr is None:
return False
s = stderr.lower()
return any(
needle in s for needle in (
'unrecognized option',
'illegal option',
'unknown option',
'invalid option',
'option requires an argument', )
)
def _raise_stat_error(path: str, result: Result) -> None:
stderr = result.stderr_str_or_none or f'stat exited with status {result.status}'
msg = stderr.strip()
lower = msg.lower()
if 'no such file' in lower:
raise FileNotFoundError(errno.ENOENT, msg, path)
if 'permission denied' in lower or 'operation not permitted' in lower:
raise PermissionError(errno.EACCES, msg, path)
raise OSError(errno.EIO, msg, path)
def _parse_epoch(value: str) -> tuple[int, float, int]:
"""
Convert a decimal epoch timestamp string into:
(integer seconds for tuple slot, float seconds for attribute, ns for *_ns)
"""
dec = Decimal(value.strip())
sec = int(dec.to_integral_value(rounding = ROUND_FLOOR))
ns = int((dec * _BILLION).to_integral_value(rounding = ROUND_FLOOR))
return sec, float(dec), ns
def _build_stat_result(fields: list[str], mode_base: int) -> StatResult:
if len(fields) != 13:
raise ValueError(
f'unexpected stat output: expected 13 fields, got {len(fields)}: {fields!r}'
)
(
mode_s,
ino_s,
dev_s,
nlink_s,
uid_s,
gid_s,
size_s,
atime_s,
mtime_s,
ctime_s,
blksize_s,
blocks_s,
rdev_s,
) = fields
st_mode = int(mode_s, mode_base)
# st_ino = int(ino_s)
# st_dev = int(dev_s)
# st_nlink = int(nlink_s)
st_uid = uid_s
st_gid = gid_s
st_size = int(size_s)
st_atime_i, st_atime_f, st_atime_ns = _parse_epoch(atime_s)
st_mtime_i, st_mtime_f, st_mtime_ns = _parse_epoch(mtime_s)
st_ctime_i, st_ctime_f, st_ctime_ns = _parse_epoch(ctime_s)
# st_blksize = int(blksize_s)
# st_blocks = int(blocks_s)
# st_rdev = int(rdev_s)
return StatResult(
mode = st_mode,
owner = st_uid,
group = st_gid,
size = st_size,
atime = st_atime_i,
mtime = st_mtime_i,
ctime = st_ctime_i,
)
class ExecContext(Base):
class CallContext:
def __init__(
self,
parent: ExecContext,
title: str | None,
cmd: list[str],
cmd_input: Input,
mod_env: dict[str, str] | None,
wd: str | None,
log_prefix: str,
throw: bool,
verbose: bool | None,
) -> None:
self.__cmd = cmd
self.__wd = wd
self.__log_prefix = log_prefix
self.__parent = parent
self.__pretty_cmd: str | None = None
self.__title = f'{parent.uri}: Running {self.pretty_cmd} -'
self.__delim = f'---- {title} -'
delim_len = 120
self.__delim += '-' * max(0, delim_len - len(self.__delim))
self.__mod_env = {'LC_ALL': 'C'} if mod_env is None else mod_env
self.__cmd_input: bytes | None = None
# -- At the end of this dance, interactive needs to be either True
# or False
interactive: bool | None = None
cmd_input_bytes: None | bytes
if isinstance(cmd_input, InputMode):
cmd_input_bytes = None
match cmd_input:
case InputMode.Interactive:
interactive = True
case InputMode.NonInteractive:
interactive = False
case InputMode.OptInteractive:
interactive = parent.interactive
case InputMode.Auto:
interactive = sys.stdin.isatty()
if interactive is None:
interactive = parent.interactive
if interactive is None:
interactive = sys.stdin.isatty()
else:
interactive = False
if cmd_input is None:
cmd_input_bytes = None
elif isinstance(cmd_input, str):
cmd_input_bytes = cmd_input.encode(sys.stdout.encoding or 'utf-8')
else:
cmd_input_bytes = cmd_input
self.__cmd_input = cmd_input_bytes
assert interactive in [True, False], f'Invalid: interactive = {interactive}'
self.__interactive = interactive
self.__throw = throw
self.__verbose = verbose if verbose is not None else parent.verbose_default
def __enter__(self) -> ExecContext.CallContext:
self.log_delim(start = True)
return self
def __exit__(
self,
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.log_delim(start = False)
@property
def log_prefix(self) -> str:
return self.__log_prefix
@property
def interactive(self) -> bool:
return self.__interactive
@property
def verbose(self) -> bool:
return self.__verbose
@property
def cmd_input(self) -> bytes | None:
return self.__cmd_input
@property
def mod_env(self) -> dict[str, str]:
return self.__mod_env
@property
def throw(self) -> bool:
return self.__throw
@property
def wd(self) -> str | None:
return self.__wd
@property
def cmd(self) -> list[str]:
return self.__cmd
@property
def pretty_cmd(self) -> str:
if self.__pretty_cmd is None:
from .util import pretty_cmd
self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd)
return self.__pretty_cmd
def log(self, prio: int, *args, **kwargs) -> None:
log(prio, self.__log_prefix, *args, **kwargs)
def log_delim(self, start: bool) -> None:
if not self.__verbose:
return None
if self.__interactive: # Don't log footer in interative mode
if start:
log(NOTICE, self.__delim)
return
delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <'
log(NOTICE, delim)
def check_exit_code(self, result: Result) -> None:
if result.status == 0:
return
if self.__throw or self.__verbose:
if self.__throw:
raise RuntimeError(result.summary)
def exception(self, result: Result, e: Exception) -> Result:
log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}')
if self.__throw:
raise e
return result
@classmethod
def __mode_str(cls, mode: int) -> str:
return f'{mode:0o}'
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@classmethod
def create(cls, *args, **kwargs) -> ExecContext:
ret = super().create(*args, **kwargs)
if not isinstance(ret, cls):
raise TypeError(f'Expected {cls.__name__}, got {type(ret).__name__}')
return ret
@abc.abstractmethod
async def _run(
self,
cmd: list[str],
wd: str | None,
verbose: bool,
cmd_input: bytes | None,
mod_env: dict[str, str] | None,
interactive: bool,
log_prefix: str,
) -> Result:
raise NotImplementedError('Called pure virtual method _run()')
async def run(
self,
cmd: list[str],
wd: str | None = None,
throw: bool = True,
verbose: bool | None = None,
cmd_input: Input = InputMode.OptInteractive,
mod_env: dict[str, str] | None = None,
title: str | None = None,
) -> Result:
"""
Run a command asynchronously and return its output
Args:
cmd: Command and arguments
wd: Optional working directory
throw: Raise an exception on non-zero exit status if True
verbose: Emit log output while the command runs
cmd_input:
- "InputMode.OptInteractive" -> Let --interactive govern how to handle
interactivity (default)
- "InputMode.Interactive" -> Inherit terminal stdin
- "InputMode.Auto" -> Inherit terminal stdin if it is a TTY
- "InputMode.NonInteractive" -> stdin from /dev/null
- None -> Alias for InputMode.NonInteractive
- otherwise -> Feed cmd_input to stdin
mod_env: Change set to command's environment:
- key: val adds a variable,
- key: None removes it
Returns:
A Result instance
In PTY mode stderr is always None because PTY merges stdout/stderr.
"""
# Note that in the calls to the wrapped method, cmd_input == None can
# be returned by CallContext and is very much allowed
assert cmd_input is not None, 'Invalid: cmd_input is None'
# Enclose multiple run() calls in an additional open() / close() pair
# if you want the context to stay open between the calls
await self.open()
try:
ret = Result(None, None, 1)
with self.CallContext(
self,
title = title,
cmd = cmd,
cmd_input = cmd_input,
mod_env = mod_env,
wd = wd,
log_prefix = '|',
throw = throw,
verbose = verbose,
) as cc:
try:
ret = await self._run(
cmd = cc.cmd,
wd = wd,
verbose = cc.verbose,
cmd_input = cc.cmd_input,
mod_env = cc.mod_env,
interactive = cc.interactive,
log_prefix = cc.log_prefix,
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
finally:
await self.close()
return ret
async def _sudo(
self,
cmd: list[str],
opts: list[str] | None,
wd: str | None,
mod_env_sudo: dict[str, str] | None,
mod_env_cmd: dict[str, str] | None,
cmd_input: bytes | None,
verbose: bool,
interactive: bool,
log_prefix: str,
) -> Result:
def __check_equal_values(d1: dict[str, str], d2: dict[str, str]) -> None:
for key, val in d1.items():
if d2.get(key, None) not in [None, val]:
raise ValueError(
'Outer and inner environments differ at least for '
f'{key}: "{val}" != "{d2.get(key)}"'
)
fw_cmd: list[str] = []
fw_env: dict[str, str] = {}
if opts is None:
opts = []
if mod_env_cmd:
fw_env.update(mod_env_cmd)
if self.username != 'root':
if mod_env_sudo and mod_env_cmd:
__check_equal_values(mod_env_sudo, mod_env_cmd)
__check_equal_values(mod_env_cmd, mod_env_sudo)
fw_cmd.append('/usr/bin/sudo')
if mod_env_sudo:
fw_env.update(mod_env_sudo)
if mod_env_cmd:
fw_cmd.append('--preserve-env=' + ','.join(mod_env_cmd.keys()))
if wd is not None:
opts.extend(['-D', wd])
wd = None
fw_cmd.extend(opts)
mod_env = fw_env if fw_env else None
fw_cmd.extend(cmd)
return await self._run(
fw_cmd,
wd = wd,
mod_env = mod_env,
verbose = verbose,
cmd_input = cmd_input,
interactive = interactive,
log_prefix = log_prefix,
)
async def sudo(
self,
cmd: list[str],
opts: list[str] | None = None,
wd: str | None = None,
mod_env_sudo: dict[str, str] | None = None,
mod_env_cmd: dict[str, str] | None = None,
throw: bool = True,
verbose: bool | None = None,
cmd_input: Input = InputMode.OptInteractive,
title: str | None = None,
) -> Result:
# Note that in the calls to the wrapped method, cmd_input == None can
# be returned by CallContext and is very much allowed
assert cmd_input is not None, 'Invalid: cmd_input is None'
ret = Result(None, None, 1)
with self.CallContext(
self,
title = title,
cmd = cmd,
cmd_input = cmd_input,
mod_env = mod_env_cmd,
wd = wd,
log_prefix = '|',
throw = throw,
verbose = verbose,
) as cc:
try:
ret = await self._sudo(
cmd = cc.cmd,
opts = opts,
wd = cc.wd,
mod_env_sudo = mod_env_sudo,
mod_env_cmd = cc.mod_env,
verbose = cc.verbose,
cmd_input = cc.cmd_input,
interactive = cc.interactive,
log_prefix = cc.log_prefix,
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
async def _get(
self, path: str, wd: str | None, throw: bool, verbose: bool | None, title: str
) -> Result:
ret = Result(None, None, 1)
if wd is not None:
path = wd + '/' + path
with self.CallContext(
self,
title = title,
cmd = ['cat', path],
cmd_input = InputMode.NonInteractive,
wd = None,
mod_env = None,
log_prefix = '|',
throw = throw,
verbose = verbose,
) as cc:
try:
ret = await self._run(
cmd = cc.cmd,
wd = wd,
verbose = cc.verbose,
cmd_input = cc.cmd_input,
mod_env = cc.mod_env,
interactive = cc.interactive,
log_prefix = cc.log_prefix,
)
except Exception as e:
return cc.exception(ret, e)
if ret.matches_error('No such file'):
raise FileNotFoundError(ret.summarize(cc.cmd, wd = cc.wd))
cc.check_exit_code(ret)
return ret
async def _put(
self,
path: str,
content: bytes,
wd: str | None,
throw: bool,
verbose: bool | None,
title: str,
owner: str | None,
group: str | None,
mode: str | None,
atomic: bool,
) -> Result:
from .util import pretty_cmd
async def __run(
cmd: list[str],
cmd_input: Input = InputMode.NonInteractive,
**kwargs
) -> Result:
return await self.run(cmd, cmd_input = cmd_input, **kwargs)
ret = Result(None, None, 1)
try:
class RemoteCmd(NamedTuple):
cmd: list[str]
cmd_input: Input = InputMode.NonInteractive
tmp_file: str | None = None
if wd is not None:
path = wd + '/' + path
cmds: list[RemoteCmd] = []
out = path
if atomic:
out = (await __run(['mktemp', path + '.XXXXXX'])).stdout_str.strip()
tmp_file = out
cmds.append(RemoteCmd(
cmd = ['tee', out],
cmd_input = content,
))
if owner is not None and group is not None:
cmds.append(RemoteCmd(
cmd = ['chown', f'{owner}:{group}', out],
))
elif owner is not None:
cmds.append(RemoteCmd(
cmd = ['chown', owner, out],
))
elif group is not None:
cmds.append(RemoteCmd(
cmd = ['chgrp', group, out],
))
if mode is not None:
cmds.append(RemoteCmd(
cmd = ['chmod', mode, out],
))
if atomic:
cmds.append(RemoteCmd(
cmd = ['mv', out, path],
))
await self.open()
try:
for cmd in cmds:
log(DEBUG, f'{self.log_name}: Running {pretty_cmd(cmd.cmd, wd)}')
ret = await __run(cmd.cmd, cmd_input = cmd.cmd_input)
tmp_file = None # Has been successfully moved at this point
return ret
finally:
if tmp_file is not None:
await self.erase(tmp_file)
await self.close()
except Exception as e:
msg = f'Failed to get {path} from {self.root} ({str(e)})'
if throw:
raise Exception(msg)
log(ERR, msg)
return ret
async def _unlink(self, path: str) -> None:
cmd = ['rm', '-f', path]
await self.run(cmd, cmd_input = InputMode.NonInteractive)
async def _erase(self, path: str) -> None:
cmd = ['rm', '-rf', path]
await self.run(cmd, cmd_input = InputMode.NonInteractive)
async def _rename(self, src: str, dst: str) -> None:
cmd = ['mv', src, dst]
await self.run(cmd, cmd_input = InputMode.NonInteractive)
async def _mkdir(self, path: str, mode: int) -> None:
cmd = ['mkdir', path, '-m', self.__mode_str(mode)]
await self.run(cmd, cmd_input = InputMode.NonInteractive)
async def _mktemp(self, tmpl: str, directory: bool) -> str:
cmd = ['mktemp']
if directory:
cmd.append('-d')
cmd.append(tmpl)
result = await self.run(cmd, cmd_input = InputMode.NonInteractive, throw = True)
if result.status != 0 or result.stdout is None:
raise Exception(
f'Failed to create temporary file on {self.root}: {result.summary}'
)
return result.stdout_str.strip()
async def _stat(self, path: str, follow_symlinks: bool) -> StatResult:
async def __stat(opts: list[str]) -> Result:
mod_env = {'LC_ALL': 'C'}
cmd = ['stat']
if follow_symlinks:
cmd.append('-L')
cmd.extend(opts)
cmd.append(path)
return await self.run(
cmd,
mod_env = mod_env,
throw = False,
cmd_input = InputMode.NonInteractive
)
# GNU coreutils stat
gnu_format = _US.join(
[
'%f', # st_mode in hex
'%i', # st_ino
'%d', # st_dev
'%h', # st_nlink
'%U', # st_uid
'%G', # st_gid
'%s', # st_size
'%.9X', # st_atime
'%.9Y', # st_mtime
'%.9Z', # st_ctime
'%o', # st_blksize hint
'%b', # st_blocks
'%r', # st_rdev
]
)
result = await __stat(['--printf', gnu_format])
if result.status == 0 and result.stdout is not None:
return _build_stat_result(result.stdout_str.split(_US), mode_base = 16)
if not _looks_like_option_error(result.stderr_str_or_none):
# log(DEBUG, f'GNU stat attempt failed on "{path}" ({str(e)})')
_raise_stat_error(path, result)
# BSD / macOS / OpenBSD / NetBSD stat
bsd_format = _US.join(
[
'%p', # st_mode in octal
'%i', # st_ino
'%d', # st_dev
'%l', # st_nlink
'%U', # st_uid
'%G', # st_gid
'%z', # st_size
'%.9Fa', # st_atime
'%.9Fm', # st_mtime
'%.9Fc', # st_ctime
'%k', # st_blksize
'%b', # st_blocks
'%r', # st_rdev
]
)
result = await __stat(['-n', '-f', bsd_format])
stdout = result.stdout_str_or_none
if result.status != 0 or stdout is None:
_raise_stat_error(path, result)
assert stdout is not None # Just there to pacify the linter
return _build_stat_result(stdout.rstrip('\n').split(_US), mode_base = 8)
async def _chown(self, path: str, owner: str | None, group: str | None) -> None:
if owner is None and group is None:
raise ValueError(f'Tried to chown("{path}") without owner and group')
if group is None:
ownership = owner
elif owner is None:
ownership = ':' + group
else:
ownership = owner + ':' + group
assert ownership is not None # Impossible, just there to calm the linter
await self.run(['chown', ownership, path], cmd_input = InputMode.NonInteractive)
async def _chmod(self, path: str, mode: int) -> None:
await self.run(
['chmod', self.__mode_str(mode), path],
cmd_input = InputMode.NonInteractive
)