
Go event-handler produces multiple Content-Length values

Open fbartnitzek opened this issue 1 year ago • 1 comment

I'm trying to set up Knative Functions with the MTChannelBasedBroker based on KafkaChannels.

A PingSource produces an event every minute:

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: aia-cron-ping
spec:
  schedule: "*/1 * * * *"
  contentType: "application/json"
  data: '{"ping":"load aia"}'
  ceOverrides:
    extensions:
      load: "aia"
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: default

My simple function subscribes to the PingSource events:

specVersion: 0.36.0
name: simple-loader
runtime: go
registry: demo.jfrog.io/devops-docker-local/
image: demo.jfrog.io/devops-docker-local/simple-loader:latest
namespace: default
created: 2024-08-02T10:48:42.077191+02:00
invoke: cloudevent
build:
  builder: host
  buildEnvs:
  - name: FUNC_ENABLE_HOST_BUILDER
    value: "true"
  - name: FUNC_BUILDER
    value: host
  - name: FUNC_CONTAINER
    value: "true"
  - name: LOG_LEVEL
    value: error
deploy:
  namespace: default
  image: demo.jfrog.io/devops-docker-local/simple-loader@sha256:ad917ad5ee8e24d250306b355ed1d22ab4374101b0713e2780fb7ab3aba66b22
  options:
    scale:
      min: 1
      max: 2
  subscriptions:
  - source: default
    filters:
      load: aia
      type: dev.knative.sources.ping

In its Handle method, it generates a new CloudEvent with a JSON body:

package function

import (
	"context"
	"log/slog"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/event"
	"github.com/google/uuid"
)

// SimpleHandler wraps the function's logger; its definition is not shown in
// the original report, so a *slog.Logger field is assumed here to keep the
// snippet self-contained.
type SimpleHandler struct {
	logger *slog.Logger
}

// Handle logs the incoming event and responds with a new CloudEvent carrying
// a small JSON payload.
func (h *SimpleHandler) Handle(ctx context.Context, e event.Event) (*event.Event, error) {
	h.logger.Debug("Entering handler", slog.Group("event", "id", e.ID()))
	defer h.logger.Debug("Exiting handler", slog.Group("event", "id", e.ID()))
	h.logger.Info("Event data", slog.Group("event", "id", e.ID(), "data", e.Data()))

	data := struct {
		Simple string `json:"simple"`
		Time   string `json:"time"`
	}{
		Simple: "simple-modified",
		Time:   time.Now().Format(time.RFC3339),
	}

	outEvent := event.New()
	id := uuid.New().String()
	outEvent.SetID(id)
	outEvent.SetType("works.demo.simple")
	outEvent.SetSource("urn:demo:devops:simple:loader:1")
	outEvent.SetExtension("tenant", "some tenant")
	outEvent.SetData(cloudevents.ApplicationJSON, data)

	h.logger.Info("Outgoing event", "id", id, "data", data)
	return &outEvent, nil
}
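
For reference, a small test sketch (not part of the original report): it calls Handle directly, writes the returned event to a recorded HTTP response using the cloudevents-sdk, and counts the Content-Length values that end up in the header map. It only exercises the SDK's response writer, not the func-go host framework's own response path, so it narrows down where the second header is added rather than proving it. The SimpleHandler literal assumes the struct sketched above.

package function

import (
	"context"
	"log/slog"
	"net/http/httptest"
	"testing"

	"github.com/cloudevents/sdk-go/v2/binding"
	"github.com/cloudevents/sdk-go/v2/event"
	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

func TestHandleResponseHeaders(t *testing.T) {
	h := &SimpleHandler{logger: slog.Default()} // assumes the struct definition above

	in := event.New()
	in.SetID("test-id")
	in.SetType("dev.knative.sources.ping")
	in.SetSource("urn:test:pingsource")

	out, err := h.Handle(context.Background(), in)
	if err != nil {
		t.Fatalf("Handle returned an error: %v", err)
	}

	// Encode the returned event as an HTTP response, roughly what the broker
	// would receive, and check for duplicated Content-Length values.
	rec := httptest.NewRecorder()
	if err := cehttp.WriteResponseWriter(context.Background(), binding.ToMessage(out), 200, rec); err != nil {
		t.Fatalf("writing event as HTTP response failed: %v", err)
	}
	if vals := rec.Result().Header.Values("Content-Length"); len(vals) > 1 {
		t.Fatalf("duplicate Content-Length values: %v", vals)
	}
}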

But the knative-eventing kafka-channel-dispatcher rejects the events returned by my function with the following message:

{"@timestamp":"2024-10-07T12:38:18.525Z","@version":"1","message":"failed to send event to subscriber context={topics=[knative-messaging-kafka.default.default-kne-trigger], consumerGroup='kafka.default.default-kne-trigger.25ea5c36-820a-4c5a-b39a-5433e198ebb9', reference=uuid: \"25ea5c36-820a-4c5a-b39a-5433e198ebb9\"\nnamespace: \"default\"\nname: \"default-simple-loader-function-656b5b9173502685d2682f8db4c22eca\"\n} target=http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/simple-loader-function-trigger-0/3b75c7f5-55a4-4187-ada2-c21b31caf13b","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender","thread_name":"vert.x-worker-thread-5","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalArgumentException: Multiple Content-Length values found: [58, 58]\n\tat io.netty.handler.codec.http.HttpUtil.normalizeAndGetContentLength(HttpUtil.java:612)\n\tat io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:683)\n\tat io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:285)\n\tat io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:238)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat 
java.base/java.lang.Thread.run(Thread.java:1583)\n","context":{},"target":"http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/simple-loader-function-trigger-0/3b75c7f5-55a4-4187-ada2-c21b31caf13b"}

In short: Multiple Content-Length values found: [58, 58]. The function sends the JSON body back to the broker, but the Content-Length HTTP header is set twice, which the kafka-channel-dispatcher does not accept.

We suspect it has something to do with the cloudevents-sdk in use and the Add call on this line.
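
For illustration, here is a minimal, self-contained sketch of the general mechanism, not the SDK's actual code path: once a Content-Length value is already present in a Go response header map, a later Header().Add appends a second, identical value instead of replacing the first, and both are written to the wire; strict parsers such as Netty then reject the response. The handler, port, and payload below are made up for the demonstration.

package main

import (
	"log"
	"net/http"
	"strconv"
)

func handler(w http.ResponseWriter, r *http.Request) {
	body := []byte(`{"simple":"simple-modified"}`)
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Content-Length", strconv.Itoa(len(body))) // first value
	w.Header().Add("Content-Length", strconv.Itoa(len(body))) // Add appends a duplicate instead of overwriting
	if _, err := w.Write(body); err != nil {
		log.Printf("write failed: %v", err)
	}
}

func main() {
	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

Hitting this server with curl -v shows two identical Content-Length lines in the response; doing the same against the function's raw HTTP response would confirm whether the duplication happens on the function side or only further along the delivery path.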

Versions:

  • Knative 1.13.0 (serving, eventing, eventing-kafka, mt-channel-broker)
  • Kafka via OLM 0.28.0
  • func v1.15.0, but versions down to v1.12.2 were also tried, with similar errors

Please have a look at my setup and let me know what I, or some part of the Knative framework, am doing wrong here.

fbartnitzek avatar Oct 07 '24 12:10 fbartnitzek

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.

github-actions[bot] avatar Jan 07 '25 01:01 github-actions[bot]

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.

github-actions[bot] avatar May 31 '25 01:05 github-actions[bot]