
Flaky-test: SimpleProducerConsumerTest.testMultiTopicsConsumerImplPauseForManualSubscription

Open lhotari opened this issue 1 year ago • 1 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Example failure

https://github.com/apache/pulsar/actions/runs/11407423480/job/31743771210?pr=23484#step:11:1686

Exception stacktrace

  Error:  Tests run: 294, Failures: 1, Errors: 0, Skipped: 231, Time elapsed: 352.306 s <<< FAILURE! - in org.apache.pulsar.client.api.SimpleProducerConsumerTest
  Error:  org.apache.pulsar.client.api.SimpleProducerConsumerTest.testMultiTopicsConsumerImplPauseForManualSubscription  Time elapsed: 10.489 s  <<< FAILURE!
  java.lang.AssertionError: expected [30] but found [29]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertEqualsImpl(Assert.java:149)
  	at org.testng.Assert.assertEquals(Assert.java:131)
  	at org.testng.Assert.assertEquals(Assert.java:1418)
  	at org.testng.Assert.assertEquals(Assert.java:1382)
  	at org.testng.Assert.assertEquals(Assert.java:1428)
  	at org.apache.pulsar.client.api.SimpleProducerConsumerTest.testMultiTopicsConsumerImplPauseForManualSubscription(SimpleProducerConsumerTest.java:3580)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

lhotari avatar Oct 18 '24 16:10 lhotari

Hi @lhotari, I would like to share my findings about the test case. It tests the pausing of a multi-topic consumer by doing the following:

  • There are 3 topics.
  • Step 1: Produce 5 messages per topic. Create a multi-topic consumer for topics 1 and 2 (but not topic 3) with a receiverQueueSize of 1. Receive 8 messages. I would expect 4 messages from topic 1 and 4 messages from topic 2 to be consumed, and in successful test runs this is the case: one message then remains in the queue of each of the two internal consumers. In the failing runs I observed, however, it was always 5 messages from one topic and 3 from the other.
  • Step 2: Pause the multi-topic consumer.
  • Step 3: Add topic 3 to the subscription of the multi-topic consumer. Now there are 3 internal consumers, one per topic.
  • Step 4: Produce 5 more messages per topic.
  • Step 5: Receive 2 messages, which should clear the receiver queues of consumers 1 and 2. In successful test runs this is the case. In failing test runs, both messages come from the same topic.
  • Step 6: Both consumer queues are expected to be empty, and since the multi-topic consumer is paused, no message should be consumed. A problem might be that the following code is used:
// 6. should not consume any messages
Awaitility.await().untilAsserted(() -> assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)));

instead of:

// 6. should not consume any messages
assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));

With the current test code, the first call inside the await can receive a message while the next call receives none, at which point assertNull succeeds. The test then does not fail at this step even though a message was received. This is why the counter of received messages ends up at 29 instead of 30: one message is received in step 6.

  • Step 7: Resume multi-topic consumer.
  • Step 8: Consume the remaining messages.
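
To illustrate the concern about step 6, here is a minimal sketch in plain Java. It does not use Awaitility or a Pulsar consumer: `retryUntilPasses` is a hypothetical stand-in for a retry-until-pass helper, and `Queue.poll()` plays the role of `consumer.receive(...)`, returning null when nothing is available. It only shows why retrying a "nothing was received" check can hide a received message.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

class RetryMaskingSketch {

    // Hypothetical stand-in for a retry-until-pass helper (NOT Awaitility itself):
    // re-runs the check until it passes or the attempts run out.
    static boolean retryUntilPasses(BooleanSupplier check, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (check.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    // Step 6 as currently written: the "nothing received" check is retried.
    // Each failed attempt removes a message from the queue without counting it.
    static boolean retriedCheckPasses(Queue<String> receiverQueue) {
        return retryUntilPasses(() -> receiverQueue.poll() == null, 5);
    }

    // Step 6 as proposed: a single check with no retry.
    static boolean singleCheckPasses(Queue<String> receiverQueue) {
        return receiverQueue.poll() == null;
    }

    public static void main(String[] args) {
        // One stray message arrives although the consumer is supposed to be paused.
        Queue<String> first = new ArrayDeque<>(List.of("stray-message"));
        System.out.println("retried check passes: " + retriedCheckPasses(first));
        System.out.println("messages left after retried check: " + first.size());

        Queue<String> second = new ArrayDeque<>(List.of("stray-message"));
        System.out.println("single check passes: " + singleCheckPasses(second));
    }
}
```

In this sketch the retried check passes and leaves the queue empty, so the stray message is gone without being counted, while the single check fails immediately and would surface the problem at step 6 rather than at the final count.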

It seems that the multi-topic consumer is automatically resumed between steps 2 and 7 in MultiTopicsConsumerImpl.resumeReceivingFromPausedConsumersIfNeeded().

pdolif avatar Oct 19 '24 19:10 pdolif

I would like to share my findings about the test case. It tests the pausing of a multi-topic consumer by doing the following

Very good analysis, @pdolif!

One detail that caught my eye in the test is step 3: https://github.com/apache/pulsar/blob/7367f1c6553c83d7d335977b86ed38494c9485b7/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L3541-L3542

That violates the testing principle that tests should exercise the public API and externally observable behavior. There are a lot of such tests in the Pulsar code base, and we should move in a direction where this pattern isn't carried forward.

It seems that the multi-topic consumer is automatically resumed between steps 2 and 7 in MultiTopicsConsumerImpl.resumeReceivingFromPausedConsumersIfNeeded().

It might be different. The pausing logic seems to differ from what is tested. Calling .pause() stops sending flow permits to the broker, whereas the pausing referenced in MultiTopicsConsumerImpl.resumeReceivingFromPausedConsumersIfNeeded() is a different matter.
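
As a rough illustration of the permit-based view of pausing, here is a toy model in plain Java. It is not the Pulsar client implementation: `ToyConsumer`, `brokerDispatch`, and the permit bookkeeping are hypothetical names and simplifications. The assumption is only that the broker dispatches a message when it holds a permit, and that a paused consumer stops granting new permits without revoking ones already granted.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class FlowPermitSketch {

    // Toy model (hypothetical, simplified): not how the Pulsar client is implemented.
    static final class ToyConsumer {
        private final Queue<String> receiverQueue = new ArrayDeque<>();
        private int permitsAtBroker;   // permits the broker may still use
        private int withheldPermits;   // permits not sent because of pause()
        private boolean paused;

        ToyConsumer(int receiverQueueSize) {
            // Initial flow request sized to the receiver queue.
            this.permitsAtBroker = receiverQueueSize;
        }

        void pause() {
            paused = true;
        }

        void resume() {
            paused = false;
            permitsAtBroker += withheldPermits;
            withheldPermits = 0;
        }

        // Broker side: dispatch succeeds only while a permit is available.
        boolean brokerDispatch(String message) {
            if (permitsAtBroker == 0) {
                return false;
            }
            permitsAtBroker--;
            receiverQueue.add(message);
            return true;
        }

        // Application side: taking a message frees a slot, but a new permit
        // is only sent to the broker when the consumer is not paused.
        String receive() {
            String message = receiverQueue.poll();
            if (message != null) {
                if (paused) {
                    withheldPermits++;
                } else {
                    permitsAtBroker++;
                }
            }
            return message;
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(1);
        consumer.pause();
        // The permit granted before pause() is still usable by the broker.
        System.out.println("dispatch m1 while paused: " + consumer.brokerDispatch("m1"));
        System.out.println("receive: " + consumer.receive());
        // No new permit was sent, so the next dispatch is refused.
        System.out.println("dispatch m2 while paused: " + consumer.brokerDispatch("m2"));
        consumer.resume();
        System.out.println("dispatch m2 after resume: " + consumer.brokerDispatch("m2"));
    }
}
```

Under these assumptions, a message can still reach a paused consumer if a permit was granted before the pause, which is one way a receiver queue of size 1 could hold a message that the test does not expect.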

lhotari avatar Oct 21 '24 06:10 lhotari