alpakka-kafka
Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
### Versions used Akka version: "2.6.15" Akka Kafka version: "2.1.0" ### Expected Behavior Using source per partition (Consumer.committablePartitionedSource - https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition) and source restart (RestartSource.onFailuresWithBackoff - https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage) : source per partition...
In your example for external offset storage, using plainSource, a single partition is assigned. I'm guessing it's common to subscribe to multiple partitions in one subscription? For those cases, I'm...
### Versions used 2.0.1 ### Expected Behavior Assuming external (or kafka-internal) commits are processed properly and loaded properly, it is expected that we will get exactly-once processing of records based...
Related to [KAFKA-3552](https://issues.apache.org/jira/browse/KAFKA-3552), network latency on consumer side can cause an `OutOfMemoryError` in the heartbeat thread, which is fatal. ``` Uncaught error from thread [Tests-akka.kafka.default-dispatcher-147] shutting down JVM since 'akka.jvm-exit-on-fatal-error'...
### Short description For the `Committer`, add a committing FlowWithContext which emits the elements downstream but drops the committable context (the current one emits no useful value). ### Details -...
First seen in #1069 ``` AtLeastOnce: At-Least-Once One To Conditional - should support batching of offsets `withOffsetContext` *** FAILED *** (10 seconds, 431 milliseconds) Vector(ConsumerRecord(topic = topic-2-96, partition = 0,...
Hi, having a `committableSource` with `KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG` ` val streamSource = Consumer.committableSource(getConsumerSettings, Subscriptions.topics(topicSubscription)) ` ``` def getConsumerSettings { val kafkaMBAddress = consumerProperties("bootstrap.servers") val groupID = consumerProperties("groupId") ConsumerSettings(system, new ByteArrayDeserializer, newKafkaAvroDeserializer(consumerProperties)) .withBootstrapServers(kafkaMBAddress)...
### Versions used 2.0.4, but it's happening with 2.0.5 too Akka version: 2.6.8 ### Expected Behavior Using a shared Producer, we expect that there is only one Buffer (akka.stream.impl.FixedSizeBuffer$ModuloFixedSizeBuffer) for...
A typical use case in Kafka to achieve exactly-once semantics when consuming messages is to store offsets external to Kafka, atomically with the appropriate state. For that, the Alpakka library provides...
Hi Team, I am trying to use alpakka-kafka, and my consumer throughput is nowhere near what is claimed in the article - https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations My consumer code is as follows: ``` ConsumerSettings...