franz-go
Expected behavior when producing to non-existent partition
Hey again,
We recently had a bug in some code that manually assigns partitions to messages. It was configured with an incorrect number of partitions, so it was producing messages to partitions that did not exist (the topic had 100 partitions and the partitioning code was assigning messages across 1000).
Naively I would have expected the produce to fail or at worst to log an error, but from what I could tell the message was silently dropped (whether by franz or by Kafka I don't know).
Now, obviously the solution is to use the correct number of partitions (and probably asking Kafka for this number is better than statically configuring it), but I'm curious as to whether this is the expected behavior when one does produce to a partition that does not exist.
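To make the "use the correct number of partitions" point concrete, here is a minimal sketch of a manual assignment that is bounded by the topic's real partition count. The names partitionFor and checkPartition are hypothetical helpers, not franz-go APIs, and the FNV hash is an arbitrary choice for illustration (it is not Kafka's default partitioning hash). It assumes numPartitions comes from the cluster, for example from topic metadata, rather than from static configuration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key onto the range [0, numPartitions).
// Hypothetical helper: numPartitions is assumed to be the topic's actual
// partition count, however it was obtained.
func partitionFor(key []byte, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return 0, fmt.Errorf("invalid partition count %d", numPartitions)
	}
	h := fnv.New32a()
	h.Write(key) // hash.Hash writes never return an error
	return int32(h.Sum32() % uint32(numPartitions)), nil
}

// checkPartition rejects a manually chosen partition that is outside the
// topic's range, so the bug surfaces before the record is handed to a client.
func checkPartition(partition, numPartitions int32) error {
	if partition < 0 || partition >= numPartitions {
		return fmt.Errorf("partition %d out of range [0, %d)", partition, numPartitions)
	}
	return nil
}

func main() {
	p, err := partitionFor([]byte("order-42"), 100)
	fmt.Println(p, err)

	// A partitioner configured for 1000 partitions can pick 750 on a
	// 100-partition topic; the check reports that as an error.
	fmt.Println(checkPartition(750, 100))
}
```

The check is cheap enough to run per record, which turns a misconfigured partition count into an immediate, local error.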
It sounds like you were using manual partitioning? The client forever tries to send to partitions that don't exist. Usually, it just outright won't happen that you try to produce to a partition that does not exist. The number of partitions in a topic only goes up (stonks), so if a partition can be produced to via metadata discovery, the only way for it not to be produced to in the future is if you delete a topic and recreate it. Or, if you have a bug and try producing to a partition manually that doesn't exist.
There is an option, UnknownTopicRetries, that causes produces to fail when the entire topic does not exist. This can probably be extended -- though not super easily -- to when partitions keep repeatedly receiving this error. If you want to bite this off, feel free (I can try helping on discord). I may have some time to address this once I finally solve the implementation for the new KIP (that I'm not prioritizing that highly, to be honest).
Right, that's exactly what happened in our case -- we had a bug in a manual partitioner so it was producing to partitions that didn't (and wouldn't) exist.
Now that I've fixed the bug I'm not super worried about it, but it was a bit surprising that there was no logging whatsoever. Would it be easier (and would it make sense) to emit a warning log after some number of retries producing to a non-existent partition?
There's definitely a log, but only at the debug level.
Looking at the code though (https://github.com/twmb/franz-go/blob/6a58760afaa702521cf2623f4077d034ee2b445e/pkg/kgo/sink.go#L1298-L1305), the unknown check actually is at the partition level (recBuf == per-partition buffer for records).
I think maybe something else is up -- IIRC, the records are never even attempted to be produced because the client knows the partition does not exist. In fact, they should be failed outright, because of this chunk of code: https://github.com/twmb/franz-go/blob/6a58760afaa702521cf2623f4077d034ee2b445e/pkg/kgo/producer.go#L578-L581
Something else is up here. I'll see about reproducing this locally with kfake...
These guys here https://www.youtube.com/watch?v=paVdXL5vDzg (starting at 17:45, ending at 20:50) demonstrate that the Java client will synchronously block until the partition gets created. No log message is produced.
Sorry for the delay on getting back to this, but I can't reproduce the initial problem when running the following locally against a topic ("foo") with 10 partitions:
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, _ := kgo.NewClient(
		kgo.DefaultProduceTopic("foo"),
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)
	r := &kgo.Record{
		Value:     []byte("foo"),
		Partition: 0,
	}
	{
		r, err := cl.ProduceSync(context.Background(), r).First()
		fmt.Println(r, err)
	}
	r.Partition = 11
	{
		r, err := cl.ProduceSync(context.Background(), r).First()
		fmt.Println(r, err)
	}
}
$ go run main.go
&{[] [102 111 111] [] 2024-05-25 20:14:28.1 -0600 MDT foo 0 {0} 0 2039761367311953381 0 0 context.Background} <nil>
&{[] [102 111 111] [] 2024-05-25 20:14:28.1 -0600 MDT foo 0 {0} 0 0 0 0 context.Background} invalid record partitioning choice of 11 from 10 available
I'm going to close this for now but if you have more details (maybe a slimmed down repro) about how you ran into a record being invisibly dropped, I'm open to investigate further.
Thanks for looking into this. It turns out that we were not passing an error callback function to our async Produce calls, so the errors were being silently dropped. Sorry for the fire drill!