Race condition when using pipelining in cluster mode

Open volodymyrpavlenko opened this issue 6 years ago • 21 comments

Bug Report

Current Behavior

When using Lettuce in cluster mode with pipelining (connection.setAutoFlushCommands(false)), there seems to be a race condition between adding commands to the write buffer and flushing commands to the output.

The race condition can be triggered when the Lettuce client is not yet connected to the node to which the command should be routed. In this case, the response future from a GET method can be returned before the command is actually added to the command buffer.

If connection.flushCommands() is invoked before the command is added to the buffer, the command will not be sent, so the client future will never complete (unless we invoke connection.flushCommands() again).

The race condition seems to be due to the following lines: https://github.com/lettuce-io/lettuce-core/blob/master/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java#L104 https://github.com/lettuce-io/lettuce-core/blob/master/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java#L129
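
The behavior described above can be modeled without Lettuce. The following is a toy sketch, not Lettuce code: the class `FlushRaceModel` and its fields are hypothetical, and it only assumes (as the report suggests) that the command is enqueued in a callback on the connection future while the result future is returned immediately.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Toy model of the reported race; hypothetical names, not Lettuce internals. */
final class FlushRaceModel {

    // Commands that were written but not yet flushed to the (simulated) socket.
    private final Queue<CompletableFuture<String>> buffer = new ArrayDeque<>();

    // Completes once the (simulated) node connection is established.
    final CompletableFuture<Void> connected = new CompletableFuture<>();

    /** Returns the result future immediately; buffering is deferred until connected. */
    CompletableFuture<String> get(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        connected.thenRun(() -> buffer.add(result)); // enqueue happens later
        return result;
    }

    /** Sends whatever is buffered right now; commands buffered later are not covered. */
    void flushCommands() {
        CompletableFuture<String> pending;
        while ((pending = buffer.poll()) != null) {
            pending.complete("value"); // stands in for write + server response
        }
    }

    public static void main(String[] args) {
        FlushRaceModel model = new FlushRaceModel();
        CompletableFuture<String> future = model.get("foo");
        model.flushCommands();          // buffer is still empty, nothing is sent
        model.connected.complete(null); // connection completes, command is buffered only now
        System.out.println("done after first flush: " + future.isDone());
        model.flushCommands();          // a second flush finally sends the command
        System.out.println("done after second flush: " + future.isDone());
    }
}
```

In this model the future stays incomplete after the first flush and completes only after the second one, which matches the symptom in the report.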

Input Code

import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;

import java.util.concurrent.TimeUnit;

public class TestRaceCondition {

  public static void main(String[] args) {
    for (int i = 0; i < 1000; i++) {
      test();
      System.out.println("Finished iteration " + i);
    }
  }

  private static void test() {
    RedisClusterClient redisClusterClient = RedisClusterClient.create("redis://localhost:7008/");

    final StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();

//    connection.setReadFrom(ReadFrom.SLAVE); // Uncommenting this line will make test fail much more often
    // Disable auto-flush so that commands are only sent on flushCommands()
    connection.setAutoFlushCommands(false);

    final RedisAdvancedClusterAsyncCommands<String, String> async = connection.async();

    /*
     127.0.0.1:7000> cluster keyslot bar
     (integer) 5061 (node1)
     127.0.0.1:7000> cluster keyslot baz2
     (integer) 10594 (node2)
     127.0.0.1:7000> cluster keyslot foo
     (integer) 12182 (node3)
    */

    // One GET per node, so that three node connections are needed
    final RedisFuture<String> future1 = async.get("bar");
    final RedisFuture<String> future2 = async.get("baz2");
    final RedisFuture<String> future3 = async.get("foo");

    connection.flushCommands();

    if (!LettuceFutures.awaitAll(10, TimeUnit.SECONDS, future1, future2, future3)) {
      // A second flush distinguishes "command was never flushed" from "command is stuck"
      connection.flushCommands();
      if (LettuceFutures.awaitAll(1, TimeUnit.SECONDS, future1, future2, future3)) {
        throw new IllegalStateException("Commands didn't finish in 10 seconds. Finished after second invocation of connection.flushCommands().");
      } else {
        throw new IllegalStateException("Commands didn't finish in 10 seconds.");
      }
    }

    connection.close();
    redisClusterClient.shutdown(); // release the client's resources between iterations
  }
}
Output
Finished iteration 0
Finished iteration 1
Finished iteration 2
Finished iteration 3
Finished iteration 4
Finished iteration 5
Finished iteration 6
Finished iteration 7
Finished iteration 8
Finished iteration 9
Finished iteration 10
Finished iteration 11
Finished iteration 12
Finished iteration 13
Exception in thread "main" java.lang.IllegalStateException: Commands didn't finish in 10 seconds. Finished after second invocation of connection.flushCommands().
	at com.spotify.redisperformancebenchmark.tests.TestRaceCondition.test(TestRaceCondition.java:47)
	at com.spotify.redisperformancebenchmark.tests.TestRaceCondition.main(TestRaceCondition.java:13)

Expected behavior/code

All futures eventually complete with only one invocation of flushCommands()

Environment

  • Lettuce version(s): [5.1.8.RELEASE]
  • Redis version: [5.0.5]

Possible Solution

Would it be possible to separate the creation of a Connection object (and its command buffer) from the actual network connectivity? This would allow the command to be stored in the buffer before the future is returned to the client.
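
A minimal sketch of this idea, again as a toy model rather than Lettuce code: the class `BufferFirstWriter` and its methods are hypothetical. It assumes the buffer exists before the connection does, and that a flush requested too early is remembered and replayed once the connection is established.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Toy model: the command buffer exists before the network connection does. */
final class BufferFirstWriter {

    private final Queue<CompletableFuture<String>> buffer = new ArrayDeque<>();
    private boolean connected;
    private boolean flushRequested;

    /** Enqueues the command before the future is handed to the caller. */
    synchronized CompletableFuture<String> get(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        buffer.add(result);
        return result;
    }

    /** Flushes immediately if connected, otherwise remembers the request. */
    synchronized void flushCommands() {
        if (connected) {
            drain();
        } else {
            flushRequested = true;
        }
    }

    /** Called when the (simulated) connection is established; replays a pending flush. */
    synchronized void onConnected() {
        connected = true;
        if (flushRequested) {
            flushRequested = false;
            drain();
        }
    }

    private void drain() {
        CompletableFuture<String> pending;
        while ((pending = buffer.poll()) != null) {
            pending.complete("value"); // stands in for write + server response
        }
    }
}
```

With this ordering, a single flushCommands() call is enough in the model even when it arrives before the connection is ready.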

Additional context

From the code, it looks like MasterSlaveChannelWriter has the same issue, though I have not validated this.

volodymyrpavlenko avatar Aug 29 '19 12:08 volodymyrpavlenko

The code behaves as designed. Connections to cluster nodes are established asynchronously. Flushing commands before all cluster connections are established can lead to a state where a connection is still being created while flushCommands() is called. The flush signal does not reach the connection being created, which is why you see some commands not executed.

Changing the AutoFlush state requires special attention, and even more so when using Redis Cluster or Master/Slave. Connections to nodes need to be fully initialized before the AutoFlush state can be altered.

mp911de avatar Sep 06 '19 10:09 mp911de

Hi Mark, thanks for your reply!

  1. Is there a way to know whether the connection has already been established? This seems to be encapsulated inside the Lettuce code and is not exposed to the application.

  2. What would happen if the connection breaks for any reason and the Lettuce client needs to reconnect while the program is running? For example:

     1. application successfully checks if the client is connected (how?)
     2. network connection interrupted, client disconnects
     3. client starts reconnection
     4. send command
     5. flush
     6. connection established

     This would lead to the command not being flushed.

  3. If I wait for the connection to be established, there is still a race condition (albeit a less likely one) if I flush after the connection was established but before the command was written.

volodymyrpavlenko avatar Sep 06 '19 11:09 volodymyrpavlenko

You can iterate over partitions of the cluster StatefulRedisClusterConnection.getPartitions() and fetch the connection via StatefulRedisClusterConnection.getConnection(…) or StatefulRedisClusterConnection.getConnectionAsync(…).

There's no way to introspect connected nodes. Regarding connected/disconnected state, you can inspect the state by calling StatefulRedisConnection.isOpen() and assume on that basis whether you can send the bulk.
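
The warm-up suggested here can be sketched as follows. This is a stdlib-only illustration: `ConnectionWarmup` and its `connectAsync` parameter are hypothetical stand-ins. With Lettuce, the node ids would presumably come from getPartitions() and `connectAsync` would delegate to getConnectionAsync(…), but that wiring is an assumption and is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Requests a connection per node and waits until all of them are established. */
final class ConnectionWarmup {

    static <C> List<C> warmUp(List<String> nodeIds,
                              Function<String, CompletableFuture<C>> connectAsync,
                              long timeout, TimeUnit unit) {
        // Start all connection attempts first so that they proceed in parallel.
        List<CompletableFuture<C>> pending = new ArrayList<>();
        for (String nodeId : nodeIds) {
            pending.add(connectAsync.apply(nodeId));
        }
        try {
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while warming up connections", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Not all node connections were established", e);
        }
        // All futures are complete here, so join() does not block.
        List<C> connections = new ArrayList<>();
        for (CompletableFuture<C> future : pending) {
            connections.add(future.join());
        }
        return connections;
    }
}
```

Auto-flush would only be disabled after `warmUp` returns. This narrows the window for the race but does not cover reconnects that happen later.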

Does that help?

mp911de avatar Sep 06 '19 11:09 mp911de

I am not sure if this will actually work in our approach due to:

  1. Even if the connection is open before sending the batch, the race condition still exists:
     • The connection is not established. I send the command with auto-flushing disabled (internally, there's a connection promise); when the promise completes, the command is added to the connection buffer.
     • I wait for the connection to be established and get the same promise (AsyncConnectionProvider.getConnection(...)).
     • When the promise completes (meaning the connection is established), two actions start at the same moment:
       1. add the command to the command buffer (internally in Lettuce)
       2. flush commands (externally, by my code).
     • If for any reason the flush action is quicker, the command is not flushed.
  2. In cluster mode connections are stored by slot; iterating over 16384 connections before flushing would be bad for performance.
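
The ordering problem in the first point can be illustrated with plain CompletableFuture. This is a hypothetical model (`OrderedFlushModel` is not a Lettuce class, and Lettuce does not necessarily expose such a stage): the flush is chained on the stages that buffer the commands instead of on the connection promise itself, so it cannot run before the commands are buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy model: the flush depends on the write stages, not on the connection promise. */
final class OrderedFlushModel {

    final List<String> buffer = new ArrayList<>();
    final List<String> sent = new ArrayList<>();
    final CompletableFuture<Void> connectionPromise = new CompletableFuture<>();

    /** Buffers the command once connected; the returned stage completes after buffering. */
    CompletableFuture<Void> write(String command) {
        return connectionPromise.thenRun(() -> buffer.add(command));
    }

    /** Moves everything that is currently buffered to the (simulated) wire. */
    void flush() {
        sent.addAll(buffer);
        buffer.clear();
    }

    public static void main(String[] args) {
        OrderedFlushModel model = new OrderedFlushModel();
        CompletableFuture<Void> first = model.write("GET bar");
        CompletableFuture<Void> second = model.write("GET foo");
        // The flush runs only after both write stages have completed.
        CompletableFuture.allOf(first, second).thenRun(model::flush);
        model.connectionPromise.complete(null);
        System.out.println("sent " + model.sent.size() + " commands");
    }
}
```

Registering the flush directly on `connectionPromise` would instead leave the relative order of buffering and flushing unspecified, which is the situation described above.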

volodymyrpavlenko avatar Sep 11 '19 07:09 volodymyrpavlenko

Therefore, setAutoFlushCommands(…) should really be used only in standalone scenarios where you have full control over the connection, and not in use cases where connections might be shared.

Closing this ticket as we effectively cannot do anything here.

mp911de avatar Sep 11 '19 19:09 mp911de

In this case, it'd be great to update the documentation, since it took us a lot of time to find the reason for the race condition.

If pipelining should not be used in master/slave or cluster mode, why not remove the method altogether to avoid confusing developers in the future?

volodymyrpavlenko avatar Sep 12 '19 07:09 volodymyrpavlenko

Controlling command flushing (e.g. when running a standalone main or something like that) can improve performance by a factor of 10. For this to work properly, you need either a standalone connection or a properly warmed connection. It is intended for bulk loading.

Updating the docs is a good suggestion, we should do that.

mp911de avatar Sep 12 '19 07:09 mp911de

Just to give you more context, we have a system with a very high throughput, which is also latency critical. We need to get batches of randomly spread keys (total volume peaks at 10m items/sec).

Since Redis Cluster does not support cross-slot MGET, and our keys are randomly distributed, we thought about using pipelining for batching GETs.

We'd like to use cluster mode for ease of node discovery, failover, and sharding.

Do you have a better solution to this problem?

volodymyrpavlenko avatar Sep 12 '19 07:09 volodymyrpavlenko

Have you tried using plain MGET calls through Lettuce's API? The Redis Cluster implementation splits keys into command chunks according to the slot hash. There's an ongoing discussion at #1116 about a similar topic. To improve time to first response, you could use the streaming MGET API. But really, I suggest benchmarking first before taking on more complexity.

If Lettuce cannot satisfy your requirements, netty has a built-in Redis codec that gives you way more control over the Redis protocol. Did you try whether the Jedis Redis client would be an option?

mp911de avatar Sep 12 '19 08:09 mp911de

MGET would be split by slot, and splitting 10-100 sized multigets over 16384 slots will simply transform them into single gets.

We tried this before, and performance was very poor. We found that running pipelines instead of MGETs in cluster mode is a great optimization.

We didn't try Jedis since it's fully synchronous, and I don't think that a synchronous approach would work with our load of ~110k reqs/sec on a single client.

We looked into the Redisson client, which supports pipelining in cluster mode; we are testing it now.
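
To illustrate why a slot-split MGET degenerates into single GETs, here is a small sketch that computes the hash slot of each key and groups keys by slot. It follows the slot computation described in the Redis Cluster specification (CRC16, XMODEM variant, modulo 16384, with hash tags); the class `SlotGrouping` is a hypothetical helper and not part of any client.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Groups keys by Redis Cluster hash slot (CRC16/XMODEM modulo 16384). */
final class SlotGrouping {

    static final int SLOT_COUNT = 16384;

    /** CRC16 with polynomial 0x1021 and initial value 0, computed bit by bit. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    static int slot(String key) {
        // Hash tag rule: if the key contains "{...}" with non-empty content,
        // only the part between the first '{' and the next '}' is hashed.
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }

    static Map<Integer, List<String>> groupBySlot(List<String> keys) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String key : keys) {
            groups.computeIfAbsent(slot(key), s -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        // The three keys from the reproducer land in three different slots.
        System.out.println(groupBySlot(Arrays.asList("bar", "baz2", "foo")));
    }
}
```

With 10 to 100 randomly distributed keys and 16384 slots, almost every group contains a single key, so a per-slot MGET saves little over individual GETs.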

volodymyrpavlenko avatar Sep 12 '19 08:09 volodymyrpavlenko

@volodymyrpavlenko Hello, friend. Have you solved this scenario with Redisson or some other way? We are facing the same problem! In Redis cluster mode, there is a race condition in pipelined batch queries for random keys (random slots). Manually flushing commands often causes pipeline query timeouts (some commands may never be sent, so the future never completes).

nwpuqyj avatar Oct 06 '24 23:10 nwpuqyj

I'd like to revisit this scenario, as it seems to be something interesting to support, if possible.

tishun avatar Oct 07 '24 07:10 tishun

Is there any plan to work on this? My use case is consuming from a Kafka topic, reading a large chunk of messages at once, and then using a bulk pipeline operation. There's only one thread per instance and I have complete control over the connection being opened and closed. This works correctly in cluster mode, except that I'm only able to use the main instance. When I call setReadFrom(ReadFrom.REPLICA), then I get random strange errors.

lujop avatar Feb 25 '25 15:02 lujop

"and then using a bulk pipeline operation" - does that imply you are flushing manually?

"then I get random strange errors" - can you be more specific here?

Are you sure this is the same issue?

tishun avatar Feb 25 '25 18:02 tishun

Yes, the connection is manually managed. Here's the code:

    override fun exist(set: Set<String>): Map<String, Boolean> {
        val connection = redisPool.borrowObject()
            .apply { setAutoFlushCommands(false) }
        return try {
            connection.readFrom = ReadFrom.REPLICA_PREFERRED
            val result = mutableMapOf<String, Boolean>()
            val commands = connection.async()
            set.chunked(bulkChunkSize).forEach {
                val futuresByKey = it.map { element ->
                    element to commands.exists(
                        element,
                    )
                }
                commands.flushCommands()
                LettuceFutures.awaitAll(
                    bulkTimeout,
                    *(futuresByKey.map { (_, future) -> future }).toTypedArray(),
                )
                result.putAll(futuresByKey.associate { (guid, future) -> guid to (future.get() == 1L) })
            }
            result
        } finally {
            connection.setAutoFlushCommands(true)
            connection.close()
        }
    }

Without connection.readFrom = ReadFrom.REPLICA_PREFERRED it works but with ReadFrom.REPLICA_PREFERRED it didn't.
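
One detail of the snippet above: the result of LettuceFutures.awaitAll is not checked, and future.get() is then called without a timeout, so a command that was never flushed blocks indefinitely. A hedged, stdlib-only sketch of a bounded wait that reports which keys are still pending (the helper `BoundedAwait` is hypothetical, and it assumes the futures can be handled as CompletableFuture instances):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Waits for a batch with a deadline and reports the keys that did not complete. */
final class BoundedAwait {

    static <T> List<String> incompleteAfter(Map<String, CompletableFuture<T>> futuresByKey,
                                            long timeout, TimeUnit unit) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futuresByKey.values().toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            // Fall through: each future is inspected individually below.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> incomplete = new ArrayList<>();
        futuresByKey.forEach((key, future) -> {
            if (!future.isDone()) {
                incomplete.add(key);
            }
        });
        return incomplete;
    }
}
```

A caller could flush again or fail fast when the returned list is not empty, rather than blocking on get(). This makes the hang visible but does not remove the underlying race.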

"then I get random strange errors" - can you be more specific here?

Let me try to setup, get the errors again and report it correctly. I had the problem some time ago and had in my TODO list to report it, and unfortunately I lost the output 🙈

lujop avatar Feb 25 '25 19:02 lujop

Hey @lujop , thanks for all the feedback!

Without connection.readFrom = ReadFrom.REPLICA_PREFERRED it works but with ReadFrom.REPLICA_PREFERRED it didn't.

Perhaps you have such a topology that only replica nodes don't have their connections established. You have one master and multiple replicas?

Let me try to setup, get the errors again and report it correctly. I had the problem some time ago and had in my TODO list to report it, and unfortunately I lost the output 🙈

Awesome. This would help us better understand your problem.

I can't promise that the team will decide to support this as a feature of the driver, but there might be a reasonable workaround we can suggest to folks who need it. I will discuss this with the team.

tishun avatar Feb 26 '25 08:02 tishun

Hey @lujop , thanks for all the feedback! Perhaps you have such a topology that only replica nodes don't have their connections established. You have one master and multiple replicas?

It is an AWS ElastiCache Redis OSS cluster with one shard and 3 nodes with cluster mode enabled. There should be one master and two replicas for the unique shard. Do you know what is the configuration parameter that indicates that replica nodes do not have their connection established?

Let me try to setup, get the errors again and report it correctly. I had the problem some time ago and had in my TODO list to report it, and unfortunately I lost the output 🙈

Awesome. This would help us better understand your problem.

Sorry, I misremembered: there are no errors, it just gets stuck randomly. A simple test just calling the exists() method from my previous code makes it randomly wait forever in the LettuceFutures.awaitAll when using connection.readFrom = ReadFrom.REPLICA_PREFERRED or ReadFrom.ANY and works when removing it.

If I remove the setAutoFlush(false) then it also works. But then performance decreases significantly.

lujop avatar Feb 26 '25 10:02 lujop

Do you know what is the configuration parameter that indicates that replica nodes do not have their connection established?

All connections are created on demand. Sorry, not sure what you are asking here, could you please rephrase?

A simple test just calling the exists() method from my previous code makes it randomly wait forever in the LettuceFutures.awaitAll when using connection.readFrom = ReadFrom.REPLICA_PREFERRED or ReadFrom.ANY and works when removing it.

Seems like a similar problem with different consequences.

If I remove the setAutoFlush(false) then it also works. But then performance decreases significantly.

Yes, issue is that the setAutoFlush(false) was primarily intended for straight bulk import scenarios. Yours is a more complicated one and it seems the driver was never intended to handle it.

tishun avatar Feb 27 '25 14:02 tishun

All connections are created on demand. Sorry, not sure what you are asking here, could you please rephrase?

My understanding is exactly this. But I asked to try to understand what you said in your first reply: Perhaps you have such a topology that only replica nodes don't have their connections established. You have one master and multiple replicas?

Yes, issue is that the setAutoFlush(false) was primarily intended for straight bulk import scenarios. Yours is a more complicated one and it seems the driver was never intended to handle it.

But is it a very unusual scenario? From my point of view, any high throughput streaming use case that does reads will benefit from this. Won't it? In streaming scenarios where you can work in bulk, it is very efficient. But it's a pity that you can't use read instances, because that affects throughput and, to some extent, COGS.

lujop avatar Feb 28 '25 20:02 lujop

I created this test to isolate/reproduce the issue from lettuce code base https://github.com/redis/lettuce/commit/af0f3b4054e14bbf34e67ecca52f38f9929ad6de

Note: I had to use a different Docker image that contains replicas. I don't know why it doesn't start correctly from Testcontainers, but it starts correctly using docker compose from the terminal.

lujop avatar Mar 07 '25 09:03 lujop

But is it a very unusual scenario? From my point of view, any high throughput streaming use case that does reads will benefit from this. Won't it?

I don't disagree :) I am just saying that it was never intended to be used that way and it is not surprising it does not work. But we are - as it is - entertaining the option to expand the functionality.

I created this test to isolate/reproduce the issue from lettuce code base https://github.com/redis/lettuce/commit/af0f3b4054e14bbf34e67ecca52f38f9929ad6de

Thanks for the reproducer! I will try to get back to you as soon as I have some time to look over it.

tishun avatar Mar 07 '25 11:03 tishun