[BUG] Processing A Nested List as Individual Log Events
Describe the bug It could potentially possible to do this however I have not been able to find anything in documentation that covers it.
If you have CloudWatch Logs -> Data Firehose -> S3 and want to pull that into DataPrepper it brings in the multi line event.
The structure seems to be like so:
{ "messageType": "DATA_MESSAGE", "owner": "123456789", "logGroup": "foo", "logStream": "bar", "logEvents": [{"id": "123456", "message": "some log message here", "timestamp" 1727880215114}, {"id": "789102", "message": "another log message here", "timestamp" 1727880215114}, {"id": "99999", "message": "yet another log message here", "timestamp" 1727880215114}]}
What I was hoping to do was use DataPrepper to read in the log message from S3 (that is like above) and then parse out the "logEvents" and treat each entry as an individual log message to publish to S3 & OpenSearch alike.
S3 being it will allow me to create a neat structure of a prefix with accountid/log-group/YYYY/MM/DD/HH
However I am not sure that it is possible to extract logEvents dictionary that contains a list of arrays and treat them as separate events.
To Reproduce Steps to reproduce the behavior:
- Create fake log event like above.
- Write to DataPrepper
- Try to parse
Expected behavior I was expecting a feature within DataPrepper to support something like so:
processor:
- parse_json:
- split_string:
entries:
- source: "/logEvents[0]"
delimiter: ","
Environment (please complete the following information):
- OS: macOSX
Additional context AWS Managed OpenSearch & AWS Managed OSIS is being used. I setup a local container deployment to expedite testing and still see the same issue.
Hey there, we also stumbled upon this issue and found it odd that you can not split simple json arrays into multiple documents.
I worked around this by using the following hack:
- parse the json key logEvents
- stringify the key again
- manipulate this stringified key: a) remove the Array characters in the beginning b) remove the Array character at the end c) Insert a unique delimiter character
- split event by this delimiter character
process these split events by a chained pipeline.
I am on mobile so my apologies for the formatting:
preprocess_pipeline: sink s3 processor:
- parse_json: pointer: “logEvents”
- write_json: source: “logEvents” target: “logEventsJson”
- substitute_string: entries: -source: “logEventsJson” from: “\[\{\\”id” to: “{\\”id”
- substitute_string: entries: -source: “logEventsJson” from: “n\”\}\]” to: “n\”\}”
- substitute_string: entries: -source: “logEventsJson” from: “\},\{\\”id” to: “}␟{\\”id”
- split_event:
field: logEventsJson
delimiter_regex: “␟”
sink:
- pipeline: name: message_process_pipeline
message_process_pipeline src: pipeline: name: preprocess_pipeline processor: - parse_json: source: “logEventsJson” sink:…
We asked for the AWS service team to support splitting json arrays into multiple docs, but this hack seems to work for us now.
Edit: formatting seems to remove the backslashes, will try to adjust formatting but I hope you get the idea
@JunChatani Nice workaround! Thanks!
We asked for the AWS service team to support splitting json arrays into multiple docs, but this hack seems to work for us now.
I agree that split_event should support splitting event on json arrays. Have you opened an github issue for this already? If not, we can use this issue to track.
I haven’t opened an issue yet, perhaps this one can be used to track it then.
@Conklin-Spencer-bah ,
To clarify, would you want this output?
{ "messageType": "DATA_MESSAGE", "owner": "123456789", "logGroup": "foo", "logStream": "bar", "logEvents": {"id": "789102", "message": "another log message here", "timestamp" 1727880215114}}
and
{ "messageType": "DATA_MESSAGE", "owner": "123456789", "logGroup": "foo", "logStream": "bar", "logEvents": {"id": "123456", "message": "some log message here", "timestamp" 1727880215114}}
and
{ "messageType": "DATA_MESSAGE", "owner": "123456789", "logGroup": "foo", "logStream": "bar", "logEvents": {"id": "99999", "message": "yet another log message here", "timestamp" 1727880215114}}
you got it, this is particularly useful in cases where you have log messages sent from CloudWatch -> Firehose -> S3.
The object in S3 is stored like I initially put. You can easily emulate this by subscribing a firehose to a cwl group and sending the results to S3.
By implementing this enhancement it would actually resolve the limitation that is noted on the CloudWatch log ingestion today (which suggests directly streaming logs to OpenSearch instead of using Firehose). See here.
"Currently, Firehose does not support the delivery of CloudWatch Logs to Amazon OpenSearch Service destination because Amazon CloudWatch combines multiple log events into one Firehose record and Amazon OpenSearch Service cannot accept multiple log events in one record."
Did you want to do this because OpensearchDashboards cannot visualize nested fields?
I stumbled upon this while trying to resolve a very similar limitation:
- AWS Config Reporter write an array of tracked resources into S3 in the form of
{ "configurationItems": [ { ... }, { ... }, { ... } ]} - OpenSearch Dashboard, despite having nested mapping defined, doesn't work very well (if at all, I digress) with nested fields
In my mind, which is a bit different from OP's suggestion - it would be very helpful if split-event supports splitting an event when the field is pointed directly at a JSONarray or object, making delimiter or delimiter_regex optional for string data type .
My mock configuration:
split-event-pipeline:
source:
http:
processor:
- split_event:
field: configurationItems
sink:
- stdout:
When an event contains the following example input:
{
"fileVersion": "1.0",
"configurationItems": [
{
"relatedEvents": [],
"relationships": [],
"supplementaryConfiguration": {},
"tags": {},
"configurationItemVersion": "1.3",
"configurationItemCaptureTime": "2025-04-13T08:52:10.505Z",
"configurationStateId": 987654321987654,
"awsAccountId": "123456789123",
"configurationItemStatus": "ResourceDeleted",
"resourceType": "AWS::IAM::Policy",
"resourceId": "ANPAASDFADSFASDFA",
"resourceName": "somethingsomething0",
"awsRegion": "global",
"configurationStateMd5Hash": "",
"configurationItemDeliveryTime": "2025-04-13T13:04:38.679Z",
"recordingFrequency": "DAILY"
},
{
"relatedEvents": [],
"relationships": [],
"supplementaryConfiguration": {},
"tags": {},
"configurationItemVersion": "1.3",
"configurationItemCaptureTime": "2025-04-13T08:52:10.451Z",
"configurationStateId": 987654321987654,
"awsAccountId": "123456789123",
"configurationItemStatus": "ResourceDeleted",
"resourceType": "AWS::IAM::Policy",
"resourceId": "ANPAASDFADSFASDFB",
"resourceName": "somethingsomething1",
"awsRegion": "global",
"configurationStateMd5Hash": "",
"configurationItemDeliveryTime": "2025-04-13T13:04:37.946Z",
"recordingFrequency": "DAILY"
}
}
The input will be split into multiple events based on the field, despite neither delimiter is set, as shown in the example below:
{ "fileVersion": "1.0", "configurationItems": { "relatedEvents": [], ... , "resourceName": "somethingsomething0", ... } }
{ "fileVersion": "1.0", "configurationItems": { "relatedEvents": [], ... , "resourceName": "somethingsomething1", ... } }
We also still would like this feature, the KDF buffered CW LogGroups are ndjson and not json, so only the first line is getting split properly. A split-event processor not bound to the source that can split on a json field would be very handy.
@JunChatani ndjson can already be processed with the newline codec though?
@edmondsiu0 I think the json codec solution in #5045, will work for you.
The issue is that it only works for the json codec and not the ndjson codec for the s3 source.
Thank you thank you thank you @JunChatani ! I was able to finally get rid of my horrible funky hacky code with something much cleaner. And more importantly, this is running at least 5x faster than before.
It's a shame this feature isn't documented anywhere!
(It kinda got an honorable mention under kinesis source, but it deserves its own section under the s3 source.
For those who come after me, I was able to use this config:
pipeline:
source:
s3:
codec:
json:
key_name: configurationItems
And data prepper generate one event for each object within the configurationItems array.