loguru
loguru copied to clipboard
Logging error in Loguru Handler
I'm experiencing the following error, after some attempts and searching in previous issues I haven't found a solution yet:
--- Logging error in Loguru Handler #2 ---
Record was: {'elapsed': datetime.timedelta(seconds=11, microseconds=363235), 'exception': None, 'extra': {}, 'file': (name='logger.py', path='/Users/foo/code/foo/bar/foo/app/logger.py'), 'function': 'emit', 'level': (name='INFO', no=20, icon='ℹ️'), 'line': 40, 'message': 'Cleaning created processes.', 'module': 'logger', 'name': 'logger', 'process': (id=21421, name='MainProcess'), 'thread': (id=8544645696, name='MainThread'), 'time': datetime(2024, 10, 14, 12, 54, 53, 251057, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST'))}
Traceback (most recent call last):
File "/Users/foo/Library/Caches/pypoetry/virtualenvs/foo-py3.12/lib/python3.12/site-packages/loguru/_handler.py", line 206, in emit
self._sink.write(str_record)
File "/Users/foo/Library/Caches/pypoetry/virtualenvs/foo-py3.12/lib/python3.12/site-packages/loguru/_simple_sinks.py", line 16, in write
self._stream.write(message)
ValueError: I/O operation on closed file.
--- End of logging error ---
My logger.py
file:
import logging
import sys
from loguru import logger
from models import (
FOOExceptionEvent,
Event,
PromptEvent,
Reason,
Service,
)
from opencensus.ext.azure.log_exporter import AzureEventHandler
from settings import settings
class InterceptHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
frame, depth = logging.currentframe(), 0
while frame and frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
def setup_logging(level: str = settings().log_level.upper()) -> None:
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("hpack").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logging.getLogger("httpcore.connection").setLevel(logging.WARNING)
logging.getLogger("httpcore.http11").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.WARNING)
logging.getLogger("azure.identity").setLevel(logging.WARNING)
logging.captureWarnings(True)
logging.basicConfig(handlers=[InterceptHandler()], level=logging.NOTSET, force=True)
logging.getLogger("azure.monitor.opentelemetry.exporter.export").setLevel(logging.WARNING)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
logging.getLogger("uvicorn.access").handlers = [InterceptHandler()]
for _log in [
"uvicorn",
"fastapi",
]:
_logger = logging.getLogger(_log)
_logger.handlers = [InterceptHandler()]
if settings().azure_application_insights_active:
azure_event_handler = AzureEventHandler(connection_string=settings().azure_opentelemetry_connection_string)
for event_type in (PromptEvent, FOOExceptionEvent):
event_logger = logging.getLogger(event_type.get_name())
event_logger.addHandler(azure_event_handler)
event_logger.setLevel(logging.INFO)
log_event(PromptEvent("initial", vehicle_id="vehicle"))
log_event(FOOExceptionEvent(Service.AZURE_OPENAI, Reason.UNAVAILABLE))
logger.remove()
logger.add(sink=sys.stdout, level=level, format=settings().log_format)
def log_event(event: Event) -> None:
if settings().azure_application_insights_active:
event_logger = logging.getLogger(event.get_name())
event_logger.info(event.__class__.__name__, extra=event.get_custom_dimensions())
I'd be grateful for any hints that point me in the right direction, thanks!