
Retry commands failed with `LOADING` against the master node

Open barshaul opened this issue 1 year ago • 13 comments

Bug Report

Current Behavior

Lettuce filters out nodes in LOADING state by checking whether their master_repl_offset field == 0. However, in the scenarios described below, a node's replOffset can be left at its initial value of -1L. As a result, a loading replica with replOffset == -1 is considered a valid read candidate, and Lettuce will continue to query it, resulting in a LOADING error being raised back to the user.
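For illustration, the read-candidate check described above boils down to a predicate along these lines (a minimal sketch, not the actual Lettuce source; it treats only an offset of exactly 0 as loading):

```java
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

// Minimal sketch of the loading-node filter described above (NOT the actual Lettuce code).
// Only an offset of exactly 0 is treated as "loading", so a node whose offset was never
// initialized and is still -1 passes the check and stays a read candidate.
final class LoadingFilterSketch {

    static boolean isReadCandidate(RedisClusterNode node) {
        return node.getReplOffset() != 0L; // -1 (uninitialized) is not filtered out
    }

}
```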

When is the replOffset not initialized properly?

Let's examine the getNodeSpecificViews function in DefaultClusterTopologyRefresh.java:

NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requestedInfo) {

        List<RedisClusterNodeSnapshot> allNodes = new ArrayList<>();

        Map<String, NodeTopologyView> self = new HashMap<>();

        Set<RedisURI> nodes = requestedTopology.nodes();

        List<NodeTopologyView> views = new ArrayList<>();
        for (RedisURI nodeUri : nodes) {

            try {
                NodeTopologyView nodeTopologyView = NodeTopologyView.from(nodeUri, requestedTopology, requestedInfo);

                if (!nodeTopologyView.isAvailable()) {
                    continue;
                }

                RedisClusterNode node = nodeTopologyView.getOwnPartition();
                if (node.getUri() == null) {
                    node.setUri(nodeUri);
                } else {
                    node.addAlias(nodeUri);
                }

                self.put(node.getNodeId(), nodeTopologyView);

                List<RedisClusterNodeSnapshot> nodeWithStats = new ArrayList<>(nodeTopologyView.getPartitions().size());

                for (RedisClusterNode partition : nodeTopologyView.getPartitions()) {

                    if (validNode(partition)) {
                        nodeWithStats.add(new RedisClusterNodeSnapshot(partition));
                    }
                }

                allNodes.addAll(nodeWithStats);

                Partitions partitions = new Partitions();
                partitions.addAll(nodeWithStats);

                nodeTopologyView.setPartitions(partitions);

                views.add(nodeTopologyView);
            } catch (CompletionException e) {
                logger.warn(String.format("Cannot retrieve partition view from %s, error: %s", nodeUri, e));
            }
        }

        for (RedisClusterNodeSnapshot node : allNodes) {

            if (!self.containsKey(node.getNodeId())) {
                continue;
            }

            NodeTopologyView view = self.get(node.getNodeId());

            node.setConnectedClients(view.getConnectedClients());
            node.setReplOffset(view.getReplicationOffset());
            node.setLatencyNs(view.getLatency());
        }

        for (NodeTopologyView view : views) {
            view.postProcessPartitions();
        }

        return new NodeTopologyViews(views);
    }

In the first for loop, if a specific node, let's call it X, is not part of nodes but is found in other nodes' nodeTopologyView.getPartitions() (e.g., when a new node is added to the cluster topology), then X will be present in allNodes but not in self. So, in the second loop we will skip this node and won't set its replOffset:

            if (!self.containsKey(node.getNodeId())) {
                continue;
            }

Another scenario is when node Y is part of nodes but is currently loading. When a node is loading, the CLUSTER NODES command returns a LOADING error, so this node is skipped and not added to self in the first loop because nodeTopologyView.isAvailable() is false. However, the loading node is still present in the other nodes' CLUSTER NODES output and is therefore added to allNodes. Again, we'll skip this node in the second loop and won't set its replOffset:

...
                // First loop
                if (!nodeTopologyView.isAvailable()) {
                    continue;
                }
...
            // Second loop
            if (!self.containsKey(node.getNodeId())) {
                continue;
            }

In both cases, the resulting topology will contain nodes with replOffset == -1 once the client's topology has been created.

Stack trace: I added logs to Lettuce to show the behavior; notice the 'is a read candidate, reploffset is = -1':

```

2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:41788, nodeId='0f22044584019b4bcfe8b1384b0bc1220c96b26c', connected=true, slaveOf='null', pingSentTimestamp=0, pongReceivedTimestamp=1692177079000, configEpoch=3, replOffset=1190, flags=[MASTER], aliases=[], slot count=5461] is NOT a read candidate, reploffset is = 1190

2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:22152, nodeId='505c3cbb1204788e851dffd515c3e4f50cb7ea0d', connected=true, slaveOf='a2beee0391b51f8812c24792adb8d4f92f788b79', pingSentTimestamp=0, pongReceivedTimestamp=1692177079000, configEpoch=1, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]] is NOT a read candidate, reploffset is = -1

2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:402 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:20686, nodeId='eed0801c3df60c91f3e9dac72ce284317199a12e', connected=true, slaveOf='667d8a4028353e75b090c471eb7d1bc8511ea4fd', pingSentTimestamp=0, pongReceivedTimestamp=1692177081059, configEpoch=2, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]] is a read candidate, reploffset is = -1

2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:43062, nodeId='f41fd8da7e8dc570aac137db7d89ef4131213d10', connected=true, slaveOf='0f22044584019b4bcfe8b1384b0bc1220c96b26c', pingSentTimestamp=0, pongReceivedTimestamp=1692177080056, configEpoch=3, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]] is NOT a read candidate, reploffset is = -1

2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:405 - RedisClusterNodeSnapshot [uri=redis://127.0.0.1:44142, nodeId='a2beee0391b51f8812c24792adb8d4f92f788b79', connected=true, slaveOf='null', pingSentTimestamp=0, pongReceivedTimestamp=1692177077000, configEpoch=1, replOffset=1190, flags=[MYSELF, MASTER], aliases=[redis://127.0.0.1:44142], slot count=5461] is NOT a read candidate, reploffset is = 1190

2023-08-16 09:11:21 WARN PooledClusterConnectionProvider:230 - read candidates selection = [RedisClusterNodeSnapshot [uri=redis://127.0.0.1:20686, nodeId='eed0801c3df60c91f3e9dac72ce284317199a12e', connected=true, slaveOf='667d8a4028353e75b090c471eb7d1bc8511ea4fd', pingSentTimestamp=0, pongReceivedTimestamp=1692177081059, configEpoch=2, replOffset=-1, flags=[SLAVE, REPLICA], aliases=[]], RedisClusterNodeSnapshot [uri=redis://127.0.0.1:18358, nodeId='667d8a4028353e75b090c471eb7d1bc8511ea4fd', connected=true, slaveOf='null', pingSentTimestamp=0, pongReceivedTimestamp=1692177080000, configEpoch=2, replOffset=1250, flags=[MASTER], aliases=[], slot count=5462]]

...

Aug 16, 2023 9:11:21 AM com.elasticache.DowntimeClients.BaseDowntimeClient exec WARNING: recieved excption: io.lettuce.core.RedisLoadingException: LOADING Redis is loading the dataset in memory
```

Input Code

// your code here;

Expected behavior/code

A loading node should not be considered as a read candidate.

Environment

  • Lettuce version(s): 6.3.0
  • Redis version: 7

Possible Solution

There are a couple of options:

  1. Initialize replOffset with 0.
  2. In the function mentioned above, add:
            if (!self.containsKey(node.getNodeId())) {
                node.setReplOffset(0);
                continue;
            }
  3. Separate the topology view from the replication offset. CLUSTER NODES cannot be called while a node is loading, but INFO REPLICATION can. Even if we haven't received a view from a specific node, we can still set its master_repl_offset (see the sketch below).
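
For example, option 3 could look roughly like the sketch below: query INFO replication directly (which a loading node still answers) and parse master_repl_offset out of the reply. The class and method names are illustrative only, not part of Lettuce:

```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;

// Illustrative sketch of option 3 (names are hypothetical, not part of Lettuce):
// obtain master_repl_offset via INFO replication even when CLUSTER NODES fails with LOADING.
final class ReplicationOffsetProbe {

    static long fetchReplOffset(RedisURI nodeUri) {
        RedisClient client = RedisClient.create(nodeUri);
        try (StatefulRedisConnection<String, String> connection = client.connect()) {
            String info = connection.sync().info("replication");
            for (String line : info.split("\r?\n")) {
                if (line.startsWith("master_repl_offset:")) {
                    return Long.parseLong(line.substring("master_repl_offset:".length()).trim());
                }
            }
            return 0L; // treat a missing field as "still loading"
        } finally {
            client.shutdown();
        }
    }

}
```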

Mitigations for users who are currently facing this issue:

1. Catch LOADING errors and route them back to the primary [preferred]

  public String execGet(RedisAdvancedClusterCommands<String, String> client, StatefulRedisClusterConnection<String, String> conn, String key) {
    try {
      return client.get(key);
    } catch (RedisLoadingException loadingError) {
        // The targeted replica is still loading its dataset: look up the primary that owns
        // the key's slot and retry the read against it.
        int slot = SlotHash.getSlot(key);
        Optional<RedisClusterNode> master = conn.getPartitions().getPartitions().stream()
            .filter(redisClusterNode -> redisClusterNode.hasSlot(slot)).findFirst();
        NodeSelection<String, String> node = client
            .nodes(redisClusterNode -> redisClusterNode.getNodeId().equals(master.get().getNodeId()));
        Executions<String> result = node.commands().get(key);
        return result.get(master.get());
    }
  }

Downsides:

In this mitigation we keep the loading replica in the client’s topology view, so the client will continue to treat this replica as a read candidate and route read requests to it. This means that every command that fails with a LOADING error pays double latency: once for the round trip to the replica, and again for the retry against the primary. However, it prevents a full downtime for read queries. Another downside is that all traffic that was routed to the loading replica is now routed to the primary, which can increase the load on the primary node. If there are other replicas, we would prefer to route the failed command to another replica, or at least to pick a node at random from the primary and the other replicas. However, AFAIK, Lettuce currently doesn’t provide information about which node returned the LOADING error, so we have no way to select all nodes in the shard and exclude only the loading ones (a rough sketch of the random-node variant follows below). I’m still waiting for a response on the GitHub issue about whether that is possible.
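
If spreading the retried reads is acceptable, one possible variant of the mitigation above is to retry against a randomly chosen node of the affected shard (the primary or any of its replicas) instead of always hitting the primary. This is only a sketch under the assumption that we cannot tell which replica is loading, so the retry may land on the loading replica again:

```java
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.NodeSelection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

// Sketch only: retry a failed read against a random node of the shard that owns the key,
// i.e. the primary or any of its replicas. Since Lettuce does not report which replica
// returned LOADING, the retry may still land on the loading replica.
final class ShardRetrySketch {

    static String getFromRandomShardNode(RedisAdvancedClusterCommands<String, String> commands,
            StatefulRedisClusterConnection<String, String> connection, String key) {

        int slot = SlotHash.getSlot(key);

        // Find the primary that owns the key's slot.
        Optional<RedisClusterNode> primary = connection.getPartitions().getPartitions().stream()
                .filter(node -> node.hasSlot(slot)).findFirst();
        if (!primary.isPresent()) {
            throw new IllegalStateException("No primary found for slot " + slot);
        }
        String primaryId = primary.get().getNodeId();

        // Select the whole shard: the primary plus every replica replicating from it.
        NodeSelection<String, String> shard = commands.nodes(
                node -> primaryId.equals(node.getNodeId()) || primaryId.equals(node.getSlaveOf()));

        // Execute the read on one randomly chosen node of the shard.
        int index = ThreadLocalRandom.current().nextInt(shard.size());
        return shard.commands(index).get(key);
    }

}
```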

2. Filter out replicas with replOffset == -1, while enabling periodic topology updates:

    final ClusterTopologyRefreshOptions topologyOptions = ClusterTopologyRefreshOptions.builder()
        .enableAllAdaptiveRefreshTriggers()
        .dynamicRefreshSources(true)
        .enablePeriodicRefresh(Duration.ofSeconds(60))
        .build();
...
    ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
        .topologyRefreshOptions(topologyOptions)
        .socketOptions(socketOptions)
        .autoReconnect(true)
        .timeoutOptions(timeoutOptions)
        .nodeFilter(
            it -> !(it.is(RedisClusterNode.NodeFlag.FAIL)
                || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)
                || it.is(RedisClusterNode.NodeFlag.NOADDR)
                || (it.is(RedisClusterNode.NodeFlag.REPLICA) && it.getReplOffset() == -1L)
                )) // node filter: drop failed nodes and replicas with an uninitialized replOffset
        .validateClusterNodeMembership(false)
        .build();

Downsides:

This mitigation means that we’ll filter out new replicas regardless of their loading state, as well as replaced replicas during their loading state. The new replicas will be added back to the cluster topology only at the next periodic topology update. Consequently, these replicas will remain inaccessible for read queries for a period up to the duration between periodic topology checks. Moreover, if an existing replica enters a LOADING state, the client will continue to route requests to it until it is removed in the next topology refresh. If the cluster is large, setting the topology update interval too low may impact performance, as the length of the CLUSTER NODES output grows with the cluster size.

barshaul · Aug 16 '23 11:08