connect icon indicating copy to clipboard operation
connect copied to clipboard

Added support for oauth2 tokens in opensearch output sink

Open cthtrifork opened this issue 11 months ago • 1 comments

Not sure if we can support oauth2 directly with client+id credentials or we need to rely on cache resource to ensure performance. I did the later, as it is how it is done in kafka.

I will make a new PR that supports clientcredentials directly, once my other PRs are merged

  • [x] Test output

cthtrifork avatar Feb 29 '24 13:02 cthtrifork

I have tested the setup successfully:

logger:
  level: DEBUG
  format: logfmt
  add_timestamp: true
  static_fields:
    "@service": benthos

input:
  label: source
  generate:
    interval: 1s
    mapping: |
      root.ID = uuid_v4()
      root.Name = [ "frosty", "spot", "oodles" ].index(random_int() % 3)
      root.Gooeyness = (random_int() % 100) / 100
      root.Bouncing = random_int() % 2 == 0
    batch_size: 10

pipeline:
  processors:
    - bloblang: |
        #!blobl
        root = this

    # Processor to fetch and cache an OAuth2 token
    - branch:
        request_map: |
          root = "client_id=${CLIENT_ID}&client_secret=${CLIENT_SECRET}&grant_type=client_credentials&scope=opensearch"
        processors:
          - http:
              url: "http://localhost:1852/realms/local-development/protocol/openid-connect/token"
              verb: POST
              dump_request_log_level: TRACE
              headers:
                Content-Type: application/x-www-form-urlencoded
          - bloblang: |- # extract access_token
              root = this
              root = json().access_token
          - log:
              message: "Storing new Keycloak token in cache."
              level: DEBUG
          - cache:
              resource: oauth_token_cache
              operator: set
              key: "oauth_token"
              value: "${! content() }"

cache_resources:
  - label: oauth_token_cache
    memory:
      default_ttl: 5m # todo
      init_values: {}

output:
  label: "os"
  opensearch:
    urls: [http://localhost:9200] 
    index: "benthos"
    action: "index" 
    id: ${!counter()}-${!timestamp_unix()} 
    pipeline: ""
    tls:
      enabled: false
    max_in_flight: 64
    batching:
      count: 1
      byte_size: 0
      period: 1s
    oauth2:
      enabled: true
      token_cache: "oauth_token_cache"
      token_key: "oauth_token"

cthtrifork avatar Mar 04 '24 19:03 cthtrifork