from __future__ import annotations import abc import errno import sys from decimal import ROUND_FLOOR, Decimal from typing import TYPE_CHECKING, NamedTuple if TYPE_CHECKING: from typing import Type from types import TracebackType from .base import Input, InputMode, Result, StatResult from .FileContext import FileContext as Base from .log import DEBUG, ERR, NOTICE, log _US = '\x1f' # unlikely to appear in numeric output _BILLION = Decimal(1_000_000_000) def _looks_like_option_error(stderr: str | None) -> bool: if stderr is None: return False s = stderr.lower() return any( needle in s for needle in ( 'unrecognized option', 'illegal option', 'unknown option', 'invalid option', 'option requires an argument', ) ) def _raise_stat_error(path: str, result: Result) -> None: stderr = result.stderr_str_or_none or f'stat exited with status {result.status}' msg = stderr.strip() lower = msg.lower() if 'no such file' in lower: raise FileNotFoundError(errno.ENOENT, msg, path) if 'permission denied' in lower or 'operation not permitted' in lower: raise PermissionError(errno.EACCES, msg, path) raise OSError(errno.EIO, msg, path) def _parse_epoch(value: str) -> tuple[int, float, int]: """ Convert a decimal epoch timestamp string into: (integer seconds for tuple slot, float seconds for attribute, ns for *_ns) """ dec = Decimal(value.strip()) sec = int(dec.to_integral_value(rounding = ROUND_FLOOR)) ns = int((dec * _BILLION).to_integral_value(rounding = ROUND_FLOOR)) return sec, float(dec), ns def _build_stat_result(fields: list[str], mode_base: int) -> StatResult: if len(fields) != 13: raise ValueError( f'unexpected stat output: expected 13 fields, got {len(fields)}: {fields!r}' ) ( mode_s, ino_s, dev_s, nlink_s, uid_s, gid_s, size_s, atime_s, mtime_s, ctime_s, blksize_s, blocks_s, rdev_s, ) = fields st_mode = int(mode_s, mode_base) # st_ino = int(ino_s) # st_dev = int(dev_s) # st_nlink = int(nlink_s) st_uid = uid_s st_gid = gid_s st_size = int(size_s) st_atime_i, st_atime_f, st_atime_ns = _parse_epoch(atime_s) st_mtime_i, st_mtime_f, st_mtime_ns = _parse_epoch(mtime_s) st_ctime_i, st_ctime_f, st_ctime_ns = _parse_epoch(ctime_s) # st_blksize = int(blksize_s) # st_blocks = int(blocks_s) # st_rdev = int(rdev_s) return StatResult( mode = st_mode, owner = st_uid, group = st_gid, size = st_size, atime = st_atime_i, mtime = st_mtime_i, ctime = st_ctime_i, ) class ExecContext(Base): class CallContext: def __init__( self, parent: ExecContext, title: str | None, cmd: list[str], cmd_input: Input, mod_env: dict[str, str] | None, wd: str | None, log_prefix: str, throw: bool, verbose: bool | None, ) -> None: self.__cmd = cmd self.__wd = wd self.__log_prefix = log_prefix self.__parent = parent self.__pretty_cmd: str | None = None self.__title = ( title if title else f'{parent.uri}: Running {self.pretty_cmd} -' ) self.__delim = f'---- {self.__title} -' delim_len = 120 self.__delim += '-' * max(0, delim_len - len(self.__delim)) self.__mod_env = {'LC_ALL': 'C'} if mod_env is None else mod_env self.__cmd_input: bytes | None = None # -- At the end of this dance, interactive needs to be either True # or False interactive: bool | None = None cmd_input_bytes: None | bytes if isinstance(cmd_input, InputMode): cmd_input_bytes = None match cmd_input: case InputMode.Interactive: interactive = True case InputMode.NonInteractive: interactive = False case InputMode.OptInteractive: interactive = parent.interactive case InputMode.Auto: interactive = sys.stdin.isatty() if interactive is None: interactive = parent.interactive if interactive is None: interactive = sys.stdin.isatty() else: interactive = False if cmd_input is None: cmd_input_bytes = None elif isinstance(cmd_input, str): cmd_input_bytes = cmd_input.encode(sys.stdout.encoding or 'utf-8') else: cmd_input_bytes = cmd_input self.__cmd_input = cmd_input_bytes assert interactive in [True, False], f'Invalid: interactive = {interactive}' self.__interactive = interactive self.__throw = throw self.__verbose = verbose if verbose is not None else parent.verbose_default def __enter__(self) -> ExecContext.CallContext: self.log_delim(start = True) return self def __exit__( self, exc_type: Type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None, ) -> None: self.log_delim(start = False) @property def log_prefix(self) -> str: return self.__log_prefix @property def interactive(self) -> bool: return self.__interactive @property def verbose(self) -> bool: return self.__verbose @property def cmd_input(self) -> bytes | None: return self.__cmd_input @property def mod_env(self) -> dict[str, str]: return self.__mod_env @property def throw(self) -> bool: return self.__throw @property def wd(self) -> str | None: return self.__wd @property def cmd(self) -> list[str]: return self.__cmd @property def pretty_cmd(self) -> str: if self.__pretty_cmd is None: from .util import pretty_cmd self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd) return self.__pretty_cmd def log(self, prio: int, *args, **kwargs) -> None: log(prio, self.__log_prefix, *args, **kwargs) def log_delim(self, start: bool) -> None: if not self.__verbose: return None if self.__interactive: # Don't log footer in interative mode if start: log(NOTICE, self.__delim) return delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <' log(NOTICE, delim) def check_exit_code(self, result: Result) -> None: if result.status == 0: return if self.__throw or self.__verbose: if self.__throw: raise RuntimeError(result.summary) def exception(self, result: Result, e: Exception) -> Result: log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}') if self.__throw: raise e return result @classmethod def __mode_str(cls, mode: int) -> str: return f'{mode:0o}' def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @classmethod def create(cls, *args, **kwargs) -> ExecContext: ret = super().create(*args, **kwargs) if not isinstance(ret, cls): raise TypeError(f'Expected {cls.__name__}, got {type(ret).__name__}') return ret @abc.abstractmethod async def _run( self, cmd: list[str], wd: str | None, verbose: bool, cmd_input: bytes | None, mod_env: dict[str, str] | None, interactive: bool, log_prefix: str, ) -> Result: raise NotImplementedError('Called pure virtual method _run()') async def run( self, cmd: list[str], wd: str | None = None, throw: bool = True, verbose: bool | None = None, cmd_input: Input = InputMode.OptInteractive, mod_env: dict[str, str] | None = None, title: str | None = None, ) -> Result: """ Run a command asynchronously and return its output Args: cmd: Command and arguments wd: Optional working directory throw: Raise an exception on non-zero exit status if True verbose: Emit log output while the command runs cmd_input: - "InputMode.OptInteractive" -> Let --interactive govern how to handle interactivity (default) - "InputMode.Interactive" -> Inherit terminal stdin - "InputMode.Auto" -> Inherit terminal stdin if it is a TTY - "InputMode.NonInteractive" -> stdin from /dev/null - None -> Alias for InputMode.NonInteractive - otherwise -> Feed cmd_input to stdin mod_env: Change set to command's environment: - key: val adds a variable, - key: None removes it Returns: A Result instance In PTY mode stderr is always None because PTY merges stdout/stderr. """ # Note that in the calls to the wrapped method, cmd_input == None can # be returned by CallContext and is very much allowed assert cmd_input is not None, 'Invalid: cmd_input is None' # Enclose multiple run() calls in an additional open() / close() pair # if you want the context to stay open between the calls await self.open() try: ret = Result(None, None, 1) with self.CallContext( self, title = title, cmd = cmd, cmd_input = cmd_input, mod_env = mod_env, wd = wd, log_prefix = '|', throw = throw, verbose = verbose, ) as cc: try: ret = await self._run( cmd = cc.cmd, wd = wd, verbose = cc.verbose, cmd_input = cc.cmd_input, mod_env = cc.mod_env, interactive = cc.interactive, log_prefix = cc.log_prefix, ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) finally: await self.close() return ret async def _sudo( self, cmd: list[str], opts: list[str] | None, wd: str | None, mod_env_sudo: dict[str, str] | None, mod_env_cmd: dict[str, str] | None, cmd_input: bytes | None, verbose: bool, interactive: bool, log_prefix: str, ) -> Result: def __check_equal_values(d1: dict[str, str], d2: dict[str, str]) -> None: for key, val in d1.items(): if d2.get(key, None) not in [None, val]: raise ValueError( 'Outer and inner environments differ at least for ' f'{key}: "{val}" != "{d2.get(key)}"' ) fw_cmd: list[str] = [] fw_env: dict[str, str] = {} if opts is None: opts = [] if mod_env_cmd: fw_env.update(mod_env_cmd) if self.username != 'root': if mod_env_sudo and mod_env_cmd: __check_equal_values(mod_env_sudo, mod_env_cmd) __check_equal_values(mod_env_cmd, mod_env_sudo) fw_cmd.append('/usr/bin/sudo') if mod_env_sudo: fw_env.update(mod_env_sudo) if mod_env_cmd: fw_cmd.append('--preserve-env=' + ','.join(mod_env_cmd.keys())) if wd is not None: opts.extend(['-D', wd]) wd = None fw_cmd.extend(opts) mod_env = fw_env if fw_env else None fw_cmd.extend(cmd) return await self._run( fw_cmd, wd = wd, mod_env = mod_env, verbose = verbose, cmd_input = cmd_input, interactive = interactive, log_prefix = log_prefix, ) async def sudo( self, cmd: list[str], opts: list[str] | None = None, wd: str | None = None, mod_env_sudo: dict[str, str] | None = None, mod_env_cmd: dict[str, str] | None = None, throw: bool = True, verbose: bool | None = None, cmd_input: Input = InputMode.OptInteractive, title: str | None = None, ) -> Result: # Note that in the calls to the wrapped method, cmd_input == None can # be returned by CallContext and is very much allowed assert cmd_input is not None, 'Invalid: cmd_input is None' ret = Result(None, None, 1) with self.CallContext( self, title = title, cmd = cmd, cmd_input = cmd_input, mod_env = mod_env_cmd, wd = wd, log_prefix = '|', throw = throw, verbose = verbose, ) as cc: try: ret = await self._sudo( cmd = cc.cmd, opts = opts, wd = cc.wd, mod_env_sudo = mod_env_sudo, mod_env_cmd = cc.mod_env, verbose = cc.verbose, cmd_input = cc.cmd_input, interactive = cc.interactive, log_prefix = cc.log_prefix, ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret async def _get( self, path: str, wd: str | None, throw: bool, verbose: bool | None, title: str ) -> Result: ret = Result(None, None, 1) if wd is not None: path = wd + '/' + path with self.CallContext( self, title = title, cmd = ['cat', path], cmd_input = InputMode.NonInteractive, wd = None, mod_env = None, log_prefix = '|', throw = throw, verbose = verbose, ) as cc: try: ret = await self._run( cmd = cc.cmd, wd = wd, verbose = cc.verbose, cmd_input = cc.cmd_input, mod_env = cc.mod_env, interactive = cc.interactive, log_prefix = cc.log_prefix, ) except Exception as e: return cc.exception(ret, e) if ret.matches_error('No such file'): raise FileNotFoundError(ret.summarize(cc.cmd, wd = cc.wd)) cc.check_exit_code(ret) return ret async def _put( self, path: str, content: bytes, wd: str | None, throw: bool, verbose: bool | None, title: str, owner: str | None, group: str | None, mode: str | None, atomic: bool, ) -> Result: from .util import pretty_cmd async def __run( cmd: list[str], cmd_input: Input = InputMode.NonInteractive, **kwargs ) -> Result: return await self.run(cmd, cmd_input = cmd_input, **kwargs) ret = Result(None, None, 1) try: class RemoteCmd(NamedTuple): cmd: list[str] cmd_input: Input = InputMode.NonInteractive tmp_file: str | None = None if wd is not None: path = wd + '/' + path cmds: list[RemoteCmd] = [] out = path if atomic: out = (await __run(['mktemp', path + '.XXXXXX'])).stdout_str.strip() tmp_file = out cmds.append(RemoteCmd( cmd = ['tee', out], cmd_input = content, )) if owner is not None and group is not None: cmds.append(RemoteCmd( cmd = ['chown', f'{owner}:{group}', out], )) elif owner is not None: cmds.append(RemoteCmd( cmd = ['chown', owner, out], )) elif group is not None: cmds.append(RemoteCmd( cmd = ['chgrp', group, out], )) if mode is not None: cmds.append(RemoteCmd( cmd = ['chmod', mode, out], )) if atomic: cmds.append(RemoteCmd( cmd = ['mv', out, path], )) await self.open() try: for cmd in cmds: log(DEBUG, f'{self.log_name}: Running {pretty_cmd(cmd.cmd, wd)}') ret = await __run(cmd.cmd, cmd_input = cmd.cmd_input) tmp_file = None # Has been successfully moved at this point return ret finally: if tmp_file is not None: await self.erase(tmp_file) await self.close() except Exception as e: msg = f'Failed to get {path} from {self.root} ({str(e)})' if throw: raise Exception(msg) log(ERR, msg) return ret async def _unlink(self, path: str) -> None: cmd = ['rm', '-f', path] await self.run(cmd, cmd_input = InputMode.NonInteractive) async def _erase(self, path: str) -> None: cmd = ['rm', '-rf', path] await self.run(cmd, cmd_input = InputMode.NonInteractive) async def _rename(self, src: str, dst: str) -> None: cmd = ['mv', src, dst] await self.run(cmd, cmd_input = InputMode.NonInteractive) async def _mkdir(self, path: str, mode: int) -> None: cmd = ['mkdir', path, '-m', self.__mode_str(mode)] await self.run(cmd, cmd_input = InputMode.NonInteractive) async def _mktemp(self, tmpl: str, directory: bool) -> str: cmd = ['mktemp'] if directory: cmd.append('-d') cmd.append(tmpl) result = await self.run(cmd, cmd_input = InputMode.NonInteractive, throw = True) if result.status != 0 or result.stdout is None: raise Exception( f'Failed to create temporary file on {self.root}: {result.summary}' ) return result.stdout_str.strip() async def _stat(self, path: str, follow_symlinks: bool) -> StatResult: async def __stat(opts: list[str]) -> Result: mod_env = {'LC_ALL': 'C'} cmd = ['stat'] if follow_symlinks: cmd.append('-L') cmd.extend(opts) cmd.append(path) return await self.run( cmd, mod_env = mod_env, throw = False, cmd_input = InputMode.NonInteractive ) # GNU coreutils stat gnu_format = _US.join( [ '%f', # st_mode in hex '%i', # st_ino '%d', # st_dev '%h', # st_nlink '%U', # st_uid '%G', # st_gid '%s', # st_size '%.9X', # st_atime '%.9Y', # st_mtime '%.9Z', # st_ctime '%o', # st_blksize hint '%b', # st_blocks '%r', # st_rdev ] ) result = await __stat(['--printf', gnu_format]) if result.status == 0 and result.stdout is not None: return _build_stat_result(result.stdout_str.split(_US), mode_base = 16) if not _looks_like_option_error(result.stderr_str_or_none): # log(DEBUG, f'GNU stat attempt failed on "{path}" ({str(e)})') _raise_stat_error(path, result) # BSD / macOS / OpenBSD / NetBSD stat bsd_format = _US.join( [ '%p', # st_mode in octal '%i', # st_ino '%d', # st_dev '%l', # st_nlink '%U', # st_uid '%G', # st_gid '%z', # st_size '%.9Fa', # st_atime '%.9Fm', # st_mtime '%.9Fc', # st_ctime '%k', # st_blksize '%b', # st_blocks '%r', # st_rdev ] ) result = await __stat(['-n', '-f', bsd_format]) stdout = result.stdout_str_or_none if result.status != 0 or stdout is None: _raise_stat_error(path, result) assert stdout is not None # Just there to pacify the linter return _build_stat_result(stdout.rstrip('\n').split(_US), mode_base = 8) async def _chown(self, path: str, owner: str | None, group: str | None) -> None: if owner is None and group is None: raise ValueError(f'Tried to chown("{path}") without owner and group') if group is None: ownership = owner elif owner is None: ownership = ':' + group else: ownership = owner + ':' + group assert ownership is not None # Impossible, just there to calm the linter await self.run(['chown', ownership, path], cmd_input = InputMode.NonInteractive) async def _chmod(self, path: str, mode: int) -> None: await self.run( ['chmod', self.__mode_str(mode), path], cmd_input = InputMode.NonInteractive )