KAFKA-13959: Controller should unfence Broker with busy metadata log

Open dengziming opened this issue 2 years ago • 26 comments

More detailed description of your change

The root cause of KAFKA-13959 is a little complex; the two keys to this problem are:

  1. KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms. We rely on the fetchPurgatory to complete a FetchRequest; in detail, if FetchRequest.fetchOffset >= log.endOffset, we wait for 500ms before sending a FetchResponse (see the sketch after this list).
  2. The follower needs to send one more FetchRequest to get the HW.
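
To make point 1 concrete, here is a rough sketch of that wait behaviour (illustrative pseudo-code only, not the actual KafkaRaftClient logic; the class and method names below are made up):

    // Illustrative sketch of the fetch wait described in point 1, not the real KafkaRaftClient.
    // When the fetch offset has already reached the log end offset, the request sits in the
    // fetch purgatory for up to MAX_FETCH_WAIT_MS (500ms) before a FetchResponse is sent.
    final class FetchWaitSketch {
        static final long MAX_FETCH_WAIT_MS = 500L; // same value as MetadataMaxIdleIntervalMs

        static long responseDelayMs(long fetchOffset, long logEndOffset) {
            if (fetchOffset >= logEndOffset) {
                return MAX_FETCH_WAIT_MS; // nothing new to return, so the fetch waits
            }
            return 0L;                    // new records are available, respond immediately
        }

        public static void main(String[] args) {
            System.out.println(responseDelayMs(10, 10)); // 500 -> follower waits for the next record
            System.out.println(responseDelayMs(9, 10));  // 0   -> records are returned right away
        }
    }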

Here is the event sequence:

  1. When the leader (active controller) starts, LEO=m+1 (m is the offset of the last record) and leader HW=m (because we need more than half of the voters to reach m+1).
  2. The follower (standby controller) and the observer (broker) send FetchRequest(fetchOffset=m).
  3. The leader handles the fetch:
     3.1. The leader receives the FetchRequest, sets leader HW=m, and waits 500ms before sending a FetchResponse.
     3.2. The leader sends FetchResponse(HW=m).
     3.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
  4. The leader appends a NoOpRecord: LEO=m+2, leader HW=m.
  5. Steps 2-4 keep looping, so the broker's metadata offset never catches up to the leader's HW.

If we change MAX_FETCH_WAIT_MS to 200ms (less than half of MetadataMaxIdleIntervalMs), this problem is temporarily solved.

We plan to address this problem in two ways. First, in this PR, we change the controller to unfence a broker once the broker's high-watermark has reached the offset of that broker's registration record. Second, we will propagate the HWM to the replicas as quickly as possible in KAFKA-14145.
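
For clarity, a minimal sketch of the controller-side check described above (the names here are hypothetical, not the exact controller code):

    // Hypothetical sketch of the unfencing rule: the controller remembers the offset of each
    // broker's registration record and only unfences a broker once the offset reported in its
    // heartbeat has caught up to that registration offset. Not the actual controller code.
    import java.util.HashMap;
    import java.util.Map;

    final class UnfenceCheckSketch {
        // brokerId -> offset of the registration record written for the current incarnation
        private final Map<Integer, Long> registrationOffsets = new HashMap<>();

        void onBrokerRegistration(int brokerId, long registrationRecordOffset) {
            registrationOffsets.put(brokerId, registrationRecordOffset);
        }

        boolean mayUnfence(int brokerId, long brokerReportedOffset) {
            Long registrationOffset = registrationOffsets.get(brokerId);
            return registrationOffset != null && brokerReportedOffset >= registrationOffset;
        }
    }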

Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

dengziming avatar Jun 09 '22 10:06 dengziming

@jsancio This is what I found about this problem. The solution here is only temporary; let's first confirm the root cause.

dengziming avatar Jun 09 '22 10:06 dengziming

@dengziming , I like this idea to fix the issue, and you dug deeper than I did. Nice finding. But in this case, I think we should increase the MetadataMaxIdleIntervalMs to something like 800ms or 1000ms, instead of reducing the fetch wait time. After all, reducing the fetch wait time will make the client and broker busier.

But let's wait for the Jenkins results to see if it really fixes the issue. (I think your investigation is correct.)

showuon avatar Jun 09 '22 12:06 showuon

@dengziming Thanks for the investigation. One idea I was considering is only letting observers fetch up to the high watermark. Then the records would be returned to brokers as they become committed. Would that address this issue (at least when controllers are not colocated with brokers)?

hachikuji avatar Jun 10 '22 15:06 hachikuji

only letting observers fetch up to the high watermark

@hachikuji I'm not sure I understood correctly. The problem is that the observer HW is always less than the leader HW, because the observer needs to send one more request to update its HW, and that request waits an extra 500ms before the response; after those 500ms the leader appends a NoOpRecord, so the leader HW advances by one. If we only let observers fetch up to the high watermark, the observer HW is still less than the leader HW, so this problem can't be solved.

dengziming avatar Jun 13 '22 14:06 dengziming

I think we should increase the MetadataMaxIdleIntervalMs to something like 800ms or 1000ms

@showuon Neither changing MetadataMaxIdleIntervalMs nor changing KafkaRaftClient.MAX_FETCH_WAIT_MS is the right way forward here, because in a dynamic system we may still append to the metadata log more frequently, for example if the metadata is updated every 50ms. We may draw lessons from how we update the ISR.

dengziming avatar Jun 13 '22 14:06 dengziming

@dengziming The difference is that advancing the high watermark would then be the trigger for fulfilling the Fetch. If the observer does not have the latest high watermark, then it means data is available.

hachikuji avatar Jun 13 '22 16:06 hachikuji

@hachikuji I see what you mean: just change fetchPurgatory.await(fetchPartition.fetchOffset()) to fetchPurgatory.await(Math.min(fetchPartition.fetchOffset(), highWatermark))? Sadly it still doesn't work; I'm still checking the reason.

But I also tried a similar solution in my PR that can solve the issue, which is to return directly if fetchPartition.fetchOffset() >= highWatermark.

dengziming avatar Jun 14 '22 12:06 dengziming

@dengziming Yeah, I think that's the other option. If we keep track of the last sent high watermark for each replica, then we can fulfill the fetch immediately whenever there is a high watermark change to propagate. There are some tradeoffs in terms of latency and the frequency of fetches, but perhaps metadata changes are infrequent enough and it does not make a big difference. I do think we should aim for a solution which is not super sensitive to timing-related configurations.
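
Roughly, the bookkeeping would look something like the following (hypothetical names, not the real LeaderState):

    // Hypothetical sketch: track the last high watermark sent to each replica so that a high
    // watermark change can immediately complete a pending fetch from any replica that has not
    // seen the new value yet. Not the actual LeaderState/KafkaRaftClient code.
    import java.util.HashMap;
    import java.util.Map;

    final class HighWatermarkPropagationSketch {
        private final Map<Integer, Long> lastSentHighWatermark = new HashMap<>();

        // Called when building a FetchResponse for a replica.
        void recordSentHighWatermark(int replicaId, long highWatermark) {
            lastSentHighWatermark.put(replicaId, highWatermark);
        }

        // Called when the leader's high watermark advances: complete the pending fetch of any
        // replica that has not yet been sent the new value.
        boolean shouldCompleteFetch(int replicaId, long newHighWatermark) {
            return lastSentHighWatermark.getOrDefault(replicaId, -1L) < newHighWatermark;
        }
    }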

hachikuji avatar Jun 14 '22 19:06 hachikuji

Yes, I think this is the best solution under the current design, although it works against the purpose of the fetchPurgatory when there are temporarily no new records in the metadata log. I agree with Jason that we can adopt this solution right now and create another JIRA aiming for a solution that is not super sensitive to timing-related configurations.

showuon avatar Jun 15 '22 01:06 showuon

@hachikuji In this PR I tried your suggestion and it does solve this problem; however, it makes the logic in RaftClient very complex, requires saving more state in LeaderState, and is also difficult to test. I made another solution modeled on how we add a follower replica to the ISR: check followerEndOffset >= leaderLog.highWatermark.

here is the code change: https://github.com/dengziming/kafka/tree/KAFKA-13959-2, @showuon @hachikuji @jsancio WDYT?

dengziming avatar Jun 21 '22 03:06 dengziming

here is the code change: https://github.com/dengziming/kafka/tree/KAFKA-13959-2

Hey @dengziming, I took a look at the commits in this tree. Is this the only commit, https://github.com/dengziming/kafka/commit/79dc8ec423cd74fba462e934f89bdec3dcd8528d? Can you maybe share a diff/compare? For example, something like https://github.com/dengziming/kafka/compare/30216ea1c58761e62f51af40033f24e3ae1c5c2a...KAFKA-13959-2

jsancio avatar Jul 06 '22 17:07 jsancio

Never mind. I understand now. The broker sends the active controller its local LEO instead of the last offset applied by the broker's metadata listener. I think this will unfence the broker at startup even if the broker hasn't applied the snapshot or any of the log records, right?

jsancio avatar Jul 06 '22 17:07 jsancio

In this PR I tried your suggestion and it does solve this problem; however, it makes the logic in RaftClient very complex, requires saving more state in LeaderState, and is also difficult to test

@dengziming Do you have a diff for this solution? I am interested in this solution as it would work in both the REMOTE and COLOCATED configurations for KRaft.

jsancio avatar Jul 06 '22 18:07 jsancio

I think this will unfence the broker at startup even if the broker hasn't applied the snapshot or any of the log records, right?

Currently, we replay the metadata records when the metadata listener gets new records. So yes, if we just return the current LEO, the records/snapshots might not have been applied yet.

Sorry, it's easy to reject others' proposals but difficult to come up with another solution. If we don't have any better solution, maybe we can try the originally proposed one?

One solution to this problem is to require the broker only to catch up to the last committed offset as of its previous heartbeat. For example (a rough sketch in code follows the list):

  1. The broker sends a heartbeat with a current offset of Y. The last committed offset is X. The controller remembers this last committed offset; call it X'.
  2. The broker sends another heartbeat with a current offset of Z. Unfence the broker if Z >= X or Z >= X'.
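
A rough sketch of that heartbeat-based rule (hypothetical names, not actual controller code):

    // Hypothetical sketch of the idea above: on the first heartbeat the controller remembers
    // its last committed offset (X'), and it unfences the broker once a later heartbeat reports
    // an offset Z that has caught up to X'. Not the actual BrokerHeartbeatManager logic.
    final class HeartbeatCatchUpSketch {
        private long rememberedCommittedOffset = -1L; // X' in the example above

        // First heartbeat: remember the controller's last committed offset at that time.
        void onFirstHeartbeat(long controllerLastCommittedOffset) {
            if (rememberedCommittedOffset < 0) {
                rememberedCommittedOffset = controllerLastCommittedOffset;
            }
        }

        // Later heartbeat: unfence once the broker's reported offset Z reaches X'.
        boolean mayUnfence(long brokerReportedOffset) {
            return rememberedCommittedOffset >= 0 && brokerReportedOffset >= rememberedCommittedOffset;
        }
    }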

And again, thanks for continuing to try to fix this difficult issue, @dengziming!

showuon avatar Jul 07 '22 02:07 showuon

Do you have a diff for this solution?

@jsancio I haven't fleshed it out yet; the basic idea is shown in this PR: only read up to Isolation.COMMITTED when the fetch comes from an observer, and invoke fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs) in onUpdateLeaderHighWatermark.
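
In pseudo-code the idea is roughly the following (illustrative only; it mirrors the names above but is not the actual KafkaRaftClient change):

    // Illustrative only: fetches issued by observers are served at committed (high watermark)
    // isolation, and a high watermark change completes any fetch waiting in the purgatory.
    final class ObserverReadSketch {
        enum Isolation { COMMITTED, UNCOMMITTED }

        interface FetchPurgatory {
            void maybeComplete(long offset, long currentTimeMs);
        }

        Isolation isolationFor(boolean isObserver) {
            // Observers only see committed records; voters can read up to the log end offset.
            return isObserver ? Isolation.COMMITTED : Isolation.UNCOMMITTED;
        }

        // Called by the leader when its high watermark advances.
        void onUpdateLeaderHighWatermark(FetchPurgatory fetchPurgatory, long highWatermarkOffset, long currentTimeMs) {
            fetchPurgatory.maybeComplete(highWatermarkOffset, currentTimeMs);
        }
    }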

maybe we can try the originally proposed one?

@showuon That solution only helps if the problem is that the heartbeat period is much longer than the NoOpRecord interval. It may not work here, since the main problem comes from RaftClient.fetchPurgatory, so we should try a new solution.

dengziming avatar Jul 07 '22 08:07 dengziming

I haven't fleshed it out yet; the basic idea is shown in this PR: only read up to Isolation.COMMITTED when the fetch comes from an observer, and invoke fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs) in onUpdateLeaderHighWatermark.

I see. This would not work for co-located Kafka servers, right? Co-located Kafka servers are servers that run both a controller and a broker. In that case the replica will read uncommitted data, and the leader will not send a FETCH response when the HW changes.

jsancio avatar Jul 07 '22 17:07 jsancio

I see. This would not work for co-located Kafka servers, right?

@jsancio You are right; this is why I'm trying to find a simpler solution, for example letting the broker send its local LEO instead of the last applied offset. This is the same logic we use to expand a topic-partition's ISR; the problem, as you mentioned, is that the broker may not yet have applied the records up to that LEO.

maybe we should allow the lastAppliedOffset to be behind by A records (or time) to unfence the broker.

dengziming avatar Jul 13 '22 08:07 dengziming

maybe we should allow the lastAppliedOffset to be behind by A records (or time) to unfence the broker.

@dengziming Here are my thoughts

Improve logic for unfencing brokers

How about the controller unfence the broker when the broker's high-watermark has reached the broker registration record for that broker? When the broker first registers for a given incarnation, the controller writes a broker registration record. The controller can remember this offset as returned by the raft client. The controller can unfence the broker when the broker's high-watermark is greater than this registration offset.

Propagate the HWM to the replicas as quickly as possible.

I think the solution above would allow us to de-prioritize this. Here are my observations anyway.

Looking at the KafkaRaftClient implementation, we would have to index both the fetch offset and the last sent high-watermark for each replica.

Another issue here is that we changed the KafkaRaftManager so that it doesn't set the replica id when the server is only an observer/broker. Since the HWM is not part of the Fetch request, the leader would have to keep track of this in the LeaderState:

      // From KafkaRaftManager: only servers with the controller role register with a replica id;
      // broker-only servers join the metadata quorum as observers with an empty id.
      val nodeId = if (config.processRoles.contains(ControllerRole)) {
        OptionalInt.of(config.nodeId)
      } else {
        OptionalInt.empty()
      }

We would need to find a better solution for https://issues.apache.org/jira/browse/KAFKA-13168 or improve the FETCH request so that it includes the HWM.

jsancio avatar Jul 13 '22 12:07 jsancio

@jsancio Thank you, here is my feedback.

How about the controller unfence the broker when the broker's high-watermark

This is a good idea; however, the offset of each registration record is soft state (not persisted to the metadata log) and will be lost during a leader change, so we should move this judgment to the broker side. To be clear, let the broker unfence itself when it sees its own registration record.
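
A rough sketch of that broker-side idea (hypothetical names, not the actual broker metadata listener):

    // Hypothetical sketch: while replaying metadata records, the broker watches for a
    // registration record matching its own id (and, in practice, the incarnation it registered
    // with) and only then reports itself ready to be unfenced.
    final class BrokerSelfUnfenceSketch {
        private final int brokerId;
        private boolean seenOwnRegistration = false;

        BrokerSelfUnfenceSketch(int brokerId) {
            this.brokerId = brokerId;
        }

        // Called for every replayed broker registration record.
        void onReplayedRegistrationRecord(int recordBrokerId) {
            if (recordBrokerId == brokerId) {
                seenOwnRegistration = true;
            }
        }

        boolean readyToUnfence() {
            return seenOwnRegistration;
        }
    }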

We would need to find a better solution for https://issues.apache.org/jira/browse/KAFKA-13168 or improve the FETCH request so that it includes the HWM.

We already have a Jira for it from the KIP-836 work: https://issues.apache.org/jira/browse/KAFKA-13986.

dengziming avatar Jul 18 '22 09:07 dengziming

This is a good idea; however, the offset of each registration record is soft state (not persisted to the metadata log)

@dengziming The offset is persisted along with the record. This offset is provided to the inactive controllers in handleCommit. When processing handleCommit, the inactive controllers can use the Batch::lastOffset of the batch that contains the registration record. When processing handleSnapshot, the inactive controllers can use the SnapshotReader::lastContainedLogOffset of the snapshot if the snapshot contains registration records. What do you think?
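
As a rough sketch of that bookkeeping (a hypothetical tracker, not the real controller listener code):

    // Hypothetical sketch: a controller can derive a durable offset for each registration
    // record from the last offset of the committed batch that contains it (handleCommit /
    // Batch::lastOffset), or from the snapshot's last contained log offset when replaying a
    // snapshot (handleSnapshot / SnapshotReader::lastContainedLogOffset).
    import java.util.HashMap;
    import java.util.Map;

    final class RegistrationOffsetTracker {
        private final Map<Integer, Long> registrationOffsets = new HashMap<>();

        // Invoked for every committed batch that contains registration records.
        void onCommittedBatch(long batchLastOffset, Iterable<Integer> registeredBrokerIds) {
            for (int brokerId : registeredBrokerIds) {
                registrationOffsets.put(brokerId, batchLastOffset);
            }
        }

        // Invoked when loading a snapshot that contains registration records.
        void onSnapshot(long lastContainedLogOffset, Iterable<Integer> registeredBrokerIds) {
            for (int brokerId : registeredBrokerIds) {
                registrationOffsets.put(brokerId, lastContainedLogOffset);
            }
        }

        Long registrationOffset(int brokerId) {
            return registrationOffsets.get(brokerId);
        }
    }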

jsancio avatar Jul 18 '22 12:07 jsancio

@jsancio I think the last remaining question is that there may be more than one registration record for a broker after restarting it, so can we rely on the broker epoch? I need some time to check the logic before making a final decision.

dengziming avatar Jul 19 '22 16:07 dengziming

@jsancio I think the last remaining question is that there may be more than one registration record for a broker after restarting it, so can we rely on the broker epoch? I need some time to check the logic before making a final decision.

Yes, I think you can rely on the broker epoch and broker id. Also, the active controller is guaranteed to have read all of the records in the log before handling RPCs like the heartbeat.

jsancio avatar Jul 19 '22 20:07 jsancio

@dengziming I marked this issue as a blocker to 3.3.0. Let me know if you want to work on this issue. I don't mind.

jsancio avatar Aug 05 '22 21:08 jsancio

@dengziming I marked this issue as a blocker to 3.3.0. Let me know if you want to work on this issue. I don't mind.

@jsancio Thank you for reminding me. I'm working on this now; the solution is not difficult, but I'm still figuring out how to test it.

dengziming avatar Aug 07 '22 14:08 dengziming

Hello @jsancio, I'd like to call for a first round of review on this PR.

dengziming avatar Aug 08 '22 13:08 dengziming

@dengziming Take a look at the test failures. All of the failures in ClusterControlManagerTest seem related to this change.

jsancio avatar Aug 11 '22 15:08 jsancio

Thank you for your good suggestions, @jsancio; I have addressed them.

dengziming avatar Aug 12 '22 06:08 dengziming