Kafka -> S3 - invalid header field value for "X-Amz-Meta-Kafka_key"
Hello! I'm afraid the thing I'm reporting is not a bug per se, but I found it exceptionally difficult to figure out the reason for the error. I'm using a pretty standard configuration to consume messages from Kafka (attached below), but I'm getting a rather cryptic error:
Failed to send message to aws_s3: RequestError: send request failed
caused by: Put "<HTTP S3 URL>": net/http: invalid header field value for "X-Amz-Meta-Kafka_key"
I'm using jeffail/benthos:4.9.1.
Could it be caused by unprocessable keys in the Kafka key/value pairs?
input:
  kafka:
    addresses:
      - ${KAFKA_ADDRESSES}
    client_id: ${CLIENT_ID}
    commit_period: 3s
    consumer_group: ${CONSUMER_GROUP}
    group:
      heartbeat_interval: 3s
      rebalance_timeout: 60s
      session_timeout: 10s
    checkpoint_limit: 1
    batching:
      count: 1024
      period: 1s
    max_processing_period: 2s
    start_from_oldest: true
    target_version: 3.2.0
    topics:
      - ${INPUT_TOPIC}
pipeline:
  processors:
    - group_by_value:
        value: ${! meta("kafka_partition") }
    - archive:
        format: binary
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    content_encoding: ""
    content_type: application/octet-stream
    credentials:
      id: ${AWS_ACCESS_KEY_ID}
      profile: ""
      role: ""
      role_external_id: ""
      secret: ${AWS_SECRET_ACCESS_KEY}
      token: ""
    endpoint: ""
    force_path_style_urls: false
    path: ${S3_PATH}
    region: ${AWS_REGION}
    timeout: 5s
Hey @MarcinGinszt, it's because the messages from Kafka carry metadata values, including the key; the aws_s3 output attempts to attach those metadata values to the uploaded object as headers, and in this case the Kafka key contains characters that are invalid in an HTTP header value.
You can work around this by adding - mapping: 'meta kafka_key = deleted()' to your processors.
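For instance, the reporter's pipeline section would end up roughly like this (a sketch based on the config above, placing the mapping first so the key is dropped before messages are grouped and archived):

pipeline:
  processors:
    # Drop the Kafka key metadata so it is never forwarded as an S3 object header.
    - mapping: 'meta kafka_key = deleted()'
    - group_by_value:
        value: ${! meta("kafka_partition") }
    - archive:
        format: binary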
I'm going to mark this issue as a bug though, as we shouldn't even be attempting to send headers that are invalid.
Thanks a lot! A loose idea regarding that: if Benthos is going to ignore invalid kafka_key values, could it warn users about it in the log?
This is still a problem with the fallback output, where I am unable to handle it using a processor. My first call in the fallback is http_client. The metadata gets added before the second fallback, which is aws_s3, and I get the following error while writing to S3:
net/http: invalid header field value for "X-Amz-Meta-Fallback_error"
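For context, the setup described is roughly this (a minimal sketch; the http_client URL and verb are hypothetical placeholders, not taken from the report). When the http_client attempt fails, the fallback output attaches a fallback_error metadata value, which the aws_s3 output then tries to send as the X-Amz-Meta-Fallback_error header:

output:
  fallback:
    # First attempt: deliver over HTTP (URL and verb are hypothetical placeholders).
    - http_client:
        url: https://example.com/ingest
        verb: POST
    # On failure the message is passed on, now carrying fallback_error metadata,
    # which aws_s3 attempts to upload as an X-Amz-Meta-* object header.
    - aws_s3:
        bucket: ${S3_BUCKET}
        path: ${S3_PATH}
        region: ${AWS_REGION}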
You can stick mapping: 'meta kafka_key = deleted()' in the processors section at the top output level (as a sibling to fallback), or at the input level, or inside the pipeline section.
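A sketch of the top-level placement described above, assuming the same fallback setup as before (endpoint details remain hypothetical placeholders):

output:
  processors:
    # Runs for every message before it reaches the fallback outputs.
    - mapping: 'meta kafka_key = deleted()'
  fallback:
    - http_client:
        url: https://example.com/ingest   # hypothetical placeholder
        verb: POST
    - aws_s3:
        bucket: ${S3_BUCKET}
        path: ${S3_PATH}
        region: ${AWS_REGION}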