
`aws-s3` input: Split S3 poller and SQS reader into explicit input objects

Open · faec opened this pull request 1 year ago • 7 comments

A large cleanup in the aws-s3 input, reorganizing the file structure and splitting internal APIs into additional helpers.

This change is meant to have no functional effect; it is strictly a cleanup and reorganization in preparation for future changes. The hope is that the new layout makes initialization steps and logical dependencies clearer. The main changes are:

  • Make s3Poller and sqsReader into standalone input objects, s3PollerInput and sqsReaderInput, that implement the v2.Input interface, instead of interleaving the two implementations within the same object.
    • Choose the appropriate input in (*s3InputManager).Create based on configuration
    • Move associated internal API out of the shared input.go into the new s3_input.go and sqs_input.go, while leaving s3.go and sqs.go for auxiliary helpers.
    • Give each input a copy of config and awsConfig, and remove redundant struct fields that simply shadowed fields already in those configs.
  • In sqsReaderInput, use a fixed set of worker goroutines and track task allocation via channel-based work requests instead of creating ephemeral workers via the previous custom semaphore implementation (similar to the recent cloudwatch cleanup).
    • Delete aws.Sem, since this was its last remaining caller
  • Collect the helpers related to approximate message count polling into a helper object, messageCountMonitor, so their role in the input is clearer.
  • Generally, break larger steps up into smaller helper functions
  • Generally, collect initialization dependencies in the same place so the sequencing is clearer.

Checklist

  • [x] My code follows the style guidelines of this project
  • [x] I have commented my code, particularly in hard-to-understand areas
  • [ ] ~~I have made corresponding changes to the documentation~~
  • [ ] ~~I have made corresponding changes to the default configuration files~~
  • [ ] I have added tests that prove my fix is effective or that my feature works
  • [ ] ~~I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.~~

faec avatar May 01 '24 20:05 faec

This pull request does not have a backport label. If this is a bug or security fix, could you label this PR @faec? 🙏 To do so, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-v8.\d.0 is the label to automatically backport to the 8.\d branch, where \d is the digit

mergify[bot] avatar May 01 '24 20:05 mergify[bot]

:green_heart: Build Succeeded

The badges below are clickable and redirect to their specific view in the CI or docs pipeline: Pipeline View · Test View · Changes · Artifacts · previews.


Build stats

  • Duration: 142 min 20 sec

:grey_exclamation: Flaky test report

No test was executed to be analysed.

:robot: GitHub comments


To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

elasticmachine avatar May 01 '24 21:05 elasticmachine

This pull request is now in conflicts. Could you fix it? 🙏 To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b awss3-cleanup upstream/awss3-cleanup
git merge upstream/main
git push upstream awss3-cleanup

mergify[bot] avatar May 03 '24 02:05 mergify[bot]

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

elasticmachine avatar May 03 '24 13:05 elasticmachine

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

elasticmachine avatar May 03 '24 14:05 elasticmachine

@leehinman I was expecting to write more tests ("all existing tests pass" isn't as reassuring as I want it to be). Some scenarios you mentioned might be out of scope for this pass -- one thing I didn't change at all is the S3/SQS API proxies, so things like "number of events per S3 object" or "appending an event to an object" are opaque to this change, it's all handled internally in sqsProcessor and s3ObjectHandler. So for confirming behavior is unchanged, we can restrict scope to three interface points:

  • Calls to S3/SQS API proxies
  • Calls to create a Beats pipeline client, or publish an event through one
  • Calls to awss3.inputMetrics, the helper object used to update the input's monitoring variables.

There are existing tests for at least the first two using auto-generated mocks, and I've updated them for the new code. I'm not satisfied with them, though -- they're the sort of mocks where you list the exact order and inputs of each mocked call even if it isn't part of the spec, so this PR broke them just by removing redundant calls to deterministic functions and changing who's responsible for starting a helper goroutine.

One thing I was considering was rewriting / expanding those tests with more granular boundaries -- e.g. instead of checking that the full input calls NextPage the right number of times, then calls GetObject the right number of times, then calls Publish the right number of times, just confirm that objects given to the reader loop are sent to the work channel, and separately, that objects sent to the work channel are published by the work loop.
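The split-boundary idea could be sketched as a pair of tiny units tested against the channel between them. This is a hypothetical sketch: `object`, `readerLoop`, `workLoop`, and `workChan` are illustrative names, not the input's real identifiers.

```go
package main

import "fmt"

type object struct{ key string }

// readerLoop forwards discovered objects to the work channel, then
// closes it to signal the end of the listing.
func readerLoop(listed []object, workChan chan<- object) {
	for _, o := range listed {
		workChan <- o
	}
	close(workChan)
}

// workLoop publishes every object received on the work channel.
func workLoop(workChan <-chan object, publish func(object)) {
	for o := range workChan {
		publish(o)
	}
}

func main() {
	// Instead of asserting the exact sequence of mocked S3 and
	// pipeline calls end to end, test each half against the channel
	// in the middle.
	workChan := make(chan object, 3)
	readerLoop([]object{{"a"}, {"b"}, {"c"}}, workChan)

	var published []string
	workLoop(workChan, func(o object) { published = append(published, o.key) })
	fmt.Println(published) // [a b c]
}
```

Each half can then be exercised independently: the reader test only needs a fake listing and a channel, and the work-loop test only needs a channel and a fake publisher, so neither breaks when the other side's internal call sequence changes.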

Having said all that: does that plan sound good? Are there any areas of this PR where you'd especially like to see expanded test coverage?

faec avatar May 03 '24 17:05 faec

> @leehinman I was expecting to write more tests ("all existing tests pass" isn't as reassuring as I want it to be).

:-) yeah.

> Some scenarios you mentioned might be out of scope for this pass

Absolutely out of scope. I wrote them down to start thinking/talking about them. Probably should have done 2 separate comments.

> -- one thing I didn't change at all is the S3/SQS API proxies, so things like "number of events per S3 object" or "appending an event to an object" are opaque to this change, it's all handled internally in sqsProcessor and s3ObjectHandler. So for confirming behavior is unchanged, we can restrict scope to three interface points:
>
> * Calls to S3/SQS API proxies
> * Calls to create a Beats pipeline client, or publish an event through one
> * Calls to `awss3.inputMetrics`, the helper object used to update the input's monitoring variables.
>
> There are existing tests for at least the first two using auto-generated mocks, and I've updated them for the new code. I'm not satisfied with them, though -- they're the sort of mocks where you list the exact order and inputs of each mocked call even if it isn't part of the spec, so this PR broke them just by removing redundant calls to deterministic functions and changing who's responsible for starting a helper goroutine.
>
> One thing I was considering was rewriting / expanding those tests with more granular boundaries -- e.g. instead of checking that the full input calls NextPage the right number of times, then calls GetObject the right number of times, then calls Publish the right number of times, just confirm that objects given to the reader loop are sent to the work channel, and separately, that objects sent to the work channel are published by the work loop.
>
> Having said all that: does that plan sound good? Are there any areas of this PR where you'd especially like to see expanded test coverage?

Any additional automated tests would be great.

For this PR a manual test that either mode still works as expected would be fine.

leehinman avatar May 03 '24 18:05 leehinman

I am running some manual tests ingesting CloudTrail logs using the SQS mode.

I'm using a very simple setup; here are the differences from the default modules.d/aws.yml file:


  - module: aws
    cloudtrail:
      #    enabled: false
      enabled: true

      # AWS SQS queue url
      var.queue_url: https://sqs.eu-north-1.amazonaws.com/<redacted>/mbranca-cloudtrail-logs

      var.access_key_id: <redacted>
      var.secret_access_key: <redacted>

Filebeat is running and ingesting CloudTrail logs. Here are a couple of screenshots of the SQS queue and the logs in Discover:

[Screenshot: SQS queue]

[Screenshot: CloudTrail logs in Discover]

zmoog avatar May 08 '24 20:05 zmoog

I am also running manual tests ingesting CloudTrail logs from the same S3 bucket using the S3 polling mode.

Here's the config:

  - module: aws
    cloudtrail:
      enabled: true

      # AWS S3 bucket arn
      var.bucket_arn: 'arn:aws:s3:::<redacted>'

      # Number of workers on S3 bucket
      var.number_of_workers: 5

The S3 polling mode panics because the input's logger is nil; however, I was able to run the input successfully with this small change:

diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go
index 50786626d2..999b27da53 100644
--- a/x-pack/filebeat/input/awss3/s3_input.go
+++ b/x-pack/filebeat/input/awss3/s3_input.go
@@ -68,11 +68,11 @@ func (in *s3PollerInput) Run(
        inputContext v2.Context,
        pipeline beat.Pipeline,
 ) error {
-       log := inputContext.Logger.Named("s3")
+       in.log = inputContext.Logger.Named("s3")
        var err error
 
        // Load the persistent S3 polling state.
-       in.states, err = newStates(log, in.store)
+       in.states, err = newStates(in.log, in.store)
        if err != nil {
                return fmt.Errorf("can not start persistent store: %w", err)
        }
@@ -95,7 +95,7 @@ func (in *s3PollerInput) Run(
        defer in.metrics.Close()
 
        in.s3ObjectHandler = newS3ObjectProcessorFactory(
-               log,
+               in.log,
                in.metrics,
                in.s3,
                in.config.getFileSelectors(),

The log ingestion is making progress:

[Screenshot: log ingestion progressing in Discover]

zmoog avatar May 08 '24 21:05 zmoog

A recent change in the main branch created an inconsistency in the versions of the AWS SDK core and services modules. In practice, all the AWS inputs I tested failed with a "not found, ResolveEndpointV2" error.

A PR is being reviewed to address this problem.

zmoog avatar May 08 '24 21:05 zmoog

This pull request is now in conflicts. Could you fix it? 🙏 To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b awss3-cleanup upstream/awss3-cleanup
git merge upstream/main
git push upstream awss3-cleanup

mergify[bot] avatar May 08 '24 21:05 mergify[bot]

Fixed the uninitialized logger, added comments, and synced with main. Doing my own manual testing now.

faec avatar May 09 '24 15:05 faec