
[AWS-S3] Add a timestamp filter for s3 polling mode

Open kaiyan-sheng opened this issue 1 year ago • 2 comments

Describe the enhancement:

The current S3 input, when used without SQS notifications, calls the ListObjects API to collect all logs/objects from the given S3 bucket. There is no filtering, so users get both old and new logs from the bucket.

It would be nice to have a start_timestamp config parameter that lets users specify a timestamp. Instead of ingesting all logs from the bucket, we can make the same ListObjects API call, filter the results using start_timestamp, and only store logs whose LastModified >= start_timestamp.

Describe a specific use case for the enhancement or feature: With the config below, we should only store logs with LastModified at or after start_timestamp: 2024-10-11T00:00:00+00:00.

filebeat.inputs:
- type: aws-s3
  enabled: true
  bucket_arn: arn:aws:s3:::test-s3-bucket
  start_timestamp: 2024-10-11T00:00:00+00:00

This is what the ListObjects API call returns:

kaiyansheng ~  $ aws s3api list-objects --profile elastic-observability --bucket test-s3-bucket-ks
{
    "Contents": [
        {
            "Key": "AWSLogs/123/",
            "LastModified": "2024-10-11T17:14:41+00:00",
            "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\"",
            "Size": 0,
            "StorageClass": "STANDARD",
            "Owner": {
                "ID": "xxx"
            }
        },
        {
            "Key": "AWSLogs/123/vpcflowlogs/us-east-1/2024/10/11/627286350134_vpcflowlogs_us-east-1_fl-076d15c25200b764f_20241011T1715Z_b12bee6c.log.gz",
            "LastModified": "2024-10-11T17:21:43+00:00",
            "ETag": "\"910555fdc5893bc433a020f5baee904e\"",
            "Size": 1021,
            "StorageClass": "STANDARD",
            "Owner": {
                "ID": "xxx"
            }
        },
...
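
The proposed filter could be sketched in Python as follows. This only illustrates the LastModified >= start_timestamp comparison described above; it is not the Filebeat implementation (Filebeat is written in Go). The helper name `filter_by_start_timestamp` is hypothetical, and the sample entries are made-up values shaped like the CLI output (the second one is invented to show an object being dropped).

```python
from datetime import datetime


def _to_datetime(value):
    """Accept either an ISO 8601 string (as printed by the AWS CLI)
    or a datetime object (as returned by Boto3)."""
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(value)


def filter_by_start_timestamp(contents, start_timestamp):
    """Keep only objects whose LastModified is at or after start_timestamp."""
    start = _to_datetime(start_timestamp)
    return [
        obj for obj in contents
        if _to_datetime(obj["LastModified"]) >= start
    ]


# Illustrative entries shaped like the "Contents" list above.
contents = [
    {"Key": "AWSLogs/123/", "LastModified": "2024-10-11T17:14:41+00:00"},
    {"Key": "AWSLogs/123/old.log.gz", "LastModified": "2024-10-10T23:59:59+00:00"},
]

kept = filter_by_start_timestamp(contents, "2024-10-11T00:00:00+00:00")
print([obj["Key"] for obj in kept])
```

Note that the listing itself is unchanged: every object is still returned by ListObjects, and the filter only decides which ones are processed and stored.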

kaiyan-sheng avatar Oct 14 '24 19:10 kaiyan-sheng

Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)

elasticmachine avatar Oct 14 '24 20:10 elasticmachine

This is a nice addition for end users. Besides, this should improve performance with object parsing and storing, as beats will only process objects that fulfill the timestamp requirement.

Kavindu-Dodan avatar Oct 15 '24 17:10 Kavindu-Dodan

@Kavindu-Dodan @kaiyan-sheng For your reference, I’m sharing the workaround the customer developed to retrieve only the new files. Please take a look and let me know if the logic might be useful for us as well.

Overview -

The script uses the Boto3 library to fetch AWS logs from an S3 bucket. Once the logs are fetched, they are sent over UDP to a designated port for further processing.

Logic Breakdown

  1. AWS credentials and Boto3 setup

First, the script initializes the Boto3 client with the necessary AWS credentials, allowing it to authenticate and interact with AWS services.

        /*
           import boto3

           # Credentials and region are left blank here; fill in your own values.
           s3_client = boto3.client('s3',
                                    aws_access_key_id=' ',
                                    aws_secret_access_key=' ',
                                    region_name=' ')
        */
  1. get_object_key() - This function retrieves the keys (file names) from the S3 bucket based on last modified time. The bucket name and prefix must be provided up front.
  • First, we determine current_time and the time 15 minutes earlier. Both times are then truncated to the minute (seconds and microseconds set to zero), so that no files are lost due to potential differences in seconds or microseconds.

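          /*
              from datetime import datetime, timedelta, timezone

              current_time = datetime.now(timezone.utc)
              time_15_minutes_ago = current_time - timedelta(minutes=15)

              # Date parts of the window start, used later to build the prefix
              year = time_15_minutes_ago.year
              month = time_15_minutes_ago.month
              day = time_15_minutes_ago.day

              # Truncate both bounds to the minute
              current_time = current_time.replace(second=0, microsecond=0)
              time_15_minutes_ago = time_15_minutes_ago.replace(second=0, microsecond=0)
          */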
    
  • Next, we take the provided prefix and append '{year}/{month}/{day}' (taken from the time 15 minutes before current_time) to it, in order to retrieve files from the current day.

                      /*
                      prefix = "path/to/prefix/{year}/{month}/{day}"
                      prefix = prefix.format(year=year, month=month, day=day)
                      */
    
    
            - We currently set the lower bound on last modified time to 15 minutes ago and retrieve file names based on the following logic:

                      If the current time is 00:15 and the lower bound (time_15_minutes_ago) is 00:00, the prefix will be '{2024}/{10}/{28}'. However, since there may be files in the current (present-day) directory with an earlier last_modified_time, such as 2024-10-23 11:59:58, in this case we include all files present under the current prefix (up to current_time).

                      /*
                      if time_15_minutes_ago.hour == 0 and time_15_minutes_ago.minute == 0 and time_15_minutes_ago.second == 0:
                          for obj in page['Contents']:
                              if obj['LastModified'] <= current_time:
                                  obj_list.append(obj['Key'])
                      */
    
                      - Otherwise, we add a file to our list only if its last_modified_time is no later than current_time and falls within the last 15 minutes:

                      /*
                      for obj in page['Contents']:
                          if obj['LastModified'] <= current_time and obj['LastModified'] >= time_15_minutes_ago:
                              obj_list.append(obj['Key'])
                      */
    
  1. get_object_data()

We then pass the bucket name and object key asynchronously to retrieve file data based on the file suffix.

        /*
        response = s3_client.get_object(Bucket=bucket, Key=object_key)
        */
  1. Then we forward the data to a designated UDP port.
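
For readers who want to see the key-selection steps in one place, they can be put together roughly as below. This is a minimal sketch under stated assumptions, not the customer's actual script: `build_prefix` and `select_keys` are hypothetical names, `pages` is assumed to be any iterable of dicts with an optional 'Contents' list (the shape a Boto3 list-objects paginator yields), and the prefix uses unpadded month/day values exactly as in the snippet above, which may need zero-padding for some bucket layouts.

```python
from datetime import datetime, timedelta, timezone


def build_prefix(base_prefix, window_start):
    """Append year/month/day of the window start to the base prefix.

    Mirrors the unpadded format used in the snippets above (assumption:
    adjust to zero-padded values if the bucket layout requires it).
    """
    return "{base}/{year}/{month}/{day}".format(
        base=base_prefix,
        year=window_start.year,
        month=window_start.month,
        day=window_start.day,
    )


def select_keys(pages, now, window=timedelta(minutes=15)):
    """Return keys whose LastModified falls inside the polling window.

    Both bounds are truncated to the minute. If the window starts exactly
    at midnight, every object up to current_time is accepted, as in the
    special case described above.
    """
    current_time = now.replace(second=0, microsecond=0)
    window_start = (now - window).replace(second=0, microsecond=0)
    starts_at_midnight = window_start.hour == 0 and window_start.minute == 0

    keys = []
    for page in pages:
        for obj in page.get("Contents", []):
            modified = obj["LastModified"]
            if modified > current_time:
                continue  # newer than the upper bound; left for the next run
            if starts_at_midnight or modified >= window_start:
                keys.append(obj["Key"])
    return keys


# Illustrative data only; keys and timestamps are made up.
now = datetime(2024, 10, 28, 10, 30, 45, tzinfo=timezone.utc)
pages = [{"Contents": [
    {"Key": "a.log.gz", "LastModified": datetime(2024, 10, 28, 10, 20, tzinfo=timezone.utc)},
    {"Key": "b.log.gz", "LastModified": datetime(2024, 10, 28, 10, 14, 59, tzinfo=timezone.utc)},
]}]
print(build_prefix("path/to/prefix", now - timedelta(minutes=15)))
print(select_keys(pages, now))
```

One consequence of a fixed look-back window is that the script has to run at least once per window, otherwise objects modified between runs are never picked up. A persistent start_timestamp-style lower bound does not depend on the polling schedule in the same way.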

anuj-elastic avatar Oct 29 '24 11:10 anuj-elastic

Hi @Kavindu-Dodan @kaiyan-sheng ,

Could you please provide an update and share the ETA for this? This would help us communicate effectively with the customer, as they consider this a critical feature and have been following up regularly.

anuj-elastic avatar Nov 11 '24 09:11 anuj-elastic

@anuj-elastic I am working on adding this feature through PR https://github.com/elastic/beats/pull/41817. And I am aiming to release this with 8.18.0, which is planned for early next year. Along with that, I am planning to upgrade the integrations to support the new feature. This is planned through https://github.com/elastic/integrations/issues/11919

Kavindu-Dodan avatar Dec 06 '24 17:12 Kavindu-Dodan

Thanks for the update @Kavindu-Dodan. It's wonderful to hear this update. The customer has been consistently following up on this for the past couple of months, and now I have a timeline to share with them and set expectations accordingly.

anuj-elastic avatar Dec 06 '24 18:12 anuj-elastic

@Kavindu-Dodan Do you know whether the similar issue with the Cloudflare integration will also be covered by the same fix?

anuj-elastic avatar Dec 09 '24 06:12 anuj-elastic

@anuj-elastic related to your question, please see the update here

  1. Once 8.16.2 is released, we could check if the registry cleanup improves the performance for this customer.
  2. The registry cleanup mentioned above is a first step of the improvement. If it is not enough, we will have to wait for 8.18.0 to be released to benefit from the timestamp filter.

bturquet avatar Dec 09 '24 08:12 bturquet

@anuj-elastic In addition to what @bturquet said, PR https://github.com/elastic/beats/pull/41817 will bring configurations to address the performance considerations. Since the Cloudflare integration internally uses the S3 implementation, I hope these new configurations can also fix the referenced issue.

The update of the integrations will be done through https://github.com/elastic/integrations/issues/11919

Kavindu-Dodan avatar Dec 30 '24 21:12 Kavindu-Dodan

PR #41817 was merged on 07-Jan-2025 and configurations to avoid registry state growth, ignore_older & start_timestamp will be delivered with beats 8.18.0

Kavindu-Dodan avatar Jan 07 '25 19:01 Kavindu-Dodan

Update - We are backporting this improvement to 8.16.x & 8.17.x releases. So, the change will be available with 8.16.5 & 8.17.3 releases.

Kavindu-Dodan avatar Feb 14 '25 19:02 Kavindu-Dodan

@Kavindu-Dodan could you please confirm if this improvement is backported to v8.17.2 or v8.17.3?

Thank you.

smartkathycat avatar Mar 05 '25 09:03 smartkathycat

@Kavindu-Dodan could you please confirm if this improvement is backported to v8.17.2 or v8.17.3? For v8.16, please also let us know the exact version. Thank you!

smartkathycat avatar Mar 06 '25 06:03 smartkathycat

@smartkathycat sorry, I should have added the specific versions. The backports are available with 8.16.5 & 8.17.3 releases.

Kavindu-Dodan avatar Mar 06 '25 15:03 Kavindu-Dodan