librdkafka

ListOffsetsRequest should only be sent to the leader replica

Open kphelps opened this issue 1 year ago • 12 comments

When fetch-from-follower is in use, a consumer can currently get stuck in a loop sending ListOffsets requests whenever it goes through the rd_kafka_offset_reset path, because the request is sent to the preferred replica. This change always sends it to the leader instead.

kphelps, Feb 12 '24

CLA assistant check
All committers have signed the CLA.

cla-assistant[bot], Feb 12 '24

It's correct to send the ListOffsets request to the preferred replica. The loop is probably caused by this known bug: https://github.com/confluentinc/librdkafka/issues/4620

With debug logging enabled, could you check whether it's receiving FENCED_LEADER_EPOCH errors?

emasab, Feb 20 '24

Nope, I'm seeing NOT_LEADER_OR_FOLLOWER errors.

kphelps, Feb 21 '24

Aha, from KIP-392:

The FetchRequest schema has field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower. Similarly, when we need to send a ListOffsets request to a follower in order to find the log start offset, we will use the same sentinel for the replica id field.

It looks like we unconditionally set the replica id to -1 here.

The Java client, by contrast, simply always sends the request to the leader. WDYT?

kphelps, Feb 21 '24

@kphelps The replica id should be set to -1 in clients and to the broker id in followers; see the RPC definition: https://github.com/apache/kafka/blob/2f401ff4c85f6797391b8a3dd57d651f4de3d6ad/clients/src/main/resources/common/message/ListOffsetsRequest.json#L42

The NOT_LEADER_OR_FOLLOWER error occurs when the broker isn't a replica for that partition. In that case librdkafka refreshes the metadata to look up the leader again, here: https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/src/rdkafka_request.c#L935

Could you reproduce the issue and send a log captured with "debug": "all" or "debug": "consumer,cgrp,topic,fetch,metadata,broker"?

emasab, Mar 05 '24

I'm working on reproducing this now, but I've had trouble doing so in a controlled environment. I'll share the log once I have it.

The broker only allows reading from the leader unless the replica id is set to -2 here; that setting propagates down to the local-log lookup, which errors out here.

kphelps, Mar 05 '24

Found a test that was silently failing because of this issue.

kphelps, Mar 21 '24

Thanks @kphelps. I looked into this issue in more depth and now understand the problem: it's different from the one I linked and, as you said, it could be solved in two ways: by sending the request to the follower with replica id -2, or by sending it to the leader as the Java client does.

The downside of sending it to the leader is that, if the follower is lagging behind, the consumer could hit further offset resets when fetching until the follower has caught up. I've checked the broker code and tried using -2 by changing the mock cluster implementation, and that works too.

I'll also ask for an opinion internally before deciding between the two solutions.

emasab, Mar 26 '24

We can't fix it by sending the request to the follower, because that causes a problem: if the replica id differs from CONSUMER_REPLICA_ID (-1), the broker ignores the isolation level parameter. So I'm following @kphelps' proposal and adopting the same behaviour as the Java client: send the request to the leader only.

Broker code:

            val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID
            val isClientRequest = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID
            val isolationLevelOpt = if (isClientRequest)
              Some(offsetRequest.isolationLevel)
            else
              None

emasab, Apr 10 '24

/sem-approve

emasab, Apr 10 '24

/sem-approve

emasab, Jun 10 '24

@kphelps Sorry, given that we're having an issue with the public CI, I've created this internal branch with your changes: #4754

emasab, Jun 12 '24