How to use vertx-redis-client well? The version is 4.1.2

Open aloserman opened this issue 2 years ago • 3 comments

With the following code, the client stops working properly once a large volume of data arrives. The calls fail with: Future{cause=Connection pool reached max wait queue size of 200}, and all data is discarded. How can I make sure that no record is lost, and what is the correct way to use the client? I did not find a good usage example in the official documentation.

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        vertx = Vertx.vertx();
        RedisOptions options = new RedisOptions();
        options.setType(RedisClientType.STANDALONE)
                .setMaxPoolSize(30)
                .setMaxWaitingHandlers(1024)
                .setPoolRecycleTimeout(15_000)
                .setMaxPoolWaiting(200)
                .setPassword("password");

        List<String> clusters = new ArrayList<>();
        clusters.add("redis://host:port/db");
        options.setEndpoints(clusters);
        Redis client = Redis.createClient(vertx, options);
        api = RedisAPI.api(client);
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        api.hset(value, new Handler<AsyncResult<Response>>() {
            @Override
            public void handle(AsyncResult<Response> responseAsyncResult) {
                if (!responseAsyncResult.succeeded()) {
                    System.out.println("-----------failed");
                    System.out.println(responseAsyncResult.cause());
                }
            }
        });
    }

Future{cause=Connection pool reached max wait queue size of 2000}

aloserman • Jul 27 '21 09:07

@aloserman you're using the pooled mode, which means that for each command a connection is acquired from the pool, the command is executed, and the connection is returned to the pool.

Given your error, it seems that you're calling the invoke method at a very high frequency, so high that acquiring and returning connections becomes the bottleneck.

To solve this, you should reuse the connection:

Redis client = Redis.createClient(vertx, options);
client.connect()
  .onSuccess(conn -> {
        api = RedisAPI.api(conn);
  });

This way only a single connection is used and all the queuing is controlled by setMaxWaitingHandlers; setting it to a large number is usually good enough.

Remember that it is now also up to your application to decide when to release the connection:

conn.close();

pmlopes • Jul 27 '21 10:07

Following your guidance, I changed the code as shown below, but now I get a "not serializable" exception. How can I solve it?

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        vertx = Vertx.vertx();
        options.setType(RedisClientType.STANDALONE)
                .setMaxPoolSize(30)
                .setMaxWaitingHandlers(1024)
                .setPoolRecycleTimeout(15_000)
                .setMaxPoolWaiting(-1)
                .setPassword("password");

        List<String> clusters = new ArrayList<>();
        clusters.add("redis://host:port/db");
        options.setEndpoints(clusters);
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        Redis client = Redis.createClient(vertx, options);
        client.connect().onSuccess(new Handler<RedisConnection>() {
            @Override
            public void handle(RedisConnection conn) {
                RedisAPI api = RedisAPI.api(conn);
                api.hset(value, new Handler<AsyncResult<Response>>() {
                    @Override
                    public void handle(AsyncResult<Response> event) {
                        if (!event.succeeded()) {
                            System.out.println("-----------failed");
                            System.out.println(event.cause());
                        }
                        api.close();
                        if (conn != null) {
                            conn.close();
                        }
                    }
                });
            }
        });
    }

The exception is as follows:

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: io.vertx.core.net.NetClientOptions@13805618 is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
        at com.test.findbug.TestRedisAsyncInsert.main(TestRedisAsyncInsert.java:44)
    Caused by: java.io.NotSerializableException: io.vertx.core.net.NetClientOptions
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
        ... 7 more

aloserman • Jul 28 '21 03:07
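
A note on the exception above. The trace shows Flink's ClosureCleaner trying to Java-serialize the sink function and failing on a NetClientOptions instance. That suggests a non-transient field, most likely the RedisOptions held in options, is being serialized together with the function. A common pattern for this situation is to mark such fields transient and build them inside open(). The sketch below shows only that mechanism, using the standard library; FakeClientOptions, SketchSink and roundTrip are hypothetical stand-ins and not Flink or Vert.x classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a non-serializable config object (it does NOT implement Serializable).
class FakeClientOptions {
    final String endpoint;

    FakeClientOptions(String endpoint) {
        this.endpoint = endpoint;
    }
}

// Hypothetical stand-in for a sink function that a framework serializes before running it.
class SketchSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Plain serializable configuration travels with the object.
    private final String endpoint;

    // Excluded from serialization; rebuilt in open() on the deserialized copy.
    private transient FakeClientOptions options;

    SketchSink(String endpoint) {
        this.endpoint = endpoint;
    }

    void open() {
        options = new FakeClientOptions(endpoint);
    }

    boolean isOpen() {
        return options != null;
    }

    String endpoint() {
        return endpoint;
    }
}

class TransientFieldSketch {
    // Serializes and deserializes the sink, roughly what a framework does when shipping it.
    static SketchSink roundTrip(SketchSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchSink) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SketchSink copy = roundTrip(new SketchSink("redis://host:port/db"));
        copy.open(); // the non-serializable state is created only after deserialization
        System.out.println("open after round trip: " + copy.isOpen());
    }
}
```

In the real sink this would presumably mean declaring vertx, options and api as transient fields and creating all three inside open(), with the client created once there rather than on every invoke call.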

When data is emitted very quickly, the client blocks and no data is sent. How can we ensure that, when processing hits this bottleneck, the data is not discarded and is still sent normally?

aloserman • Jul 28 '21 05:07
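
On the question of not losing data under load: one general approach, not specific to vertx-redis-client, is to cap the number of commands in flight and make the caller wait for a free slot instead of letting a queue overflow. The sketch below illustrates that idea with a Semaphore from the standard library. BoundedInFlightSketch and fakeAsyncWrite are hypothetical; fakeAsyncWrite merely stands in for an asynchronous call such as api.hset, and the limit of 8 is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedInFlightSketch {
    private final int maxInFlight;
    private final Semaphore permits;
    // Hypothetical stand-in for the asynchronous Redis client.
    private final ExecutorService fakeClient = Executors.newFixedThreadPool(4);
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();

    BoundedInFlightSketch(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    // Simulates an asynchronous write that completes a little later on another thread.
    private CompletableFuture<Void> fakeAsyncWrite(String value) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, fakeClient);
    }

    // Blocks the calling thread while maxInFlight writes are still pending,
    // so records wait for a slot instead of being rejected.
    void invoke(String value) {
        permits.acquireUninterruptibly();
        maxObserved.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        fakeAsyncWrite(value).whenComplete((ok, err) -> {
            // A real sink would log or retry here when err is not null.
            inFlight.decrementAndGet();
            completed.incrementAndGet();
            permits.release(); // free the slot only once the write has finished
        });
    }

    // Waits for all pending writes by collecting every permit, then stops the fake client.
    void close() {
        permits.acquireUninterruptibly(maxInFlight);
        fakeClient.shutdown();
    }

    int completed() {
        return completed.get();
    }

    int maxObserved() {
        return maxObserved.get();
    }

    public static void main(String[] args) {
        BoundedInFlightSketch sink = new BoundedInFlightSketch(8);
        for (int i = 0; i < 500; i++) {
            sink.invoke("value-" + i);
        }
        sink.close();
        System.out.println("completed=" + sink.completed());
    }
}
```

The blocking has to happen on the caller's thread (in a Flink sink, the task thread running invoke), never on the Vert.x event loop. As far as I understand, a sink that blocks in invoke lets Flink's backpressure slow the upstream operators, so records are delayed rather than dropped.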