from __future__ import annotations import abc from enum import Enum, auto from functools import cached_property from typing import TYPE_CHECKING from .log import DEBUG, ERR, log from .Uri import Uri from .ProcFilter import ProcPipeline if TYPE_CHECKING: from .base import Result, StatResult from .ProcFilter import ProcFilter class FileContext(abc.ABC): class Direction(Enum): In = auto() Out = auto() def __init__( self, uri: str | Uri, interactive: bool | None = None, verbose_default = False, chroot: bool = False, in_pipe: ProcPipeline | None = None, out_pipe: ProcPipeline | None = None, ): self.__uri = Uri.pimp(uri) self.__chroot = chroot self.__interactive = interactive self.__verbose_default = verbose_default self.__log_name: str | None = None self.__in_pipe = in_pipe self.__out_pipe = out_pipe self.__open_count = 0 if verbose_default not in [True, False]: raise ValueError( 'Tried to instantiate FileContext with verbose_default ' f'= "{verbose_default}"' ) async def __aenter__(self): await self.open() return self async def __aexit__(self, exc_type, exc, tb): await self.close() def __repr__(self) -> str: return self.__uri.id def __pipe(self, d: Direction): match d: case self.Direction.In: if not self.__in_pipe: self.__in_pipe = ProcPipeline() return self.__in_pipe case self.Direction.Out: if not self.__out_pipe: self.__out_pipe = ProcPipeline() return self.__out_pipe case _: raise Exception(f'Invalid pipe direction "{str(d)}"') def _chroot(self, path: str) -> str: if not self.__chroot: return path if not len(path): return self.root if path[-1] == '/': return self.root + path return self.root + '/' + path def add_proc_filter(self, d: Direction, proc_filter: ProcFilter): self.__pipe(d).append(proc_filter) async def _open(self) -> None: pass async def open(self) -> None: self.__open_count += 1 if self.__open_count == 1: await self._open() async def _close(self) -> None: pass async def close(self) -> None: if self.__open_count == 1: await self._close() self.__open_count -= 1 assert self.__open_count >= 0, ( f'Closed file context "{self}" more often than opened' ) @property def uri(self) -> Uri: return self.__uri @property def id(self) -> str: return self.__uri.id @cached_property def root(self) -> str: return self.__uri.path @property def username(self) -> str | None: return self.__uri.username @property def log_name(self) -> str: return self.__uri.id @property def interactive(self) -> bool | None: return self.__interactive @property def verbose_default(self) -> bool: return self.__verbose_default @abc.abstractmethod async def _get( self, path: str, wd: str | None, throw: bool, verbose: bool | None, title: str ) -> Result: raise NotImplementedError() async def get( self, path: str, wd: str | None = None, throw: bool = True, verbose: bool | None = None, title: str | None = None, ) -> Result: ret = await self._get( self._chroot(path), wd = wd, throw = throw, verbose = verbose, title = title or f'Fetching {path} from {self.uri}', ) return await self.__in_pipe.run(ret) if self.__in_pipe else ret async def _put( self, path: str, content: bytes, wd: str | None, throw: bool, verbose: bool | None, title: str, owner: str | None, group: str | None, mode: str | None, atomic: bool, ) -> Result: raise NotImplementedError() async def put( self, path: str, content: bytes, wd: str | None = None, throw: bool = True, verbose: bool | None = None, title: str | None = None, owner: str | None = None, group: str | None = None, mode: int | None = None, atomic: bool = False, ) -> Result: mode_str = None if mode is None else oct(mode).replace('0o', '0') if self.__out_pipe is not None: content = (await self.__out_pipe.run(content)).stdout return await self._put( self._chroot(path), content, wd = wd, throw = throw, verbose = verbose, title = title or f'Pushing content to {path} on {self.uri}', owner = owner, group = group, mode = mode_str, atomic = atomic, ) async def _unlink(self, path: str) -> None: raise NotImplementedError( f'{self.log_name}: unlink("{path}") is not implemented' ) async def unlink(self, path: str) -> None: return await self._unlink(self._chroot(path)) async def _erase(self, path: str) -> None: raise NotImplementedError( f'{self.log_name}: erase("{path}") is not implemented' ) async def erase(self, path: str) -> None: return await self._erase(self._chroot(path)) async def _rename(self, src: str, dst: str) -> None: raise NotImplementedError( f'{self.log_name}: rename("{src}" -> "{dst}") is not implemented' ) async def rename(self, src: str, dst: str) -> None: return await self._rename(src, dst) async def _mkdir(self, path: str, mode: int) -> None: raise NotImplementedError(f'{self.log_name}: mkdir({path}) is not implemented') async def mkdir(self, path: str, mode: int = 0o777) -> None: return await self._mkdir(path, mode) async def _mktemp(self, tmpl: str, directory: bool) -> str: raise NotImplementedError( f'{self.log_name}: mktemp("{tmpl}") is not implemented' ) async def mktemp(self, tmpl: str, directory: bool = False) -> str: return await self._mktemp(self._chroot(tmpl), directory) async def _chown(self, path: str, owner: str | None, group: str | None) -> None: raise NotImplementedError( f'{self.log_name}: chown("{path}") is not implemented' ) async def chown( self, path: str, owner: str | None = None, group: str | None = None ) -> None: if owner is None and group is None: raise ValueError( f'Tried to change ownership of {path} with neither owner nor group' ) return await self._chown(self._chroot(path), owner, group) async def _chmod(self, path: str, mode: int) -> None: raise NotImplementedError( f'{self.log_name}: chmod("{path}") is not implemented' ) async def chmod(self, path: str, mode: int) -> None: return await self._chmod(self._chroot(path), mode) async def _stat(self, path: str, follow_symlinks: bool) -> StatResult: raise NotImplementedError( f'{self.log_name}: lstat("{path}") is not implemented' ) async def stat(self, path: str, follow_symlinks: bool = True) -> StatResult: if not isinstance(path, str): raise TypeError(f'path must be str, got {type(path).__name__}') return await self._stat(self._chroot(path), follow_symlinks) async def _file_exists(self, path: str) -> bool: try: await self._stat(path, False) except FileNotFoundError as e: log(DEBUG, f'Could not stat file {path} ({str(e)}), ignored') return False except Exception as e: log(ERR, f'Could not stat file {path} ({str(e)}), ignored') raise return True async def file_exists(self, path: str) -> bool: return await self._file_exists(self._chroot(path)) async def _is_dir(self, path: str, follow_symlinks: bool) -> bool: import stat try: return stat.S_ISDIR((await self._stat(path, follow_symlinks)).mode) except NotImplementedError: log( DEBUG, ( f"{self.log_name} doesn't implement stat(), judging by trailing " 'slash if {path} is a directory' ), ) return path[-1] == '/' except FileNotFoundError as e: log(DEBUG, f'{self.log_name}: Failed to stat({path}) ({str(e)})') return False except Exception as e: log(ERR, f'{self.log_name}: Failed to stat({path}) ({str(e)})') raise return False async def is_dir(self, path: str, follow_symlinks = True) -> bool: return await self._is_dir(self._chroot(path), follow_symlinks = follow_symlinks) @classmethod def create(cls, uri: str | Uri, *args, **kwargs) -> FileContext: uri = Uri.pimp(uri) match uri.protocol: case 'local' | 'file': from .ec.Local import Local return Local(uri, *args, **kwargs) case 'ssh': from .ec.SSHClient import ssh_client return ssh_client(uri, *args, **kwargs) case 'http' | 'https': from .ec.Curl import Curl return Curl(uri, *args, **kwargs) case _: pass raise Exception( f'Can\'t create file context instance for "{uri}" with unsupported ' f'protocol "{uri.protocol}"' )