x-pack/filebeat/input/cel: new input

Open efd6 opened this issue 2 years ago • 11 comments

WIP

What does this PR do?

This adds a new input to filebeat that allows processing of datastreams using the Common Expression Language.

Why is it important?

It provides a consistent framework for generalised input processing.

Checklist

  • [x] My code follows the style guidelines of this project
  • [x] I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files
  • [ ] I have added tests that prove my fix is effective or that my feature works
  • [ ] I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

efd6 avatar Apr 11 '22 02:04 efd6

This pull request doesn't have a Team:<team> label.

botelastic[bot] avatar Apr 11 '22 02:04 botelastic[bot]

:green_heart: Build Succeeded

Build stats

  • Start Time: 2022-11-08T11:14:01.064+0000

  • Duration: 131 min 56 sec

Test stats :test_tube:

Test Results

  • Failed: 0
  • Passed: 23907
  • Skipped: 1951
  • Total: 25858

:green_heart: Flaky test report

Tests succeeded.

:robot: GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

elasticmachine avatar Apr 11 '22 02:04 elasticmachine

This pull request now has conflicts. Could you fix it? 🙏 To fix up this pull request, you can check it out locally. See the documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito

mergify[bot] avatar Apr 27 '22 14:04 mergify[bot]

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

elasticmachine avatar Apr 28 '22 04:04 elasticmachine

This pull request does not have a backport label. If this is a bug or security fix, could you label this PR @efd6? 🙏 To do so, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch, where /d is the digit

mergify[bot] avatar Aug 16 '22 15:08 mergify[bot]

I recommend thinking a little about the observability of the input itself.

  1. Are there any metrics that would be useful to expose? If so, I recommend registering a set of per-instance metrics under dataset/<input_id>/*, similar to what is done in aws-s3 or lumberjack.
  2. How will we debug requests and responses? Perhaps we could reuse the LoggingRoundTripper from httpjson to add a trace logging feature.
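
As a rough illustration of the second point, a trace-logging round tripper can be sketched with only the standard library. This is not the httpjson LoggingRoundTripper; traceRoundTripper, logf, and demo are hypothetical names, and the server is a local httptest stand-in rather than a real API.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
)

// traceRoundTripper is a hypothetical wrapper that logs a dump of every
// request and response passing through the wrapped http.RoundTripper.
type traceRoundTripper struct {
	next http.RoundTripper
	logf func(format string, args ...interface{})
}

func (t traceRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	// DumpRequestOut renders the request as it would be sent on the wire.
	if b, err := httputil.DumpRequestOut(req, true); err == nil {
		t.logf("request:\n%s", b)
	}
	resp, err := t.next.RoundTrip(req)
	if err != nil {
		t.logf("round trip failed: %v", err)
		return nil, err
	}
	// DumpResponse replaces resp.Body so the caller can still read it.
	if b, err := httputil.DumpResponse(resp, true); err == nil {
		t.logf("response:\n%s", b)
	}
	return resp, nil
}

// demo sends one GET through a traced client to a local test server and
// returns the response body together with the collected trace entries.
func demo() (body string, traces []string, err error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, `{"ip":"192.0.2.1"}`)
	}))
	defer srv.Close()

	client := &http.Client{Transport: traceRoundTripper{
		next: http.DefaultTransport,
		logf: func(format string, args ...interface{}) {
			traces = append(traces, fmt.Sprintf(format, args...))
		},
	}}
	resp, err := client.Get(srv.URL)
	if err != nil {
		return "", traces, err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), traces, err
}

func main() {
	body, traces, err := demo()
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range traces {
		fmt.Println(t)
	}
	fmt.Println("body:", body)
}
```

In a real input the log function would presumably write to a dedicated trace logger rather than a slice, so that request bodies and credentials do not end up in the main log.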

andrewkroh avatar Oct 25 '22 21:10 andrewkroh

  1. I will do that.
  2. That was something that I was considering. In the first instance, though, I have been using the mito CLI program, which does nearly all of what is happening here but standalone. Both together are probably a good plan.

efd6 avatar Oct 25 '22 22:10 efd6

Including the logging round tripper will require moving the httplog package out of the httpjson/internal directory, ~so that should probably happen in a later PR~. I have sent another PR to move it so that it can be used here.

efd6 avatar Oct 26 '22 00:10 efd6

/package

efd6 avatar Oct 28 '22 06:10 efd6

/test

efd6 avatar Oct 28 '22 07:10 efd6

I was testing the input from b053b4c60f6 and I encountered some issues, but I was able to prove out the concept that I was testing 😄 . I was using this as my test: https://gist.github.com/andrewkroh/2a4216f36b4303635ca2d319e554c9dd?permalink_comment_id=4356113#gistcomment-4356113.

  1. A panic in run, relating to the type assertion on src. This change fixed it for me:
diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go
index 9140ed4663..0b6d6e23cd 100644
--- a/x-pack/filebeat/input/cel/input.go
+++ b/x-pack/filebeat/input/cel/input.go
@@ -100,7 +100,7 @@ func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor
 }
 
 func (input) run(env v2.Context, src inputcursor.Source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
-       cfg := src.(*source).cfg
+       cfg := src.(source).cfg
        log := env.Logger.With("input_url", cfg.Resource.URL)
 
        metrics := newInputMetrics(monitoring.GetNamespace("dataset").GetRegistry(), env.ID)
  2. After fixing that, the input was making repeated requests every few milliseconds. They look like they are coming from go-retryablehttp, but the response from the server is always HTTP 200.
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.389-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.400-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2022-11-02T11:55:30.407-0400","log.logger":"input.cel.retryablehttp","log.origin":{"file.name":"[email protected]/client.go","file.line":504},"message":"performing request","service.name":"filebeat","id":"BA564EA1571E1A1A","input_source":"http://10.100.8.36:21494/dataset","input_url":"http://10.100.8.36:21494/dataset","method":"GET","url":"http://10.100.8.36:21494/dataset","ecs.version":"1.6.0"}
  3. I could not stop the input with Ctrl+C. It might be retrying after the input's context.Context has been cancelled.
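
For reference, the panic in the first item is the usual result of a single-value type assertion that names the wrong dynamic type. Below is a minimal sketch of a type switch that reports an error instead. Source, source, config, and configOf are hypothetical stand-ins here, not the input's actual types.

```go
package main

import (
	"errors"
	"fmt"
)

// Source plays the role of inputcursor.Source in this sketch.
type Source interface{ Name() string }

type config struct{ URL string }

type source struct{ cfg config }

// Name has a value receiver, so both source and *source satisfy Source.
func (s source) Name() string { return s.cfg.URL }

// configOf accepts either a value or a pointer source and returns an
// error for anything else, rather than panicking.
func configOf(src Source) (config, error) {
	switch s := src.(type) {
	case source:
		return s.cfg, nil
	case *source:
		if s == nil {
			return config{}, errors.New("nil *source")
		}
		return s.cfg, nil
	default:
		return config{}, fmt.Errorf("unexpected source type %T", src)
	}
}

func main() {
	var src Source = source{cfg: config{URL: "http://localhost:8080/dataset"}}

	// The single-value form src.(*source) would panic here, because the
	// dynamic type is source. The two-value form only reports false.
	_, ok := src.(*source)
	fmt.Println("is *source:", ok) // prints "is *source: false"

	cfg, err := configOf(src)
	fmt.Println(cfg.URL, err) // prints "http://localhost:8080/dataset <nil>"
}
```

Accepting both forms is only one option; agreeing on a single type between the constructor and run is equally reasonable.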

andrewkroh avatar Nov 02 '22 15:11 andrewkroh

Thanks. I'll take a look.

  • [x] fix expected src type — error in constructor

  • [x] fix repeated requests — this is working as intended, though this may need revisiting

    When a non-empty set of events is received from the CEL evaluation, the input assumes that more events are still available from the API. To signal that no further events are wanted in a periodic run of the input, the program should return a state with the field want_more set to false.

    So the following configuration will hit api.ipify.org repeatedly, more often than the configured interval suggests.

      filebeat.inputs:
        - type: cel
          interval: 1m
          resource.url: https://api.ipify.org/?format=json
          program: |
            bytes(get(state.url).Body).as(body, {
                "events": [body.decode_json()],
            })
    
      output.console.pretty: true
    

    With the addition of the "want_more": false line, however, only a single request is made per minute.

      filebeat.inputs:
        - type: cel
          interval: 1m
          resource.url: https://api.ipify.org/?format=json
          program: |
            bytes(get(state.url).Body).as(body, {
                "events": [body.decode_json()],
                "want_more": false,
            })
    
      output.console.pretty: true
    

    It is important to be able to re-query in a single period, so some behaviour like this is required, but maybe we can mitigate the footgun a little.

    Possible options to mitigate the footgun:

    • invert the default, so that an absent want_more means do not repeat and the program must explicitly request more with "want_more": true.
    • ~~split the repeat request behaviour into a tri-state:~~
      1. ~~absent — current behaviour up to a configurable limit of requests defaulting to some small number~~
      2. ~~true — always repeat~~
      3. ~~false — never repeat~~
  • [x] fix cancelation — cel.Program.ContextEval does not return context errors

    Prior to the fix, running an instance of filebeat with the following configuration would result in an unstoppable instance.

      filebeat.inputs:
        - type: cel
          interval: 1m
          resource.url: https://api.ipify.org/?format=json
          program: |
            bytes(get(state.url).Body).as(body, {
                "events": [body.decode_json()]
            })
    
      output.console.pretty: true
    

    This happens because the CEL program evaluation method does not return the context cancellation error when the context is cancelled. The input also did not check for cancellation unless there were events or a limit policy was in place, so the fix adds a check immediately after the evaluation returns.
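
The repeat and cancellation behaviour described in the items above can be sketched roughly as follows. This is not the input's actual code: evalFunc stands in for CEL program evaluation, and wantMore, runPeriod, demoPaged, and demoCancelled are hypothetical names. The sketch assumes only what is stated above, namely that an absent want_more with a non-empty set of events means "repeat", and that evaluation may return without reporting a cancelled context.

```go
package main

import (
	"context"
	"fmt"
)

// evalFunc stands in for one CEL evaluation. It may return normally even
// when ctx has already been cancelled.
type evalFunc func(ctx context.Context, state map[string]interface{}) (map[string]interface{}, error)

// wantMore applies the default described above: an explicit want_more
// wins, otherwise a non-empty set of events means more are expected.
func wantMore(state map[string]interface{}, nEvents int) bool {
	if v, ok := state["want_more"].(bool); ok {
		return v
	}
	return nEvents > 0
}

// runPeriod performs the evaluations for a single interval tick and
// returns how many evaluations were made.
func runPeriod(ctx context.Context, eval evalFunc, state map[string]interface{}, publish func(interface{})) (int, error) {
	n := 0
	for {
		next, err := eval(ctx, state)
		n++
		if err != nil {
			return n, err
		}
		// Evaluation may swallow cancellation, so check explicitly
		// before publishing or repeating.
		if err := ctx.Err(); err != nil {
			return n, err
		}
		events, _ := next["events"].([]interface{})
		for _, e := range events {
			publish(e)
		}
		if !wantMore(next, len(events)) {
			return n, nil
		}
		state = next
	}
}

// demoPaged simulates a program that asks for more until its third call.
func demoPaged() (int, error) {
	calls := 0
	eval := func(_ context.Context, _ map[string]interface{}) (map[string]interface{}, error) {
		calls++
		return map[string]interface{}{
			"events":    []interface{}{calls},
			"want_more": calls < 3,
		}, nil
	}
	return runPeriod(context.Background(), eval, nil, func(interface{}) {})
}

// demoCancelled simulates a program that never sets want_more while the
// context is cancelled during its fifth evaluation.
func demoCancelled() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	eval := func(_ context.Context, _ map[string]interface{}) (map[string]interface{}, error) {
		calls++
		if calls == 5 {
			cancel()
		}
		return map[string]interface{}{"events": []interface{}{calls}}, nil
	}
	return runPeriod(ctx, eval, nil, func(interface{}) {})
}

func main() {
	fmt.Println(demoPaged())     // prints "3 <nil>"
	fmt.Println(demoCancelled()) // prints "5 context canceled"
}
```

Without the ctx.Err() check, the second case would loop until the program itself stopped returning events, which matches the unstoppable instance described above.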

efd6 avatar Nov 02 '22 20:11 efd6
