# -*- coding: utf-8 -*- from __future__ import annotations import abc, re from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Self from .log import * from .base import Input, InputMode, Result class FileContext(abc.ABC): def __init__(self, uri: str, interactive: bool|None=None, verbose_default=False): self.__uri = uri self.__interactive = interactive self.__verbose_default = verbose_default self.__log_name: str|None = None assert verbose_default is not None async def __aenter__(self): return self async def __aexit__(self, exc_type, exc, tb): await self.close() @property def uri(self) -> str: return self.__uri @property def log_name(self) -> str: if self.__log_name is None: self.__log_name = self.__class__.__name__.lower() from urllib.parse import urlparse parsed = urlparse(self.__uri) uri: list[str] = [] if parsed.scheme: uri.append(parsed.scheme) if parsed.hostname: uri.append(parsed.hostname) if uri: self.__log_name += ' ' + '://'.join(uri) return self.__log_name @property def interactive(self) -> bool|None: return self.__interactive @property def verbose_default(self) -> bool: return self.__verbose_default @abc.abstractmethod async def _get( self, path: str, wd: str|None, throw: bool, verbose: bool|None, title: str ) -> Result: raise NotImplementedError() async def get( self, path: str, wd: str|None = None, throw: bool = True, verbose: bool|None = None, title: str=None, owner: str|None=None, group: str|None=None, mode: str|None=None, ) -> Result: return await self._get(path, wd=wd, throw=throw, verbose=verbose, title=title) async def _put( self, path: str, content: bytes, wd: str|None, throw: bool, verbose: bool|None, title: str, owner: str|None, group: str|None, mode: str|None, atomic: bool, ) -> Result: raise NotImplementedError() async def put( self, path: str, content: str, wd: str|None = None, throw: bool = True, verbose: bool|None = None, title: str = None, owner: str|None = None, group: str|None = None, mode: int|None = None, atomic: bool = False ) -> Result: mode_str = None if mode is None else oct(mode).replace('0o', '0') return await self._put(path, content, wd=wd, throw=throw, verbose=verbose, title=title, owner=owner, group=group, mode=mode_str, atomic=atomic) async def _close(self) -> None: pass async def close(self) -> None: await self._close() @classmethod def create(cls, uri: str, *args, **kwargs) -> Self: tokens = re.split(r'://', uri) schema = tokens[0] if tokens[0] != uri else 'file' match schema: case 'local' | 'file': from .ec.Local import Local return Local(uri, *args, **kwargs) case 'ssh': from .ec.SSHClient import ssh_client return ssh_client(uri, *args, **kwargs) case 'http' | 'https': from .ec.Curl import Curl return Curl(uri, *args, **kwargs) case _: pass raise Exception(f'Can\'t create file transfer instance for {uri} with unknown schema "{schema}"')