2026-04-16 10:37:34 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import abc, re
|
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from typing import Self
|
|
|
|
|
|
|
|
|
|
from .log import *
|
lib.FileContext: Add file methods
Add the following methods, meant to do the obvious:
unlink(self, path: str) -> None
erase(self, path: str) -> None
rename(self, src: str, dst: str) -> None
mktemp(self, tmpl: str, directory: bool=False) -> None
chown(self, path: str, owner: str|None=None, group: str|None=None) -> None
chmod(self, path: str, mode: int) -> None
stat(self, path: str, follow_symlinks: bool=True) -> StatResult
file_exists(self, path: str) -> bool
is_dir(self, path: str) -> bool
All methods are async and call their protected counterpart, which is
designed to be overridden. If possible, default implementations do
something meaningful, if not, they just raise plain
NotImplementedError.
Signed-off-by: Jan Lindemann <jan@janware.com>
2026-04-17 09:15:06 +02:00
|
|
|
from .base import Input, InputMode, Result, StatResult
|
2026-04-16 10:37:34 +02:00
|
|
|
|
2026-04-17 09:26:42 +02:00
|
|
|
class FileContext(abc.ABC):
|
2026-04-16 10:37:34 +02:00
|
|
|
|
|
|
|
|
def __init__(self, uri: str, interactive: bool|None=None, verbose_default=False):
|
|
|
|
|
self.__uri = uri
|
|
|
|
|
self.__interactive = interactive
|
|
|
|
|
self.__verbose_default = verbose_default
|
|
|
|
|
self.__log_name: str|None = None
|
|
|
|
|
assert verbose_default is not None
|
|
|
|
|
|
|
|
|
|
async def __aenter__(self):
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
async def __aexit__(self, exc_type, exc, tb):
|
|
|
|
|
await self.close()
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def uri(self) -> str:
|
|
|
|
|
return self.__uri
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def log_name(self) -> str:
|
|
|
|
|
if self.__log_name is None:
|
2026-04-17 09:47:20 +02:00
|
|
|
self.__log_name = self.__class__.__name__.lower()
|
2026-04-16 10:37:34 +02:00
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
parsed = urlparse(self.__uri)
|
2026-04-17 09:47:20 +02:00
|
|
|
uri: list[str] = []
|
2026-04-16 10:37:34 +02:00
|
|
|
if parsed.scheme:
|
2026-04-17 09:47:20 +02:00
|
|
|
uri.append(parsed.scheme)
|
2026-04-16 10:37:34 +02:00
|
|
|
if parsed.hostname:
|
2026-04-17 09:47:20 +02:00
|
|
|
uri.append(parsed.hostname)
|
|
|
|
|
if uri:
|
|
|
|
|
self.__log_name += ' ' + '://'.join(uri)
|
2026-04-16 10:37:34 +02:00
|
|
|
return self.__log_name
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def interactive(self) -> bool|None:
|
|
|
|
|
return self.__interactive
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def verbose_default(self) -> bool:
|
|
|
|
|
return self.__verbose_default
|
|
|
|
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
|
|
async def _get(
|
|
|
|
|
self,
|
|
|
|
|
path: str,
|
|
|
|
|
wd: str|None,
|
|
|
|
|
throw: bool,
|
|
|
|
|
verbose: bool|None,
|
|
|
|
|
title: str
|
|
|
|
|
) -> Result:
|
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
|
|
|
|
async def get(
|
|
|
|
|
self,
|
|
|
|
|
path: str,
|
|
|
|
|
wd: str|None = None,
|
|
|
|
|
throw: bool = True,
|
|
|
|
|
verbose: bool|None = None,
|
|
|
|
|
title: str=None,
|
|
|
|
|
owner: str|None=None,
|
|
|
|
|
group: str|None=None,
|
|
|
|
|
mode: str|None=None,
|
|
|
|
|
) -> Result:
|
|
|
|
|
return await self._get(path, wd=wd, throw=throw, verbose=verbose, title=title)
|
|
|
|
|
|
|
|
|
|
async def _put(
|
|
|
|
|
self,
|
|
|
|
|
path: str,
|
2026-04-17 14:18:06 +02:00
|
|
|
content: bytes,
|
2026-04-16 10:37:34 +02:00
|
|
|
wd: str|None,
|
|
|
|
|
throw: bool,
|
|
|
|
|
verbose: bool|None,
|
|
|
|
|
title: str,
|
|
|
|
|
owner: str|None,
|
|
|
|
|
group: str|None,
|
|
|
|
|
mode: str|None,
|
2026-04-17 11:29:36 +02:00
|
|
|
atomic: bool,
|
2026-04-16 10:37:34 +02:00
|
|
|
) -> Result:
|
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
|
|
|
|
async def put(
|
|
|
|
|
self,
|
|
|
|
|
path: str,
|
2026-04-17 14:18:06 +02:00
|
|
|
content: str,
|
2026-04-16 10:37:34 +02:00
|
|
|
wd: str|None = None,
|
|
|
|
|
throw: bool = True,
|
|
|
|
|
verbose: bool|None = None,
|
2026-04-17 11:29:36 +02:00
|
|
|
title: str = None,
|
|
|
|
|
owner: str|None = None,
|
|
|
|
|
group: str|None = None,
|
2026-04-17 15:05:42 +02:00
|
|
|
mode: int|None = None,
|
2026-04-17 11:29:36 +02:00
|
|
|
atomic: bool = False
|
2026-04-16 10:37:34 +02:00
|
|
|
) -> Result:
|
2026-04-17 15:05:42 +02:00
|
|
|
mode_str = None if mode is None else oct(mode).replace('0o', '0')
|
2026-04-17 14:18:06 +02:00
|
|
|
return await self._put(path, content, wd=wd, throw=throw, verbose=verbose,
|
2026-04-17 15:05:42 +02:00
|
|
|
title=title, owner=owner, group=group, mode=mode_str, atomic=atomic)
|
2026-04-16 10:37:34 +02:00
|
|
|
|
lib.FileContext: Add file methods
Add the following methods, meant to do the obvious:
unlink(self, path: str) -> None
erase(self, path: str) -> None
rename(self, src: str, dst: str) -> None
mktemp(self, tmpl: str, directory: bool=False) -> None
chown(self, path: str, owner: str|None=None, group: str|None=None) -> None
chmod(self, path: str, mode: int) -> None
stat(self, path: str, follow_symlinks: bool=True) -> StatResult
file_exists(self, path: str) -> bool
is_dir(self, path: str) -> bool
All methods are async and call their protected counterpart, which is
designed to be overridden. If possible, default implementations do
something meaningful, if not, they just raise plain
NotImplementedError.
Signed-off-by: Jan Lindemann <jan@janware.com>
2026-04-17 09:15:06 +02:00
|
|
|
async def _unlink(self, path: str) -> None:
|
|
|
|
|
raise NotImplementedError(f'{self.log_name}: unlink() is not implemented')
|
|
|
|
|
|
|
|
|
|
async def unlink(self, path: str) -> None:
|
|
|
|
|
return await self._unlink(path)
|
|
|
|
|
|
|
|
|
|
async def _erase(self, path: str) -> None:
|
|
|
|
|
raise NotImplementedError(f'{self.log_name}: erase() is not implemented')
|
|
|
|
|
|
|
|
|
|
async def erase(self, path: str) -> None:
|
|
|
|
|
return await self._erase(path)
|
|
|
|
|
|
|
|
|
|
async def _rename(self, src: str, dst: str) -> None:
|
|
|
|
|
raise NotImplementedError(f'{self.log_name}: rename() is not implemented')
|
|
|
|
|
|
|
|
|
|
async def rename(self, src: str, dst: str) -> None:
|
|
|
|
|
return await self._rename(src, dst)
|
|
|
|
|
|
|
|
|
|
async def _mktemp(self, tmpl: str, directory: bool) -> None:
|
|
|
|
|
raise NotImplementedError(f'{self.log_name}: mktemp() is not implemented')
|
|
|
|
|
|
|
|
|
|
async def mktemp(self, tmpl: str, directory: bool=False) -> None:
|
|
|
|
|
return await self._mktemp(tmpl, directory)
|
|
|
|
|
|
|
|
|
|
async def _chown(self, path: str, owner: str|None, group: str|None) -> None:
|
|
|
|
|
raise NotImplementedError(f'{self.log_name}: chown() is not implemented')
|
|
|
|
|
|
|
|
|
|
async def chown(self, path: str, owner: str|None=None, group: str|None=None) -> None:
|
|
|
|
|
if owner is None and group is None:
|
|
|
|
|
raise ValueError(f'Tried to change ownership of {path} specifying neither owner nor group')
|
|
|
|
|
return await self._chown(path, owner, group)
|
|
|
|
|
|
|
|
|
|
async def _chmod(self, path: str, mode: int) -> None:
|
|
|
|
|
raise NotImplementedError(f'{self.log_name}: chmod() is not implemented')
|
|
|
|
|
|
|
|
|
|
async def chmod(self, path: str, mode: int) -> None:
|
|
|
|
|
return await self._chmod(path, mode)
|
|
|
|
|
|
|
|
|
|
async def _stat(self, path: str, follow_symlinks: bool) -> StatResult:
|
|
|
|
|
raise NotImplementedError(f'{self.log_name}: lstat() is not implemented')
|
|
|
|
|
|
|
|
|
|
async def stat(self, path: str, follow_symlinks: bool=True) -> StatResult:
|
|
|
|
|
if not isinstance(path, str):
|
|
|
|
|
raise TypeError(f"path must be str, got {type(path).__name__}")
|
|
|
|
|
return await self._stat(path, follow_symlinks)
|
|
|
|
|
|
|
|
|
|
async def _file_exists(self, path: str) -> bool:
|
|
|
|
|
try:
|
2026-04-21 10:21:22 +02:00
|
|
|
await self._stat(path, False)
|
lib.FileContext: Add file methods
Add the following methods, meant to do the obvious:
unlink(self, path: str) -> None
erase(self, path: str) -> None
rename(self, src: str, dst: str) -> None
mktemp(self, tmpl: str, directory: bool=False) -> None
chown(self, path: str, owner: str|None=None, group: str|None=None) -> None
chmod(self, path: str, mode: int) -> None
stat(self, path: str, follow_symlinks: bool=True) -> StatResult
file_exists(self, path: str) -> bool
is_dir(self, path: str) -> bool
All methods are async and call their protected counterpart, which is
designed to be overridden. If possible, default implementations do
something meaningful, if not, they just raise plain
NotImplementedError.
Signed-off-by: Jan Lindemann <jan@janware.com>
2026-04-17 09:15:06 +02:00
|
|
|
except FileNotFoundError as e:
|
|
|
|
|
log(DEBUG, f'Could not stat file {path} ({str(e)}), ignored')
|
|
|
|
|
return False
|
|
|
|
|
except Exception as e:
|
|
|
|
|
log(ERR, f'Could not stat file {path} ({str(e)}), ignored')
|
|
|
|
|
raise
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
async def file_exists(self, path: str) -> bool:
|
2026-04-21 10:21:22 +02:00
|
|
|
return await self._file_exists(path)
|
lib.FileContext: Add file methods
Add the following methods, meant to do the obvious:
unlink(self, path: str) -> None
erase(self, path: str) -> None
rename(self, src: str, dst: str) -> None
mktemp(self, tmpl: str, directory: bool=False) -> None
chown(self, path: str, owner: str|None=None, group: str|None=None) -> None
chmod(self, path: str, mode: int) -> None
stat(self, path: str, follow_symlinks: bool=True) -> StatResult
file_exists(self, path: str) -> bool
is_dir(self, path: str) -> bool
All methods are async and call their protected counterpart, which is
designed to be overridden. If possible, default implementations do
something meaningful, if not, they just raise plain
NotImplementedError.
Signed-off-by: Jan Lindemann <jan@janware.com>
2026-04-17 09:15:06 +02:00
|
|
|
|
|
|
|
|
async def _is_dir(self, path: str) -> bool:
|
|
|
|
|
try:
|
|
|
|
|
return S_ISDIR(await self._stat(path).st_mode)
|
|
|
|
|
except NotImplementedError:
|
|
|
|
|
log(DEBUG, f'{self.log_name} doesn\'t implement stat(), judging by trailing slash if {path} is a directory')
|
|
|
|
|
return path[-1] == '/'
|
|
|
|
|
except FileNotFoundError as e:
|
|
|
|
|
log(DEBUG, f'{self.log_name}: Failed to stat({path}) ({str(e)})')
|
|
|
|
|
return False
|
|
|
|
|
except Exception as e:
|
|
|
|
|
log(ERR, f'{self.log_name}: Failed to stat({path}) ({str(e)})')
|
|
|
|
|
raise
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
async def is_dir(self, path: str) -> bool:
|
|
|
|
|
if not path:
|
|
|
|
|
raise ValueError('Tried to investigate file system resource with empty path')
|
|
|
|
|
return self._is_dir(path)
|
|
|
|
|
|
2026-04-16 10:37:34 +02:00
|
|
|
async def _close(self) -> None:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
async def close(self) -> None:
|
|
|
|
|
await self._close()
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def create(cls, uri: str, *args, **kwargs) -> Self:
|
|
|
|
|
tokens = re.split(r'://', uri)
|
|
|
|
|
schema = tokens[0] if tokens[0] != uri else 'file'
|
|
|
|
|
match schema:
|
|
|
|
|
case 'local' | 'file':
|
|
|
|
|
from .ec.Local import Local
|
|
|
|
|
return Local(uri, *args, **kwargs)
|
|
|
|
|
case 'ssh':
|
|
|
|
|
from .ec.SSHClient import ssh_client
|
|
|
|
|
return ssh_client(uri, *args, **kwargs)
|
|
|
|
|
case 'http' | 'https':
|
|
|
|
|
from .ec.Curl import Curl
|
|
|
|
|
return Curl(uri, *args, **kwargs)
|
|
|
|
|
case _:
|
|
|
|
|
pass
|
|
|
|
|
raise Exception(f'Can\'t create file transfer instance for {uri} with unknown schema "{schema}"')
|