[AWS-S3] Add a timestamp filter for s3 polling mode
Describe the enhancement:
The current S3 input without SQS notification calls the ListObjects API to collect all logs/objects from the given S3 bucket. There is no filter functionality, so users get both old and new logs from the bucket.
It would be nice to have a `start_timestamp` config parameter for users to specify a timestamp. Instead of ingesting all logs from the bucket, we can make the same ListObjects API call, filter the results using `start_timestamp`, and only store logs whose `LastModified` is >= `start_timestamp`.
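The proposed filtering step could be sketched as a small pure function. This is only an illustration of the comparison described above; the function name and sample data are hypothetical, and the real implementation lives in the Beats Go code:

```python
from datetime import datetime, timezone

def filter_by_start_timestamp(contents, start_timestamp):
    """Keep only objects whose LastModified is at or after start_timestamp."""
    return [obj["Key"] for obj in contents if obj["LastModified"] >= start_timestamp]

# Sample entries shaped like the ListObjects "Contents" items.
contents = [
    {"Key": "old.log.gz", "LastModified": datetime(2024, 10, 10, 23, 59, tzinfo=timezone.utc)},
    {"Key": "new.log.gz", "LastModified": datetime(2024, 10, 11, 17, 21, tzinfo=timezone.utc)},
]

start = datetime(2024, 10, 11, tzinfo=timezone.utc)
print(filter_by_start_timestamp(contents, start))  # → ['new.log.gz']
```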
Describe a specific use case for the enhancement or feature:
With the config below, we should only store logs with LastModified after start_timestamp: 2024-10-11T00:00:00+00:00.
```yaml
filebeat.inputs:
  - type: aws-s3
    enabled: true
    bucket_arn: arn:aws:s3:::test-s3-bucket
    start_timestamp: 2024-10-11T00:00:00+00:00
```
This is what the ListObjects API call returns:
```console
kaiyansheng ~ $ aws s3api list-objects --profile elastic-observability --bucket test-s3-bucket-ks
{
    "Contents": [
        {
            "Key": "AWSLogs/123/",
            "LastModified": "2024-10-11T17:14:41+00:00",
            "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\"",
            "Size": 0,
            "StorageClass": "STANDARD",
            "Owner": {
                "ID": "xxx"
            }
        },
        {
            "Key": "AWSLogs/123/vpcflowlogs/us-east-1/2024/10/11/627286350134_vpcflowlogs_us-east-1_fl-076d15c25200b764f_20241011T1715Z_b12bee6c.log.gz",
            "LastModified": "2024-10-11T17:21:43+00:00",
            "ETag": "\"910555fdc5893bc433a020f5baee904e\"",
            "Size": 1021,
            "StorageClass": "STANDARD",
            "Owner": {
                "ID": "xxx"
            }
        },
        ...
```
Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)
This is a nice addition for end users. It should also improve performance in object parsing and storing, as Beats will only process objects that fulfill the timestamp requirement.
@Kavindu-Dodan @kaiyan-sheng For your reference, I’m sharing the workaround the customer developed to retrieve only the new files. Please take a look and let me know if the logic might be useful for us as well.
Overview -
The script uses the Boto3 library to interact with AWS services and fetch AWS logs that are present in an S3 bucket. Once the logs are fetched, they are sent over UDP to a designated port for further processing.
Logic Breakdown
- AWS credentials and Boto3 setup

  First the script initializes the Boto3 client with the necessary AWS credentials, allowing it to authenticate and interact with AWS services.

  ```python
  s3_client = boto3.client(
      's3',
      aws_access_key_id=' ',
      aws_secret_access_key=' ',
      region_name=' '
  )
  ```
- get_object_key() - This function retrieves the object key (file name) from the AWS bucket based on the last modified time. We have to provide a predefined bucket name and prefix.

  First we determine the current time and the time 15 minutes ago. Both times are then adjusted to set the seconds and microseconds to zero, ensuring that no files are lost due to potential differences in microseconds.

  ```python
  current_time = datetime.now(timezone.utc)
  time_15_minutes_ago = current_time - timedelta(minutes=15)
  year = time_15_minutes_ago.year
  month = time_15_minutes_ago.month
  day = time_15_minutes_ago.day
  current_time = current_time.replace(second=0, microsecond=0)
  time_15_minutes_ago = time_15_minutes_ago.replace(second=0, microsecond=0)
  ```

  Next, we set the provided prefix and append `{year}/{month}/{day}` (extracted from the time 15 minutes before the current time) to it in order to retrieve files from the current day.

  ```python
  prefix = "path/to/prefix/{year}/{month}/{day}"
  prefix = prefix.format(year=year, month=month, day=day)
  ```

  We are currently setting the last modified time to 15 minutes ago and retrieving file names based on the following logic: if the current time is 00:15 and the last modified time is 00:00, the prefix will be `{2024}/{10}/{28}`. However, if there are files in the current (present-day) directory with a last modified time such as 2024-10-23 11:59:58, we include all files that are present in the current prefix.

  ```python
  if time_15_minutes_ago.hour == 0 and time_15_minutes_ago.minute == 0 and time_15_minutes_ago.second == 0:
      for obj in page['Contents']:
          if obj['LastModified'] <= current_time:
              obj_list.append(obj['Key'])
  ```

  Otherwise, we check whether the last modified time is less than the current time and within the last 15 minutes; if so, we add that file to our list.

  ```python
  else:
      for obj in page['Contents']:
          if obj['LastModified'] <= current_time and obj['LastModified'] >= time_15_minutes_ago:
              obj_list.append(obj['Key'])
  ```
- get_object_data() - We then pass the bucket name and object key asynchronously to retrieve file data based on the file suffix.

  ```python
  response = s3_client.get_object(Bucket=bucket, Key=object_key)
  ```
- Then we forward the data on a particular UDP port.
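The forwarding step could be sketched like this; the function name, host, port, and payload are illustrative placeholders, not taken from the customer's script:

```python
import socket

def forward_over_udp(payload: bytes, host: str = "127.0.0.1", port: int = 9999) -> int:
    """Send one log payload to the designated UDP host:port; returns bytes sent."""
    # UDP is connectionless; sendto() returns the number of bytes
    # handed to the kernel, with no delivery guarantee.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))

sent = forward_over_udp(b'{"message": "sample log line"}')
```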
Hi @Kavindu-Dodan @kaiyan-sheng ,
Could you please provide an update and share the ETA for this? This would help us communicate effectively with the customer, as they consider this a critical feature and have been following up regularly.
@anuj-elastic I am working on adding this feature through PR https://github.com/elastic/beats/pull/41817. And I am aiming to release this with 8.18.0, which is planned for early next year. Along with that, I am planning to upgrade the integrations to support the new feature. This is planned through https://github.com/elastic/integrations/issues/11919
Thanks for the update @Kavindu-Dodan. It's wonderful to hear this update. The customer has been consistently following up on this for the past couple of months, and now I have a timeline to share with them and set expectations accordingly.
@Kavindu-Dodan Do you have any idea whether the similar issue with the Cloudflare integration will also be addressed by the same fix?
@anuj-elastic related to your question, please see the update here
- Once 8.16.2 is released, we could check if the registry cleanup improves the performance for this customer.
- The registry cleanup mentioned above is a first step of the improvement; if it is not enough, we will have to wait for 8.18.0 to be released to benefit from the timestamp filter.
@anuj-elastic Additionally to what @bturquet said, PR https://github.com/elastic/beats/pull/41817 will bring configurations to address the performance considerations. Since Cloudflare integration internally utilizes the S3 implementation, I hope these new configurations can also fix the referenced issue.
The update of the integrations will be done through https://github.com/elastic/integrations/issues/11919
PR #41817 was merged on 07-Jan-2025, and the configurations to avoid registry state growth, `ignore_older` & `start_timestamp`, will be delivered with Beats 8.18.0.
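For reference, once a release containing the change is available, the two new options could be combined in the input config roughly like this (values are illustrative; check the official aws-s3 input docs for the exact semantics):

```yaml
filebeat.inputs:
  - type: aws-s3
    enabled: true
    bucket_arn: arn:aws:s3:::test-s3-bucket
    # Only ingest objects modified at or after this time (illustrative value).
    start_timestamp: 2024-10-11T00:00:00+00:00
    # Skip objects older than this duration (illustrative value).
    ignore_older: 48h
```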
Update - We are backporting this improvement to 8.16.x & 8.17.x releases. So, the change will be available with 8.16.5 & 8.17.3 releases.
@Kavindu-Dodan could you please confirm if this improvement is backported to v8.17.2 or v8.17.3?
Thank you.
@Kavindu-Dodan could you please confirm whether this improvement is backported to v8.17.2 or v8.17.3? For v8.16, please also let us know the exact version. Thank you!
@smartkathycat sorry, I should have added the specific versions. The backports are available with 8.16.5 & 8.17.3 releases.