beats
beats copied to clipboard
x-pack/filebeat/input/cel: new input
WIP
What does this PR do?
This adds a new input to filebeat that allows processing of datastreams using the Common Expression Language.
Why is it important?
It provides a consistent framework for generalised input processing.
Checklist
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding change to the default configuration files
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added an entry in
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.
Author's Checklist
- [ ]
How to test this PR locally
Related issues
Use cases
Screenshots
Logs
This pull request doesn't have a Team:<team>
label.
:green_heart: Build Succeeded
the below badges are clickable and redirect to their specific view in the CI or DOCS
![]()
![]()
![]()
![]()
![]()
Expand to view the summary
Build stats
-
Start Time: 2022-11-08T11:14:01.064+0000
-
Duration: 131 min 56 sec
Test stats :test_tube:
Test | Results |
---|---|
Failed | 0 |
Passed | 23907 |
Skipped | 1951 |
Total | 25858 |
:green_heart: Flaky test report
Tests succeeded.
:robot: GitHub comments
Expand to view the GitHub comments
To re-run your PR in the CI, just comment with:
-
/test
: Re-trigger the build. -
/package
: Generate the packages and run the E2E tests. -
/beats-tester
: Run the installation tests with beats-tester. -
run
elasticsearch-ci/docs
: Re-trigger the docs validation. (use unformatted text in the comment!)
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
Pinging @elastic/security-external-integrations (Team:Security-External Integrations)
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request does not have a backport label. If this is a bug or security fix, could you label this PR @efd6? 🙏. For such, you'll need to label your PR with:
- The upcoming major version of the Elastic Stack
- The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change) To fixup this pull request, you need to add the backport labels for the needed branches, such as:
-
backport-v8./d.0
is the label to automatically backport to the8./d
branch./d
is the digit
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
I recommend thinking a little about the observability of the input itself.
- Are there any metrics that would be useful to expose? If so I recommend to register a set of per instance metrics under
dataset/<input_id>/*
similar to the what is done in aws-s3 or lumberjack. - How will we debug requests and responses? Perhaps we could reuse the LoggingRoundTripper from httpjson to add a trace logging feature.
- I will do that.
- That was something that I was considering. In the first instance though I have been using the mito CLI program which will do nearly all of what is happening here but standalone. Both together are probably a good plan.
The inclusion of logging round tripper will require moving the httplog package out of the httpjson/internal directory, ~so that should probably happen in a later PR~. I have sent another PR to move that so that it can be used here.
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
/package
/test
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
I was testing the input from b053b4c60f6 and I encountered some issues, but I was able to prove out the concept that I was testing 😄 . I was using this as my test: https://gist.github.com/andrewkroh/2a4216f36b4303635ca2d319e554c9dd?permalink_comment_id=4356113#gistcomment-4356113.
- panic in
run
relating to
diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go
index 9140ed4663..0b6d6e23cd 100644
--- a/x-pack/filebeat/input/cel/input.go
+++ b/x-pack/filebeat/input/cel/input.go
@@ -100,7 +100,7 @@ func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor
}
func (input) run(env v2.Context, src inputcursor.Source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
- cfg := src.(*source).cfg
+ cfg := src.(source).cfg
log := env.Logger.With("input_url", cfg.Resource.URL)
metrics := newInputMetrics(monitoring.GetNamespace("dataset").GetRegistry(), env.ID)
- After fixing that, it was making repeated requests every few milliseconds. Looks like it's coming from go-retryable, but the response from the server is always HTTP 200.
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.389-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.400-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.407-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
- I could not stop the input with Ctrl+C. It might be retrying after the input's context.Context has been cancelled.
Thanks. I'll take a look.
-
[x] fix expected
src
type — error in constructor -
[x] fix repeated requests — this is working as intended, though this may need revisiting
When a non-empty set of events is received from the CEL evaluation, the input assumes there will be more events still available from the API. To signal that no further events are wanted from a periodic run of the input the program should return a
state
with a fieldwant_more
set tofalse
.So the following will repeatedly hit api.ipify.org more frequently than the config interval suggests.
filebeat.inputs: - type: cel interval: 1m resource.url: https://api.ipify.org/?format=json program: | bytes(get(state.url).Body).as(body, { "events": [body.decode_json()], }) output.console.pretty: true
but with the addition of the
"want_more": false,
line, only a single request will be made per minute.filebeat.inputs: - type: cel interval: 1m resource.url: https://api.ipify.org/?format=json program: | bytes(get(state.url).Body).as(body, { "events": [body.decode_json()], "want_more": false, }) output.console.pretty: true
It is important to be able to re-query in a single period, so some behaviour like this is required, but maybe we can mitigate the footgun a little.
Possible options to mitigate footgun:
-
invert the default so that an absent
want_more
means do not repeat and the program must explicitly request more with"want_more": true
. - ~~split the repeat request behaviour into a tri-state:~~
- ~~absent — current behaviour up to a configurable limit of requests defaulting to some small number~~
- ~~true — always repeat~~
- ~~false — never repeat~~
-
invert the default so that an absent
-
[x] fix cancelation —
cel.Program.ContextEval
does not return context errorsPrior to the fix, running an instance of filebeat with the following configuration would result in an unstoppable instance.
filebeat.inputs: - type: cel interval: 1m resource.url: https://api.ipify.org/?format=json program: | bytes(get(state.url).Body).as(body, { "events": [body.decode_json()] }) output.console.pretty: true
This happens because the cel program evaluation method does not return the context cancellation error when a context is cancelled. We also don't check for cancellation except in the case that we have events or we have a limit policy in place, so add a check immediately after the return of the evaluation.
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
This pull request is now in conflicts. Could you fix it? 🙏 To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito