Message loss with message delivery not guaranteed

Open satishviswanathan opened this issue 2 months ago • 18 comments

Description

I have a Pulsar client with one producer created for a topic. When I ran a load test, I could see that a few of the messages were lost. In the current implementation, is message delivery guaranteed with a publisher confirmation?

Pulsar broker :

Current version of pulsar is: 4.1.0

Broker Config

broker.conf.txt

Library Version: 4.3.2

Client and Producer code

builder = PulsarClient.Builder()
    .Authentication(AuthenticationFactory.Token(TOKEN))
    .CheckCertificateRevocation(true)
    .ExceptionHandler(new CustomExceptionHandler())
    .KeepAliveInterval(new TimeSpan(0, 0, 30))
    .RetryInterval(new TimeSpan(0, 0, 2))
    .CloseInactiveConnectionsInterval(new TimeSpan(0, 0, 30));

builder.ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
    .ServiceUrl(new Uri($"pulsar+ssl://{pulsarHostName}:6651"))
    .TrustedCertificateAuthority(Certificate.GenerateCertificate(certificateOption.Value.Pulsar.Ca))
    .VerifyCertificateAuthority(true)
    .VerifyCertificateName(false);

_client = builder.Build();

_producer = _client.NewProducer(Schema.ByteArray)
    .ProducerName($"{@event}-{Environment.MachineName}")
    .StateChangedHandler(producerStateChanged =>
    {
        // Log every producer state transition for this topic.
        var topicName = producerStateChanged.Producer.Topic;
        var state = producerStateChanged.ProducerState;

        _logger.LogDebug(new ApplicationLogMessage
        {
            Message = $"The producer for topic '{topicName}' changed state to '{state}'"
        });
    })
    .Topic($"persistent://global/seed/Test")
    .Create();

Publisher code

var messageId = await producer.Send(new MessageMetadata(), byteArray, CancellationToken.None).ConfigureAwait(false);

Reproduction Steps

The publisher code is exposed via an http end point.

Run a load test

Number of users: 10
Number of iterations: 20

Expected behavior

Expected: 200 Messages to be published.

Actual behavior

A few messages are lost; for those, the EntryId and LedgerId are returned as negative values.

Regression?

No response

Known Workarounds

No response

Configuration

No response

Other information

No response

satishviswanathan avatar Oct 03 '25 02:10 satishviswanathan

Calling 'producer.Send' gives you a ValueTask<MessageId>, and this is not completed until the broker has confirmed receiving the message and has given it a message ID. So if Send is completed and you get a message ID, but the message is not on the topic, then it sounds like an issue with the broker (or your configuration of the broker and/or topic).

blankensteiner avatar Oct 03 '25 08:10 blankensteiner

Thank you @blankensteiner, I got the MessageId, but the EntryId and LedgerId returned within the MessageId are negative values. Does that mean it's a broker issue, or that the library wasn't able to get a confirmation?

I have updated the original post with the Pulsar version and the broker config we use in our K8s cluster.

satishviswanathan avatar Oct 03 '25 11:10 satishviswanathan

When you send a message using DotPulsar it will send a "CommandSend" to the broker. The broker will then send either a "CommandSendReceipt" or "CommandSendError" in response to that. If DotPulsar gets a CommandSendError, you would get an exception; if DotPulsar gets a CommandSendReceipt, you get the message ID. Since you are getting a message ID, it seems that the broker is telling DotPulsar that it has received and stored the message.
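
As an illustration of the exchange described above, here is a minimal sketch. The types `BrokerReply`, `SendReceipt`, `SendError`, and `SendOutcome.Resolve` are hypothetical stand-ins and not DotPulsar's internal types; the sketch only shows the mapping of a receipt to an ID and of an error to an exception.

```csharp
using System;

// Hypothetical stand-ins for the two broker replies named above.
public abstract record BrokerReply;
public sealed record SendReceipt(long LedgerId, long EntryId) : BrokerReply;
public sealed record SendError(string Message) : BrokerReply;

public static class SendOutcome
{
    // A receipt becomes an ID for the caller; an error becomes an exception.
    public static (long LedgerId, long EntryId) Resolve(BrokerReply reply) => reply switch
    {
        SendReceipt r => (r.LedgerId, r.EntryId),
        SendError e => throw new InvalidOperationException(e.Message),
        _ => throw new ArgumentException("Unknown reply type", nameof(reply))
    };
}
```

Note that in this model a receipt is passed through without inspecting its values, so a receipt carrying negative IDs would still reach the caller as a normal result.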

blankensteiner avatar Oct 03 '25 13:10 blankensteiner

Thank you @blankensteiner for your help. The MessageId is returned, but what do the negative values below mean?

"partitionKey": "none", "partition": -1, "ledgerId": -9223372036854800000.0, "batchIndex": -1, "eventType": "TopicIntegrationEvent", "entryId": -9223372036854800000.0, "dataSize": 1161, "eventName": "TopicIntegrationEvent"

satishviswanathan avatar Oct 03 '25 17:10 satishviswanathan

A MessageId consists of ledgerId, entryId, partition, batchIndex, and topic. Where did you get "partitionKey", "eventType", "dataSize", and "eventName" from? And where is the topic?

blankensteiner avatar Oct 03 '25 18:10 blankensteiner

Internally we were logging other parameters. Yes, as you mentioned, we are getting only the following properties in the MessageId:

"partition": -1, "ledgerId": -9223372036854800000.0, "batchIndex": -1, "entryId": -9223372036854800000.0,

During the load test a few of them were returned with negative values, and we found that those were the missing messages. Does that mean the publisher couldn't get a confirmation, or is something missing on our end?

Load Test

The service exposes an endpoint to publish a message to a topic. We have 2 instances of this service running, and the load is balanced between them in round-robin fashion. So with this setup we have 2 producers actively publishing to this topic, whose subscription is Key_Shared.

satishviswanathan avatar Oct 03 '25 22:10 satishviswanathan

I would suggest talking with the Pulsar community (maybe on Slack). There must be someone with knowledge of the protocol and broker that can answer this. From DotPulsar's point of view, with the current implementation, we expect the broker to have successfully stored the message if CommandSendReceipt is received as a response to CommandSend. Maybe this has to change, but I think it would be weird to not use CommandSendError to signal that something is not right.

blankensteiner avatar Oct 04 '25 10:10 blankensteiner

Thank you. Do you see anything wrong with the client implementation code that I have shared?

In the Pulsar config I set the flag brokerDeduplicationEnabled to false, and after that I no longer see negative values returned. Does that mean SequenceId uniqueness is not guaranteed per producer?

During the load tests my application didn't restart, but I wanted to check whether my observations are right.

Scenario

While creating the producer there is an option to set InitialSequenceId(). If that is not set, I guess it defaults to 0. So if my application restarts, the sequenceId will be reset to 0. Is there a chance of generating a sequenceId that was already generated in the past? Should the producer track and save the sequenceIds so that InitialSequenceId() is set to the latest value whenever the producer restarts?

satishviswanathan avatar Oct 04 '25 15:10 satishviswanathan

Thank you. Do you see anything wrong with the client implementation code that I have shared?

No, I do not.

In the Pulsar config I set the flag brokerDeduplicationEnabled to false, and after that I no longer see negative values returned. Does that mean SequenceId uniqueness is not guaranteed per producer?

I guess so, but you would have to check with the Pulsar community (it is not DotPulsar specific).

Should the producer track and save the sequenceIds so that InitialSequenceId() is set to the latest value whenever the producer restarts?

Yes.

blankensteiner avatar Oct 05 '25 10:10 blankensteiner

From looking at the DotPulsar library code, I thought the sequenceId is generated by the library and not by Pulsar. So do you suspect Pulsar is not honoring the uniqueness of the producer name and sequenceId combination?

satishviswanathan avatar Oct 05 '25 16:10 satishviswanathan

The end-user, creating the producer, has to keep track of the sequenceId. I don't know what I suspect, which is why someone with more knowledge of the protocol/broker may be more helpful. It could be that the messageIds you describe are signaling that the message is a duplicate.
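
A minimal sketch of how an application might track the sequence ID across restarts, as suggested above. `SequenceIdStore` and its file path are hypothetical and not part of DotPulsar; the only library name assumed is the `InitialSequenceId()` builder option mentioned earlier in this thread. Whether the first message after a restart uses the initial value itself or the next one is not confirmed here and should be checked against the library.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical helper (not part of DotPulsar): keeps the last used
// sequence ID in a file so it survives application restarts.
public sealed class SequenceIdStore
{
    private readonly string _path;
    private readonly object _lock = new();
    private ulong _last;

    public SequenceIdStore(string path)
    {
        _path = path;
        // Resume from the persisted value, or start at 0 on first run.
        _last = File.Exists(path)
            ? ulong.Parse(File.ReadAllText(path).Trim(), CultureInfo.InvariantCulture)
            : 0UL;
    }

    // Value a restarted application could pass to InitialSequenceId().
    public ulong Last { get { lock (_lock) return _last; } }

    // Reserves the next ID and persists it before the caller sends.
    public ulong Next()
    {
        lock (_lock)
        {
            _last++;
            File.WriteAllText(_path, _last.ToString(CultureInfo.InvariantCulture));
            return _last;
        }
    }
}
```

Each producer name would need its own store, since the setup in this thread runs two service instances with per-machine producer names. Writing the file on every message keeps the sketch simple but would be slow under load; a real application would likely use a faster durable store.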

blankensteiner avatar Oct 06 '25 06:10 blankensteiner

We have initiated a new thread in the Pulsar Slack channel: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1759732652521889

According to the response generated by the AI assistant (available here: https://apache-pulsar.slack.com/archives/C07CA4STH99/p1759732676588439 ), this appears to be a known issue in Pulsar, particularly when using non-Java clients or batching. It has been identified as a client-side batching issue, for which the Java client already includes a fix.

Could you please confirm whether a similar fix is planned or expected for DotPulsar as well?

praveengopalan avatar Oct 06 '25 14:10 praveengopalan

@blankensteiner - I guess the Pulsar Java client had this issue addressed in v3.1.0.

https://pulsar.apache.org/release-notes/versioned/pulsar-3.1.0/

satishviswanathan avatar Oct 06 '25 20:10 satishviswanathan

Hi @satishviswanathan and @praveengopalan. A PR is most welcome; otherwise I need a detailed description of what needs to change.

blankensteiner avatar Oct 07 '25 10:10 blankensteiner

There have been references to https://github.com/apache/pulsar/pull/6326 and https://github.com/apache/pulsar-client-go/issues/1395, which explain the context. Logic similar to https://github.com/apache/pulsar/pull/6326/files#diff-d6fcf8aa2d0035cc386dca0942a452343d6854763c7fd397efa4e660c0069767 would be needed.

lhotari avatar Oct 07 '25 12:10 lhotari

Without studying it closely, I see many mentions of batching. I just want to make sure this is not a "Producer sending batches" issue, since DotPulsar only supports reading batches and not sending batches.

blankensteiner avatar Oct 07 '25 12:10 blankensteiner

Yes, the referenced Pulsar client issue is related to sending batches.

@praveengopalan Please share a (minimal) app and steps to reproduce that can be followed (the current reproduction steps in the description don't fulfill this condition). You can share the app in a separate GitHub repository, a forked repository, or with files in a GitHub gist. The next step could then be to have a failing test case for DotPulsar, and finally to fix the issue by bridging the gap and making the test pass.

lhotari avatar Oct 07 '25 13:10 lhotari

@lhotari The sample app and standalone.conf are available at https://github.com/praveengopalan/pulsar-minimal

curl --location 'http://localhost:64462/api/publish' \
--header 'Content-Type: application/json' \
--data '{ "Content":"Pulsar Test Publish", "MessageCount":500 }'

Steps to reproduce

Call the '/api/publish' endpoint.
Expected - 500 messages should be published to the topic "persistent://viteglobal/Seed/TopicEvent"
Actual - only one message is published

praveengopalan avatar Oct 13 '25 08:10 praveengopalan