Message loss with message delivery not guaranteed
Description
I have a Pulsar client with one producer created for a topic. I ran a load test and could see that a few of the messages were lost. In the current implementation, is message delivery guaranteed by a publisher confirmation?
Pulsar broker:
Current version of Pulsar: 4.1.0
Broker Config
Library Version: 4.3.2
Client and Producer code
builder = PulsarClient.Builder()
    .Authentication(AuthenticationFactory.Token(TOKEN))
    .CheckCertificateRevocation(true)
    .ExceptionHandler(new CustomExceptionHandler())
    .KeepAliveInterval(new TimeSpan(0, 0, 30))
    .RetryInterval(new TimeSpan(0, 0, 2))
    .CloseInactiveConnectionsInterval(new TimeSpan(0, 0, 30));

builder.ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
    .ServiceUrl(new Uri($"pulsar+ssl://{pulsarHostName}:6651"))
    .TrustedCertificateAuthority(Certificate.GenerateCertificate(certificateOption.Value.Pulsar.Ca))
    .VerifyCertificateAuthority(true)
    .VerifyCertificateName(false);

_client = builder.Build();

_producer = _client.NewProducer(Schema.ByteArray)
    .ProducerName($"{@event}-{Environment.MachineName}")
    .StateChangedHandler(producerStateChanged =>
    {
        var topicName = producerStateChanged.Producer.Topic;
        var state = producerStateChanged.ProducerState;
        _logger.LogDebug(new ApplicationLogMessage
        {
            Message = $"The producer for topic '{topicName}' changed state to '{state}'"
        });
    })
    .Topic("persistent://global/seed/Test")
    .Create();
Publisher code
var messageId = await producer.Send(new MessageMetadata(), byteArray, CancellationToken.None).ConfigureAwait(false);
Reproduction Steps
The publisher code is exposed via an HTTP endpoint.
Run a load test
Number of users: 10
Number of iterations: 20
Expected behavior
Expected: 200 messages to be published.
Actual behavior
A few messages are lost, and for those messages the EntryId and LedgerId are returned as negative values.
Regression?
No response
Known Workarounds
No response
Configuration
No response
Other information
No response
Calling 'producer.Send' gives you a ValueTask<MessageId>, and this is not completed until the broker has confirmed receiving the message and given it a message ID. So if Send completes and you get a message ID, but the message is not on the topic, then it sounds like an issue with the broker (or your configuration of the broker and/or topic).
Thank you @blankensteiner. I got the MessageId, but the EntryId and LedgerId returned within the MessageId are negative values. Does that mean it's a broker issue, or that the library wasn't able to get a confirmation?
I have updated the original post with the Pulsar version and the broker config we use in our K8s cluster.
When you send a message using DotPulsar it will send a "CommandSend" to the broker. The broker will then send either a "CommandSendReceipt" or "CommandSendError" in response to that. If DotPulsar gets a CommandSendError, you would get an exception; if DotPulsar gets a CommandSendReceipt, you get the message ID. Since you are getting a message ID, it seems that the broker is telling DotPulsar that it has received and stored the message.
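A minimal C# sketch of that request/response pairing, assuming the client keys each in-flight send by its sequence ID (both CommandSendReceipt and CommandSendError carry one). `PendingSends` and `ReceiptId` are hypothetical names used for illustration, not DotPulsar's internal types; the point is only that the awaited task completes with an ID on a receipt and faults on an error.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the ledger/entry pair carried by a receipt (not a DotPulsar type).
public readonly record struct ReceiptId(ulong LedgerId, ulong EntryId);

public sealed class PendingSends
{
    // One pending completion per in-flight sequence ID.
    private readonly ConcurrentDictionary<ulong, TaskCompletionSource<ReceiptId>> _pending = new();

    // Called when a CommandSend is written; the returned task is what the caller awaits.
    public Task<ReceiptId> Register(ulong sequenceId)
    {
        var tcs = new TaskCompletionSource<ReceiptId>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(sequenceId, tcs))
            throw new InvalidOperationException($"Sequence ID {sequenceId} is already in flight.");
        return tcs.Task;
    }

    // CommandSendReceipt: the broker reports success, so the awaiting caller gets an ID.
    public void OnReceipt(ulong sequenceId, ulong ledgerId, ulong entryId)
    {
        if (_pending.TryRemove(sequenceId, out var tcs))
            tcs.TrySetResult(new ReceiptId(ledgerId, entryId));
    }

    // CommandSendError: the broker reports failure, so the awaiting caller gets an exception.
    public void OnError(ulong sequenceId, string error)
    {
        if (_pending.TryRemove(sequenceId, out var tcs))
            tcs.TrySetException(new InvalidOperationException(error));
    }
}
```

Note that in this model nothing inspects the returned ledger and entry IDs: whatever the broker puts in the receipt is handed straight back to the caller as a successful send.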
Thank you @blankensteiner for your help. A MessageId is returned, but what do the negative values below mean?
"partitionKey": "none", "partition": -1, "ledgerId": -9223372036854800000.0, "batchIndex": -1, "eventType": "TopicIntegrationEvent", "entryId": -9223372036854800000.0, "dataSize": 1161, "eventName": "TopicIntegrationEvent"
A MessageId consists of ledgerId, entryId, partition, batchIndex, and topic. Where did you get "partitionKey", "eventType", "dataSize", and "eventName" from? and where is the topic?
We were logging other parameters internally. As you mentioned, the MessageId contains only the following properties:
"partition": -1, "ledgerId": -9223372036854800000.0, "batchIndex": -1, "entryId": -9223372036854800000.0,
During the load test, a few of the sends returned these negative values, and those turned out to be the missing messages. Does that mean the publisher couldn't confirm them, or is something missing on our end?
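For reference, in the Pulsar protocol partition and batchIndex default to -1 when the topic is not partitioned and the message is not part of a batch, so those two fields look normal here; the unusual values are ledgerId and entryId, which the protocol defines as unsigned 64-bit integers. A hypothetical C# helper like the one below, assuming the IDs are available as ulong values, flags receipts whose ledger or entry ID would read as negative when reinterpreted as a signed 64-bit integer, which is how the suspicious IDs show up in these logs. It only detects the pattern; what the broker means by it is the open question in this thread.

```csharp
// Hypothetical helper (not part of DotPulsar) for spotting receipts with
// negative-looking ledger/entry IDs.
public static class ReceiptCheck
{
    // True if either ID has its top bit set, i.e. it would be negative as a signed 64-bit value.
    public static bool LooksLikeSentinel(ulong ledgerId, ulong entryId) =>
        unchecked((long)ledgerId) < 0 || unchecked((long)entryId) < 0;
}
```

In the publisher, a check such as `ReceiptCheck.LooksLikeSentinel(messageId.LedgerId, messageId.EntryId)` could trigger a warning that logs the raw integers, assuming the returned MessageId exposes LedgerId and EntryId as unsigned values. Logging the raw integers also avoids the precision loss visible in -9223372036854800000.0, which appears to have passed through a floating-point conversion.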
Load Test
A service exposes an endpoint that publishes a message to a topic. We have 2 instances of this service running, and requests to them are load balanced in round-robin fashion. So with this setup, we have 2 producers actively publishing to this topic, whose subscription is Key_Shared.
I would suggest talking with the Pulsar community (maybe on slack). There must be someone with knowledge of the protocol and broker that can answer this. From DotPulsar's point of view, with the current implementation, we expect the broker to have successfully stored the message if CommandSendReceipt is received as a response to CommandSend. Maybe this has to change, but I think it would be weird to not use CommandSendError to signal that something is not right.
Thank you. Do you see anything wrong with client implementation code that I have shared ?
In the Pulsar conf, I set the flag brokerDeduplicationEnabled to false, and after that I'm no longer seeing the negative values returned. Does that mean SequenceId uniqueness is not guaranteed per producer?
During the load tests my application didn't restart, but I just wanted to check whether my observations were right.
Scenario
While creating the producer, there is an option to set InitialSequenceId(); if it is not set, I guess it defaults to 0. So if my application restarts, the sequenceId will be reset to 0. Is there a chance of generating a sequenceId that was already used in the past? Should the producer track and save the sequence IDs so that InitialSequenceId() is set to the latest value whenever the producer restarts?
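To illustrate the restart scenario, here is a deliberately simplified C# model of broker-side deduplication. It assumes the broker remembers the highest sequence ID it has accepted for each producer name and treats any message at or below it as a duplicate. The real broker tracks more state than this, so `DedupModel` is a hypothetical illustration, not Pulsar's implementation.

```csharp
using System.Collections.Generic;

// Simplified, hypothetical model of broker-side deduplication keyed by producer name.
public sealed class DedupModel
{
    // Highest sequence ID accepted so far, per producer name.
    private readonly Dictionary<string, ulong> _highest = new();

    // Returns true if the message would be stored, false if it would be dropped as a duplicate.
    public bool Accept(string producerName, ulong sequenceId)
    {
        if (_highest.TryGetValue(producerName, out var last) && sequenceId <= last)
            return false;

        _highest[producerName] = sequenceId;
        return true;
    }
}
```

Under this model, a producer that restarts with the same name and an initial sequence ID of 0 would have its first messages dropped as duplicates until it passes the previously recorded value, and many messages sent with the same sequence ID would result in only the first being stored.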
Thank you. Do you see anything wrong with client implementation code that I have shared ?
No, I do not.
In the Pulsar conf, I set the flag brokerDeduplicationEnabled to false, and after that I'm no longer seeing the negative values returned. Does that mean SequenceId uniqueness is not guaranteed per producer?
I guess so, but you would have to check with the Pulsar community (it is not DotPulsar specific).
Should the producer track and save the sequence IDs so that InitialSequenceId() is set to the latest value whenever the producer restarts?
Yes.
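One way to do that, sketched in C# under the assumption that a local file is an acceptable place to keep state: a hypothetical `SequenceIdStore` (not part of DotPulsar) that saves the highest sequence ID the broker has confirmed and loads it again on startup.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical helper: persists the last confirmed sequence ID so a restarted
// producer can resume from it instead of starting again from 0.
public sealed class SequenceIdStore
{
    private readonly string _path;

    public SequenceIdStore(string path) => _path = path;

    // Returns the last saved sequence ID, or null if nothing has been saved yet.
    public ulong? Load()
    {
        if (!File.Exists(_path)) return null;
        var text = File.ReadAllText(_path).Trim();
        return ulong.Parse(text, CultureInfo.InvariantCulture);
    }

    // Writes to a temporary file first and then replaces the target, which reduces
    // the chance of leaving a half-written value behind if the process dies mid-write.
    public void Save(ulong sequenceId)
    {
        var tmp = _path + ".tmp";
        File.WriteAllText(tmp, sequenceId.ToString(CultureInfo.InvariantCulture));
        File.Move(tmp, _path, overwrite: true);
    }
}
```

On startup, the loaded value could be passed to the producer builder's InitialSequenceId(...). Whether to pass the last used ID or the next one depends on how the client interprets that setting (the Java client, for example, documents that the first message uses initialSequenceId + 1), so this should be checked against DotPulsar. The producer name also has to stay the same across restarts for the broker to match the sequence IDs up. Saving only periodically reduces I/O, but after a crash any messages sent since the last save would reuse IDs the broker has already seen.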
Looking at the DotPulsar library code, I thought the sequenceId was generated by the library and not by Pulsar. So you suspect Pulsar is not honoring the uniqueness of the producer name and sequenceId combination?
The end-user, creating the producer, has to keep track of the sequenceId. I don't know what I suspect, which is why someone with more knowledge of the protocol/broker may be more helpful. Could be that the messageIds you describe are signaling that the message is a duplicate.
We have initiated a new thread in the Pulsar Slack channel: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1759732652521889
According to the response generated by the AI assistant (available here: https://apache-pulsar.slack.com/archives/C07CA4STH99/p1759732676588439 ), this appears to be a known issue in Pulsar, particularly when using non-Java clients or batching. It has been identified as a client-side batching issue, for which the Java client already includes a fix.
Could you please confirm if a similar fix is planned or expected for the DotPulsar as well?
@blankensteiner - I guess the Pulsar Java client had this issue addressed in v3.1.0.
https://pulsar.apache.org/release-notes/versioned/pulsar-3.1.0/
Hi @satishviswanathan and @praveengopalan A PR is most welcome, otherwise I need a detailed description of what needs to change.
There's been references to https://github.com/apache/pulsar/pull/6326 and https://github.com/apache/pulsar-client-go/issues/1395 which explain the context. Similar logic as https://github.com/apache/pulsar/pull/6326/files#diff-d6fcf8aa2d0035cc386dca0942a452343d6854763c7fd397efa4e660c0069767 would be needed.
Without studying it closely, I see a lot of mentioning of batching. I just want to make sure this is not a "Producer sending batches" issue, since DotPulsar only supports reading batches and not sending batches.
Yes, the referenced Pulsar client issue is related to sending batches.
@praveengopalan Please share the (minimal) app and steps to reproduce that can be followed (the current reproduction steps in the description don't fulfill this condition). You can share the app in a separate GitHub repository, a forked repository or with files in a GitHub gist. The next step could then be to have a failing test case for DotPulsar and then finally fixing the issue by bridging the gap and making the test pass.
@lhotari The sample app and standalone.conf available at https://github.com/praveengopalan/pulsar-minimal
curl --location 'http://localhost:64462/api/publish' \
--header 'Content-Type: application/json' \
--data '{
"Content":"Pulsar Test Publish",
"MessageCount":500
}'
Steps to reproduce
Call the '/api/publish' endpoint.
Expected: 500 messages should be published to topic "persistent://viteglobal/Seed/TopicEvent".
Actual: only one message is published.
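Following the suggestion to turn the reproduction into a test, a hypothetical C# harness could publish a fixed number of messages through a caller-supplied send delegate (for example a thin wrapper around producer.Send that returns the ledger and entry IDs) and tally normal versus negative-looking receipts. `PublishHarness` and its signature are assumptions for illustration, not part of DotPulsar or the linked sample app.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical test harness: sends `count` messages one after another and classifies each receipt.
public static class PublishHarness
{
    public static async Task<(int Confirmed, int Suspicious)> Run(
        int count, Func<int, Task<(ulong LedgerId, ulong EntryId)>> send)
    {
        int confirmed = 0, suspicious = 0;
        for (var i = 0; i < count; i++)
        {
            var (ledgerId, entryId) = await send(i).ConfigureAwait(false);

            // IDs that read as negative when reinterpreted as signed 64-bit values
            // match the negative values seen in the logs, so count them separately.
            if (unchecked((long)ledgerId) < 0 || unchecked((long)entryId) < 0)
                suspicious++;
            else
                confirmed++;
        }
        return (confirmed, suspicious);
    }
}
```

With the reproduction above, a healthy run would report 500 confirmed and 0 suspicious; any suspicious receipts would be the candidates to compare against what actually reached the topic.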