pulsar-client-go

Fix: SendAsync callback was not invoked when producer is in reconnecting

Open gunli opened this issue 10 months ago • 4 comments

Fixes #1332

Master Issue: #1332

Motivation

The SendAsync() callback should still be invoked, so that the user/application gets a response, while the producer is busy reconnecting.

Modifications

Run the reconnect in a separate goroutine.
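A minimal sketch of the idea, not the code in this PR: the event loop hands the blocking reconnect to its own goroutine, so it keeps draining send requests and can answer their callbacks in the meantime. All names here (`producer`, `sendRequest`, `closedCh`, `reconnectedCh`, `errReconnecting`) are hypothetical, and failing the request immediately is a simplification; the real producer may instead let the message time out.

```go
package main

import "errors"

// errReconnecting is a hypothetical error used only in this sketch.
var errReconnecting = errors.New("producer is reconnecting")

// sendRequest is a hypothetical stand-in for a queued SendAsync call.
type sendRequest struct {
	payload  []byte
	callback func(error)
}

// producer is a simplified, hypothetical model of a producer event loop.
type producer struct {
	sendCh        chan *sendRequest
	closedCh      chan struct{} // connection-closed events
	reconnectedCh chan struct{} // signalled by the reconnect goroutine
	doneCh        chan struct{}
	reconnect     func() // blocks until the connection is re-established
}

// runEventLoop never blocks on reconnect, so send requests always get
// an answer through their callback.
func (p *producer) runEventLoop() {
	connected := true
	for {
		select {
		case req := <-p.sendCh:
			if connected {
				req.callback(nil) // stand-in for the real send path
			} else {
				req.callback(errReconnecting)
			}
		case <-p.closedCh:
			connected = false
			go func() {
				p.reconnect()
				select {
				case p.reconnectedCh <- struct{}{}:
				case <-p.doneCh:
				}
			}()
		case <-p.reconnectedCh:
			connected = true
		case <-p.doneCh:
			return
		}
	}
}
```

Because `connected` is only touched by the loop goroutine, the sketch needs no extra locking; the reconnect goroutine reports back over a channel instead of writing shared state.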

Verifying this change

  • [x] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • run TestProducerKeepReconnectingAndThenCallSendAsync()

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

gunli avatar Feb 18 '25 04:02 gunli

@shibd @nodece @RobertIndie @BewareMyPower @crossoverJie

gunli avatar Feb 18 '25 06:02 gunli

I have pushed separate PRs to fix the potential write conflicts (#1336) and the data race (#1338).

gunli avatar Feb 25 '25 01:02 gunli

SendAsync can wait until the runEventLoop processes it and pushes it into the pendingQueue or a batch, just like the Java client. Before entering the pendingQueue, SendAsync itself can check for timeouts and handle the callback. After entering the pendingQueue, failTimeoutMessages can manage the timeout.

I believe that performing timeout checks and callback handling before entering the pendingQueue will make the producer increasingly complex, and it will require additional cleanup work before closing the producer.

In my opinion, this approach is essentially equivalent to performing the timeout checks after entering the pendingQueue.

Regarding your concerns about concurrency risks, I don't think there is any such risk. When a producer receives a connectionClosed event, it is removed from the connection's handler list, ensuring that no further connectionClosed events will be triggered to make the producer reconnect again.
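The "removed from the handler list" argument can be illustrated with a small sketch. The types below (`connection`, `closeListener`, `countingListener`) are hypothetical and much simpler than the real client; they only show that a listener detached before notification cannot receive a second close event from the same connection.

```go
package main

import "sync"

// closeListener is a hypothetical interface for anything that wants to
// hear about a closed connection (for example a producer).
type closeListener interface {
	connectionClosed()
}

// countingListener records how many close events it has received.
type countingListener struct{ events int }

func (l *countingListener) connectionClosed() { l.events++ }

// connection is a hypothetical, stripped-down connection with a handler list.
type connection struct {
	mu        sync.Mutex
	listeners map[uint64]closeListener
}

func newConnection() *connection {
	return &connection{listeners: make(map[uint64]closeListener)}
}

func (c *connection) register(id uint64, l closeListener) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.listeners[id] = l
}

// notifyClosed detaches all listeners under the lock and then notifies
// each one exactly once. It returns how many listeners were notified.
func (c *connection) notifyClosed() int {
	c.mu.Lock()
	detached := c.listeners
	c.listeners = make(map[uint64]closeListener)
	c.mu.Unlock()
	for _, l := range detached {
		l.connectionClosed()
	}
	return len(detached)
}
```

Calling `notifyClosed` a second time finds an empty handler list, so a listener that is already reconnecting is not triggered again.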

Furthermore, even if we don't reconnect in a separate goroutine, the remaining changes in this PR are still necessary. When the timeout is configured to an extremely short value, failTimeoutMessages may run before connection.internalWriteData(). In that case the timeout releases the buffer, which is then reallocated to another pendingItem. By the time the code reaches connection.internalWriteData(), the buffer holds the new item's data, resulting in a data race.
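The buffer-reuse hazard can be reproduced in miniature. The sketch below replays the sequence on a single goroutine so the outcome is deterministic; in the real client the steps run on different goroutines, which is what makes it a data race. `bufferPool`, `pendingItem`, and `staleWrite` are hypothetical names, not the client's actual types.

```go
package main

// bufferPool is a hypothetical pool that hands a released buffer out again,
// so two callers can end up sharing one backing array.
type bufferPool struct {
	free [][]byte
}

func (p *bufferPool) get() []byte {
	if n := len(p.free); n > 0 {
		b := p.free[n-1]
		p.free = p.free[:n-1]
		return b[:0] // same backing array, length reset
	}
	return make([]byte, 0, 64)
}

func (p *bufferPool) put(b []byte) { p.free = append(p.free, b) }

// pendingItem is a hypothetical stand-in for a queued message.
type pendingItem struct {
	buf []byte
}

// staleWrite replays: capture the buffer for writing, time out and release
// it, reuse it for another item, then perform the delayed write.
// It returns what the delayed write sees and what the second item holds.
func staleWrite() (written string, second string) {
	pool := &bufferPool{}
	first := &pendingItem{buf: append(pool.get(), "first"...)}
	toWrite := first.buf // the write path has captured the slice
	pool.put(first.buf)  // the timeout path releases the buffer early
	other := &pendingItem{buf: append(pool.get(), "other"...)}
	return string(toWrite), string(other.buf)
}
```

Here the delayed write no longer sees the first item's bytes, because the second item has overwritten the shared backing array. One way to avoid this, as an assumption rather than a description of the PR, is to release the buffer only after the write path has finished with it.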

gunli avatar Mar 12 '25 03:03 gunli

I submitted https://github.com/apache/pulsar-client-go/pull/1345 to fix #1332.

nodece avatar Mar 12 '25 17:03 nodece