Connect to Kafka with Confluent Parallel Consumer
Description
Parallel Kafka Connector for https://github.com/confluentinc/parallel-consumer
Parallel Consumer makes it possible to consume messages from a single partition in parallel, optionally locking at the key level.
Repository name
quarkus-parallel-kafka
Short description
Parallel Kafka Connector
Repository Homepage URL
https://docs.quarkiverse.io/<REPOSITORY_NAME>/dev/
Repository Topics
- quarkus-extension
...
Team Members
Additional context
No response
/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)
The Quarkus Messaging Kafka incoming channels do allow parallel processing using @Blocking(ordered = false) or @RunOnVirtualThread annotations.
AFAIK, the only missing piece is key-level ordering. I'd very much like to add this feature to the connector, and it should not be too hard.
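To make "key-level ordering" concrete, here is a minimal, framework-free Java sketch under stated assumptions. `KeyOrderedExecutor` is a hypothetical helper, not part of Quarkus, SmallRye, or Parallel Consumer. It chains each task onto the previous task for the same key, so records sharing a key run one at a time while different keys run in parallel on the pool. Acking and committing offsets are deliberately out of scope.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: same-key tasks run one after another, different keys run in parallel. */
final class KeyOrderedExecutor {
    private final ExecutorService pool;
    // Tail of the task chain for every key that currently has work queued or running.
    private final ConcurrentHashMap<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyOrderedExecutor(ExecutorService pool) {
        this.pool = pool;
    }

    /** Runs {@code task} on the pool after the previous task with the same key has finished. */
    CompletableFuture<Void> submit(String key, Runnable task) {
        CompletableFuture<Void> next = tails.compute(key, (k, tail) -> tail == null
                ? CompletableFuture.runAsync(task, pool)
                // Ignore the previous outcome so one failed record does not block its key forever.
                : tail.handle((ignored, error) -> null).thenRunAsync(task, pool));
        // Forget the key once its last queued task is done, so idle keys do not accumulate.
        next.whenComplete((ignored, error) -> tails.remove(key, next));
        return next;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyOrderedExecutor executor = new KeyOrderedExecutor(pool);
        List<CompletableFuture<Void>> all = new ArrayList<>();
        for (String message : List.of("a-1", "b-1", "a-2", "b-2", "a-3")) {
            String key = message.substring(0, 1); // key = first character
            all.add(executor.submit(key, () -> System.out.println("processed " + message)));
        }
        CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
    }
}
```

Running `main` always prints a-1 before a-2 before a-3 (and b-1 before b-2), while the a and b lines may interleave.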
@ozangunalp I am now trying to implement this, since I need key-level ordering. If I use the setup you proposed and add some code to lock at the key level, how would Kafka currently ack the records? (E.g., if I'm processing 1, 2, 3, 4, 5 and finish processing 5 but not 1, 2, 3, 4, would it ack 5, so that I could lose 1, 2, 3, 4 as unprocessed records?) I guess the acking part is missing if I implement this in user code? Does it have to go in the framework?
P.S. Is there any planned timeline for this feature, or does nothing like that exist yet?
The "throttled" commit-strategy, which is the default in Quarkus Messaging, already handles messages that are acked out of order.
As for the key-level ordering, I've done some tests. I can have something functional with minimal overhead using decorators and pausable channels. But it isn't quite feature-complete. If you want to give it a try, I pushed my test here for you to check.
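The idea behind committing safely with out-of-order acks (the "ack 5 before 1 to 4" question) can be sketched for a single partition. This is a simplified, hypothetical model (`ContiguousCommitTracker` is not the throttled strategy's actual code): an ack for a later offset never moves the committable position past an earlier record that is still in flight. It follows Kafka's convention that the committed offset is the next offset to read.

```java
import java.util.TreeSet;

/**
 * Hypothetical single-partition model: offsets may be acked in any order, but the
 * committable position only advances over offsets whose predecessors are all acked.
 */
final class ContiguousCommitTracker {
    // Kafka convention: the committed offset is the next offset to read.
    private long nextToCommit;
    // Acked offsets that sit behind a gap (an earlier offset is still in flight).
    private final TreeSet<Long> ackedAhead = new TreeSet<>();

    ContiguousCommitTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Marks {@code offset} as processed and returns the position that is now safe to commit. */
    long ack(long offset) {
        if (offset >= nextToCommit) {
            ackedAhead.add(offset);
        }
        // Advance over the run of consecutive acked offsets starting at the current position.
        while (!ackedAhead.isEmpty() && ackedAhead.first() == nextToCommit) {
            ackedAhead.pollFirst();
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousCommitTracker tracker = new ContiguousCommitTracker(1);
        for (long offset : new long[] {5, 1, 2, 3, 4}) {
            System.out.println("acked " + offset + " -> commit position " + tracker.ack(offset));
        }
    }
}
```

Acking 5, 1, 2, 3, 4 in that order yields positions 1, 2, 3, 4, 6: nothing is committed past record 1 until it is done, and once 4 is acked the position jumps past 5.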
@ozangunalp Thanks, I will also try your tests to see how it works. In the meantime, I also created https://github.com/bojanv55/parallel-kafka-connector, a connector for parallel-consumer. It is basically 90% of the current code, copy-pasted, with some changes to support parallel-consumer. I tested it a bit and it seems to work. Could I also ask you to check whether these 2 lines integrate optimally with Quarkus pipelines:
- Is this the way to ack/nack messages in a SmallRye pipeline? https://github.com/bojanv55/parallel-kafka-connector/blob/3c1bfa023ed16f0b9ff46c202e5cd5f03c87a18f/src/main/java/me/vukas/parallel/ParallelKafkaSource.java#L110
- Is it OK to always return the same publisher for multiple "concurrency" consumers? https://github.com/bojanv55/parallel-kafka-connector/blob/3c1bfa023ed16f0b9ff46c202e5cd5f03c87a18f/src/main/java/me/vukas/parallel/ParallelKafkaConnector.java#L77
@ozangunalp I'm playing with the test you pushed. It seems I'm not able to make it block at the key level (probably I'm doing something wrong in my code?)
This is the code that receives the messages:
@io.smallrye.reactive.messaging.annotations.Blocking(ordered = false/*, value = "my-pool"*/)
@Incoming("some-topic")
//@Blocking
public Uni<Void> sink(String word, IncomingKafkaRecordMetadata<String, String> metadata) {
    Log.info("Received user: " + word);
    return myEntityRepository.upsertCurrentProducer("Asd", "sdf", word);
}
This is the code that eventually saves to the DB:
Uni<Void> upsertCurrentProducerInternal(
        String x, String y, String producer) {
    String sql =
            "INSERT INTO MyEntity (id, field)\n" +
            "SELECT 1 AS id, '" + producer + "' AS field\n" +
            "FROM DUAL\n" +
            "WHERE SLEEP(5) = 0\n" +
            "ON DUPLICATE KEY UPDATE field = 'fff';";
    return sessionFactory.withTransaction(
            (session, tx) ->
                    session
                            .createNativeQuery(sql)
                            .executeUpdate()
                            .replaceWithVoid());
}
I would expect that when I send 5 messages with the same key at the same time, since the key is the same and the SQL sleeps for 5 seconds, no other message with that key would be processed until those 5 seconds have passed. Instead, it immediately consumes all the other messages.
This is my config:
smallrye.messaging.worker.my-pool.max-concurrency=50
mp.messaging.incoming.some-topic.connector=smallrye-kafka
mp.messaging.incoming.some-topic.commit-strategy=throttled
mp.messaging.incoming.some-topic.throttled.processing-order=ordered_by_key
mp.messaging.incoming.some-topic.auto.offset.reset=latest
mp.messaging.incoming.some-topic.group.id=grupaTrotled
mp.messaging.incoming.some-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.some-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Do you have any advice here? Thanks.
@ozangunalp After checking the code a bit more: I guess this is only implemented and working in your tests?
That's true. Let me push something that can work OOTB with your code.
You can check it out again.
@ozangunalp Are you sure you pushed? I cannot find anything.
Here: https://github.com/ozangunalp/smallrye-reactive-messaging/commits/kafka_throttled_concurrency_ordered/
Note that it needs some more work. And I am thinking we could even generalize this to other connectors.
@ozangunalp Locking seems to work now with my example; I will test it a bit more. So far, the only issue I see is that when I emit a larger batch of messages in sequence, I get this exception:
2025-10-29 18:08:50,922 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18231: The record 132 from topic-partition 'some-0' has waited for 62 seconds to be acknowledged. This waiting time is greater than the configured threshold (60000 ms). At the moment 423 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 131. This error is due to a potential issue in the application which does not acknowledged the records in a timely fashion. The connector cannot commit as a record processing has not completed.
2025-10-29 18:08:50,922 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18228: A failure has been reported for Kafka topics '[some]': io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException: The record 132 from topic/partition 'some-0' has waited for 62 seconds to be acknowledged. At the moment 423 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 131.
2025-10-29 18:08:50,925 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18224: Executing consumer revoked re-balance listener for group 'grupaTrotled'
I just watched this in Wireshark. When I emit 50 elements (starting from 5), it sends only 3 ACKs (5, 48, 55); it does ACK everything it needs to, though fairly slowly, but at least it doesn't time out. When I emit 500 elements (starting from 55), it again sends only 3 ACKs (57, 107, 132), which do not cover everything they should (up to 555), and then it gets stuck and times out.
How long does it take to process a single message?
I wonder if this may be related to some fairness issue between key groups.
@ozangunalp It seems that merge is the problem. When I increase its value to a much larger number, it works; when I decrease it to a smaller value (e.g. 2 or 3), it gets stuck immediately.
return groupBy
        .onItem().transformToMulti(g -> {
            PausableMulti<? extends Message<?>> pausable = new PausableMulti<>(g, false);
            return pausable
                    .invoke(pausable::pause)
                    .map(m -> m.withAck(() -> m.ack().thenRun(pausable::resume)));
        })
        .merge(5000);
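A framework-free model of what this decorator does (pause a key group once it emits a message, resume it when that message's ack completes) may help when reasoning about it. `PerKeyGate` is hypothetical and single-threaded; it stands in for the groupBy + PausableMulti pipeline, not for its real implementation. Unlike a groupBy group, it also forgets a key as soon as the key has nothing in flight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical single-threaded model: at most one message per key is in flight at a time. */
final class PerKeyGate {
    private final Map<String, Queue<String>> pending = new HashMap<>();
    private final Map<String, Boolean> paused = new HashMap<>();
    private final Consumer<String> downstream;

    PerKeyGate(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** A new message arrives in the group for {@code key}. */
    void onMessage(String key, String payload) {
        pending.computeIfAbsent(key, k -> new ArrayDeque<>()).add(payload);
        drain(key);
    }

    /** The in-flight message for {@code key} was acked: resume the group and emit its next message. */
    void ack(String key) {
        paused.put(key, false);
        drain(key);
        if (!paused.getOrDefault(key, false)) { // nothing left to emit: forget the key
            pending.remove(key);
            paused.remove(key);
        }
    }

    int trackedKeys() {
        return pending.size();
    }

    private void drain(String key) {
        Queue<String> queue = pending.get(key);
        if (paused.getOrDefault(key, false) || queue == null || queue.isEmpty()) {
            return;
        }
        paused.put(key, true);           // corresponds to pausable.pause() on emission
        downstream.accept(queue.poll()); // exactly one message goes downstream
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        PerKeyGate gate = new PerKeyGate(emitted::add);
        gate.onMessage("8", "first-8");
        gate.onMessage("8", "second-8");
        gate.onMessage("7", "only-7");
        System.out.println(emitted); // [first-8, only-7]: second-8 waits for the ack of first-8
        gate.ack("8");
        System.out.println(emitted); // [first-8, only-7, second-8]
    }
}
```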
Is it possible that, if I have 6 records with IDs 1, 2, 3, 4, 5, 6 and merge(5), for example, it starts processing records 2, 3, 4, 5, 6 while record 1 is left for later for whatever reason, and then blocks in merge? The maximum number of elements in parallel is 5, but it cannot commit 6 and free up merge because record 1 has not been committed yet, and record 1 cannot be processed at all because merge is already at its maximum.
@bojanv55 Yes, I realize this needs to be closer to the Kafka connector to work properly. It is not so much a problem with merge (I think there may be some retention in the merge queue, but it should not blow up); it is about the point at which the commit strategy considers a record received for processing.
I created a draft PR upstream with the tests we are doing here: https://github.com/smallrye/smallrye-reactive-messaging/pull/3208
Please check the latest change. I think it does fix the issue you are having.
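The SRMSG18231 warning reported earlier compares how long a partition's oldest unacknowledged record has been waiting against a threshold. The sketch below is a hypothetical model of that kind of check (`AckTimeoutCheck` is not the connector's class). It illustrates why the point at which a record counts as "received" matters: if the clock starts before the record actually reaches processing (for example, while it sits in a merge queue), the record can exceed the threshold even though no processing is slow. Within one partition, the lowest pending offset is assumed to be the earliest received one.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical model of an "acknowledgement took too long" check for one partition. */
final class AckTimeoutCheck {
    // Pending offsets with the time (ms) at which they counted as received.
    private final TreeMap<Long, Long> receivedAtByOffset = new TreeMap<>();
    private final long thresholdMillis;

    AckTimeoutCheck(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    void received(long offset, long nowMillis) {
        receivedAtByOffset.put(offset, nowMillis);
    }

    void acked(long offset) {
        receivedAtByOffset.remove(offset);
    }

    /** Lowest pending offset if it has waited longer than the threshold (it also blocks commits). */
    OptionalLong overdue(long nowMillis) {
        Map.Entry<Long, Long> oldest = receivedAtByOffset.firstEntry();
        if (oldest == null || nowMillis - oldest.getValue() <= thresholdMillis) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(oldest.getKey());
    }

    public static void main(String[] args) {
        AckTimeoutCheck check = new AckTimeoutCheck(60_000);
        check.received(132, 0);  // polled at t=0 but, say, still queued and never processed
        check.received(133, 0);
        check.acked(133);        // later records can be acked...
        System.out.println(check.overdue(62_000)); // ...but 132 is overdue: OptionalLong[132]
    }
}
```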
It's definitely something with that merge, but not what I thought. I just tested your new code and the behavior is similar but not identical. I increased merge to 512 and emitted 500 elements: the first time, it completes and ACKs everything, including the last one. But after waiting a few minutes and emitting 500 more elements, it got stuck after 12. This time it does ACK those, but it stops consuming and just lags (Kafka consumer lag builds up), so there are more elements in Kafka waiting to be processed but it doesn't pull new ones in. I'll also paste my full code here; maybe my code causes this behavior?
@ApplicationScoped
public class DemoProcessor {
    @Inject
    MyEntityRepository myEntityRepository;
    @io.smallrye.reactive.messaging.annotations.Blocking(ordered = false, value = "my-pool")
    @Incoming("some-topic")
    public Uni<Void> sink(String word, IncomingKafkaRecordMetadata<String, String> metadata) {
        Log.info("Received user: " + word);
        return myEntityRepository.upsertCurrentProducer("Asd", "sdf", word)
                .onTermination().invoke(() -> System.out.println("Done with " + word));
    }
}
@ApplicationScoped
public class MyEntityRepository implements PanacheRepository<MyEntity> {
    private final Mutiny.SessionFactory sessionFactory;
    private final Vertx vertx;
    public MyEntityRepository(Mutiny.SessionFactory sessionFactory, Vertx vertx) {
        this.sessionFactory = sessionFactory;
        this.vertx = vertx;
    }
    public Uni<Void> upsertCurrentProducer(
            String x, String y, String producer) {
        return runOnEventLoop(
                () -> upsertCurrentProducerInternal(x, y, producer));
    }
    Uni<Void> upsertCurrentProducerInternal(
            String x, String y, String producer) {
        String sql =
                "INSERT INTO MyEntity (id, field)\n" +
                "SELECT 1 AS id, '" + producer + "' AS field\n" +
                "FROM DUAL\n" +
                "WHERE SLEEP(5) = 0\n" +
                "ON DUPLICATE KEY UPDATE field = 'fff';";
        return sessionFactory.withTransaction(
                (session, tx) ->
                        session
                                .createNativeQuery(sql)
                                .executeUpdate()
                                .replaceWithVoid());
    }
    private <T> Uni<T> runOnEventLoop(Supplier<Uni<T>> supplier) {
        return Uni.createFrom()
                .emitter(
                        emitter -> {
                            Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
                            VertxContextSafetyToggle.setContextSafe(context, true);
                            context.runOnContext(
                                    v -> {
                                        try {
                                            supplier.get().subscribe().with(emitter::complete, emitter::fail);
                                        } catch (Exception ex) {
                                            emitter.fail(ex);
                                        }
                                    });
                        });
    }
}
Why do you run your processing on the event loop?
@ozangunalp otherwise I get this:
2025-10-30 17:54:57,700 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18228: A failure has been reported for Kafka topics '[some]': java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'my-pool-2'
Is there some other way to mitigate this exception? Also, do you think this is the reason for the hanging of the processing?
You can use Hibernate ORM instead of Hibernate Reactive.
I simplified the code and removed the database altogether, and it still fails after 512 elements (merge(512)): the last ACK I get is for 512, and then Kafka just lags...
@io.smallrye.reactive.messaging.annotations.Blocking(ordered = false, value = "my-pool")
@Incoming("some-topic")
public Uni<Void> sink(String word, IncomingKafkaRecordMetadata<String, String> metadata) {
    Log.info("Received user: " + word);
    return Uni.createFrom().voidItem().onTermination().invoke(() -> System.out.println("Done with " + word));
}
@ozangunalp I will investigate a bit more, but I have a feeling it is somehow related to https://github.com/smallrye/smallrye-mutiny/discussions/1849, a discussion I opened earlier this year. At that time groupBy had a bug and behaved much like this. I hoped it had been fixed, but when I run the same code I had then, it still fails. Do you think that problem could be the cause? You can try this code, which I think should complete but instead just times out:
Multi.createFrom().iterable(IntStream.range(0, 300).boxed().toList())
        .group()
        .by(x -> x > 20)
        .onItem()
        .transformToMultiAndMerge(
                group -> Boolean.TRUE.equals(group.key())
                        ? Multi.createFrom().item(1)
                        : Multi.createFrom().item(2)
        )
        .onCompletion()
        .invoke(() -> System.out.println("Transformation completed"))
        .ifNoItem().after(Duration.ofSeconds(10)).fail().onFailure().invoke(throwable -> {
            System.out.println("❌ Failure: " + throwable);
            throwable.printStackTrace();
        })
        .runSubscriptionOn(Executors.newSingleThreadExecutor())
        .subscribe().with(
                response -> System.out.println("Received: " + response),
                failure -> System.err.println("Failed: " + failure)
        );
If you change the range to 40, i.e. Multi.createFrom().iterable(IntStream.range(0,40).boxed().toList()), it completes. I guess it was never fixed properly...
.transformToMultiAndMerge(
        group -> Boolean.TRUE.equals(group.key())
                ? Multi.createFrom().item(1)
                : Multi.createFrom().item(2)
)
This is not correct: by dissociating the transform result from the group, you are creating request starvation in the source.
The correct way of doing it is:
.transformToMultiAndMerge(
        group -> Boolean.TRUE.equals(group.key())
                ? group.map(ignored -> 1)
                : group.map(ignored -> 2)
)
Regarding the hang after 512 elements (merge(512)) with the database removed: this is odd, and I cannot reproduce it in the tests. Can you modify one of the tests in my branch and try it?
@ozangunalp OK, so I was using groupBy the wrong way... Nevertheless, I think the code should also handle this, e.g. by logging a warning if the inner function returns a new Multi that is not derived from the group. Otherwise I'm sure people (like me) will assume it also supports streams created with Multi.createFrom()... like mine.
Going back to your test request, I managed to reproduce the problem in your tests. You can find it here: https://github.com/bojanv55/smallrye-reactive-messaging/tree/kafka_throttled_concurrency_ordered_failing_test
The testOrderedByKeyProcessing method hangs after 256 messages.
I see that you are effectively creating a separate key for every record sent, so there are 500 key groups to order.
The merge concurrency is there to limit the number of requests sent to each group. So, in a normal merge, it limits the items requested from each merged stream. It also means that, because merge has already requested items from 256 groups, it doesn't request from the 257th group.
This may be an issue with the merge implementation. We need to check with @jponge.
Nevertheless, in this usage we already have a PausableMulti letting items downstream one by one, so I think we can use a very large number (Integer.MAX_VALUE) as the merge concurrency.
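The hang can be reasoned about with a toy model, under two assumptions that are not verified against Mutiny's internals: merge(concurrency) keeps at most `concurrency` inner groups subscribed and only subscribes to a new one when a slot frees up, and key groups never complete while the topic is live. `BoundedMergeModel` is hypothetical and ignores per-group request counts.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical toy model of merge(concurrency) over key groups. Assumptions: at most
 * `concurrency` groups are subscribed at once, a new group is only subscribed when a slot
 * is free, and groups never complete while the topic is live, so slots are never freed.
 */
final class BoundedMergeModel {
    record Outcome(int delivered, int starved, int openGroups) {}

    static Outcome run(int concurrency, String[] keys) {
        Set<String> subscribed = new HashSet<>();
        int delivered = 0;
        int starved = 0;
        for (String key : keys) {
            if (subscribed.contains(key)) {
                delivered++;                 // group already merged: the item flows through
            } else if (subscribed.size() < concurrency) {
                subscribed.add(key);         // free slot: subscribe to the new group
                delivered++;
            } else {
                starved++;                   // no slot will ever free up: the item waits forever
            }
        }
        return new Outcome(delivered, starved, subscribed.size());
    }

    public static void main(String[] args) {
        String[] keys = new String[500];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "key-" + i;            // one distinct key per record
        }
        System.out.println(run(256, keys));               // Outcome[delivered=256, starved=244, openGroups=256]
        System.out.println(run(Integer.MAX_VALUE, keys)); // Outcome[delivered=500, starved=0, openGroups=500]
    }
}
```

With one distinct key per record, the model stops after 256 items at a concurrency of 256, and with merge(512) it delivers 500 distinct keys plus only 12 more from a second batch of 500; both are consistent with the hangs described above. With Integer.MAX_VALUE everything is delivered, but `openGroups` equals the number of distinct keys ever seen, so in this model state grows with the key space rather than with in-flight work.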
My use case is that one partition receives many different keys (1, 2, 3, 4, 5, 6, 7...), and only occasionally do several messages with the same key arrive at the same time (first-8, second-8, third-8); in that case we need ordering/locking to guarantee sequential processing.
Now, if we use Integer.MAX_VALUE and the keys are mostly different, wouldn't I again use up all the merge slots, potentially within a few days? (The microservice runs constantly and handles a lot of messages.)
We can double-check with @jponge, but it'll be considered an unbounded request, so it should be fine.
I just tried Integer.MAX_VALUE while emitting a lot of messages... The heap grows very fast (this is after GC, counting only the references that are still retained)...