jw-pkg/src/python/jw/pkg/lib/TarIo.py
Jan Lindemann 5d1ba6e15a
pyproject.toml: Enforce import annotations style

Add new ruff rules and fix their fallout:

future-annotations = true

select = [
    "TC", # type-checking import placement rules
    "FA", # future annotations rules
]

This comprises:

- Streamline imports and exports in cmds.xxx.Cmd

- Import base class as "Base"
- Export types Cmd and Parent via __all__

- Move all types imported only for annotation below TYPE_CHECKING

- Use "from __future__ import annotations" all over the place

Signed-off-by: Jan Lindemann <jan@janware.com>
2026-06-01 14:34:25 +02:00

129 lines
4.1 KiB
Python

from __future__ import annotations
import abc
import io
import tarfile
from tarfile import TarFile, TarInfo
from .CopyContext import CopyContext
from .ExecContext import ExecContext
from .log import DEBUG, ERR, log
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .base import StatResult
class TarIo(CopyContext):
    """Copy context that moves files from source to destination as a tar archive.

    The archive is fetched from the source context, optionally reduced to a
    set of wanted members, and then handed to ``_extract()``, which
    subclasses implement to unpack it on the destination.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Initialise the base class, always passing ``chroot = False``.

        Any ``chroot`` value supplied by the caller is overwritten.
        """
        kwargs['chroot'] = False
        super().__init__(*args, **kwargs)

    def _match(self, path: str, path_filter: list[str]) -> bool:
        """Return True if ``path`` is literally contained in ``path_filter``."""
        return path in path_filter

    def _filter_tar_file(
        self,
        blob: bytes,
        path_filter: list[str] | None = None,
        matched: list[str] | None = None,
    ) -> bytes:
        """Build a new tar archive holding only the wanted members of ``blob``.

        Args:
            blob: Raw bytes of the input tar archive.
            path_filter: Member names to keep. ``None`` keeps every member.
            matched: Optional list which receives the name of every member
                copied into the result.

        Returns:
            The bytes of the newly written tar archive.
        """
        ret = io.BytesIO()
        with tarfile.open(fileobj = ret, mode = 'w') as tf_out:
            # Manage the input archive with a context manager as well, so
            # it is closed on success and on error.
            with TarFile(fileobj = io.BytesIO(blob)) as tf_in:
                for info in tf_in.getmembers():
                    if path_filter is not None and not self._match(info.name, path_filter):
                        continue
                    log(DEBUG, f'Adding {info.name}')
                    if matched is not None:
                        matched.append(info.name)
                    # extractfile() yields None for members without data
                    # (e.g. directories); addfile() then writes the header
                    # only.
                    buf = tf_in.extractfile(info)
                    tf_out.addfile(info, buf)
        # Read the buffer only after tf_out is closed, so the archive's
        # end-of-archive blocks are included.
        return ret.getvalue()

    async def _read_filtered(
        self,
        path,
        path_filter: list[str] | None = None,
        matched: list[str] | None = None,
    ) -> bytes:
        """Fetch the tar archive at ``path`` from the source and filter it.

        Args:
            path: Location passed to ``self.src.get()``.
            path_filter: Member names to keep. ``None`` keeps every member.
            matched: Optional list which receives the names of kept members.

        Returns:
            The bytes of the filtered tar archive.

        Raises:
            Exception: Whatever ``self.src.get()`` raises is logged and
                re-raised unchanged.
        """
        try:
            blob = (await self.src.get(path)).stdout
        except Exception as e:
            log(ERR, f'Failed to read tar file "{path}" ({e})')
            raise
        return self._filter_tar_file(blob, path_filter, matched = matched)

    def _add(self, tf: TarFile, path: str, st: StatResult, contents: bytes) -> None:
        """Append ``contents`` to ``tf`` as member ``path`` with metadata from ``st``.

        Mode, owner name, group name, size and modification time are taken
        from ``st``; the modification time is truncated to an integer.
        """
        file_obj = io.BytesIO(contents)
        info = TarInfo()
        info.name = path
        info.mode = st.mode
        info.uname = st.owner
        info.gname = st.group
        info.size = st.size
        info.mtime = int(st.mtime)
        tf.addfile(info, file_obj)

    @abc.abstractmethod
    async def _extract(self, blob: bytes, root: str | None = None) -> None:
        """Unpack the tar archive ``blob`` on the destination.

        Args:
            blob: Raw bytes of the tar archive to unpack.
            root: Optional directory to unpack into.

        To be implemented by subclasses.
        """
        raise NotImplementedError()

    async def extract(
        self,
        root: str | None = None,
        path_filter: list[str] | None = None
    ) -> list[str]:
        """Read the archive at ``self.src.root``, filter it and unpack it.

        Args:
            root: Optional directory handed to ``_extract()``.
            path_filter: Member names to unpack. ``None`` unpacks everything.

        Returns:
            The names of all archive members that passed the filter.
        """
        ret: list[str] = []
        filtered = await self._read_filtered(self.src.root, path_filter, matched = ret)
        await self._extract(blob = filtered, root = root)
        return ret

    @classmethod
    def create(cls, *args, type: str | None = None, **kwargs):
        """Instantiate a concrete TarIo implementation.

        Args:
            *args: Positional arguments forwarded to the implementation.
            type: Implementation selector. Selecting one is not supported
                yet, so only ``None`` is accepted.
            **kwargs: Keyword arguments forwarded to the implementation.

        Returns:
            A ``TarIoTarExec`` instance.

        Raises:
            NotImplementedError: If ``type`` is not ``None``.
        """
        if type is not None:
            raise NotImplementedError
        # TarIoTarFile is the alternative implementation; TarIoTarExec is
        # the one currently handed out.
        return TarIoTarExec(*args, **kwargs)
