pulsar icon indicating copy to clipboard operation
pulsar copied to clipboard

[improve][broker] Optimize seeking by timestamp

Open dao-jun opened this issue 11 months ago • 15 comments

Fixes https://github.com/apache/pulsar/issues/22129

Main Issue: #xyz

PIP: #xyz

Motivation

Optimize seeking by timestamp

Modifications

Add a new method to ManageCursor.

Verifying this change

  • [ ] Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: https://github.com/dao-jun/pulsar/pull/8

dao-jun avatar Feb 28 '24 14:02 dao-jun

Let's avoid cherry-picking this change so that we don't introduce regressions for current maintenance branches.

lhotari avatar Feb 29 '24 09:02 lhotari

I think we need test cases to cover the various corner cases with this solution.

I agree with you, but before I complete the tests, I need to know if this change is reasonable.

dao-jun avatar Feb 29 '24 09:02 dao-jun

I think we need test cases to cover the various corner cases with this solution.

I agree with you, but before I complete the tests, I need to know if this change is reasonable.

The general approach looks good to me.

lhotari avatar Feb 29 '24 09:02 lhotari

@dao-jun one challenge is that the ledger's timestamp is the broker's clock, but the seek uses the message publish time which is using the client's (publisher's) clock. There might be additional corner cases because of this. Any thoughts on that? Perhaps documenting this is sufficient? Having a configuration toggle to choose between the new and old method could be useful for users that face regressions?

lhotari avatar Feb 29 '24 09:02 lhotari

@dao-jun one challenge is that the ledger's timestamp is the broker's clock, but the seek uses the message publish time which is using the client's (publisher's) clock. There might be additional corner cases because of this. Any thoughts on that? Perhaps documenting this is sufficient? Having a configuration toggle to choose between the new and old method could be useful for users that face regressions?

@lhotari Yes, it's truly a problem. The client's clock may not be consistent with the broker's clock. I'd seen a similar PR before(https://github.com/apache/pulsar/pull/21940).

Even though users could use brokerTimestamp to seek the position, but it requires enabling AppendBrokerTimestampMetadataInterceptor, and it is disabled by default.

Perhaps as you said, we need to add a new configuration to determine use LedgerInfo to speed up seeking or not.

But, how about using ServiceConfiguration#brokerEntryMetadataInterceptors to determine speed up or not? I mean, if ServiceConfiguration#brokerEntryMetadataInterceptors contains AppendBrokerTimestampMetadataInterceptor, enable speed up seeking by using LedgerInfo, else, disable. Then we don't need to add a new configuration item, since pulsar broker has toooo many configuration items......

dao-jun avatar Feb 29 '24 10:02 dao-jun

But, how about using ServiceConfiguration#brokerEntryMetadataInterceptors to determine speed up or not? I mean, if ServiceConfiguration#brokerEntryMetadataInterceptors contains AppendBrokerTimestampMetadataInterceptor, enable speed up seeking by using LedgerInfo, else, disable. Then we don't need to add a new configuration item, since pulsar broker has toooo many configuration items......

I think it's better that the admin of the broker could decide about this.

A possible mitigation would be to track the publish timestamps of the first and last message and possible also min and max and store that in the ledger metadata. If this metadata is present, that could be used in the initial ledger selection.

lhotari avatar Feb 29 '24 10:02 lhotari

A possible mitigation would be to track the publish timestamps of the first and last message and possible also min and max and store that in the ledger metadata. If this metadata is present, that could be used in the initial ledger selection.

it might be a problem, 'cause we don't know the timestamp of seek is brokerTimestamp or publishTimestamp, right? If we record publish end/start timestamp to LedgerInfo, how can we decide which to use?

dao-jun avatar Feb 29 '24 11:02 dao-jun

A possible mitigation would be to track the publish timestamps of the first and last message and possible also min and max and store that in the ledger metadata. If this metadata is present, that could be used in the initial ledger selection.

it might be a problem, 'cause we don't know the timestamp of seek is brokerTimestamp or publishTimestamp, right? If we record publish end/start timestamp to LedgerInfo, how can we decide which to use?

I would assume that in the current interface, it's always the publish timestamp.

lhotari avatar Feb 29 '24 11:02 lhotari

@lhotari how about add a new method seek(long timestamp, boolean speedUp) to Consumer api? If speedUp=true, enable speed up seeking by using LedgerInfo, else, disable. It should be more flexible

dao-jun avatar Feb 29 '24 12:02 dao-jun

@lhotari how about add a new method seek(long timestamp, boolean speedUp) to Consumer api? If speedUp=true, enable speed up seeking by using LedgerInfo, else, disable. It should be more flexible

I don't think that this should be exposed to clients at all. It should be a broker level config. The alternative is to add the metadata of publish time min, max, first, last to the ledger metadata which would always provide the correct answer.

lhotari avatar Feb 29 '24 12:02 lhotari

@lhotari how about add a new method seek(long timestamp, boolean speedUp) to Consumer api? If speedUp=true, enable speed up seeking by using LedgerInfo, else, disable. It should be more flexible

I don't think that this should be exposed to clients at all. It should be a broker level config. The alternative is to add the metadata of publish time min, max, first, last to the ledger metadata which would always provide the correct answer.

msg publishTime is a broker level concept, pass it to ML doesn't make sense. Add a new config is better. seems I have to draft a PIP......

dao-jun avatar Feb 29 '24 12:02 dao-jun

msg publishTime is a broker level concept, pass it to ML doesn't make sense. Add a new config is better. seems I have to draft a PIP......

I disagree that "pass it to ML doesn't make sense". it's already passed in and used to evaluate seek. There would need to to have the min, max, first and last publishTime in the ledger metadata to reliably optimize the current solution. It does need a PIP most likely, but that's not a problem to make a PIP.

lhotari avatar Mar 01 '24 14:03 lhotari

I disagree that "pass it to ML doesn't make sense". it's already passed in and used to evaluate seek. There would need to to have the min, max, first and last publishTime in the ledger metadata to reliably optimize the current solution. It does need a PIP most likely, but that's not a problem to make a PIP.

@lhotari Current add entry apis of ManagedLedger likes addEntry(ByteBuf entry) , if we want to pass publishTime to ML, should I change the API to addEntry(ByteBuf entry, long publishTimestamp)? If not, should I decode the entry metadata and get the publishTime? If it's the case, I think it would comes more cost. And if we set field publishTime into LedgerInfo, we will also face the challenge of ML complexity. At the same time, we also need additional logic to maintain the publishTime for each Ledger, I think this implementation is not lightweight enough.

So I believe add a new configuration to brokerConf is a better way, what do you think?

dao-jun avatar Mar 02 '24 14:03 dao-jun

@lhotari I've updated the implementation, PTAL

dao-jun avatar Mar 08 '24 20:03 dao-jun

What is the status of this PR? I'm interested in it. I can help to impl it too.

KannarFr avatar Apr 11 '24 11:04 KannarFr

replaced by #22792

dao-jun avatar May 28 '24 15:05 dao-jun