Beautify logging code and output #17
8 changed files with 391 additions and 113 deletions
|
|
@ -10,6 +10,7 @@ from typing import TYPE_CHECKING, Any
|
|||
|
||||
from .AsyncRunner import AsyncRunner
|
||||
from .log import DEBUG, ERR, NOTICE, log, set_log_flags, set_log_level
|
||||
from .util import pretty_cmd
|
||||
from .Types import LoadTypes
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -135,9 +136,9 @@ class App: # export
|
|||
set_log_flags(args.log_flags)
|
||||
set_log_level(args.log_level)
|
||||
|
||||
log(DEBUG, '-------------- Running: >' + ' '.join(sys.argv) + '<')
|
||||
log(DEBUG, f'-------------- Running: >{pretty_cmd(sys.argv)}<')
|
||||
|
||||
cmd_classes = LoadTypes(
|
||||
cmd_classes: LoadTypes[AbstractCmd] = LoadTypes(
|
||||
modules if modules else ['__main__'],
|
||||
type_name_filter = name_filter,
|
||||
type_filter = [AbstractCmd], # type: ignore[type-abstract]
|
||||
|
|
|
|||
|
|
@ -313,7 +313,7 @@ class ExecContext(Base):
|
|||
await self.open()
|
||||
|
||||
try:
|
||||
ret = Result()
|
||||
ret = Result(cmd = cmd)
|
||||
with self.CallContext(
|
||||
self,
|
||||
title = title,
|
||||
|
|
@ -421,7 +421,7 @@ class ExecContext(Base):
|
|||
# be returned by CallContext and is very much allowed
|
||||
assert cmd_input is not None, 'Invalid: cmd_input is None'
|
||||
|
||||
ret = Result()
|
||||
ret = Result(cmd = cmd)
|
||||
with self.CallContext(
|
||||
self,
|
||||
title = title,
|
||||
|
|
|
|||
|
|
@ -28,6 +28,64 @@ class Result:
|
|||
return ret.strip()
|
||||
return ret
|
||||
|
||||
def __try_decode(
|
||||
self,
|
||||
stdxxx: bytes | None,
|
||||
quote = False,
|
||||
truncate: int | None = None,
|
||||
annotate: bool = True,
|
||||
label: str | None = None,
|
||||
) -> str:
|
||||
if label is None:
|
||||
label = ''
|
||||
else:
|
||||
label = f'{label}: '
|
||||
if stdxxx is None:
|
||||
return f'{label}None'
|
||||
try:
|
||||
ret = stdxxx.decode()[:truncate].strip()
|
||||
except UnicodeDecodeError:
|
||||
chunk = stdxxx[:truncate]
|
||||
ret = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
|
||||
if (not annotate) or truncate is None or len(stdxxx) <= truncate:
|
||||
return f'{label}"{ret}"' if quote else label + ret
|
||||
ret = '{labe}"{ret} ..."' if quote else '{labe}{ret} ...'
|
||||
ret += f' + {len(stdxxx) - truncate} more bytes'
|
||||
return ret
|
||||
|
||||
def __summarize(
|
||||
self,
|
||||
cmd: list[str] | None = None,
|
||||
wd: str | None = None,
|
||||
verbose = True
|
||||
) -> str:
|
||||
if not verbose:
|
||||
ret = f'{self.__status}: '
|
||||
else:
|
||||
from .util import pretty_cmd
|
||||
if cmd is None:
|
||||
cmd = self.__cmd
|
||||
call = ''
|
||||
if cmd is not None:
|
||||
if wd is None:
|
||||
wd = self.__wd
|
||||
call = f'"{pretty_cmd(cmd, wd)}" '
|
||||
ret = (
|
||||
f'Command {call}has not yet exited' if self.__status is None else
|
||||
f'Command {call}has exited with status {self.__status}'
|
||||
)
|
||||
label, stdxxx, truncate = (
|
||||
('stdout', self.__stdout, 40) if self.status == 0
|
||||
else ('stderr', self.__stderr, None)
|
||||
)
|
||||
ret += self.__try_decode(
|
||||
stdxxx, quote = True, truncate = truncate, annotate = True, label = label
|
||||
)
|
||||
return ret
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return self.__summarize(verbose = False)
|
||||
|
||||
@property
|
||||
def status(self) -> int | None:
|
||||
return self.__status
|
||||
|
|
@ -40,29 +98,6 @@ class Result:
|
|||
def encoding(self, value: str) -> None:
|
||||
self.__encoding = value
|
||||
|
||||
def __stdout_footprint(self, quote = False) -> str:
|
||||
if self.__stdout is None:
|
||||
ret = ''
|
||||
else:
|
||||
max_len = 40
|
||||
try:
|
||||
ret = self.stdout_str[:max_len]
|
||||
except UnicodeDecodeError:
|
||||
chunk = self.__stdout[:max_len]
|
||||
ret = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
|
||||
if quote:
|
||||
ret = f'"{ret}"'
|
||||
return ret
|
||||
|
||||
def __repr__(self) -> str:
|
||||
ret = f'{self.__status}:'
|
||||
if self.__status is not None:
|
||||
if self.status != 0:
|
||||
ret += f' err: {self.stderr_str_or_none}'
|
||||
else:
|
||||
ret += f' out: {self.__stdout_footprint(quote=True)}'
|
||||
return ret
|
||||
|
||||
@property
|
||||
def strip(self) -> bool:
|
||||
return self.__strip
|
||||
|
|
@ -96,29 +131,6 @@ class Result:
|
|||
import re
|
||||
return re.search(pattern, err) is not None
|
||||
|
||||
def __summarize(self, cmd: list[str] | None, wd: str | None = None) -> str:
|
||||
from .util import pretty_cmd
|
||||
if cmd is None:
|
||||
cmd = self.__cmd
|
||||
call = ''
|
||||
if cmd is not None:
|
||||
if wd is None:
|
||||
wd = self.__wd
|
||||
call = f'"{pretty_cmd(cmd, wd)}" '
|
||||
ret = (
|
||||
f'Command {call}has not yet exited' if self.__status is None else
|
||||
f'Command {call}has exited with status {self.__status}'
|
||||
)
|
||||
call = pretty_cmd(cmd, wd)
|
||||
if self.status != 0:
|
||||
ret += f' -> stderr="{self.__stderr!r}"'
|
||||
else:
|
||||
if self.__stdout:
|
||||
ret += f' -> stdout has {len(self.__stdout)} bytes'
|
||||
else:
|
||||
ret += ' -> stdout = None'
|
||||
return ret
|
||||
|
||||
def summarize(self, cmd: list[str] | None = None, wd: str | None = None) -> str:
|
||||
return self.__summarize(cmd, wd)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import os
|
||||
import re
|
||||
|
|
@ -10,7 +11,7 @@ from .log import OFF, log, parse_log_level
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
from typing import Any, Sequence
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
|
@ -44,7 +45,7 @@ class LoadTypes(Types[T]): # export
|
|||
self,
|
||||
mod_names: list[str],
|
||||
type_name_filter: str | None = None,
|
||||
type_filter: list[type[T]] = [],
|
||||
type_filter: Sequence[type[Any]] | None = None,
|
||||
debug_level = None,
|
||||
):
|
||||
if debug_level is None:
|
||||
|
|
@ -64,9 +65,12 @@ class LoadTypes(Types[T]): # export
|
|||
log(self.__debug_level, *args, **kwargs)
|
||||
|
||||
def _stringify(self):
|
||||
tf = 'None' if self.__type_filter is None else (
|
||||
', '.join([str(f) for f in self.__type_filter])
|
||||
)
|
||||
return [
|
||||
'type_name_filter: ' + str(self.__type_name_filter),
|
||||
'type_filter: ' + ', '.join([str(f) for f in self.__type_filter]),
|
||||
'type_filter: ' + tf,
|
||||
'mod_names: ' + ', '.join(self.__mod_names),
|
||||
]
|
||||
|
||||
|
|
@ -86,35 +90,22 @@ class LoadTypes(Types[T]): # export
|
|||
for member_name, c in inspect.getmembers(
|
||||
sys.modules[mod_name], inspect.isclass
|
||||
):
|
||||
name = f'{mod_name}.{member_name}'
|
||||
if rx is not None and not re.match(rx, member_name):
|
||||
self._debug(
|
||||
'o "{}.{}" has wrong name'.format(mod_name, member_name)
|
||||
)
|
||||
self._debug(f'o "{name}" has wrong name')
|
||||
continue
|
||||
if inspect.isabstract(c):
|
||||
self._debug(
|
||||
'o "{}.{}" is abstract'.format(mod_name, member_name)
|
||||
)
|
||||
self._debug(f'o "{name}" is abstract: {c.__abstractmethods__}')
|
||||
continue
|
||||
if self.__type_filter:
|
||||
for tp in self.__type_filter:
|
||||
if issubclass(c, tp):
|
||||
break
|
||||
self._debug(
|
||||
'o "{}.{}" is not of type {}'.format(
|
||||
mod_name, member_name, tp
|
||||
)
|
||||
)
|
||||
self._debug(f'o "{name}" is not of type {tp}')
|
||||
else:
|
||||
self._debug(
|
||||
'o "{}.{}" doesn\'t match type filter'.format(
|
||||
mod_name, member_name
|
||||
)
|
||||
)
|
||||
self._debug(f'o "{name}" doesn\'t match type filter')
|
||||
continue
|
||||
self._debug(
|
||||
'o "{}.{}" is fine, adding'.format(mod_name, member_name)
|
||||
)
|
||||
self._debug(f'o "{name}" is fine, adding')
|
||||
ret.append(c)
|
||||
self.__classes = ret
|
||||
return self.__classes
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ class Local(Base):
|
|||
|
||||
# PTY merges stdout/stderr
|
||||
stdout = b''.join(stdout_chunks) if stdout_chunks else None
|
||||
return Result(stdout, None, exit_code)
|
||||
return Result(stdout, None, exit_code, cmd = cmd)
|
||||
|
||||
# -- non-interactive mode
|
||||
|
||||
|
|
@ -149,7 +149,7 @@ class Local(Base):
|
|||
stdout = b''.join(stdout_parts) if stdout_parts else None
|
||||
stderr = b''.join(stderr_parts) if stderr_parts else None
|
||||
|
||||
return Result(stdout, stderr, exit_code)
|
||||
return Result(stdout, stderr, exit_code, cmd = cmd)
|
||||
|
||||
finally:
|
||||
if cwd is not None:
|
||||
|
|
|
|||
|
|
@ -273,7 +273,7 @@ class AsyncSSH(Base):
|
|||
)
|
||||
|
||||
stdout = b''.join(stdout_parts) if stdout_parts else None
|
||||
return Result(stdout, None, exit_code)
|
||||
return Result(stdout, None, exit_code, cmd = cmd)
|
||||
|
||||
finally:
|
||||
if stdin_reader_installed:
|
||||
|
|
@ -353,7 +353,7 @@ class AsyncSSH(Base):
|
|||
exit_code = completed.returncode if completed.returncode is not None else -1
|
||||
|
||||
stdout = b''.join(stdout_parts) if stdout_parts else None
|
||||
return Result(stdout, None, exit_code)
|
||||
return Result(stdout, None, exit_code, cmd = cmd)
|
||||
|
||||
async def _run_ssh(
|
||||
self,
|
||||
|
|
@ -446,7 +446,7 @@ class AsyncSSH(Base):
|
|||
completed.returncode if completed.returncode is not None else -1
|
||||
)
|
||||
|
||||
return Result(stdout, stderr, exit_code)
|
||||
return Result(stdout, stderr, exit_code, cmd = cmd)
|
||||
|
||||
except Exception as e:
|
||||
log(ERR, f'Failed to run command {" ".join(cmd)} ({e})')
|
||||
|
|
|
|||
|
|
@ -84,4 +84,4 @@ class Paramiko(Base):
|
|||
if cmd_input is not None:
|
||||
stdin.write(cmd_input)
|
||||
exit_status = stdout.channel.recv_exit_status()
|
||||
return Result(stdout.read(), stderr.read(), exit_status)
|
||||
return Result(stdout.read(), stderr.read(), exit_status, cmd = cmd)
|
||||
|
|
|
|||
|
|
@ -1,31 +1,51 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import inspect
|
||||
import re
|
||||
import sys
|
||||
import syslog
|
||||
|
||||
from datetime import datetime
|
||||
from os.path import basename
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import io
|
||||
|
||||
from typing import Final
|
||||
from typing import Any, Final, List, Optional, Tuple
|
||||
|
||||
# fmt: disable # don't conflate
|
||||
# yapf: disable # don't conflate
|
||||
EMERG = int(syslog.LOG_EMERG)
|
||||
ALERT = int(syslog.LOG_ALERT)
|
||||
CRIT = int(syslog.LOG_CRIT)
|
||||
ERR = int(syslog.LOG_ERR)
|
||||
WARNING = int(syslog.LOG_WARNING)
|
||||
NOTICE = int(syslog.LOG_NOTICE)
|
||||
INFO = int(syslog.LOG_INFO)
|
||||
DEBUG = int(syslog.LOG_DEBUG)
|
||||
DEVEL = int(syslog.LOG_DEBUG + 1)
|
||||
OFF = DEVEL + 1
|
||||
_special_chars = {
|
||||
'\a' : '\\a',
|
||||
'\b' : '\\b',
|
||||
'\t' : '\\t',
|
||||
'\n' : '\\n',
|
||||
'\v' : '\\v',
|
||||
'\f' : '\\f',
|
||||
'\r' : '\\r',
|
||||
}
|
||||
# yapf: enable
|
||||
# fmt: enable
|
||||
|
||||
_log_level = NOTICE
|
||||
_last_tstamp = datetime.datetime.now()
|
||||
_first_tstamp = _last_tstamp
|
||||
_special_char_regex = re.compile(
|
||||
"(%s)" % "|".join(map(re.escape, _special_chars.keys()))
|
||||
)
|
||||
|
||||
_clean_str_regex = re.compile(r'\033\[[0-9]*m|[\x00-\x1F\x7F-\x9F]')
|
||||
|
||||
# fmt: disable # don't conflate
|
||||
# yapf: disable # don't conflate
|
||||
EMERG = int(syslog.LOG_EMERG)
|
||||
ALERT = int(syslog.LOG_ALERT)
|
||||
CRIT = int(syslog.LOG_CRIT)
|
||||
ERR = int(syslog.LOG_ERR)
|
||||
WARNING = int(syslog.LOG_WARNING)
|
||||
NOTICE = int(syslog.LOG_NOTICE)
|
||||
INFO = int(syslog.LOG_INFO)
|
||||
DEBUG = int(syslog.LOG_DEBUG)
|
||||
DEVEL = int(syslog.LOG_DEBUG + 1)
|
||||
OFF = DEVEL + 1
|
||||
|
||||
_LOG_LEVEL_NAME_BY_VALUE: Final[dict[int, str]] = {
|
||||
EMERG: 'EMERG',
|
||||
|
|
@ -39,8 +59,6 @@ _LOG_LEVEL_NAME_BY_VALUE: Final[dict[int, str]] = {
|
|||
DEVEL: 'DEVEL',
|
||||
OFF: 'OFF',
|
||||
}
|
||||
# yapf: enable
|
||||
# fmt: enable
|
||||
|
||||
_LOG_LEVEL_VALUE_BY_NAME: Final[dict[str, int]] = {
|
||||
alias: value
|
||||
|
|
@ -48,8 +66,205 @@ _LOG_LEVEL_VALUE_BY_NAME: Final[dict[str, int]] = {
|
|||
for alias in (name, name.lower())
|
||||
}
|
||||
|
||||
def get_log_level_name(level: int) -> str:
|
||||
return _LOG_LEVEL_NAME_BY_VALUE[level]
|
||||
_level = NOTICE
|
||||
|
||||
CONSOLE_FONT_BOLD = '\033[1m'
|
||||
CONSOLE_FONT_RED = '\033[31m'
|
||||
CONSOLE_FONT_GREEN = '\033[32m'
|
||||
CONSOLE_FONT_YELLOW = '\033[33m'
|
||||
CONSOLE_FONT_BLUE = '\033[34m'
|
||||
|
||||
CONSOLE_FONT_MAGENTA = '\033[35m'
|
||||
CONSOLE_FONT_CYAN = '\033[36m'
|
||||
CONSOLE_FONT_WHITE = '\033[37m'
|
||||
|
||||
CONSOLE_FONT_BLINK = '\033[5m'
|
||||
CONSOLE_FONT_OFF = '\033[m'
|
||||
|
||||
f_position = 'position'
|
||||
f_module = 'module'
|
||||
f_date = 'date'
|
||||
f_stderr = 'stderr'
|
||||
f_stdout = 'stdout'
|
||||
f_prio = 'prio'
|
||||
f_color = 'color'
|
||||
f_default = [ f_position, f_stderr, f_prio, f_color ]
|
||||
|
||||
_flags = set(f_default)
|
||||
_log_prefix = ''
|
||||
_clean_log_prefix = ''
|
||||
_file_name_len = 20
|
||||
_module_name_len = 50
|
||||
_log_file_streams: list[io.TextIOWrapper] = []
|
||||
|
||||
_short_prio_str = {
|
||||
EMERG : '<Y>',
|
||||
ALERT : '<A>',
|
||||
CRIT : '<C>',
|
||||
ERR : '<E>',
|
||||
WARNING : '<W>',
|
||||
NOTICE : '<N>',
|
||||
INFO : '<I>',
|
||||
DEBUG : '<D>',
|
||||
DEVEL : '<V>',
|
||||
}
|
||||
|
||||
_prio_colors = {
|
||||
DEVEL : [ "", "" ],
|
||||
DEBUG : [ "", "" ],
|
||||
INFO : [ CONSOLE_FONT_BLUE, CONSOLE_FONT_OFF ],
|
||||
NOTICE : [ CONSOLE_FONT_GREEN, CONSOLE_FONT_OFF ],
|
||||
WARNING : [ CONSOLE_FONT_YELLOW, CONSOLE_FONT_OFF ],
|
||||
ERR : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_RED, CONSOLE_FONT_OFF ],
|
||||
CRIT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
||||
ALERT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
||||
EMERG : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
||||
}
|
||||
|
||||
# yapf: enable
|
||||
# fmt: enable
|
||||
|
||||
class Stream:
|
||||
|
||||
def __init__(self, stream, flags):
|
||||
self.stream = stream
|
||||
self.flags = flags
|
||||
|
||||
_streams: dict[int, Stream] = dict()
|
||||
_stream_descriptors = list(reversed(range(1, 16)))
|
||||
|
||||
def pad(token: str, total_size: int, right_align: bool = False) -> str:
|
||||
add = total_size - len(token)
|
||||
if add <= 0:
|
||||
return token
|
||||
space = ' ' * add
|
||||
if right_align:
|
||||
return space + token
|
||||
return token + space
|
||||
|
||||
def add_capture_stream(stream, flags = 0x0):
|
||||
ret = _stream_descriptors.pop()
|
||||
_streams[ret] = Stream(stream = stream, flags = flags)
|
||||
return ret
|
||||
|
||||
def rm_capture_stream(sd):
|
||||
del _streams[sd]
|
||||
_stream_descriptors.append(sd)
|
||||
|
||||
def prio_gets_logged(prio: int) -> bool: # export
|
||||
if prio > _level:
|
||||
return False
|
||||
return True
|
||||
|
||||
def log_level(s: Optional[str] = None) -> int: # export
|
||||
if s is None:
|
||||
return _level
|
||||
return parse_log_prio_str(s)
|
||||
|
||||
def get_caller_pos(up: int = 1,
|
||||
kwargs: Optional[dict[str, Any]] = None) -> Tuple[str, str, int]:
|
||||
if kwargs and 'caller' in kwargs:
|
||||
r = kwargs['caller']
|
||||
del kwargs['caller']
|
||||
return r
|
||||
caller = inspect.stack()[up + 1]
|
||||
mod = inspect.getmodule(caller[0])
|
||||
mod_name = '' if mod is None else mod.__name__
|
||||
return (mod_name, basename(caller.filename), caller.lineno)
|
||||
|
||||
def log_m(prio: int, *args, **kwargs) -> None: # export
|
||||
if prio > _level:
|
||||
return
|
||||
if len(args):
|
||||
margs = ''
|
||||
for a in args:
|
||||
if isinstance(a, list):
|
||||
margs += '\n'.join([str(elem) for elem in a])
|
||||
continue
|
||||
margs += ' ' + str(a)
|
||||
if 'caller' not in kwargs:
|
||||
caller = get_caller_pos(1)
|
||||
else:
|
||||
caller = kwargs['caller']
|
||||
del kwargs['caller']
|
||||
for line in margs[1:].split('\n'):
|
||||
log(prio, line, **kwargs, caller = caller)
|
||||
|
||||
def log(prio: int, *args, only_printable: bool = False, **kwargs) -> None: # export
|
||||
|
||||
if prio > _level:
|
||||
return
|
||||
|
||||
msg = ''
|
||||
color_on = ''
|
||||
color_off = ''
|
||||
|
||||
if f_date in _flags:
|
||||
msg += datetime.now().strftime("%b %d %H:%M:%S.%f ")
|
||||
|
||||
if f_prio in _flags:
|
||||
msg += _short_prio_str[prio] + ' '
|
||||
|
||||
if f_position in _flags:
|
||||
|
||||
if 'caller' in kwargs:
|
||||
mod, name, line = kwargs['caller']
|
||||
else:
|
||||
mod, name, line = get_caller_pos(1)
|
||||
|
||||
if f_module in _flags:
|
||||
msg += pad(mod, _module_name_len)
|
||||
|
||||
msg += pad(name, _file_name_len) + '[' + pad(str(line), 4, True) + ']'
|
||||
|
||||
if f_color in _flags:
|
||||
color_on, color_off = console_color_chars(prio)
|
||||
|
||||
margs = ''
|
||||
if len(args):
|
||||
for a in args:
|
||||
margs += ' ' + str(a)
|
||||
if only_printable:
|
||||
margs = _special_char_regex.sub(
|
||||
lambda mo: _special_chars[mo.string[mo.start():mo.end()]], margs
|
||||
)
|
||||
margs = re.sub('[\x01-\x1f]', '.', margs)
|
||||
|
||||
for file in _log_file_streams:
|
||||
print(msg + _clean_log_prefix + margs, file = file)
|
||||
|
||||
msg += _log_prefix
|
||||
|
||||
if not len(msg):
|
||||
return
|
||||
|
||||
if len(margs):
|
||||
msg += color_on + margs + color_off
|
||||
|
||||
files = []
|
||||
if 'capture' in kwargs:
|
||||
files.append(kwargs['capture'])
|
||||
elif _streams:
|
||||
files = [s.stream for s in _streams.values()]
|
||||
else:
|
||||
if f_stdout in _flags:
|
||||
files.append(sys.stdout)
|
||||
|
||||
if f_stderr in _flags:
|
||||
files.append(sys.stderr)
|
||||
|
||||
if not len(files):
|
||||
files = [sys.stdout]
|
||||
|
||||
for file in files:
|
||||
print(msg, file = file)
|
||||
|
||||
def throw(*args, prio = ERR, caller = None, **kwargs) -> None:
|
||||
if caller is None:
|
||||
caller = get_caller_pos(1)
|
||||
msg = ' '.join([str(arg) for arg in args])
|
||||
log(prio, msg, caller = caller)
|
||||
raise Exception(msg)
|
||||
|
||||
def parse_log_level(level: str | int) -> int:
|
||||
|
||||
|
|
@ -64,24 +279,83 @@ def parse_log_level(level: str | int) -> int:
|
|||
return _LOG_LEVEL_VALUE_BY_NAME[level]
|
||||
return __int_level(int(level))
|
||||
|
||||
def parse_log_prio_str(prio: str) -> int: # export
|
||||
return parse_log_level(prio)
|
||||
|
||||
def console_color_chars(prio: int) -> List[str]: # export
|
||||
if not sys.stdout.isatty():
|
||||
return ['', '']
|
||||
return _prio_colors[prio]
|
||||
|
||||
def set_log_level(level: str | int | None = None) -> int:
|
||||
global _log_level
|
||||
ret = _log_level
|
||||
global _level
|
||||
ret = _level
|
||||
if level is not None:
|
||||
_log_level = parse_log_level(level)
|
||||
_level = parse_log_level(level)
|
||||
return ret
|
||||
|
||||
def set_log_flags(*args, **kwargs):
|
||||
pass
|
||||
def set_level(level: str | int | None = None) -> int: # export
|
||||
return set_log_level(level)
|
||||
|
||||
def log(prio: int, *args, **kwargs):
|
||||
global _log_level
|
||||
if prio > _log_level:
|
||||
return
|
||||
print(*args, file = sys.stderr)
|
||||
def set_flags(flags: str | None) -> str: # export
|
||||
global _flags
|
||||
ret = ','.join(_flags)
|
||||
if flags is not None:
|
||||
_flags = set(flags.split(','))
|
||||
return ret
|
||||
|
||||
def log_time_diff(prio: int, *args, **kwargs):
|
||||
global _last_tstamp
|
||||
now = datetime.datetime.now()
|
||||
log(prio, now - _last_tstamp, now - _first_tstamp, *args)
|
||||
_last_tstamp = now
|
||||
def set_log_flags(flags: str) -> str:
|
||||
return set_flags(flags)
|
||||
|
||||
#syslog
|
||||
#console
|
||||
#color
|
||||
#prio
|
||||
#position
|
||||
#ide
|
||||
#trace_rename_thread_to_shorter
|
||||
#trace_rename_thread_to_longer
|
||||
#trace_inout
|
||||
#skip_openlog
|
||||
#id
|
||||
#date
|
||||
#pid
|
||||
#highlight_first_error
|
||||
|
||||
def append_to_prefix(prefix: str) -> str: # export
|
||||
global _log_prefix
|
||||
global _clean_log_prefix
|
||||
r = _log_prefix
|
||||
if prefix:
|
||||
_log_prefix += prefix
|
||||
_clean_log_prefix = _clean_str_regex.sub('', _log_prefix)
|
||||
return r
|
||||
|
||||
def remove_from_prefix(count) -> str: # export
|
||||
if isinstance(count, str):
|
||||
count = len(count)
|
||||
global _log_prefix
|
||||
global _clean_log_prefix
|
||||
r = _log_prefix
|
||||
_log_prefix = _log_prefix[:-count]
|
||||
_clean_log_prefix = _clean_str_regex.sub('', _log_prefix)
|
||||
return r
|
||||
|
||||
def set_filename_length(length: int) -> int: # export
|
||||
global _file_name_len
|
||||
r = _file_name_len
|
||||
if length:
|
||||
_file_name_len = length
|
||||
return r
|
||||
|
||||
def set_module_name_length(length: int) -> int: # export
|
||||
global _module_name_len
|
||||
r = _module_name_len
|
||||
if length:
|
||||
_module_name_len = length
|
||||
return r
|
||||
|
||||
def add_log_file(path: str) -> None: # export
|
||||
global _log_file_streams
|
||||
fd = open(path, 'w', buffering = 1)
|
||||
_log_file_streams.append(fd)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue