pulsar
Producer batching does not work in synchronous mode using the send API
Search before asking
- [X] I searched in the issues and found nothing similar.
Read release policy
- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
3.2.x
Minimal reproduce step
ProducerBuilder<String> producerBuilderBatching = client.newProducer(Schema.STRING)
        .accessMode(ProducerAccessMode.Shared)   // enum value, not a String
        .compressionType(CompressionType.NONE)   // enum value, not a String
        .enableBatching(true)
        .blockIfQueueFull(true)
        .batchingMaxMessages(5)
        .batchingMaxBytes(13270)
        .batchingMaxPublishDelay(70000, TimeUnit.MILLISECONDS);
Producer<String> producerStringBatching = producerBuilderBatching.topic("batch-test-topic")
        .sendTimeout(20, TimeUnit.SECONDS).create();
// Synchronous send: blocks until the broker acknowledges the message
MessageId msgId = producerStringBatching.newMessage().value("Check batching").send();
What did you expect to see?
The number of messages in a single batch should increase from 0 to 4 (the maximum number of messages per batch was configured as 5), and the Pulsar SDK should print the following logs to the console:
2024-04-02T14:54:08,963 DEBUG [pool-22-thread-2] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 0
2024-04-02T14:54:08,964 DEBUG [pool-22-thread-15] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 1
2024-04-02T14:54:08,964 DEBUG [pool-22-thread-9] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 2
2024-04-02T14:54:08,965 DEBUG [pool-22-thread-6] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 3
2024-04-02T14:54:08,965 DEBUG [pool-22-thread-20] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-800] add message to batch, num messages in batch so far 4
just as it does in async mode.
What did you see instead?
The number of messages in a single batch is always 0 when using synchronous mode:
2024-04-02T14:43:26,850 DEBUG [pool-22-thread-15] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-792] add message to batch, num messages in batch so far 0
2024-04-02T14:43:26,852 DEBUG [pool-22-thread-8] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-792] add message to batch, num messages in batch so far 0
2024-04-02T14:43:26,854 DEBUG [pool-22-thread-7] org.apache.pulsar.client.impl.BatchMessageContainerImpl - [batch-test-topic] [standalone_onessl-4-792] add message to batch, num messages in batch so far 0
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
For the purpose of reducing the sync-api send message latency, the client will flush messages to the broker when you call the sync-api. The sync-api also blocks your app's threads: if we didn't flush messages to the broker but waited until the batch was full, it could block all the threads of your app.
/cc @gaoran10 @codelipenghui
Thanks for the good issue report @ragaur-tibco !
> For the purpose of reducing the sync-api send message latency, the client will flush messages to the broker when you call the sync-api.
@dao-jun Yes, this is reasonable, but the problem is that this doesn't seem to be documented anywhere.
I found the flushing logic here: https://github.com/apache/pulsar/blob/ffff639a1b73a34bbb5115503d4c7783bb2a2770/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java#L82-L86
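To make the reported log output easier to picture, here is a minimal single-threaded toy model in plain Java. It is not Pulsar's implementation: `ToyBatchingProducer` and all of its methods are hypothetical names, and it assumes only what is stated in this thread, namely that a synchronous send enqueues the message and then triggers a flush of the current batch before waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for a batching producer (hypothetical, not thread-safe, not Pulsar code). */
class ToyBatchingProducer {
    private final int maxMessagesPerBatch;
    private final List<CompletableFuture<Integer>> pending = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();

    ToyBatchingProducer(int maxMessagesPerBatch) {
        this.maxMessagesPerBatch = maxMessagesPerBatch;
    }

    /** Adds a message to the current batch; the future completes with the batch index on flush. */
    CompletableFuture<Integer> sendAsync(String value) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        pending.add(future); // the payload itself is ignored in this toy
        if (pending.size() >= maxMessagesPerBatch) {
            flush(); // a full batch is sent immediately
        }
        return future;
    }

    /** Sends whatever is in the current batch, even if it is not full. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        int batchIndex = flushedBatchSizes.size();
        flushedBatchSizes.add(pending.size());
        List<CompletableFuture<Integer>> toComplete = new ArrayList<>(pending);
        pending.clear();
        toComplete.forEach(f -> f.complete(batchIndex));
    }

    /** Assumed shape of a synchronous send: enqueue, flush if still pending, then wait. */
    int send(String value) {
        CompletableFuture<Integer> future = sendAsync(value);
        if (!future.isDone()) {
            flush();
        }
        return future.join();
    }

    List<Integer> flushedBatchSizes() {
        return List.copyOf(flushedBatchSizes);
    }
}

class ToyBatchingDemo {
    public static void main(String[] args) {
        ToyBatchingProducer sync = new ToyBatchingProducer(5);
        for (int i = 0; i < 5; i++) {
            sync.send("msg-" + i);
        }
        System.out.println("sync batches:  " + sync.flushedBatchSizes());

        ToyBatchingProducer async = new ToyBatchingProducer(5);
        for (int i = 0; i < 5; i++) {
            async.sendAsync("msg-" + i);
        }
        System.out.println("async batches: " + async.flushedBatchSizes());
    }
}
```

In this model, five synchronous sends from one thread produce five batches of one message each, while five asynchronous sends fill a single batch of five, which matches the "num messages in batch so far 0" pattern in the report.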
I think @ragaur-tibco's issue report is great and the minimum resolution is to document the behavior. It seems that there could be a use case where one would want to achieve batching while using the synchronous API. That's not currently supported. We have 2 options for resolving this:
- Documenting the behavior and making it explicit that synchronous send will trigger flushing of the current batch message.
- Adding support for controlling the triggering behavior so that batching would be possible also with the synchronous API. Since the caller thread will be blocked until the batch is complete, it would be useful only in cases where other caller threads could contribute to the same batch.
I'm personally in favor of option 1, but I'm open to option 2 if someone supports that and the change goes through the typical PIP process we have in Pulsar.
@lhotari I think supporting batch messages for the sync-api has potential risks. As I mentioned before, if we don't flush messages but wait until the batch is full, it may block users' app threads for a long while. Especially when many producers in a single app send messages in batches, it may block all the threads.
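The trade-off between option 2 and the blocking risk can be sketched with another toy model in plain Java. Everything here is hypothetical (`ToyBlockingBatcher`, `send`, `runCallers`); it is not a Pulsar API and not a proposed design. It only illustrates a blocking send that never flushes early: the caller is released when other threads fill the batch, and otherwise waits until its timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy batcher whose blocking send never flushes early (hypothetical, not Pulsar code). */
class ToyBlockingBatcher {
    private final int maxMessagesPerBatch;
    private final List<CompletableFuture<Integer>> pending = new ArrayList<>();

    ToyBlockingBatcher(int maxMessagesPerBatch) {
        this.maxMessagesPerBatch = maxMessagesPerBatch;
    }

    /** Blocks until the caller's batch is full; returns empty if the wait times out. */
    OptionalInt send(String value, long timeoutMillis) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        List<CompletableFuture<Integer>> full = null;
        synchronized (this) {
            pending.add(future); // the payload itself is ignored in this toy
            if (pending.size() >= maxMessagesPerBatch) {
                full = new ArrayList<>(pending);
                pending.clear();
            }
        }
        if (full != null) {
            int size = full.size();
            full.forEach(f -> f.complete(size)); // "flush": release every waiting caller
        }
        try {
            return OptionalInt.of(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return OptionalInt.empty(); // nobody else filled the batch in time
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Runs `callers` threads that each send one message; returns what each caller observed. */
    static List<OptionalInt> runCallers(int callers, int batchSize, long timeoutMillis) {
        ToyBlockingBatcher batcher = new ToyBlockingBatcher(batchSize);
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        try {
            List<Future<OptionalInt>> futures = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                String value = "msg-" + i;
                futures.add(pool.submit(() -> batcher.send(value, timeoutMillis)));
            }
            List<OptionalInt> results = new ArrayList<>();
            for (Future<OptionalInt> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}

class ToyBlockingBatcherDemo {
    public static void main(String[] args) {
        // Five concurrent callers fill one batch of five and are all released together.
        System.out.println(ToyBlockingBatcher.runCallers(5, 5, 5_000));
        // A lone caller cannot fill the batch and stays blocked until its timeout.
        System.out.println(ToyBlockingBatcher.runCallers(1, 5, 200));
    }
}
```

With five concurrent callers and a batch size of five, every caller observes a batch of five. A single caller stays blocked for the whole timeout and gets an empty result, which is the thread-blocking risk raised in the comment above.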
I prefer option 1, and I'll fix the doc later.