Commands executed by ExecContext and its derived classes don't populate the "cmd" parameter of "Result"'s constructor. Fixing that makes for nicer error messages.
Signed-off-by: Jan Lindemann <jan@janware.com>
681 lines
22 KiB
Python
681 lines
22 KiB
Python
from __future__ import annotations
|
|
|
|
import abc
|
|
import errno
|
|
import sys
|
|
|
|
from decimal import ROUND_FLOOR, Decimal
|
|
from typing import TYPE_CHECKING, NamedTuple
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Type
|
|
from types import TracebackType
|
|
|
|
from .base import Input, InputMode, Result, StatResult
|
|
from .FileContext import FileContext as Base
|
|
from .log import DEBUG, ERR, NOTICE, log
|
|
|
|
# Field delimiter (ASCII unit separator) joined between the format sequences
# handed to stat(1) in ExecContext._stat() and used to split its output again
_US = '\x1f' # unlikely to appear in numeric output
# Nanoseconds per second, as a Decimal; _parse_epoch() multiplies by it
_BILLION = Decimal(1_000_000_000)
|
|
|
|
def _looks_like_option_error(stderr: str | None) -> bool:
    """
    Tell whether error output reads like a complaint about a command line option

    The comparison is case-insensitive and purely substring based.

    Args:
        stderr: Captured error output of a command, or None if there is none

    Returns:
        True if one of the known "bad option" phrases occurs in stderr,
        False otherwise (including for None)
    """
    if stderr is None:
        return False
    phrases = (
        'unrecognized option',
        'illegal option',
        'unknown option',
        'invalid option',
        'option requires an argument',
    )
    haystack = stderr.lower()
    for phrase in phrases:
        if phrase in haystack:
            return True
    return False
|
|
|
|
def _raise_stat_error(path: str, result: Result) -> None:
    """
    Turn a failed stat invocation into the closest matching OSError

    The stripped error output becomes the exception's message. If there is no
    error output, a generic message carrying the exit status is used instead.
    This function never returns normally.

    Args:
        path: Path that was passed to stat, stored as the exception's filename
        result: Result of the failed command

    Raises:
        FileNotFoundError: error output contains "no such file"
        PermissionError: error output contains "permission denied" or
            "operation not permitted"
        OSError: (EIO) anything else
    """
    text = result.stderr_str_or_none
    if not text:
        text = f'stat exited with status {result.status}'
    msg = text.strip()
    folded = msg.lower()

    exc: OSError
    if 'no such file' in folded:
        exc = FileNotFoundError(errno.ENOENT, msg, path)
    elif 'permission denied' in folded or 'operation not permitted' in folded:
        exc = PermissionError(errno.EACCES, msg, path)
    else:
        exc = OSError(errno.EIO, msg, path)
    raise exc
|
|
|
|
def _parse_epoch(value: str) -> tuple[int, float, int]:
    """
    Split a decimal epoch timestamp string into three representations

    Args:
        value: Seconds since the epoch as a decimal string, surrounding
            whitespace is ignored

    Returns:
        A tuple of
        - whole seconds, rounded towards negative infinity
        - seconds as float
        - nanoseconds as int, rounded towards negative infinity
    """
    stamp = Decimal(value.strip())
    whole_seconds = stamp.to_integral_value(rounding = ROUND_FLOOR)
    nanoseconds = (stamp * _BILLION).to_integral_value(rounding = ROUND_FLOOR)
    return int(whole_seconds), float(stamp), int(nanoseconds)
|
|
|
|
def _build_stat_result(fields: list[str], mode_base: int) -> StatResult:
    """
    Assemble a StatResult from the 13 fields of one line of stat output

    Field order: mode, inode, device, link count, owner, group, size, atime,
    mtime, ctime, block size, block count, rdev. Inode, device, link count,
    block size, block count and rdev are accepted but not evaluated.

    Args:
        fields: The delimiter-separated output fields
        mode_base: Numeric base the mode field is written in

    Returns:
        StatResult with mode, owner, group, size and the three timestamps as
        whole seconds

    Raises:
        ValueError: fields doesn't hold exactly 13 entries, or mode / size
            are not valid integers
    """
    if len(fields) != 13:
        raise ValueError(
            f'unexpected stat output: expected 13 fields, got {len(fields)}: {fields!r}'
        )

    # Positions within the stat output
    mode = int(fields[0], mode_base)
    owner, group = fields[4], fields[5]
    size = int(fields[6])

    # Only the integer seconds make it into the result
    atime, mtime, ctime = [_parse_epoch(raw)[0] for raw in fields[7:10]]

    return StatResult(
        mode = mode,
        owner = owner,
        group = group,
        size = size,
        atime = atime,
        mtime = mtime,
        ctime = ctime,
    )
