Additional `kafka_franz` input kafka consumer configuration
What
Adds additional Kafka configuration parameters that can be passed from the `kafka_franz` input to the underlying franz-go client opts.
The configurations added are:
kgo.FetchMinBytes(int32(f.fetchMinBytes)),
kgo.FetchMaxWait(f.fetchMaxWaitDuration),
kgo.FetchMaxPartitionBytes(int32(f.maxPartitionFetchBytes)),
Why
- This allows greater flexibility for configuring the consumer throughput of benthos apps using `kafka_franz`
- The configs I've added are commonly tuned parameters (across all Kafka client implementations) for improving consumer throughput (at the cost of latency)
Notes
For Default() I've used the defaults from the franz-go client opts so as not to introduce new behaviour.