structlog
structlog copied to clipboard
Example of passing context variables to worker threads
This is a follow up to https://github.com/hynek/structlog/issues/297. I put together a small example of how to pass context variables to workers in a pool. It uses pathos to create a ThreadPool. The current context variables are captured from the threadlocal._get_context() and passed to the worker(). Without this, the workers do not receive any context from their parent.
from functools import partial
import pytest
import structlog
from pathos.threading import ThreadPool
from structlog.testing import LogCapture
from structlog.threadlocal import bind_threadlocal
@pytest.fixture(name="log_capture")
def log_capture():
return LogCapture()
@pytest.fixture(autouse=True)
def configure_structlog(log_capture):
structlog.configure(processors=[
structlog.threadlocal.merge_threadlocal,
log_capture,
])
def worker(ctx: dict, stage):
logger = structlog.get_logger(__name__)
bind_threadlocal(**ctx)
logger.info("worker", stage=stage)
def structlog_with_threadpool():
ctx = structlog.threadlocal._get_context()
func = partial(worker, ctx)
stage = "12345"
with ThreadPool() as pool:
return list(
pool.map(
func,
stage
)
)
def test_structlog_with_threadpool(log_capture):
_expected = [
{"threadvar": "my-thread-var", "event": "worker", "log_level": "info", "stage": "1"},
{"threadvar": "my-thread-var", "event": "worker", "log_level": "info", "stage": "2"},
{"threadvar": "my-thread-var", "event": "worker", "log_level": "info", "stage": "3"},
{"threadvar": "my-thread-var", "event": "worker", "log_level": "info", "stage": "4"},
{"threadvar": "my-thread-var", "event": "worker", "log_level": "info", "stage": "5"}
]
bind_threadlocal(threadvar="my-thread-var")
structlog_with_threadpool()
actual = sorted(log_capture.entries, key=lambda ele: sorted(ele.items()))
expected = sorted(_expected, key=lambda ele: sorted(ele.items()))
assert actual == expected
Checking to make sure that the call to the private method, _get_context() is still the best way to go.
Have you tried all of this with contextvars? structlog.threadlocal will be deprecated as of next release (not going away for now tho).
Updated to use contextvars and now includes two tests. One where the context vars are bound from the parent and one where they are not. Both tests are passing.
from functools import partial
import pytest
import structlog
from pathos.threading import ThreadPool
from structlog.contextvars import bind_contextvars
from structlog.testing import LogCapture
@pytest.fixture(name="log_capture")
def log_capture():
return LogCapture()
@pytest.fixture(autouse=True)
def configure_structlog(log_capture):
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
log_capture,
]
)
def upsert_with_bind(ctx: dict, stage):
logger = structlog.get_logger(__name__)
bind_contextvars(**ctx)
logger.info("upsert", stage=stage)
def upsert_without_bind(ctx: dict, stage):
logger = structlog.get_logger(__name__)
logger.info("upsert", stage=stage)
def structlog_with_threadpool(f):
ctx = structlog.contextvars.get_contextvars()
func = partial(f, ctx)
stage = "12345"
with ThreadPool() as pool:
return list(pool.map(func, stage))
def test_structlog_with_bind(log_capture):
_expected = [
{
"threadvar": "my-thread-var",
"event": "upsert",
"log_level": "info",
"stage": "1",
},
{
"threadvar": "my-thread-var",
"event": "upsert",
"log_level": "info",
"stage": "2",
},
{
"threadvar": "my-thread-var",
"event": "upsert",
"log_level": "info",
"stage": "3",
},
{
"threadvar": "my-thread-var",
"event": "upsert",
"log_level": "info",
"stage": "4",
},
{
"threadvar": "my-thread-var",
"event": "upsert",
"log_level": "info",
"stage": "5",
},
]
bind_contextvars(threadvar="my-thread-var")
structlog_with_threadpool(upsert_with_bind)
actual = sorted(log_capture.entries, key=lambda ele: sorted(ele.items()))
expected = sorted(_expected, key=lambda ele: sorted(ele.items()))
assert actual == expected
def test_structlog_without_bind(log_capture):
_expected = [
{
"event": "upsert",
"log_level": "info",
"stage": "1",
},
{
"event": "upsert",
"log_level": "info",
"stage": "2",
},
{
"event": "upsert",
"log_level": "info",
"stage": "3",
},
{
"event": "upsert",
"log_level": "info",
"stage": "4",
},
{
"event": "upsert",
"log_level": "info",
"stage": "5",
},
]
bind_contextvars(threadvar="my-thread-var")
structlog_with_threadpool(upsert_without_bind)
actual = sorted(log_capture.entries, key=lambda ele: sorted(ele.items()))
expected = sorted(_expected, key=lambda ele: sorted(ele.items()))
assert actual == expected
Would you mind to contribute it sans tests to https://github.com/hynek/structlog/blob/main/docs/recipes.md? You could link here, if people want the tests too.
Looking closer, have you tried a construct using contextvars.copy_context().run() like we use in async loggers? E.g:
https://github.com/hynek/structlog/blob/6b06c062dbcd166bfba0da407e9af63432bedea2/src/structlog/_log_levels.py#L158-L165
That should pass the context into the new thread too, right?