Added support for OAuth2 tokens in the opensearch output sink
I am not sure whether we can support OAuth2 with client credentials directly in the output, or whether we need to rely on a cache resource to ensure performance. I did the latter, as that is how it is done in the kafka output.
I will open a follow-up PR that supports the client_credentials grant directly, once my other PRs are merged.
- [x] Test output
I have tested the setup successfully:
```yaml
logger:
  level: DEBUG
  format: logfmt
  add_timestamp: true
  static_fields:
    "@service": benthos

input:
  label: source
  generate:
    interval: 1s
    mapping: |
      root.ID = uuid_v4()
      root.Name = [ "frosty", "spot", "oodles" ].index(random_int() % 3)
      root.Gooeyness = (random_int() % 100) / 100
      root.Bouncing = random_int() % 2 == 0
    batch_size: 10

pipeline:
  processors:
    - bloblang: |
        #!blobl
        root = this
    # Processor to fetch and cache an OAuth2 token
    - branch:
        request_map: |
          root = "client_id=${CLIENT_ID}&client_secret=${CLIENT_SECRET}&grant_type=client_credentials&scope=opensearch"
        processors:
          - http:
              url: "http://localhost:1852/realms/local-development/protocol/openid-connect/token"
              verb: POST
              dump_request_log_level: TRACE
              headers:
                Content-Type: application/x-www-form-urlencoded
          - bloblang: |- # extract access_token
              root = json().access_token
          - log:
              message: "Storing new Keycloak token in cache."
              level: DEBUG
          - cache:
              resource: oauth_token_cache
              operator: set
              key: "oauth_token"
              value: "${! content() }"

cache_resources:
  - label: oauth_token_cache
    memory:
      default_ttl: 5m # todo
      init_values: {}

output:
  label: "os"
  opensearch:
    urls: [http://localhost:9200]
    index: "benthos"
    action: "index"
    id: ${!counter()}-${!timestamp_unix()}
    pipeline: ""
    tls:
      enabled: false
    max_in_flight: 64
    batching:
      count: 1
      byte_size: 0
      period: 1s
    oauth2:
      enabled: true
      token_cache: "oauth_token_cache"
      token_key: "oauth_token"
```