|
|
|
|
class ExecContext(Base):
|
|
|
|
class CallContext:
    """
    Normalised settings of one command invocation, plus its log framing

    Resolves the caller-facing arguments (input mode or input data,
    environment change set, verbosity) into the values handed to the
    low-level runner. Used as a context manager, it emits a delimiter line
    on entry and on exit when verbose.
    """

    def __init__(
        self,
        parent: ExecContext,
        title: str | None,
        cmd: list[str],
        cmd_input: Input,
        mod_env: dict[str, str] | None,
        wd: str | None,
        log_prefix: str,
        throw: bool,
        verbose: bool | None,
    ) -> None:
        # The default title embeds pretty_cmd, so everything that property
        # reads has to be in place before the title is computed
        self.__cmd = cmd
        self.__wd = wd
        self.__log_prefix = log_prefix
        self.__parent = parent
        self.__pretty_cmd: str | None = None

        self.__title = title or f'{parent.uri}: Running {self.pretty_cmd} -'
        # Pad the delimiter with dashes up to 120 columns
        self.__delim = f'---- {self.__title} -'.ljust(120, '-')
        self.__mod_env = mod_env if mod_env is not None else {'LC_ALL': 'C'}

        stdin_bytes: bytes | None = None
        interactive: bool | None
        if isinstance(cmd_input, InputMode):
            interactive = None
            if cmd_input == InputMode.Interactive:
                interactive = True
            elif cmd_input == InputMode.NonInteractive:
                interactive = False
            elif cmd_input == InputMode.OptInteractive:
                interactive = parent.interactive
            elif cmd_input == InputMode.Auto:
                interactive = sys.stdin.isatty()
            # Still undecided: ask the parent first, the terminal second
            if interactive is None:
                interactive = parent.interactive
            if interactive is None:
                interactive = sys.stdin.isatty()
        else:
            # Explicit input data (or None) always means non-interactive
            interactive = False
            if isinstance(cmd_input, str):
                stdin_bytes = cmd_input.encode(sys.stdout.encoding or 'utf-8')
            elif cmd_input is not None:
                stdin_bytes = cmd_input
        self.__cmd_input: bytes | None = stdin_bytes

        assert interactive in (True, False), f'Invalid: interactive = {interactive}'
        self.__interactive = interactive

        self.__throw = throw
        self.__verbose = parent.verbose_default if verbose is None else verbose

    def __enter__(self) -> ExecContext.CallContext:
        self.log_delim(start = True)
        return self

    def __exit__(
        self,
        exc_type: Type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # Returns None, so exceptions raised inside the block propagate
        self.log_delim(start = False)

    @property
    def log_prefix(self) -> str:
        """Prefix put in front of messages logged through log()"""
        return self.__log_prefix

    @property
    def interactive(self) -> bool:
        """Resolved interactivity, always a definite True or False"""
        return self.__interactive

    @property
    def verbose(self) -> bool:
        """Resolved verbosity (parent's verbose_default if not specified)"""
        return self.__verbose

    @property
    def cmd_input(self) -> bytes | None:
        """Data to feed to the command's stdin, None if there is none"""
        return self.__cmd_input

    @property
    def mod_env(self) -> dict[str, str]:
        """Environment change set, {'LC_ALL': 'C'} if none was specified"""
        return self.__mod_env

    @property
    def throw(self) -> bool:
        """Whether failures are raised rather than just reported"""
        return self.__throw

    @property
    def wd(self) -> str | None:
        """Working directory as passed to the constructor"""
        return self.__wd

    @property
    def cmd(self) -> list[str]:
        """Command and arguments as passed to the constructor"""
        return self.__cmd

    @property
    def pretty_cmd(self) -> str:
        """Printable rendering of command and working directory, computed once"""
        cached = self.__pretty_cmd
        if cached is not None:
            return cached
        from .util import pretty_cmd as render

        cached = self.__pretty_cmd = render(self.__cmd, self.__wd)
        return cached

    def log(self, prio: int, *args, **kwargs) -> None:
        """Log through the module's log() with this call's prefix prepended"""
        log(prio, self.__log_prefix, *args, **kwargs)

    def log_delim(self, start: bool) -> None:
        """
        Log the opening (start = True) or closing delimiter line

        Nothing is logged unless verbose. In interactive mode only a plain
        header is logged and the footer is left out.
        """
        if not self.__verbose:
            return None
        if self.__interactive: # Don't log footer in interactive mode
            if start:
                log(NOTICE, self.__delim)
            return
        if start:
            framed = ',' + self.__delim + ' >'
        else:
            framed = '`' + self.__delim + ' <'
        log(NOTICE, framed)

    def check_exit_code(self, result: Result) -> None:
        """Raise RuntimeError(result.summary) on non-zero status if throw is set"""
        if result.status != 0 and self.__throw:
            raise RuntimeError(result.summary)

    def exception(self, result: Result, e: Exception) -> Result:
        """
        Report an exception raised while trying to run the command

        Logs an error, then re-raises e if throw is set, otherwise returns
        result unchanged.
        """
        log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}')
        if not self.__throw:
            return result
        raise e
