opentelemetry-python
Clarification: Implementation of custom LogRecordProcessors
Hey folks :)
I've been eyeballing this project and the otel specification for months, since sooner or later we want to replace our own telemetry sdks (built on tools like structlog and pydantic) in our distributed cloud MLOps platform with otel. I feel this is the right thing to do.
I know the logs sdk is still not stable, but I have started hands-on experiments nonetheless, recreating the otel-provided docker compose stack with my own apps to get a feeling for how logs, metrics and traces are defined and utilised by otel. This already helps me lay out the roadmap for migrating to otel and prepare all user stories for this epic.
One thing we do not want to change when moving to otel is the way our structured logs are defined, as that has proven to be of great value across all components of our platform (restful services, lambda functions, containerised apps, batch processing and training jobs, scheduler environments, realtime inference endpoints, etc.).
We currently model our log messages (or rather the log bodies) using pydantic, and emit them by passing instantiated pydantic models to our logging sdk, somewhat similar to this:
```python
from pydantic import BaseModel


class JobInitialized(BaseModel):
    configuration: dict[str, str]


logger.info(JobInitialized(configuration={"foo": "bar"}))
```
This way of logging is established across all platform components and has been adopted by everyone involved (Data Scientists, Data Engineers, etc.). It enables a very high degree of consistency and allows us to do advanced operations, like sending all of our logs to a data catalogue and querying them via SQL (to mention one use case).
After some hours of reading the docs, reverse engineering the logs sdk and iterating over my own docker compose stack, I feel like the correct approach to add support for pydantic-based log messages is to use a LogRecordProcessor.
The following snippet roughly demonstrates what we might want to do:
```python
import logging
from uuid import uuid4

from opentelemetry._logs import set_logger_provider
from opentelemetry.attributes import BoundedAttributes
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs import LoggingHandler
from opentelemetry.sdk._logs import LogRecordProcessor
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource
from pydantic import BaseModel


class PydanticLogProcessor(LogRecordProcessor):
    def emit(self, log_data: LogData) -> LogData:
        """open question: is the return type annotation correct? mypy at least does not complain about it"""
        body = log_data.log_record.body
        attributes = log_data.log_record.attributes
        if isinstance(body, BaseModel):
            # serialize the model to a json serializable dict
            log_data.log_record.body = body.model_dump(mode="json")
            if isinstance(attributes, BoundedAttributes):
                # attach additional derived attributes
                updated_attributes = attributes.copy()
                updated_attributes.update({"log.uid": str(uuid4()), "log.type": body.__class__.__name__})
                log_data.log_record.attributes = BoundedAttributes(attributes=updated_attributes)
        return log_data

    def shutdown(self) -> None:
        """open question: clarify usage of shutdown on processors which only emit to the logging pipeline w/o export"""
        pass

    def force_flush(self, timeout_millis: int = 30000) -> None:
        """open question: clarify usage of force_flush on processors which only emit to the logging pipeline w/o export"""
        pass


logger_provider = LoggerProvider(
    resource=Resource.create(attributes={"service.namespace": "foo", "service.name": "bar"})
)
set_logger_provider(logger_provider)

processor_chain = [
    PydanticLogProcessor(),
    BatchLogRecordProcessor(OTLPLogExporter(insecure=True)),
]
for processor in processor_chain:
    logger_provider.add_log_record_processor(processor)

handler = LoggingHandler(level=logging.DEBUG, logger_provider=logger_provider)

# Attach OTLP handler to root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(handler)


class JobInitialized(BaseModel):
    configuration: dict[str, str]


if __name__ == "__main__":
    logger = logging.getLogger("app")
    logger.info(JobInitialized(configuration={"job_name": "foo", "input": "foo/lorem", "output": "foo/ipsum"}))
```
If I spin up a local otel collector (similar to how it is configured by the logs sample in this repo), I see output like I would expect:
```
Body: Map({"configuration":{"input":"foo/lorem","job_name":"foo","output":"foo/ipsum"}})
Attributes:
     -> code.filepath: Str(/foo/bar/simple.py)
     -> code.function: Str(<module>)
     -> code.lineno: Int(65)
     -> log.uid: Str(ce25f96f-325c-43b1-9f76-daea04fc54df)
     -> log.type: Str(JobInitialized)
```
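For quick local iteration, the same pipeline can also be verified without spinning up a collector, by swapping the OTLP exporter for the SDK's console exporter. A minimal sketch, reusing the `PydanticLogProcessor` from above:

```python
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import ConsoleLogExporter, SimpleLogRecordProcessor

logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(PydanticLogProcessor())
# print each record to stdout as it is emitted, no collector required
logger_provider.add_log_record_processor(SimpleLogRecordProcessor(ConsoleLogExporter()))
```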
This output is great, but I do have some open questions:
- is a `LogRecordProcessor` the right weapon of choice to accomplish this? (as of now, I do not see that `logging.Formatter`s are already supported, and honestly, using a `LogRecordProcessor` also feels a lot cleaner than messing with the untyped interfaces of formatters 🙄 😬)
- is the return type annotation for the above processor defined correctly? (the base `LogRecordProcessor` does not have a return type annotation at all in its `emit` method)
- are the `shutdown` and `force_flush` methods set up correctly for a log processor whose only purpose is to do some modifications to the log data and emit it to the pipeline?
Cheers and kind regards 🚀
@ddluke
This looks very promising. Apologies for the late response.
> is a `LogRecordProcessor` the right weapon of choice to accomplish this? (as of now, I do not see that `logging.Formatter`s are already supported, and honestly, using a `LogRecordProcessor` also feels a lot cleaner than messing with the untyped interfaces of formatters 🙄 😬)
This certainly looks like an acceptable way to achieve this behavior. Another way is to support pydantic-based logs natively in the OTel LoggingHandler itself. This could either be added to the already existing LoggingHandler, or we could create a new handler for each logging format we want to support. The advantage of your proposal is that it is much more modularized, and the LogRecordProcessor can be shipped as a separate component. This can also be seen as a "disadvantage" if users would not want to add any more components to their code and just want structured logs to be supported out of the box.
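For illustration, a rough sketch of that handler-level variant could look like this. `PydanticLoggingHandler` is a hypothetical name, not an existing SDK class, and the sketch assumes `record.msg` still holds the original model instance when the handler runs:

```python
import logging

from opentelemetry.sdk._logs import LoggingHandler
from pydantic import BaseModel


class PydanticLoggingHandler(LoggingHandler):
    # hypothetical subclass: serialize pydantic models before the record
    # is translated into an OTel log record
    def emit(self, record: logging.LogRecord) -> None:
        # the object passed to logger.info() is available as record.msg
        if isinstance(record.msg, BaseModel):
            record.msg = record.msg.model_dump(mode="json")
        super().emit(record)
```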
> is the return type annotation for the above processor defined correctly? (the base `LogRecordProcessor` does not have a return type annotation at all in its `emit` method)
We are working on adding typing to our sdk. The `emit` method does not return anything. See this as well for a simple example. Processors are usually used to perform some sort of pre/post operation on the telemetry data.
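Concretely, matching the (currently unannotated) base class, the override would be typed to return `None` and just mutate the record in place — a sketch of the signature only:

```python
from opentelemetry.sdk._logs import LogData, LogRecordProcessor


class PydanticLogProcessor(LogRecordProcessor):
    def emit(self, log_data: LogData) -> None:
        # mutate log_data in place; the SDK calls emit() on each registered
        # processor in turn and ignores any return value
        ...
```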
> are the `shutdown` and `force_flush` methods set up correctly for a log processor whose only purpose is to do some modifications to the log data and emit it to the pipeline?
Yes, this looks right to me.
Related feature request thread on adding a structlog handler.