
Kafka -> S3 - invalid header field value for "X-Amz-Meta-Kafka_key"

Open MarcinGinszt opened this issue 2 years ago • 4 comments

Hello! I'm afraid the thing I'm reporting is not a bug per se, but I find it exceptionally difficult to figure out the reason for the error. I'm using a pretty standard configuration to consume messages from Kafka (attached below), but I'm getting a rather cryptic error:

Failed to send message to aws_s3: RequestError: send request failed
caused by: Put "<HTTP S3 URL>": net/http: invalid header field value for "X-Amz-Meta-Kafka_key"

I'm using jeffail/benthos:4.9.1.

Could it be caused by unprocessable keys in the Kafka key/value pairs?

    input:
      kafka:
        addresses:
          - ${KAFKA_ADDRESSES}
        client_id: ${CLIENT_ID}
        commit_period: 3s
        consumer_group: ${CONSUMER_GROUP}
        group:
          heartbeat_interval: 3s
          rebalance_timeout: 60s
          session_timeout: 10s
        checkpoint_limit: 1
        batching:
          count: 1024
          period: 1s
        max_processing_period: 2s
        start_from_oldest: true
        target_version: 3.2.0
        topics:
          - ${INPUT_TOPIC}

    pipeline:
      processors:
        - group_by_value:
            value: ${! meta("kafka_partition") }
        - archive:
            format: binary

    output:
      aws_s3:
        bucket: ${S3_BUCKET}
        content_encoding: ""
        content_type: application/octet-stream
        credentials:
          id: ${AWS_ACCESS_KEY_ID}
          profile: ""
          role: ""
          role_external_id: ""
          secret: ${AWS_SECRET_ACCESS_KEY}
          token: ""
        endpoint: ""
        force_path_style_urls: false
        path: ${S3_PATH}
        region: ${AWS_REGION}
        timeout: 5s

MarcinGinszt · Jan 10 '23, 13:01

Hey @MarcinGinszt, it's because the messages from Kafka carry metadata values, including the key, and the aws_s3 output attempts to add those metadata values to the uploaded object as headers. In this case the Kafka key contains characters that are invalid in an HTTP header.

You can work around this by adding `- mapping: 'meta kafka_key = deleted()'` to your processors.
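Applied to the config above, the workaround would look something like the following sketch: the `mapping` processor deletes the `kafka_key` metadata field before the aws_s3 output can turn it into a header.

```yaml
pipeline:
  processors:
    # Drop the Kafka key metadata so it is never forwarded
    # as an X-Amz-Meta-Kafka_key object header by aws_s3.
    - mapping: 'meta kafka_key = deleted()'
    - group_by_value:
        value: ${! meta("kafka_partition") }
    - archive:
        format: binary
```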

I'm going to mark this issue as a bug though, since we shouldn't be attempting to send invalid headers in the first place.

Jeffail · Jan 10 '23, 14:01

Thanks a lot! One loose idea regarding that: if Benthos is going to ignore invalid kafka_keys, could it warn users about it in a log?

MarcinGinszt · Jan 11 '23, 08:01

This is still a problem with fallbacks in the output, where I am unable to handle it using a processor. My first call in the fallback is http_client; when it fails, the error metadata gets passed to the second fallback, which is aws_s3. I get the following error while writing to S3:

net/http: invalid header field value for "X-Amz-Meta-Fallback_error"

bn-sriramkannan · Mar 22 '24, 15:03

> The metadata header gets added to the second fallback which is aws_s3.

You can stick `mapping: 'meta kafka_key = deleted()'` in a processors section at the top output level (as a sibling to fallback), or at the input level, or inside the pipeline section.
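For the fallback case specifically, a processors section can also live on the aws_s3 child output itself, so it runs after the fallback broker has attached its error metadata. The sketch below is an assumption-laden illustration: `fallback_error` is the metadata key implied by the `X-Amz-Meta-Fallback_error` header above, and the http_client fields are hypothetical placeholders.

```yaml
output:
  fallback:
    - http_client:
        url: ${TARGET_URL}  # hypothetical placeholder endpoint
        verb: POST
    - aws_s3:
        bucket: ${S3_BUCKET}
        path: ${S3_PATH}
        processors:
          # Drop the error metadata set by the fallback broker so it
          # is not sent as an invalid X-Amz-Meta-Fallback_error header.
          - mapping: 'meta fallback_error = deleted()'
```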

mihaitodor · Mar 26 '24, 14:03