guardian-for-apache-kafka

Support Transactional API for Restore

Open mdedetrich opened this issue 2 years ago • 0 comments

What is currently missing?

Currently, the restore portion of Guardian only uses a standard Kafka producer via Alpakka Kafka's Producer. While this is fine for standard scenarios, most people doing a restore would ideally want exactly-once semantics. This seems to be possible with the standard Producer using a configuration such as:

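```scala
import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig

// `container` is assumed to be a running Kafka test container in scope (e.g. from Testcontainers).
def baseProducerConfig
    : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
  Some(
    _.withBootstrapServers(
      container.bootstrapServers
    ).withProperties(
      Map(
        // Let the broker de-duplicate records resent by producer retries
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG             -> true.toString,
        // At most one unacknowledged request per connection, so requests are strictly sequential
        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
        // A batch size of 0 disables batching: every record is sent in its own request
        ProducerConfig.BATCH_SIZE_CONFIG                     -> 0.toString
      )
    ).withParallelism(1) // at most one send in flight inside the Alpakka producer stage
  )
```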

However, such a configuration has terrible throughput, since you are essentially forcing the Kafka producer to send one message at a time with no batching.
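To put rough numbers on that cost, here is a back-of-the-envelope model, not a benchmark. It assumes a single connection to the partition leader, a fixed hypothetical round-trip time per produce request and a fixed number of records per request when batching is on; it ignores linger.ms, compression and broker-side processing time. With max.in.flight.requests.per.connection = 1 each request must be acknowledged before the next one is sent, and with batch.size = 0 each request carries one record, so restore time grows by one round trip per record. ProducerThroughputModel and its methods are hypothetical names used only for illustration.

```scala
// Simplified, hypothetical cost model; not part of Guardian, Alpakka Kafka or kafka-clients.
object ProducerThroughputModel {

  /** Number of produce requests needed to send `records` records when each
    * request carries at most `recordsPerRequest` records (ceiling division). */
  def requests(records: Long, recordsPerRequest: Long): Long = {
    require(records >= 0 && recordsPerRequest > 0)
    (records + recordsPerRequest - 1) / recordsPerRequest
  }

  /** Estimated wall-clock time in milliseconds when only one request may be in
    * flight at a time, so every request costs one full round trip. */
  def sequentialMillis(records: Long, recordsPerRequest: Long, roundTripMillis: Double): Double =
    requests(records, recordsPerRequest) * roundTripMillis
}
```

For example, restoring 1,000,000 records with an assumed 2 ms round trip gives `sequentialMillis(1000000L, 1L, 2.0)`, i.e. 2,000,000 ms (about 33 minutes) with batching disabled, versus `sequentialMillis(1000000L, 500L, 2.0)`, i.e. 4,000 ms, if each request could carry 500 records.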

How could this be improved?

Support needs to be added to Alpakka Kafka so that it's possible to create a Transactional.source from a source that is not a Kafka cluster. The current Transactional.sink only works with a PartitionOffsetCommittedMarker, which is currently only created by Transactional.source. Trying to manually create a PartitionOffset from a different Source just results in an exception being thrown at https://github.com/akka/alpakka-kafka/blob/1a1dab1e5168a829b7e76053375a364739043850/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala#L220, and it's not possible to manually create a PartitionOffsetCommittedMarker since it's a private/internal API.

There is currently an ongoing issue for this in Alpakka Kafka at https://github.com/akka/alpakka-kafka/issues/1075.
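Conceptually, a restore needs a transactional sink whose commit boundaries are chosen by the restore itself instead of being tied to consumer offsets (PartitionOffset), which a backup source does not have. The sketch below is a purely in-memory model of that idea, not Alpakka Kafka or kafka-clients API: BackupRecord, InMemoryTransactionalLog and TransactionalRestoreModel are hypothetical names, although the method names deliberately mirror the transaction methods on kafka-clients' KafkaProducer. Records are grouped into chunks and each chunk is written in one transaction, so a reader that only sees committed data (like a consumer with isolation.level=read_committed) observes either a whole chunk or none of it.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical types for illustration only; not part of Guardian, Alpakka Kafka or kafka-clients.
final case class BackupRecord(topic: String, partition: Int, offset: Long, value: String)

/** In-memory stand-in for a topic written by a transactional producer.
  * Only committed records are visible through `readCommitted`. */
final class InMemoryTransactionalLog {
  private val committed = ArrayBuffer.empty[BackupRecord]
  private val pending   = ArrayBuffer.empty[BackupRecord]
  private var open      = false

  def beginTransaction(): Unit = { require(!open, "transaction already open"); open = true }
  def send(r: BackupRecord): Unit = { require(open, "no open transaction"); pending += r }
  def commitTransaction(): Unit = { require(open, "no open transaction"); committed ++= pending; pending.clear(); open = false }
  def abortTransaction(): Unit = { require(open, "no open transaction"); pending.clear(); open = false }
  def readCommitted: Vector[BackupRecord] = committed.toVector
}

object TransactionalRestoreModel {

  /** Writes `records` in chunks of `chunkSize`, one transaction per chunk, and
    * returns the number of committed transactions. If a send fails, the open
    * transaction is aborted and the error rethrown, so a partially written
    * chunk never becomes visible. */
  def restoreInTransactions(
      records: Iterator[BackupRecord],
      log: InMemoryTransactionalLog,
      chunkSize: Int
  ): Int = {
    require(chunkSize > 0)
    var transactions = 0
    records.grouped(chunkSize).foreach { chunk =>
      log.beginTransaction()
      try {
        chunk.foreach(log.send)
        log.commitTransaction()
        transactions += 1
      } catch {
        case NonFatal(e) =>
          log.abortTransaction()
          throw e
      }
    }
    transactions
  }
}
```

In a real implementation the in-memory log would be replaced by a transactional Kafka producer. Since the real producer can batch records freely inside a transaction, this shape would avoid the one-record-per-request configuration shown earlier; how Alpakka Kafka should expose it is what the linked issue is about.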

Is this a feature you would work on yourself?

  • [ ] I plan to open a pull request for this feature

mdedetrich, Feb 10 '22 09:02