jw-pkg/src/python/jw/pkg/lib/App.py
Jan Lindemann ffe0cfd41d
All checks were successful
CI / Packaging - Kali Linux (pull_request) Successful in 3m5s
CI / Packaging - OpenSUSE Tumbleweed (pull_request) Successful in 3m11s
CI / Packaging test (pull_request) Successful in 0s
CI / Packaging - Kali Linux (push) Successful in 3m8s
CI / Packaging - OpenSUSE Tumbleweed (push) Successful in 3m17s
CI / Packaging test (push) Successful in 0s
lib.App: Allow _run() without subcommands

Overriding the _run() method entirely in App subclasses is currently only possible if the application supports a subcommand structure. Make it possible to use it as an abstraction for a single-command application.

Signed-off-by: Jan Lindemann <jan@janware.com>
2026-06-17 18:38:58 +02:00

288 lines
9.5 KiB
Python

from __future__ import annotations
import asyncio
import cProfile
import os
import sys
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from typing import TYPE_CHECKING, Any
from .AsyncRunner import AsyncRunner
from .log import DEBUG, ERR, NOTICE, log, set_log_flags, set_log_level
from .util import pretty_cmd
from .Types import LoadTypes
if TYPE_CHECKING:
from collections.abc import Awaitable
from typing import TypeVar
T = TypeVar('T')
class App: # export
def _add_arguments(self, parser):
self.__parser.add_argument(
'--log-flags', help = 'Log flags', default = self.__default_log_flags
)
self.__parser.add_argument(
'--log-level', help = 'Log level', default = self.__default_log_level
)
self.__parser.add_argument(
'--log-file', help = 'Log file', default = self.__default_log_file
)
self.__parser.add_argument(
'--backtrace',
help = 'Show exception backtraces',
action = 'store_true',
default = self.__back_trace,
)
self.__parser.add_argument(
'--write-profile',
help = 'Profile code and store output to file',
default = None,
)
def __init__(
self,
description: str = '',
name_filter: str = '^Cmd.*',
modules: list[str] | None = None,
eloop: None = None,
) -> None:
def add_cmd_to_parser(cmd, parsers):
parser = parsers.add_parser(
cmd.name,
help = cmd.help,
description = cmd.description,
formatter_class = ArgumentDefaultsHelpFormatter,
)
parser.set_defaults(func = cmd.run)
cmd.add_arguments(parser)
cmd.set_parser(parser)
return parser
def add_cmds_to_parser(
parent: AbstractCmd | App,
parser: ArgumentParser,
cmds,
all = False
) -> None:
if not cmds:
return
class SubCommand:
def __init__(self, cmd: AbstractCmd, parser: Any):
self.cmd = cmd
self.parser = parser
title = 'Available subcommands'
if hasattr(parent, 'name'):
title += ' of ' + getattr(parent, 'name')
subparsers = parser.add_subparsers(
title = title, metavar = '', dest = 'command'
)
scs: dict[str, SubCommand] = {}
for cmd in cmds:
cmd.set_parent(parent)
scs[cmd.name] = SubCommand(cmd, add_cmd_to_parser(cmd, subparsers))
if all:
for sc in scs.values():
add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all = all)
return
args, unknown = self.__parser.parse_known_args()
if args.command in scs:
sc = scs[args.command]
add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all = all)
from .Cmd import AbstractCmd
self.__args: Namespace | None = None
self.__cmdline: str | None = None
self.__default_log_flags: str = os.getenv(
'JW_DEFAULT_LOG_FLAGS', default = 'stderr,position,prio,color'
)
self.__default_log_level: str | int | None = os.getenv(
'JW_DEFAULT_LOG_LEVEL', default = NOTICE
)
self.__default_log_file: str | None = os.getenv(
'JW_DEFAULT_LOG_FILE', default = None
)
backtrace: str | bool = os.getenv('JW_DEFAULT_SHOW_BACKTRACE', False)
self.__back_trace = isinstance(backtrace, str) and backtrace.lower() in {
'1',
'true',
}
set_log_flags(self.__default_log_flags)
set_log_level(self.__default_log_level)
self.__eloop = eloop
self.__own_eloop = False
if eloop is None:
self.__eloop = asyncio.get_event_loop()
self.__own_eloop = True
self.__async_runner: AsyncRunner | None = None
self.__parser = ArgumentParser(
formatter_class = ArgumentDefaultsHelpFormatter,
description = description,
add_help = False,
)
self._add_arguments(self.__parser)
args, unknown = self.__parser.parse_known_args()
set_log_flags(args.log_flags)
set_log_level(args.log_level)
log(DEBUG, f'-------------- Running: >{pretty_cmd(sys.argv)}<')
cmd_classes: LoadTypes[AbstractCmd] = LoadTypes(
modules if modules else ['__main__'],
type_name_filter = name_filter,
type_filter = [AbstractCmd], # type: ignore[type-abstract]
)
add_all_parsers = (
'-h' in sys.argv or '--help' in sys.argv or '_ARGCOMPLETE' in os.environ
)
add_cmds_to_parser(
self,
self.__parser,
[cmd_class(self) for cmd_class in cmd_classes],
all = add_all_parsers,
)
# -- Add help only now, wouldn't want to have parse_known_args() exit
# on --help with subcommands missing
self.__parser.add_argument(
'-h', '--help', action = 'help', help = 'Show this help message and exit'
)
def __del__(self):
if self.__own_eloop:
if self.__eloop is not None:
self.__eloop.close()
self.__eloop = None
self.__own_eloop = False
async def __aenter__(self) -> None:
pass
async def __aexit__(self, exc_type, exc, tb) -> None:
pass
async def __run(self, argv = None) -> None:
try:
# Import argcomplete only here to not require it to be compatible
# with minimal environments
from argcomplete.completers import BaseCompleter # type: ignore[import-not-found]
class NoopCompleter(BaseCompleter):
def __call__(self, *args, **kwargs):
return None
import argcomplete # type: ignore[import-not-found]
argcomplete.autocomplete(self.__parser, default_completer = NoopCompleter())
except Exception:
pass
self.__args = self.__parser.parse_args(args = argv)
set_log_flags(self.__args.log_flags)
set_log_level(self.__args.log_level)
self.__back_trace = self.__args.backtrace
exit_status = 0
pr = None if self.__args.write_profile is None else cProfile.Profile()
if pr is not None:
pr.enable()
try:
ret = await self._run(self.__args)
if isinstance(ret, int) and ret >= 0 and ret <= 0xFF:
exit_status = ret
except Exception as e:
log(ERR, 'Failed: {}'.format(repr(e) if self.__back_trace else str(e)))
exit_status = 1
# AssertionErrors are programming errors, hence a programmer should
# get a chance to figure it out
if self.__back_trace or isinstance(e, AssertionError):
raise
finally:
if pr is not None:
pr.disable()
log(
NOTICE,
f'Writing profile statistics to {self.__args.write_profile}'
)
assert self.__args.write_profile is not None, 'args.write_profile'
pr.dump_stats(self.__args.write_profile)
if exit_status:
sys.exit(exit_status)
# Do the main work. Tries to run sub-commands by default. Overwrite if you
# want to do something else, for instance if you don't have sub-commands,
# or if want to do anything before and / or after the subcommands.
async def _run(self, args: Namespace) -> None | int:
if not hasattr(self.__args, 'func'):
self.__parser.print_help()
return None
# Run sub-command. Overwrite if you want to do anything before or after
return await self.args.func(args)
def call_async(self, awaitable: Awaitable[T], timeout: float | None = None) -> T:
return self.async_runner.call(awaitable, timeout)
@property
def eloop(self) -> asyncio.AbstractEventLoop:
if self.__eloop is None:
raise Exception('Tried to get inexistent event loop from application')
return self.__eloop
@property
def async_runner(self) -> AsyncRunner:
if self.__async_runner is None:
self.__async_runner = AsyncRunner()
return self.__async_runner
@property
def cmdline(self) -> str:
if self.__cmdline is None:
import shlex
with open('/proc/self/cmdline', 'rb') as f:
raw = f.read().split(b'\0')[:-1]
self.__cmdline = ' '.join(shlex.quote(arg.decode()) for arg in raw)
return self.__cmdline
@property
def args(self) -> Namespace:
if self.__args is None:
raise Exception('Tried to get inexistent argument list from application')
return self.__args
@property
def parser(self) -> ArgumentParser:
return self.__parser
def run(self, argv = None) -> None:
try:
ret = self.eloop.run_until_complete(self.__run(argv))
finally:
if self.__async_runner:
self.__async_runner.close()
self.__async_runner = None
return ret
def run_sub_commands(
description = '', name_filter = '^Cmd.*', modules = None, argv = None
): # export
app = App(description, name_filter, modules)
return app.run(argv = argv)