
[NEW] Add config for non random gossip

Open VyacheslavVanin opened this issue 9 months ago • 16 comments

The problem/use-case that the feature addresses

Currently the gossip protocol picks some random entries to send to a random node with the oldest pong. So it is possible that some node is not pinged for a long time. This was the cause of flaky tests, because our tests have a timeout and we check whether every node of the cluster has the full topology. These cases are rare, but we have many tests and a lot of pull requests that need to be checked.
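To illustrate how random target selection can leave a peer unpinged for a while, here is a minimal toy model. It is not the Valkey implementation: the function name `max_ping_gap`, the sample size and the round-based clock are assumptions made for illustration only. Each round it samples a few candidates and pings the one with the oldest pong, then reports the longest wait any peer experienced.

```python
import random


def max_ping_gap(num_peers=5, rounds=200, sample_size=5, seed=None):
    """Return the longest number of rounds any peer waited between pings.

    Toy model (assumption): one ping per round, sent to the stalest of a few
    randomly sampled peers. Real gossip has more inputs than this.
    """
    rng = random.Random(seed)
    last_pong = [0] * num_peers  # round in which each peer last answered
    worst_gap = 0
    for now in range(1, rounds + 1):
        # Sample candidates with replacement and ping the stalest one.
        candidates = [rng.randrange(num_peers) for _ in range(sample_size)]
        target = min(candidates, key=lambda p: last_pong[p])
        worst_gap = max(worst_gap, now - last_pong[target])
        last_pong[target] = now
    # Include peers that are still waiting when the simulation ends.
    return max(worst_gap, max(rounds - t for t in last_pong))


if __name__ == "__main__":
    print("longest gap in rounds:", max_ping_gap(seed=1))
```

A strict round-robin over the same peers would give a gap of exactly `num_peers` rounds, so any larger value from this model is attributable to the random sampling.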

We fixed this by changing the gossip part of cluster_legacy.c so that it now sends all known nodes to every other node. The clusters created for tests are small, so the increased traffic is not an issue, and the change solves our problem with slow and unreliable cluster startup.

Description of the feature

Add an option like "cluster-non-random-gossip". The option is disabled by default. Users can turn it on when they run tests to guarantee that the cluster starts within the expected time.

VyacheslavVanin avatar Mar 28 '25 09:03 VyacheslavVanin

Thanks @VyacheslavVanin for filing this issue.

I was thinking about the cluster gossip protocol recently, and I feel we should make the gossip relevant, i.e. add nodes to the gossip for which certain defined activity has happened (new node, suspicious node (we already do this), removed node, etc.). For the case you describe above, if we prioritize newly added nodes in the gossip, the cluster would reach full convergence sooner.

Adding the option you mentioned above is possible but would be mostly helpful for test scenarios.

hpatro avatar Mar 31 '25 20:03 hpatro

Thank you @hpatro for your reply

I made the pull request with the option for test scenarios, because I already have it. And it seems that in any case the production scenario differs from the test scenario. The production scenario should still rely on randomness and a limited number of pings/pongs even with the optimizations you mentioned, because it is the optimal way to pass state, albeit with randomness-related drawbacks. In the test scenario these drawbacks become a problem, and optimality is not critical.

VyacheslavVanin avatar Apr 02 '25 08:04 VyacheslavVanin

Trying to understand the scenario better here: does it happen during creation of the cluster or addition of nodes that the nodes have not converged?

hpatro avatar Apr 02 '25 20:04 hpatro

Yes, the problem happens during the creation of the cluster, and the suggested PR helps to solve it.

VyacheslavVanin avatar Apr 03 '25 08:04 VyacheslavVanin

> Yes, the problem happens during the creation of the cluster, and the suggested PR helps to solve it.

For this case, you should be able to perform MEET between every pair of nodes (O(N^2)) to guarantee node discovery, rather than rely on gossip.
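As a rough sketch of what an all-pairs MEET looks like, the helper below generates one CLUSTER MEET per ordered pair of distinct nodes. The function name `meet_commands` is hypothetical, and the default `bus_offset=1000` is an assumption that matches the `--cluster-port` values used in the scripts later in this thread (for example 6380 and 7380); adjust it for other setups.

```python
from itertools import permutations


def meet_commands(ports, host="127.0.0.1", bus_offset=1000):
    """Yield (sender_port, command_args) for every ordered pair of distinct nodes.

    bus_offset is an assumption: cluster bus port = client port + bus_offset.
    """
    for sender, target in permutations(ports, 2):
        args = ("CLUSTER", "MEET", host, str(target), str(target + bus_offset))
        yield sender, args


if __name__ == "__main__":
    ports = [6380, 6381, 6382, 6383, 6384, 6385]
    commands = list(meet_commands(ports))
    # N nodes produce N * (N - 1) MEET commands.
    print(len(commands), "MEET commands")
    for sender, args in commands[:3]:
        print(sender, " ".join(args))
```

Each tuple can then be sent to the node listening on `sender_port` with whatever client is at hand.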

hpatro avatar Apr 03 '25 18:04 hpatro

We already had O(N^2) MEET before and after cluster creation, and it does not help. After cluster creation we wait until every node has the full topology. We check this using "CLUSTER SLOTS" and expect every node's reply to contain all nodes of the cluster. The problem is that this can take too much time. I also thought that MEET should work, but it does not reduce this time. I see this in the MEET processing at https://github.com/valkey-io/valkey/blob/unstable/src/cluster_legacy.c#L3395 and https://github.com/valkey-io/valkey/blob/unstable/src/cluster_legacy.c#L3374 but for some reason it does not work as in normal gossip.

VyacheslavVanin avatar Apr 07 '25 10:04 VyacheslavVanin

Gossip doesn't accelerate the handshake process any better than performing O(N^2) MEET. Would you be able to share the cluster convergence time for both approaches?

hpatro avatar Apr 07 '25 19:04 hpatro

Here are the files with convergence times for comparison: start-valkey-times-no-pathc.log and start-valkey-times-patched.log

Here are the scripts used to take these measurements:

#!/usr/bin/env python3
import subprocess
import time
import redis


def start_cluster():
    """Start the Valkey cluster"""
    try:
        subprocess.run(["./start_valkey_cluster"], check=True)
        print("Cluster started successfully")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error starting cluster: {e}")
        return False


def check_cluster_status():
    """Check if all nodes know about all other nodes"""
    ports = [6380, 6381, 6382, 6383, 6384, 6385]
    all_ports_set = set(ports)

    for port in ports:
        try:
            r = redis.Redis(host='localhost', port=port)
            cluster_slots = r.execute_command("CLUSTER SLOTS")

            # Extract all ports from the CLUSTER SLOTS response
            found_ports = set()
            for slot_info in cluster_slots:
                # The format is [start_slot, end_slot, [ip, port, node_id], ...replicas]
                master_info = slot_info[2]
                found_ports.add(master_info[1])
                for replica in slot_info[3:]:
                    found_ports.add(replica[1])

            # Check if we found all ports
            if found_ports != all_ports_set:
                print(f"Node {port} is missing ports: {all_ports_set - found_ports}")
                return False

        except Exception as e:
            print(f"Error checking node {port}: {e}")
            return False

    print("All nodes know about all other nodes!")
    return True


def main():
    max_attempts = 50
    attempt = 0

    while attempt < max_attempts:
        attempt += 1
        print(f"\nAttempt {attempt}/{max_attempts}")

        # Start the cluster
        if not start_cluster():
            time.sleep(2)
            continue

        t0 = time.time()

        while not check_cluster_status():
            time.sleep(1)

        t1 = time.time()
        with open('start-reids-times.log', 'a') as f:
            f.write(f'{t1-t0}\n')

        # Pause before the next measurement run
        time.sleep(2)
    else:
        # The loop has no break, so this always runs after the last attempt
        print(f"Finished {max_attempts} measurement runs")


if __name__ == "__main__":
    main()

And the script for starting the cluster:

#!/bin/bash
# bash is needed for the brace expansion (valkey{0,1,...}) used below

echo "$PWD"
VALKEY_SERVER=$HOME/projects/valkey/valkey-server
VALKEY_CLI=$HOME/projects/valkey/valkey-cli

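# Wait up to 10 seconds for the instance on the given port to answer PING
wait_valkey_ready() {
    PORT=$1
    for i in $(seq 1 10);
    do
        # Quote the reply so an empty answer does not break the test
        if [ "$($VALKEY_CLI -p "$PORT" ping)" = PONG ] ; then
            return ;
        fi
        echo retry to connect to valkey instance on port $PORT
        sleep 1
    done
    echo Failed to connect to valkey instance on port $PORT
    exit 1
}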

# Kill previously started instances
killall valkey-server
killall redis-server
echo Waiting previous services killed...
sleep 2
rm -rf valkey{0,1,2,3,4,5,6,7}/*
mkdir -p valkey{0,1,2,3,4,5}
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey0/ --port 6380 --cluster-port 7380
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey1/ --port 6381 --cluster-port 7381
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey2/ --port 6382 --cluster-port 7382
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey3/ --port 6383 --cluster-port 7383
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey4/ --port 6384 --cluster-port 7384
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey5/ --port 6385 --cluster-port 7385
echo Waiting instances ready...
wait_valkey_ready 6380
wait_valkey_ready 6381
wait_valkey_ready 6382
wait_valkey_ready 6383
wait_valkey_ready 6384
wait_valkey_ready 6385

## Start using valkey-cli --cluster
$VALKEY_CLI \
     --cluster create \
     127.0.0.1:6380 \
     127.0.0.1:6381 \
     127.0.0.1:6382 \
     127.0.0.1:6383 \
     127.0.0.1:6384 \
     127.0.0.1:6385 \
     --cluster-replicas 1 \
     --cluster-yes

VyacheslavVanin avatar Apr 08 '25 10:04 VyacheslavVanin

Thanks for sharing.

You mentioned you used O(N^2) CLUSTER MEET but the above script relies on valkey-cli to create the cluster which only does O(N) MEET. So, it will take much longer naturally.

https://github.com/valkey-io/valkey/blob/44dafba2ce5635c5945c52bc6d2aa798e3537448/src/valkey-cli.c#L6909-L6916

Could you run the experiment with O(N^2) MEET and share the numbers?

hpatro avatar Apr 08 '25 20:04 hpatro

Here is the N^2 variant: start-valkey-times-no-patch-N2.log. Almost the same numbers.

#!/bin/bash
# bash is needed for the brace expansion (valkey{0,1,...}) used below

echo "$PWD"
VALKEY_SERVER=$HOME/projects/valkey/valkey-server
VALKEY_CLI=$HOME/projects/valkey/valkey-cli

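# Wait up to 10 seconds for the instance on the given port to answer PING
wait_valkey_ready() {
    PORT=$1
    for i in $(seq 1 10);
    do
        # Quote the reply so an empty answer does not break the test
        if [ "$($VALKEY_CLI -p "$PORT" ping)" = PONG ] ; then
            return ;
        fi
        echo retry to connect to valkey instance on port $PORT
        sleep 1
    done
    echo Failed to connect to valkey instance on port $PORT
    exit 1
}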

killall valkey-server
killall valkey-server
echo Waiting previous services killed...
sleep 2
rm -rf valkey{0,1,2,3,4,5,6,7}/*
mkdir -p valkey{0,1,2,3,4,5}
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey0/ --port 6380 --cluster-port 7380
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey1/ --port 6381 --cluster-port 7381
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey2/ --port 6382 --cluster-port 7382
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey3/ --port 6383 --cluster-port 7383
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey4/ --port 6384 --cluster-port 7384
$VALKEY_SERVER valkey.conf --logfile ./valkey.log --dir ./valkey5/ --port 6385 --cluster-port 7385
echo Waiting instances ready...
wait_valkey_ready 6380
wait_valkey_ready 6381
wait_valkey_ready 6382
wait_valkey_ready 6383
wait_valkey_ready 6384
wait_valkey_ready 6385

ports="6380 6381 6382 6383 6384 6385"

# meet each node with another
for p in $ports ; do
    for pp in $ports; do
        $VALKEY_CLI -p $p cluster meet 127.0.0.1 $pp $(($pp + 1000))
    done
done

# Master[0] -> Slots 0 - 5460
# Master[1] -> Slots 5461 - 10922
# Master[2] -> Slots 10923 - 16383
$VALKEY_CLI -p 6380 cluster addslotsrange 0 5460
$VALKEY_CLI -p 6381 cluster addslotsrange 5461 10922
$VALKEY_CLI -p 6382 cluster addslotsrange 10923 16383

echo wait cluster meet ...
sleep 2

id0=$($VALKEY_CLI -p 6380 cluster myid)
id1=$($VALKEY_CLI -p 6381 cluster myid)
id2=$($VALKEY_CLI -p 6382 cluster myid)
$VALKEY_CLI -p 6383 cluster replicate $id0
$VALKEY_CLI -p 6384 cluster replicate $id1
$VALKEY_CLI -p 6385 cluster replicate $id2

# meet each node with another
for p in $ports ; do
    for pp in $ports; do
        $VALKEY_CLI -p $p cluster meet 127.0.0.1 $pp $(($pp + 1000))
    done
done

VyacheslavVanin avatar Apr 08 '25 21:04 VyacheslavVanin

Same results with additional N^2 MEETs issued each time the check returns an incomplete result.

#!/usr/bin/env python3
import subprocess
import time
import redis

def start_cluster():
    """Start the Valkey cluster"""
    try:
        subprocess.run(["./start_valkey_cluster"], check=True)
        print("Cluster started successfully")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error starting cluster: {e}")
        return False

def check_cluster_status():
    """Check if all nodes know about all other nodes"""
    ports = [6380, 6381, 6382, 6383, 6384, 6385]
    all_ports_set = set(ports)
    
    for port in ports:
        try:
            r = redis.Redis(host='localhost', port=port)
            cluster_slots = r.execute_command("CLUSTER SLOTS")
            
            # Extract all ports from the CLUSTER SLOTS response
            found_ports = set()
            for slot_info in cluster_slots:
                # The format is [start_slot, end_slot, [ip, port, node_id], ...replicas]
                master_info = slot_info[2]
                found_ports.add(master_info[1])
                for replica in slot_info[3:]:
                    found_ports.add(replica[1])
            
            # Check if we found all ports
            if found_ports != all_ports_set:
                print(f"Node {port} is missing ports: {all_ports_set - found_ports}")
                missing_ports = all_ports_set - found_ports
                for p in all_ports_set:
                    r = redis.Redis(host='localhost', port=p)
                    for pp in all_ports_set:
                        if pp == p:
                            continue
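                        # Pass the cluster bus port explicitly: the nodes are started with
                        # --cluster-port = port + 1000, not the default port + 10000
                        r.execute_command("CLUSTER MEET", "127.0.0.1", pp, pp + 1000)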
                return False
                
        except Exception as e:
            print(f"Error checking node {port}: {e}")
            return False
    
    print("All nodes know about all other nodes!")
    return True

def main():
    max_attempts = 50
    attempt = 0
    
    while attempt < max_attempts:
        attempt += 1
        print(f"\nAttempt {attempt}/{max_attempts}")
        
        # Start the cluster
        if not start_cluster():
            time.sleep(2)
            continue
            
        t0 = time.time()
        
        while not check_cluster_status():
            time.sleep(1)

        t1 = time.time()
        with open('start-reids-times.log', 'a') as f:
            f.write(f'{t1-t0}\n')
            
        # Pause before the next measurement run
        time.sleep(2)
    else:
        # The loop has no break, so this always runs after the last attempt
        print(f"Finished {max_attempts} measurement runs")

if __name__ == "__main__":
    main()

VyacheslavVanin avatar Apr 08 '25 21:04 VyacheslavVanin

Ok, that's interesting. I'm a bit puzzled by this. Tagging a few folks @enjoy-binbin @madolson to check if they have any thoughts about this.

hpatro avatar Apr 09 '25 01:04 hpatro

Full results for the last script: start-valkey-times-no-patch-MEETS.log

VyacheslavVanin avatar Apr 09 '25 05:04 VyacheslavVanin

I'm a little confused. We do have a fallback where we explicitly ping every node within half the node timeout. If we want to make it less "random", we could also just do an incremental scan of the dictionary instead of randomly pinging nodes. I'm not entirely sure why we need to ping nodes randomly (gossip should likely be random).
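The two ideas in this comment can be sketched in a few lines of Python. This is a simplified model, not the server code: `Peer`, `needs_fallback_ping` and `round_robin_targets` are hypothetical names, timestamps are plain integers in milliseconds, and the fallback threshold is assumed to be half the node timeout as described above.

```python
from dataclasses import dataclass
from itertools import cycle, islice


@dataclass
class Peer:
    name: str
    ping_sent: int = 0      # 0 means no ping is currently in flight
    pong_received: int = 0  # timestamp (ms) of the last pong we know of


def needs_fallback_ping(peer, now, node_timeout):
    """Fallback: ping a peer whose last pong is older than half the node timeout."""
    return peer.ping_sent == 0 and (now - peer.pong_received) > node_timeout // 2


def round_robin_targets(peers):
    """Incremental scan: visit peers in a fixed order instead of sampling randomly."""
    return cycle(peers)


if __name__ == "__main__":
    peers = [Peer("a"), Peer("b"), Peer("c")]
    print(needs_fallback_ping(peers[0], now=8000, node_timeout=15000))
    print([p.name for p in islice(round_robin_targets(peers), 4)])
```

With a fixed scan order, every peer is visited once per pass, so the wait between two pings to the same peer is bounded by the number of peers.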

madolson avatar Apr 16 '25 19:04 madolson

valkey1.log valkey4.log Here are the logs of two nodes that were not able to ping each other for too long. In valkey1.log there is a long gap between pings from/to a12f61ce06037a0176b9bc2b3a337ce80cd309cf:

3096778:M 17 Apr 2025 12:06:55.633 . ping packet received: a12f61ce06037a0176b9bc2b3a337ce80cd309cf
...
3096778:M 17 Apr 2025 12:07:07.682 . Pinging node a12f61ce06037a0176b9bc2b3a337ce80cd309cf ()

After "Pinging node a12f61ce06037a0176b9bc2b3a337ce80cd309cf", node 37dc46d8d866f5dfb3955db7ba717bae54be32ff immediately included that node in its CLUSTER SLOTS response.
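A gap like the one between the two log lines above can be measured with a small script. This is a sketch that assumes the log line layout shown in the excerpt (pid and role, then day, month, year and a millisecond timestamp) and an English month abbreviation; the helper names are made up for illustration.

```python
import re
from datetime import datetime

# Matches the prefix "3096778:M 17 Apr 2025 12:06:55.633 " seen in the excerpt.
LINE_RE = re.compile(r"^\d+:\w (\d{1,2} \w{3} \d{4} \d{2}:\d{2}:\d{2}\.\d{3}) ")


def timestamps_for(lines, node_id):
    """Yield the timestamp of every log line that mentions node_id."""
    for line in lines:
        match = LINE_RE.match(line)
        if match and node_id in line:
            yield datetime.strptime(match.group(1), "%d %b %Y %H:%M:%S.%f")


def longest_gap_seconds(lines, node_id):
    """Return the longest pause between consecutive mentions of node_id."""
    stamps = sorted(timestamps_for(lines, node_id))
    gaps = ((b - a).total_seconds() for a, b in zip(stamps, stamps[1:]))
    return max(gaps, default=0.0)


if __name__ == "__main__":
    node = "a12f61ce06037a0176b9bc2b3a337ce80cd309cf"
    sample = [
        "3096778:M 17 Apr 2025 12:06:55.633 . ping packet received: " + node,
        "3096778:M 17 Apr 2025 12:07:07.682 . Pinging node " + node + " ()",
    ]
    print(longest_gap_seconds(sample, node))
```

For the two lines quoted above this gives a gap of about 12 seconds.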

@madolson the fallback was never called (added logs there to see if it was called).

Another run, with more added logs in fallback:

        if (node->link && node->ping_sent == 0 && (now - node->pong_received) > ping_interval) {
            serverLog(LL_DEBUG, "!!!!!!! NODE %.40s (%s) not pinged for too long.", node->name, node->human_nodename);
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        } else {
            serverLog(LL_DEBUG, "!!!!!!! Skip pinging NODE %.40s (%s) has_link=%d, ping_sent=%lld, pong_received=%lld, now=%lld, interval=%lld, ping_interval=%lld",
                      node->name, node->human_nodename,
                      node->link != NULL, node->ping_sent, node->pong_received, now, now - node->pong_received, ping_interval);
        }

valkey3.log valkey5.log Here there are 15 seconds between the "Pinging node 7da381c6430a69141dd64faadaed17d3c6b83ec1" entries in valkey3.log. After the second ping the nodes finally converged.

VyacheslavVanin avatar Apr 18 '25 07:04 VyacheslavVanin

Hi! @madolson Any other ideas on how to fix this without adding a flag for testing purposes (https://github.com/valkey-io/valkey/pull/1909)?

It looks like this update of pong_received, which can occur on gossip from other nodes, prevents the ping fallback: https://github.com/valkey-io/valkey/blob/20a49ff01387be7a5091d8d51251c9875d1183ea/src/cluster_legacy.c#L2437

Maybe we need to track the last ping time here instead of the pong time? Right now ping_sent is 0 almost always.
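The hypothesis can be illustrated with a small model. It is only a sketch under stated assumptions: time is counted in abstract ticks, gossip from a third node refreshes pong_received at a fixed period, and `last_ping` is a hypothetical field that does not exist in the server today. The function counts how often the fallback would fire when it is keyed on pong_received versus on the last ping time.

```python
def simulate(ticks, ping_interval, gossip_every):
    """Return (fallbacks keyed on pong_received, fallbacks keyed on last ping).

    Assumptions: gossip from other nodes refreshes pong_received every
    gossip_every ticks; last_ping changes only when we ping the peer ourselves.
    """
    pong_received = 0  # refreshed by direct pongs and by third-party gossip
    last_ping = 0      # hypothetical field: time of our own last ping
    by_pong = by_ping = 0
    for now in range(1, ticks + 1):
        if now % gossip_every == 0:
            pong_received = now  # another node reported a fresher pong time
        if now - pong_received > ping_interval:
            by_pong += 1
            pong_received = now  # our ping is answered right away
        if now - last_ping > ping_interval:
            by_ping += 1
            last_ping = now
    return by_pong, by_ping


if __name__ == "__main__":
    print(simulate(ticks=100, ping_interval=10, gossip_every=5))
```

In this model, when gossip arrives more often than the ping interval, the pong-based condition never becomes true, while the ping-based one still fires at a steady rate.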

VyacheslavVanin avatar May 14 '25 10:05 VyacheslavVanin