
Feature request: handling Fluent Bit partial messages

Open · mrkmcnamee opened this issue 2 years ago · 8 comments

Hi @Delgan,

there is a known problem with very large log entries (>16 KB) being split into parts by Fluent Bit, as discussed here: https://docs.fluentbit.io/manual/pipeline/filters/multiline-stacktrace#docker-partial-message-use-case and https://github.com/aws/aws-for-fluent-bit/issues/25

I am also running into this problem, with the result that the log record is chopped into (at least) two log entries, each containing a broken JSON structure. Here are two raw messages sent by Fluent Bit to Datadog with the partial_message field set to true:

{
	"id": "XXXXXXXXXXXXXXXXEwAAAAAAAAAYAAAAAEFZdFpJOUR3XXXXXXXXXXOGEwZGFk",
	"content": {
		"timestamp": "2023-10-22T20:46:40.049Z",
		"tags": [
			"cluster_name:XXXXXXXX",
			"region:us-east-1",
			"task_family:XXXXXX-td",
			"container_name:XXXXXXXX",
			"task_arn:XXXXXX",
			"source:python",
			"container_id:XXXXXXXXXX",
			"task_version:2",
			"datadog.submission_auth:private_api_key"
		],
		"service": "XXXXXXX",
		"message": "{\"text\": \"An exception occurred while executing XXXXXXXX with ID XXXXXX\", \"record\": {\"elapsed\": 
{\"repr\": \"4:37:51.681603\", \"seconds\": 16671.681603}, \"exception\": {\"type\": \"TimeoutError\", \"value\": \"QueuePool limit of 
size 10 overflow 10 reached, connection timed out, timeout 10.00 (Background on this error at: https://sqlalche.me/e/14/3o7r)\", 
\"traceback\": true, \"exc_info\": \"Traceback (most recent call last):\\n\\n  File \\\"/usr/local/lib/python3.11/site-packages/celery
/app/trace.py\\\", line 449, in trace_task\\n    task_before_start(uuid, args, kwargs)\\n    \\u2502                 \\u2502     

		8<---- snip --->8

		File \\\"/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py\\\", line 3245, in raw_connection\\n    
return self._wrap_pool_connect(self.pool.connect, _connection)\\n           \\u2502    \\u2502                  \\u2502    \\u2502   
 \\u2502        \\u2514 None\\n           \\u2502    \\u2502                  \\u2502    \\u2502    \\u2514 <function Pool.connect at
 0x7f74c3213380>\\n           \\u2502    \\u2502                  \\u2502    \\u2514 <sqlalchemy.pool.impl.QueuePool object at 
0x7f74b4433190>\\n",
		"attributes": {
			"partial_message": "true",
			"partial_last": "false",
			"service": "XXXXXXX",
			"partial_id": "XXXXXXXXX",
			"source": "stderr",
			"partial_ordinal": "1",
			"timestamp": 1698007600049
		}
	}
}

and

{
	"id": "XXXXXXXXXXXXXXXXEwAAAAAAAAAYAAAAAEFZdFpJOUR3XXXXXXXXXXOGEwZGFk",
	"content": {
		"timestamp": "2023-10-22T20:46:40.049Z",
		"tags": [
			"cluster_name:XXXXXXXX",
			"region:us-east-1",
			"task_family:XXXXXX-td",
			"container_name:XXXXXXXX",
			"task_arn:XXXXXX",
			"source:python",
			"container_id:XXXXXXXXXX",
			"task_version:2",
			"datadog.submission_auth:private_api_key"
		],
		"service": "XXXXXX",
		"message": "           \\u2502    \\u2502                  \\u2514 Engine(postgresql+pg8000://postgres:***@XXXXXXX/XXXXXX)\\n
           \\u2502    \\u2514 <function Engine._wrap_pool_connect at 0x7f74c33fd300>\\n           \\u2514 
Engine(postgresql+pg8000://postgres:***@XXXXXX/XXXXX)\\n  File \\\"/usr/local/lib/python3.11/site-packages/sqlalchemy/
engine/base.py\\\", line 3212, in _wrap_pool_connect\\n    return fn()\\n           \\u2514 <bound method Pool.connect of 
<sqlalchemy.pool.impl.QueuePool object at 0x7f74b4433190>>\\n  File \\\"/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/base.py\\\", line 307, in connect\\n    return 
		
		8<---- snip --->8
		
		\"name\": \"ERROR\", \"no\": 40}, \"message\": \"An exception occurred while executing task XXXXXXX with 
ID XXXXXXXX\", \"module\": \"XXXXXX\", \"name\": \"app.XXXXXX\", \"process\": {\"id\": XX, \"name\": \"MainProcess\"}, 
\"thread\": {\"id\": XXXXXXX, \"name\": \"Dummy-861\"}, \"time\": {\"repr\": \"2023-10-22 20:46:32.899637+00:00\", 
\"timestamp\": 1698007592.899637}}, \"levelname\": \"ERROR\"}",
		"attributes": {
			"partial_message": "true",
			"partial_last": "true",
			"service": "XXXXXXX",
			"partial_id": "XXXXXXXXX",
			"source": "stderr",
			"partial_ordinal": "2",
			"timestamp": 1698007600049
		}
	}
}

This results in two log entries in Datadog that Datadog cannot parse for metadata, since the JSON structure is broken. Both entries are registered as INFO messages, and no alarm is generated even though levelname is set to ERROR in the first part. I think this problem could be solved neatly by the logging library itself rather than by the log driver (Fluent Bit).

My suggestion is that the logging configuration accept a parameter that allows log messages to be split into multiple parts before they are emitted, so that the JSON structure always survives the journey to the logging tool. The first partial log message would have the correct log level, and subsequent parts would be set to INFO. For example:

logger.configure(handlers=[{"max_size": 16000}])

Loguru could also add the Fluent Bit metadata inside the log record to provide the same information: partial_message, partial_last, partial_ordinal.
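
To illustrate what I mean (this is only a rough sketch, not an existing Loguru option: the sink, the 16 000-character limit and the splitting logic are all made up), roughly the same effect can already be approximated with a custom serialized sink that re-emits each oversized record as several small but valid JSON documents carrying that metadata:

import json
import sys

from loguru import logger

MAX_PART_SIZE = 16000  # stay below the ~16 KB partial-message threshold

def splitting_sink(message):
    # With serialize=True, the sink receives the record serialized as a JSON string.
    payload = json.loads(message)
    text = payload.get("text", "")
    chunks = [text[i:i + MAX_PART_SIZE] for i in range(0, len(text), MAX_PART_SIZE)] or [""]
    partial_id = str(payload["record"]["time"]["timestamp"])
    for ordinal, chunk in enumerate(chunks, start=1):
        part = dict(payload)  # shallow copy; only "text" and the partial fields differ
        part["text"] = chunk
        # Mirror the Fluent Bit attributes so the parts can be correlated downstream.
        part["partial_message"] = "true" if len(chunks) > 1 else "false"
        part["partial_last"] = "true" if ordinal == len(chunks) else "false"
        part["partial_ordinal"] = str(ordinal)
        part["partial_id"] = partial_id
        print(json.dumps(part), file=sys.stdout, flush=True)

logger.add(splitting_sink, serialize=True)

Each line written to stdout would then be a self-contained JSON document under the size limit, so the metadata stays parseable even when a traceback is huge.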

Cheers, Kevin

mrkmcnamee · Oct 23 '23

Hmm, it seems there is a solution for this in Fluent Bit but it requires building a custom Fluent Bit image. https://github.com/aws-samples/amazon-ecs-firelens-examples/tree/mainline/examples/fluent-bit/filter-multiline-partial-message-mode

mrkmcnamee · Oct 24 '23

I will reopen the issue to get an opinion on this.

While a custom-built Fluent Bit image can solve the problem, it seems like a lot of work and could be a non-starter for some teams. For my own part, I could build a custom image, but then I would be forced into maintaining yet another project, which I am reluctant to do.

Allowing Loguru (or any logging library) to be configured to adapt to the available infrastructure and to size its messages appropriately still seems like a good idea to me.

mrkmcnamee · Oct 26 '23

Hi @mrkmcnamee.

Thanks for providing detailed insight into the issue you're facing with Fluent Bit and Loguru.

I understand the problem you're encountering, but I am not inclined to incorporate this kind of workaround into Loguru. The 16 KB size limitation is highly specific to Fluent Bit, and I believe it is beyond the scope of Loguru's responsibilities. Loguru is designed to provide a versatile and customizable API, allowing you to construct specialized handlers tailored to your specific development environment and external framework constraints.

I'm not sure what output you would expect, anyway. Wouldn't it be possible to write a custom sink in charge of splitting the logs?

Delgan · Oct 29 '23

Point taken; given that this is a specific use case, writing a custom sink would be a good solution. I am not sure whether this is stated explicitly in the documentation, but can a sink emit two log messages at a time?

mrkmcnamee · Oct 29 '23

Sinks don't emit logs; they only receive them (one by one) and write them to an endpoint.

Here is a fictitious example:

import sys
from loguru import logger

def my_sink(message):
    # split_message() is a placeholder for your own splitting logic.
    parts = split_message(message)
    for part in parts:
        sys.stderr.write(part + "\n")

logger.add(my_sink, serialize=True)
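
Note that with serialize=True, the message received by the sink is the record serialized as a JSON string (the structured data also remains available through message.record), so split_message is free to either chunk the raw string or rebuild smaller JSON documents from the record.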

Delgan · Oct 29 '23

Right, that's perfect, thanks.

mrkmcnamee · Oct 29 '23

If you want to rely on some of the built-in features provided by a file sink, you can also implement the splitting procedure in a custom formatter, like the following:

def my_formatter(record):
    # split_record() is a placeholder for your own splitting logic.
    parts = split_record(record)
    formatted_parts = "\n".join(str(part) for part in parts)
    record["extra"]["formatted_parts"] = formatted_parts
    return "{extra[formatted_parts]}\n"

logger.add("file.log", format=my_formatter, rotation="00:00")

Delgan · Oct 29 '23

I solved this by simply truncating the stack trace. The useful information was always found in the first part anyway. So the sink looks like this now:

import json
import logging
import sys
from loguru import logger


def datadog_sink(message):
    message_json = json.loads(message)

    # First move the stack trace to the exception dictionary:
    if message_json["record"]["exception"]:
        message_json["record"]["exception"]["exc_info"] = message_json["text"]
    message_json["text"] = message_json["record"]["message"]

    # Then truncate the stack trace:
    if message_json["record"]["exception"]:
        maximum_exc_info_length = 15000
        exc_info = message_json["record"]["exception"]["exc_info"]
        if len(exc_info) > maximum_exc_info_length:
            message_json["record"]["exception"]["exc_info"] = (
                exc_info[:maximum_exc_info_length] + "... (traceback truncated)"
            )

    serialized = json.dumps(message_json, default=str)

    print(serialized, file=sys.stdout, flush=True)


logger.configure(
    handlers=[
        {
            "sink": datadog_sink, 
            "format": lambda _: "{exception}",
            "level": logging.INFO,
            "serialize": True,
        }
    ]
)

mrkmcnamee · Dec 06 '23