Fix: SendAsync callback was not invoked when the producer is reconnecting
Fixes #1332
Master Issue: #1332
Motivation
The SendAsync() callback should be invoked to give a response to the user/application even when the producer is busy reconnecting.
Modifications
Run the reconnection in a separate goroutine instead of blocking the producer's event loop.
Verifying this change
- [x] Make sure that the change passes the CI checks.
This change adds tests and can be verified as follows:
- run TestProducerKeepReconnectingAndThenCallSendAsync()
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
@shibd @nodece @RobertIndie @BewareMyPower @crossoverJie
PRs to fix a potential write conflict (#1336) and a data race (#1338) have been pushed.
SendAsync can wait until runEventLoop processes the message and pushes it into the pendingQueue or a batch, just like the Java client does. Before the message enters the pendingQueue, SendAsync itself can check for timeouts and invoke the callback; after it enters the pendingQueue, failTimeoutMessages can handle the timeout.
I believe that performing timeout checks and callback handling before messages enter the pendingQueue would make the producer increasingly complex, and it would require additional cleanup work before the producer is closed.
In my opinion, this approach is essentially equivalent to performing the timeout checks after messages enter the pendingQueue.
Regarding your concern about concurrency risks, I don't think any such risk exists. When a producer receives a connectionClosed event, it is removed from the connection's handler list, so no further connectionClosed event from that connection can make the producer reconnect again.
Furthermore, even if we don't reconnect in a separate goroutine, the remaining changes in this PR are still necessary. When the send timeout is configured to an extremely short value, failTimeoutMessages may run before connection.internalWriteData(). In that case, the timeout releases the buffer, which is then reallocated to another pendingItem; when the code later reaches connection.internalWriteData(), the buffer refers to the new item's data, resulting in a data race.
I submitted https://github.com/apache/pulsar-client-go/pull/1345 to fix #1332.