structlog

Example of passing context variables to worker threads

Status: Open · opened by bruce-szalwinski on Jun 27 '22 · 3 comments

This is a follow-up to https://github.com/hynek/structlog/issues/297. I put together a small example of how to pass context variables to workers in a pool. It uses pathos to create a ThreadPool. The current context variables are captured with structlog.threadlocal._get_context() and passed to worker(). Without this, the workers do not receive any context from their parent.

from functools import partial

import pytest
import structlog
from pathos.threading import ThreadPool
from structlog.testing import LogCapture
from structlog.threadlocal import bind_threadlocal


@pytest.fixture(name="log_capture")
def log_capture():
    """Provide a fresh LogCapture processor; captured events land in ``entries``."""
    capture = LogCapture()
    return capture


@pytest.fixture(autouse=True)
def configure_structlog(log_capture):
    """Route structlog output into *log_capture* for the duration of each test.

    ``merge_threadlocal`` runs first so every captured entry includes the
    thread-local context of the thread that emitted it.

    Both ``structlog.configure`` and the tests' ``bind_threadlocal`` calls
    mutate process-wide / thread-wide state. This fixture therefore clears
    the test thread's thread-local context before and after the test and
    restores structlog's default configuration afterwards, so one test cannot
    leak bindings or processors into the next.
    """
    structlog.threadlocal.clear_threadlocal()
    structlog.configure(
        processors=[
            structlog.threadlocal.merge_threadlocal,
            log_capture,
        ]
    )
    yield
    # Undo the global state mutated by this fixture and by the test body.
    structlog.threadlocal.clear_threadlocal()
    structlog.reset_defaults()


def worker(ctx: dict, stage):
    """Log one ``"worker"`` event for *stage* with the parent's context bound.

    Thread-local bindings outlive the task that made them, and a pool thread
    may run several tasks. The context is therefore cleared before binding
    *ctx*, which drops anything left on this thread by an earlier task. It is
    cleared again afterwards, so this task leaves nothing behind for the next
    one.

    :param ctx: Context captured in the submitting thread.
    :param stage: Value logged under the ``stage`` key.
    """
    logger = structlog.get_logger(__name__)
    structlog.threadlocal.clear_threadlocal()
    bind_threadlocal(**ctx)
    try:
        logger.info("worker", stage=stage)
    finally:
        structlog.threadlocal.clear_threadlocal()


def structlog_with_threadpool():
    """Log one event per stage from pool workers, propagating the caller's context.

    Worker threads do not see the submitting thread's thread-local context on
    their own, so the context is captured here and passed explicitly to every
    ``worker`` call.

    :returns: List of ``worker`` results (``None`` for each stage).
    """
    # Use the public accessor, which returns a copy, instead of the private
    # ``_get_context()``. The private function is internal API and may hand
    # out the live dict, which the workers would then share with this thread.
    ctx = structlog.threadlocal.get_threadlocal()

    func = partial(worker, ctx)
    stage = "12345"  # iterated character-wise: one task per stage "1".."5"
    with ThreadPool() as pool:
        return list(pool.map(func, stage))


def test_structlog_with_threadpool(log_capture):
    """Every pool task must log with the context bound in the test thread."""

    def by_items(entry):
        # Order-insensitive sort key: the pool may finish tasks in any order.
        return sorted(entry.items())

    _expected = [
        {
            "threadvar": "my-thread-var",
            "event": "worker",
            "log_level": "info",
            "stage": digit,
        }
        for digit in "12345"
    ]
    bind_threadlocal(threadvar="my-thread-var")
    structlog_with_threadpool()

    captured = sorted(log_capture.entries, key=by_items)
    wanted = sorted(_expected, key=by_items)
    assert captured == wanted

bruce-szalwinski avatar Jun 27 '22 20:06 bruce-szalwinski

I'd like to confirm that calling the private function _get_context() is still the best way to do this.

bruce-szalwinski avatar Jun 27 '22 21:06 bruce-szalwinski

