beats icon indicating copy to clipboard operation
beats copied to clipboard

[azure-eventhub] Support for AMQP-over-WebSocket transport in the processor v2

Open zmoog opened this issue 3 weeks ago • 3 comments

Proposed commit message

Support for AMQP-over-WebSocket transport in the azure-eventhub processor v2.

Enterprise users often need to comply with network restrictions, which means using AMQP may not be an option.

In addition to AMQP-over-WebSocket support, this change allows users to run the azure-eventhub input behind an HTTPS proxy.

Checklist

  • [x] My code follows the style guidelines of this project
  • [x] I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [x] I have made corresponding change to the default configuration files
  • [ ] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • [x] I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

No disruptive user impact. The input still defaults to AMQP transport, and unless users switch to WS transport, the behavior is unchanged.

Author's Checklist

  • [x] Test with an actual proxy
  • [ ] Update the documentation for AMQP-over-WebSocket
  • [ ] Update the documentation HTTPS proxy

How to test this PR locally

I used the following Filebeat config:

# x-pack/filebeat/filebeat.yml
filebeat.inputs:
- type: azure-eventhub
  enabled: true

  # Event Hub configuration
  eventhub: "${EVENTHUB_NAME}"
  connection_string: "${EVENTHUB_CONNECTION_STRING}"
  consumer_group: "${EVENTHUB_CONSUMER_GROUP}"

  # Azure Storage configuration for checkpointing
  storage_account: "${STORAGE_ACCOUNT}"
  storage_account_connection_string: "${STORAGE_ACCOUNT_CONNECTION_STRING}"

  # Optional settings
  processor_version: ${PROCESSOR_VERSION}
  migrate_checkpoint: ${MIGRATE_CHECKPOINT}
  transport: "${TRANSPORT}"

Here are a couple of vscode configuration to test the WebSocket-only and WebSocket with HTTPS proxy scenarios (make sure to replace the placeholders YOUR * GOES HERE with actual values).

WebSocket only
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch Filebeat",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}/x-pack/filebeat/main.go",
            "args": [
                "-e",
                "-v",
                "-d",
                "*",
                "--strict.perms=false",
                "--path.home",
                "${workspaceFolder}/x-pack/filebeat",
                "-E",
                "cloud.id=<YOUR CLOUD ID GOES HERE>",
                "-E",
                "cloud.auth=<YOUR USERNAME GOES HERE>:<YOUR PASSWORD GOES HERE>",
                "-E",
                "gc_percent=100",
                "-E",
                "setup.ilm.enabled=false",
                "-E",
                "setup.template.enabled=false",
                "-E",
                "output.elasticsearch.allow_older_versions=true"
            ],
            "env": {
                "EVENTHUB_NAME": "logs",
                "EVENTHUB_CONNECTION_STRING": "<YOUR EVENT HUB CONNECTION STRING GOES HERE>",
                "EVENTHUB_CONSUMER_GROUP": "$Default",
                "STORAGE_ACCOUNT": "<YOUR STORAGE ACCOUNT NAME GOES HERE>",
                "STORAGE_ACCOUNT_KEY": "<YOUR STORAGE ACCOUNT KEY GOES HERE>",
                "STORAGE_ACCOUNT_CONNECTION_STRING": "<YOUR STORAGE ACCOUNT CONNECTION STRING GOES HERE>",
                "PROCESSOR_VERSION": "v2",
                "MIGRATE_CHECKPOINT": "true",
                "TRANSPORT": "websocket"
            }
        }
    ]
}
WebSocket with HTTPS proxy
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch Filebeat",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}/x-pack/filebeat/main.go",
            "args": [
                "-e",
                "-v",
                "-d",
                "*",
                "--strict.perms=false",
                "--path.home",
                "${workspaceFolder}/x-pack/filebeat",
                "-E",
                "cloud.id=<YOUR CLOUD ID GOES HERE>",
                "-E",
                "cloud.auth=<YOUR USERNAME GOES HERE>:<YOUR PASSWORD GOES HERE>",
                "-E",
                "gc_percent=100",
                "-E",
                "setup.ilm.enabled=false",
                "-E",
                "setup.template.enabled=false",
                "-E",
                "output.elasticsearch.allow_older_versions=true"
            ],
            "env": {
                "EVENTHUB_NAME": "logs",
                "EVENTHUB_CONNECTION_STRING": "<YOUR EVENT HUB CONNECTION STRING GOES HERE>",
                "EVENTHUB_CONSUMER_GROUP": "$Default",
                "STORAGE_ACCOUNT": "<YOUR STORAGE ACCOUNT NAME GOES HERE>",
                "STORAGE_ACCOUNT_KEY": "<YOUR STORAGE ACCOUNT KEY GOES HERE>",
                "STORAGE_ACCOUNT_CONNECTION_STRING": "<YOUR STORAGE ACCOUNT CONNECTION STRING GOES HERE>",
                "PROCESSOR_VERSION": "v2",
                "MIGRATE_CHECKPOINT": "true",
                "TRANSPORT": "websocket",
                "HTTP_PROXY": "http://127.0.0.1:9090",
                "HTTPS_PROXY": "http://127.0.0.1:9090"
            }
        }
    ]
}

To test the proxy support, I used mitmproxy, an interactive HTTPS proxy with request inspection:

# Install
brew install mitmproxy  # macOS
# or
pip install mitmproxy

# Run the proxy
mitmproxy -p 9090

Since the Azure services require using an HTTPS proxy, you also need to set up a certificate for mitmproxy. See https://docs.mitmproxy.org/stable/concepts/certificates/ fore more details.

The mitmproxy main view:

CleanShot 2025-12-08 at 09 53 14@2x

Upgrade to AMQP-over-WebSocket:

CleanShot 2025-12-08 at 10 16 59@2x

WebSocket details:

CleanShot 2025-12-08 at 09 54 53@2x

Event hub messages received using AMQP-over-websocket through the proxy:

CleanShot 2025-12-08 at 09 52 35@2x

Related issues

  • Closes #47823
  • Relates #26328

Use cases

zmoog avatar Dec 08 '25 07:12 zmoog

:robot: GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

github-actions[bot] avatar Dec 08 '25 07:12 github-actions[bot]

This pull request does not have a backport label. If this is a bug or security fix, could you label this PR @zmoog? 🙏. For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

mergify[bot] avatar Dec 08 '25 07:12 mergify[bot]

Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)

elasticmachine avatar Dec 09 '25 11:12 elasticmachine

@mergifyio backport 8.19 9.1 9.2 9.3

github-actions[bot] avatar Dec 17 '25 00:12 github-actions[bot]

backport 8.19 9.1 9.2 9.3

❌ No backport have been created

GitHub error: Branch not found

mergify[bot] avatar Dec 17 '25 00:12 mergify[bot]