from __future__ import annotations import asyncio import cProfile import os import sys from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace from typing import TYPE_CHECKING, Any from .AsyncRunner import AsyncRunner from .log import DEBUG, ERR, NOTICE, log, set_log_flags, set_log_level from .Types import LoadTypes if TYPE_CHECKING: from collections.abc import Awaitable from typing import TypeVar T = TypeVar('T') class App: # export def _add_arguments(self, parser): self.__parser.add_argument( '--log-flags', help = 'Log flags', default = self.__default_log_flags ) self.__parser.add_argument( '--log-level', help = 'Log level', default = self.__default_log_level ) self.__parser.add_argument( '--log-file', help = 'Log file', default = self.__default_log_file ) self.__parser.add_argument( '--backtrace', help = 'Show exception backtraces', action = 'store_true', default = self.__back_trace, ) self.__parser.add_argument( '--write-profile', help = 'Profile code and store output to file', default = None, ) def __init__( self, description: str = '', name_filter: str = '^Cmd.*', modules: list[str] | None = None, eloop: None = None, ) -> None: def add_cmd_to_parser(cmd, parsers): parser = parsers.add_parser( cmd.name, help = cmd.help, description = cmd.description, formatter_class = ArgumentDefaultsHelpFormatter, ) parser.set_defaults(func = cmd.run) cmd.add_arguments(parser) cmd.set_parser(parser) return parser def add_cmds_to_parser( parent: AbstractCmd | App, parser: ArgumentParser, cmds, all = False ) -> None: if not cmds: return class SubCommand: def __init__(self, cmd: AbstractCmd, parser: Any): self.cmd = cmd self.parser = parser title = 'Available subcommands' if hasattr(parent, 'name'): title += ' of ' + getattr(parent, 'name') subparsers = parser.add_subparsers( title = title, metavar = '', dest = 'command' ) scs: dict[str, SubCommand] = {} for cmd in cmds: cmd.set_parent(parent) scs[cmd.name] = SubCommand(cmd, add_cmd_to_parser(cmd, subparsers)) if all: for sc in scs.values(): add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all = all) return args, unknown = self.__parser.parse_known_args() if args.command in scs: sc = scs[args.command] add_cmds_to_parser(sc.cmd, sc.parser, sc.cmd.children, all = all) from .Cmd import AbstractCmd self.__args: Namespace | None = None self.__cmdline: str | None = None self.__default_log_flags: str = os.getenv( 'JW_DEFAULT_LOG_FLAGS', default = 'stderr,position,prio,color' ) self.__default_log_level: str | int | None = os.getenv( 'JW_DEFAULT_LOG_LEVEL', default = NOTICE ) self.__default_log_file: str | None = os.getenv( 'JW_DEFAULT_LOG_FILE', default = None ) backtrace: str | bool = os.getenv('JW_DEFAULT_SHOW_BACKTRACE', False) self.__back_trace = isinstance(backtrace, str) and backtrace.lower() in { '1', 'true', } set_log_flags(self.__default_log_flags) set_log_level(self.__default_log_level) self.__eloop = eloop self.__own_eloop = False if eloop is None: self.__eloop = asyncio.get_event_loop() self.__own_eloop = True self.__async_runner: AsyncRunner | None = None self.__parser = ArgumentParser( formatter_class = ArgumentDefaultsHelpFormatter, description = description, add_help = False, ) self._add_arguments(self.__parser) args, unknown = self.__parser.parse_known_args() set_log_flags(args.log_flags) set_log_level(args.log_level) log(DEBUG, '-------------- Running: >' + ' '.join(sys.argv) + '<') cmd_classes = LoadTypes( modules if modules else ['__main__'], type_name_filter = name_filter, type_filter = [AbstractCmd], # type: ignore[type-abstract] ) add_all_parsers = ( '-h' in sys.argv or '--help' in sys.argv or '_ARGCOMPLETE' in os.environ ) add_cmds_to_parser( self, self.__parser, [cmd_class(self) for cmd_class in cmd_classes], all = add_all_parsers, ) # -- Add help only now, wouldn't want to have parse_known_args() exit # on --help with subcommands missing self.__parser.add_argument( '-h', '--help', action = 'help', help = 'Show this help message and exit' ) def __del__(self): if self.__own_eloop: if self.__eloop is not None: self.__eloop.close() self.__eloop = None self.__own_eloop = False async def __aenter__(self) -> None: pass async def __aexit__(self, exc_type, exc, tb) -> None: pass async def __run(self, argv = None) -> None: try: # Import argcomplete only here to not require it to be compatible # with minimal environments from argcomplete.completers import BaseCompleter class NoopCompleter(BaseCompleter): def __call__(self, **kwargs): return () import argcomplete argcomplete.autocomplete(self.__parser, default_completer = NoopCompleter()) except Exception: pass self.__args = self.__parser.parse_args(args = argv) set_log_flags(self.__args.log_flags) set_log_level(self.__args.log_level) self.__back_trace = self.__args.backtrace exit_status = 0 if not hasattr(self.__args, 'func'): self.__parser.print_help() return None pr = None if self.__args.write_profile is None else cProfile.Profile() if pr is not None: pr.enable() try: ret = await self._run(self.__args) if isinstance(ret, int) and ret >= 0 and ret <= 0xFF: exit_status = ret except Exception as e: log(ERR, 'Failed: {}'.format(repr(e) if self.__back_trace else str(e))) exit_status = 1 # AssertionErrors are programming errors, hence a programmer should # get a chance to figure it out if self.__back_trace or isinstance(e, AssertionError): raise finally: if pr is not None: pr.disable() log( NOTICE, f'Writing profile statistics to {self.__args.write_profile}' ) pr.dump_stats(self.__args.write_profile) if exit_status: sys.exit(exit_status) # Run sub-command. Overwrite if you want to do anything before or after async def _run(self, args: Namespace) -> None | int: return await self.args.func(args) def call_async(self, awaitable: Awaitable[T], timeout: float | None = None) -> T: return self.async_runner.call(awaitable, timeout) @property def eloop(self) -> asyncio.AbstractEventLoop: if self.__eloop is None: raise Exception('Tried to get inexistent event loop from application') return self.__eloop @property def async_runner(self) -> AsyncRunner: if self.__async_runner is None: self.__async_runner = AsyncRunner() return self.__async_runner @property def cmdline(self) -> str: if self.__cmdline is None: import shlex with open('/proc/self/cmdline', 'rb') as f: raw = f.read().split(b'\0')[:-1] self.__cmdline = ' '.join(shlex.quote(arg.decode()) for arg in raw) return self.__cmdline @property def args(self) -> Namespace: if self.__args is None: raise Exception('Tried to get inexistent argument list from application') return self.__args @property def parser(self) -> ArgumentParser: return self.__parser def run(self, argv = None) -> None: try: ret = self.eloop.run_until_complete(self.__run(argv)) finally: if self.__async_runner: self.__async_runner.close() self.__async_runner = None return ret def run_sub_commands( description = '', name_filter = '^Cmd.*', modules = None, argv = None ): # export app = App(description, name_filter, modules) return app.run(argv = argv)