
[improve][pulsar-function] prefer to use index in BrokerEntryMetadata and considering batchIndex when build sequenceId

Open yapxue opened this issue 3 years ago • 3 comments

Prefer the index in BrokerEntryMetadata, and take batchIndex into account, when building the sequenceId.

Fixes #17061

Motivation

With the EFFECTIVELY_ONCE semantic in Pulsar Functions, the producer attaches a sequenceId to each message it writes to the output topic so that the broker can deduplicate. The sequenceId is built from the ledgerId and entryId. Batched messages, however, share the same entryId and therefore get the same sequenceId, so every message in a batch after the first is treated as a duplicate and dropped. This can cause data loss.
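The collision described above can be reproduced with a minimal sketch. The class name `EntryOnlySequenceId` and the 28-bit entryId width are assumptions chosen for illustration, not code taken from this PR or from Pulsar itself.

```java
// Hypothetical sketch: a sequenceId derived from ledgerId and entryId only.
final class EntryOnlySequenceId {
    // Assumed layout: entryId occupies the low 28 bits, ledgerId sits above it.
    static final int ENTRY_BITS = 28;

    private EntryOnlySequenceId() {
    }

    // batchIndex is deliberately not an input here, which is the root of the problem.
    static long of(long ledgerId, long entryId) {
        return (ledgerId << ENTRY_BITS) | entryId;
    }
}
```

Two messages from the same batch, for example batchIndex 0 and batchIndex 1 of entry 10 in ledger 5, both map to `EntryOnlySequenceId.of(5L, 10L)`, so deduplication on the output topic cannot tell them apart.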

Modifications

This change is inspired by how KoP (Kafka-on-Pulsar) converts a MessageId to a Kafka offset. Pulsar has AppendIndexMetadataInterceptor, which appends an index to BrokerEntryMetadata; using this index as the sequenceId is the best option. If the index is not present, the sequenceId is composed from ledgerId, entryId, and batchIndex.
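The selection logic described above might look roughly like the following sketch. It does not use the real Pulsar classes: `SequenceIdSketch`, its method names, and the 28-bit/12-bit widths for entryId and batchIndex are all assumptions made for illustration, and the broker index is modelled as a plain `OptionalLong`.

```java
import java.util.OptionalLong;

// Hypothetical sketch: prefer the broker-assigned index, else pack the message position.
final class SequenceIdSketch {
    // Assumed bit widths; 64 - 28 - 12 leaves 24 bits for ledgerId.
    static final int ENTRY_BITS = 28;
    static final int BATCH_BITS = 12;

    private SequenceIdSketch() {
    }

    // Fallback: pack ledgerId, entryId and batchIndex into one long.
    static long fromMessageId(long ledgerId, long entryId, int batchIndex) {
        // Assumption: non-batched messages report a negative batchIndex; treat it as 0.
        long batch = Math.max(batchIndex, 0);
        return (ledgerId << (ENTRY_BITS + BATCH_BITS)) | (entryId << BATCH_BITS) | batch;
    }

    // Use the index from the broker entry metadata when present, otherwise fall back.
    static long sequenceId(OptionalLong brokerIndex, long ledgerId, long entryId, int batchIndex) {
        return brokerIndex.isPresent()
                ? brokerIndex.getAsLong()
                : fromMessageId(ledgerId, entryId, batchIndex);
    }
}
```

With this layout, messages in the same batch get distinct, increasing values, and the last message of one entry still sorts below the first message of the next entry. The trade-off is that fewer bits remain for ledgerId, so the widths would need to be chosen with the expected number of ledgers in mind.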

Verifying this change

  • [x] Make sure that the change passes the CI checks.

This change added unit tests.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • [ ] doc-required (Your PR needs to update docs and you will update later)

  • [x] doc-not-needed (Please explain why)

  • [ ] doc (Your PR contains doc changes)

  • [ ] doc-complete (Docs have been already added)

yapxue avatar Aug 11 '22 16:08 yapxue

The sequence ID will reset to 0 after the input topic has been re-created. Will this impact the function if the output topic has not been re-created? The new data written to the output topic would then always carry a lower sequenceId.

The getSequenceId is different, because the message ID is guaranteed to be monotonically increasing within a cluster.

Any suggestions for persisting the index somewhere else so that it survives topic recreation? Or should we just use ledgerId + entryId + batchIndex as the sequenceId? I am afraid 24 bits for ledgerId may overflow on large clusters.

yapxue avatar Aug 15 '22 01:08 yapxue

The pr had no activity for 30 days, mark with Stale label.

github-actions[bot] avatar Sep 23 '22 02:09 github-actions[bot]

Is there any progress on this PR? We also encountered the problem of batch messages being deduplicated.

graysonzeng avatar Dec 21 '23 04:12 graysonzeng