Add support for exactly-once semantics in the source connector
This PR adds support for exactly-once semantics in the source connector, as described in KIP-618 and shipped in Kafka Connect 3.3.0.
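For context, KIP-618 lets a source connector advertise whether it can deliver records exactly once via two hooks on SourceConnector. As a rough, illustrative sketch (the method names and enums below are from the Kafka Connect 3.3 API, but the body is not the actual code in this PR), a connector can opt in roughly like this:

```java
import java.util.Map;

import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

// Illustrative sketch only -- not the code in this PR.
public abstract class ExactlyOnceCapableSourceConnector extends SourceConnector {

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(final Map<String, String> connectorConfig) {
        // Tell the runtime this connector can participate in exactly-once
        // delivery; the worker must also run with
        // exactly.once.source.support=enabled for this to take effect.
        return ExactlyOnceSupport.SUPPORTED;
    }

    @Override
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(final Map<String, String> connectorConfig) {
        // Let the runtime manage producer transaction boundaries
        // (transaction.boundary=poll or interval) rather than the connector.
        return ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}
```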
To test this PR beyond the existing test suite, I created a little chaos tester: https://github.com/tysonmote/mongo-kafka-chaos. That repo stands up a MongoDB-to-Kafka pipeline using the MongoDB Kafka Connector and then uses SIGKILL to randomly kill containers, checking (more or less) that exactly-once semantics hold.
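The consumer log lines below suggest each event carries a sequential ID, which makes duplicates and reordering easy to flag. A minimal, hypothetical sketch of that kind of check (the actual consumer lives in the mongo-kafka-chaos repo and may be implemented differently):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: events are expected to arrive with strictly increasing
// IDs, so any repeat or regression is reported, mirroring the error lines
// quoted below. Not the actual chaos-tester code.
public final class EventChecker {
    private final Set<Long> seen = new HashSet<>();
    private long highest = -1;

    public void check(final long eventId) {
        if (!seen.add(eventId)) {
            System.err.printf("ERROR: Duplicate event %d%n", eventId);
        }
        if (eventId < highest) {
            System.err.printf("ERROR: Got event %d after %d (out of order)%n", eventId, highest);
        }
        highest = Math.max(highest, eventId);
    }
}
```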
Before this PR, killing Kafka brokers randomly would produce duplicate messages fairly easily:
./chaos.sh -i 60 kafka1,kafka2,kafka3
[...]
consumer | Events consumed: 338,221 (2,539/s)
consumer | Events consumed: 345,351 (374/s)
consumer | ERROR: Got event 345350 after 345353 (out of order)
consumer | ERROR: Duplicate event 345350
consumer | ERROR: Duplicate event 345351
consumer | ERROR: Duplicate event 345352
consumer | ERROR: Duplicate event 345353
consumer | Events consumed: 402,445 (5,709/s)
consumer | Events consumed: 427,805 (2,536/s)
With this PR, Kafka Connect shows that it is using ExactlyOnceWorkerSourceTask:
connect | [2023-08-06 22:14:11,876] INFO Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask)
connect | [2023-08-06 22:14:11,876] INFO ExactlyOnceWorkerSourceTask{id=chaos-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
And I'm unable to find duplicate (or even out-of-order) messages when randomly killing Kafka brokers or the Connect worker:
./chaos.sh -i 60 kafka1,kafka2,kafka3
[...]
consumer | Events consumed: 2,230,041 (470/s)
consumer | Events consumed: 2,298,021 (6,791/s)
consumer | Events consumed: 2,310,926 (604/s)
consumer | Events consumed: 2,363,276 (5,235/s)
consumer | Events consumed: 2,385,986 (2,268/s)
consumer | Events consumed: 2,398,847 (552/s)
consumer | Events consumed: 2,467,306 (6,837/s)
./chaos.sh -i 60 connect
[...]
consumer | Events consumed: 2,557,907 (13,361/s)
consumer | Events consumed: 2,571,181 (2,632/s)
consumer | Events consumed: 2,572,361 (33/s)
consumer | Events consumed: 2,574,201 (368/s)
consumer | Events consumed: 2,576,131 (384/s)
consumer | Events consumed: 2,576,881 (149/s)
consumer | Events consumed: 2,627,996 (10,223/s)
consumer | Events consumed: 2,688,458 (11,915/s)
consumer | Events consumed: 2,726,518 (7,577/s)
consumer | Events consumed: 2,753,578 (5,323/s)
The above chaos tester is, of course, not very comprehensive: I haven't tested multi-node failures or networking issues, just arbitrary crashes via SIGKILL. The MongoDB writer is also not particularly high-throughput; I wrote it to model a use case we have at Rippling, writing small batches of records in transactions.
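For reference, a minimal sketch of that kind of writer workload, assuming the MongoDB Java sync driver and placeholder connection string, database/collection names, and batch size (the real writer is in the mongo-kafka-chaos repo and may differ):

```java
import java.util.ArrayList;
import java.util.List;

import org.bson.Document;

import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;

// Hypothetical sketch of the workload described above: small batches of
// sequentially numbered documents written inside multi-document transactions.
public final class BatchWriter {
    public static void main(final String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017/?replicaSet=rs0")) {
            MongoCollection<Document> events = client.getDatabase("chaos").getCollection("events");
            long nextId = 0;
            while (true) {
                // Build a small batch of sequentially numbered events.
                List<Document> batch = new ArrayList<>();
                for (int i = 0; i < 10; i++) {
                    batch.add(new Document("eventId", nextId++));
                }
                // Write the batch inside a multi-document transaction.
                try (ClientSession session = client.startSession()) {
                    session.startTransaction();
                    events.insertMany(session, batch);
                    session.commitTransaction();
                }
            }
        }
    }
}
```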