guardian-for-apache-kafka
Support Transactional API for Restore
What is currently missing?
Currently the restore portion of Guardian only uses a standard Kafka producer, via Alpakka's Producer. While this is fine for standard scenarios, most people doing a restore would ideally want exactly-once semantics. This seems possible with the standard Producer using the following configuration:
    def baseProducerConfig
        : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
      Some(
        _.withBootstrapServers(container.bootstrapServers)
          .withProperties(
            Map(
              ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG             -> true.toString,
              ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
              ProducerConfig.BATCH_SIZE_CONFIG                     -> 0.toString
            )
          )
          .withParallelism(1)
      )
However, such a configuration has terrible throughput, since it essentially forces the Kafka producer to send one message at a time with no batching.
How could this be improved?
Support needs to be added to Alpakka Kafka so that it's possible to create a Transactional.source from a source other than a Kafka cluster. The current Transactional.sink only works with a PartitionOffsetCommittedMarker, which is currently only created by a Transactional.source. Trying to manually create a PartitionOffset from a different Source just results in an exception being thrown at https://github.com/akka/alpakka-kafka/blob/1a1dab1e5168a829b7e76053375a364739043850/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala#L220, and it's not possible to manually create a PartitionOffsetCommittedMarker since it's private/internal API.
There is an open issue for this in Alpakka Kafka at https://github.com/akka/alpakka-kafka/issues/1075.
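For reference, the following is a rough sketch of a transactional write that has no consumer offset attached. It uses the plain Kafka client's producer transactions API directly, not Alpakka's Transactional.sink. The object name, the helper names, and the transactional id are hypothetical and are not part of Guardian or Alpakka Kafka. Each call to `restoreBatch` writes its records atomically while still allowing normal batching. The sketch does not address how a restarted restore would know which batches were already committed.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{AuthorizationException, OutOfOrderSequenceException, ProducerFencedException}
import org.apache.kafka.common.serialization.ByteArraySerializer

// Hypothetical sketch, not Guardian's or Alpakka Kafka's implementation.
object TransactionalRestoreSketch {

  // Minimal properties for a transactional producer. The transactional id
  // should be stable and unique per producer instance.
  def transactionalProps(bootstrapServers: String, transactionalId: String): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    props
  }

  // Creates the producer and registers its transactional id with the cluster.
  // initTransactions() must be called once before the first transaction.
  def transactionalProducer(
      bootstrapServers: String,
      transactionalId: String
  ): KafkaProducer[Array[Byte], Array[Byte]] = {
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](
      transactionalProps(bootstrapServers, transactionalId),
      new ByteArraySerializer,
      new ByteArraySerializer
    )
    producer.initTransactions()
    producer
  }

  // Sends all records inside a single transaction: either every record
  // becomes visible to read_committed consumers or none does.
  def restoreBatch(
      producer: KafkaProducer[Array[Byte], Array[Byte]],
      records: Seq[ProducerRecord[Array[Byte], Array[Byte]]]
  ): Unit = {
    producer.beginTransaction()
    try {
      records.foreach(record => producer.send(record))
      producer.commitTransaction()
    } catch {
      // Fatal errors: the producer cannot recover and must be closed.
      case e @ (_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
        producer.close()
        throw e
      // Any other Kafka error: abort so that no partial batch is committed.
      case e: KafkaException =>
        producer.abortTransaction()
        throw e
    }
  }
}
```

A caller would create the producer once with `transactionalProducer(...)`, then call `restoreBatch` for each chunk of backed-up records, and close the producer when the restore finishes.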
Is this a feature you would work on yourself?
- [ ] I plan to open a pull request for this feature