kafka/client: Send leader_epoch with requests.
Cover letter
- Store the leader_epoch with the leader for each partition.
- Pass the leader epoch when fetching and listing offsets.
- Handle `fenced_leader_epoch` and `unknown_leader_epoch` as retryable errors by updating metadata.
See KIP-320
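For illustration, the retry behaviour described in the list above could look roughly like the sketch below. It is not the code in this PR: `partition_leader`, `is_retryable_epoch_error`, `send_with_epoch_retry` and the local `error_code` enum are hypothetical stand-ins, and the real client is asynchronous (Seastar futures) rather than a synchronous loop.

```cpp
#include <cstdint>
#include <functional>

// Hypothetical stand-ins for illustration only; not the kafka/client types.
enum class error_code {
    none,
    fenced_leader_epoch,
    unknown_leader_epoch,
    other
};

struct partition_leader {
    int32_t node_id;
    int32_t leader_epoch;
};

// Both epoch errors mean the client and the broker disagree about who leads
// the partition, so refreshing metadata and retrying is a plausible response.
inline bool is_retryable_epoch_error(error_code ec) {
    return ec == error_code::fenced_leader_epoch
           || ec == error_code::unknown_leader_epoch;
}

// send:    issues the request to the given leader, passing its epoch along.
// refresh: re-reads metadata and returns the current leader and epoch.
inline error_code send_with_epoch_retry(
  partition_leader leader,
  const std::function<error_code(const partition_leader&)>& send,
  const std::function<partition_leader()>& refresh,
  int max_attempts = 3) {
    error_code ec = error_code::other;
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        ec = send(leader);
        if (!is_retryable_epoch_error(ec)) {
            // Success, or an error this sketch does not retry.
            return ec;
        }
        // Stale view of leadership: refresh it and try again.
        leader = refresh();
    }
    return ec;
}
```

The attempt limit is an assumption of the sketch; it only exists so that a persistently stale cluster view cannot loop forever.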
Fixes #2046
Signed-off-by: Ben Pope [email protected]
Note to the reviewer:
If the leader_epoch returned in the response doesn't match the expected one, should I handle it by updating metadata and retrying?
    .then([tp, part](list_offsets_response res) {
        using futurator = ss::futurize<list_offsets_response>;
        const auto request_epoch = part.leader_epoch;
        const auto leader_epoch
          = res.data.topics[0].partitions[0].leader_epoch;
        if (request_epoch == leader_epoch) {
            return futurator::convert(std::move(res));
        }
        return futurator::make_exception_future(partition_error(
          tp,
          request_epoch < leader_epoch
            ? kafka::error_code::fenced_leader_epoch
            : kafka::error_code::unknown_leader_epoch));
    });
It doesn't seem to work for list_offsets; I think the leader_epoch isn't set in the server's response.
I've added this as part of the investigation into #2501
Release notes
Improvements
- kafka/client: Improve handling of `fetch` and `list_offsets` under leadership transfer
The core part of KIP-320 is allowing fetchers to detect data loss, which also requires support for OffsetForLeaderEpoch. There's not much point in sending leader epochs with requests if the solution is to update metadata on problems, because that's the same as the previous behavior (I think).
Also, Redpanda today does not support leader epochs in responses; that still needs to be implemented before it's relevant to a client.
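To make the data-loss point concrete: as I understand KIP-320, a fetcher asks the leader where the epoch of its last consumed record ends, and treats an end offset below its own position as evidence that the log was truncated. A minimal sketch of that comparison follows; `epoch_end_offset` and `is_truncated` are hypothetical names, not types from this codebase.

```cpp
#include <cstdint>

// Hypothetical shape of the answer to an OffsetForLeaderEpoch lookup:
// the end offset the leader reports for the requested epoch.
struct epoch_end_offset {
    int32_t leader_epoch;
    int64_t end_offset;
};

// fetch_position is the next offset the fetcher intends to read. If the
// leader's log for that epoch ends before it, records the fetcher already
// consumed are no longer in the log, i.e. the log was truncated.
inline bool is_truncated(int64_t fetch_position, const epoch_end_offset& reply) {
    return reply.end_offset < fetch_position;
}
```

A metadata refresh alone cannot surface this case, which is why the epoch lookup matters for detecting data loss.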
Now that this is merged, would that unblock this PR?