|
|
|
|
@classmethod
def __mode_str(cls, mode: int) -> str:
    """Render a numeric file mode in octal notation, without any prefix"""
    return format(mode, '0o')
|
|
|
|
def __init__(self, *args, **kwargs) -> None:
    """Initialise the context; all arguments are handed to the base class as they are"""
    super().__init__(*args, **kwargs)
|
|
|
|
@classmethod
def create(cls, *args, **kwargs) -> ExecContext:
    """
    Create a context through the base class' create() and check its type

    Raises:
        TypeError: The base class' create() returned something that is not
            an instance of cls
    """
    instance = super().create(*args, **kwargs)
    if isinstance(instance, cls):
        return instance
    raise TypeError(f'Expected {cls.__name__}, got {type(instance).__name__}')
|
|
|
|
@abc.abstractmethod
async def _run(
    self,
    cmd: list[str],
    wd: str | None,
    verbose: bool,
    cmd_input: bytes | None,
    mod_env: dict[str, str] | None,
    interactive: bool,
    log_prefix: str,
) -> Result:
    """
    Low-level command runner, to be implemented by derived classes

    run(), _sudo() and _get() funnel into this method after CallContext has
    resolved the caller-facing arguments, which is why cmd_input is plain
    bytes or None here and interactive / verbose are definite booleans.

    Args:
        cmd: Command and arguments
        wd: Working directory, or None
        verbose: Resolved verbosity
        cmd_input: Data for the command's stdin, or None
        mod_env: Change set for the command's environment, or None
        interactive: Resolved interactivity
        log_prefix: Prefix for log output

    Returns:
        Result of the command
    """
    raise NotImplementedError('Called pure virtual method _run()')
|
|
|
|
async def run(
    self,
    cmd: list[str],
    wd: str | None = None,
    throw: bool = True,
    verbose: bool | None = None,
    cmd_input: Input = InputMode.OptInteractive,
    mod_env: dict[str, str] | None = None,
    title: str | None = None,
) -> Result:
    """
    Execute a command through this context and hand back its Result

    The context is opened before and closed after the command. Enclose
    multiple run() calls in an additional open() / close() pair if you want
    the context to stay open between the calls.

    Args:
        cmd: Command and arguments
        wd: Optional working directory
        throw: Raise on failure if True: RuntimeError on non-zero exit
            status, or the exception raised while running the command
        verbose: Emit log output while the command runs, None selects the
            context's verbose_default
        cmd_input: An InputMode member or data for the command's stdin
            - "InputMode.OptInteractive" -> Follow the context's "interactive"
              setting, fall back to whether stdin is a TTY (default)
            - "InputMode.Interactive"    -> Interactive
            - "InputMode.Auto"           -> Interactive if stdin is a TTY
            - "InputMode.NonInteractive" -> Not interactive
            - str or bytes               -> Feed cmd_input to stdin, a str is
              encoded first
            None is rejected by an assertion.
        mod_env: Change set to command's environment, None results in
            {'LC_ALL': 'C'}
        title: Heading for the log delimiter lines, defaults to one built
            from the context's uri and the command

    Returns:
        A Result instance. If running the command raised and throw is
        False, that's a Result carrying nothing but cmd.
    """

    # None is a valid value further down (CallContext.cmd_input returns it
    # whenever there's no input data), just not as an argument to run()
    assert cmd_input is not None, 'Invalid: cmd_input is None'

    await self.open()
    try:
        result = Result(cmd = cmd)
        call = self.CallContext(
            self,
            title = title,
            cmd = cmd,
            cmd_input = cmd_input,
            mod_env = mod_env,
            wd = wd,
            log_prefix = '|',
            throw = throw,
            verbose = verbose,
        )
        with call:
            try:
                result = await self._run(
                    cmd = call.cmd,
                    wd = wd,
                    verbose = call.verbose,
                    cmd_input = call.cmd_input,
                    mod_env = call.mod_env,
                    interactive = call.interactive,
                    log_prefix = call.log_prefix,
                )
            except Exception as exc:
                return call.exception(result, exc)
            call.check_exit_code(result)
        return result
    finally:
        # Runs on every way out of the block above, including the returns
        await self.close()
|
|
|
|
async def _sudo(
    self,
    cmd: list[str],
    opts: list[str] | None,
    wd: str | None,
    mod_env_sudo: dict[str, str] | None,
    mod_env_cmd: dict[str, str] | None,
    cmd_input: bytes | None,
    verbose: bool,
    interactive: bool,
    log_prefix: str,
) -> Result:
    """
    Run a command with root privileges, by means of sudo unless already root

    If the context's username is "root", cmd is run directly. Otherwise it
    is prefixed with /usr/bin/sudo, a --preserve-env option naming the keys
    of mod_env_cmd, the options from opts, and "-D wd" if a working
    directory was requested (in which case no working directory is passed
    on to _run()).

    Args:
        cmd: Command and arguments
        opts: Additional options for sudo, or None. The list is not modified.
        wd: Working directory, or None
        mod_env_sudo: Environment change set for sudo itself, or None
        mod_env_cmd: Environment change set for the command, or None
        cmd_input: Data for the command's stdin, or None
        verbose: Resolved verbosity
        interactive: Resolved interactivity
        log_prefix: Prefix for log output

    Returns:
        Result of the command

    Raises:
        ValueError: sudo is needed and mod_env_sudo and mod_env_cmd assign
            different values to the same variable
    """

    def check_equal_values(d1: dict[str, str], d2: dict[str, str]) -> None:
        for key, val in d1.items():
            if d2.get(key, None) not in [None, val]:
                raise ValueError(
                    'Outer and inner environments differ at least for '
                    f'{key}: "{val}" != "{d2.get(key)}"'
                )

    fw_cmd: list[str] = []
    fw_env: dict[str, str] = {}

    # Work on a copy: "-D wd" gets appended below, and that must not leak
    # into a list the caller may want to use again
    sudo_opts: list[str] = [] if opts is None else list(opts)

    if mod_env_cmd:
        fw_env.update(mod_env_cmd)

    if self.username != 'root':
        if mod_env_sudo and mod_env_cmd:
            check_equal_values(mod_env_sudo, mod_env_cmd)
            check_equal_values(mod_env_cmd, mod_env_sudo)

        fw_cmd.append('/usr/bin/sudo')
        if mod_env_sudo:
            fw_env.update(mod_env_sudo)
        if mod_env_cmd:
            fw_cmd.append('--preserve-env=' + ','.join(mod_env_cmd.keys()))

        if wd is not None:
            # Let sudo change the directory instead of the runner
            sudo_opts.extend(['-D', wd])
            wd = None

        fw_cmd.extend(sudo_opts)

    mod_env = fw_env if fw_env else None

    fw_cmd.extend(cmd)

    return await self._run(
        fw_cmd,
        wd = wd,
        mod_env = mod_env,
        verbose = verbose,
        cmd_input = cmd_input,
        interactive = interactive,
        log_prefix = log_prefix,
    )
|
|
|
|
async def sudo(
    self,
    cmd: list[str],
    opts: list[str] | None = None,
    wd: str | None = None,
    mod_env_sudo: dict[str, str] | None = None,
    mod_env_cmd: dict[str, str] | None = None,
    throw: bool = True,
    verbose: bool | None = None,
    cmd_input: Input = InputMode.OptInteractive,
    title: str | None = None,
) -> Result:
    """
    Run a command with root privileges and hand back its Result

    Args:
        cmd: Command and arguments
        opts: Additional options for sudo
        wd: Optional working directory
        mod_env_sudo: Change set to sudo's environment
        mod_env_cmd: Change set to the command's environment, None results
            in {'LC_ALL': 'C'}
        throw: Raise on failure if True: RuntimeError on non-zero exit
            status, or the exception raised while running the command
        verbose: Emit log output while the command runs, None selects the
            context's verbose_default
        cmd_input: An InputMode member or data for the command's stdin,
            None is rejected by an assertion
        title: Heading for the log delimiter lines

    Returns:
        A Result instance. If running the command raised and throw is
        False, that's a Result carrying nothing but cmd.
    """

    # None is a valid value further down (CallContext.cmd_input returns it
    # whenever there's no input data), just not as an argument to sudo()
    assert cmd_input is not None, 'Invalid: cmd_input is None'

    result = Result(cmd = cmd)
    call = self.CallContext(
        self,
        title = title,
        cmd = cmd,
        cmd_input = cmd_input,
        mod_env = mod_env_cmd,
        wd = wd,
        log_prefix = '|',
        throw = throw,
        verbose = verbose,
    )
    with call:
        try:
            result = await self._sudo(
                cmd = call.cmd,
                opts = opts,
                wd = call.wd,
                mod_env_sudo = mod_env_sudo,
                mod_env_cmd = call.mod_env,
                verbose = call.verbose,
                cmd_input = call.cmd_input,
                interactive = call.interactive,
                log_prefix = call.log_prefix,
            )
        except Exception as exc:
            return call.exception(result, exc)
        call.check_exit_code(result)
    return result
|
|
|
|
async def _get(
    self, path: str, wd: str | None, throw: bool, verbose: bool | None, title: str
) -> Result:
    """
    Fetch the content of a file by running "cat" on it

    Args:
        path: Path of the file, relative to wd if wd is given
        wd: Optional directory prepended to path
        throw: Raise on failure if True: RuntimeError on non-zero exit
            status, or the exception raised while running the command
        verbose: Emit log output while the command runs, None selects the
            context's verbose_default
        title: Heading for the log delimiter lines

    Returns:
        Result of the cat command

    Raises:
        FileNotFoundError: The result matches the error "No such file".
            This is raised regardless of throw.
    """
    if wd is not None:
        path = wd + '/' + path
    cmd = ['cat', path]
    # Have the Result know its command from the start, so that error
    # messages are meaningful even if the command never got to run
    ret = Result(cmd = cmd)
    with self.CallContext(
        self,
        title = title,
        cmd = cmd,
        cmd_input = InputMode.NonInteractive,
        wd = None,
        mod_env = None,
        log_prefix = '|',
        throw = throw,
        verbose = verbose,
    ) as cc:
        try:
            ret = await self._run(
                cmd = cc.cmd,
                # wd is part of path already. Passing it on once more would
                # apply a relative wd twice.
                wd = cc.wd,
                verbose = cc.verbose,
                cmd_input = cc.cmd_input,
                mod_env = cc.mod_env,
                interactive = cc.interactive,
                log_prefix = cc.log_prefix,
            )
        except Exception as e:
            return cc.exception(ret, e)
        if ret.matches_error('No such file'):
            raise FileNotFoundError(ret.summarize(cc.cmd, wd = cc.wd))
        cc.check_exit_code(ret)
    return ret
|
|
|
|
async def _put(
    self,
    path: str,
    content: bytes,
    wd: str | None,
    throw: bool,
    verbose: bool | None,
    title: str,
    owner: str | None,
    group: str | None,
    mode: str | None,
    atomic: bool,
) -> Result:
    """
    Write content to a file by running a sequence of commands

    The sequence is: "tee" to write the content, "chown" / "chgrp" if owner
    and / or group are given, "chmod" if mode is given. With atomic set,
    all of that happens on a temporary file created with "mktemp" next to
    the target, which is finally moved over the target with "mv". If the
    sequence fails, the temporary file is erased.

    Args:
        path: Path of the file, relative to wd if wd is given
        content: Bytes to write
        wd: Optional directory prepended to path
        throw: Raise an Exception on failure if True, otherwise log the
            failure and return the last Result obtained
        verbose: Not evaluated by this implementation
        title: Not evaluated by this implementation
        owner: Owner to set, or None
        group: Group to set, or None
        mode: Mode argument for chmod, or None
        atomic: Write to a temporary file first, then rename it

    Returns:
        Result of the last command run
    """

    from .util import pretty_cmd

    async def run_step(
        cmd: list[str],
        cmd_input: Input = InputMode.NonInteractive,
        **kwargs
    ) -> Result:
        return await self.run(cmd, cmd_input = cmd_input, **kwargs)

    ret = Result()
    try:

        class RemoteCmd(NamedTuple):
            cmd: list[str]
            cmd_input: Input = InputMode.NonInteractive

        tmp_file: str | None = None
        if wd is not None:
            path = wd + '/' + path
        cmds: list[RemoteCmd] = []
        out = path
        if atomic:
            out = (await run_step(['mktemp', path + '.XXXXXX'])).stdout_str.strip()
            tmp_file = out
        cmds.append(RemoteCmd(
            cmd = ['tee', out],
            cmd_input = content,
        ))
        if owner is not None and group is not None:
            cmds.append(RemoteCmd(
                cmd = ['chown', f'{owner}:{group}', out],
            ))
        elif owner is not None:
            cmds.append(RemoteCmd(
                cmd = ['chown', owner, out],
            ))
        elif group is not None:
            cmds.append(RemoteCmd(
                cmd = ['chgrp', group, out],
            ))
        if mode is not None:
            cmds.append(RemoteCmd(
                cmd = ['chmod', mode, out],
            ))
        if atomic:
            cmds.append(RemoteCmd(
                cmd = ['mv', out, path],
            ))
        # Keep the context open across the whole sequence
        await self.open()
        try:
            for cmd in cmds:
                log(DEBUG, f'{self.log_name}: Running {pretty_cmd(cmd.cmd, wd)}')
                ret = await run_step(cmd.cmd, cmd_input = cmd.cmd_input)
            tmp_file = None # Has been successfully moved at this point
            return ret
        finally:
            if tmp_file is not None:
                await self.erase(tmp_file)
            await self.close()
    except Exception as e:
        msg = f'Failed to put {path} to {self.root} ({str(e)})'
        if throw:
            # Keep the original exception around as the cause
            raise Exception(msg) from e
        log(ERR, msg)
    return ret
|
|
|
|
async def _unlink(self, path: str) -> None:
    """Remove a file by running "rm -f" on it, non-interactively"""
    await self.run(['rm', '-f', path], cmd_input = InputMode.NonInteractive)
|
|
|
|
async def _erase(self, path: str) -> None:
    """Remove a file or directory tree by running "rm -rf" on it, non-interactively"""
    await self.run(['rm', '-rf', path], cmd_input = InputMode.NonInteractive)
|
|
|
|
async def _rename(self, src: str, dst: str) -> None:
    """Rename src to dst by running "mv", non-interactively"""
    await self.run(['mv', src, dst], cmd_input = InputMode.NonInteractive)
|
|
|
|
async def _mkdir(self, path: str, mode: int) -> None:
    """
    Create a directory with the given mode by running "mkdir -m"

    Args:
        path: Directory to create
        mode: Numeric file mode, passed to mkdir in octal notation
    """
    # Options go before the operand: a mkdir which doesn't permute its
    # arguments would otherwise take "-m" and the mode for directory names
    cmd = ['mkdir', '-m', self.__mode_str(mode), path]
    await self.run(cmd, cmd_input = InputMode.NonInteractive)
|
|
|
|
async def _mktemp(self, tmpl: str, directory: bool) -> str:
    """
    Create a temporary file or directory by running "mktemp"

    Args:
        tmpl: Template passed to mktemp
        directory: Create a directory ("-d") instead of a file

    Returns:
        The command's output with surrounding whitespace stripped

    Raises:
        Exception: The command failed or produced no output
    """
    cmd = ['mktemp', '-d', tmpl] if directory else ['mktemp', tmpl]
    result = await self.run(cmd, cmd_input = InputMode.NonInteractive, throw = True)
    if result.status == 0 and result.stdout is not None:
        return result.stdout_str.strip()
    raise Exception(
        f'Failed to create temporary file on {self.root}: {result.summary}'
    )