Have you tried all of this with contextvars? structlog.threadlocal will be deprecated as of the next release (though it's not going away for now).

hynek avatar Jun 28 '22 08:06 hynek

Updated to use contextvars; the example now includes two tests. In both, the parent binds a context variable. In one, the worker re-binds the context passed from the parent; in the other, it does not. Both tests are passing.

from functools import partial

import pytest
import structlog
from pathos.threading import ThreadPool
from structlog.contextvars import bind_contextvars
from structlog.testing import LogCapture


@pytest.fixture(name="log_capture")
def log_capture():
    """Provide a fresh LogCapture processor; captured events land in ``entries``."""
    capture = LogCapture()
    return capture


@pytest.fixture(autouse=True)
def configure_structlog(log_capture):
    """Route structlog output into *log_capture* for the duration of each test.

    ``merge_contextvars`` runs first so every captured entry includes the
    context variables visible to the thread that emitted it.

    Both ``structlog.configure`` and the tests' ``bind_contextvars`` calls
    mutate state that outlives a single test. This fixture therefore clears
    the test thread's context variables before and after the test and
    restores structlog's default configuration afterwards.
    """
    structlog.contextvars.clear_contextvars()
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            log_capture,
        ]
    )
    yield
    # Undo the global state mutated by this fixture and by the test body.
    structlog.contextvars.clear_contextvars()
    structlog.reset_defaults()


def upsert_with_bind(ctx: dict, stage):
    """Log one ``"upsert"`` event for *stage* with the parent's context bound.

    Context variables set in a worker thread stay set for that thread, and a
    pool thread may run several tasks. The context is therefore cleared
    before binding *ctx* and again in ``finally``. Otherwise these bindings
    could show up in later tasks on the same thread (e.g. in
    ``upsert_without_bind``).

    :param ctx: Context variables captured in the submitting thread.
    :param stage: Value logged under the ``stage`` key.
    """
    logger = structlog.get_logger(__name__)
    structlog.contextvars.clear_contextvars()
    bind_contextvars(**ctx)
    try:
        logger.info("upsert", stage=stage)
    finally:
        structlog.contextvars.clear_contextvars()


def upsert_without_bind(ctx: dict, stage):
    """Log one ``"upsert"`` event for *stage* without binding any context.

    *ctx* is accepted only so this function can be swapped in for
    ``upsert_with_bind``. It is deliberately ignored; the test uses this to
    check that pool threads do not pick up the caller's context on their own.
    """
    structlog.get_logger(__name__).info("upsert", stage=stage)


def structlog_with_threadpool(f):
    """Run *f* once per stage on a thread pool, handing it the caller's context.

    :param f: Callable taking ``(ctx, stage)``. ``ctx`` is the result of
        ``get_contextvars()`` evaluated in the calling thread. ``stage`` is
        one character of ``"12345"``.
    :returns: List of *f*'s return values, one per stage.
    """
    captured = structlog.contextvars.get_contextvars()
    task = partial(f, captured)
    stages = "12345"
    with ThreadPool() as pool:
        results = pool.map(task, stages)
        return list(results)


def test_structlog_with_bind(log_capture):
    """Workers that re-bind the passed context must log the parent's variable."""

    def by_items(entry):
        # Order-insensitive sort key: the pool may finish tasks in any order.
        return sorted(entry.items())

    _expected = [
        {
            "threadvar": "my-thread-var",
            "event": "upsert",
            "log_level": "info",
            "stage": digit,
        }
        for digit in "12345"
    ]
    bind_contextvars(threadvar="my-thread-var")
    structlog_with_threadpool(upsert_with_bind)

    captured = sorted(log_capture.entries, key=by_items)
    wanted = sorted(_expected, key=by_items)
    assert captured == wanted


def test_structlog_without_bind(log_capture):
    """Workers that do not re-bind the context must log without the parent's variable."""

    def by_items(entry):
        # Order-insensitive sort key: the pool may finish tasks in any order.
        return sorted(entry.items())

    _expected = [
        {"event": "upsert", "log_level": "info", "stage": digit}
        for digit in "12345"
    ]
    bind_contextvars(threadvar="my-thread-var")
    structlog_with_threadpool(upsert_without_bind)

    captured = sorted(log_capture.entries, key=by_items)
    wanted = sorted(_expected, key=by_items)
    assert captured == wanted

bruce-szalwinski avatar Jun 29 '22 03:06 bruce-szalwinski

Would you mind contributing it, without the tests, to https://github.com/hynek/structlog/blob/main/docs/recipes.md? You could link here for people who want the tests too.

hynek avatar Oct 15 '22 08:10 hynek

Looking closer, have you tried a construct using contextvars.copy_context().run(), like we use in the async loggers? E.g.:

https://github.com/hynek/structlog/blob/6b06c062dbcd166bfba0da407e9af63432bedea2/src/structlog/_log_levels.py#L158-L165

That should pass the context into the new thread too, right?

hynek avatar Nov 02 '22 09:11 hynek