This commit is contained in:
Gourieff 2023-11-05 23:30:05 +07:00
parent 0fee58d4bf
commit 7771f4ac38
27 changed files with 5240 additions and 7 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@ __pycache__/
*.py[cod]
*$py.class
*.pyc
*.log
.vscode/

View File

@ -14,6 +14,9 @@ except:
except:
model_path = os.path.abspath("models")
from loguru import logger as debug_logger
log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "_install.log")
debug_logger.add(log_path, backtrace=True, diagnose=True)
BASE_PATH = os.path.dirname(os.path.realpath(__file__))
@ -39,10 +42,6 @@ model_url = "https://github.com/facefusion/facefusion-assets/releases/download/m
model_name = os.path.basename(model_url)
model_path = os.path.join(models_dir, model_name)
def get_sd_option(name: str, default: Any) -> Any:
assert shared.opts.data is not None
return shared.opts.data.get(name, default)
def pip_install(*args):
subprocess.run([sys.executable, "-m", "pip", "install", *args])
@ -120,6 +119,7 @@ with open(req_file) as file:
install_count += 1
pip_install(ort)
except Exception as e:
debug_logger.exception("InstallError")
print(e)
print(f"\nERROR: Failed to install {ort} - ReActor won't start")
raise e
@ -138,6 +138,7 @@ with open(req_file) as file:
install_count += 1
pip_install(package)
except Exception as e:
debug_logger.exception("InstallError")
print(e)
print(f"\nERROR: Failed to install {package} - ReActor won't start")
raise e

33
loguru/__init__.py Normal file
View File

@ -0,0 +1,33 @@
"""
The Loguru library provides a pre-instanced logger to facilitate dealing with logging in Python.
Just ``from loguru import logger``.
"""
import atexit as _atexit
import sys as _sys
from . import _defaults
from ._logger import Core as _Core
from ._logger import Logger as _Logger
__version__ = "0.7.2"
__all__ = ["logger"]
logger = _Logger(
core=_Core(),
exception=None,
depth=0,
record=False,
lazy=False,
colors=False,
raw=False,
capture=True,
patchers=[],
extra={},
)
if _defaults.LOGURU_AUTOINIT and _sys.stderr:
logger.add(_sys.stderr)
_atexit.register(logger.remove)

414
loguru/__init__.pyi Normal file
View File

@ -0,0 +1,414 @@
"""
.. |str| replace:: :class:`str`
.. |namedtuple| replace:: :func:`namedtuple<collections.namedtuple>`
.. |dict| replace:: :class:`dict`
.. |Logger| replace:: :class:`~loguru._logger.Logger`
.. |catch| replace:: :meth:`~loguru._logger.Logger.catch()`
.. |contextualize| replace:: :meth:`~loguru._logger.Logger.contextualize()`
.. |complete| replace:: :meth:`~loguru._logger.Logger.complete()`
.. |bind| replace:: :meth:`~loguru._logger.Logger.bind()`
.. |patch| replace:: :meth:`~loguru._logger.Logger.patch()`
.. |opt| replace:: :meth:`~loguru._logger.Logger.opt()`
.. |level| replace:: :meth:`~loguru._logger.Logger.level()`
.. _stub file: https://www.python.org/dev/peps/pep-0484/#stub-files
.. _string literals: https://www.python.org/dev/peps/pep-0484/#forward-references
.. _postponed evaluation of annotations: https://www.python.org/dev/peps/pep-0563/
.. |future| replace:: ``__future__``
.. _future: https://www.python.org/dev/peps/pep-0563/#enabling-the-future-behavior-in-python-3-7
.. |loguru-mypy| replace:: ``loguru-mypy``
.. _loguru-mypy: https://github.com/kornicameister/loguru-mypy
.. |documentation of loguru-mypy| replace:: documentation of ``loguru-mypy``
.. _documentation of loguru-mypy:
https://github.com/kornicameister/loguru-mypy/blob/master/README.md
.. _@kornicameister: https://github.com/kornicameister
Loguru relies on a `stub file`_ to document its types. This implies that these types are not
accessible during execution of your program, however they can be used by type checkers and IDE.
Also, this means that your Python interpreter has to support `postponed evaluation of annotations`_
to prevent error at runtime. This is achieved with a |future|_ import in Python 3.7+ or by using
`string literals`_ for earlier versions.
A basic usage example could look like this:
.. code-block:: python
from __future__ import annotations
import loguru
from loguru import logger
def good_sink(message: loguru.Message):
print("My name is", message.record["name"])
def bad_filter(record: loguru.Record):
return record["invalid"]
logger.add(good_sink, filter=bad_filter)
.. code-block:: bash
$ mypy test.py
test.py:8: error: TypedDict "Record" has no key 'invalid'
Found 1 error in 1 file (checked 1 source file)
There are several internal types to which you can be exposed using Loguru's public API, they are
listed here and might be useful to type hint your code:
- ``Logger``: the usual |logger| object (also returned by |opt|, |bind| and |patch|).
- ``Message``: the formatted logging message sent to the sinks (a |str| with ``record``
attribute).
- ``Record``: the |dict| containing all contextual information of the logged message.
- ``Level``: the |namedtuple| returned by |level| (with ``name``, ``no``, ``color`` and ``icon``
attributes).
- ``Catcher``: the context decorator returned by |catch|.
- ``Contextualizer``: the context decorator returned by |contextualize|.
- ``AwaitableCompleter``: the awaitable object returned by |complete|.
- ``RecordFile``: the ``record["file"]`` with ``name`` and ``path`` attributes.
- ``RecordLevel``: the ``record["level"]`` with ``name``, ``no`` and ``icon`` attributes.
- ``RecordThread``: the ``record["thread"]`` with ``id`` and ``name`` attributes.
- ``RecordProcess``: the ``record["process"]`` with ``id`` and ``name`` attributes.
- ``RecordException``: the ``record["exception"]`` with ``type``, ``value`` and ``traceback``
attributes.
If that is not enough, one can also use the |loguru-mypy|_ library developed by `@kornicameister`_.
Plugin can be installed separately using::
pip install loguru-mypy
It helps to catch several possible runtime errors by performing additional checks like:
- ``opt(lazy=True)`` loggers accepting only ``typing.Callable[[], typing.Any]`` arguments
- ``opt(record=True)`` loggers wrongly calling log handler like so ``logger.info(..., record={})``
- and even more...
For more details, go to official |documentation of loguru-mypy|_.
"""
import sys
from asyncio import AbstractEventLoop
from datetime import datetime, time, timedelta
from logging import Handler
from multiprocessing.context import BaseContext
from types import TracebackType
from typing import (
Any,
BinaryIO,
Callable,
Dict,
Generator,
Generic,
List,
NamedTuple,
NewType,
Optional,
Pattern,
Sequence,
TextIO,
Tuple,
Type,
TypeVar,
Union,
overload,
)
if sys.version_info >= (3, 5, 3):
from typing import Awaitable
else:
from typing_extensions import Awaitable
if sys.version_info >= (3, 6):
from os import PathLike
from typing import ContextManager
PathLikeStr = PathLike[str]
else:
from pathlib import PurePath as PathLikeStr
from typing_extensions import ContextManager
if sys.version_info >= (3, 8):
from typing import Protocol, TypedDict
else:
from typing_extensions import Protocol, TypedDict
_T = TypeVar("_T")
_F = TypeVar("_F", bound=Callable[..., Any])
ExcInfo = Tuple[Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]]
class _GeneratorContextManager(ContextManager[_T], Generic[_T]):
def __call__(self, func: _F) -> _F: ...
def __exit__(
self,
typ: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]: ...
Catcher = NewType("Catcher", _GeneratorContextManager[None])
Contextualizer = NewType("Contextualizer", _GeneratorContextManager[None])
AwaitableCompleter = Awaitable[None]
class Level(NamedTuple):
name: str
no: int
color: str
icon: str
class _RecordAttribute:
def __repr__(self) -> str: ...
def __format__(self, spec: str) -> str: ...
class RecordFile(_RecordAttribute):
name: str
path: str
class RecordLevel(_RecordAttribute):
name: str
no: int
icon: str
class RecordThread(_RecordAttribute):
id: int
name: str
class RecordProcess(_RecordAttribute):
id: int
name: str
class RecordException(NamedTuple):
type: Optional[Type[BaseException]]
value: Optional[BaseException]
traceback: Optional[TracebackType]
class Record(TypedDict):
elapsed: timedelta
exception: Optional[RecordException]
extra: Dict[Any, Any]
file: RecordFile
function: str
level: RecordLevel
line: int
message: str
module: str
name: Union[str, None]
process: RecordProcess
thread: RecordThread
time: datetime
class Message(str):
record: Record
class Writable(Protocol):
def write(self, message: Message) -> None: ...
FilterDict = Dict[Union[str, None], Union[str, int, bool]]
FilterFunction = Callable[[Record], bool]
FormatFunction = Callable[[Record], str]
PatcherFunction = Callable[[Record], None]
RotationFunction = Callable[[Message, TextIO], bool]
RetentionFunction = Callable[[List[str]], None]
CompressionFunction = Callable[[str], None]
# Actually unusable because TypedDict can't allow extra keys: python/mypy#4617
class _HandlerConfig(TypedDict, total=False):
sink: Union[str, PathLikeStr, TextIO, Writable, Callable[[Message], None], Handler]
level: Union[str, int]
format: Union[str, FormatFunction]
filter: Optional[Union[str, FilterFunction, FilterDict]]
colorize: Optional[bool]
serialize: bool
backtrace: bool
diagnose: bool
enqueue: bool
catch: bool
class LevelConfig(TypedDict, total=False):
name: str
no: int
color: str
icon: str
ActivationConfig = Tuple[Union[str, None], bool]
class Logger:
@overload
def add(
self,
sink: Union[TextIO, Writable, Callable[[Message], None], Handler],
*,
level: Union[str, int] = ...,
format: Union[str, FormatFunction] = ...,
filter: Optional[Union[str, FilterFunction, FilterDict]] = ...,
colorize: Optional[bool] = ...,
serialize: bool = ...,
backtrace: bool = ...,
diagnose: bool = ...,
enqueue: bool = ...,
context: Optional[Union[str, BaseContext]] = ...,
catch: bool = ...
) -> int: ...
@overload
def add(
self,
sink: Callable[[Message], Awaitable[None]],
*,
level: Union[str, int] = ...,
format: Union[str, FormatFunction] = ...,
filter: Optional[Union[str, FilterFunction, FilterDict]] = ...,
colorize: Optional[bool] = ...,
serialize: bool = ...,
backtrace: bool = ...,
diagnose: bool = ...,
enqueue: bool = ...,
context: Optional[Union[str, BaseContext]] = ...,
catch: bool = ...,
loop: Optional[AbstractEventLoop] = ...
) -> int: ...
@overload
def add(
self,
sink: Union[str, PathLikeStr],
*,
level: Union[str, int] = ...,
format: Union[str, FormatFunction] = ...,
filter: Optional[Union[str, FilterFunction, FilterDict]] = ...,
colorize: Optional[bool] = ...,
serialize: bool = ...,
backtrace: bool = ...,
diagnose: bool = ...,
enqueue: bool = ...,
context: Optional[Union[str, BaseContext]] = ...,
catch: bool = ...,
rotation: Optional[Union[str, int, time, timedelta, RotationFunction]] = ...,
retention: Optional[Union[str, int, timedelta, RetentionFunction]] = ...,
compression: Optional[Union[str, CompressionFunction]] = ...,
delay: bool = ...,
watch: bool = ...,
mode: str = ...,
buffering: int = ...,
encoding: str = ...,
**kwargs: Any
) -> int: ...
def remove(self, handler_id: Optional[int] = ...) -> None: ...
def complete(self) -> AwaitableCompleter: ...
@overload
def catch( # type: ignore[misc]
self,
exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = ...,
*,
level: Union[str, int] = ...,
reraise: bool = ...,
onerror: Optional[Callable[[BaseException], None]] = ...,
exclude: Optional[Union[Type[BaseException], Tuple[Type[BaseException], ...]]] = ...,
default: Any = ...,
message: str = ...
) -> Catcher: ...
@overload
def catch(self, exception: _F) -> _F: ...
def opt(
self,
*,
exception: Optional[Union[bool, ExcInfo, BaseException]] = ...,
record: bool = ...,
lazy: bool = ...,
colors: bool = ...,
raw: bool = ...,
capture: bool = ...,
depth: int = ...,
ansi: bool = ...
) -> Logger: ...
def bind(__self, **kwargs: Any) -> Logger: ... # noqa: N805
def contextualize(__self, **kwargs: Any) -> Contextualizer: ... # noqa: N805
def patch(self, patcher: PatcherFunction) -> Logger: ...
@overload
def level(self, name: str) -> Level: ...
@overload
def level(
self, name: str, no: int = ..., color: Optional[str] = ..., icon: Optional[str] = ...
) -> Level: ...
@overload
def level(
self,
name: str,
no: Optional[int] = ...,
color: Optional[str] = ...,
icon: Optional[str] = ...,
) -> Level: ...
def disable(self, name: Union[str, None]) -> None: ...
def enable(self, name: Union[str, None]) -> None: ...
def configure(
self,
*,
handlers: Sequence[Dict[str, Any]] = ...,
levels: Optional[Sequence[LevelConfig]] = ...,
extra: Optional[Dict[Any, Any]] = ...,
patcher: Optional[PatcherFunction] = ...,
activation: Optional[Sequence[ActivationConfig]] = ...
) -> List[int]: ...
# @staticmethod cannot be used with @overload in mypy (python/mypy#7781).
# However Logger is not exposed and logger is an instance of Logger
# so for type checkers it is all the same whether it is defined here
# as a static method or an instance method.
@overload
def parse(
self,
file: Union[str, PathLikeStr, TextIO],
pattern: Union[str, Pattern[str]],
*,
cast: Union[Dict[str, Callable[[str], Any]], Callable[[Dict[str, str]], None]] = ...,
chunk: int = ...
) -> Generator[Dict[str, Any], None, None]: ...
@overload
def parse(
self,
file: BinaryIO,
pattern: Union[bytes, Pattern[bytes]],
*,
cast: Union[Dict[str, Callable[[bytes], Any]], Callable[[Dict[str, bytes]], None]] = ...,
chunk: int = ...
) -> Generator[Dict[str, Any], None, None]: ...
@overload
def trace(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def trace(__self, __message: Any) -> None: ... # noqa: N805
@overload
def debug(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def debug(__self, __message: Any) -> None: ... # noqa: N805
@overload
def info(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def info(__self, __message: Any) -> None: ... # noqa: N805
@overload
def success(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def success(__self, __message: Any) -> None: ... # noqa: N805
@overload
def warning(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def warning(__self, __message: Any) -> None: ... # noqa: N805
@overload
def error(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def error(__self, __message: Any) -> None: ... # noqa: N805
@overload
def critical(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def critical(__self, __message: Any) -> None: ... # noqa: N805
@overload
def exception(__self, __message: str, *args: Any, **kwargs: Any) -> None: ... # noqa: N805
@overload
def exception(__self, __message: Any) -> None: ... # noqa: N805
@overload
def log(
__self, __level: Union[int, str], __message: str, *args: Any, **kwargs: Any # noqa: N805
) -> None: ...
@overload
def log(__self, __level: Union[int, str], __message: Any) -> None: ... # noqa: N805
def start(self, *args: Any, **kwargs: Any) -> int: ...
def stop(self, *args: Any, **kwargs: Any) -> None: ...
logger: Logger

27
loguru/_asyncio_loop.py Normal file
View File

@ -0,0 +1,27 @@
import asyncio
import sys
def load_loop_functions():
if sys.version_info >= (3, 7):
def get_task_loop(task):
return task.get_loop()
get_running_loop = asyncio.get_running_loop
else:
def get_task_loop(task):
return task._loop
def get_running_loop():
loop = asyncio.get_event_loop()
if not loop.is_running():
raise RuntimeError("There is no running event loop")
return loop
return get_task_loop, get_running_loop
get_task_loop, get_running_loop = load_loop_functions()

View File

@ -0,0 +1,528 @@
import builtins
import inspect
import io
import keyword
import linecache
import os
import re
import sys
import sysconfig
import tokenize
import traceback
if sys.version_info >= (3, 11):
def is_exception_group(exc):
return isinstance(exc, ExceptionGroup)
else:
try:
from exceptiongroup import ExceptionGroup
except ImportError:
def is_exception_group(exc):
return False
else:
def is_exception_group(exc):
return isinstance(exc, ExceptionGroup)
class SyntaxHighlighter:
_default_style = {
"comment": "\x1b[30m\x1b[1m{}\x1b[0m",
"keyword": "\x1b[35m\x1b[1m{}\x1b[0m",
"builtin": "\x1b[1m{}\x1b[0m",
"string": "\x1b[36m{}\x1b[0m",
"number": "\x1b[34m\x1b[1m{}\x1b[0m",
"operator": "\x1b[35m\x1b[1m{}\x1b[0m",
"punctuation": "\x1b[1m{}\x1b[0m",
"constant": "\x1b[36m\x1b[1m{}\x1b[0m",
"identifier": "\x1b[1m{}\x1b[0m",
"other": "{}",
}
_builtins = set(dir(builtins))
_constants = {"True", "False", "None"}
_punctation = {"(", ")", "[", "]", "{", "}", ":", ",", ";"}
_strings = {tokenize.STRING}
_fstring_middle = None
if sys.version_info >= (3, 12):
_strings.update({tokenize.FSTRING_START, tokenize.FSTRING_MIDDLE, tokenize.FSTRING_END})
_fstring_middle = tokenize.FSTRING_MIDDLE
def __init__(self, style=None):
self._style = style or self._default_style
def highlight(self, source):
style = self._style
row, column = 0, 0
output = ""
for token in self.tokenize(source):
type_, string, (start_row, start_column), (_, end_column), line = token
if type_ == self._fstring_middle:
# When an f-string contains "{{" or "}}", they appear as "{" or "}" in the "string"
# attribute of the token. However, they do not count in the column position.
end_column += string.count("{") + string.count("}")
if type_ == tokenize.NAME:
if string in self._constants:
color = style["constant"]
elif keyword.iskeyword(string):
color = style["keyword"]
elif string in self._builtins:
color = style["builtin"]
else:
color = style["identifier"]
elif type_ == tokenize.OP:
if string in self._punctation:
color = style["punctuation"]
else:
color = style["operator"]
elif type_ == tokenize.NUMBER:
color = style["number"]
elif type_ in self._strings:
color = style["string"]
elif type_ == tokenize.COMMENT:
color = style["comment"]
else:
color = style["other"]
if start_row != row:
source = source[column:]
row, column = start_row, 0
if type_ != tokenize.ENCODING:
output += line[column:start_column]
output += color.format(line[start_column:end_column])
column = end_column
output += source[column:]
return output
@staticmethod
def tokenize(source):
# Worth reading: https://www.asmeurer.com/brown-water-python/
source = source.encode("utf-8")
source = io.BytesIO(source)
try:
yield from tokenize.tokenize(source.readline)
except tokenize.TokenError:
return
class ExceptionFormatter:
_default_theme = {
"introduction": "\x1b[33m\x1b[1m{}\x1b[0m",
"cause": "\x1b[1m{}\x1b[0m",
"context": "\x1b[1m{}\x1b[0m",
"dirname": "\x1b[32m{}\x1b[0m",
"basename": "\x1b[32m\x1b[1m{}\x1b[0m",
"line": "\x1b[33m{}\x1b[0m",
"function": "\x1b[35m{}\x1b[0m",
"exception_type": "\x1b[31m\x1b[1m{}\x1b[0m",
"exception_value": "\x1b[1m{}\x1b[0m",
"arrows": "\x1b[36m{}\x1b[0m",
"value": "\x1b[36m\x1b[1m{}\x1b[0m",
}
def __init__(
self,
colorize=False,
backtrace=False,
diagnose=True,
theme=None,
style=None,
max_length=128,
encoding="ascii",
hidden_frames_filename=None,
prefix="",
):
self._colorize = colorize
self._diagnose = diagnose
self._theme = theme or self._default_theme
self._backtrace = backtrace
self._syntax_highlighter = SyntaxHighlighter(style)
self._max_length = max_length
self._encoding = encoding
self._hidden_frames_filename = hidden_frames_filename
self._prefix = prefix
self._lib_dirs = self._get_lib_dirs()
self._pipe_char = self._get_char("\u2502", "|")
self._cap_char = self._get_char("\u2514", "->")
self._catch_point_identifier = " <Loguru catch point here>"
@staticmethod
def _get_lib_dirs():
schemes = sysconfig.get_scheme_names()
names = ["stdlib", "platstdlib", "platlib", "purelib"]
paths = {sysconfig.get_path(name, scheme) for scheme in schemes for name in names}
return [os.path.abspath(path).lower() + os.sep for path in paths if path in sys.path]
@staticmethod
def _indent(text, count, *, prefix="| "):
if count == 0:
yield text
return
for line in text.splitlines(True):
indented = " " * count + prefix + line
yield indented.rstrip() + "\n"
def _get_char(self, char, default):
try:
char.encode(self._encoding)
except (UnicodeEncodeError, LookupError):
return default
else:
return char
def _is_file_mine(self, file):
filepath = os.path.abspath(file).lower()
if not filepath.endswith(".py"):
return False
return not any(filepath.startswith(d) for d in self._lib_dirs)
def _extract_frames(self, tb, is_first, *, limit=None, from_decorator=False):
frames, final_source = [], None
if tb is None or (limit is not None and limit <= 0):
return frames, final_source
def is_valid(frame):
return frame.f_code.co_filename != self._hidden_frames_filename
def get_info(frame, lineno):
filename = frame.f_code.co_filename
function = frame.f_code.co_name
source = linecache.getline(filename, lineno).strip()
return filename, lineno, function, source
infos = []
if is_valid(tb.tb_frame):
infos.append((get_info(tb.tb_frame, tb.tb_lineno), tb.tb_frame))
get_parent_only = from_decorator and not self._backtrace
if (self._backtrace and is_first) or get_parent_only:
frame = tb.tb_frame.f_back
while frame:
if is_valid(frame):
infos.insert(0, (get_info(frame, frame.f_lineno), frame))
if get_parent_only:
break
frame = frame.f_back
if infos and not get_parent_only:
(filename, lineno, function, source), frame = infos[-1]
function += self._catch_point_identifier
infos[-1] = ((filename, lineno, function, source), frame)
tb = tb.tb_next
while tb:
if is_valid(tb.tb_frame):
infos.append((get_info(tb.tb_frame, tb.tb_lineno), tb.tb_frame))
tb = tb.tb_next
if limit is not None:
infos = infos[-limit:]
for (filename, lineno, function, source), frame in infos:
final_source = source
if source:
colorize = self._colorize and self._is_file_mine(filename)
lines = []
if colorize:
lines.append(self._syntax_highlighter.highlight(source))
else:
lines.append(source)
if self._diagnose:
relevant_values = self._get_relevant_values(source, frame)
values = self._format_relevant_values(list(relevant_values), colorize)
lines += list(values)
source = "\n ".join(lines)
frames.append((filename, lineno, function, source))
return frames, final_source
def _get_relevant_values(self, source, frame):
value = None
pending = None
is_attribute = False
is_valid_value = False
is_assignment = True
for token in self._syntax_highlighter.tokenize(source):
type_, string, (_, col), *_ = token
if pending is not None:
# Keyword arguments are ignored
if type_ != tokenize.OP or string != "=" or is_assignment:
yield pending
pending = None
if type_ == tokenize.NAME and not keyword.iskeyword(string):
if not is_attribute:
for variables in (frame.f_locals, frame.f_globals):
try:
value = variables[string]
except KeyError:
continue
else:
is_valid_value = True
pending = (col, self._format_value(value))
break
elif is_valid_value:
try:
value = inspect.getattr_static(value, string)
except AttributeError:
is_valid_value = False
else:
yield (col, self._format_value(value))
elif type_ == tokenize.OP and string == ".":
is_attribute = True
is_assignment = False
elif type_ == tokenize.OP and string == ";":
is_assignment = True
is_attribute = False
is_valid_value = False
else:
is_attribute = False
is_valid_value = False
is_assignment = False
if pending is not None:
yield pending
def _format_relevant_values(self, relevant_values, colorize):
for i in reversed(range(len(relevant_values))):
col, value = relevant_values[i]
pipe_cols = [pcol for pcol, _ in relevant_values[:i]]
pre_line = ""
index = 0
for pc in pipe_cols:
pre_line += (" " * (pc - index)) + self._pipe_char
index = pc + 1
pre_line += " " * (col - index)
value_lines = value.split("\n")
for n, value_line in enumerate(value_lines):
if n == 0:
arrows = pre_line + self._cap_char + " "
else:
arrows = pre_line + " " * (len(self._cap_char) + 1)
if colorize:
arrows = self._theme["arrows"].format(arrows)
value_line = self._theme["value"].format(value_line)
yield arrows + value_line
def _format_value(self, v):
try:
v = repr(v)
except Exception:
v = "<unprintable %s object>" % type(v).__name__
max_length = self._max_length
if max_length is not None and len(v) > max_length:
v = v[: max_length - 3] + "..."
return v
def _format_locations(self, frames_lines, *, has_introduction):
prepend_with_new_line = has_introduction
regex = r'^ File "(?P<file>.*?)", line (?P<line>[^,]+)(?:, in (?P<function>.*))?\n'
for frame in frames_lines:
match = re.match(regex, frame)
if match:
file, line, function = match.group("file", "line", "function")
is_mine = self._is_file_mine(file)
if function is not None:
pattern = ' File "{}", line {}, in {}\n'
else:
pattern = ' File "{}", line {}\n'
if self._backtrace and function and function.endswith(self._catch_point_identifier):
function = function[: -len(self._catch_point_identifier)]
pattern = ">" + pattern[1:]
if self._colorize and is_mine:
dirname, basename = os.path.split(file)
if dirname:
dirname += os.sep
dirname = self._theme["dirname"].format(dirname)
basename = self._theme["basename"].format(basename)
file = dirname + basename
line = self._theme["line"].format(line)
function = self._theme["function"].format(function)
if self._diagnose and (is_mine or prepend_with_new_line):
pattern = "\n" + pattern
location = pattern.format(file, line, function)
frame = location + frame[match.end() :]
prepend_with_new_line = is_mine
yield frame
def _format_exception(
self, value, tb, *, seen=None, is_first=False, from_decorator=False, group_nesting=0
):
# Implemented from built-in traceback module:
# https://github.com/python/cpython/blob/a5b76167/Lib/traceback.py#L468
exc_type, exc_value, exc_traceback = type(value), value, tb
if seen is None:
seen = set()
seen.add(id(exc_value))
if exc_value:
if exc_value.__cause__ is not None and id(exc_value.__cause__) not in seen:
yield from self._format_exception(
exc_value.__cause__,
exc_value.__cause__.__traceback__,
seen=seen,
group_nesting=group_nesting,
)
cause = "The above exception was the direct cause of the following exception:"
if self._colorize:
cause = self._theme["cause"].format(cause)
if self._diagnose:
yield from self._indent("\n\n" + cause + "\n\n\n", group_nesting)
else:
yield from self._indent("\n" + cause + "\n\n", group_nesting)
elif (
exc_value.__context__ is not None
and id(exc_value.__context__) not in seen
and not exc_value.__suppress_context__
):
yield from self._format_exception(
exc_value.__context__,
exc_value.__context__.__traceback__,
seen=seen,
group_nesting=group_nesting,
)
context = "During handling of the above exception, another exception occurred:"
if self._colorize:
context = self._theme["context"].format(context)
if self._diagnose:
yield from self._indent("\n\n" + context + "\n\n\n", group_nesting)
else:
yield from self._indent("\n" + context + "\n\n", group_nesting)
is_grouped = is_exception_group(value)
if is_grouped and group_nesting == 0:
yield from self._format_exception(
value,
tb,
seen=seen,
group_nesting=1,
is_first=is_first,
from_decorator=from_decorator,
)
return
try:
traceback_limit = sys.tracebacklimit
except AttributeError:
traceback_limit = None
frames, final_source = self._extract_frames(
exc_traceback, is_first, limit=traceback_limit, from_decorator=from_decorator
)
exception_only = traceback.format_exception_only(exc_type, exc_value)
# Determining the correct index for the "Exception: message" part in the formatted exception
# is challenging. This is because it might be preceded by multiple lines specific to
# "SyntaxError" or followed by various notes. However, we can make an educated guess based
# on the indentation; the preliminary context for "SyntaxError" is always indented, while
# the Exception itself is not. This allows us to identify the correct index for the
# exception message.
for error_message_index, part in enumerate(exception_only): # noqa: B007
if not part.startswith(" "):
break
error_message = exception_only[error_message_index][:-1] # Remove last new line temporarily
if self._colorize:
if ":" in error_message:
exception_type, exception_value = error_message.split(":", 1)
exception_type = self._theme["exception_type"].format(exception_type)
exception_value = self._theme["exception_value"].format(exception_value)
error_message = exception_type + ":" + exception_value
else:
error_message = self._theme["exception_type"].format(error_message)
if self._diagnose and frames:
if issubclass(exc_type, AssertionError) and not str(exc_value) and final_source:
if self._colorize:
final_source = self._syntax_highlighter.highlight(final_source)
error_message += ": " + final_source
error_message = "\n" + error_message
exception_only[error_message_index] = error_message + "\n"
if is_first:
yield self._prefix
has_introduction = bool(frames)
if has_introduction:
if is_grouped:
introduction = "Exception Group Traceback (most recent call last):"
else:
introduction = "Traceback (most recent call last):"
if self._colorize:
introduction = self._theme["introduction"].format(introduction)
if group_nesting == 1: # Implies we're processing the root ExceptionGroup.
yield from self._indent(introduction + "\n", group_nesting, prefix="+ ")
else:
yield from self._indent(introduction + "\n", group_nesting)
frames_lines = traceback.format_list(frames) + exception_only
if self._colorize or self._backtrace or self._diagnose:
frames_lines = self._format_locations(frames_lines, has_introduction=has_introduction)
yield from self._indent("".join(frames_lines), group_nesting)
if is_grouped:
for n, exc in enumerate(value.exceptions, start=1):
ruler = "+" + (" %s " % ("..." if n > 15 else n)).center(35, "-")
yield from self._indent(ruler, group_nesting, prefix="+-" if n == 1 else " ")
if n > 15:
message = "and %d more exceptions\n" % (len(value.exceptions) - 15)
yield from self._indent(message, group_nesting + 1)
break
elif group_nesting == 10 and is_exception_group(exc):
message = "... (max_group_depth is 10)\n"
yield from self._indent(message, group_nesting + 1)
else:
yield from self._format_exception(
exc,
exc.__traceback__,
seen=seen,
group_nesting=group_nesting + 1,
)
if not is_exception_group(exc) or group_nesting == 10:
yield from self._indent("-" * 35, group_nesting + 1, prefix="+-")
def format_exception(self, type_, value, tb, *, from_decorator=False):
yield from self._format_exception(value, tb, is_first=True, from_decorator=from_decorator)

66
loguru/_colorama.py Normal file
View File

@ -0,0 +1,66 @@
import os
import sys
def should_colorize(stream):
if stream is None:
return False
if stream is sys.stdout or stream is sys.stderr:
try:
import ipykernel
import IPython
ipython = IPython.get_ipython()
is_jupyter_stream = isinstance(stream, ipykernel.iostream.OutStream)
is_jupyter_shell = isinstance(ipython, ipykernel.zmqshell.ZMQInteractiveShell)
except Exception:
pass
else:
if is_jupyter_stream and is_jupyter_shell:
return True
if stream is sys.__stdout__ or stream is sys.__stderr__:
if "CI" in os.environ and any(
ci in os.environ
for ci in ["TRAVIS", "CIRCLECI", "APPVEYOR", "GITLAB_CI", "GITHUB_ACTIONS"]
):
return True
if "PYCHARM_HOSTED" in os.environ:
return True
if os.name == "nt" and "TERM" in os.environ:
return True
try:
return stream.isatty()
except Exception:
return False
def should_wrap(stream):
if os.name != "nt":
return False
if stream is not sys.__stdout__ and stream is not sys.__stderr__:
return False
from colorama.win32 import winapi_test
if not winapi_test():
return False
try:
from colorama.winterm import enable_vt_processing
except ImportError:
return True
try:
return not enable_vt_processing(stream.fileno())
except Exception:
return True
def wrap(stream):
from colorama import AnsiToWin32
return AnsiToWin32(stream, convert=True, strip=True, autoreset=False).stream

471
loguru/_colorizer.py Normal file
View File

@ -0,0 +1,471 @@
import re
from string import Formatter
class Style:
RESET_ALL = 0
BOLD = 1
DIM = 2
ITALIC = 3
UNDERLINE = 4
BLINK = 5
REVERSE = 7
HIDE = 8
STRIKE = 9
NORMAL = 22
class Fore:
BLACK = 30
RED = 31
GREEN = 32
YELLOW = 33
BLUE = 34
MAGENTA = 35
CYAN = 36
WHITE = 37
RESET = 39
LIGHTBLACK_EX = 90
LIGHTRED_EX = 91
LIGHTGREEN_EX = 92
LIGHTYELLOW_EX = 93
LIGHTBLUE_EX = 94
LIGHTMAGENTA_EX = 95
LIGHTCYAN_EX = 96
LIGHTWHITE_EX = 97
class Back:
BLACK = 40
RED = 41
GREEN = 42
YELLOW = 43
BLUE = 44
MAGENTA = 45
CYAN = 46
WHITE = 47
RESET = 49
LIGHTBLACK_EX = 100
LIGHTRED_EX = 101
LIGHTGREEN_EX = 102
LIGHTYELLOW_EX = 103
LIGHTBLUE_EX = 104
LIGHTMAGENTA_EX = 105
LIGHTCYAN_EX = 106
LIGHTWHITE_EX = 107
def ansi_escape(codes):
return {name: "\033[%dm" % code for name, code in codes.items()}
class TokenType:
TEXT = 1
ANSI = 2
LEVEL = 3
CLOSING = 4
class AnsiParser:
_style = ansi_escape(
{
"b": Style.BOLD,
"d": Style.DIM,
"n": Style.NORMAL,
"h": Style.HIDE,
"i": Style.ITALIC,
"l": Style.BLINK,
"s": Style.STRIKE,
"u": Style.UNDERLINE,
"v": Style.REVERSE,
"bold": Style.BOLD,
"dim": Style.DIM,
"normal": Style.NORMAL,
"hide": Style.HIDE,
"italic": Style.ITALIC,
"blink": Style.BLINK,
"strike": Style.STRIKE,
"underline": Style.UNDERLINE,
"reverse": Style.REVERSE,
}
)
_foreground = ansi_escape(
{
"k": Fore.BLACK,
"r": Fore.RED,
"g": Fore.GREEN,
"y": Fore.YELLOW,
"e": Fore.BLUE,
"m": Fore.MAGENTA,
"c": Fore.CYAN,
"w": Fore.WHITE,
"lk": Fore.LIGHTBLACK_EX,
"lr": Fore.LIGHTRED_EX,
"lg": Fore.LIGHTGREEN_EX,
"ly": Fore.LIGHTYELLOW_EX,
"le": Fore.LIGHTBLUE_EX,
"lm": Fore.LIGHTMAGENTA_EX,
"lc": Fore.LIGHTCYAN_EX,
"lw": Fore.LIGHTWHITE_EX,
"black": Fore.BLACK,
"red": Fore.RED,
"green": Fore.GREEN,
"yellow": Fore.YELLOW,
"blue": Fore.BLUE,
"magenta": Fore.MAGENTA,
"cyan": Fore.CYAN,
"white": Fore.WHITE,
"light-black": Fore.LIGHTBLACK_EX,
"light-red": Fore.LIGHTRED_EX,
"light-green": Fore.LIGHTGREEN_EX,
"light-yellow": Fore.LIGHTYELLOW_EX,
"light-blue": Fore.LIGHTBLUE_EX,
"light-magenta": Fore.LIGHTMAGENTA_EX,
"light-cyan": Fore.LIGHTCYAN_EX,
"light-white": Fore.LIGHTWHITE_EX,
}
)
_background = ansi_escape(
{
"K": Back.BLACK,
"R": Back.RED,
"G": Back.GREEN,
"Y": Back.YELLOW,
"E": Back.BLUE,
"M": Back.MAGENTA,
"C": Back.CYAN,
"W": Back.WHITE,
"LK": Back.LIGHTBLACK_EX,
"LR": Back.LIGHTRED_EX,
"LG": Back.LIGHTGREEN_EX,
"LY": Back.LIGHTYELLOW_EX,
"LE": Back.LIGHTBLUE_EX,
"LM": Back.LIGHTMAGENTA_EX,
"LC": Back.LIGHTCYAN_EX,
"LW": Back.LIGHTWHITE_EX,
"BLACK": Back.BLACK,
"RED": Back.RED,
"GREEN": Back.GREEN,
"YELLOW": Back.YELLOW,
"BLUE": Back.BLUE,
"MAGENTA": Back.MAGENTA,
"CYAN": Back.CYAN,
"WHITE": Back.WHITE,
"LIGHT-BLACK": Back.LIGHTBLACK_EX,
"LIGHT-RED": Back.LIGHTRED_EX,
"LIGHT-GREEN": Back.LIGHTGREEN_EX,
"LIGHT-YELLOW": Back.LIGHTYELLOW_EX,
"LIGHT-BLUE": Back.LIGHTBLUE_EX,
"LIGHT-MAGENTA": Back.LIGHTMAGENTA_EX,
"LIGHT-CYAN": Back.LIGHTCYAN_EX,
"LIGHT-WHITE": Back.LIGHTWHITE_EX,
}
)
_regex_tag = re.compile(r"\\?</?((?:[fb]g\s)?[^<>\s]*)>")
def __init__(self):
self._tokens = []
self._tags = []
self._color_tokens = []
@staticmethod
def strip(tokens):
output = ""
for type_, value in tokens:
if type_ == TokenType.TEXT:
output += value
return output
@staticmethod
def colorize(tokens, ansi_level):
output = ""
for type_, value in tokens:
if type_ == TokenType.LEVEL:
if ansi_level is None:
raise ValueError(
"The '<level>' color tag is not allowed in this context, "
"it has not yet been associated to any color value."
)
value = ansi_level
output += value
return output
@staticmethod
def wrap(tokens, *, ansi_level, color_tokens):
output = ""
for type_, value in tokens:
if type_ == TokenType.LEVEL:
value = ansi_level
output += value
if type_ == TokenType.CLOSING:
for subtype, subvalue in color_tokens:
if subtype == TokenType.LEVEL:
subvalue = ansi_level
output += subvalue
return output
def feed(self, text, *, raw=False):
if raw:
self._tokens.append((TokenType.TEXT, text))
return
position = 0
for match in self._regex_tag.finditer(text):
markup, tag = match.group(0), match.group(1)
self._tokens.append((TokenType.TEXT, text[position : match.start()]))
position = match.end()
if markup[0] == "\\":
self._tokens.append((TokenType.TEXT, markup[1:]))
continue
if markup[1] == "/":
if self._tags and (tag == "" or tag == self._tags[-1]):
self._tags.pop()
self._color_tokens.pop()
self._tokens.append((TokenType.CLOSING, "\033[0m"))
self._tokens.extend(self._color_tokens)
continue
elif tag in self._tags:
raise ValueError('Closing tag "%s" violates nesting rules' % markup)
else:
raise ValueError('Closing tag "%s" has no corresponding opening tag' % markup)
if tag in {"lvl", "level"}:
token = (TokenType.LEVEL, None)
else:
ansi = self._get_ansicode(tag)
if ansi is None:
raise ValueError(
'Tag "%s" does not correspond to any known color directive, '
"make sure you did not misspelled it (or prepend '\\' to escape it)"
% markup
)
token = (TokenType.ANSI, ansi)
self._tags.append(tag)
self._color_tokens.append(token)
self._tokens.append(token)
self._tokens.append((TokenType.TEXT, text[position:]))
def done(self, *, strict=True):
if strict and self._tags:
faulty_tag = self._tags.pop(0)
raise ValueError('Opening tag "<%s>" has no corresponding closing tag' % faulty_tag)
return self._tokens
def current_color_tokens(self):
return list(self._color_tokens)
def _get_ansicode(self, tag):
style = self._style
foreground = self._foreground
background = self._background
# Substitute on a direct match.
if tag in style:
return style[tag]
elif tag in foreground:
return foreground[tag]
elif tag in background:
return background[tag]
# An alternative syntax for setting the color (e.g. <fg red>, <bg red>).
elif tag.startswith("fg ") or tag.startswith("bg "):
st, color = tag[:2], tag[3:]
code = "38" if st == "fg" else "48"
if st == "fg" and color.lower() in foreground:
return foreground[color.lower()]
elif st == "bg" and color.upper() in background:
return background[color.upper()]
elif color.isdigit() and int(color) <= 255:
return "\033[%s;5;%sm" % (code, color)
elif re.match(r"#(?:[a-fA-F0-9]{3}){1,2}$", color):
hex_color = color[1:]
if len(hex_color) == 3:
hex_color *= 2
rgb = tuple(int(hex_color[i : i + 2], 16) for i in (0, 2, 4))
return "\033[%s;2;%s;%s;%sm" % ((code,) + rgb)
elif color.count(",") == 2:
colors = tuple(color.split(","))
if all(x.isdigit() and int(x) <= 255 for x in colors):
return "\033[%s;2;%s;%s;%sm" % ((code,) + colors)
return None
class ColoringMessage(str):
__fields__ = ("_messages",)
def __format__(self, spec):
return next(self._messages).__format__(spec)
class ColoredMessage:
def __init__(self, tokens):
self.tokens = tokens
self.stripped = AnsiParser.strip(tokens)
def colorize(self, ansi_level):
return AnsiParser.colorize(self.tokens, ansi_level)
class ColoredFormat:
def __init__(self, tokens, messages_color_tokens):
self._tokens = tokens
self._messages_color_tokens = messages_color_tokens
def strip(self):
return AnsiParser.strip(self._tokens)
def colorize(self, ansi_level):
return AnsiParser.colorize(self._tokens, ansi_level)
def make_coloring_message(self, message, *, ansi_level, colored_message):
messages = [
message
if color_tokens is None
else AnsiParser.wrap(
colored_message.tokens, ansi_level=ansi_level, color_tokens=color_tokens
)
for color_tokens in self._messages_color_tokens
]
coloring = ColoringMessage(message)
coloring._messages = iter(messages)
return coloring
class Colorizer:
@staticmethod
def prepare_format(string):
tokens, messages_color_tokens = Colorizer._parse_without_formatting(string)
return ColoredFormat(tokens, messages_color_tokens)
@staticmethod
def prepare_message(string, args=(), kwargs={}): # noqa: B006
tokens = Colorizer._parse_with_formatting(string, args, kwargs)
return ColoredMessage(tokens)
@staticmethod
def prepare_simple_message(string):
parser = AnsiParser()
parser.feed(string)
tokens = parser.done()
return ColoredMessage(tokens)
@staticmethod
def ansify(text):
parser = AnsiParser()
parser.feed(text.strip())
tokens = parser.done(strict=False)
return AnsiParser.colorize(tokens, None)
@staticmethod
def _parse_with_formatting(
string, args, kwargs, *, recursion_depth=2, auto_arg_index=0, recursive=False
):
# This function re-implements Formatter._vformat()
if recursion_depth < 0:
raise ValueError("Max string recursion exceeded")
formatter = Formatter()
parser = AnsiParser()
for literal_text, field_name, format_spec, conversion in formatter.parse(string):
parser.feed(literal_text, raw=recursive)
if field_name is not None:
if field_name == "":
if auto_arg_index is False:
raise ValueError(
"cannot switch from manual field "
"specification to automatic field "
"numbering"
)
field_name = str(auto_arg_index)
auto_arg_index += 1
elif field_name.isdigit():
if auto_arg_index:
raise ValueError(
"cannot switch from manual field "
"specification to automatic field "
"numbering"
)
auto_arg_index = False
obj, _ = formatter.get_field(field_name, args, kwargs)
obj = formatter.convert_field(obj, conversion)
format_spec, auto_arg_index = Colorizer._parse_with_formatting(
format_spec,
args,
kwargs,
recursion_depth=recursion_depth - 1,
auto_arg_index=auto_arg_index,
recursive=True,
)
formatted = formatter.format_field(obj, format_spec)
parser.feed(formatted, raw=True)
tokens = parser.done()
if recursive:
return AnsiParser.strip(tokens), auto_arg_index
return tokens
@staticmethod
def _parse_without_formatting(string, *, recursion_depth=2, recursive=False):
if recursion_depth < 0:
raise ValueError("Max string recursion exceeded")
formatter = Formatter()
parser = AnsiParser()
messages_color_tokens = []
for literal_text, field_name, format_spec, conversion in formatter.parse(string):
if literal_text and literal_text[-1] in "{}":
literal_text += literal_text[-1]
parser.feed(literal_text, raw=recursive)
if field_name is not None:
if field_name == "message":
if recursive:
messages_color_tokens.append(None)
else:
color_tokens = parser.current_color_tokens()
messages_color_tokens.append(color_tokens)
field = "{%s" % field_name
if conversion:
field += "!%s" % conversion
if format_spec:
field += ":%s" % format_spec
field += "}"
parser.feed(field, raw=True)
_, color_tokens = Colorizer._parse_without_formatting(
format_spec, recursion_depth=recursion_depth - 1, recursive=True
)
messages_color_tokens.extend(color_tokens)
return parser.done(), messages_color_tokens

15
loguru/_contextvars.py Normal file
View File

@ -0,0 +1,15 @@
import sys
def load_contextvar_class():
if sys.version_info >= (3, 7):
from contextvars import ContextVar
elif sys.version_info >= (3, 5, 3):
from aiocontextvars import ContextVar
else:
from contextvars import ContextVar
return ContextVar
ContextVar = load_contextvar_class()

View File

@ -0,0 +1,57 @@
import os
def load_ctime_functions():
if os.name == "nt":
import win32_setctime
def get_ctime_windows(filepath):
return os.stat(filepath).st_ctime
def set_ctime_windows(filepath, timestamp):
if not win32_setctime.SUPPORTED:
return
try:
win32_setctime.setctime(filepath, timestamp)
except (OSError, ValueError):
pass
return get_ctime_windows, set_ctime_windows
elif hasattr(os.stat_result, "st_birthtime"):
def get_ctime_macos(filepath):
return os.stat(filepath).st_birthtime
def set_ctime_macos(filepath, timestamp):
pass
return get_ctime_macos, set_ctime_macos
elif hasattr(os, "getxattr") and hasattr(os, "setxattr"):
def get_ctime_linux(filepath):
try:
return float(os.getxattr(filepath, b"user.loguru_crtime"))
except OSError:
return os.stat(filepath).st_mtime
def set_ctime_linux(filepath, timestamp):
try:
os.setxattr(filepath, b"user.loguru_crtime", str(timestamp).encode("ascii"))
except OSError:
pass
return get_ctime_linux, set_ctime_linux
def get_ctime_fallback(filepath):
return os.stat(filepath).st_mtime
def set_ctime_fallback(filepath, timestamp):
pass
return get_ctime_fallback, set_ctime_fallback
get_ctime, set_ctime = load_ctime_functions()

105
loguru/_datetime.py Normal file
View File

@ -0,0 +1,105 @@
import re
from calendar import day_abbr, day_name, month_abbr, month_name
from datetime import datetime as datetime_
from datetime import timedelta, timezone
from time import localtime, strftime
tokens = r"H{1,2}|h{1,2}|m{1,2}|s{1,2}|S+|YYYY|YY|M{1,4}|D{1,4}|Z{1,2}|zz|A|X|x|E|Q|dddd|ddd|d"
pattern = re.compile(r"(?:{0})|\[(?:{0}|!UTC|)\]".format(tokens))
class datetime(datetime_): # noqa: N801
def __format__(self, spec):
if spec.endswith("!UTC"):
dt = self.astimezone(timezone.utc)
spec = spec[:-4]
else:
dt = self
if not spec:
spec = "%Y-%m-%dT%H:%M:%S.%f%z"
if "%" in spec:
return datetime_.__format__(dt, spec)
if "SSSSSSS" in spec:
raise ValueError(
"Invalid time format: the provided format string contains more than six successive "
"'S' characters. This may be due to an attempt to use nanosecond precision, which "
"is not supported."
)
year, month, day, hour, minute, second, weekday, yearday, _ = dt.timetuple()
microsecond = dt.microsecond
timestamp = dt.timestamp()
tzinfo = dt.tzinfo or timezone(timedelta(seconds=0))
offset = tzinfo.utcoffset(dt).total_seconds()
sign = ("-", "+")[offset >= 0]
(h, m), s = divmod(abs(offset // 60), 60), abs(offset) % 60
rep = {
"YYYY": "%04d" % year,
"YY": "%02d" % (year % 100),
"Q": "%d" % ((month - 1) // 3 + 1),
"MMMM": month_name[month],
"MMM": month_abbr[month],
"MM": "%02d" % month,
"M": "%d" % month,
"DDDD": "%03d" % yearday,
"DDD": "%d" % yearday,
"DD": "%02d" % day,
"D": "%d" % day,
"dddd": day_name[weekday],
"ddd": day_abbr[weekday],
"d": "%d" % weekday,
"E": "%d" % (weekday + 1),
"HH": "%02d" % hour,
"H": "%d" % hour,
"hh": "%02d" % ((hour - 1) % 12 + 1),
"h": "%d" % ((hour - 1) % 12 + 1),
"mm": "%02d" % minute,
"m": "%d" % minute,
"ss": "%02d" % second,
"s": "%d" % second,
"S": "%d" % (microsecond // 100000),
"SS": "%02d" % (microsecond // 10000),
"SSS": "%03d" % (microsecond // 1000),
"SSSS": "%04d" % (microsecond // 100),
"SSSSS": "%05d" % (microsecond // 10),
"SSSSSS": "%06d" % microsecond,
"A": ("AM", "PM")[hour // 12],
"Z": "%s%02d:%02d%s" % (sign, h, m, (":%09.06f" % s)[: 11 if s % 1 else 3] * (s > 0)),
"ZZ": "%s%02d%02d%s" % (sign, h, m, ("%09.06f" % s)[: 10 if s % 1 else 2] * (s > 0)),
"zz": tzinfo.tzname(dt) or "",
"X": "%d" % timestamp,
"x": "%d" % (int(timestamp) * 1000000 + microsecond),
}
def get(m):
try:
return rep[m.group(0)]
except KeyError:
return m.group(0)[1:-1]
return pattern.sub(get, spec)
def aware_now():
now = datetime_.now()
timestamp = now.timestamp()
local = localtime(timestamp)
try:
seconds = local.tm_gmtoff
zone = local.tm_zone
except AttributeError:
# Workaround for Python 3.5.
utc_naive = datetime_.fromtimestamp(timestamp, tz=timezone.utc).replace(tzinfo=None)
offset = datetime_.fromtimestamp(timestamp) - utc_naive
seconds = offset.total_seconds()
zone = strftime("%Z")
tzinfo = timezone(timedelta(seconds=seconds), zone)
return datetime.combine(now.date(), now.time().replace(tzinfo=tzinfo))

74
loguru/_defaults.py Normal file
View File

@ -0,0 +1,74 @@
from os import environ
def env(key, type_, default=None):
if key not in environ:
return default
val = environ[key]
if type_ == str:
return val
elif type_ == bool:
if val.lower() in ["1", "true", "yes", "y", "ok", "on"]:
return True
if val.lower() in ["0", "false", "no", "n", "nok", "off"]:
return False
raise ValueError(
"Invalid environment variable '%s' (expected a boolean): '%s'" % (key, val)
)
elif type_ == int:
try:
return int(val)
except ValueError:
raise ValueError(
"Invalid environment variable '%s' (expected an integer): '%s'" % (key, val)
) from None
LOGURU_AUTOINIT = env("LOGURU_AUTOINIT", bool, True)
LOGURU_FORMAT = env(
"LOGURU_FORMAT",
str,
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
)
LOGURU_FILTER = env("LOGURU_FILTER", str, None)
LOGURU_LEVEL = env("LOGURU_LEVEL", str, "DEBUG")
LOGURU_COLORIZE = env("LOGURU_COLORIZE", bool, None)
LOGURU_SERIALIZE = env("LOGURU_SERIALIZE", bool, False)
LOGURU_BACKTRACE = env("LOGURU_BACKTRACE", bool, True)
LOGURU_DIAGNOSE = env("LOGURU_DIAGNOSE", bool, True)
LOGURU_ENQUEUE = env("LOGURU_ENQUEUE", bool, False)
LOGURU_CONTEXT = env("LOGURU_CONTEXT", str, None)
LOGURU_CATCH = env("LOGURU_CATCH", bool, True)
LOGURU_TRACE_NO = env("LOGURU_TRACE_NO", int, 5)
LOGURU_TRACE_COLOR = env("LOGURU_TRACE_COLOR", str, "<cyan><bold>")
LOGURU_TRACE_ICON = env("LOGURU_TRACE_ICON", str, "\u270F\uFE0F") # Pencil
LOGURU_DEBUG_NO = env("LOGURU_DEBUG_NO", int, 10)
LOGURU_DEBUG_COLOR = env("LOGURU_DEBUG_COLOR", str, "<blue><bold>")
LOGURU_DEBUG_ICON = env("LOGURU_DEBUG_ICON", str, "\U0001F41E") # Lady Beetle
LOGURU_INFO_NO = env("LOGURU_INFO_NO", int, 20)
LOGURU_INFO_COLOR = env("LOGURU_INFO_COLOR", str, "<bold>")
LOGURU_INFO_ICON = env("LOGURU_INFO_ICON", str, "\u2139\uFE0F") # Information
LOGURU_SUCCESS_NO = env("LOGURU_SUCCESS_NO", int, 25)
LOGURU_SUCCESS_COLOR = env("LOGURU_SUCCESS_COLOR", str, "<green><bold>")
LOGURU_SUCCESS_ICON = env("LOGURU_SUCCESS_ICON", str, "\u2705") # White Heavy Check Mark
LOGURU_WARNING_NO = env("LOGURU_WARNING_NO", int, 30)
LOGURU_WARNING_COLOR = env("LOGURU_WARNING_COLOR", str, "<yellow><bold>")
LOGURU_WARNING_ICON = env("LOGURU_WARNING_ICON", str, "\u26A0\uFE0F") # Warning
LOGURU_ERROR_NO = env("LOGURU_ERROR_NO", int, 40)
LOGURU_ERROR_COLOR = env("LOGURU_ERROR_COLOR", str, "<red><bold>")
LOGURU_ERROR_ICON = env("LOGURU_ERROR_ICON", str, "\u274C") # Cross Mark
LOGURU_CRITICAL_NO = env("LOGURU_CRITICAL_NO", int, 50)
LOGURU_CRITICAL_COLOR = env("LOGURU_CRITICAL_COLOR", str, "<RED><bold>")
LOGURU_CRITICAL_ICON = env("LOGURU_CRITICAL_ICON", str, "\u2620\uFE0F") # Skull and Crossbones

View File

@ -0,0 +1,34 @@
import sys
import traceback
class ErrorInterceptor:
def __init__(self, should_catch, handler_id):
self._should_catch = should_catch
self._handler_id = handler_id
def should_catch(self):
return self._should_catch
def print(self, record=None, *, exception=None):
if not sys.stderr:
return
if exception is None:
type_, value, traceback_ = sys.exc_info()
else:
type_, value, traceback_ = (type(exception), exception, exception.__traceback__)
try:
sys.stderr.write("--- Logging error in Loguru Handler #%d ---\n" % self._handler_id)
try:
record_repr = str(record)
except Exception:
record_repr = "/!\\ Unprintable record /!\\"
sys.stderr.write("Record was: %s\n" % record_repr)
traceback.print_exception(type_, value, traceback_, None, sys.stderr)
sys.stderr.write("--- End of logging error ---\n")
except OSError:
pass
finally:
del type_, value, traceback_

434
loguru/_file_sink.py Normal file
View File

@ -0,0 +1,434 @@
import datetime
import decimal
import glob
import numbers
import os
import shutil
import string
from functools import partial
from stat import ST_DEV, ST_INO
from . import _string_parsers as string_parsers
from ._ctime_functions import get_ctime, set_ctime
from ._datetime import aware_now
def generate_rename_path(root, ext, creation_time):
creation_datetime = datetime.datetime.fromtimestamp(creation_time)
date = FileDateFormatter(creation_datetime)
renamed_path = "{}.{}{}".format(root, date, ext)
counter = 1
while os.path.exists(renamed_path):
counter += 1
renamed_path = "{}.{}.{}{}".format(root, date, counter, ext)
return renamed_path
class FileDateFormatter:
def __init__(self, datetime=None):
self.datetime = datetime or aware_now()
def __format__(self, spec):
if not spec:
spec = "%Y-%m-%d_%H-%M-%S_%f"
return self.datetime.__format__(spec)
class Compression:
@staticmethod
def add_compress(path_in, path_out, opener, **kwargs):
with opener(path_out, **kwargs) as f_comp:
f_comp.add(path_in, os.path.basename(path_in))
@staticmethod
def write_compress(path_in, path_out, opener, **kwargs):
with opener(path_out, **kwargs) as f_comp:
f_comp.write(path_in, os.path.basename(path_in))
@staticmethod
def copy_compress(path_in, path_out, opener, **kwargs):
with open(path_in, "rb") as f_in:
with opener(path_out, **kwargs) as f_out:
shutil.copyfileobj(f_in, f_out)
@staticmethod
def compression(path_in, ext, compress_function):
path_out = "{}{}".format(path_in, ext)
if os.path.exists(path_out):
creation_time = get_ctime(path_out)
root, ext_before = os.path.splitext(path_in)
renamed_path = generate_rename_path(root, ext_before + ext, creation_time)
os.rename(path_out, renamed_path)
compress_function(path_in, path_out)
os.remove(path_in)
class Retention:
@staticmethod
def retention_count(logs, number):
def key_log(log):
return (-os.stat(log).st_mtime, log)
for log in sorted(logs, key=key_log)[number:]:
os.remove(log)
@staticmethod
def retention_age(logs, seconds):
t = datetime.datetime.now().timestamp()
for log in logs:
if os.stat(log).st_mtime <= t - seconds:
os.remove(log)
class Rotation:
@staticmethod
def forward_day(t):
return t + datetime.timedelta(days=1)
@staticmethod
def forward_weekday(t, weekday):
while True:
t += datetime.timedelta(days=1)
if t.weekday() == weekday:
return t
@staticmethod
def forward_interval(t, interval):
return t + interval
@staticmethod
def rotation_size(message, file, size_limit):
file.seek(0, 2)
return file.tell() + len(message) > size_limit
class RotationTime:
def __init__(self, step_forward, time_init=None):
self._step_forward = step_forward
self._time_init = time_init
self._limit = None
def __call__(self, message, file):
record_time = message.record["time"]
if self._limit is None:
filepath = os.path.realpath(file.name)
creation_time = get_ctime(filepath)
set_ctime(filepath, creation_time)
start_time = datetime.datetime.fromtimestamp(
creation_time, tz=datetime.timezone.utc
)
time_init = self._time_init
if time_init is None:
limit = start_time.astimezone(record_time.tzinfo).replace(tzinfo=None)
limit = self._step_forward(limit)
else:
tzinfo = record_time.tzinfo if time_init.tzinfo is None else time_init.tzinfo
limit = start_time.astimezone(tzinfo).replace(
hour=time_init.hour,
minute=time_init.minute,
second=time_init.second,
microsecond=time_init.microsecond,
)
if limit <= start_time:
limit = self._step_forward(limit)
if time_init.tzinfo is None:
limit = limit.replace(tzinfo=None)
self._limit = limit
if self._limit.tzinfo is None:
record_time = record_time.replace(tzinfo=None)
if record_time >= self._limit:
while self._limit <= record_time:
self._limit = self._step_forward(self._limit)
return True
return False
class FileSink:
def __init__(
self,
path,
*,
rotation=None,
retention=None,
compression=None,
delay=False,
watch=False,
mode="a",
buffering=1,
encoding="utf8",
**kwargs
):
self.encoding = encoding
self._kwargs = {**kwargs, "mode": mode, "buffering": buffering, "encoding": self.encoding}
self._path = str(path)
self._glob_patterns = self._make_glob_patterns(self._path)
self._rotation_function = self._make_rotation_function(rotation)
self._retention_function = self._make_retention_function(retention)
self._compression_function = self._make_compression_function(compression)
self._file = None
self._file_path = None
self._watch = watch
self._file_dev = -1
self._file_ino = -1
if not delay:
path = self._create_path()
self._create_dirs(path)
self._create_file(path)
def write(self, message):
if self._file is None:
path = self._create_path()
self._create_dirs(path)
self._create_file(path)
if self._watch:
self._reopen_if_needed()
if self._rotation_function is not None and self._rotation_function(message, self._file):
self._terminate_file(is_rotating=True)
self._file.write(message)
def stop(self):
if self._watch:
self._reopen_if_needed()
self._terminate_file(is_rotating=False)
def tasks_to_complete(self):
return []
def _create_path(self):
path = self._path.format_map({"time": FileDateFormatter()})
return os.path.abspath(path)
def _create_dirs(self, path):
dirname = os.path.dirname(path)
os.makedirs(dirname, exist_ok=True)
def _create_file(self, path):
self._file = open(path, **self._kwargs)
self._file_path = path
if self._watch:
fileno = self._file.fileno()
result = os.fstat(fileno)
self._file_dev = result[ST_DEV]
self._file_ino = result[ST_INO]
def _close_file(self):
self._file.flush()
self._file.close()
self._file = None
self._file_path = None
self._file_dev = -1
self._file_ino = -1
def _reopen_if_needed(self):
# Implemented based on standard library:
# https://github.com/python/cpython/blob/cb589d1b/Lib/logging/handlers.py#L486
if not self._file:
return
filepath = self._file_path
try:
result = os.stat(filepath)
except FileNotFoundError:
result = None
if not result or result[ST_DEV] != self._file_dev or result[ST_INO] != self._file_ino:
self._close_file()
self._create_dirs(filepath)
self._create_file(filepath)
def _terminate_file(self, *, is_rotating=False):
old_path = self._file_path
if self._file is not None:
self._close_file()
if is_rotating:
new_path = self._create_path()
self._create_dirs(new_path)
if new_path == old_path:
creation_time = get_ctime(old_path)
root, ext = os.path.splitext(old_path)
renamed_path = generate_rename_path(root, ext, creation_time)
os.rename(old_path, renamed_path)
old_path = renamed_path
if is_rotating or self._rotation_function is None:
if self._compression_function is not None and old_path is not None:
self._compression_function(old_path)
if self._retention_function is not None:
logs = {
file
for pattern in self._glob_patterns
for file in glob.glob(pattern)
if os.path.isfile(file)
}
self._retention_function(list(logs))
if is_rotating:
self._create_file(new_path)
set_ctime(new_path, datetime.datetime.now().timestamp())
@staticmethod
def _make_glob_patterns(path):
formatter = string.Formatter()
tokens = formatter.parse(path)
escaped = "".join(glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens)
root, ext = os.path.splitext(escaped)
if not ext:
return [escaped, escaped + ".*"]
return [escaped, escaped + ".*", root + ".*" + ext, root + ".*" + ext + ".*"]
@staticmethod
def _make_rotation_function(rotation):
if rotation is None:
return None
elif isinstance(rotation, str):
size = string_parsers.parse_size(rotation)
if size is not None:
return FileSink._make_rotation_function(size)
interval = string_parsers.parse_duration(rotation)
if interval is not None:
return FileSink._make_rotation_function(interval)
frequency = string_parsers.parse_frequency(rotation)
if frequency is not None:
return Rotation.RotationTime(frequency)
daytime = string_parsers.parse_daytime(rotation)
if daytime is not None:
day, time = daytime
if day is None:
return FileSink._make_rotation_function(time)
if time is None:
time = datetime.time(0, 0, 0)
step_forward = partial(Rotation.forward_weekday, weekday=day)
return Rotation.RotationTime(step_forward, time)
raise ValueError("Cannot parse rotation from: '%s'" % rotation)
elif isinstance(rotation, (numbers.Real, decimal.Decimal)):
return partial(Rotation.rotation_size, size_limit=rotation)
elif isinstance(rotation, datetime.time):
return Rotation.RotationTime(Rotation.forward_day, rotation)
elif isinstance(rotation, datetime.timedelta):
step_forward = partial(Rotation.forward_interval, interval=rotation)
return Rotation.RotationTime(step_forward)
elif callable(rotation):
return rotation
else:
raise TypeError(
"Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__
)
@staticmethod
def _make_retention_function(retention):
if retention is None:
return None
elif isinstance(retention, str):
interval = string_parsers.parse_duration(retention)
if interval is None:
raise ValueError("Cannot parse retention from: '%s'" % retention)
return FileSink._make_retention_function(interval)
elif isinstance(retention, int):
return partial(Retention.retention_count, number=retention)
elif isinstance(retention, datetime.timedelta):
return partial(Retention.retention_age, seconds=retention.total_seconds())
elif callable(retention):
return retention
else:
raise TypeError(
"Cannot infer retention for objects of type: '%s'" % type(retention).__name__
)
@staticmethod
def _make_compression_function(compression):
if compression is None:
return None
elif isinstance(compression, str):
ext = compression.strip().lstrip(".")
if ext == "gz":
import gzip
compress = partial(Compression.copy_compress, opener=gzip.open, mode="wb")
elif ext == "bz2":
import bz2
compress = partial(Compression.copy_compress, opener=bz2.open, mode="wb")
elif ext == "xz":
import lzma
compress = partial(
Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_XZ
)
elif ext == "lzma":
import lzma
compress = partial(
Compression.copy_compress, opener=lzma.open, mode="wb", format=lzma.FORMAT_ALONE
)
elif ext == "tar":
import tarfile
compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:")
elif ext == "tar.gz":
import gzip
import tarfile
compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:gz")
elif ext == "tar.bz2":
import bz2
import tarfile
compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:bz2")
elif ext == "tar.xz":
import lzma
import tarfile
compress = partial(Compression.add_compress, opener=tarfile.open, mode="w:xz")
elif ext == "zip":
import zipfile
compress = partial(
Compression.write_compress,
opener=zipfile.ZipFile,
mode="w",
compression=zipfile.ZIP_DEFLATED,
)
else:
raise ValueError("Invalid compression format: '%s'" % ext)
return partial(Compression.compression, ext="." + ext, compress_function=compress)
elif callable(compression):
return compression
else:
raise TypeError(
"Cannot infer compression for objects of type: '%s'" % type(compression).__name__
)

24
loguru/_filters.py Normal file
View File

@ -0,0 +1,24 @@
def filter_none(record):
return record["name"] is not None
def filter_by_name(record, parent, length):
name = record["name"]
if name is None:
return False
return (name + ".")[:length] == parent
def filter_by_level(record, level_per_module):
name = record["name"]
while True:
level = level_per_module.get(name, None)
if level is False:
return False
if level is not None:
return record["level"].no >= level
if not name:
return True
index = name.rfind(".")
name = name[:index] if index != -1 else ""

23
loguru/_get_frame.py Normal file
View File

@ -0,0 +1,23 @@
import sys
from sys import exc_info
def get_frame_fallback(n):
try:
raise Exception
except Exception:
frame = exc_info()[2].tb_frame.f_back
for _ in range(n):
frame = frame.f_back
return frame
def load_get_frame_function():
if hasattr(sys, "_getframe"):
get_frame = sys._getframe
else:
get_frame = get_frame_fallback
return get_frame
get_frame = load_get_frame_function()

341
loguru/_handler.py Normal file
View File

@ -0,0 +1,341 @@
import functools
import json
import multiprocessing
import os
import threading
from contextlib import contextmanager
from threading import Thread
from ._colorizer import Colorizer
from ._locks_machinery import create_handler_lock
def prepare_colored_format(format_, ansi_level):
colored = Colorizer.prepare_format(format_)
return colored, colored.colorize(ansi_level)
def prepare_stripped_format(format_):
colored = Colorizer.prepare_format(format_)
return colored.strip()
def memoize(function):
return functools.lru_cache(maxsize=64)(function)
class Message(str):
__slots__ = ("record",)
class Handler:
def __init__(
self,
*,
sink,
name,
levelno,
formatter,
is_formatter_dynamic,
filter_,
colorize,
serialize,
enqueue,
multiprocessing_context,
error_interceptor,
exception_formatter,
id_,
levels_ansi_codes
):
self._name = name
self._sink = sink
self._levelno = levelno
self._formatter = formatter
self._is_formatter_dynamic = is_formatter_dynamic
self._filter = filter_
self._colorize = colorize
self._serialize = serialize
self._enqueue = enqueue
self._multiprocessing_context = multiprocessing_context
self._error_interceptor = error_interceptor
self._exception_formatter = exception_formatter
self._id = id_
self._levels_ansi_codes = levels_ansi_codes # Warning, reference shared among handlers
self._decolorized_format = None
self._precolorized_formats = {}
self._memoize_dynamic_format = None
self._stopped = False
self._lock = create_handler_lock()
self._lock_acquired = threading.local()
self._queue = None
self._queue_lock = None
self._confirmation_event = None
self._confirmation_lock = None
self._owner_process_pid = None
self._thread = None
if self._is_formatter_dynamic:
if self._colorize:
self._memoize_dynamic_format = memoize(prepare_colored_format)
else:
self._memoize_dynamic_format = memoize(prepare_stripped_format)
else:
if self._colorize:
for level_name in self._levels_ansi_codes:
self.update_format(level_name)
else:
self._decolorized_format = self._formatter.strip()
if self._enqueue:
if self._multiprocessing_context is None:
self._queue = multiprocessing.SimpleQueue()
self._confirmation_event = multiprocessing.Event()
self._confirmation_lock = multiprocessing.Lock()
else:
self._queue = self._multiprocessing_context.SimpleQueue()
self._confirmation_event = self._multiprocessing_context.Event()
self._confirmation_lock = self._multiprocessing_context.Lock()
self._queue_lock = create_handler_lock()
self._owner_process_pid = os.getpid()
self._thread = Thread(
target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
)
self._thread.start()
def __repr__(self):
return "(id=%d, level=%d, sink=%s)" % (self._id, self._levelno, self._name)
@contextmanager
def _protected_lock(self):
"""Acquire the lock, but fail fast if its already acquired by the current thread."""
if getattr(self._lock_acquired, "acquired", False):
raise RuntimeError(
"Could not acquire internal lock because it was already in use (deadlock avoided). "
"This likely happened because the logger was re-used inside a sink, a signal "
"handler or a '__del__' method. This is not permitted because the logger and its "
"handlers are not re-entrant."
)
self._lock_acquired.acquired = True
try:
with self._lock:
yield
finally:
self._lock_acquired.acquired = False
def emit(self, record, level_id, from_decorator, is_raw, colored_message):
try:
if self._levelno > record["level"].no:
return
if self._filter is not None:
if not self._filter(record):
return
if self._is_formatter_dynamic:
dynamic_format = self._formatter(record)
formatter_record = record.copy()
if not record["exception"]:
formatter_record["exception"] = ""
else:
type_, value, tb = record["exception"]
formatter = self._exception_formatter
lines = formatter.format_exception(type_, value, tb, from_decorator=from_decorator)
formatter_record["exception"] = "".join(lines)
if colored_message is not None and colored_message.stripped != record["message"]:
colored_message = None
if is_raw:
if colored_message is None or not self._colorize:
formatted = record["message"]
else:
ansi_level = self._levels_ansi_codes[level_id]
formatted = colored_message.colorize(ansi_level)
elif self._is_formatter_dynamic:
if not self._colorize:
precomputed_format = self._memoize_dynamic_format(dynamic_format)
formatted = precomputed_format.format_map(formatter_record)
elif colored_message is None:
ansi_level = self._levels_ansi_codes[level_id]
_, precomputed_format = self._memoize_dynamic_format(dynamic_format, ansi_level)
formatted = precomputed_format.format_map(formatter_record)
else:
ansi_level = self._levels_ansi_codes[level_id]
formatter, precomputed_format = self._memoize_dynamic_format(
dynamic_format, ansi_level
)
coloring_message = formatter.make_coloring_message(
record["message"], ansi_level=ansi_level, colored_message=colored_message
)
formatter_record["message"] = coloring_message
formatted = precomputed_format.format_map(formatter_record)
else:
if not self._colorize:
precomputed_format = self._decolorized_format
formatted = precomputed_format.format_map(formatter_record)
elif colored_message is None:
ansi_level = self._levels_ansi_codes[level_id]
precomputed_format = self._precolorized_formats[level_id]
formatted = precomputed_format.format_map(formatter_record)
else:
ansi_level = self._levels_ansi_codes[level_id]
precomputed_format = self._precolorized_formats[level_id]
coloring_message = self._formatter.make_coloring_message(
record["message"], ansi_level=ansi_level, colored_message=colored_message
)
formatter_record["message"] = coloring_message
formatted = precomputed_format.format_map(formatter_record)
if self._serialize:
formatted = self._serialize_record(formatted, record)
str_record = Message(formatted)
str_record.record = record
with self._protected_lock():
if self._stopped:
return
if self._enqueue:
self._queue.put(str_record)
else:
self._sink.write(str_record)
except Exception:
if not self._error_interceptor.should_catch():
raise
self._error_interceptor.print(record)
def stop(self):
with self._protected_lock():
self._stopped = True
if self._enqueue:
if self._owner_process_pid != os.getpid():
return
self._queue.put(None)
self._thread.join()
if hasattr(self._queue, "close"):
self._queue.close()
self._sink.stop()
def complete_queue(self):
if not self._enqueue:
return
with self._confirmation_lock:
self._queue.put(True)
self._confirmation_event.wait()
self._confirmation_event.clear()
def tasks_to_complete(self):
if self._enqueue and self._owner_process_pid != os.getpid():
return []
lock = self._queue_lock if self._enqueue else self._protected_lock()
with lock:
return self._sink.tasks_to_complete()
def update_format(self, level_id):
if not self._colorize or self._is_formatter_dynamic:
return
ansi_code = self._levels_ansi_codes[level_id]
self._precolorized_formats[level_id] = self._formatter.colorize(ansi_code)
@property
def levelno(self):
return self._levelno
@staticmethod
def _serialize_record(text, record):
exception = record["exception"]
if exception is not None:
exception = {
"type": None if exception.type is None else exception.type.__name__,
"value": exception.value,
"traceback": bool(exception.traceback),
}
serializable = {
"text": text,
"record": {
"elapsed": {
"repr": record["elapsed"],
"seconds": record["elapsed"].total_seconds(),
},
"exception": exception,
"extra": record["extra"],
"file": {"name": record["file"].name, "path": record["file"].path},
"function": record["function"],
"level": {
"icon": record["level"].icon,
"name": record["level"].name,
"no": record["level"].no,
},
"line": record["line"],
"message": record["message"],
"module": record["module"],
"name": record["name"],
"process": {"id": record["process"].id, "name": record["process"].name},
"thread": {"id": record["thread"].id, "name": record["thread"].name},
"time": {"repr": record["time"], "timestamp": record["time"].timestamp()},
},
}
return json.dumps(serializable, default=str, ensure_ascii=False) + "\n"
def _queued_writer(self):
message = None
queue = self._queue
# We need to use a lock to protect sink during fork.
# Particularly, writing to stderr may lead to deadlock in child process.
lock = self._queue_lock
while True:
try:
message = queue.get()
except Exception:
with lock:
self._error_interceptor.print(None)
continue
if message is None:
break
if message is True:
self._confirmation_event.set()
continue
with lock:
try:
self._sink.write(message)
except Exception:
self._error_interceptor.print(message.record)
def __getstate__(self):
state = self.__dict__.copy()
state["_lock"] = None
state["_lock_acquired"] = None
state["_memoize_dynamic_format"] = None
if self._enqueue:
state["_sink"] = None
state["_thread"] = None
state["_owner_process"] = None
state["_queue_lock"] = None
return state
def __setstate__(self, state):
self.__dict__.update(state)
self._lock = create_handler_lock()
self._lock_acquired = threading.local()
if self._enqueue:
self._queue_lock = create_handler_lock()
if self._is_formatter_dynamic:
if self._colorize:
self._memoize_dynamic_format = memoize(prepare_colored_format)
else:
self._memoize_dynamic_format = memoize(prepare_stripped_format)

View File

@ -0,0 +1,50 @@
import os
import threading
import weakref
if not hasattr(os, "register_at_fork"):
def create_logger_lock():
return threading.Lock()
def create_handler_lock():
return threading.Lock()
else:
# While forking, we need to sanitize all locks to make sure the child process doesn't run into
# a deadlock (if a lock already acquired is inherited) and to protect sink from corrupted state.
# It's very important to acquire logger locks before handlers one to prevent possible deadlock
# while 'remove()' is called for example.
logger_locks = weakref.WeakSet()
handler_locks = weakref.WeakSet()
def acquire_locks():
for lock in logger_locks:
lock.acquire()
for lock in handler_locks:
lock.acquire()
def release_locks():
for lock in logger_locks:
lock.release()
for lock in handler_locks:
lock.release()
os.register_at_fork(
before=acquire_locks,
after_in_parent=release_locks,
after_in_child=release_locks,
)
def create_logger_lock():
lock = threading.Lock()
logger_locks.add(lock)
return lock
def create_handler_lock():
lock = threading.Lock()
handler_locks.add(lock)
return lock

2101
loguru/_logger.py Normal file

File diff suppressed because it is too large Load Diff

90
loguru/_recattrs.py Normal file
View File

@ -0,0 +1,90 @@
import pickle
from collections import namedtuple
class RecordLevel:
__slots__ = ("name", "no", "icon")
def __init__(self, name, no, icon):
self.name = name
self.no = no
self.icon = icon
def __repr__(self):
return "(name=%r, no=%r, icon=%r)" % (self.name, self.no, self.icon)
def __format__(self, spec):
return self.name.__format__(spec)
class RecordFile:
__slots__ = ("name", "path")
def __init__(self, name, path):
self.name = name
self.path = path
def __repr__(self):
return "(name=%r, path=%r)" % (self.name, self.path)
def __format__(self, spec):
return self.name.__format__(spec)
class RecordThread:
__slots__ = ("id", "name")
def __init__(self, id_, name):
self.id = id_
self.name = name
def __repr__(self):
return "(id=%r, name=%r)" % (self.id, self.name)
def __format__(self, spec):
return self.id.__format__(spec)
class RecordProcess:
__slots__ = ("id", "name")
def __init__(self, id_, name):
self.id = id_
self.name = name
def __repr__(self):
return "(id=%r, name=%r)" % (self.id, self.name)
def __format__(self, spec):
return self.id.__format__(spec)
class RecordException(namedtuple("RecordException", ("type", "value", "traceback"))):
def __repr__(self):
return "(type=%r, value=%r, traceback=%r)" % (self.type, self.value, self.traceback)
def __reduce__(self):
# The traceback is not picklable, therefore it needs to be removed. Additionally, there's a
# possibility that the exception value is not picklable either. In such cases, we also need
# to remove it. This is done for user convenience, aiming to prevent error logging caused by
# custom exceptions from third-party libraries. If the serialization succeeds, we can reuse
# the pickled value later for optimization (so that it's not pickled twice). It's important
# to note that custom exceptions might not necessarily raise a PickleError, hence the
# generic Exception catch.
try:
pickled_value = pickle.dumps(self.value)
except Exception:
return (RecordException, (self.type, None, None))
else:
return (RecordException._from_pickled_value, (self.type, pickled_value, None))
@classmethod
def _from_pickled_value(cls, type_, pickled_value, traceback_):
try:
# It's safe to use "pickle.loads()" in this case because the pickled value is generated
# by the same code and is not coming from an untrusted source.
value = pickle.loads(pickled_value)
except Exception:
return cls(type_, None, traceback_)
else:
return cls(type_, value, traceback_)

128
loguru/_simple_sinks.py Normal file
View File

@ -0,0 +1,128 @@
import asyncio
import logging
import weakref
from ._asyncio_loop import get_running_loop, get_task_loop
class StreamSink:
def __init__(self, stream):
self._stream = stream
self._flushable = callable(getattr(stream, "flush", None))
self._stoppable = callable(getattr(stream, "stop", None))
self._completable = asyncio.iscoroutinefunction(getattr(stream, "complete", None))
def write(self, message):
self._stream.write(message)
if self._flushable:
self._stream.flush()
def stop(self):
if self._stoppable:
self._stream.stop()
def tasks_to_complete(self):
if not self._completable:
return []
return [self._stream.complete()]
class StandardSink:
def __init__(self, handler):
self._handler = handler
def write(self, message):
record = message.record
message = str(message)
exc = record["exception"]
record = logging.getLogger().makeRecord(
record["name"],
record["level"].no,
record["file"].path,
record["line"],
message,
(),
(exc.type, exc.value, exc.traceback) if exc else None,
record["function"],
{"extra": record["extra"]},
)
if exc:
record.exc_text = "\n"
self._handler.handle(record)
def stop(self):
self._handler.close()
def tasks_to_complete(self):
return []
class AsyncSink:
def __init__(self, function, loop, error_interceptor):
self._function = function
self._loop = loop
self._error_interceptor = error_interceptor
self._tasks = weakref.WeakSet()
def write(self, message):
try:
loop = self._loop or get_running_loop()
except RuntimeError:
return
coroutine = self._function(message)
task = loop.create_task(coroutine)
def check_exception(future):
if future.cancelled() or future.exception() is None:
return
if not self._error_interceptor.should_catch():
raise future.exception()
self._error_interceptor.print(message.record, exception=future.exception())
task.add_done_callback(check_exception)
self._tasks.add(task)
def stop(self):
for task in self._tasks:
task.cancel()
def tasks_to_complete(self):
# To avoid errors due to "self._tasks" being mutated while iterated, the
# "tasks_to_complete()" method must be protected by the same lock as "write()" (which
# happens to be the handler lock). However, the tasks must not be awaited while the lock is
# acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks
# to complete, then return them so that they can be awaited outside of the lock.
return [self._complete_task(task) for task in self._tasks]
async def _complete_task(self, task):
loop = get_running_loop()
if get_task_loop(task) is not loop:
return
try:
await task
except Exception:
pass # Handled in "check_exception()"
def __getstate__(self):
state = self.__dict__.copy()
state["_tasks"] = None
return state
def __setstate__(self, state):
self.__dict__.update(state)
self._tasks = weakref.WeakSet()
class CallableSink:
def __init__(self, function):
self._function = function
def write(self, message):
self._function(message)
def stop(self):
pass
def tasks_to_complete(self):
return []

187
loguru/_string_parsers.py Normal file
View File

@ -0,0 +1,187 @@
import datetime
import re
class Frequencies:
@staticmethod
def hourly(t):
dt = t + datetime.timedelta(hours=1)
return dt.replace(minute=0, second=0, microsecond=0)
@staticmethod
def daily(t):
dt = t + datetime.timedelta(days=1)
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
@staticmethod
def weekly(t):
dt = t + datetime.timedelta(days=7 - t.weekday())
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
@staticmethod
def monthly(t):
if t.month == 12:
y, m = t.year + 1, 1
else:
y, m = t.year, t.month + 1
return t.replace(year=y, month=m, day=1, hour=0, minute=0, second=0, microsecond=0)
@staticmethod
def yearly(t):
y = t.year + 1
return t.replace(year=y, month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
def parse_size(size):
size = size.strip()
reg = re.compile(r"([e\+\-\.\d]+)\s*([kmgtpezy])?(i)?(b)", flags=re.I)
match = reg.fullmatch(size)
if not match:
return None
s, u, i, b = match.groups()
try:
s = float(s)
except ValueError as e:
raise ValueError("Invalid float value while parsing size: '%s'" % s) from e
u = "kmgtpezy".index(u.lower()) + 1 if u else 0
i = 1024 if i else 1000
b = {"b": 8, "B": 1}[b] if b else 1
size = s * i**u / b
return size
def parse_duration(duration):
duration = duration.strip()
reg = r"(?:([e\+\-\.\d]+)\s*([a-z]+)[\s\,]*)"
units = [
("y|years?", 31536000),
("months?", 2628000),
("w|weeks?", 604800),
("d|days?", 86400),
("h|hours?", 3600),
("min(?:ute)?s?", 60),
("s|sec(?:ond)?s?", 1),
("ms|milliseconds?", 0.001),
("us|microseconds?", 0.000001),
]
if not re.fullmatch(reg + "+", duration, flags=re.I):
return None
seconds = 0
for value, unit in re.findall(reg, duration, flags=re.I):
try:
value = float(value)
except ValueError as e:
raise ValueError("Invalid float value while parsing duration: '%s'" % value) from e
try:
unit = next(u for r, u in units if re.fullmatch(r, unit, flags=re.I))
except StopIteration:
raise ValueError("Invalid unit value while parsing duration: '%s'" % unit) from None
seconds += value * unit
return datetime.timedelta(seconds=seconds)
def parse_frequency(frequency):
frequencies = {
"hourly": Frequencies.hourly,
"daily": Frequencies.daily,
"weekly": Frequencies.weekly,
"monthly": Frequencies.monthly,
"yearly": Frequencies.yearly,
}
frequency = frequency.strip().lower()
return frequencies.get(frequency, None)
def parse_day(day):
days = {
"monday": 0,
"tuesday": 1,
"wednesday": 2,
"thursday": 3,
"friday": 4,
"saturday": 5,
"sunday": 6,
}
day = day.strip().lower()
if day in days:
return days[day]
elif day.startswith("w") and day[1:].isdigit():
day = int(day[1:])
if not 0 <= day < 7:
raise ValueError("Invalid weekday value while parsing day (expected [0-6]): '%d'" % day)
else:
day = None
return day
def parse_time(time):
time = time.strip()
reg = re.compile(r"^[\d\.\:]+\s*(?:[ap]m)?$", flags=re.I)
if not reg.match(time):
return None
formats = [
"%H",
"%H:%M",
"%H:%M:%S",
"%H:%M:%S.%f",
"%I %p",
"%I:%M %S",
"%I:%M:%S %p",
"%I:%M:%S.%f %p",
]
for format_ in formats:
try:
dt = datetime.datetime.strptime(time, format_)
except ValueError:
pass
else:
return dt.time()
raise ValueError("Unrecognized format while parsing time: '%s'" % time)
def parse_daytime(daytime):
daytime = daytime.strip()
reg = re.compile(r"^(.*?)\s+at\s+(.*)$", flags=re.I)
match = reg.match(daytime)
if match:
day, time = match.groups()
else:
day = time = daytime
try:
day = parse_day(day)
if match and day is None:
raise ValueError
except ValueError as e:
raise ValueError("Invalid day while parsing daytime: '%s'" % day) from e
try:
time = parse_time(time)
if match and time is None:
raise ValueError
except ValueError as e:
raise ValueError("Invalid time while parsing daytime: '%s'" % time) from e
if day is None and time is None:
return None
return day, time

0
loguru/py.typed Normal file
View File

View File

@ -1,9 +1,11 @@
import os, glob
from loguru import logger as debug_logger
import gradio as gr
from PIL import Image
try:
import torch.cuda as cuda
except:
debug_logger.exception("Error")
cuda = None
from typing import List
@ -33,6 +35,8 @@ from scripts.console_log_patch import apply_logging_patch
from scripts.reactor_helpers import make_grid, get_image_path, set_Device
from scripts.reactor_globals import DEVICE, DEVICE_LIST
log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "_faceswap.log")
debug_logger.add(log_path, backtrace=True, diagnose=True)
MODELS_PATH = None
@ -58,6 +62,7 @@ class FaceSwapScript(scripts.Script):
def show(self, is_img2img):
return scripts.AlwaysVisible
@debug_logger.catch
def ui(self, is_img2img):
with gr.Accordion(f"{app_title}", open=False):
with gr.Tab("Main"):
@ -467,6 +472,7 @@ class FaceSwapScriptExtras(scripts_postprocessing.ScriptPostprocessing):
name = 'ReActor'
order = 20000
@debug_logger.catch
def ui(self):
with gr.Accordion(f"{app_title}", open=False):
with gr.Tab("Main"):

View File

@ -1,10 +1,15 @@
import os
from pathlib import Path
from loguru import logger as debug_logger
log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "_globals.log")
debug_logger.add(log_path, backtrace=True, diagnose=True)
IS_RUN: bool = False
BASE_PATH = os.path.join(Path(__file__).parents[1])
DEVICE_LIST: list = ["CPU", "CUDA"]
@debug_logger.catch
def updateDevice():
try:
LAST_DEVICE_PATH = os.path.join(BASE_PATH, "last_device.txt")
@ -12,7 +17,14 @@ def updateDevice():
for el in f:
device = el.strip()
except:
debug_logger.exception("Error")
device = "CPU"
return device
DEVICE = updateDevice()
# @debug_logger.catch
# def test(a, b):
# return a / b
# test(1, 0)

View File

@ -10,12 +10,18 @@ from modules.images import FilenameGenerator, get_next_sequence_number
from modules import shared, script_callbacks
from scripts.reactor_globals import DEVICE, BASE_PATH
from loguru import logger as debug_logger
log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "_helpers.log")
debug_logger.add(log_path, backtrace=True, diagnose=True)
@debug_logger.catch
def set_Device(value):
global DEVICE
DEVICE = value
with open(os.path.join(BASE_PATH, "last_device.txt"), "w") as txt:
txt.write(DEVICE)
@debug_logger.catch
def get_Device():
global DEVICE
return DEVICE

View File

@ -28,6 +28,10 @@ except:
import warnings
from loguru import logger as debug_logger
log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "_swapper.log")
debug_logger.add(log_path, backtrace=True, diagnose=True)
np.warnings = warnings
np.warnings.filterwarnings('ignore')
@ -85,7 +89,7 @@ SOURCE_IMAGE_HASH = None
TARGET_FACES = None
TARGET_IMAGE_HASH = None
@debug_logger.catch
def getAnalysisModel():
global ANALYSIS_MODEL
if ANALYSIS_MODEL is None:
@ -94,7 +98,7 @@ def getAnalysisModel():
)
return ANALYSIS_MODEL
@debug_logger.catch
def getFaceSwapModel(model_path: str):
global FS_MODEL
global CURRENT_FS_MODEL_PATH
@ -220,6 +224,7 @@ def half_det_size(det_size):
logger.status("Trying to halve 'det_size' parameter")
return (det_size[0] // 2, det_size[1] // 2)
@debug_logger.catch
def analyze_faces(img_data: np.ndarray, det_size=(640, 640)):
logger.info("Applied Execution Provider: %s", PROVIDERS[0])
face_analyser = copy.deepcopy(getAnalysisModel())
@ -269,7 +274,7 @@ def get_face_single(img_data: np.ndarray, face, face_index=0, det_size=(640, 640
except IndexError:
return None, 0, face_age, face_gender
@debug_logger.catch
def swap_face(
source_img: Image.Image,
target_img: Image.Image,