|
|
|
|
async def _stat(self, path: str, follow_symlinks: bool) -> StatResult:
    """
    Retrieve file metadata by running stat(1)

    A GNU coreutils style invocation ("--printf") is tried first. If that
    fails with what looks like an option error, a BSD style invocation
    ("-n -f") is tried instead.

    Args:
        path: File to examine
        follow_symlinks: Pass "-L" to stat

    Returns:
        StatResult built from the command's output

    Raises:
        FileNotFoundError, PermissionError, OSError: stat failed, see
            _raise_stat_error()
        ValueError: stat's output doesn't have the expected shape
    """

    async def run_stat(opts: list[str]) -> Result:
        # LC_ALL=C, because the error output is matched against English phrases
        mod_env = {'LC_ALL': 'C'}
        cmd = ['stat']
        if follow_symlinks:
            cmd.append('-L')
        cmd.extend(opts)
        cmd.append(path)
        return await self.run(
            cmd,
            mod_env = mod_env,
            throw = False,
            cmd_input = InputMode.NonInteractive
        )

    # GNU coreutils stat
    gnu_format = _US.join(
        [
            '%f', # st_mode in hex
            '%i', # st_ino
            '%d', # st_dev
            '%h', # st_nlink
            '%U', # owner's user name
            '%G', # owner's group name
            '%s', # st_size
            '%.9X', # st_atime
            '%.9Y', # st_mtime
            '%.9Z', # st_ctime
            '%o', # st_blksize hint
            '%b', # st_blocks
            '%r', # st_rdev
        ]
    )

    result = await run_stat(['--printf', gnu_format])
    if result.status == 0 and result.stdout is not None:
        return _build_stat_result(result.stdout_str.split(_US), mode_base = 16)

    # Anything but an option error is a genuine failure, no point in
    # trying another dialect
    if not _looks_like_option_error(result.stderr_str_or_none):
        _raise_stat_error(path, result)

    # BSD / macOS / OpenBSD / NetBSD stat
    bsd_format = _US.join(
        [
            '%p', # st_mode in octal
            '%i', # st_ino
            '%d', # st_dev
            '%l', # st_nlink
            # "U" and "G" are no field specifiers in this dialect ("U" is an
            # output format), names are requested with the "S" output format
            '%Su', # owner's user name
            '%Sg', # owner's group name
            '%z', # st_size
            '%.9Fa', # st_atime
            '%.9Fm', # st_mtime
            '%.9Fc', # st_ctime
            '%k', # st_blksize
            '%b', # st_blocks
            '%r', # st_rdev
        ]
    )

    result = await run_stat(['-n', '-f', bsd_format])
    stdout = result.stdout_str_or_none
    if result.status != 0 or stdout is None:
        _raise_stat_error(path, result)
    assert stdout is not None # Just there to pacify the linter
    return _build_stat_result(stdout.rstrip('\n').split(_US), mode_base = 8)
|
|
|
|
async def _chown(self, path: str, owner: str | None, group: str | None) -> None:
    """
    Change owner and / or group of a file by running "chown"

    Args:
        path: File to change
        owner: New owner, or None to leave it alone
        group: New group, or None to leave it alone

    Raises:
        ValueError: Neither owner nor group are given
    """
    if owner is None and group is None:
        raise ValueError(f'Tried to chown("{path}") without owner and group')
    if group is None:
        ownership = owner
    else:
        # "owner:group", or ":group" to change the group only
        ownership = ('' if owner is None else owner) + ':' + group
    assert ownership is not None # Impossible, just there to calm the linter
    await self.run(['chown', ownership, path], cmd_input = InputMode.NonInteractive)
|
|
|
|
async def _chmod(self, path: str, mode: int) -> None:
    """
    Change the mode of a file by running "chmod"

    Args:
        path: File to change
        mode: Numeric file mode, passed to chmod in octal notation
    """
    mode_arg = self.__mode_str(mode)
    await self.run(['chmod', mode_arg, path], cmd_input = InputMode.NonInteractive)
|