Add new ruff rules and fix their fallout:
future-annotations = true
select = [
    "TC", # type-checking import placement rules
    "FA", # future annotations rules
]

This comprises:
- Streamline imports and exports in cmds.xxx.Cmd
  - Import base class as "Base"
  - Export types Cmd and Parent via __all__
- Move all types imported only for annotation below TYPE_CHECKING
- Use "from __future__ import annotations" all over the place
Signed-off-by: Jan Lindemann <jan@janware.com>
325 lines
9.8 KiB
Python
325 lines
9.8 KiB
Python
from __future__ import annotations
|
|
|
|
import abc
|
|
|
|
from enum import Enum, auto
|
|
from functools import cached_property
|
|
from typing import TYPE_CHECKING
|
|
|
|
from .log import DEBUG, ERR, log
|
|
from .Uri import Uri
|
|
from .ProcFilter import ProcPipeline
|
|
|
|
if TYPE_CHECKING:
|
|
from .base import Result, StatResult
|
|
from .ProcFilter import ProcFilter
|
|
|
|
class FileContext(abc.ABC):
|
|
|
|
class Direction(Enum):
|
|
In = auto()
|
|
Out = auto()
|
|
|
|
def __init__(
|
|
self,
|
|
uri: str | Uri,
|
|
interactive: bool | None = None,
|
|
verbose_default = False,
|
|
chroot: bool = False,
|
|
in_pipe: ProcPipeline | None = None,
|
|
out_pipe: ProcPipeline | None = None,
|
|
):
|
|
self.__uri = Uri.pimp(uri)
|
|
self.__chroot = chroot
|
|
self.__interactive = interactive
|
|
self.__verbose_default = verbose_default
|
|
self.__log_name: str | None = None
|
|
self.__in_pipe = in_pipe
|
|
self.__out_pipe = out_pipe
|
|
self.__open_count = 0
|
|
if verbose_default not in [True, False]:
|
|
raise ValueError(
|
|
'Tried to instantiate FileContext with verbose_default '
|
|
f'= "{verbose_default}"'
|
|
)
|
|
|
|
async def __aenter__(self):
|
|
await self.open()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc, tb):
|
|
await self.close()
|
|
|
|
def __repr__(self) -> str:
|
|
return self.__uri.id
|
|
|
|
def __pipe(self, d: Direction):
|
|
match d:
|
|
case self.Direction.In:
|
|
if not self.__in_pipe:
|
|
self.__in_pipe = ProcPipeline()
|
|
return self.__in_pipe
|
|
case self.Direction.Out:
|
|
if not self.__out_pipe:
|
|
self.__out_pipe = ProcPipeline()
|
|
return self.__out_pipe
|
|
case _:
|
|
raise Exception(f'Invalid pipe direction "{str(d)}"')
|
|
|
|
def _chroot(self, path: str) -> str:
|
|
if not self.__chroot:
|
|
return path
|
|
if not len(path):
|
|
return self.root
|
|
if path[-1] == '/':
|
|
return self.root + path
|
|
return self.root + '/' + path
|
|
|
|
def add_proc_filter(self, d: Direction, proc_filter: ProcFilter):
|
|
self.__pipe(d).append(proc_filter)
|
|
|
|
async def _open(self) -> None:
|
|
pass
|
|
|
|
async def open(self) -> None:
|
|
self.__open_count += 1
|
|
if self.__open_count == 1:
|
|
await self._open()
|
|
|
|
async def _close(self) -> None:
|
|
pass
|
|
|
|
async def close(self) -> None:
|
|
if self.__open_count == 1:
|
|
await self._close()
|
|
self.__open_count -= 1
|
|
assert self.__open_count >= 0, (
|
|
f'Closed file context "{self}" more often than opened'
|
|
)
|
|
|
|
@property
|
|
def uri(self) -> Uri:
|
|
return self.__uri
|
|
|
|
@property
|
|
def id(self) -> str:
|
|
return self.__uri.id
|
|
|
|
@cached_property
|
|
def root(self) -> str:
|
|
return self.__uri.path
|
|
|
|
@property
|
|
def username(self) -> str | None:
|
|
return self.__uri.username
|
|
|
|
@property
|
|
def log_name(self) -> str:
|
|
return self.__uri.id
|
|
|
|
@property
|
|
def interactive(self) -> bool | None:
|
|
return self.__interactive
|
|
|
|
@property
|
|
def verbose_default(self) -> bool:
|
|
return self.__verbose_default
|
|
|
|
@abc.abstractmethod
|
|
async def _get(
|
|
self, path: str, wd: str | None, throw: bool, verbose: bool | None, title: str
|
|
) -> Result:
|
|
raise NotImplementedError()
|
|
|
|
async def get(
|
|
self,
|
|
path: str,
|
|
wd: str | None = None,
|
|
throw: bool = True,
|
|
verbose: bool | None = None,
|
|
title: str | None = None,
|
|
) -> Result:
|
|
ret = await self._get(
|
|
self._chroot(path),
|
|
wd = wd,
|
|
throw = throw,
|
|
verbose = verbose,
|
|
title = title or f'Fetching {path} from {self.uri}',
|
|
)
|
|
return await self.__in_pipe.run(ret) if self.__in_pipe else ret
|
|
|
|
async def _put(
|
|
self,
|
|
path: str,
|
|
content: bytes,
|
|
wd: str | None,
|
|
throw: bool,
|
|
verbose: bool | None,
|
|
title: str,
|
|
owner: str | None,
|
|
group: str | None,
|
|
mode: str | None,
|
|
atomic: bool,
|
|
) -> Result:
|
|
raise NotImplementedError()
|
|
|
|
async def put(
|
|
self,
|
|
path: str,
|
|
content: bytes,
|
|
wd: str | None = None,
|
|
throw: bool = True,
|
|
verbose: bool | None = None,
|
|
title: str | None = None,
|
|
owner: str | None = None,
|
|
group: str | None = None,
|
|
mode: int | None = None,
|
|
atomic: bool = False,
|
|
) -> Result:
|
|
mode_str = None if mode is None else oct(mode).replace('0o', '0')
|
|
if self.__out_pipe is not None:
|
|
content = (await self.__out_pipe.run(content)).stdout
|
|
return await self._put(
|
|
self._chroot(path),
|
|
content,
|
|
wd = wd,
|
|
throw = throw,
|
|
verbose = verbose,
|
|
title = title or f'Pushing content to {path} on {self.uri}',
|
|
owner = owner,
|
|
group = group,
|
|
mode = mode_str,
|
|
atomic = atomic,
|
|
)
|
|
|
|
async def _unlink(self, path: str) -> None:
|
|
raise NotImplementedError(
|
|
f'{self.log_name}: unlink("{path}") is not implemented'
|
|
)
|
|
|
|
async def unlink(self, path: str) -> None:
|
|
return await self._unlink(self._chroot(path))
|
|
|
|
async def _erase(self, path: str) -> None:
|
|
raise NotImplementedError(
|
|
f'{self.log_name}: erase("{path}") is not implemented'
|
|
)
|
|
|
|
async def erase(self, path: str) -> None:
|
|
return await self._erase(self._chroot(path))
|
|
|
|
async def _rename(self, src: str, dst: str) -> None:
|
|
raise NotImplementedError(
|
|
f'{self.log_name}: rename("{src}" -> "{dst}") is not implemented'
|
|
)
|
|
|
|
async def rename(self, src: str, dst: str) -> None:
|
|
return await self._rename(src, dst)
|
|
|
|
async def _mkdir(self, path: str, mode: int) -> None:
|
|
raise NotImplementedError(f'{self.log_name}: mkdir({path}) is not implemented')
|
|
|
|
async def mkdir(self, path: str, mode: int = 0o777) -> None:
|
|
return await self._mkdir(path, mode)
|
|
|
|
async def _mktemp(self, tmpl: str, directory: bool) -> str:
|
|
raise NotImplementedError(
|
|
f'{self.log_name}: mktemp("{tmpl}") is not implemented'
|
|
)
|
|
|
|
async def mktemp(self, tmpl: str, directory: bool = False) -> str:
|
|
return await self._mktemp(self._chroot(tmpl), directory)
|
|
|
|
async def _chown(self, path: str, owner: str | None, group: str | None) -> None:
|
|
raise NotImplementedError(
|
|
f'{self.log_name}: chown("{path}") is not implemented'
|
|
)
|
|
|
|
async def chown(
|
|
self, path: str, owner: str | None = None, group: str | None = None
|
|
) -> None:
|
|
if owner is None and group is None:
|
|
raise ValueError(
|
|
f'Tried to change ownership of {path} with neither owner nor group'
|
|
)
|
|
return await self._chown(self._chroot(path), owner, group)
|
|
|
|
async def _chmod(self, path: str, mode: int) -> None:
|
|
raise NotImplementedError(
|
|
f'{self.log_name}: chmod("{path}") is not implemented'
|
|
)
|
|
|
|
async def chmod(self, path: str, mode: int) -> None:
|
|
return await self._chmod(self._chroot(path), mode)
|
|
|
|
async def _stat(self, path: str, follow_symlinks: bool) -> StatResult:
|
|
raise NotImplementedError(
|
|
f'{self.log_name}: lstat("{path}") is not implemented'
|
|
)
|
|
|
|
async def stat(self, path: str, follow_symlinks: bool = True) -> StatResult:
|
|
if not isinstance(path, str):
|
|
raise TypeError(f'path must be str, got {type(path).__name__}')
|
|
return await self._stat(self._chroot(path), follow_symlinks)
|
|
|
|
async def _file_exists(self, path: str) -> bool:
|
|
try:
|
|
await self._stat(path, False)
|
|
except FileNotFoundError as e:
|
|
log(DEBUG, f'Could not stat file {path} ({str(e)}), ignored')
|
|
return False
|
|
except Exception as e:
|
|
log(ERR, f'Could not stat file {path} ({str(e)}), ignored')
|
|
raise
|
|
return True
|
|
|
|
async def file_exists(self, path: str) -> bool:
|
|
return await self._file_exists(self._chroot(path))
|
|
|
|
async def _is_dir(self, path: str, follow_symlinks: bool) -> bool:
|
|
import stat
|
|
|
|
try:
|
|
return stat.S_ISDIR((await self._stat(path, follow_symlinks)).mode)
|
|
except NotImplementedError:
|
|
log(
|
|
DEBUG,
|
|
(
|
|
f"{self.log_name} doesn't implement stat(), judging by trailing "
|
|
'slash if {path} is a directory'
|
|
),
|
|
)
|
|
return path[-1] == '/'
|
|
except FileNotFoundError as e:
|
|
log(DEBUG, f'{self.log_name}: Failed to stat({path}) ({str(e)})')
|
|
return False
|
|
except Exception as e:
|
|
log(ERR, f'{self.log_name}: Failed to stat({path}) ({str(e)})')
|
|
raise
|
|
return False
|
|
|
|
async def is_dir(self, path: str, follow_symlinks = True) -> bool:
|
|
return await self._is_dir(self._chroot(path), follow_symlinks = follow_symlinks)
|
|
|
|
@classmethod
|
|
def create(cls, uri: str | Uri, *args, **kwargs) -> FileContext:
|
|
uri = Uri.pimp(uri)
|
|
match uri.protocol:
|
|
case 'local' | 'file':
|
|
from .ec.Local import Local
|
|
|
|
return Local(uri, *args, **kwargs)
|
|
case 'ssh':
|
|
from .ec.SSHClient import ssh_client
|
|
|
|
return ssh_client(uri, *args, **kwargs)
|
|
case 'http' | 'https':
|
|
from .ec.Curl import Curl
|
|
|
|
return Curl(uri, *args, **kwargs)
|
|
case _:
|
|
pass
|
|
raise Exception(
|
|
f'Can\'t create file context instance for "{uri}" with unsupported '
|
|
f'protocol "{uri.protocol}"'
|
|
)
|