pulsar-client-go
Fix incorrect ledgerID and entryID logged when producer receives unexpected ack
Fixes #1160
Motivation
The producer should log the ledgerId and entryId as -1 when it receives an unexpected ack.
Modifications
- Convert the ledgerId and entryId from uint64 to int64 when logging
- Add a test that verifies the message ID when a producer sends duplicated messages. Note that this test only enriches the test coverage for message deduplication; it is not related to the logging change.
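To illustrate the first modification, here is a minimal, standalone sketch of why the conversion matters. It assumes the IDs arrive as unsigned 64-bit values, as the uint64-to-int64 conversion above implies; `formatAck` and its message format are hypothetical and are not the client's actual logging code.

```go
package main

import (
	"fmt"
	"math"
)

// formatAck is a hypothetical helper (not part of pulsar-client-go).
// It converts the unsigned IDs to int64 before formatting, so a value
// of -1 is shown as -1 rather than as the largest uint64.
func formatAck(ledgerID, entryID uint64) string {
	return fmt.Sprintf("ledgerId:%d entryId:%d", int64(ledgerID), int64(entryID))
}

func main() {
	// -1 stored in a uint64 has all bits set, i.e. math.MaxUint64.
	unset := uint64(math.MaxUint64)

	// Formatting the unsigned value directly prints
	// "ledgerId:18446744073709551615 entryId:18446744073709551615".
	fmt.Printf("ledgerId:%d entryId:%d\n", unset, unset)

	// Converting to int64 first prints "ledgerId:-1 entryId:-1".
	fmt.Println(formatAck(unset, unset))
}
```

The conversion only changes how the value is displayed; the underlying bits are the same in both cases.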
Verifying this change
The log now looks like:
time="2024-01-29T11:45:05+08:00" level=warning msg="Received ack for ledgerId:-1 entryId:-1 on sequenceId 0 - xxx
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API: (yes / no)
- The schema: (yes / no / don't know)
- The default values of configurations: (yes / no)
- The wire protocol: (yes / no)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
I think CI failed because this PR adds the global brokerDeduplicationEnabled=true configuration, which affects every existing test case.
If we need to test the deduplication feature, we can instead enable it only for the current namespace or topic through the admin API: Namespaces().SetDeduplicationStatus(), or Topic().SetDeduplicationStatus() when topicLevelPoliciesEnabled is set to true in the broker configuration.
Golang example:
func TestProducerSendDuplicatedMessages(t *testing.T) {
	// Enable deduplication for this namespace only, instead of broker-wide.
	admin, err := pulsaradmin.NewClient(&config.Config{})
	require.NoError(t, err)
	err = admin.Namespaces().SetDeduplicationStatus("public/default", true)
	require.NoError(t, err)
	policies, err := admin.Namespaces().GetPolicies("public/default")
	require.NoError(t, err)
	require.NotNil(t, policies.DeduplicationEnabled)
	assert.Equal(t, true, *policies.DeduplicationEnabled)

	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.NoError(t, err)
	defer client.Close()

	testProducer, err := client.CreateProducer(ProducerOptions{
		Topic: "test",
	})
	assert.NoError(t, err)
	assert.NotNil(t, testProducer)

	// The first message is sent without an explicit sequence ID.
	_, err = testProducer.Send(context.Background(), &ProducerMessage{
		Payload: make([]byte, 1024),
	})
	assert.NoError(t, err)

	// Resend with sequence ID 0 each time; the test expects these to be
	// treated as duplicates and acked with ledgerId and entryId of -1.
	for i := 0; i < 3; i++ {
		var seqID int64
		msgID, err := testProducer.Send(context.Background(), &ProducerMessage{
			Payload:    make([]byte, 1024),
			SequenceID: &seqID,
		})
		assert.NoError(t, err)
		assert.NotNil(t, msgID)
		assert.Equal(t, int64(-1), msgID.LedgerID())
		assert.Equal(t, int64(-1), msgID.EntryID())
	}
	testProducer.Close()

	// Restore the namespace policy so other tests are not affected.
	err = admin.Namespaces().SetDeduplicationStatus("public/default", false)
	require.NoError(t, err)
	policies, err = admin.Namespaces().GetPolicies("public/default")
	require.NoError(t, err)
	require.NotNil(t, policies.DeduplicationEnabled)
	assert.Equal(t, false, *policies.DeduplicationEnabled)
}