x-pack/filebeat/input/cel: new input
WIP
What does this PR do?
This adds a new input to filebeat that allows processing of data streams using the Common Expression Language (CEL).
Why is it important?
It provides a consistent framework for generalised input processing.
Checklist
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding change to the default configuration files
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.
Author's Checklist
- [ ]
How to test this PR locally
Related issues
Use cases
Screenshots
Logs
This pull request doesn't have a Team:<team> label.
:green_heart: Build Succeeded
Build stats
- Start Time: 2022-11-14T22:52:48.891+0000
- Duration: 65 min 36 sec
:grey_exclamation: Flaky test report
No test was executed to be analysed.
:robot: GitHub comments
To re-run your PR in the CI, just comment with:
- `/test`: Re-trigger the build.
- `/package`: Generate the packages and run the E2E tests.
- `/beats-tester`: Run the installation tests with beats-tester.
- `run` `elasticsearch-ci/docs`: Re-trigger the docs validation. (use unformatted text in the comment!)
This pull request is now in conflicts. Could you fix it? 🙏 To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/
```
git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito
```
Pinging @elastic/security-external-integrations (Team:Security-External Integrations)
This pull request does not have a backport label. If this is a bug or security fix, could you label this PR @efd6? 🙏 For such, you'll need to label your PR with:
- The upcoming major version of the Elastic Stack
- The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, you need to add the backport labels for the needed branches, such as:
- `backport-v8./d.0` is the label to automatically backport to the `8./d` branch. `/d` is the digit
I recommend thinking a little about the observability of the input itself.
- Are there any metrics that would be useful to expose? If so, I recommend registering a set of per-instance metrics under `dataset/<input_id>/*`, similar to what is done in aws-s3 or lumberjack.
- How will we debug requests and responses? Perhaps we could reuse the `LoggingRoundTripper` from httpjson to add a trace logging feature.
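A minimal, standard-library-only sketch of what per-instance metrics under `dataset/<input_id>/*` could look like. The `registry` and `inputMetrics` types, the counter names, and the input ID are hypothetical stand-ins; the real input would register with the Beats monitoring registry rather than a local map. The point it illustrates is that each instance owns its own counters under its own ID and removes them when it stops.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// inputMetrics holds hypothetical per-instance counters for one input.
type inputMetrics struct {
	requests atomic.Int64 // requests made by the program
	events   atomic.Int64 // events published
}

// registry is a hypothetical stand-in for a monitoring namespace. It maps
// "dataset/<input_id>" keys to the metrics of each running input instance.
type registry struct {
	mu      sync.Mutex
	entries map[string]*inputMetrics
}

func newRegistry() *registry {
	return &registry{entries: make(map[string]*inputMetrics)}
}

// register creates the metrics for inputID and refuses duplicate IDs so
// that two instances never share the same counters.
func (r *registry) register(inputID string) (*inputMetrics, error) {
	key := "dataset/" + inputID
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.entries[key]; exists {
		return nil, fmt.Errorf("metrics for %q already registered", key)
	}
	m := &inputMetrics{}
	r.entries[key] = m
	return m, nil
}

// unregister drops the instance's metrics when the input stops.
func (r *registry) unregister(inputID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.entries, "dataset/"+inputID)
}

// snapshot flattens all counters into "dataset/<input_id>/<metric>" keys.
func (r *registry) snapshot() map[string]int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make(map[string]int64, 2*len(r.entries))
	for key, m := range r.entries {
		out[key+"/requests"] = m.requests.Load()
		out[key+"/events"] = m.events.Load()
	}
	return out
}

func main() {
	reg := newRegistry()
	const id = "my-cel-input" // hypothetical input ID
	m, err := reg.register(id)
	if err != nil {
		panic(err)
	}
	defer reg.unregister(id)

	m.requests.Add(1)
	m.events.Add(3)
	fmt.Println(reg.snapshot())
}
```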
- I will do that.
- That was something that I was considering. In the first instance, though, I have been using the mito CLI program, which does nearly all of what happens here, but standalone. Both together are probably a good plan.
Including the logging round tripper will require moving the `httplog` package out of the `httpjson/internal` directory, ~so that should probably happen in a later PR~. I have sent another PR to move it so that it can be used here.
/package
/test
I was testing the input at b053b4c60f6 and encountered some issues, but I was able to prove out the concept I was testing 😄. I was using this as my test: https://gist.github.com/andrewkroh/2a4216f36b4303635ca2d319e554c9dd?permalink_comment_id=4356113#gistcomment-4356113.
- A panic in `run` relating to the `src` type assertion:
```diff
diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go
index 9140ed4663..0b6d6e23cd 100644
--- a/x-pack/filebeat/input/cel/input.go
+++ b/x-pack/filebeat/input/cel/input.go
@@ -100,7 +100,7 @@ func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor
}
func (input) run(env v2.Context, src inputcursor.Source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
- cfg := src.(*source).cfg
+ cfg := src.(source).cfg
log := env.Logger.With("input_url", cfg.Resource.URL)
metrics := newInputMetrics(monitoring.GetNamespace("dataset").GetRegistry(), env.ID)
```
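For context, a single-value type assertion such as `src.(*source)` panics whenever the interface holds a different dynamic type, here a `source` value rather than a pointer. The sketch below uses simplified, hypothetical versions of the types (the real `inputcursor.Source` and `source` carry more than this) to show the failure mode and a type switch that turns a mismatch into an error instead of a panic.

```go
package main

import "fmt"

// Source plays the role of inputcursor.Source; config and source are
// simplified, hypothetical versions of the input's types.
type Source interface{ Name() string }

type config struct{ URL string }

type source struct{ cfg config }

func (s source) Name() string { return s.cfg.URL }

// cfgOf accepts either a value or a non-nil pointer and reports any other
// dynamic type as an error instead of panicking.
func cfgOf(src Source) (config, error) {
	switch s := src.(type) {
	case source:
		return s.cfg, nil
	case *source:
		if s == nil {
			return config{}, fmt.Errorf("nil *source")
		}
		return s.cfg, nil
	default:
		return config{}, fmt.Errorf("unexpected source type %T", src)
	}
}

func main() {
	// The interface holds a source value, not a *source.
	var src Source = source{cfg: config{URL: "https://api.ipify.org/?format=json"}}

	// The comma-ok form reports the mismatch; src.(*source) would panic here.
	if _, ok := src.(*source); !ok {
		fmt.Printf("src holds %T, not *main.source\n", src)
	}

	cfg, err := cfgOf(src)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.URL)
}
```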
- After fixing that, it was making repeated requests every few milliseconds. It looks like they are coming from go-retryablehttp, but the response from the server is always HTTP 200.
```json
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.389-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.400-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.407-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
```
- I could not stop the input with Ctrl+C. It might be retrying after the input's context.Context has been cancelled.
Thanks. I'll take a look.
- [x] fix expected `src` type (error in constructor)
- [x] fix repeated requests: this is working as intended, though it may need revisiting
When a non-empty set of events is received from the CEL evaluation, the input assumes there will be more events still available from the API. To signal that no further events are wanted from a periodic run of the input, the program should return a `state` with a field `want_more` set to `false`. So the following will repeatedly hit api.ipify.org more frequently than the config interval suggests:

```yaml
filebeat.inputs:
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    bytes(get(state.url).Body).as(body, {
      "events": [body.decode_json()],
    })
output.console.pretty: true
```

but with the addition of the `"want_more": false,` line, only a single request will be made per minute:

```yaml
filebeat.inputs:
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    bytes(get(state.url).Body).as(body, {
      "events": [body.decode_json()],
      "want_more": false,
    })
output.console.pretty: true
```

It is important to be able to re-query in a single period, so some behaviour like this is required, but maybe we can mitigate the footgun a little.
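A sketch of the per-period loop implied by this behaviour, assuming a simplified evaluator: `evalFunc`, `runPeriod`, `ipify`, and the `maxEvals` cap are hypothetical, and the cap exists only so the example terminates. It shows why a program that always returns events and never sets `want_more` keeps re-querying within a single period, while `"want_more": false` ends the period after one evaluation.

```go
package main

import "fmt"

// evalFunc is a hypothetical stand-in for one evaluation of the CEL program:
// it receives the current state and returns the next one.
type evalFunc func(state map[string]any) (map[string]any, error)

// runPeriod sketches one periodic run as described above: keep evaluating
// while the program returns events, unless the returned state sets
// "want_more" to false. maxEvals is a hypothetical cap so the sketch ends.
func runPeriod(eval evalFunc, state map[string]any, publish func(any), maxEvals int) (map[string]any, int, error) {
	evals := 0
	for evals < maxEvals {
		next, err := eval(state)
		evals++
		if err != nil {
			return state, evals, err
		}
		state = next
		events, _ := state["events"].([]any)
		if len(events) == 0 {
			break // nothing returned: treat the API as drained for this period
		}
		for _, e := range events {
			publish(e)
		}
		// Only an explicit false stops the loop; an absent field means "more".
		if more, ok := state["want_more"].(bool); ok && !more {
			break
		}
	}
	return state, evals, nil
}

// ipify is a hypothetical program shaped like the configs above: it always
// returns one event and sets want_more only when wantMore is non-nil.
func ipify(wantMore any) evalFunc {
	return func(state map[string]any) (map[string]any, error) {
		next := map[string]any{
			"url":    state["url"],
			"events": []any{map[string]any{"ip": "192.0.2.1"}},
		}
		if wantMore != nil {
			next["want_more"] = wantMore
		}
		return next, nil
	}
}

func main() {
	start := map[string]any{"url": "https://api.ipify.org/?format=json"}
	discard := func(any) {}

	_, n, _ := runPeriod(ipify(nil), start, discard, 10)
	fmt.Println("without want_more:", n, "evaluations") // stops only at the cap
	_, n, _ = runPeriod(ipify(false), start, discard, 10)
	fmt.Println("with want_more false:", n, "evaluation")
}
```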
Possible options to mitigate the footgun:
- invert the default so that an absent `want_more` means do not repeat, and the program must explicitly request more with `"want_more": true`.
- ~~split the repeat request behaviour into a tri-state:~~
  - ~~absent: current behaviour up to a configurable limit of requests, defaulting to some small number~~
  - ~~true: always repeat~~
  - ~~false: never repeat~~
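The first option (inverting the default) amounts to changing how an absent `want_more` field is interpreted. A hypothetical helper showing both interpretations side by side; `wantMore` and `invertDefault` are illustrative names, not part of the input.

```go
package main

import "fmt"

// wantMore reports whether to evaluate again after a non-empty batch.
// With invertDefault false it models the current behaviour (an absent field
// means repeat); with invertDefault true it models the proposed option (an
// absent field means stop, so the program must return "want_more": true).
// The function and parameter names are hypothetical.
func wantMore(state map[string]any, invertDefault bool) bool {
	v, ok := state["want_more"].(bool)
	if !ok {
		// Field absent (or not a bool): fall back to the default.
		return !invertDefault
	}
	return v
}

func main() {
	absent := map[string]any{"events": []any{"e"}}
	explicit := map[string]any{"events": []any{"e"}, "want_more": true}

	fmt.Println("absent, current default: ", wantMore(absent, false))
	fmt.Println("absent, inverted default:", wantMore(absent, true))
	fmt.Println("explicit true, inverted: ", wantMore(explicit, true))
}
```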
- [x] fix cancellation: `cel.Program.ContextEval` does not return context errors

Prior to the fix, running an instance of filebeat with the following configuration would result in an unstoppable instance.

```yaml
filebeat.inputs:
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    bytes(get(state.url).Body).as(body, {
      "events": [body.decode_json()]
    })
output.console.pretty: true
```

This happens because the CEL program evaluation method does not return the context cancellation error when a context is cancelled. We also don't check for cancellation except when we have events or a limit policy is in place, so this adds a check immediately after the evaluation returns.
The linter errors may be related to #33649.
Stumbled on this through the linked linter PR:
Cool! I am excited to see this implemented. I have used CEL in the past for configurable processing and have been wondering if we could make use of it. I am particularly interested to see whether CEL stays contained to this one input after we get some experience with it, or if we'll want to use it in more places.
/test
@cmacknz I can see value in having CEL processing available elsewhere and this is initially an experiment to see how well it will work as an analogue of the httpjson input. You may want to take a look at github.com/elastic/mito which provides the CEL extensions used here.
E2E failure is unrelated.