
RFC: Nested structured logging

Open sliedes opened this issue 2 years ago • 6 comments

Introduction

First of all, thanks for developing and maintaining Loguru! It's my general go-to logging solution.

Now, this is more of a piece for discussion and ideas. It actually arose from me misunderstanding what people mean by structured logging and ending up wanting something that makes a lot of sense to me. Loguru doesn't, as far as I can tell, support that use case terribly well, but I have some code that builds on Loguru and almost works, apart from emitting spurious "$$query_log_entry" entries to the text log (which could be filtered out at the sink).

It may be that I have missed some completely obvious way to do it better, in which case I'm interested. I think I did explore a few avenues, though, when designing this.

Nested logging

When people say "structured logging", I think they usually mean serializing log records as something like valid JSON, with the values they provide embedded as JSON. This makes sense to me, and is part of what I want. But there's another part that may require some explanation: nested logging.

It's always seemed to me like using a flat list of single-point-in-time log events throws away a lot of useful information. Consider this list of events:

  • Session 123 opened by user "jrandom".
  • In session 123, the user executed a query "ls".
  • We performed alias expansion on the query, getting some result.
  • We sent the query to our query servers.
  • We also sent it to our moderation servers to ensure that it's acceptable.
  • We received a response from the query server.
  • We received a response from the moderation server.
  • We sent the response to the frontend to be shown to the user.

To me, this is all inherently structured, and not only at the level of individual events. Rather, we have two kinds of things:

  • Singular events
  • Regions (or sections, or what you would like to call them) that can contain other regions and singular events

In Python, I'd express this as something like:


import asyncio

import nested_logging as NL  # hypothetical module
from loguru import logger

async def query_query_server(query: str) -> str:
    with NL.ServerQueryEvent(query=query):
        ...

async def query_moderation_server(query: str) -> bool:
    with NL.ModerationQueryEvent(query=query):
        ...

async def handle_session(query: str) -> None:
    with NL.SessionOpened(id=123, user="jrandom"):
        with NL.QueryRegion(query=query):
            logger.debug("Performed variable expansions: {}", ...)
            query_result, mod_result = await asyncio.gather(
                query_query_server(query),
                query_moderation_server(query))
            if mod_result:
                logger.info("Sending result to user: {}", ...)
                ...
                return
            logger.warning("Query triggered moderation, not forwarding result")

As a result, I'd expect to get something that has a logical nested structure (think flame graph). The serialization format would probably still be JSONL, with lines for region starts, region ends, and events; if entries are logged as they happen, the logical view then needs to be reconstructed afterwards:

  • 00:12:12 session id=123 user=jrandom opened
    • 00:12:13 executing query "ls"
      • 00:12:13 performed variable expansions: result is "dir"
      • 00:12:14 sent query to query server
        • 00:12:20 received query result
        • 00:12:20 <region closed>
      • 00:12:14 sent query to moderation server
        • 00:12:18 received moderation result: deny
        • 00:12:19 <region closed>
      • 00:12:20 query triggered moderation, not forwarding result
      • 00:12:20 <region closed>
    • 00:12:20 <region closed>
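
To make the reconstruction step concrete, here is a minimal sketch of rebuilding the tree from such JSONL lines. The record shape (a `kind` field with values `start`, `end`, or `log`) is an assumption for illustration, not anything Loguru emits:

```python
import json
from typing import Any


def reconstruct(lines: list[str]) -> list[dict[str, Any]]:
    """Rebuild the logical tree from flat JSONL region-start/end/event lines."""
    root: list[dict[str, Any]] = []
    stack = [root]  # the child list we are currently appending to
    for line in lines:
        entry = json.loads(line)
        if entry["kind"] == "start":   # region opened: descend into it
            node = {"start": entry, "children": []}
            stack[-1].append(node)
            stack.append(node["children"])
        elif entry["kind"] == "end":   # region closed: ascend
            stack.pop()
        else:                          # singular event
            stack[-1].append(entry)
    return root


tree = reconstruct([
    '{"kind": "start", "event": "session", "id": 123}',
    '{"kind": "log", "message": "executing query"}',
    '{"kind": "end"}',
])
```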

Implementation and discussion

Some wishlist items

Working on this has produced a few wishlist items. Now, I'm appreciative of the fact that it probably makes sense to keep Loguru focused, so please take this more as food for thought!

  1. Some kind of support for nested logging, or at least something making it less hard to implement
    • Either a context manager that introduces context in some nestable fashion, or...
    • Being able to somehow append to a context would be a big help already (see #749)
    • Actually, unit testing this is kind of nontrivial because it's quite hard to start from a clean slate. I currently resort to unloading and reloading the loguru module between tests. I don't know what could really be done on Loguru's side. Perhaps I should look at Loguru's unit tests, unless they use some private, unexposed functionality to achieve the same.
    • A context manager for temporarily, and not globally but within a context, switching log files would actually be rather neat (let's say we want each session in its own log file)
  2. (rather trivial:) Some of the names are not exported by the loguru package, necessitating putting type annotations as strings (at least loguru.Message)

Code implementing something like what I described

Now, I have some code implementing this (see below). I must admit I do not remember why I decided to build this on top of Loguru, or whether Loguru actually provides some benefit for my particular use case of a few well-structured log messages. However, I do think this would fit quite neatly into "what Loguru does", with the idea that every log message would have a context. My solution does not tie the free-form messages to a context (since we wanted to log a limited set of well-formed messages), but I think that would be useful too.

The using code generally looks something like this:


import query_logging as qlog

# In my implementation, ServerStartedEvent also initializes the logging and opens the log files (an implementation detail, surely)
qlog.ServerStartedEvent.log(args=sys.argv)

...

with qlog.ConnectionEvent.section(id=websocket.id, local_addr=websocket.local_address, remote_addr=remote_address):
    ...

Here's the core of the code (my full implementation does things like switch log files on specific events etc.).

from __future__ import annotations

import contextlib
import copy
import dataclasses
import os
import sys
from abc import ABC, abstractmethod
from contextvars import ContextVar
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Iterator

import loguru
from overrides import override
from typing_extensions import Self


class LoggingNotInitializedError(Exception):
    """Raised when logging is used before it's initialized."""


class _LoggingEnvironment:
    """
    Base class representing a logging environment. This one logs to /dev/null and can thus also
    be used directly in tests.

    There is (at most) one per execution. It contains things like the run UUID.

    For server runs, this is initialized by logging a ServerStartedEvent. For test and script runs,
    something else should usually be done in order to not pollute the server logs.
    """

    _query_logger: loguru.Logger

    LOG_PATH: str = os.devnull  # override in subclasses

    @classmethod
    def _add_regular_logger_handlers(cls) -> None:
        """
        Add regular logger handlers.
        """

        if cls.LOG_PATH == os.devnull:
            return

        Path(cls.LOG_PATH).mkdir(parents=True, exist_ok=True)

        # Log the regular logs as JSON...
        loguru.logger.add(
            Path(cls.LOG_PATH) / "debug.log.json",
            level="DEBUG",
            serialize=True,
            enqueue=True,
            rotation="250 MB",
        )

        # ... and, for convenience, also as text.
        loguru.logger.add(
            Path(cls.LOG_PATH) / "debug.log.txt",
            level="DEBUG",
            enqueue=True,
            rotation="250 MB",
        )

    def __init__(self) -> None:
        """Initialize the logging environment."""

        loguru.logger.info("Starting logging to '{}'", self.LOG_PATH)

        if self.LOG_PATH != os.devnull:
            loguru.logger.remove()
            loguru.logger.add(sys.stderr, level=os.environ.get("LOGURU_LEVEL", "INFO"))
            self._add_regular_logger_handlers()

        self._query_logger = loguru.logger.bind(logger="query_logger")
        self._query_logger.add(_handler, filter=lambda r: r["message"] == _RawQueryLogEntry._MAGIC)


class _ServerLoggingEnvironment(_LoggingEnvironment):
    """A logging environment for server runs where we want to save the logs (as opposed to unit tests)."""

    LOG_PATH = "logs"


_logging_environment: _LoggingEnvironment | None = None
_query_logger_context: ContextVar[_Context | None] = ContextVar("query_logger_context", default=None)


def _env() -> _LoggingEnvironment:
    """Get the logging environment."""
    if _logging_environment is None:
        raise LoggingNotInitializedError(
            "Logging environment not initialized by ServerStartedEvent (for servers) or similar"
        )
    return _logging_environment


@dataclass
class _Context:
    """
    A context for logging. This is used to keep track of the current logfile and sections we're in,
    in a stack-like fashion.
    """

    vars: dict[str, Any]
    outer: _Context | None = None
    depth: int = field(init=False)
    _old_context: _Context | None = field(default=None, repr=False, init=False)

    def __post_init__(self) -> None:
        self.depth = 0 if self.outer is None else self.outer.depth + 1

    def get(self, key: str, default: Any = None) -> Any:
        """Get a context variable."""
        if not isinstance(key, str):
            raise TypeError(f"key must be a str, got {type(key).__name__}")
        if key in self.vars:
            return self.vars[key]
        if self.outer is not None:
            return self.outer.get(key, default)
        return default

    def as_dict(self) -> dict[str, Any]:
        """Convert the context to a dictionary."""
        return dict(depth=self.depth, vars=self.vars, outer=self.outer.as_dict() if self.outer else None)

    def nest(self, **kwargs: Any) -> _Context:
        """Nest a new context."""
        return _Context(vars=kwargs, outer=self)

    def __enter__(self) -> Self:
        """Enter the context."""
        self._old_context = _query_logger_context.get()
        _query_logger_context.set(self)
        return self

    def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
        """Exit the context."""
        _query_logger_context.set(self._old_context)


def _get_query_logger_context() -> _Context:
    """
    Get the current query logger context.

    For creating new contexts, prefer _context().
    """
    context = _query_logger_context.get()
    if context is None:
        context = _Context(vars={})
        _query_logger_context.set(context)
    return context


@contextlib.contextmanager
def _context(**kwargs: Any) -> Iterator[_Context]:
    """
    Add contextual information to the query log.

    When creating a new context, this context manager takes care of the cleanup of the old context.
    """
    outer_context = _get_query_logger_context()
    with outer_context.nest(**kwargs) as inner_context:
        yield inner_context


@dataclass(frozen=True)
class _Metadata:
    """Metadata about a query log entry."""

    level: str
    time: datetime
    elapsed: timedelta

    @staticmethod
    def from_loguru_record(record: loguru.Record) -> _Metadata:
        assert isinstance(record["time"], datetime)
        assert isinstance(record["elapsed"], timedelta)
        return _Metadata(
            elapsed=record["elapsed"],
            level=record["level"].name,
            time=record["time"],
        )


@dataclass(frozen=True)
class _RawQueryLogEntry:
    """A single entry in the query log, encompassing metadata and log data."""

    rec: dict[str, Any]
    meta: _Metadata
    _MAGIC = "$$query_log_entry"

    @staticmethod
    def from_loguru_record(record: loguru.Record) -> _RawQueryLogEntry:
        if record["message"] != _RawQueryLogEntry._MAGIC:
            raise ValueError(f"Invalid log entry: {record}")
        extra = copy.copy(record["extra"])
        if "logger" in extra and extra["logger"] == "query_logger":
            del extra["logger"]
        return _RawQueryLogEntry(
            meta=_Metadata.from_loguru_record(record),
            rec=extra,
        )

    def as_dict(self) -> dict[str, Any]:
        """Convert the query log entry to a dictionary."""
        return dict(
            rec=self.rec,
            meta=dataclasses.asdict(self.meta),
        )


def _handler(msg: "loguru.Message") -> None:
    ...  # Write to a log file


@contextlib.contextmanager
def _log_section(event: _LogEvent) -> Iterator[None]:
    """Add contextual information to the query log."""
    event_dict = event.to_dict()
    event_name = event_dict.pop("event")
    with _context(event=event_name, args=event_dict) as inner_context:
        _env()._query_logger.bind(
            event=event_name, args=event_dict, section=True, context=inner_context.as_dict()
        ).info(_RawQueryLogEntry._MAGIC)

        exception: str | None = None
        try:
            yield
        except Exception as e:
            try:
                exception = f"{type(e).__name__}: {e}"
            except Exception:
                exception = f"Exception of type {type(e)} that cannot be converted to string"
            raise
        finally:
            _env()._query_logger.bind(
                event="end_section", name=event_name, section=True, context=inner_context.as_dict(), exception=exception
            ).info(_RawQueryLogEntry._MAGIC)


class _LogEvent(ABC):
    """
    An abstract base class representing a log event.
    Classes derived from this class represent specific kinds of log events.

    There are two ways to use the events:

    1. query_logging.ServerStartedEvent.log(args=sys.argv)
    2. with query_logging.ServerStartedEvent.section(args=sys.argv): ...

    Most subclasses only support one or the other.
    """

    @abstractmethod
    def to_dict(self) -> dict[str, Any]:
        """Convert the event to a dictionary."""


def _log(event: _LogEvent) -> None:
    """Log a query log entry."""
    d = event.to_dict()
    event_name = d.pop("event")

    _env()._query_logger.bind(
        event=event_name, args=d, section=False, context=_get_query_logger_context().as_dict()
    ).info(_RawQueryLogEntry._MAGIC)


def init_disabled_logging() -> None:
    """Initialize logging for a test run."""
    global _logging_environment
    _logging_environment = _LoggingEnvironment()


# This is used just as a hint to make sure users don't instantiate some classes.
_PRIVATE_OBJECT = object()


class ServerStartedEvent(_LogEvent):
    """A server started event."""

    args: list[str]

    def __init__(self, private_object: object, args: list[str]) -> None:
        """Private constructor. Use the log() method instead."""
        assert private_object is _PRIVATE_OBJECT, "Cannot instantiate ServerStartedEvent"
        self.args = args

    # We need to repeat to_dict and log in every subclass, sadly, to get mypy and type hinting
    # in IDEs to work well.
    @override
    def to_dict(self) -> dict[str, Any]:
        """Convert the event to a dictionary."""
        return dict(event="server_started", args=self.args)

    @staticmethod
    def log(args: list[str]) -> None:
        """Log the event. Also initializes the logging environment."""

        global _logging_environment
        if _logging_environment is None:
            _logging_environment = _ServerLoggingEnvironment()
        else:
            raise RuntimeError("ServerStartedEvent already logged. There should be only one.")

        _log(ServerStartedEvent(_PRIVATE_OBJECT, args=args))


class ConnectionEvent(_LogEvent):
    """A connection event."""

    id: str
    local_addr: str
    remote_addr: str

    def __init__(self, private_object: object, id: str, local_addr: str, remote_addr: str) -> None:
        """Private constructor. Use the section() method instead."""
        assert private_object is _PRIVATE_OBJECT, "Cannot instantiate ConnectionEvent"
        self.id = id
        self.local_addr = local_addr
        self.remote_addr = remote_addr

    @override
    def to_dict(self) -> dict[str, Any]:
        """Convert the event to a dictionary."""
        return dict(event="connection", id=self.id, local_addr=self.local_addr, remote_addr=self.remote_addr)

    @staticmethod
    @contextlib.contextmanager
    def section(id: str, local_addr: str, remote_addr: str) -> Iterator[None]:
        with _log_section(ConnectionEvent(_PRIVATE_OBJECT, id=id, local_addr=local_addr, remote_addr=remote_addr)):
            yield


class QueryEvent(_LogEvent):
    """A query event."""

    text: str
    uuid: str
    prompt: str

    def __init__(self, private_object: object, text: str, uuid: str, prompt: str) -> None:
        """Private constructor. Use the section() method instead."""
        assert private_object is _PRIVATE_OBJECT, "Cannot instantiate QueryEvent"
        self.text = text
        self.uuid = uuid
        self.prompt = prompt

    @override
    def to_dict(self) -> dict[str, Any]:
        """Convert the event to a dictionary."""
        return dict(event="query", text=self.text, uuid=self.uuid, prompt=self.prompt)

    @staticmethod
    @contextlib.contextmanager
    def section(text: str, uuid: str, prompt: str) -> Iterator[None]:
        with _log_section(QueryEvent(_PRIVATE_OBJECT, text=text, uuid=uuid, prompt=prompt)):
            yield


class UserInputReceivedEvent(_LogEvent):
    """A user input event."""

    text: str
    uuid: str

    def __init__(self, private_object: object, text: str, uuid: str) -> None:
        """Private constructor. Use the log() method instead."""
        assert private_object is _PRIVATE_OBJECT, "Cannot instantiate UserInputReceivedEvent"
        self.text = text
        self.uuid = uuid

    @override
    def to_dict(self) -> dict[str, Any]:
        """Convert the event to a dictionary."""
        return dict(event="user_input_received", text=self.text, uuid=self.uuid)

    @staticmethod
    def log(text: str, uuid: str) -> None:
        """Log the event."""
        _log(UserInputReceivedEvent(_PRIVATE_OBJECT, text=text, uuid=uuid))

sliedes avatar Sep 19 '23 16:09 sliedes

Hello @sliedes,

I appreciate you sharing your thoughts on this matter. Your approach to log visualization brings to mind another project I came across some time ago: eliottree. I find it to be an intriguing concept. While it's not natively integrated into Loguru, I'd like Loguru to be compatible with this type of representation.

As a demonstration, I've created a rudimentary implementation:

import sys

from loguru import logger


class NestedLogging:

    def __init__(self):
        self._kwargs = {}
        self._nesting = 0

    def __enter__(self):
        context = " ".join(f"{k}={v}" for k, v in self._kwargs.items())
        logger.info("Session {}", context)
        self._nesting += 1

    def __exit__(self, *exc):
        logger.info("<Region Closed>")
        self._nesting -= 1
        return False

    def Session(self, **kwargs):
        self._kwargs = kwargs
        return self

    def make_logging_formatter(self, record):
        prefix = "    " * self._nesting
        return prefix + "{time:HH-mm-ss} {message}\n"


N = NestedLogging()

logger.remove()
logger.add(sys.stderr, format=N.make_logging_formatter)

def query():
    logger.info("Blabla")
    with N.Session(foo="bar"):
        logger.debug("Bla")
        with N.Session():
            logger.debug("...")

def handle():
    with N.Session(id=123, user="jrandom"):
        with N.Session(query="fetch"):
            logger.debug("Performed variable expansions: {}", ...)
            logger.warning("Query triggered moderation, not forwarding result")
        query()
        logger.debug("Handling finished")

if __name__ == "__main__":
    handle()

However, it's worth noting that this approach has technical challenges, particularly in the context of multi-threading. It may lead to sessions interleaving with one another, which is not desirable. Consequently, it might be more prudent to implement this as a post-processing step. This post-processing step could parse the logs and format them according to your preferences, utilizing a "depth" tag that's present in the logs.
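
To illustrate the post-processing idea, here is a minimal sketch. It assumes records were written with Loguru's `serialize=True` (which nests the payload under a `"record"` key) and that each record carries a `depth` value in its `extra` dict, e.g. set via `logger.bind()` or a patcher; the `depth` convention itself is an assumption of this example:

```python
import json


def indent_by_depth(serialized_lines):
    """Indent each serialized record's message by its 'depth' extra."""
    out = []
    for line in serialized_lines:
        record = json.loads(line)["record"]      # serialize=True keeps the record here
        depth = record["extra"].get("depth", 0)  # missing depth -> top level
        out.append("    " * depth + record["message"])
    return out


lines = [
    '{"record": {"message": "session opened", "extra": {"depth": 0}}}',
    '{"record": {"message": "executing query", "extra": {"depth": 1}}}',
]
print("\n".join(indent_by_depth(lines)))
```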

Given the complexity of this business, I think it's better to make it a tool built on top of Loguru, rather than trying to integrate it directly. Feel free to share your thoughts or concerns regarding this approach.

Delgan avatar Sep 19 '23 18:09 Delgan

Indeed. In some sense, I think it would be nice if the output is still the "list of events" and usable to the same extent as logs are now, but with some contextualizing information (unique identifiers) that tie it into a session. Which luckily is pretty close to what happens by default.

Some nuances come from the decision of whether log lines should be self contained (having the entire context in all of them). That probably depends on the application. Not having them self-contained is neater in many ways, but then you cannot parse them without having the entire log from the beginning of the whole execution.

For my project, I needed to produce a machine parseable log of what happened. In that sense, the inclusion of a textual "message" was counterproductive, although of course it could be thought of as similar to a "type" field in a JSON object.

The multi-threading caveat you mentioned also applies to async, and is quite natural there. In this sense, it feels to me like the most useful thing would be to "define a context", get a unique identifier, and then only parametrize each log entry with that identifier.

In my code, I ended up replicating Loguru's internal ContextVar mechanism because I needed to maintain information that was not just a dict. To make applications like this more straightforward, I think it would be nice if Loguru offered the possibility of 1) contextualizing with arbitrary things; 2) allowing some mechanism to extend the context.

When I think of it, an approach that would feel reasonable and extendable (hopefully future-proof) would be to allow handlers that not only receive a loguru.Message but some kind of an object through which it can query for more information. It could be either a function/coroutine that takes an additional object, or in more OOPy style something that allows overriding default behavior / hooks. In this way, adding new available information would not break backwards compatibility.

class MyHandler(loguru.Handler):
    @override
    def handle_message(self, msg: loguru.Message, ctx: loguru.LoggingContext) -> None:
        # Here "LoggingContext" is perhaps a confusing name because it's not related to .contextualize().
        # What I mean is an object through which the function can get info:
        curr_context = ctx.current_context()
        log_file = ctx.log_file_name()  # Just as an example of random things that _could_ be available

    @override
    def colorize(...) -> ...: ...  # or whatever overridable behavior

    # Maybe it would also be possible for this to be a logging.Handler, or to have a method that returns a logging.Handler?

On the test-related note: Do you think you'd be interested in a PR that modified Loguru to additionally and purely optionally support a "no global state" model, where you could do something like


from loguru import Loguru

def main() -> None:
    loguru = Loguru()
    logger = loguru.logger

    some_func(logger)

    # if we for some reason wanted another completely independent Loguru, we could do it:
    loguru2 = Loguru()
    ...

? On my side, this would at least make testing easier, since I also want to test logging functionality. For most software I'd assume a singleton/global state logging facility to be the best solution, but sometimes something else would be useful.

sliedes avatar Sep 20 '23 13:09 sliedes

In my code, I ended up replicating Loguru's internal ContextVar mechanism because I needed to maintain information that was not just a dict. To make applications like this more straightforward, I think it would be nice if Loguru offered the possibility of 1) contextualizing with arbitrary things; 2) allowing some mechanism to extend the context.

When I think of it, an approach that would feel reasonable and extendable (hopefully future-proof) would be to allow handlers that not only receive a loguru.Message but some kind of an object through which it can query for more information. It could be either a function/coroutine that takes an additional object, or in more OOPy style something that allows overriding default behavior / hooks. In this way, adding new available information would not break backwards compatibility.

Isn't it possible to do this already with the "extra" attribute of the dict attached to each log record? It doesn't have to contain strings, it can accept any type of object. From inside your sink, you can access the record via message.record and find your additional object.

I have trouble understanding the true added value of the functionality you suggest. It seems very similar to what already exists, and it would be confusing to have different ways of doing the same thing. For more complex cases, I think it's better to create helper classes that work alongside Loguru.

On the test-related note: Do you think you'd be interested in a PR that modified Loguru to additionally and purely optionally support a "no global state" model, where you could do something like

from loguru import Loguru

def main() -> None:
    loguru = Loguru()
    logger = loguru.logger

    some_func(logger)

    # if we for some reason wanted another completely independent Loguru, we could do it:
    loguru2 = Loguru()
    ...

? On my side, this would at least make testing easier, since I also want to test logging functionality. For most software I'd assume a singleton/global state logging facility to be the best solution, but sometimes something else would be useful.

I actually plan to add a logger.new() method that would do just that: return a completely independent Logger instance. Thanks for sharing your use case. You're right that for testing purposes it would be better than having a global variable.

Delgan avatar Sep 22 '23 09:09 Delgan

Isn't it possible to do this already with the "extra" attribute of the dict attached to each log record? It doesn't have to contain strings, it can accept any type of object. From inside your sink, you can access the record via message.record and find your additional object.

I clearly should have written this RFC earlier. :) I remember playing with that and rejecting it for some reason, and now I'm digging my memory. So I think my reasoning may be wrong, and I of course also may have missed something that would help me do it.

Looking at the documentation, there are two methods, bind() and contextualize(). In some sense, I think contextualize() is very close to what I want for the whole nested context; the problem is that there's no API that gives access to the current context, making it impossible to create a nested context (instead of replacing it altogether) without maintaining a separate ContextVar outside Loguru. And if it's maintained outside Loguru, then there's presumably little reason not to use it directly in the handler instead of the extra object.

In other words, I'd love to do:


logger: loguru.Logger

@contextlib.contextmanager
def nested_section(name: str) -> Iterator[None]:
    old_context = logger.get_current_context("ctx")
    new_context = Context(name=name, inner=old_context)
    with logger.contextualize(ctx=new_context):
        yield

or perhaps something like:

def _nest_context(old: Context, name: str) -> Context:
    return Context(name=name, inner=old)

with logger.contextualize_with("ctx", _nest_context):
    # contextualize_with would call the given function. I think it may be too magic and functional programming for most people.
    ...

except that there is no logger.get_current_context() or anything similar, which I think makes the extra attribute not useful for this use case.

Logger.bind() does something quite reasonable, but then it needs to be passed around to all code instead of using a global logger in the same style as contextualize(). This is not bad in itself and possibly should be done, but it's quite verbose to pass it to all possible helper functions.

I actually plan to add a logger.new() method that would do just that: return a completely independent Logger instance. Thanks for sharing your use case. You're right that for testing purposes it would be better than having a global variable.

That sounds cool, thanks :)

sliedes avatar Sep 22 '23 12:09 sliedes

Hey @sliedes. Sorry for the late answer.

I believe that adding such a feature to Loguru might be too unconventional. As briefly mentioned in a previous comment, this aligns more with the core philosophy of the eliot library, which might be more appropriate for your use case. Loguru is not ideally suited for the kind of functionality you're proposing, and I think additional nesting through a context manager is too cumbersome to be very appealing.

However, for the sake of completeness, I'd like to offer a potential workaround. It might be possible to achieve the desired outcome by creating a custom context manager and patching the logger. Here's a rough example of how this could be implemented:

from loguru import logger
import sys
from contextlib import contextmanager
from contextvars import ContextVar

context = ContextVar("context", default=None)

@contextmanager
def nested_section(name):
    current_context = context.get()
    token = context.set({"name": name, "inner": current_context})
    try:
        yield
    finally:
        # Reset even if the body raises, so contexts cannot leak.
        context.reset(token)

def add_context(record):
    record["extra"]["context"] = context.get()

def foo():
    logger.info("In foo")

def bar():
    with nested_section("Part 1"):
        foo()
    logger.info("In bar")
    with nested_section("Part 2"):
        foo()
        with nested_section("Part 3"):
            logger.info("End bar")

if __name__ == "__main__":
    logger.configure(
        handlers=[{"sink": sys.stderr, "format": "{message} (context={extra[context]})"}],
        patcher=add_context,
    )

    bar()

Delgan avatar Oct 29 '23 15:10 Delgan

Thanks anyway for the workaround and pointing me towards Eliot!

sliedes avatar Oct 29 '23 19:10 sliedes