Beautify logging code and output #17
8 changed files with 391 additions and 113 deletions
|
|
@ -10,6 +10,7 @@ from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
from .AsyncRunner import AsyncRunner
|
from .AsyncRunner import AsyncRunner
|
||||||
from .log import DEBUG, ERR, NOTICE, log, set_log_flags, set_log_level
|
from .log import DEBUG, ERR, NOTICE, log, set_log_flags, set_log_level
|
||||||
|
from .util import pretty_cmd
|
||||||
from .Types import LoadTypes
|
from .Types import LoadTypes
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
@ -135,9 +136,9 @@ class App: # export
|
||||||
set_log_flags(args.log_flags)
|
set_log_flags(args.log_flags)
|
||||||
set_log_level(args.log_level)
|
set_log_level(args.log_level)
|
||||||
|
|
||||||
log(DEBUG, '-------------- Running: >' + ' '.join(sys.argv) + '<')
|
log(DEBUG, f'-------------- Running: >{pretty_cmd(sys.argv)}<')
|
||||||
|
|
||||||
cmd_classes = LoadTypes(
|
cmd_classes: LoadTypes[AbstractCmd] = LoadTypes(
|
||||||
modules if modules else ['__main__'],
|
modules if modules else ['__main__'],
|
||||||
type_name_filter = name_filter,
|
type_name_filter = name_filter,
|
||||||
type_filter = [AbstractCmd], # type: ignore[type-abstract]
|
type_filter = [AbstractCmd], # type: ignore[type-abstract]
|
||||||
|
|
|
||||||
|
|
@ -313,7 +313,7 @@ class ExecContext(Base):
|
||||||
await self.open()
|
await self.open()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ret = Result()
|
ret = Result(cmd = cmd)
|
||||||
with self.CallContext(
|
with self.CallContext(
|
||||||
self,
|
self,
|
||||||
title = title,
|
title = title,
|
||||||
|
|
@ -421,7 +421,7 @@ class ExecContext(Base):
|
||||||
# be returned by CallContext and is very much allowed
|
# be returned by CallContext and is very much allowed
|
||||||
assert cmd_input is not None, 'Invalid: cmd_input is None'
|
assert cmd_input is not None, 'Invalid: cmd_input is None'
|
||||||
|
|
||||||
ret = Result()
|
ret = Result(cmd = cmd)
|
||||||
with self.CallContext(
|
with self.CallContext(
|
||||||
self,
|
self,
|
||||||
title = title,
|
title = title,
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,64 @@ class Result:
|
||||||
return ret.strip()
|
return ret.strip()
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
def __try_decode(
|
||||||
|
self,
|
||||||
|
stdxxx: bytes | None,
|
||||||
|
quote = False,
|
||||||
|
truncate: int | None = None,
|
||||||
|
annotate: bool = True,
|
||||||
|
label: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
if label is None:
|
||||||
|
label = ''
|
||||||
|
else:
|
||||||
|
label = f'{label}: '
|
||||||
|
if stdxxx is None:
|
||||||
|
return f'{label}None'
|
||||||
|
try:
|
||||||
|
ret = stdxxx.decode()[:truncate].strip()
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
chunk = stdxxx[:truncate]
|
||||||
|
ret = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
|
||||||
|
if (not annotate) or truncate is None or len(stdxxx) <= truncate:
|
||||||
|
return f'{label}"{ret}"' if quote else label + ret
|
||||||
|
ret = '{labe}"{ret} ..."' if quote else '{labe}{ret} ...'
|
||||||
|
ret += f' + {len(stdxxx) - truncate} more bytes'
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def __summarize(
|
||||||
|
self,
|
||||||
|
cmd: list[str] | None = None,
|
||||||
|
wd: str | None = None,
|
||||||
|
verbose = True
|
||||||
|
) -> str:
|
||||||
|
if not verbose:
|
||||||
|
ret = f'{self.__status}: '
|
||||||
|
else:
|
||||||
|
from .util import pretty_cmd
|
||||||
|
if cmd is None:
|
||||||
|
cmd = self.__cmd
|
||||||
|
call = ''
|
||||||
|
if cmd is not None:
|
||||||
|
if wd is None:
|
||||||
|
wd = self.__wd
|
||||||
|
call = f'"{pretty_cmd(cmd, wd)}" '
|
||||||
|
ret = (
|
||||||
|
f'Command {call}has not yet exited' if self.__status is None else
|
||||||
|
f'Command {call}has exited with status {self.__status}'
|
||||||
|
)
|
||||||
|
label, stdxxx, truncate = (
|
||||||
|
('stdout', self.__stdout, 40) if self.status == 0
|
||||||
|
else ('stderr', self.__stderr, None)
|
||||||
|
)
|
||||||
|
ret += self.__try_decode(
|
||||||
|
stdxxx, quote = True, truncate = truncate, annotate = True, label = label
|
||||||
|
)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return self.__summarize(verbose = False)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def status(self) -> int | None:
|
def status(self) -> int | None:
|
||||||
return self.__status
|
return self.__status
|
||||||
|
|
@ -40,29 +98,6 @@ class Result:
|
||||||
def encoding(self, value: str) -> None:
|
def encoding(self, value: str) -> None:
|
||||||
self.__encoding = value
|
self.__encoding = value
|
||||||
|
|
||||||
def __stdout_footprint(self, quote = False) -> str:
|
|
||||||
if self.__stdout is None:
|
|
||||||
ret = ''
|
|
||||||
else:
|
|
||||||
max_len = 40
|
|
||||||
try:
|
|
||||||
ret = self.stdout_str[:max_len]
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
chunk = self.__stdout[:max_len]
|
|
||||||
ret = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
|
|
||||||
if quote:
|
|
||||||
ret = f'"{ret}"'
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
|
||||||
ret = f'{self.__status}:'
|
|
||||||
if self.__status is not None:
|
|
||||||
if self.status != 0:
|
|
||||||
ret += f' err: {self.stderr_str_or_none}'
|
|
||||||
else:
|
|
||||||
ret += f' out: {self.__stdout_footprint(quote=True)}'
|
|
||||||
return ret
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def strip(self) -> bool:
|
def strip(self) -> bool:
|
||||||
return self.__strip
|
return self.__strip
|
||||||
|
|
@ -96,29 +131,6 @@ class Result:
|
||||||
import re
|
import re
|
||||||
return re.search(pattern, err) is not None
|
return re.search(pattern, err) is not None
|
||||||
|
|
||||||
def __summarize(self, cmd: list[str] | None, wd: str | None = None) -> str:
|
|
||||||
from .util import pretty_cmd
|
|
||||||
if cmd is None:
|
|
||||||
cmd = self.__cmd
|
|
||||||
call = ''
|
|
||||||
if cmd is not None:
|
|
||||||
if wd is None:
|
|
||||||
wd = self.__wd
|
|
||||||
call = f'"{pretty_cmd(cmd, wd)}" '
|
|
||||||
ret = (
|
|
||||||
f'Command {call}has not yet exited' if self.__status is None else
|
|
||||||
f'Command {call}has exited with status {self.__status}'
|
|
||||||
)
|
|
||||||
call = pretty_cmd(cmd, wd)
|
|
||||||
if self.status != 0:
|
|
||||||
ret += f' -> stderr="{self.__stderr!r}"'
|
|
||||||
else:
|
|
||||||
if self.__stdout:
|
|
||||||
ret += f' -> stdout has {len(self.__stdout)} bytes'
|
|
||||||
else:
|
|
||||||
ret += ' -> stdout = None'
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def summarize(self, cmd: list[str] | None = None, wd: str | None = None) -> str:
|
def summarize(self, cmd: list[str] | None = None, wd: str | None = None) -> str:
|
||||||
return self.__summarize(cmd, wd)
|
return self.__summarize(cmd, wd)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
|
@ -10,7 +11,7 @@ from .log import OFF, log, parse_log_level
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from collections.abc import Iterator
|
from collections.abc import Iterator
|
||||||
from typing import Any
|
from typing import Any, Sequence
|
||||||
|
|
||||||
T = TypeVar('T')
|
T = TypeVar('T')
|
||||||
|
|
||||||
|
|
@ -44,7 +45,7 @@ class LoadTypes(Types[T]): # export
|
||||||
self,
|
self,
|
||||||
mod_names: list[str],
|
mod_names: list[str],
|
||||||
type_name_filter: str | None = None,
|
type_name_filter: str | None = None,
|
||||||
type_filter: list[type[T]] = [],
|
type_filter: Sequence[type[Any]] | None = None,
|
||||||
debug_level = None,
|
debug_level = None,
|
||||||
):
|
):
|
||||||
if debug_level is None:
|
if debug_level is None:
|
||||||
|
|
@ -64,9 +65,12 @@ class LoadTypes(Types[T]): # export
|
||||||
log(self.__debug_level, *args, **kwargs)
|
log(self.__debug_level, *args, **kwargs)
|
||||||
|
|
||||||
def _stringify(self):
|
def _stringify(self):
|
||||||
|
tf = 'None' if self.__type_filter is None else (
|
||||||
|
', '.join([str(f) for f in self.__type_filter])
|
||||||
|
)
|
||||||
return [
|
return [
|
||||||
'type_name_filter: ' + str(self.__type_name_filter),
|
'type_name_filter: ' + str(self.__type_name_filter),
|
||||||
'type_filter: ' + ', '.join([str(f) for f in self.__type_filter]),
|
'type_filter: ' + tf,
|
||||||
'mod_names: ' + ', '.join(self.__mod_names),
|
'mod_names: ' + ', '.join(self.__mod_names),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
@ -86,35 +90,22 @@ class LoadTypes(Types[T]): # export
|
||||||
for member_name, c in inspect.getmembers(
|
for member_name, c in inspect.getmembers(
|
||||||
sys.modules[mod_name], inspect.isclass
|
sys.modules[mod_name], inspect.isclass
|
||||||
):
|
):
|
||||||
|
name = f'{mod_name}.{member_name}'
|
||||||
if rx is not None and not re.match(rx, member_name):
|
if rx is not None and not re.match(rx, member_name):
|
||||||
self._debug(
|
self._debug(f'o "{name}" has wrong name')
|
||||||
'o "{}.{}" has wrong name'.format(mod_name, member_name)
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
if inspect.isabstract(c):
|
if inspect.isabstract(c):
|
||||||
self._debug(
|
self._debug(f'o "{name}" is abstract: {c.__abstractmethods__}')
|
||||||
'o "{}.{}" is abstract'.format(mod_name, member_name)
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
if self.__type_filter:
|
if self.__type_filter:
|
||||||
for tp in self.__type_filter:
|
for tp in self.__type_filter:
|
||||||
if issubclass(c, tp):
|
if issubclass(c, tp):
|
||||||
break
|
break
|
||||||
self._debug(
|
self._debug(f'o "{name}" is not of type {tp}')
|
||||||
'o "{}.{}" is not of type {}'.format(
|
|
||||||
mod_name, member_name, tp
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
self._debug(
|
self._debug(f'o "{name}" doesn\'t match type filter')
|
||||||
'o "{}.{}" doesn\'t match type filter'.format(
|
|
||||||
mod_name, member_name
|
|
||||||
)
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
self._debug(
|
self._debug(f'o "{name}" is fine, adding')
|
||||||
'o "{}.{}" is fine, adding'.format(mod_name, member_name)
|
|
||||||
)
|
|
||||||
ret.append(c)
|
ret.append(c)
|
||||||
self.__classes = ret
|
self.__classes = ret
|
||||||
return self.__classes
|
return self.__classes
|
||||||
|
|
|
||||||
|
|
@ -83,7 +83,7 @@ class Local(Base):
|
||||||
|
|
||||||
# PTY merges stdout/stderr
|
# PTY merges stdout/stderr
|
||||||
stdout = b''.join(stdout_chunks) if stdout_chunks else None
|
stdout = b''.join(stdout_chunks) if stdout_chunks else None
|
||||||
return Result(stdout, None, exit_code)
|
return Result(stdout, None, exit_code, cmd = cmd)
|
||||||
|
|
||||||
# -- non-interactive mode
|
# -- non-interactive mode
|
||||||
|
|
||||||
|
|
@ -149,7 +149,7 @@ class Local(Base):
|
||||||
stdout = b''.join(stdout_parts) if stdout_parts else None
|
stdout = b''.join(stdout_parts) if stdout_parts else None
|
||||||
stderr = b''.join(stderr_parts) if stderr_parts else None
|
stderr = b''.join(stderr_parts) if stderr_parts else None
|
||||||
|
|
||||||
return Result(stdout, stderr, exit_code)
|
return Result(stdout, stderr, exit_code, cmd = cmd)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if cwd is not None:
|
if cwd is not None:
|
||||||
|
|
|
||||||
|
|
@ -273,7 +273,7 @@ class AsyncSSH(Base):
|
||||||
)
|
)
|
||||||
|
|
||||||
stdout = b''.join(stdout_parts) if stdout_parts else None
|
stdout = b''.join(stdout_parts) if stdout_parts else None
|
||||||
return Result(stdout, None, exit_code)
|
return Result(stdout, None, exit_code, cmd = cmd)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if stdin_reader_installed:
|
if stdin_reader_installed:
|
||||||
|
|
@ -353,7 +353,7 @@ class AsyncSSH(Base):
|
||||||
exit_code = completed.returncode if completed.returncode is not None else -1
|
exit_code = completed.returncode if completed.returncode is not None else -1
|
||||||
|
|
||||||
stdout = b''.join(stdout_parts) if stdout_parts else None
|
stdout = b''.join(stdout_parts) if stdout_parts else None
|
||||||
return Result(stdout, None, exit_code)
|
return Result(stdout, None, exit_code, cmd = cmd)
|
||||||
|
|
||||||
async def _run_ssh(
|
async def _run_ssh(
|
||||||
self,
|
self,
|
||||||
|
|
@ -446,7 +446,7 @@ class AsyncSSH(Base):
|
||||||
completed.returncode if completed.returncode is not None else -1
|
completed.returncode if completed.returncode is not None else -1
|
||||||
)
|
)
|
||||||
|
|
||||||
return Result(stdout, stderr, exit_code)
|
return Result(stdout, stderr, exit_code, cmd = cmd)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log(ERR, f'Failed to run command {" ".join(cmd)} ({e})')
|
log(ERR, f'Failed to run command {" ".join(cmd)} ({e})')
|
||||||
|
|
|
||||||
|
|
@ -84,4 +84,4 @@ class Paramiko(Base):
|
||||||
if cmd_input is not None:
|
if cmd_input is not None:
|
||||||
stdin.write(cmd_input)
|
stdin.write(cmd_input)
|
||||||
exit_status = stdout.channel.recv_exit_status()
|
exit_status = stdout.channel.recv_exit_status()
|
||||||
return Result(stdout.read(), stderr.read(), exit_status)
|
return Result(stdout.read(), stderr.read(), exit_status, cmd = cmd)
|
||||||
|
|
|
||||||
|
|
@ -1,31 +1,51 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import datetime
|
import inspect
|
||||||
|
import re
|
||||||
import sys
|
import sys
|
||||||
import syslog
|
import syslog
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from os.path import basename
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
import io
|
||||||
|
|
||||||
from typing import Final
|
from typing import Any, Final, List, Optional, Tuple
|
||||||
|
|
||||||
# fmt: disable # don't conflate
|
# fmt: disable # don't conflate
|
||||||
# yapf: disable # don't conflate
|
# yapf: disable # don't conflate
|
||||||
EMERG = int(syslog.LOG_EMERG)
|
_special_chars = {
|
||||||
ALERT = int(syslog.LOG_ALERT)
|
'\a' : '\\a',
|
||||||
CRIT = int(syslog.LOG_CRIT)
|
'\b' : '\\b',
|
||||||
ERR = int(syslog.LOG_ERR)
|
'\t' : '\\t',
|
||||||
WARNING = int(syslog.LOG_WARNING)
|
'\n' : '\\n',
|
||||||
NOTICE = int(syslog.LOG_NOTICE)
|
'\v' : '\\v',
|
||||||
INFO = int(syslog.LOG_INFO)
|
'\f' : '\\f',
|
||||||
DEBUG = int(syslog.LOG_DEBUG)
|
'\r' : '\\r',
|
||||||
DEVEL = int(syslog.LOG_DEBUG + 1)
|
}
|
||||||
OFF = DEVEL + 1
|
# yapf: enable
|
||||||
|
# fmt: enable
|
||||||
|
|
||||||
_log_level = NOTICE
|
_special_char_regex = re.compile(
|
||||||
_last_tstamp = datetime.datetime.now()
|
"(%s)" % "|".join(map(re.escape, _special_chars.keys()))
|
||||||
_first_tstamp = _last_tstamp
|
)
|
||||||
|
|
||||||
|
_clean_str_regex = re.compile(r'\033\[[0-9]*m|[\x00-\x1F\x7F-\x9F]')
|
||||||
|
|
||||||
|
# fmt: disable # don't conflate
|
||||||
|
# yapf: disable # don't conflate
|
||||||
|
EMERG = int(syslog.LOG_EMERG)
|
||||||
|
ALERT = int(syslog.LOG_ALERT)
|
||||||
|
CRIT = int(syslog.LOG_CRIT)
|
||||||
|
ERR = int(syslog.LOG_ERR)
|
||||||
|
WARNING = int(syslog.LOG_WARNING)
|
||||||
|
NOTICE = int(syslog.LOG_NOTICE)
|
||||||
|
INFO = int(syslog.LOG_INFO)
|
||||||
|
DEBUG = int(syslog.LOG_DEBUG)
|
||||||
|
DEVEL = int(syslog.LOG_DEBUG + 1)
|
||||||
|
OFF = DEVEL + 1
|
||||||
|
|
||||||
_LOG_LEVEL_NAME_BY_VALUE: Final[dict[int, str]] = {
|
_LOG_LEVEL_NAME_BY_VALUE: Final[dict[int, str]] = {
|
||||||
EMERG: 'EMERG',
|
EMERG: 'EMERG',
|
||||||
|
|
@ -39,8 +59,6 @@ _LOG_LEVEL_NAME_BY_VALUE: Final[dict[int, str]] = {
|
||||||
DEVEL: 'DEVEL',
|
DEVEL: 'DEVEL',
|
||||||
OFF: 'OFF',
|
OFF: 'OFF',
|
||||||
}
|
}
|
||||||
# yapf: enable
|
|
||||||
# fmt: enable
|
|
||||||
|
|
||||||
_LOG_LEVEL_VALUE_BY_NAME: Final[dict[str, int]] = {
|
_LOG_LEVEL_VALUE_BY_NAME: Final[dict[str, int]] = {
|
||||||
alias: value
|
alias: value
|
||||||
|
|
@ -48,8 +66,205 @@ _LOG_LEVEL_VALUE_BY_NAME: Final[dict[str, int]] = {
|
||||||
for alias in (name, name.lower())
|
for alias in (name, name.lower())
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_log_level_name(level: int) -> str:
|
_level = NOTICE
|
||||||
return _LOG_LEVEL_NAME_BY_VALUE[level]
|
|
||||||
|
CONSOLE_FONT_BOLD = '\033[1m'
|
||||||
|
CONSOLE_FONT_RED = '\033[31m'
|
||||||
|
CONSOLE_FONT_GREEN = '\033[32m'
|
||||||
|
CONSOLE_FONT_YELLOW = '\033[33m'
|
||||||
|
CONSOLE_FONT_BLUE = '\033[34m'
|
||||||
|
|
||||||
|
CONSOLE_FONT_MAGENTA = '\033[35m'
|
||||||
|
CONSOLE_FONT_CYAN = '\033[36m'
|
||||||
|
CONSOLE_FONT_WHITE = '\033[37m'
|
||||||
|
|
||||||
|
CONSOLE_FONT_BLINK = '\033[5m'
|
||||||
|
CONSOLE_FONT_OFF = '\033[m'
|
||||||
|
|
||||||
|
f_position = 'position'
|
||||||
|
f_module = 'module'
|
||||||
|
f_date = 'date'
|
||||||
|
f_stderr = 'stderr'
|
||||||
|
f_stdout = 'stdout'
|
||||||
|
f_prio = 'prio'
|
||||||
|
f_color = 'color'
|
||||||
|
f_default = [ f_position, f_stderr, f_prio, f_color ]
|
||||||
|
|
||||||
|
_flags = set(f_default)
|
||||||
|
_log_prefix = ''
|
||||||
|
_clean_log_prefix = ''
|
||||||
|
_file_name_len = 20
|
||||||
|
_module_name_len = 50
|
||||||
|
_log_file_streams: list[io.TextIOWrapper] = []
|
||||||
|
|
||||||
|
_short_prio_str = {
|
||||||
|
EMERG : '<Y>',
|
||||||
|
ALERT : '<A>',
|
||||||
|
CRIT : '<C>',
|
||||||
|
ERR : '<E>',
|
||||||
|
WARNING : '<W>',
|
||||||
|
NOTICE : '<N>',
|
||||||
|
INFO : '<I>',
|
||||||
|
DEBUG : '<D>',
|
||||||
|
DEVEL : '<V>',
|
||||||
|
}
|
||||||
|
|
||||||
|
_prio_colors = {
|
||||||
|
DEVEL : [ "", "" ],
|
||||||
|
DEBUG : [ "", "" ],
|
||||||
|
INFO : [ CONSOLE_FONT_BLUE, CONSOLE_FONT_OFF ],
|
||||||
|
NOTICE : [ CONSOLE_FONT_GREEN, CONSOLE_FONT_OFF ],
|
||||||
|
WARNING : [ CONSOLE_FONT_YELLOW, CONSOLE_FONT_OFF ],
|
||||||
|
ERR : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_RED, CONSOLE_FONT_OFF ],
|
||||||
|
CRIT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
||||||
|
ALERT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
||||||
|
EMERG : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
||||||
|
}
|
||||||
|
|
||||||
|
# yapf: enable
|
||||||
|
# fmt: enable
|
||||||
|
|
||||||
|
class Stream:
|
||||||
|
|
||||||
|
def __init__(self, stream, flags):
|
||||||
|
self.stream = stream
|
||||||
|
self.flags = flags
|
||||||
|
|
||||||
|
_streams: dict[int, Stream] = dict()
|
||||||
|
_stream_descriptors = list(reversed(range(1, 16)))
|
||||||
|
|
||||||
|
def pad(token: str, total_size: int, right_align: bool = False) -> str:
|
||||||
|
add = total_size - len(token)
|
||||||
|
if add <= 0:
|
||||||
|
return token
|
||||||
|
space = ' ' * add
|
||||||
|
if right_align:
|
||||||
|
return space + token
|
||||||
|
return token + space
|
||||||
|
|
||||||
|
def add_capture_stream(stream, flags = 0x0):
|
||||||
|
ret = _stream_descriptors.pop()
|
||||||
|
_streams[ret] = Stream(stream = stream, flags = flags)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def rm_capture_stream(sd):
|
||||||
|
del _streams[sd]
|
||||||
|
_stream_descriptors.append(sd)
|
||||||
|
|
||||||
|
def prio_gets_logged(prio: int) -> bool: # export
|
||||||
|
if prio > _level:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def log_level(s: Optional[str] = None) -> int: # export
|
||||||
|
if s is None:
|
||||||
|
return _level
|
||||||
|
return parse_log_prio_str(s)
|
||||||
|
|
||||||
|
def get_caller_pos(up: int = 1,
|
||||||
|
kwargs: Optional[dict[str, Any]] = None) -> Tuple[str, str, int]:
|
||||||
|
if kwargs and 'caller' in kwargs:
|
||||||
|
r = kwargs['caller']
|
||||||
|
del kwargs['caller']
|
||||||
|
return r
|
||||||
|
caller = inspect.stack()[up + 1]
|
||||||
|
mod = inspect.getmodule(caller[0])
|
||||||
|
mod_name = '' if mod is None else mod.__name__
|
||||||
|
return (mod_name, basename(caller.filename), caller.lineno)
|
||||||
|
|
||||||
|
def log_m(prio: int, *args, **kwargs) -> None: # export
|
||||||
|
if prio > _level:
|
||||||
|
return
|
||||||
|
if len(args):
|
||||||
|
margs = ''
|
||||||
|
for a in args:
|
||||||
|
if isinstance(a, list):
|
||||||
|
margs += '\n'.join([str(elem) for elem in a])
|
||||||
|
continue
|
||||||
|
margs += ' ' + str(a)
|
||||||
|
if 'caller' not in kwargs:
|
||||||
|
caller = get_caller_pos(1)
|
||||||
|
else:
|
||||||
|
caller = kwargs['caller']
|
||||||
|
del kwargs['caller']
|
||||||
|
for line in margs[1:].split('\n'):
|
||||||
|
log(prio, line, **kwargs, caller = caller)
|
||||||
|
|
||||||
|
def log(prio: int, *args, only_printable: bool = False, **kwargs) -> None: # export
|
||||||
|
|
||||||
|
if prio > _level:
|
||||||
|
return
|
||||||
|
|
||||||
|
msg = ''
|
||||||
|
color_on = ''
|
||||||
|
color_off = ''
|
||||||
|
|
||||||
|
if f_date in _flags:
|
||||||
|
msg += datetime.now().strftime("%b %d %H:%M:%S.%f ")
|
||||||
|
|
||||||
|
if f_prio in _flags:
|
||||||
|
msg += _short_prio_str[prio] + ' '
|
||||||
|
|
||||||
|
if f_position in _flags:
|
||||||
|
|
||||||
|
if 'caller' in kwargs:
|
||||||
|
mod, name, line = kwargs['caller']
|
||||||
|
else:
|
||||||
|
mod, name, line = get_caller_pos(1)
|
||||||
|
|
||||||
|
if f_module in _flags:
|
||||||
|
msg += pad(mod, _module_name_len)
|
||||||
|
|
||||||
|
msg += pad(name, _file_name_len) + '[' + pad(str(line), 4, True) + ']'
|
||||||
|
|
||||||
|
if f_color in _flags:
|
||||||
|
color_on, color_off = console_color_chars(prio)
|
||||||
|
|
||||||
|
margs = ''
|
||||||
|
if len(args):
|
||||||
|
for a in args:
|
||||||
|
margs += ' ' + str(a)
|
||||||
|
if only_printable:
|
||||||
|
margs = _special_char_regex.sub(
|
||||||
|
lambda mo: _special_chars[mo.string[mo.start():mo.end()]], margs
|
||||||
|
)
|
||||||
|
margs = re.sub('[\x01-\x1f]', '.', margs)
|
||||||
|
|
||||||
|
for file in _log_file_streams:
|
||||||
|
print(msg + _clean_log_prefix + margs, file = file)
|
||||||
|
|
||||||
|
msg += _log_prefix
|
||||||
|
|
||||||
|
if not len(msg):
|
||||||
|
return
|
||||||
|
|
||||||
|
if len(margs):
|
||||||
|
msg += color_on + margs + color_off
|
||||||
|
|
||||||
|
files = []
|
||||||
|
if 'capture' in kwargs:
|
||||||
|
files.append(kwargs['capture'])
|
||||||
|
elif _streams:
|
||||||
|
files = [s.stream for s in _streams.values()]
|
||||||
|
else:
|
||||||
|
if f_stdout in _flags:
|
||||||
|
files.append(sys.stdout)
|
||||||
|
|
||||||
|
if f_stderr in _flags:
|
||||||
|
files.append(sys.stderr)
|
||||||
|
|
||||||
|
if not len(files):
|
||||||
|
files = [sys.stdout]
|
||||||
|
|
||||||
|
for file in files:
|
||||||
|
print(msg, file = file)
|
||||||
|
|
||||||
|
def throw(*args, prio = ERR, caller = None, **kwargs) -> None:
|
||||||
|
if caller is None:
|
||||||
|
caller = get_caller_pos(1)
|
||||||
|
msg = ' '.join([str(arg) for arg in args])
|
||||||
|
log(prio, msg, caller = caller)
|
||||||
|
raise Exception(msg)
|
||||||
|
|
||||||
def parse_log_level(level: str | int) -> int:
|
def parse_log_level(level: str | int) -> int:
|
||||||
|
|
||||||
|
|
@ -64,24 +279,83 @@ def parse_log_level(level: str | int) -> int:
|
||||||
return _LOG_LEVEL_VALUE_BY_NAME[level]
|
return _LOG_LEVEL_VALUE_BY_NAME[level]
|
||||||
return __int_level(int(level))
|
return __int_level(int(level))
|
||||||
|
|
||||||
|
def parse_log_prio_str(prio: str) -> int: # export
|
||||||
|
return parse_log_level(prio)
|
||||||
|
|
||||||
|
def console_color_chars(prio: int) -> List[str]: # export
|
||||||
|
if not sys.stdout.isatty():
|
||||||
|
return ['', '']
|
||||||
|
return _prio_colors[prio]
|
||||||
|
|
||||||
def set_log_level(level: str | int | None = None) -> int:
|
def set_log_level(level: str | int | None = None) -> int:
|
||||||
global _log_level
|
global _level
|
||||||
ret = _log_level
|
ret = _level
|
||||||
if level is not None:
|
if level is not None:
|
||||||
_log_level = parse_log_level(level)
|
_level = parse_log_level(level)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def set_log_flags(*args, **kwargs):
|
def set_level(level: str | int | None = None) -> int: # export
|
||||||
pass
|
return set_log_level(level)
|
||||||
|
|
||||||
def log(prio: int, *args, **kwargs):
|
def set_flags(flags: str | None) -> str: # export
|
||||||
global _log_level
|
global _flags
|
||||||
if prio > _log_level:
|
ret = ','.join(_flags)
|
||||||
return
|
if flags is not None:
|
||||||
print(*args, file = sys.stderr)
|
_flags = set(flags.split(','))
|
||||||
|
return ret
|
||||||
|
|
||||||
def log_time_diff(prio: int, *args, **kwargs):
|
def set_log_flags(flags: str) -> str:
|
||||||
global _last_tstamp
|
return set_flags(flags)
|
||||||
now = datetime.datetime.now()
|
|
||||||
log(prio, now - _last_tstamp, now - _first_tstamp, *args)
|
#syslog
|
||||||
_last_tstamp = now
|
#console
|
||||||
|
#color
|
||||||
|
#prio
|
||||||
|
#position
|
||||||
|
#ide
|
||||||
|
#trace_rename_thread_to_shorter
|
||||||
|
#trace_rename_thread_to_longer
|
||||||
|
#trace_inout
|
||||||
|
#skip_openlog
|
||||||
|
#id
|
||||||
|
#date
|
||||||
|
#pid
|
||||||
|
#highlight_first_error
|
||||||
|
|
||||||
|
def append_to_prefix(prefix: str) -> str: # export
|
||||||
|
global _log_prefix
|
||||||
|
global _clean_log_prefix
|
||||||
|
r = _log_prefix
|
||||||
|
if prefix:
|
||||||
|
_log_prefix += prefix
|
||||||
|
_clean_log_prefix = _clean_str_regex.sub('', _log_prefix)
|
||||||
|
return r
|
||||||
|
|
||||||
|
def remove_from_prefix(count) -> str: # export
|
||||||
|
if isinstance(count, str):
|
||||||
|
count = len(count)
|
||||||
|
global _log_prefix
|
||||||
|
global _clean_log_prefix
|
||||||
|
r = _log_prefix
|
||||||
|
_log_prefix = _log_prefix[:-count]
|
||||||
|
_clean_log_prefix = _clean_str_regex.sub('', _log_prefix)
|
||||||
|
return r
|
||||||
|
|
||||||
|
def set_filename_length(length: int) -> int: # export
|
||||||
|
global _file_name_len
|
||||||
|
r = _file_name_len
|
||||||
|
if length:
|
||||||
|
_file_name_len = length
|
||||||
|
return r
|
||||||
|
|
||||||
|
def set_module_name_length(length: int) -> int: # export
|
||||||
|
global _module_name_len
|
||||||
|
r = _module_name_len
|
||||||
|
if length:
|
||||||
|
_module_name_len = length
|
||||||
|
return r
|
||||||
|
|
||||||
|
def add_log_file(path: str) -> None: # export
|
||||||
|
global _log_file_streams
|
||||||
|
fd = open(path, 'w', buffering = 1)
|
||||||
|
_log_file_streams.append(fd)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue