aws_msk_iam: add AWS MSK IAM authentication support

Open kalavt opened this issue 2 weeks ago • 7 comments

Summary

Add comprehensive AWS MSK IAM authentication support with simplified configuration and fix OAuth token expiration on idle connections. This PR automatically extracts region and cluster type information from broker addresses, provides explicit opt-in for MSK IAM, enhances OAUTHBEARER token refresh for all OAuth methods, and enables automatic background token refresh to prevent authentication failures on idle connections.

Changes

Key Features

  1. Explicit MSK IAM Opt-in

    • MSK IAM is only activated when explicitly requested via rdkafka.sasl.mechanism=aws_msk_iam
    • Uses explicit aws_msk_iam flag to track user intent
    • Ensures compatibility with other OAUTHBEARER methods (OIDC, custom OAuth, etc.)
  2. Simplified Configuration

    • No need for cluster_arn parameter
    • Enable AWS MSK IAM authentication by simply setting rdkafka.sasl.mechanism=aws_msk_iam
    • Automatically converts to OAUTHBEARER internally and registers OAuth callback
  3. Automatic Region Extraction

    • Intelligently extract AWS region information from broker addresses
    • Supports both MSK Standard and Serverless formats
  4. Automatic Cluster Type Detection

    • Automatically identify MSK Standard and MSK Serverless cluster types
    • Selects correct service endpoint based on cluster type
  5. Universal OAUTHBEARER Enhancements

    • Enhanced background token refresh for ALL OAUTHBEARER methods
    • Enabled SASL queue and background callbacks for all OAUTHBEARER configurations
    • Benefits AWS MSK IAM, librdkafka OIDC, custom OAuth implementations, etc.
    • Prevents token expiration on idle connections for both producers and consumers
    • Fixes authentication failures that occurred on idle connections after token expiration
  6. OAuth Token Lifetime Management

    • Maintains 5-minute OAuth token lifetime (AWS industry standard, matches AWS Go SDK)
    • Automatic refresh at 80% of token lifetime (4 minutes)
    • librdkafka's background thread handles refresh independently
    • Works on completely idle connections without requiring rd_kafka_poll()
    • Fixes authentication failures that occurred on idle connections after 5+ minutes
  7. TLS Support for AWS Credentials

    • Added TLS support for secure AWS credential fetching
    • Supports EC2 metadata, ECS, STS, and credential file sources
    • Ensures secure communication with AWS services
    • Properly manages TLS lifecycle (creation and cleanup)

Technical Details

  1. Explicit MSK IAM Activation:

    // Only activates when user explicitly sets aws_msk_iam
    if (ctx->aws_msk_iam && ctx->sasl_mechanism && 
        strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
        // Register MSK IAM OAuth callback
    }
    
    • Prevents automatic activation for generic OAUTHBEARER users
    • Allows users to use OIDC or custom OAuth on AWS brokers without interference
  2. Configuration Simplification:

    • Users only need to set rdkafka.sasl.mechanism=aws_msk_iam
    • System automatically converts it to OAUTHBEARER and registers OAuth callback
    • Automatically sets rdkafka.security.protocol=SASL_SSL (if not configured)
  3. Region Extraction Logic:

    • Parse region from broker address (e.g., b-1.example.kafka.us-east-1.amazonaws.com)
    • Support MSK Standard format: *.kafka.<region>.amazonaws.com
    • Support MSK Serverless format: *.kafka-serverless.<region>.amazonaws.com
  4. Cluster Type Detection:

    • Check if broker address contains .kafka-serverless. to determine cluster type
    • Automatically select correct service endpoint (kafka or kafka-serverless)
  5. Universal OAUTHBEARER Background Processing:

    // Applied to ALL OAUTHBEARER configurations
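    if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
        // On the conf object, before rd_kafka_new(): route SASL events to a dedicated queue
        rd_kafka_conf_enable_sasl_queue(conf, 1);
        // On the client handle, after rd_kafka_new(): serve that queue from the background thread
        rd_kafka_sasl_background_callbacks_enable(rk);
    }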
    
    • Enables automatic token refresh for all OAUTHBEARER methods
    • Handles idle connections, large poll intervals, paused collectors
    • Benefits both consumers (in_kafka) and producers (out_kafka)
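
Items 3 and 4 above (region extraction and cluster type detection) can be sketched in plain C as follows. This is a minimal illustration under assumptions, not the PR's actual extract_region_from_broker(): the function names msk_parse_broker and msk_service_name are hypothetical, and the only inputs assumed are the two hostname formats quoted above.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper (not the PR's code). Parses
 *   <prefix>.kafka.<region>.amazonaws.com[:port]             (MSK Standard)
 *   <prefix>.kafka-serverless.<region>.amazonaws.com[:port]  (MSK Serverless)
 * Copies the region into `region` and sets *serverless to 1 or 0.
 * Returns 0 on success, -1 if the broker does not match either format. */
int msk_parse_broker(const char *broker, char *region, size_t region_size,
                     int *serverless)
{
    static const char serverless_tag[] = ".kafka-serverless.";
    static const char standard_tag[] = ".kafka.";
    static const char suffix[] = ".amazonaws.com";
    const char *start;
    const char *end;
    size_t len;

    if (broker == NULL || region == NULL || serverless == NULL) {
        return -1;
    }

    /* Cluster type: the serverless marker decides it */
    start = strstr(broker, serverless_tag);
    if (start != NULL) {
        *serverless = 1;
        start += sizeof(serverless_tag) - 1;
    }
    else {
        start = strstr(broker, standard_tag);
        if (start == NULL) {
            return -1;
        }
        *serverless = 0;
        start += sizeof(standard_tag) - 1;
    }

    /* Region: the single label between the marker and ".amazonaws.com" */
    end = strstr(start, suffix);
    if (end == NULL || end == start) {
        return -1;
    }
    len = (size_t) (end - start);
    if (len >= region_size || memchr(start, '.', len) != NULL) {
        return -1;
    }
    memcpy(region, start, len);
    region[len] = '\0';
    return 0;
}

/* Service name selected from the cluster type, as described in item 4 */
const char *msk_service_name(int serverless)
{
    return serverless ? "kafka-serverless" : "kafka";
}
```

Rejecting a region that contains a dot is a design choice of this sketch: it avoids mis-parsing hostnames where ".kafka." happens to appear earlier in the name. Custom DNS names (for example behind PrivateLink) would not match at all, which is presumably where an explicit region setting becomes necessary.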

Modified Files

AWS MSK IAM Core (2 files)

  • include/fluent-bit/aws/flb_aws_msk_iam.h - Updated function signature (removed cluster_arn parameter)
  • src/aws/flb_aws_msk_iam.c - Refactored region extraction and cluster type detection logic

Kafka Input Plugin (2 files)

  • plugins/in_kafka/in_kafka.h - Added aws_msk_iam flag, removed deprecated fields
  • plugins/in_kafka/in_kafka.c - Added explicit MSK IAM activation, universal OAUTHBEARER support

Kafka Output Plugin (3 files)

  • plugins/out_kafka/kafka_config.h - Added aws_msk_iam flag, removed deprecated fields
  • plugins/out_kafka/kafka_config.c - Added explicit MSK IAM activation, universal OAUTHBEARER support
  • plugins/out_kafka/kafka.c - Removed deprecated configuration mapping

AWS Credentials & TLS Support (4 files)

  • src/aws/flb_aws_credentials_ec2.c - Enhanced TLS support for EC2 metadata credential fetching
  • src/aws/flb_aws_credentials_profile.c - Enhanced TLS support for profile credential fetching
  • src/aws/flb_aws_credentials_sts.c - Enhanced TLS support for STS credential fetching
  • src/flb_kafka.c - Core Kafka integration improvements

Total: 11 files modified

Configuration

Simple AWS MSK IAM Setup:

[INPUT]
    Name kafka
    Brokers b-1.example.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism aws_msk_iam

No cluster_arn or additional AWS-specific parameters needed!

Supported Configurations

This PR ensures compatibility with multiple OAuth scenarios:

1. AWS MSK IAM (Fluent Bit convenience syntax)

[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism aws_msk_iam

2. librdkafka OIDC (unaffected by MSK IAM)

[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism OAUTHBEARER
    rdkafka.sasl.oauthbearer.method oidc
    rdkafka.sasl.oauthbearer.client.id my_client_id
    rdkafka.sasl.oauthbearer.client.secret my_secret
    rdkafka.sasl.oauthbearer.token.endpoint.url https://auth.example.com/token

3. librdkafka AWS method (unaffected by MSK IAM)

[INPUT]
    Name kafka
    Brokers b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism OAUTHBEARER
    rdkafka.sasl.oauthbearer.method aws

All configurations benefit from automatic background token refresh!

Design for Extensibility

This PR establishes a clean, extensible pattern for adding cloud provider IAM authentication:

1. Layered Configuration Approach

Layer 1: Fluent Bit Convenience Syntax (High-level abstraction)
├─ rdkafka.sasl.mechanism=aws_msk_iam       → Auto-configured MSK IAM
├─ rdkafka.sasl.mechanism=gcp_iam           → Future: GCP Kafka IAM
└─ rdkafka.sasl.mechanism=azure_eventhubs   → Future: Azure Event Hubs

Layer 2: librdkafka Native (Direct pass-through)
├─ rdkafka.sasl.mechanism=OAUTHBEARER
├─ rdkafka.sasl.oauthbearer.method=oidc
└─ rdkafka.sasl.oauthbearer.method=aws

Layer 3: Custom Extensions (User plugins)
└─ Custom Fluent Bit extensions

2. Explicit Opt-in Pattern

// Extensible pattern for cloud provider authentication
if (strcasecmp(mechanism, "aws_msk_iam") == 0) {
    ctx->cloud_provider = CLOUD_PROVIDER_AWS;
}
// Future additions follow the same pattern:
// else if (strcasecmp(mechanism, "gcp_iam") == 0) {
//     ctx->cloud_provider = CLOUD_PROVIDER_GCP;
// }

3. Benefits of This Design

  • No interference: Each authentication method is explicitly opted-in
  • Clear separation: Cloud-specific logic isolated from generic OAUTHBEARER handling
  • Easy extension: New providers can be added following the same pattern
  • Backward compatible: Existing OAUTHBEARER configurations unaffected
  • Testable: Each auth method can be tested independently
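
To make the "testable" point concrete, the opt-in decision can be isolated in a pure function that takes the configured mechanism and returns what should be handed to librdkafka. The sketch below is only an illustration: struct sasl_resolution and resolve_sasl_mechanism are hypothetical names, and the PR itself tracks the decision with an aws_msk_iam flag on the plugin context.

```c
#include <stddef.h>
#include <strings.h>

/* Hypothetical result type for this sketch */
struct sasl_resolution {
    const char *effective_mechanism; /* value passed on to librdkafka */
    int aws_msk_iam;                 /* 1 only when the user opted in explicitly */
};

/* Maps the user-facing mechanism to the librdkafka mechanism.
 * Only the literal "aws_msk_iam" (case-insensitive) triggers MSK IAM;
 * every other value, including plain OAUTHBEARER, passes through untouched. */
struct sasl_resolution resolve_sasl_mechanism(const char *configured)
{
    struct sasl_resolution r;

    r.effective_mechanism = configured;
    r.aws_msk_iam = 0;

    if (configured != NULL && strcasecmp(configured, "aws_msk_iam") == 0) {
        r.effective_mechanism = "OAUTHBEARER";
        r.aws_msk_iam = 1;
    }
    return r;
}
```

With this shape, a generic OAUTHBEARER configuration on an AWS broker resolves to aws_msk_iam == 0, so no MSK IAM callback would be registered for it.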

4. Future Extensions

This architecture makes it straightforward to add:

  • Google Cloud Platform Kafka IAM
  • Azure Event Hubs authentication
  • Other cloud provider-specific OAuth implementations

Each can be added with the same explicit opt-in pattern without affecting existing functionality.

OAuth Token Expiration Fix

Problem Statement:

After prolonged idle periods (5+ minutes), Kafka outputs experienced authentication failures:

[error] SASL authentication error: Access denied (after 302ms in state AUTH_REQ)
[error] 3/3 brokers are down

Root Cause:

librdkafka's OAuth token refresh mechanism relies on rd_kafka_poll() being called regularly. For idle connections, rd_kafka_poll() is only called when producing messages. This is documented in librdkafka issue #3871:

"You need to explicitly call poll() once after creating the client to trigger the oauth callback"

Timeline without background callbacks:

T=0:     Connection established, OAuth token set (5-min lifetime)
T=1-5min: No messages to produce → rd_kafka_poll() never called
T=5min:  Token expires ❌
T=10min: New data arrives, rd_kafka_poll() called
         ├─ librdkafka tries to use expired token
         └─> Access Denied ❌

Solution: Background Callbacks

librdkafka v1.9.0+ provides rd_kafka_sasl_background_callbacks_enable() specifically for this use case:

"Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread. This serves as an alternative for applications that do NOT call rd_kafka_poll() at regular intervals"

// Enable automatic token refresh in background thread
rd_kafka_sasl_background_callbacks_enable(rk);

Timeline with background callbacks:

T=0:00  Token generated (expires T=5:00)
        ├─ librdkafka starts background thread
        └─ Token refresh timer active in background

T=4:00  Background thread detects token at 80% lifetime
        ├─ Automatically triggers oauthbearer_token_refresh_cb()
        ├─ New token generated (fresh 5-min lifetime)
        └─> Token refreshed ✅

T=8:00  Background thread refreshes again
T=12:00 Background thread refreshes again
...

Result: Token NEVER expires, even with ZERO traffic ✅
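
The refresh times in the timeline follow from simple arithmetic. The sketch below takes the 5-minute lifetime and the 80% refresh point from this description at face value and assumes each refresh immediately yields a fresh token; the function names are hypothetical and nothing here calls librdkafka.

```c
#include <stdio.h>

#define TOKEN_LIFETIME_SEC 300 /* 5-minute lifetime quoted above */
#define REFRESH_PERCENT 80     /* refresh point quoted above */

/* Seconds after issuance at which a refresh is due */
int refresh_offset_sec(int lifetime_sec, int percent)
{
    return lifetime_sec * percent / 100;
}

/* Time of the n-th refresh (n >= 1), assuming every refresh
 * issues a new token with the same lifetime right away */
int nth_refresh_time_sec(int n, int lifetime_sec, int percent)
{
    return n * refresh_offset_sec(lifetime_sec, percent);
}

/* Formats a number of seconds as M:SS */
void format_mmss(int seconds, char *buf, size_t size)
{
    snprintf(buf, size, "%d:%02d", seconds / 60, seconds % 60);
}
```

With a 300-second lifetime the offset is 240 seconds, so refreshes land at 4:00, 8:00 and 12:00, and each token is replaced 60 seconds before it would expire.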

Benefits:

  • ✅ Token refresh occurs automatically every ~4 minutes
  • ✅ Works on completely idle connections (no traffic for hours)
  • ✅ No application involvement needed (rd_kafka_poll() not required)
  • ✅ Built-in librdkafka feature (v1.9.0+, Fluent Bit uses 2.10.1)
  • ✅ Zero authentication failures on idle connections

TLS Support

This PR includes proper TLS support for AWS credential fetching:

ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
                                FLB_TRUE,
                                FLB_LOG_DEBUG,
                                NULL, NULL, NULL, NULL, NULL, NULL);

Features:

  • ✅ Secure communication with AWS credential services
  • ✅ Supports EC2 metadata, ECS, STS endpoints
  • ✅ Proper TLS lifecycle management (creation and cleanup)
  • ✅ Used by AWS credentials provider chain

Usage:

ctx->provider = flb_standard_chain_provider_create(config,
                                                   ctx->cred_tls,  // ← TLS instance
                                                   ctx->region,
                                                   ...);

Testing

  • [ ] Example configuration file for the change
  • [ ] Debug log output from testing the change
  • [ ] Attached Valgrind output that shows no leaks or memory corruption was found

Packaging

  • [ ] Run local packaging test showing all targets (including any new ones) build
  • [ ] Set ok-package-test label to test for all targets (requires maintainer to do)

Documentation

  • [ ] Documentation required for this feature

Backporting

  • [ ] Backport to latest stable release

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Auto-detect AWS region from MSK broker addresses with optional aws_region override.
    • MSK IAM uses OAUTHBEARER with background token refresh and TLS-backed credential provisioning.
  • Bug Fixes

    • Token lifetime standardized to 5 minutes; credential access made thread-safe.
    • Improved error handling and resource cleanup during Kafka/MSK IAM init/shutdown.
  • Configuration Changes

    • MSK IAM driven by rdkafka.sasl.mechanism; removed legacy aws_msk_iam and aws_msk_iam_cluster_arn options.
  • Documentation

    • Added Kafka MSK IAM examples and README.

kalavt, Dec 09 '25 02:12

Note: Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

API and config surface changed for AWS MSK IAM: header signature updated (removed cluster ARN, added brokers+region), MSK IAM now uses a persistent TLS-backed provider and mutex-protected token lifecycle with region auto-detection from broker hostnames, and Kafka plugins adopt sasl.mechanism-driven activation and improved rd_kafka conf ownership/cleanup.

Changes

| Cohort / File(s) | Summary |
|---|---|
| MSK IAM Header: include/fluent-bit/aws/flb_aws_msk_iam.h | Removed struct flb_msk_iam_cb; updated flb_aws_msk_iam_register_oauth_cb signature to (config, kconf, opaque, brokers, region) and refreshed docblock (removed cluster_arn). |
| MSK IAM Core: src/aws/flb_aws_msk_iam.c | Major refactor: added cred_tls, persistent provider, pthread_mutex_t lock; fixed token lifetime (300s); extract_region_from_broker added; payload builder now accepts explicit credentials; registration accepts brokers+region; lifecycle, concurrency, and destroy updated. |
| Input Kafka Plugin: plugins/in_kafka/in_kafka.c, plugins/in_kafka/in_kafka.h | Read rdkafka.sasl.mechanism; treat aws_msk_iam by switching to OAUTHBEARER, set internal aws_msk_iam flag, default security.protocol if missing, enable SASL queue, register OAuth callback only when brokers present, and improve rd_kafka_conf ownership and cleanup; removed aws_msk_iam_cluster_arn config entry. |
| Output Kafka Plugin: plugins/out_kafka/kafka.c, plugins/out_kafka/kafka_config.c, plugins/out_kafka/kafka_config.h | Added aws_region config option; removed aws_msk_iam_cluster_arn and previous aws_msk_iam config entries; read SASL mechanism and switch to OAUTHBEARER for aws_msk_iam, enable SASL queue, conditional OAuth callback registration, and adjust rd_kafka_conf ownership/cleanup semantics. |
| Kafka Conf Utilities: src/flb_kafka.c | Error-path cleanup now calls rd_kafka_conf_destroy instead of flb_free for kafka conf on failure. |
| AWS Credentials Minor Edits: src/aws/flb_aws_credentials_ec2.c, src/aws/flb_aws_credentials_profile.c, src/aws/flb_aws_credentials_sts.c | Small formatting/logging tweaks and log-level change for ENOENT; no control-flow changes. |
| Examples / Docs: examples/kafka_filter/README.md, examples/kafka_filter/kafka_msk_iam.conf | Added README and example configs demonstrating MSK IAM usage, region auto-detection, PrivateLink notes, sample configs, testing and troubleshooting guidance. |

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant FB as Fluent Bit
    participant LR as librdkafka
    participant MSK as flb_aws_msk_iam
    participant AWS as AWS Credentials Provider

    Note over FB: Initialization
    FB->>LR: create rd_kafka_conf()
    FB->>FB: read `rdkafka.sasl.mechanism` == aws_msk_iam
    FB->>MSK: flb_aws_msk_iam_register_oauth_cb(config,kconf,opaque,brokers,region)
    MSK->>MSK: extract_region_from_broker(brokers) / init TLS / create provider / init mutex
    MSK-->>FB: return msk_iam_handle

    Note over LR,MSK: Background token refresh
    LR->>MSK: oauthbearer_token_refresh_cb(request)
    MSK->>MSK: lock mutex
    MSK->>AWS: provider->refresh_credentials()
    AWS-->>MSK: credentials
    MSK->>MSK: build_payload(host, credentials)
    MSK-->>LR: rd_kafka_oauthbearer_set_token(token, lifetime=300s)
    MSK->>MSK: unlock mutex
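
The "Background token refresh" half of the diagram boils down to a lock, refresh, build, publish, unlock sequence. The sketch below illustrates only that locking shape with stubs: struct msk_iam_sketch, build_token_stub and refresh_token are hypothetical names, the credential provider and payload signing are replaced by a stub, and the point where the real callback would call rd_kafka_oauthbearer_set_token() is marked with a comment.

```c
#include <pthread.h>
#include <stdio.h>
#include <string.h>

#define TOKEN_LIFETIME_SEC 300 /* lifetime shown in the diagram */

/* Hypothetical stand-in for the MSK IAM context; field names are illustrative */
struct msk_iam_sketch {
    pthread_mutex_t lock;
    char token[128];
    int lifetime_sec;
    int refresh_count;
};

/* Stub for "refresh credentials" + "build_payload" in the diagram */
static int build_token_stub(int generation, char *out, size_t size)
{
    int n = snprintf(out, size, "token-%d", generation);

    return (n < 0 || (size_t) n >= size) ? -1 : 0;
}

/* Mirrors the diagram: lock, refresh, build, publish, unlock.
 * Returns 0 on success, -1 if no token could be built. */
int refresh_token(struct msk_iam_sketch *ctx)
{
    char fresh[sizeof(ctx->token)];
    int ret;

    pthread_mutex_lock(&ctx->lock);
    ret = build_token_stub(ctx->refresh_count + 1, fresh, sizeof(fresh));
    if (ret == 0) {
        /* The real callback would hand the token to librdkafka here */
        memcpy(ctx->token, fresh, strlen(fresh) + 1);
        ctx->lifetime_sec = TOKEN_LIFETIME_SEC;
        ctx->refresh_count++;
    }
    /* Single exit path: the mutex is released on success and on failure */
    pthread_mutex_unlock(&ctx->lock);
    return ret;
}
```

Keeping a single unlock at the end of the function is one way to address the "mutex correctness" review focus, since an early return on an error path cannot leave the lock held.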

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Focus areas:
    • src/aws/flb_aws_msk_iam.c: mutex correctness, provider/TLS init/teardown, credential lifecycle, payload signing, region detection.
    • plugins/in_kafka/in_kafka.c and plugins/out_kafka/kafka_config.c: ordering around reading sasl.mechanism, switching to OAUTHBEARER, enabling SASL background queue, and rd_kafka_conf ownership/cleanup transitions.
    • Error/cleanup paths: avoid double-free/resource leaks for rd_kafka_conf, rd_kafka, msk_iam, provider, TLS, and mutex.

Possibly related issues

  • fluent/fluent-bit#11255 — Related: changes add persistent AWS provider, TLS, mutex, and credential flow that touch scenarios described in the IRSA/STS failures referenced.

Suggested reviewers

  • edsiper
  • cosmo0920

Poem

🐇 I sniffed the brokers, found the region there,
The long ARN is gone — lighter is my care.
With mutex snug and tokens timed to five,
Background refresh keeps auth alive.
Hop-hop hooray — broker-led MSK thrives!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 40.00% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly summarizes the main change: adding AWS MSK IAM authentication support. It is concise, specific, and directly reflects the primary objective of the pull request across multiple files. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

coderabbitai[bot], Dec 09 '25 02:12

Still failing our linter: ❌ Commit b486cb796a failed: Missing prefix in commit subject: 'aws_msk_iam,in_kafka,out_kafka: add AWS MSK IAM authentication support'

cosmo0920, Dec 09 '25 02:12

> Still failing our linter: ❌ Commit b486cb7 failed: Missing prefix in commit subject: 'aws_msk_iam,in_kafka,out_kafka: add AWS MSK IAM authentication support'

@cosmo0920 should be clean now?

kalavt, Dec 09 '25 04:12

> > Still failing our linter: ❌ Commit b486cb7 failed: Missing prefix in commit subject: 'aws_msk_iam,in_kafka,out_kafka: add AWS MSK IAM authentication support'
>
> @cosmo0920 should be clean now?

Yes, it's clean now: https://github.com/fluent/fluent-bit/actions/runs/20051852936/job/57522155166?pr=11270

cosmo0920, Dec 09 '25 08:12

@cosmo0920 what would be the next step? Can you please guide me here? This is my first time contributing to this project.

kalavt, Dec 09 '25 11:12

@codex review

edsiper, Dec 09 '25 22:12

Attached config and logs for both provisioned and Serverless MSK clusters (read and write with IAM auth):

dummy-output.log fluent-bit-dummy.yaml

@cosmo0920 @edsiper let me know the next step.

kalavt, Dec 10 '25 09:12

I think this needs a docs PR as well right? Please link one if so.

patrick-stephens, Dec 12 '25 10:12

> I think this needs a docs PR as well right? Please link one if so.

Do you mean the Examples / Docs entry, or docstring coverage?

> examples/kafka_filter/README.md, examples/kafka_filter/kafka_msk_iam.conf
>
> Added README and example configs demonstrating MSK IAM usage, region auto-detection, PrivateLink notes, sample configs, testing and troubleshooting guidance.

kalavt, Dec 12 '25 11:12

I mean a PR for the documentation site itself: https://docs.fluentbit.io/manual/ This is a separate PR in the https://github.com/fluent/fluent-bit-docs repo.

I think we should put the example usage there for folks to see how to use it with Kafka, etc. I will 98% guarantee no one will look at any examples here, they'll refer to the docs themselves :)

patrick-stephens, Dec 12 '25 11:12

Well, you can be confident it's a 100% guarantee... here you go: https://github.com/fluent/fluent-bit-docs/pull/2294

kalavt, Dec 12 '25 13:12

@codex review

edsiper, Dec 12 '25 16:12

Thanks @alexakreizinger for reviewing and updating the docs. It's now ready for merge: https://github.com/fluent/fluent-bit-docs/pull/2294

CC: @patrick-stephens

kalavt, Dec 16 '25 00:12