kcat
Different (undocumented?) behaviour when reading topic from offset between versions
I have a use case where I wish to count the number of records in a Kafka topic that have not yet been processed by a consumer group. To do this, I get the offsets per partition for the consumer group and then read the messages using kafkacat/kcat.
For example:
$ cat offsets.csv
0,4039803
1,3275229
2,3195019
3,3075505
4,3939620
5,3815132
6,3455052
7,3094848
8,3213984
9,4039825
10,3275221
11,3194980
12,3075551
13,3939570
14,3815207
15,3455071
16,3094869
17,3213985
I use the following command to process all messages from these given offsets until the log end offset:
$ while IFS=, read -r partition offset; \
do kafkacat -C -b broker:9092 -p${partition} -t mytopic -o ${offset} -e -q -f '%o\n' \
| wc -l \
>> num_records.txt; \
done \
< offsets.csv
This writes something like:
$ cat num_records.txt
1553
11907
11566
1699
2460
8427
10815
4040
6616
493
13618
14019
526
6269
3720
9443
3851
6376
which sums up to:
$ cat num_records.txt \
| awk '{ sum += $1 } END { print "Sum of all records still in topic across all partitions: " sum }'
Sum of all records still in topic across all partitions: 117398
If I run the exact same sequence of commands using docker run edenhill/kcat:1.7.1 instead of kafkacat above, I get a different sum of records:
$ cat num_records.txt
1412
11869
11566
1692
2377
8403
10734
4040
6568
369
13576
14019
516
6199
3685
9365
3851
6332
$ cat num_records.txt \
| awk '{ sum += $1 } END { print "Sum of all records still in topic across all partitions: " sum }'
Sum of all records still in topic across all partitions: 116573
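To see where the two runs diverge, the two outputs can be compared partition by partition. The following is only a minimal sketch: it assumes each run's output was saved to its own file (the function name compare_counts and the file names used below are hypothetical, not part of kafkacat/kcat) and that line N of each file holds the count for partition N-1, as in the listings above.

```shell
#!/bin/sh
# Hypothetical helper: compare two per-partition count files line by line.
# Assumption: line N of each file is the record count for partition N-1,
# in the same order as offsets.csv.
compare_counts() {
    old_counts=$1   # e.g. counts produced with kafkacat
    new_counts=$2   # e.g. counts produced with kcat 1.7.1
    # Join the two files side by side, then print the per-partition
    # difference and the overall difference.
    paste -d, "$old_counts" "$new_counts" \
    | awk -F, '{
        diff = $1 - $2
        total += diff
        printf "partition %d: %d - %d = %d\n", NR - 1, $1, $2, diff
      }
      END { print "Total difference: " (total + 0) }'
}
```

With the two listings above saved as num_records_kafkacat.txt and num_records_kcat.txt, running compare_counts num_records_kafkacat.txt num_records_kcat.txt shows that partitions 2, 7, 11 and 16 produce identical counts, while the other partitions together account for the total difference of 825 records (117398 - 116573).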
The latter value is the same one that I got when I tried to process my topic from the given offsets in Python using confluent-kafka, so I'm leaning towards it being the correct one.
Was this an undocumented fix? Or did a setting change between kafkacat 1.5.0 and kcat 1.7.1 that could influence this behaviour? I checked the release notes but unfortunately could not find anything that seems relevant.
Digging a bit deeper, it seems the change in behaviour occurred between kafkacat 1.6.0 and kcat 1.7.0.