Go event-handler produces multiple Content-Length values
I'm trying to set up Knative functions with an MTChannelBasedBroker backed by KafkaChannels.
A PingSource produces an event every minute:
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: aia-cron-ping
spec:
  schedule: "*/1 * * * *"
  contentType: "application/json"
  data: '{"ping":"load aia"}'
  ceOverrides:
    extensions:
      load: "aia"
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: default
My simple function subscribes to the PingSource events:
specVersion: 0.36.0
name: simple-loader
runtime: go
registry: demo.jfrog.io/devops-docker-local/
image: demo.jfrog.io/devops-docker-local/simple-loader:latest
namespace: default
created: 2024-08-02T10:48:42.077191+02:00
invoke: cloudevent
build:
  builder: host
  buildEnvs:
  - name: FUNC_ENABLE_HOST_BUILDER
    value: "true"
  - name: FUNC_BUILDER
    value: host
  - name: FUNC_CONTAINER
    value: "true"
  - name: LOG_LEVEL
    value: error
deploy:
  namespace: default
  image: demo.jfrog.io/devops-docker-local/simple-loader@sha256:ad917ad5ee8e24d250306b355ed1d22ab4374101b0713e2780fb7ab3aba66b22
  options:
    scale:
      min: 1
      max: 2
  subscriptions:
  - source: default
    filters:
      load: aia
      type: dev.knative.sources.ping
In its Handle method, the function generates a new CloudEvent with a JSON body:
package function

import (
	"context"
	"log/slog"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/event"
	"github.com/google/uuid"
	"github.com/rs/zerolog"
)

func (h *SimpleHandler) Handle(ctx context.Context, e event.Event) (*event.Event, error) {
	h.logger.Debug("Entering handler", slog.Group("event", "id", e.ID()))
	defer h.logger.Debug("Exiting handler", slog.Group("event", "id", e.ID()))
	h.logger.Info("Event data", slog.Group("event", "id", e.ID(), "data", e.Data()))

	data := struct {
		Simple string `json:"simple"`
		Time   string `json:"time"`
	}{
		Simple: "simple-modified",
		Time:   time.Now().Format(time.RFC3339),
	}

	outEvent := event.New()
	id := uuid.New().String()
	outEvent.SetID(id)
	outEvent.SetType("works.demo.simple")
	outEvent.SetSource("urn:demo:devops:simple:loader:1")
	outEvent.SetExtension("tenant", "some tenant")
	outEvent.SetData(cloudevents.ApplicationJSON, data)

	h.logger.Info("Outgoing event", "id", id, "data", data)
	return &outEvent, nil
}
However, the knative-eventing/kafka-channel-dispatcher rejects the events returned by my function with the following message:
{"@timestamp":"2024-10-07T12:38:18.525Z","@version":"1","message":"failed to send event to subscriber context={topics=[knative-messaging-kafka.default.default-kne-trigger], consumerGroup='kafka.default.default-kne-trigger.25ea5c36-820a-4c5a-b39a-5433e198ebb9', reference=uuid: \"25ea5c36-820a-4c5a-b39a-5433e198ebb9\"\nnamespace: \"default\"\nname: \"default-simple-loader-function-656b5b9173502685d2682f8db4c22eca\"\n} target=http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/simple-loader-function-trigger-0/3b75c7f5-55a4-4187-ada2-c21b31caf13b","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender","thread_name":"vert.x-worker-thread-5","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalArgumentException: Multiple Content-Length values found: [58, 58]\n\tat io.netty.handler.codec.http.HttpUtil.normalizeAndGetContentLength(HttpUtil.java:612)\n\tat io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:683)\n\tat io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:285)\n\tat io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:238)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n","context":{},"target":"http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/simple-loader-function-trigger-0/3b75c7f5-55a4-4187-ada2-c21b31caf13b"}
In short: Multiple Content-Length values found: [58, 58].
So the function sends the JSON body to the broker but sets the HTTP header Content-Length twice, which the kafka-channel-dispatcher does not accept.
Our guess is that this is related to the cloudevents SDK in use and the Add on this line.
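To illustrate why an Add could produce this symptom, here is a minimal standard-library sketch. It does not reproduce the SDK or func runtime code; the helper name contentLengthValues is made up for this example. It only shows that http.Header.Add appends a value while http.Header.Set replaces it, so writing the same Content-Length twice with Add leaves two values behind.

```go
package main

import (
	"fmt"
	"net/http"
)

// contentLengthValues is a hypothetical helper for this illustration.
// It writes the same Content-Length twice, either with Add (appends)
// or with Set (replaces), and returns the values stored afterwards.
func contentLengthValues(useAdd bool) []string {
	h := http.Header{}
	for i := 0; i < 2; i++ {
		if useAdd {
			h.Add("Content-Length", "58")
		} else {
			h.Set("Content-Length", "58")
		}
	}
	return h.Values("Content-Length")
}

func main() {
	fmt.Println(contentLengthValues(true))  // [58 58]
	fmt.Println(contentLengthValues(false)) // [58]
}
```

The first result matches the [58, 58] shape from the dispatcher error, which is why the Add call looks suspicious to us.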
Versions:
- knative 1.13.0 (serving, eventing, eventing-kafka, mt-channel-broker)
- kafka via OLM 0.28.0
- func v1.15.0; I also tried versions back to v1.12.2, with similar errors
Please have a look at my setup and let me know what I, or some part of the Knative framework, am doing wrong here.
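One way to check where the duplicate comes from would be to look at the raw response bytes of the function, since an HTTP client library may collapse or reject duplicate headers before the application sees them. The sketch below is an assumption about how such a check could look: countHeaderLines is a hypothetical helper, and the sample response is hand-written to mirror the error above, not captured from my cluster.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// countHeaderLines (hypothetical helper) counts how many header lines
// called name appear in a raw HTTP message, without any normalisation.
func countHeaderLines(raw, name string) int {
	count := 0
	sc := bufio.NewScanner(strings.NewReader(raw))
	sc.Scan() // skip the status line
	for sc.Scan() {
		line := sc.Text()
		if line == "" {
			break // blank line marks the end of the header section
		}
		key, _, ok := strings.Cut(line, ":")
		if ok && strings.EqualFold(strings.TrimSpace(key), name) {
			count++
		}
	}
	return count
}

func main() {
	// Hand-written sample, modelled on the dispatcher error; body omitted.
	raw := "HTTP/1.1 200 OK\r\n" +
		"Content-Type: application/json\r\n" +
		"Content-Length: 58\r\n" +
		"Content-Length: 58\r\n" +
		"\r\n"
	fmt.Println(countHeaderLines(raw, "Content-Length")) // 2
}
```

A count above 1 on the function's own response would point at the function or its runtime rather than at the broker components.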
This issue is stale because it has been open for 90 days with no
activity. It will automatically close after 30 more days of
inactivity. Reopen the issue with /reopen. Mark the issue as
fresh by adding the comment /remove-lifecycle stale.