From 0386c351a987a39e082d0092d0443c3a4841b0e7 Mon Sep 17 00:00:00 2001 From: Jan Lindemann Date: Sat, 13 Jun 2026 14:13:15 +0200 Subject: [PATCH] log: Use jw-python's log implementation Use the (mostly) call-compatible log implementation from jw-python. This buys us much nicer logs with color and source-code positon annotations. Signed-off-by: Jan Lindemann --- src/python/jw/pkg/lib/log.py | 342 +++++++++++++++++++++++++++++++---- 1 file changed, 308 insertions(+), 34 deletions(-) diff --git a/src/python/jw/pkg/lib/log.py b/src/python/jw/pkg/lib/log.py index 18c0d0ae..8aa6d148 100644 --- a/src/python/jw/pkg/lib/log.py +++ b/src/python/jw/pkg/lib/log.py @@ -1,31 +1,51 @@ from __future__ import annotations -import datetime +import inspect +import re import sys import syslog +from datetime import datetime +from os.path import basename from typing import TYPE_CHECKING if TYPE_CHECKING: + import io - from typing import Final + from typing import Any, Final, List, Optional, Tuple # fmt: disable # don't conflate # yapf: disable # don't conflate -EMERG = int(syslog.LOG_EMERG) -ALERT = int(syslog.LOG_ALERT) -CRIT = int(syslog.LOG_CRIT) -ERR = int(syslog.LOG_ERR) -WARNING = int(syslog.LOG_WARNING) -NOTICE = int(syslog.LOG_NOTICE) -INFO = int(syslog.LOG_INFO) -DEBUG = int(syslog.LOG_DEBUG) -DEVEL = int(syslog.LOG_DEBUG + 1) -OFF = DEVEL + 1 +_special_chars = { + '\a' : '\\a', + '\b' : '\\b', + '\t' : '\\t', + '\n' : '\\n', + '\v' : '\\v', + '\f' : '\\f', + '\r' : '\\r', +} +# yapf: enable +# fmt: enable -_log_level = NOTICE -_last_tstamp = datetime.datetime.now() -_first_tstamp = _last_tstamp +_special_char_regex = re.compile( + "(%s)" % "|".join(map(re.escape, _special_chars.keys())) +) + +_clean_str_regex = re.compile(r'\033\[[0-9]*m|[\x00-\x1F\x7F-\x9F]') + +# fmt: disable # don't conflate +# yapf: disable # don't conflate +EMERG = int(syslog.LOG_EMERG) +ALERT = int(syslog.LOG_ALERT) +CRIT = int(syslog.LOG_CRIT) +ERR = int(syslog.LOG_ERR) +WARNING = int(syslog.LOG_WARNING) +NOTICE = int(syslog.LOG_NOTICE) +INFO = int(syslog.LOG_INFO) +DEBUG = int(syslog.LOG_DEBUG) +DEVEL = int(syslog.LOG_DEBUG + 1) +OFF = DEVEL + 1 _LOG_LEVEL_NAME_BY_VALUE: Final[dict[int, str]] = { EMERG: 'EMERG', @@ -39,8 +59,6 @@ _LOG_LEVEL_NAME_BY_VALUE: Final[dict[int, str]] = { DEVEL: 'DEVEL', OFF: 'OFF', } -# yapf: enable -# fmt: enable _LOG_LEVEL_VALUE_BY_NAME: Final[dict[str, int]] = { alias: value @@ -48,8 +66,205 @@ _LOG_LEVEL_VALUE_BY_NAME: Final[dict[str, int]] = { for alias in (name, name.lower()) } -def get_log_level_name(level: int) -> str: - return _LOG_LEVEL_NAME_BY_VALUE[level] +_level = NOTICE + +CONSOLE_FONT_BOLD = '\033[1m' +CONSOLE_FONT_RED = '\033[31m' +CONSOLE_FONT_GREEN = '\033[32m' +CONSOLE_FONT_YELLOW = '\033[33m' +CONSOLE_FONT_BLUE = '\033[34m' + +CONSOLE_FONT_MAGENTA = '\033[35m' +CONSOLE_FONT_CYAN = '\033[36m' +CONSOLE_FONT_WHITE = '\033[37m' + +CONSOLE_FONT_BLINK = '\033[5m' +CONSOLE_FONT_OFF = '\033[m' + +f_position = 'position' +f_module = 'module' +f_date = 'date' +f_stderr = 'stderr' +f_stdout = 'stdout' +f_prio = 'prio' +f_color = 'color' +f_default = [ f_position, f_stderr, f_prio, f_color ] + +_flags = set(f_default) +_log_prefix = '' +_clean_log_prefix = '' +_file_name_len = 20 +_module_name_len = 50 +_log_file_streams: list[io.TextIOWrapper] = [] + +_short_prio_str = { + EMERG : '', + ALERT : '', + CRIT : '', + ERR : '', + WARNING : '', + NOTICE : '', + INFO : '', + DEBUG : '', + DEVEL : '', +} + +_prio_colors = { + DEVEL : [ "", "" ], + DEBUG : [ "", "" ], + INFO : [ CONSOLE_FONT_BLUE, CONSOLE_FONT_OFF ], + NOTICE : [ CONSOLE_FONT_GREEN, CONSOLE_FONT_OFF ], + WARNING : [ CONSOLE_FONT_YELLOW, CONSOLE_FONT_OFF ], + ERR : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_RED, CONSOLE_FONT_OFF ], + CRIT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ], + ALERT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ], + EMERG : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ], +} + +# yapf: enable +# fmt: enable + +class Stream: + + def __init__(self, stream, flags): + self.stream = stream + self.flags = flags + +_streams: dict[int, Stream] = dict() +_stream_descriptors = list(reversed(range(1, 16))) + +def pad(token: str, total_size: int, right_align: bool = False) -> str: + add = total_size - len(token) + if add <= 0: + return token + space = ' ' * add + if right_align: + return space + token + return token + space + +def add_capture_stream(stream, flags = 0x0): + ret = _stream_descriptors.pop() + _streams[ret] = Stream(stream = stream, flags = flags) + return ret + +def rm_capture_stream(sd): + del _streams[sd] + _stream_descriptors.append(sd) + +def prio_gets_logged(prio: int) -> bool: # export + if prio > _level: + return False + return True + +def log_level(s: Optional[str] = None) -> int: # export + if s is None: + return _level + return parse_log_prio_str(s) + +def get_caller_pos(up: int = 1, + kwargs: Optional[dict[str, Any]] = None) -> Tuple[str, str, int]: + if kwargs and 'caller' in kwargs: + r = kwargs['caller'] + del kwargs['caller'] + return r + caller = inspect.stack()[up + 1] + mod = inspect.getmodule(caller[0]) + mod_name = '' if mod is None else mod.__name__ + return (mod_name, basename(caller.filename), caller.lineno) + +def log_m(prio: int, *args, **kwargs) -> None: # export + if prio > _level: + return + if len(args): + margs = '' + for a in args: + if isinstance(a, list): + margs += '\n'.join([str(elem) for elem in a]) + continue + margs += ' ' + str(a) + if 'caller' not in kwargs: + caller = get_caller_pos(1) + else: + caller = kwargs['caller'] + del kwargs['caller'] + for line in margs[1:].split('\n'): + log(prio, line, **kwargs, caller = caller) + +def log(prio: int, *args, only_printable: bool = False, **kwargs) -> None: # export + + if prio > _level: + return + + msg = '' + color_on = '' + color_off = '' + + if f_date in _flags: + msg += datetime.now().strftime("%b %d %H:%M:%S.%f ") + + if f_prio in _flags: + msg += _short_prio_str[prio] + ' ' + + if f_position in _flags: + + if 'caller' in kwargs: + mod, name, line = kwargs['caller'] + else: + mod, name, line = get_caller_pos(1) + + if f_module in _flags: + msg += pad(mod, _module_name_len) + + msg += pad(name, _file_name_len) + '[' + pad(str(line), 4, True) + ']' + + if f_color in _flags: + color_on, color_off = console_color_chars(prio) + + margs = '' + if len(args): + for a in args: + margs += ' ' + str(a) + if only_printable: + margs = _special_char_regex.sub( + lambda mo: _special_chars[mo.string[mo.start():mo.end()]], margs + ) + margs = re.sub('[\x01-\x1f]', '.', margs) + + for file in _log_file_streams: + print(msg + _clean_log_prefix + margs, file = file) + + msg += _log_prefix + + if not len(msg): + return + + if len(margs): + msg += color_on + margs + color_off + + files = [] + if 'capture' in kwargs: + files.append(kwargs['capture']) + elif _streams: + files = [s.stream for s in _streams.values()] + else: + if f_stdout in _flags: + files.append(sys.stdout) + + if f_stderr in _flags: + files.append(sys.stderr) + + if not len(files): + files = [sys.stdout] + + for file in files: + print(msg, file = file) + +def throw(*args, prio = ERR, caller = None, **kwargs) -> None: + if caller is None: + caller = get_caller_pos(1) + msg = ' '.join([str(arg) for arg in args]) + log(prio, msg, caller = caller) + raise Exception(msg) def parse_log_level(level: str | int) -> int: @@ -64,24 +279,83 @@ def parse_log_level(level: str | int) -> int: return _LOG_LEVEL_VALUE_BY_NAME[level] return __int_level(int(level)) +def parse_log_prio_str(prio: str) -> int: # export + return parse_log_level(prio) + +def console_color_chars(prio: int) -> List[str]: # export + if not sys.stdout.isatty(): + return ['', ''] + return _prio_colors[prio] + def set_log_level(level: str | int | None = None) -> int: - global _log_level - ret = _log_level + global _level + ret = _level if level is not None: - _log_level = parse_log_level(level) + _level = parse_log_level(level) return ret -def set_log_flags(*args, **kwargs): - pass +def set_level(level: str | int | None = None) -> int: # export + return set_log_level(level) -def log(prio: int, *args, **kwargs): - global _log_level - if prio > _log_level: - return - print(*args, file = sys.stderr) +def set_flags(flags: str | None) -> str: # export + global _flags + ret = ','.join(_flags) + if flags is not None: + _flags = set(flags.split(',')) + return ret -def log_time_diff(prio: int, *args, **kwargs): - global _last_tstamp - now = datetime.datetime.now() - log(prio, now - _last_tstamp, now - _first_tstamp, *args) - _last_tstamp = now +def set_log_flags(flags: str) -> str: + return set_flags(flags) + + #syslog + #console + #color + #prio + #position + #ide + #trace_rename_thread_to_shorter + #trace_rename_thread_to_longer + #trace_inout + #skip_openlog + #id + #date + #pid + #highlight_first_error + +def append_to_prefix(prefix: str) -> str: # export + global _log_prefix + global _clean_log_prefix + r = _log_prefix + if prefix: + _log_prefix += prefix + _clean_log_prefix = _clean_str_regex.sub('', _log_prefix) + return r + +def remove_from_prefix(count) -> str: # export + if isinstance(count, str): + count = len(count) + global _log_prefix + global _clean_log_prefix + r = _log_prefix + _log_prefix = _log_prefix[:-count] + _clean_log_prefix = _clean_str_regex.sub('', _log_prefix) + return r + +def set_filename_length(length: int) -> int: # export + global _file_name_len + r = _file_name_len + if length: + _file_name_len = length + return r + +def set_module_name_length(length: int) -> int: # export + global _module_name_len + r = _module_name_len + if length: + _module_name_len = length + return r + +def add_log_file(path: str) -> None: # export + global _log_file_streams + fd = open(path, 'w', buffering = 1) + _log_file_streams.append(fd)