[azure-eventhub] Support for AMQP-over-WebSocket transport in the processor v2
Proposed commit message
Support for AMQP-over-WebSocket transport in the azure-eventhub processor v2.
Enterprise users often operate under network restrictions that block outbound AMQP traffic, so plain AMQP may not be an option.
In addition to AMQP-over-WebSocket support, this change allows users to run the azure-eventhub input behind an HTTPS proxy.
Checklist
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] I have made corresponding changes to the default configuration files
- [ ] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the `stresstest.sh` script to run them under stress conditions and race detector to verify their stability.
- [x] I have added an entry in `./changelog/fragments` using the changelog tool.
Disruptive User Impact
No disruptive user impact. The input still defaults to the AMQP transport; unless users explicitly switch to the WebSocket transport, the behavior is unchanged.
Author's Checklist
- [x] Test with an actual proxy
- [ ] Update the documentation for AMQP-over-WebSocket
- [ ] Update the documentation for HTTPS proxy
How to test this PR locally
I used the following Filebeat config:
# x-pack/filebeat/filebeat.yml
filebeat.inputs:
  - type: azure-eventhub
    enabled: true
    # Event Hub configuration
    eventhub: "${EVENTHUB_NAME}"
    connection_string: "${EVENTHUB_CONNECTION_STRING}"
    consumer_group: "${EVENTHUB_CONSUMER_GROUP}"
    # Azure Storage configuration for checkpointing
    storage_account: "${STORAGE_ACCOUNT}"
    storage_account_connection_string: "${STORAGE_ACCOUNT_CONNECTION_STRING}"
    # Optional settings
    processor_version: ${PROCESSOR_VERSION}
    migrate_checkpoint: ${MIGRATE_CHECKPOINT}
    transport: "${TRANSPORT}"
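To illustrate how a `transport` setting like the one above could be validated, here is a minimal Go sketch. It is not the code from this PR: the `Transport` type, the `parseTransport` helper, and the assumption that the default value is spelled `amqp` are all hypothetical; only the `websocket` value and the AMQP default come from the description.

```go
package main

import (
	"fmt"
	"strings"
)

// Transport is a hypothetical type for the input's transport setting.
type Transport string

const (
	// TransportAMQP is the default; the spelling "amqp" is an assumption.
	TransportAMQP Transport = "amqp"
	// TransportWebSocket matches the "websocket" value used in this PR.
	TransportWebSocket Transport = "websocket"
)

// parseTransport maps a raw config value to a Transport.
// An empty value falls back to AMQP, so existing configs keep working.
func parseTransport(value string) (Transport, error) {
	switch strings.ToLower(strings.TrimSpace(value)) {
	case "", "amqp":
		return TransportAMQP, nil
	case "websocket":
		return TransportWebSocket, nil
	default:
		return "", fmt.Errorf("unknown transport %q (expected amqp or websocket)", value)
	}
}

func main() {
	for _, v := range []string{"", "amqp", "websocket", "http"} {
		t, err := parseTransport(v)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q -> %s\n", v, t)
	}
}
```

Treating an empty value as AMQP mirrors the "no disruptive user impact" claim: a config without `transport` behaves as before.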
Here are a couple of VS Code launch configurations to test the WebSocket-only and WebSocket-with-HTTPS-proxy scenarios (make sure to replace the `<YOUR ... GOES HERE>` placeholders with actual values).
WebSocket only
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Launch Filebeat",
      "type": "go",
      "request": "launch",
      "mode": "auto",
      "program": "${workspaceFolder}/x-pack/filebeat/main.go",
      "args": [
        "-e",
        "-v",
        "-d",
        "*",
        "--strict.perms=false",
        "--path.home",
        "${workspaceFolder}/x-pack/filebeat",
        "-E",
        "cloud.id=<YOUR CLOUD ID GOES HERE>",
        "-E",
        "cloud.auth=<YOUR USERNAME GOES HERE>:<YOUR PASSWORD GOES HERE>",
        "-E",
        "gc_percent=100",
        "-E",
        "setup.ilm.enabled=false",
        "-E",
        "setup.template.enabled=false",
        "-E",
        "output.elasticsearch.allow_older_versions=true"
      ],
      "env": {
        "EVENTHUB_NAME": "logs",
        "EVENTHUB_CONNECTION_STRING": "<YOUR EVENT HUB CONNECTION STRING GOES HERE>",
        "EVENTHUB_CONSUMER_GROUP": "$Default",
        "STORAGE_ACCOUNT": "<YOUR STORAGE ACCOUNT NAME GOES HERE>",
        "STORAGE_ACCOUNT_KEY": "<YOUR STORAGE ACCOUNT KEY GOES HERE>",
        "STORAGE_ACCOUNT_CONNECTION_STRING": "<YOUR STORAGE ACCOUNT CONNECTION STRING GOES HERE>",
        "PROCESSOR_VERSION": "v2",
        "MIGRATE_CHECKPOINT": "true",
        "TRANSPORT": "websocket"
      }
    }
  ]
}
WebSocket with HTTPS proxy
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Launch Filebeat",
      "type": "go",
      "request": "launch",
      "mode": "auto",
      "program": "${workspaceFolder}/x-pack/filebeat/main.go",
      "args": [
        "-e",
        "-v",
        "-d",
        "*",
        "--strict.perms=false",
        "--path.home",
        "${workspaceFolder}/x-pack/filebeat",
        "-E",
        "cloud.id=<YOUR CLOUD ID GOES HERE>",
        "-E",
        "cloud.auth=<YOUR USERNAME GOES HERE>:<YOUR PASSWORD GOES HERE>",
        "-E",
        "gc_percent=100",
        "-E",
        "setup.ilm.enabled=false",
        "-E",
        "setup.template.enabled=false",
        "-E",
        "output.elasticsearch.allow_older_versions=true"
      ],
      "env": {
        "EVENTHUB_NAME": "logs",
        "EVENTHUB_CONNECTION_STRING": "<YOUR EVENT HUB CONNECTION STRING GOES HERE>",
        "EVENTHUB_CONSUMER_GROUP": "$Default",
        "STORAGE_ACCOUNT": "<YOUR STORAGE ACCOUNT NAME GOES HERE>",
        "STORAGE_ACCOUNT_KEY": "<YOUR STORAGE ACCOUNT KEY GOES HERE>",
        "STORAGE_ACCOUNT_CONNECTION_STRING": "<YOUR STORAGE ACCOUNT CONNECTION STRING GOES HERE>",
        "PROCESSOR_VERSION": "v2",
        "MIGRATE_CHECKPOINT": "true",
        "TRANSPORT": "websocket",
        "HTTP_PROXY": "http://127.0.0.1:9090",
        "HTTPS_PROXY": "http://127.0.0.1:9090"
      }
    }
  ]
}
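As a rough illustration of how a proxy address such as the `HTTPS_PROXY` value above ends up being used for the WebSocket handshake (which starts as an HTTPS request), here is a small Go sketch built on the standard library's `http.ProxyURL`. It does not show how the input wires the proxy internally. The `proxyHostFor` helper, the `example.servicebus.windows.net` namespace, and the endpoint path are assumptions made for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"os"
)

// proxyHostFor reports the proxy host that a request to target would be sent
// through, given the value of a proxy variable such as HTTPS_PROXY.
// An empty proxyAddr means a direct connection, reported as "".
func proxyHostFor(proxyAddr, target string) (string, error) {
	if proxyAddr == "" {
		return "", nil
	}
	proxyURL, err := url.Parse(proxyAddr)
	if err != nil {
		return "", fmt.Errorf("invalid proxy address %q: %w", proxyAddr, err)
	}
	if proxyURL.Scheme == "" || proxyURL.Host == "" {
		return "", fmt.Errorf("invalid proxy address %q: missing scheme or host", proxyAddr)
	}
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return "", fmt.Errorf("invalid target %q: %w", target, err)
	}
	// http.ProxyURL returns a selector that always picks the given proxy.
	selected, err := http.ProxyURL(proxyURL)(req)
	if err != nil {
		return "", err
	}
	return selected.Host, nil
}

func main() {
	// Hypothetical namespace and path, used only for illustration.
	target := "https://example.servicebus.windows.net/$servicebus/websocket"
	host, err := proxyHostFor(os.Getenv("HTTPS_PROXY"), target)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	if host == "" {
		fmt.Println("no proxy configured, connecting directly")
		return
	}
	fmt.Println("handshake would go through proxy", host)
}
```

Running it with `HTTPS_PROXY=http://127.0.0.1:9090` is a quick way to check that the address in the launch configuration parses and is selected before starting Filebeat.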
To test the proxy support, I used mitmproxy, an interactive HTTPS proxy with request inspection:
# Install
brew install mitmproxy # macOS
# or
pip install mitmproxy
# Run the proxy
mitmproxy -p 9090
Since the Azure services require using an HTTPS proxy, you also need to set up a certificate for mitmproxy. See https://docs.mitmproxy.org/stable/concepts/certificates/ for more details.
The mitmproxy main view:
Upgrade to AMQP-over-WebSocket:
WebSocket details:
Event Hub messages received using AMQP-over-WebSocket through the proxy:
Related issues
- Closes #47823
- Relates #26328
Use cases
This pull request does not have a backport label. If this is a bug or security fix, could you label this PR @zmoog? 🙏. For such, you'll need to label your PR with:
- The upcoming major version of the Elastic Stack
- The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)
To fixup this pull request, you need to add the backport labels for the needed branches, such as:
- `backport-8./d` is the label to automatically backport to the `8./d` branch. `/d` is the digit
- `backport-active-all` is the label that automatically backports to all active branches.
- `backport-active-8` is the label that automatically backports to all active minor branches for the 8 major.
- `backport-active-9` is the label that automatically backports to all active minor branches for the 9 major.
Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)
@mergifyio backport 8.19 9.1 9.2 9.3
backport 8.19 9.1 9.2 9.3
❌ No backport have been created
- #48119 [8.19](backport #47956) [azure-eventhub] Support for AMQP-over-WebSocket transport in the processor v2 has been created for branch `8.19` but encountered conflicts
- #48120 [9.1](backport #47956) [azure-eventhub] Support for AMQP-over-WebSocket transport in the processor v2 has been created for branch `9.1` but encountered conflicts
- #48121 [9.2](backport #47956) [azure-eventhub] Support for AMQP-over-WebSocket transport in the processor v2 has been created for branch `9.2` but encountered conflicts
- Backport to branch `9.3` failed
GitHub error: Branch not found