`aws-s3` input: Split S3 poller and SQS reader into explicit input objects
A large cleanup in the aws-s3 input, reorganizing the file structure and splitting internal APIs into additional helpers.
This change is meant to have no functional effect; it is strictly a cleanup and reorganization in preparation for future changes. The hope is that the new layout makes initialization steps and logical dependencies clearer. The main changes are:
- Make `s3Poller` and `sqsReader` into standalone input objects, `s3PollerInput` and `sqsReaderInput`, that implement the `v2.Input` interface, instead of interleaving the two implementations within the same object.
- Choose the appropriate input in `(*s3InputManager).Create` based on configuration.
- Move associated internal API out of the shared `input.go` into the new `s3_input.go` and `sqs_input.go`, while leaving `s3.go` and `sqs.go` for auxiliary helpers.
- Give each input a copy of `config` and `awsConfig`, and remove redundant struct fields that simply shadowed fields already in those configs.
- In `sqsReaderInput`, use a fixed set of worker goroutines and track task allocation via channel-based work requests instead of creating ephemeral workers via the previous custom semaphore implementation (similar to the recent cloudwatch cleanup); see the sketch after this list.
- Delete `aws.Sem`, since this was its last remaining caller.
- Collect the helpers related to approximate message count polling into a helper object, `messageCountMonitor`, so their role in the input is clearer.
- Generally, break larger steps up into smaller helper functions.
- Generally, collect initialization dependencies in the same place so the sequencing is clearer.
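As a rough sketch of the channel-based work-request pattern (illustrative only; the package, type, and function names below are hypothetical, not the input's actual identifiers): each worker announces that it is idle on a work-request channel, and the reader fetches only as many messages as there are outstanding requests.

```go
package sketch

import (
	"context"
	"fmt"
	"sync"
)

type message struct{ body string }

// runReader starts a fixed pool of workers and feeds them messages. Workers
// signal availability on workRequests; the reader fetches at most as many
// messages as there are idle workers.
func runReader(ctx context.Context, numWorkers int, fetch func(max int) []message) {
	workRequests := make(chan struct{}, numWorkers) // idle workers announce themselves here
	work := make(chan message)                      // messages handed to workers

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case workRequests <- struct{}{}: // "I'm free, send me work"
				case <-ctx.Done():
					return
				}
				select {
				case msg := <-work:
					fmt.Printf("worker %d handling %q\n", id, msg.body)
				case <-ctx.Done():
					return
				}
			}
		}(i)
	}

	for ctx.Err() == nil {
		// Block until at least one worker is idle, then count any other
		// queued requests so the fetch size matches the idle worker count.
		select {
		case <-workRequests:
		case <-ctx.Done():
			continue
		}
		idle := 1
		for draining := true; draining; {
			select {
			case <-workRequests:
				idle++
			default:
				draining = false
			}
		}
		for _, msg := range fetch(idle) { // e.g. one receive-messages call
			select {
			case work <- msg:
			case <-ctx.Done():
			}
		}
	}
	wg.Wait()
}
```

The point of the pattern is that the goroutine count stays fixed and the decision of how many messages to request at once is driven directly by how many workers are idle, rather than by a semaphore guarding ephemeral goroutines.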
Checklist
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] ~~I have made corresponding changes to the documentation~~
- [ ] ~~I have made corresponding changes to the default configuration files~~
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] ~~I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.~~
This pull request does not have a backport label. If this is a bug or security fix, could you label this PR @faec? 🙏 To do so, you'll need to label your PR with:
- The upcoming major version of the Elastic Stack
- The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)
To fixup this pull request, you need to add the backport labels for the needed branches, such as:
- `backport-v8./d.0` is the label to automatically backport to the `8./d` branch (`/d` is the digit)
:green_heart: Build Succeeded
Build stats
- Duration: 142 min 20 sec
:grey_exclamation: Flaky test report
No test was executed to be analysed.
:robot: GitHub comments
To re-run your PR in the CI, just comment with:
- `/test`: Re-trigger the build.
- `/package`: Generate the packages and run the E2E tests.
- `/beats-tester`: Run the installation tests with beats-tester.
- `run elasticsearch-ci/docs`: Re-trigger the docs validation. (use unformatted text in the comment!)
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b awss3-cleanup upstream/awss3-cleanup
git merge upstream/main
git push upstream awss3-cleanup
Pinging @elastic/elastic-agent (Team:Elastic-Agent)
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
@leehinman I was expecting to write more tests ("all existing tests pass" isn't as reassuring as I want it to be). Some scenarios you mentioned might be out of scope for this pass -- one thing I didn't change at all is the S3/SQS API proxies, so things like "number of events per S3 object" or "appending an event to an object" are opaque to this change, it's all handled internally in sqsProcessor and s3ObjectHandler. So for confirming behavior is unchanged, we can restrict scope to three interface points:
- Calls to S3/SQS API proxies
- Calls to create a Beats pipeline client, or publish an event through one
- Calls to `awss3.inputMetrics`, the helper object used to update the input's monitoring variables.
There are existing tests for at least the first two using auto-generated mocks, and I've updated them for the new code. I'm not satisfied with them, though -- they're the sort of mocks where you list the exact order and inputs of each mocked call even if it isn't part of the spec, so this PR broke them just by removing redundant calls to deterministic functions and changing who's responsible for starting a helper goroutine.
One thing I was considering was rewriting / expanding those tests with more granular boundaries -- e.g. instead of checking that the full input calls NextPage the right number of times, then calls GetObject the right number of times, then calls Publish the right number of times, just confirm that objects given to the reader loop are sent to the work channel, and separately, that objects sent to the work channel are published by the work loop.
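A rough sketch of that kind of narrow test (stand-in types only; `workLoop`, `stubPublisher`, and the `event` type here are all hypothetical, not the input's real names):

```go
package sketch

import "testing"

type event struct{ id int }

// stubPublisher records published events so the test can assert on them.
type stubPublisher struct{ published []event }

func (p *stubPublisher) Publish(e event) { p.published = append(p.published, e) }

// workLoop is a stand-in for the worker goroutine body: it drains the work
// channel and publishes each item.
func workLoop(work <-chan event, pub *stubPublisher) {
	for e := range work {
		pub.Publish(e)
	}
}

func TestWorkLoopPublishesEverything(t *testing.T) {
	work := make(chan event, 3)
	for i := 0; i < 3; i++ {
		work <- event{id: i}
	}
	close(work)

	pub := &stubPublisher{}
	workLoop(work, pub)

	if got := len(pub.published); got != 3 {
		t.Fatalf("expected 3 published events, got %d", got)
	}
}
```

A test at that granularity wouldn't care which call starts a helper goroutine or how many times a deterministic helper is invoked, so it shouldn't break on refactors like this one.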
Having said all that: does that plan sound good? Are there any areas of this PR where you'd especially like to see expanded test coverage?
> @leehinman I was expecting to write more tests ("all existing tests pass" isn't as reassuring as I want it to be).
:-) yeah.
> Some scenarios you mentioned might be out of scope for this pass
Absolutely out of scope. I wrote them down to start thinking/talking about them. Probably should have done 2 separate comments.
> -- one thing I didn't change at all is the S3/SQS API proxies, so things like "number of events per S3 object" or "appending an event to an object" are opaque to this change, it's all handled internally in sqsProcessor and s3ObjectHandler. So for confirming behavior is unchanged, we can restrict scope to three interface points:
>
> - Calls to S3/SQS API proxies
> - Calls to create a Beats pipeline client, or publish an event through one
> - Calls to `awss3.inputMetrics`, the helper object used to update the input's monitoring variables.
>
> There are existing tests for at least the first two using auto-generated mocks, and I've updated them for the new code. I'm not satisfied with them, though -- they're the sort of mocks where you list the exact order and inputs of each mocked call even if it isn't part of the spec, so this PR broke them just by removing redundant calls to deterministic functions and changing who's responsible for starting a helper goroutine.
>
> One thing I was considering was rewriting / expanding those tests with more granular boundaries -- e.g. instead of checking that the full input calls NextPage the right number of times, then calls GetObject the right number of times, then calls Publish the right number of times, just confirm that objects given to the reader loop are sent to the work channel, and separately, that objects sent to the work channel are published by the work loop.
>
> Having said all that: does that plan sound good? Are there any areas of this PR where you'd especially like to see expanded test coverage?
Any additional automated tests would be great.
For this PR a manual test that either mode still works as expected would be fine.
I am running some manual tests ingesting CloudTrail logs using the SQS mode.
I'm using a very simple setup; here are the differences from the default modules.d/aws.yml file:
- module: aws
cloudtrail:
# enabled: false
enabled: true
# AWS SQS queue url
var.queue_url: https://sqs.eu-north-1.amazonaws.com/<redacted>/mbranca-cloudtrail-logs
var.access_key_id: <redacted>
var.secret_access_key: <redacted>
Filebeat is running and ingesting CloudTrail logs. Here are a couple of screenshots of the SQS queue and the logs in Discover:
I am also running manual tests ingesting CloudTrail logs from the same S3 bucket using the S3 polling mode.
Here's the config:
- module: aws
cloudtrail:
enabled: true
# AWS S3 bucket arn
var.bucket_arn: 'arn:aws:s3:::<redacted>'
# Number of workers on S3 bucket
var.number_of_workers: 5
The S3 polling mode panics because the input's logger is nil; however, I was able to run the input successfully with this small change:
diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go
index 50786626d2..999b27da53 100644
--- a/x-pack/filebeat/input/awss3/s3_input.go
+++ b/x-pack/filebeat/input/awss3/s3_input.go
@@ -68,11 +68,11 @@ func (in *s3PollerInput) Run(
inputContext v2.Context,
pipeline beat.Pipeline,
) error {
- log := inputContext.Logger.Named("s3")
+ in.log = inputContext.Logger.Named("s3")
var err error
// Load the persistent S3 polling state.
- in.states, err = newStates(log, in.store)
+ in.states, err = newStates(in.log, in.store)
if err != nil {
return fmt.Errorf("can not start persistent store: %w", err)
}
@@ -95,7 +95,7 @@ func (in *s3PollerInput) Run(
defer in.metrics.Close()
in.s3ObjectHandler = newS3ObjectProcessorFactory(
- log,
+ in.log,
in.metrics,
in.s3,
in.config.getFileSelectors(),
The log ingestion is making progress:
A recent change in the main branch created an inconsistency in the versions of the AWS SDK core and services modules. In practice, all the AWS inputs I tested failed with a "not found, ResolveEndpointV2" error.
A PR is being reviewed to address this problem.
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b awss3-cleanup upstream/awss3-cleanup
git merge upstream/main
git push upstream awss3-cleanup
Fixed the uninitialized logger, added comments, and synced with main. Doing my own manual testing now.