
Connect to Kafka with Confluent Parallel Consumer

bojanv55 opened this issue 2 months ago • 43 comments

Description

Parallel Kafka Connector for https://github.com/confluentinc/parallel-consumer

The Parallel Consumer makes it possible to consume messages in parallel from a single partition, with the option of locking at the key level.

Repository name

quarkus-parallel-kafka

Short description

Parallel Kafka Connector

Repository Homepage URL

https://docs.quarkiverse.io/<REPOSITORY_NAME>/dev/

Repository Topics

  • quarkus-extension

...

Team Members

Additional context

No response

bojanv55 avatar Oct 16 '25 08:10 bojanv55

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

quarkus-bot[bot] avatar Oct 16 '25 08:10 quarkus-bot[bot]

The Quarkus Messaging Kafka incoming channels do allow parallel processing using @Blocking(ordered = false) or @RunOnVirtualThread annotations.

AFAIK, the only bit missing from it is the key-level ordering. I'd very much like to add this feature to the connector. It should not be very tough to add this.
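
For reference, a minimal sketch of that unordered parallel processing on an incoming channel (the channel name "orders" and the processing body are made up for illustration; only the annotations come from the comment above):

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Blocking;

@ApplicationScoped
public class OrdersProcessor {

    // Records from the "orders" channel are dispatched to a worker pool and may be
    // processed concurrently; partition order is not preserved (ordered = false).
    @Incoming("orders")
    @Blocking(ordered = false)
    public void process(String payload) {
        // potentially slow, blocking work goes here
        System.out.println("Processing " + payload);
    }
}

Key-level ordering would then be the missing piece on top of this.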

ozangunalp avatar Oct 16 '25 09:10 ozangunalp

@ozangunalp I am trying to implement this now, since I need key-level ordering. If I use the setup you proposed and add some code to do the locking at the key level, how would Kafka currently ack the records? (E.g., if I'm processing 1,2,3,4,5 and I finish processing the 5th but not 1,2,3,4, would it ack 5, so I could potentially lose records 1,2,3,4 as unprocessed?) I guess the acking part is missing if I want to implement this on the user-code side? Does it have to go into the framework?

P.S. Are there any plans for a feature timeline, or does nothing like that exist?

bojanv55 avatar Oct 16 '25 18:10 bojanv55

The "throttled" commit-strategy, which is the default in Quarkus Messaging, already handles messages that are acked out of order.

As for the key-level ordering, I've done some tests. I can have something functional with minimal overhead using decorators and pausable channels. But it isn't quite feature-complete. If you want to give it a try, I pushed my test here for you to check.

ozangunalp avatar Oct 17 '25 17:10 ozangunalp

@ozangunalp thanks, I will also try your tests to check how they work. In the meantime I also created https://github.com/bojanv55/parallel-kafka-connector, which is a connector for parallel-consumer. It is basically 90% of the current code copy-pasted, with some changes to support parallel-consumer. I tested it a bit and it seems to work. Could I also ask you to check these two lines, to see whether they provide optimal integration with the Quarkus pipelines:

  • is this the right way to ack/nack messages in the SmallRye pipeline: https://github.com/bojanv55/parallel-kafka-connector/blob/3c1bfa023ed16f0b9ff46c202e5cd5f03c87a18f/src/main/java/me/vukas/parallel/ParallelKafkaSource.java#L110
  • is it OK to always return the same publisher for multiple "concurrency" consumers: https://github.com/bojanv55/parallel-kafka-connector/blob/3c1bfa023ed16f0b9ff46c202e5cd5f03c87a18f/src/main/java/me/vukas/parallel/ParallelKafkaConnector.java#L77

bojanv55 avatar Oct 20 '25 07:10 bojanv55

@ozangunalp I'm just playing with the test you pushed. It seems I'm not able to block it at the key level (probably I'm doing something wrong in my code?).

This is the code that receives the messages:

@io.smallrye.reactive.messaging.annotations.Blocking(ordered = false/*, value = "my-pool"*/)
@Incoming("some-topic")
//@Blocking
public Uni<Void> sink(String word, IncomingKafkaRecordMetadata<String, String> metadata) {
    Log.info("Received user: " + word);
    return myEntityRepository.upsertCurrentProducer("Asd", "sdf", word);
}

This is the code that eventually saves to the database:

Uni<Void> upsertCurrentProducerInternal(
            String x, String y, String producer) {

        String sql =
                "INSERT INTO MyEntity (id, field)\n" +
                        "SELECT 1 AS id, '"+producer+"' AS field\n" +
                        "FROM DUAL\n" +
                        "WHERE SLEEP(5) = 0\n" +
                        "ON DUPLICATE KEY UPDATE field = 'fff';";

        return sessionFactory.withTransaction(
                (session, tx) ->
                        session
                                .createNativeQuery(sql)
                                .executeUpdate()
                                .replaceWithVoid());
    }

I would expect that when I send 5 messages with the same key at the same time, the other messages with that key would not be processed until the 5-second SQL sleep has passed, since the key is the same. But it just consumes all the other messages immediately.

This is my config:

smallrye.messaging.worker.my-pool.max-concurrency=50
mp.messaging.incoming.some-topic.connector=smallrye-kafka
mp.messaging.incoming.some-topic.commit-strategy=throttled
mp.messaging.incoming.some-topic.throttled.processing-order=ordered_by_key
mp.messaging.incoming.some-topic.auto.offset.reset=latest
mp.messaging.incoming.some-topic.group.id=grupaTrotled
mp.messaging.incoming.some-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.some-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Do you have any advice here? Thanks.

bojanv55 avatar Oct 28 '25 09:10 bojanv55

@ozangunalp After checking the code a bit more, I guess this is only implemented and working in your tests?

bojanv55 avatar Oct 28 '25 09:10 bojanv55

I guess this is only implemented and working in your tests?

That's true. Let me push something that can work OOTB with your code.

ozangunalp avatar Oct 29 '25 12:10 ozangunalp

You can check it out again.

ozangunalp avatar Oct 29 '25 13:10 ozangunalp

@ozangunalp Are you sure you pushed? I cannot find anything.

bojanv55 avatar Oct 29 '25 14:10 bojanv55

Here : https://github.com/ozangunalp/smallrye-reactive-messaging/commits/kafka_throttled_concurrency_ordered/

Note that it needs some more work. And I am thinking we could even generalize this to other connectors.

ozangunalp avatar Oct 29 '25 14:10 ozangunalp

@ozangunalp Locking seems to be working now with my example. I will test it a bit more. For now, the only issue I see is that when I emit a few more messages in sequence, I get this exception:

2025-10-29 18:08:50,922 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18231: The record 132 from topic-partition 'some-0' has waited for 62 seconds to be acknowledged. This waiting time is greater than the configured threshold (60000 ms). At the moment 423 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 131. This error is due to a potential issue in the application which does not acknowledged the records in a timely fashion. The connector cannot commit as a record processing has not completed.
2025-10-29 18:08:50,922 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18228: A failure has been reported for Kafka topics '[some]': io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException: The record 132 from topic/partition 'some-0' has waited for 62 seconds to be acknowledged. At the moment 423 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 131.

2025-10-29 18:08:50,925 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18224: Executing consumer revoked re-balance listener for group 'grupaTrotled'

I just watched this in Wireshark. When I emit 50 elements (starting from 5), it ACKs only 3 times (5, 48, 55), but it does ack everything it needs to; it just seems to do so pretty slowly, and at least it doesn't time out. When I emit 500 elements (starting from 55), it again ACKs only 3 times (57, 107, 132), doesn't ACK all the needed ones (it should go up to 555), and then it gets stuck and times out.

bojanv55 avatar Oct 29 '25 16:10 bojanv55

How long does it take to process a single message?

ozangunalp avatar Oct 29 '25 17:10 ozangunalp

I wonder if this may be related to some fairness issue between key groups.

ozangunalp avatar Oct 29 '25 17:10 ozangunalp

@ozangunalp It seems that the merge is the problem. When I increase it to a much larger value, it works. When I decrease it to a smaller value (e.g. 2 or 3), it immediately gets stuck.

return groupBy
        .onItem().transformToMulti(g -> {
            PausableMulti<? extends Message<?>> pausable = new PausableMulti<>(g, false);
            return pausable
                    .invoke(pausable::pause)
                    .map(m -> m.withAck(() -> m.ack().thenRun(pausable::resume)));
        })
        .merge(5000);

bojanv55 avatar Oct 30 '25 09:10 bojanv55

Is it possible that, if I have 6 records with IDs 1,2,3,4,5,6 and, say, merge(5), it starts processing records 2,3,4,5,6 while record 1 is left to be processed later for whatever reason, and then blocks in the merge? Since the maximum number of elements allowed in parallel is 5, it cannot commit 6 and release a merge slot because we have not committed 1 yet, and 1 cannot be processed at all because the merge is already at its maximum.

bojanv55 avatar Oct 30 '25 09:10 bojanv55

@bojanv55 Yes, I realize this needs to be closer to the Kafka connector to work properly. It is not so much a problem with the merge (I think there may be some retention in the merge queue, but it should not blow up). The real issue is how the commit strategy decides that a record has been received for processing.

I created a draft PR upstream with the tests we are doing here: https://github.com/smallrye/smallrye-reactive-messaging/pull/3208

Please check the latest change. I think it does fix the issue you are having.

ozangunalp avatar Oct 30 '25 10:10 ozangunalp

There is definitely something with that merge, but not what I thought. I just tested your new code and it shows similar but different behavior. I increased the merge to 512 and emitted 500 elements: the first time it completes and ACKs everything, including the last one, OK. But then I waited a few minutes and emitted 500 more elements, and it got stuck after 12. This time it actually ACKs those, but it doesn't consume any more and just gets stuck and lags (Kafka lag). So there are more elements in Kafka waiting for processing, but it doesn't pull new ones in. I will also paste my full code here; maybe my code is what creates this behavior?

@ApplicationScoped
public class DemoProcessor {

    @Inject
    MyEntityRepository myEntityRepository;

    @io.smallrye.reactive.messaging.annotations.Blocking(ordered = false, value = "my-pool")
    @Incoming("some-topic")
    public Uni<Void> sink(String word, IncomingKafkaRecordMetadata<String, String> metadata) {
        Log.info("Received user: " + word);
        return myEntityRepository.upsertCurrentProducer("Asd", "sdf", word).onTermination().invoke(() -> System.out.println("Done with " + word));
    }
}



@ApplicationScoped
public class MyEntityRepository implements PanacheRepository<MyEntity> {
    private final Mutiny.SessionFactory sessionFactory;
    private final Vertx vertx;

    public MyEntityRepository(Mutiny.SessionFactory sessionFactory, Vertx vertx) {
        this.sessionFactory = sessionFactory;
        this.vertx = vertx;
    }

    public Uni<Void> upsertCurrentProducer(
            String x, String y, String producer) {

        return runOnEventLoop(
                () -> upsertCurrentProducerInternal(x, y, producer));
    }

    Uni<Void> upsertCurrentProducerInternal(
            String x, String y, String producer) {

        String sql =
                "INSERT INTO MyEntity (id, field)\n" +
                        "SELECT 1 AS id, '"+producer+"' AS field\n" +
                        "FROM DUAL\n" +
                        "WHERE SLEEP(5) = 0\n" +
                        "ON DUPLICATE KEY UPDATE field = 'fff';";

        return sessionFactory.withTransaction(
                (session, tx) ->
                        session
                                .createNativeQuery(sql)
                                .executeUpdate()
                                .replaceWithVoid());
    }

    private <T> Uni<T> runOnEventLoop(Supplier<Uni<T>> supplier) {
        return Uni.createFrom()
                .emitter(
                        emitter -> {
                            Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
                            VertxContextSafetyToggle.setContextSafe(context, true);

                            context.runOnContext(
                                    v -> {
                                        try {
                                            supplier.get().subscribe().with(emitter::complete, emitter::fail);
                                        } catch (Exception ex) {
                                            emitter.fail(ex);
                                        }
                                    });
                        });
    }
}

bojanv55 avatar Oct 30 '25 11:10 bojanv55

Why do you run your processing on the event loop?

ozangunalp avatar Oct 30 '25 11:10 ozangunalp

@ozangunalp Otherwise I get this:

2025-10-30 17:54:57,700 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-3) SRMSG18228: A failure has been reported for Kafka topics '[some]': java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'my-pool-2'

Is there some other way to mitigate this exception? Also, do you think this is the reason the processing hangs?

bojanv55 avatar Oct 30 '25 16:10 bojanv55

Is there some other way to mitigate this exception? Also, do you think this is the reason the processing hangs?

You can use Hibernate ORM instead of Hibernate Reactive.
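
For illustration, a rough sketch of how the repository could look with classic (blocking) Hibernate ORM instead of Hibernate Reactive. The class name is made up, the upsert SQL is a simplified version of the one above (without the SLEEP(5) test artifact), and named parameters in native queries rely on Hibernate's support for them:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class BlockingMyEntityRepository {

    @Inject
    EntityManager em;

    // Blocking upsert: safe to call directly from the @Blocking(value = "my-pool")
    // worker thread, no hop to the Vert.x event loop required.
    @Transactional
    public void upsertCurrentProducer(String producer) {
        em.createNativeQuery(
                "INSERT INTO MyEntity (id, field) VALUES (1, :producer) " +
                        "ON DUPLICATE KEY UPDATE field = :producer")
                .setParameter("producer", producer)
                .executeUpdate();
    }
}

The @Incoming method could then simply call this repository and return void, since the processing already runs on a worker thread.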

ozangunalp avatar Oct 30 '25 17:10 ozangunalp

I just simplified the code and removed the database altogether. It still fails after 512 elements (merge(512)): I get an ACK at 512, which is the last one, and then Kafka goes into lag mode...

    @io.smallrye.reactive.messaging.annotations.Blocking(ordered = false, value = "my-pool")
    @Incoming("some-topic")
    public Uni<Void> sink(String word, IncomingKafkaRecordMetadata<String, String> metadata) {
        Log.info("Received user: " + word);
        return Uni.createFrom().voidItem().onTermination().invoke(() -> System.out.println("Done with " + word));
    }

bojanv55 avatar Oct 30 '25 19:10 bojanv55

@ozangunalp I will investigate a bit more, but I have a feeling it is somehow related to this discussion that I opened earlier this year: https://github.com/smallrye/smallrye-mutiny/discussions/1849. At that time groupBy had a bug, and it behaved almost like this. I hoped it had been fixed, but when I now run the same code I had at that time, it still fails. Do you think it could be because of that problem? You can try this code, which I think should complete, but it doesn't and just times out...

Multi.createFrom().iterable(IntStream.range(0,300).boxed().toList())
                .group()
                .by(x -> x > 20)
                .onItem()
                .transformToMultiAndMerge(
                        group -> Boolean.TRUE.equals(group.key())
                                ? Multi.createFrom().item(1)
                                : Multi.createFrom().item(2)
                )
                .onCompletion()
                .invoke(() -> System.out.println("Transformation completed"))
                .ifNoItem().after(Duration.ofSeconds(10)).fail().onFailure().invoke(throwable -> {
                    System.out.println("❌ Failure: " + throwable);
                    throwable.printStackTrace();
                })
                .runSubscriptionOn(Executors.newSingleThreadExecutor())
                .subscribe().with(
                        response -> System.out.println("Received: " + response),
                        failure -> System.err.println("Failed: " + failure)
                );

If you change this line to 40, it completes: Multi.createFrom().iterable(IntStream.range(0, 40).boxed().toList()). I guess it was never fixed properly...

bojanv55 avatar Oct 30 '25 21:10 bojanv55

 .transformToMultiAndMerge(
                        group -> Boolean.TRUE.equals(group.key())
                                ? Multi.createFrom().item(1)
                                : Multi.createFrom().item(2)
                )

This is not correct: by dissociating the transform result from the group, you are creating request starvation in the source.

The correct way of doing it is:

 .transformToMultiAndMerge(
                        group -> Boolean.TRUE.equals(group.key())
                                ? group.map(ignored -> 1)
                                : group.map(ignored -> 2)
                )
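
Put together with the earlier test code, a self-contained version of the corrected pipeline could look like this (a sketch only; the class name is made up and the final sleep is just there to keep the demo JVM alive while the asynchronous pipeline runs):

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import io.smallrye.mutiny.Multi;

public class GroupByDemo {
    public static void main(String[] args) throws InterruptedException {
        Multi.createFrom().iterable(IntStream.range(0, 300).boxed().toList())
                .group()
                .by(x -> x > 20)
                .onItem()
                // Map the items of each group instead of returning a detached Multi,
                // so that demand keeps flowing back to the upstream source.
                .transformToMultiAndMerge(group -> Boolean.TRUE.equals(group.key())
                        ? group.map(ignored -> 1)
                        : group.map(ignored -> 2))
                .onCompletion().invoke(() -> System.out.println("Transformation completed"))
                .ifNoItem().after(Duration.ofSeconds(10)).fail()
                .runSubscriptionOn(Executors.newSingleThreadExecutor())
                .subscribe().with(
                        item -> System.out.println("Received: " + item),
                        failure -> System.err.println("Failed: " + failure));

        Thread.sleep(2_000); // give the asynchronous pipeline time to finish
    }
}

The key change is that the returned Multi is derived from the group itself (group.map(...)), so each group's demand is propagated upstream.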

ozangunalp avatar Oct 30 '25 21:10 ozangunalp

I just simplified the code and removed the database altogether. It still fails after 512 elements (merge(512)): I get an ACK at 512, which is the last one, and then Kafka goes into lag mode...

    @io.smallrye.reactive.messaging.annotations.Blocking(ordered = false, value = "my-pool")
    @Incoming("some-topic")
    public Uni<Void> sink(String word, IncomingKafkaRecordMetadata<String, String> metadata) {
        Log.info("Received user: " + word);
        return Uni.createFrom().voidItem().onTermination().invoke(() -> System.out.println("Done with " + word));
    }

This is odd, and I cannot reproduce it in the tests. Can you modify one of the tests in my branch and test it ?

ozangunalp avatar Oct 30 '25 21:10 ozangunalp

@ozangunalp OK, then it was me using groupBy the wrong way... Nevertheless, I think that should also be handled in the code (log a warning or something if a new Multi is returned from the inner method). Otherwise I'm sure people (like me) will assume it should also support a detached Multi.createFrom()... like the one I created.

Going back to your test request, I managed to reproduce it in your tests. You can find it here: https://github.com/bojanv55/smallrye-reactive-messaging/tree/kafka_throttled_concurrency_ordered_failing_test

The method testOrderedByKeyProcessing will hang after 256 messages.

bojanv55 avatar Oct 31 '25 17:10 bojanv55

I see that, effectively, you are creating a key space with one key per record sent, so there are 500 key groups to order.

The merge concurrency is there to limit the number of requests sent to each group: in a normal merge case, it limits the items requested from each merged stream. It also means that, because the merge has already requested (more) items from 256 groups, it doesn't request from the 257th group.

This may be an issue with the merge implementation. We need to check with @jponge.

Nevertheless, in this usage we already have a PausableMulti letting items flow downstream one by one, so I think we can use a very large number (Integer.MAX_VALUE) as the merge concurrency.
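
Concretely, that would just change the merge concurrency in the snippet from earlier in the thread (a sketch only; PausableMulti, groupBy and the surrounding decorator come from the experimental branch):

return groupBy
        .onItem().transformToMulti(g -> {
            PausableMulti<? extends Message<?>> pausable = new PausableMulti<>(g, false);
            return pausable
                    .invoke(pausable::pause)
                    .map(m -> m.withAck(() -> m.ack().thenRun(pausable::resume)));
        })
        // Integer.MAX_VALUE acts as an effectively unbounded request, so a new key
        // group never has to wait for a merge slot to free up.
        .merge(Integer.MAX_VALUE);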

ozangunalp avatar Oct 31 '25 17:10 ozangunalp

My use case is that on one partition I will get many different keys (1,2,3,4,5,6,7...), and only sometimes will there be multiple records with the same key at the same time, in which case we need ordering/locking (first-8, second-8, third-8) to guarantee sequential processing.

Now, if we use Integer.MAX_VALUE and the keys are mostly different, wouldn't I again use up all the merge slots in potentially a few days? (The microservice runs constantly and there are a lot of messages involved.)

bojanv55 avatar Oct 31 '25 18:10 bojanv55

We can double-check with @jponge, but it will be considered an unbounded request, so it should be fine.

ozangunalp avatar Oct 31 '25 18:10 ozangunalp

I just tried Integer.MAX_VALUE and emitted a lot of messages... The heap grows like crazy pretty fast (this is after GC, retaining only the needed refs)...

[screenshot: heap usage growing rapidly]

bojanv55 avatar Nov 01 '25 13:11 bojanv55