[improve][broker] Optimize seeking by timestamp
Fixes https://github.com/apache/pulsar/issues/22129
Motivation
Speed up seeking by timestamp by using the per-ledger timestamps recorded in LedgerInfo to narrow the search before examining individual entries.
Modifications
Add a new method to ManagedCursor.
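For context, here is a minimal, purely illustrative sketch of the idea behind the change. The class, record, and method names below are hypothetical and do not reflect the actual ManagedCursor/LedgerInfo API; the sketch only shows how a per-ledger close timestamp can pick the starting ledger for a seek-by-timestamp, so that only one ledger needs a per-entry search.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Illustrative sketch only; names are hypothetical and this is not the actual
 * ManagedCursor/LedgerInfo API. Core idea: pick the first ledger whose close
 * timestamp is at or after the target, then limit the per-entry search to it.
 */
public class LedgerTimestampSeekSketch {

    /** Minimal stand-in for the per-ledger metadata kept by the managed ledger. */
    record LedgerSummary(long ledgerId, long closeTimestamp) {}

    /** Returns the ledger id where the per-entry search should start. */
    static long findStartLedger(NavigableMap<Long, LedgerSummary> ledgers, long targetTimestamp) {
        for (LedgerSummary summary : ledgers.values()) {
            if (summary.closeTimestamp() >= targetTimestamp) {
                // Every entry in earlier ledgers was written before the target time.
                return summary.ledgerId();
            }
        }
        // Target is newer than every closed ledger: start from the last (open) ledger.
        return ledgers.lastKey();
    }

    public static void main(String[] args) {
        NavigableMap<Long, LedgerSummary> ledgers = new TreeMap<>();
        ledgers.put(1L, new LedgerSummary(1L, 1_000L));
        ledgers.put(2L, new LedgerSummary(2L, 2_000L));
        ledgers.put(3L, new LedgerSummary(3L, 3_000L));
        System.out.println(findStartLedger(ledgers, 1_500L)); // prints 2
    }
}
```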
Verifying this change
- [ ] Make sure that the change passes the CI checks.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: https://github.com/dao-jun/pulsar/pull/8
Let's avoid cherry-picking this change so that we don't introduce regressions for current maintenance branches.
I think we need test cases to cover the various corner cases with this solution.
I agree with you, but before I complete the tests, I need to know if this change is reasonable.
The general approach looks good to me.
@dao-jun one challenge is that the ledger's timestamp is based on the broker's clock, but the seek uses the message publish time, which uses the client's (publisher's) clock. There might be additional corner cases because of this. Any thoughts on that? Perhaps documenting this is sufficient? Having a configuration toggle to choose between the new and old method could be useful for users who face regressions?
@lhotari Yes, it's truly a problem: the client's clock may not be consistent with the broker's clock. I'd seen a similar PR before (https://github.com/apache/pulsar/pull/21940). Even though users could seek by brokerTimestamp, that requires enabling AppendBrokerTimestampMetadataInterceptor, which is disabled by default.
Perhaps, as you said, we need to add a new configuration to decide whether to use LedgerInfo to speed up seeking.
But how about using ServiceConfiguration#brokerEntryMetadataInterceptors to make that decision instead? I mean, if ServiceConfiguration#brokerEntryMetadataInterceptors contains AppendBrokerTimestampMetadataInterceptor, enable the LedgerInfo-based speed-up; otherwise, disable it. Then we don't need to add a new configuration item, since the Pulsar broker already has too many configuration items...
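A sketch of how that toggle could look (hypothetical wiring, not actual broker code; the fully-qualified interceptor class name is taken from this thread and should be double-checked against the broker docs): enable the LedgerInfo-based fast path only when the broker timestamp interceptor is configured.

```java
import java.util.Set;

/**
 * Hypothetical sketch of the suggested toggle, not actual broker code: enable
 * the LedgerInfo-based fast path only when entries carry a broker-side
 * timestamp, i.e. when AppendBrokerTimestampMetadataInterceptor is configured.
 */
public class SeekFastPathToggleSketch {

    // Fully-qualified name as referenced in this discussion.
    static final String BROKER_TIMESTAMP_INTERCEPTOR =
            "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor";

    static boolean ledgerSeekFastPathEnabled(Set<String> brokerEntryMetadataInterceptors) {
        return brokerEntryMetadataInterceptors.contains(BROKER_TIMESTAMP_INTERCEPTOR);
    }

    public static void main(String[] args) {
        System.out.println(ledgerSeekFastPathEnabled(Set.of(BROKER_TIMESTAMP_INTERCEPTOR))); // true
        System.out.println(ledgerSeekFastPathEnabled(Set.of())); // false
    }
}
```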
I think it's better if the broker admin can decide about this.
A possible mitigation would be to track the publish timestamps of the first and last message (and possibly also the min and max) and store them in the ledger metadata. If this metadata is present, it could be used in the initial ledger selection.
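A sketch of how that metadata could be used (hypothetical data model, not an existing Pulsar structure): with first/last publish timestamps stored per ledger, the initial ledger selection can work directly in the publisher's clock.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical data model, not an existing Pulsar structure: if each ledger
 * recorded the first and last publish timestamps of its entries, the initial
 * ledger selection could use the publisher's clock directly instead of the
 * broker-side ledger close time.
 */
public class PublishTimeLedgerSelectionSketch {

    /** Hypothetical extra metadata per ledger. */
    record PublishTimeRange(long firstPublishTime, long lastPublishTime) {}

    /**
     * Picks the earliest ledger whose last publish time is at or after the target,
     * assuming roughly monotonic publish times across ledgers.
     */
    static Long selectLedger(Map<Long, PublishTimeRange> ledgers, long targetPublishTime) {
        for (Map.Entry<Long, PublishTimeRange> e : ledgers.entrySet()) {
            if (targetPublishTime <= e.getValue().lastPublishTime()) {
                return e.getKey();
            }
        }
        return null; // target is newer than anything stored so far
    }

    public static void main(String[] args) {
        Map<Long, PublishTimeRange> ledgers = new LinkedHashMap<>();
        ledgers.put(1L, new PublishTimeRange(100L, 199L));
        ledgers.put(2L, new PublishTimeRange(200L, 299L));
        System.out.println(selectLedger(ledgers, 250L)); // prints 2
    }
}
```

If client clocks skew, publish times may not be strictly monotonic across ledgers, which is presumably why the min/max variants are mentioned as well.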
It might be a problem, because we don't know whether the timestamp passed to seek is a brokerTimestamp or a publishTimestamp, right? If we record the publish start/end timestamps in LedgerInfo, how can we decide which one to use?
I would assume that in the current interface, it's always the publish timestamp.
@lhotari how about adding a new method seek(long timestamp, boolean speedUp) to the Consumer API? If speedUp=true, enable the LedgerInfo-based speed-up; otherwise, disable it. It would be more flexible.
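To make the suggestion concrete, a hypothetical usage sketch; the speedUp overload does not exist on the real Pulsar Consumer interface, which only has seek(long timestamp).

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the proposed overload; not part of the real Pulsar Consumer API. */
public class SeekSpeedUpSketch {

    interface TimestampSeekable {
        void seek(long timestamp, boolean speedUp);
    }

    public static void main(String[] args) {
        TimestampSeekable consumer = (timestamp, speedUp) ->
                System.out.printf("seek to %d, fast path=%b%n", timestamp, speedUp);

        long oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
        consumer.seek(oneHourAgo, true); // speedUp=true -> use the LedgerInfo fast path
    }
}
```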
I don't think that this should be exposed to clients at all. It should be a broker-level config. The alternative is to add the min, max, first, and last publish times to the ledger metadata, which would always provide the correct answer.
The message publishTime is a broker-level concept; passing it down to the ManagedLedger doesn't make sense. Adding a new config is better. It seems I have to draft a PIP...
I disagree that "passing it to the ML doesn't make sense": it's already passed in and used to evaluate the seek. The ledger metadata would need to hold the min, max, first, and last publishTime to reliably optimize the current solution. It most likely does need a PIP, but making a PIP isn't a problem.
@lhotari The current add-entry APIs of ManagedLedger look like addEntry(ByteBuf entry). If we want to pass the publishTime to the ML, should I change the API to addEntry(ByteBuf entry, long publishTimestamp)? If not, should I decode the entry metadata to get the publishTime? In that case, I think it would cost more.
And if we add a publishTime field to LedgerInfo, we also face the challenge of extra ML complexity: we need additional logic to maintain the publishTime for each ledger, so I don't think this implementation is lightweight enough.
So I believe adding a new configuration to the broker config is a better way; what do you think?
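For illustration, a hypothetical interface sketch of the trade-off discussed here (not the real ManagedLedger add-entry API): either the caller passes the publish timestamp explicitly, or the managed ledger would have to decode it from the entry bytes on every add.

```java
import java.nio.ByteBuffer;

/** Hypothetical interface sketch only; not the real ManagedLedger add-entry API. */
public class AddEntrySketch {

    interface EntryAppender {
        // Current style: the managed ledger only sees opaque bytes.
        void addEntry(ByteBuffer entry);

        // Hypothetical overload: the caller supplies the publish timestamp so the
        // ledger could track first/last publish times without decoding the entry.
        void addEntry(ByteBuffer entry, long publishTimestamp);
    }

    public static void main(String[] args) {
        EntryAppender appender = new EntryAppender() {
            @Override public void addEntry(ByteBuffer entry) {
                System.out.println("added " + entry.remaining() + " bytes");
            }
            @Override public void addEntry(ByteBuffer entry, long publishTimestamp) {
                System.out.println("added " + entry.remaining()
                        + " bytes, publishTime=" + publishTimestamp);
            }
        };
        appender.addEntry(ByteBuffer.wrap(new byte[]{1, 2, 3}), System.currentTimeMillis());
    }
}
```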
@lhotari I've updated the implementation, PTAL
What is the status of this PR? I'm interested in it. I can help to implement it too.
replaced by #22792