
[Bug] Client with shared subscription is blocked

Open michalcukierman opened this issue 1 year ago • 29 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

Client: 3.1.0
Pulsar: 3.1.0 (and later builds)

Also reported on 3.0.1

Minimal reproduce step

My reproducible steps:

  1. Create a persistent topic with 3 partitions
  2. Publish 1 million messages (30 KB each) (a sketch of steps 1-2 is shown after the consumer snippet below)
  3. Run the client and consumer:
    PulsarClient client = PulsarClient.builder()
        .serviceUrl(this.pulsarBrokerUrl)
        .build();

    Consumer consumer = client.newConsumer()
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(5, TimeUnit.SECONDS)
        .subscribe();
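
For reference, a minimal sketch of steps 1-2 using the Java admin and client APIs (the topic name, service URLs and dummy payload below are placeholders, not the original setup):

    // Hypothetical setup: create the 3-partition source topic and publish
    // 1 million ~30 KB messages. Requires org.apache.pulsar.client.admin.PulsarAdmin
    // and org.apache.pulsar.client.api.*.
    PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
        .build();
    admin.topics().createPartitionedTopic("persistent://public/default/source", 3);

    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")     // placeholder broker URL
        .build();
    Producer<byte[]> producer = client.newProducer(Schema.BYTES)
        .topic("persistent://public/default/source")
        .blockIfQueueFull(true)                    // avoid failing async sends when the queue is full
        .create();

    byte[] payload = new byte[30 * 1024];          // ~30 KB dummy payload
    for (int i = 0; i < 1_000_000; i++) {
        producer.sendAsync(payload);
    }
    producer.flush();                              // wait for buffered messages to be persisted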

What did you expect to see?

All messages are received

What did you see instead?

The client stops receiving messages. Restarting the client helps, but it gets stuck again after some time.

Anything else?

The issue was originally described here: #21082. @MichalKoziorowski-TomTom also faces the issue.

I've created a new issue because in #21082 the author says that a broker restart helps. In this case it looks client-related, like a race condition observed in 3.x.x after introducing ackTimeout.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

michalcukierman avatar Aug 31 '23 11:08 michalcukierman

@michalcukierman Could you please share the topic stats and internal-stats when the issue is reproduced?

codelipenghui avatar Aug 31 '23 15:08 codelipenghui

Two files attached: partitioned.stats.internal.txt, partitioned.stats.txt

I see that in stats we have:

        "availablePermits" : 0,
        "unackedMessages" : 25,

        "availablePermits" : 0,
        "unackedMessages" : 32,

However, I do ack all of the messages:

    Consumer consumer = client.newConsumer()
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(ackTimeout, TimeUnit.SECONDS)
        .subscribe();

    Producer<byte[]> producer = client.newProducer(Schema.BYTES)
        .topic(destinationTopic)
        .compressionType(CompressionType.LZ4)
        .maxPendingMessages(8)  // Support 5 MB files
        .blockIfQueueFull(true) // Support 5 MB files
        .batchingMaxBytes(5242880)
        .create();

    Multi<Message> messages = Multi.createBy().repeating()
        .completionStage(consumer::receiveAsync)
        .until(m -> closed.get());

    messages.subscribe().with(msg -> {
      receivedDistribution.record(getKiloBytesSize(msg.getData()));
      Uni.createFrom().completionStage(producer.newMessage(Schema.BYTES)
              .key(msg.getKey())
              .value(msg.getValue().getContent().getPayload().toByteArray())
              .eventTime(msg.getEventTime()).sendAsync())
          .subscribe().with(msgId -> {
            sentCounter.increment();
            try {
              consumer.acknowledge(msg.getMessageId());
            } catch (PulsarClientException e) {
              throw new RuntimeException("Unable to send or ACK the message", e);
            }
          });
    });
  }

ackTimeout is set to 120 seconds.

It can be confirmed in the log files, where sentCounter is just one message behind (see attached screenshot, 2023-09-01 at 13:11:49).

michalcukierman avatar Sep 01 '23 11:09 michalcukierman

Here are the alternative stats: https://github.com/apache/pulsar/issues/21082#issuecomment-1698836870

michalcukierman avatar Sep 01 '23 12:09 michalcukierman

I think it happens when:

  • ackTimeout is set on the consumer
  • throughput is high
  • the receiverQueueSize is low

There may be a race condition in the 3.1.0 client, as the situation was not observed with 2.10.4 (we've downgraded; @MichalKoziorowski-TomTom also reported this as a fix).

michalcukierman avatar Sep 01 '23 13:09 michalcukierman

We don't have ackTimeout and it still happens. In my case, my topic has only one partition.

You said 1 million messages, right? Could you post your sample payload? I want to reproduce and debug.

redoc123 avatar Sep 05 '23 16:09 redoc123

It's 1 million x 30 KB of a sample HTML file. Any text file would do. What we are doing:

topic:source (1 mln x 30 KB) -> app:router -> topic:destination

source has 3 partitions, destination has 6 partitions. Router has 2 replicas, uses a shared subscription to read from source, writes to destination, and acks the incoming message once the message is stored and acked on destination.

michalcukierman avatar Sep 05 '23 17:09 michalcukierman

Here is the exact payload: 30k.html.zip

michalcukierman avatar Sep 05 '23 17:09 michalcukierman

+1, it also seems to occur in other consumer modes (Shared, Key_Shared).

lvdelu avatar Sep 06 '23 03:09 lvdelu

Happened again today on a different module:

  • topic - not partitioned, without retention
  • messages - 50k, around 5 KB each
  • subscription - shared, with two clients

The solution for now is to use the 2.10.4 client.

michalcukierman avatar Sep 07 '23 16:09 michalcukierman

I'll try to reproduce it and get back to you. :)

mattisonchao avatar Sep 08 '23 02:09 mattisonchao

Hi, @michalcukierman

I created a repo to reproduce this issue, but no luck. Could you please help me refine this test?

mattisonchao avatar Sep 21 '23 06:09 mattisonchao

@mattisonchao I'll have time next week to get back to it.

michalcukierman avatar Sep 21 '23 11:09 michalcukierman

How about a fix deadline?

lvdelu avatar Sep 26 '23 10:09 lvdelu

@lvdelu Could you help refine this test to help reproduce it? repo

mattisonchao avatar Sep 27 '23 02:09 mattisonchao

I've tried to create an isolated environment to reproduce the issue on a local machine using test containers, but I was not able to today. The environment where I observe the issue is a GCP cluster with:

  • 5 x Bookkeeper
  • 3 x Broker
  • 3 x Proxy

It also happens randomly: with 1 million messages it sometimes gets stuck around 300k-500k messages, but it is not deterministic. I'll try to reproduce the issue on GCP, but I'll need more time. It may be that more than one partition on more than one broker is required. Unfortunately, with the test containers I am not able to recreate the same load.

michalcukierman avatar Sep 29 '23 22:09 michalcukierman

It looks like I was able to reproduce the issue in two runs today (failed 2/2).

  • I've created a SmallRye project (one producer producing 1 mln messages, one consumer reading them using a shared subscription)
  • The cluster is created on GCP using the Pulsar operator from SN (great tool!)

The code is here: https://github.com/michalcukierman/pulsar-21104

In general it's very much like the bug description. Produce 1 mln messages of 30 KB:

  @Outgoing("requests-out")
  public Multi<String> produce() {
    return Multi.createBy().repeating()
        .uni(() -> Uni
            .createFrom()
            .item(() -> RandomStringUtils.randomAlphabetic(30_000))
            .onItem()
            .invoke(() -> System.out.println("+ Produced: " + outCount.incrementAndGet()))
        )
        .atMost(1_000_000);
  }

Read it using client with shared subscription and write to another topic:

@ApplicationScoped
public class Processor {

  private final AtomicLong inCount = new AtomicLong(0);
  @Incoming("requests-in")
  @Outgoing("dump-out")
  @Blocking
  PulsarOutgoingMessage<String> process(PulsarIncomingMessage<String> in) {
    System.out.println(" - Processed: " + inCount.incrementAndGet());
    return PulsarOutgoingMessage.from(in);
  }
}

The settings of the client are:

pulsar.client.serviceUrl=pulsar://brokers-broker:6650

mp.messaging.incoming.requests-in.subscriptionType=Shared
mp.messaging.incoming.requests-in.numIoThreads=4
mp.messaging.incoming.requests-in.subscriptionName=request-shared-subscription
mp.messaging.incoming.requests-in.ackTimeoutMillis=5000
mp.messaging.incoming.requests-in.subscriptionInitialPosition=Earliest
mp.messaging.incoming.requests-in.receiverQueueSize=8
mp.messaging.incoming.requests-in.topic=persistent://public/default/requests_4
mp.messaging.incoming.requests-in.connector=smallrye-pulsar

mp.messaging.outgoing.dump-out.topic=persistent://public/default/dump
mp.messaging.outgoing.dump-out.connector=smallrye-pulsar
mp.messaging.outgoing.dump-out.blockIfQueueFull=true
mp.messaging.outgoing.dump-out.maxPendingMessages=8
mp.messaging.outgoing.dump-out.maxPendingMessagesAcrossPartitions=12

The retention of the requests topic is set to -1 -1 (infinite) using the Pulsar Admin Java client.
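
For reference, setting it looks roughly like this with the Java admin client (a namespace-level sketch with a placeholder admin URL; the actual code may apply the policy at the topic level instead):

    // Illustrative only: infinite retention (-1 minutes, -1 MB) for the namespace
    // holding the requests topic. Requires org.apache.pulsar.client.admin.PulsarAdmin
    // and org.apache.pulsar.common.policies.data.RetentionPolicies.
    PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://brokers-broker:8080")   // placeholder admin URL
        .build();
    admin.namespaces().setRetention("public/default",
        new RetentionPolicies(-1, -1));
    admin.close();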

During both runs the consumer got stuck (see attached screenshot, 2023-10-02 at 21:12:42).

michalcukierman avatar Oct 02 '23 19:10 michalcukierman

@michalcukierman

I left a comment here; you can answer it under the current issue. Thanks.

poorbarcode avatar Oct 25 '23 11:10 poorbarcode

The two issues may not be related. In both cases the subscriptions are blocked, but in this case restarting the broker didn't help - it looks like a deadlock in the client.

@poorbarcode have you tried the repository I've linked? https://github.com/michalcukierman/pulsar-21104

It's possible to reproduce it on a GCP cluster. It should work on other clusters as well.

michalcukierman avatar Nov 01 '23 00:11 michalcukierman

It might be fixed with https://github.com/apache/pulsar/issues/22352. Need to check...

@michalcukierman Could you recheck your case? I've checked mine and I can't reproduce it with the 3.0.4 client, while it was easily reproducible with 3.0.1. Test with a version that includes the https://github.com/apache/pulsar/issues/22352 fix.

I could not reproduce the issue with Client 3.0.4, but I can still reproduce it with Client 3.2.2. @codelipenghui the hprof is here: https://github.com/michalcukierman/pulsar-21104/blob/main/heap.hprof It was taken during an execution of the code that is currently in the main branch (Pulsar Cluster 3.1.0 / Pulsar Client 3.2.2).

michalcukierman avatar May 07 '24 11:05 michalcukierman

I've noticed that with Client 3.2.2 the behavior may be a bit different. The consumers get blocked but occasionally resume; after receiving a couple of messages they are stuck again. Stats are uploaded: https://github.com/michalcukierman/pulsar-21104/tree/main/stats

I've confirmed once again that the issue does not occur with Pulsar Client 3.0.4 (or at least I was not able to reproduce it after processing 1 mln messages; with Client 3.2.2 clients are usually blocked after 50k messages).

michalcukierman avatar May 07 '24 12:05 michalcukierman

Comparison of Client 3.2.2 and Client 3.0.4 (videos):
https://github.com/apache/pulsar/assets/4356553/a8a9ef21-f12d-4d4d-97b5-9c1b028f4d8f
https://github.com/apache/pulsar/assets/4356553/a104c3fe-ad46-4b48-82c2-e045e15b7ac4

michalcukierman avatar May 07 '24 12:05 michalcukierman

@poorbarcode have you had a chance to see the last comment?

michalcukierman avatar May 30 '24 19:05 michalcukierman

I've noticed that with Client 3.2.2 the behavior may be a bit different. The consumers get blocked but occasionally resume; after receiving a couple of messages they are stuck again. Stats are uploaded: https://github.com/michalcukierman/pulsar-21104/tree/main/stats

I've confirmed once again that the issue does not occur with Pulsar Client 3.0.4 (or at least I was not able to reproduce it after processing 1 mln messages; with Client 3.2.2 clients are usually blocked after 50k messages).

I checked the stats and the internal-stats. As I can see, there are about 30k messages in the topic, and there is a subscription named request-shared-subscription consuming the topic, but it seems none of the messages are acknowledged: the markDeletePosition is 0:-1 and individuallyDeletedMessages is empty (see screenshot).

Which means all the messages are unacked, i.e. in the backlog. I also checked the stats for the consumer of request-shared-subscription: msgOutCounter=10002 is how many messages were dispatched to the consumer, and unackedMessages=10001 means 10001 messages are unacked. So, apparently, you only receive messages but don't ack them.

I guess the configuration maxUnackedMessagesPerConsumer in your broker.conf is 10000. Since unackedMessages=10001 exceeds maxUnackedMessagesPerConsumer=10000, the blockedConsumerOnUnackedMsgs field in the stats is true (see screenshot).

Which means there are too many unacked messages on the consumer, so the broker stopped dispatching messages to it.

Please ack messages after processing them successfully.
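
For reference, the relevant knob in broker.conf looks like this (the 10000 value is only my guess for your cluster; check your actual configuration):

# illustrative broker.conf excerpt, not your actual settings
# a consumer that exceeds this unacked-message count stops receiving new messages
maxUnackedMessagesPerConsumer=10000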

dao-jun avatar May 30 '24 21:05 dao-jun

@dao-jun,

The whole source code with the instructions on how to run it is available in the linked repository. Note that the same example works when downgrading the client to 3.0.4 (see the videos). https://github.com/apache/pulsar/issues/21104#issuecomment-2098254335

The message acknowledgment should be done after getting write confirmation from the producer. https://github.com/apache/pulsar/issues/21104#issuecomment-1743609860

This is the original code that was failing. We are no longer using it because we've decommissioned the module: https://github.com/apache/pulsar/issues/21104#issuecomment-1702584503

https://github.com/apache/pulsar/issues/21104 was created as a reproducible example.

michalcukierman avatar May 31 '24 18:05 michalcukierman

This code creates a source-processor-sink pipeline. The sink ACK triggers the source ACK.

@ApplicationScoped
public class Processor {

  private final AtomicLong inCount = new AtomicLong(0);
  @Incoming("requests-in")
  @Outgoing("dump-out")
  @Blocking
  PulsarOutgoingMessage<String> process(PulsarIncomingMessage<String> in) {
    System.out.println(" - Processed: " + inCount.incrementAndGet());
    return PulsarOutgoingMessage.from(in);
  }
}

michalcukierman avatar May 31 '24 18:05 michalcukierman

@michalcukierman I'm not familiar with this kind of development framework; my comment https://github.com/apache/pulsar/issues/21104#issuecomment-2140860378 is based on the stats and internal-stats you provided. The point is: none of the messages dispatched to clients are acked.

You can debug your code to confirm whether the messages are acked or not.

dao-jun avatar May 31 '24 21:05 dao-jun