swift-kafka-client
swift-kafka-client copied to clipboard
add approx. transactional api
There is an approximate solution for Transactional API for Kafka gsoc.
Some ideas that were expressed in https://github.com/swift-server/swift-kafka-gsoc/issues/78#issuecomment-1642042408
There are the following ideas:
- We might initTransaction and switch state in machine
- Run blocking transactional calls in GlobalQueue
- Retry retryable errors
- Automatically abort transactions when abort error received
UPD: after review API changed to the following:
To use kafka transactions, it is required to create KafkaTransactionalProducerConfiguration:
let config = KafkaTransactionalProducerConfiguration(transactionalID: "1234")
Similar to KafkaProducer, it is possible to create KafkaTransactionalProducer:
let transactionalProducer = try await KafkaTransactionalProducer.makeTransactionalProducerWithEvents(config: config)
To commit transactions, it is simply required to call withTransaction(...):
try await transactionalProducer.withTransaction { transaction in
// Produce new messages:
let newMessage = KafkaProducerMessage(
topic: "<some topic>",
value: "<some value>"
)
try transaction.send(newMessage)
...
// commit offsets:
let partitionlist = RDKafkaTopicPartitionList()
partitionlist.setOffset(topic: self.uniqueTestTopic, partition: message.partition, offset: Int64(message.offset))
try await transaction.send(offsets: partitionlist, forConsumer: consumer)
...
}