class TarIoTarFile(TarIo):
    """TarIo implementation which unpacks with Python's tarfile module.

    Every archive member is written through the destination context's
    put(), mkdir() and chown() methods.
    """

    async def _extract(self, blob: bytes, root: str | None = None) -> None:
        """Unpack the tar archive ``blob`` member by member.

        Args:
            blob: Raw bytes of the tar archive to unpack.
            root: Optional directory prefixed to every member name.

        Raises:
            Exception: If a member carries no data and is not a directory.
        """
        archive = TarFile(fileobj = io.BytesIO(blob))
        for member in archive.getmembers():
            log(DEBUG, f'Extracting {member.name}')
            if root:
                path = root + '/' + member.name
            else:
                path = member.name
            stream = archive.extractfile(member)
            if stream is not None:
                # Member with data: write it with its ownership and mode.
                await self.dst.put(
                    path,
                    stream.read(),
                    owner = member.uname,
                    group = member.gname,
                    mode = member.mode,
                )
                continue
            # No data stream: only directories are handled.
            if not member.isdir():
                raise Exception(f'Can\'t extract unsupported file type of "{path}"')
            await self.dst.mkdir(path, member.mode)
            await self.dst.chown(path, member.uname, member.gname)
class TarIoTarExec(TarIo):
    """TarIo implementation which unpacks by running ``tar`` on the destination."""

    @property
    def dst(self) -> ExecContext:
        """Return the destination context, insisting it is an ExecContext.

        Raises:
            Exception: If the inherited destination is not an ExecContext.
        """
        target = super().dst
        if isinstance(target, ExecContext):
            return target
        raise Exception(
            'Tried to get executable destination context from copy '
            'context, which only has a file context'
        )

    async def _extract(self, blob: bytes, root: str | None = None) -> None:
        """Feed ``blob`` to ``tar -x -f -`` via the destination's run().

        Args:
            blob: Raw bytes of the tar archive, passed as command input.
            root: Optional directory; if given, ``-C <root>`` is passed to
                tar ahead of the extract options.
        """
        chdir_args = [] if root is None else ['-C', root]
        argv = ['tar', *chdir_args, '-x', '-f', '-']
        await self.dst.run(argv, cmd_input = blob)