lettuce icon indicating copy to clipboard operation
lettuce copied to clipboard

multi-threads share one connection, async command xadd writeandflush success,but redis stream queue has no data

Open phlahut opened this issue 1 year ago • 7 comments

Bug Report

Current Behavior

Stack trace
   one thread use common connection async command and redis-future blocking read data from redis-stream queue

image

   and other thread the common connection async command to send xadd cmd to write data to redis-stream queue

image

   and  i use wireshark to grab network data package, it shows xadd cmd send to redis successful and redis response ack, but in redis-stream queue has data 

image

Input Code

Input Code
// your code here;

Expected behavior/code

Environment

  • Lettuce version(s): [e.g. 5.0.0.RELEASE, 4.2.2.Final]
  • Redis version: [e.g. 4.0.9]

Possible Solution

Additional context

phlahut avatar Sep 15 '22 10:09 phlahut

Have you been able to reproduce the issue with redis-cli?

mp911de avatar Sep 15 '22 12:09 mp911de

Have you been able to reproduce the issue with redis-cli? first:subscribe one queue called GlobalConfig.streamRecvQueue success image

second:use redis-cli execute XADD to write data to GlobalConfig.streamRecvQueue image

then: see the code logic,on received data from GlobalConfig.streamRecvQueue queue, it will submit a task to thread pool to send data to GlobalConfig.streamSendQueue queue, but in redis GlobalConfig.streamSendQueue queue has no data image

wireshark: shows send data to GlobalConfig.streamSendQueue queue cmd success image

redis-cli: show GlobalConfig.streamSendQueue queue has no data image

send logic: image

finally, if i repeat exec xadd cmd to write data to GlobalConfig.streamRecvQueue queue, in redis GlobalConfig.streamSendQueue queue has data like this image image

attention:GlobalConfig.streamRecvQueue and GlobalConfig.streamSendQueue are not the same queue

phlahut avatar Sep 16 '22 02:09 phlahut

Can you provide your Java code as text instead of screenshot? I do not want to type everything into my IDE to reproduce the case.

mp911de avatar Sep 16 '22 06:09 mp911de

many thanks , code as follow, you may be modify some code, like subscribe part, [handler.handle] can be changed by threadpool to invoke [sendMessage]

1、create connection: public RedisStreamMqClient(String url) { this.client = RedisClient.create(url); this.connection = this.client.connect(); }

2、create queue public void createQueue(Queue queue, boolean existedOk) { RedisAsyncCommands<String, String> commands = this.connection.async(); StringCodec codec = StringCodec.UTF8; CommandArgs<String, String> args = new CommandArgs<>(codec) .add(CommandKeyword.CREATE) .add(queue.getName()) .add(queue.getGroup()) .add(queue.getConsumeId()); if (queue.isMkStream()) { args.add("MKSTREAM"); } RedisFuture<String> future = commands.dispatch(CommandType.XGROUP, new StatusOutput<>(codec), args); try { future.get(); } catch (ExecutionException e) { if (existedOk && StringUtils.containsIgnoreCase(e.getMessage(), SEARCH_MSG)) { return; } throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } }

3、subscribe queue public void subscribeQueue(Consumer consumer,XReadGroupHandler handler,ExceptionHandler exceptionHandler) { Queue queue = Queue.builder() .name(consumer.getQueue()) .group(consumer.getGroup()) .build(); createQueue(queue, true); RedisAsyncCommands<String, String> commands = this.connection.async(); StringCodec codec = StringCodec.UTF8; CommandArgs<String, String> args = new CommandArgs<>(codec) .add(CommandKeyword.GROUP) .add(consumer.getGroup()) .add(consumer.getName()); if (consumer.isNoAck()) { args.add(CommandKeyword.NOACK); } if (consumer.isBlock()) { args.add(CommandKeyword.BLOCK).add(0); } args.add(CommandKeyword.COUNT) .add(consumer.getCount()) .add("STREAMS") .add(consumer.getQueue()) .add(consumer.getConsumeId()); while (true) { try { RedisFuture<List<Object>> future = commands.dispatch(CommandType.XREADGROUP, new NestedMultiOutput<>(codec), args); List<Object> consumedObjects = future.get(); XReadGroupResponse response = XReadGroupResponse.fromOutput((List) consumedObjects.get(0)); String[] messageIds = response.getMessageIds().toArray(new String[0]); ackMessage(consumer.getQueue(), consumer.getGroup(), messageIds); handler.handle(response); log.info("recv-----------------"); } catch (InterruptedException | ExecutionException e) { exceptionHandler.handle(e); try { Thread.sleep(consumer.getDelay()); } catch (InterruptedException ex) { throw new RuntimeException(ex); } createQueue(queue, true); } } }

4、send data public String sendMessage(Message message) { RedisAsyncCommands<String, String> commands = this.connection.async(); XAddArgs addArgs = XAddArgs.Builder .maxlen(message.getMaxLength()) .approximateTrimming(message.isApproximateTrimming()); List<Object> keysAndValues = new ArrayList<>(Arrays.asList(message.getKeysAndValues())); keysAndValues.add("request_id"); keysAndValues.add(message.getRequestId()); Object[] data = keysAndValues.toArray(new Object[0]); RedisFuture<String> future = commands.xadd(message.getQueue(), addArgs, data); try { String ret = future.get(); return ret; } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }

phlahut avatar Sep 16 '22 09:09 phlahut

Can you provide your Java code as text instead of screenshot? I do not want to type everything into my IDE to reproduce the case.

hi, i have provide the sample code, is it useful?

phlahut avatar Sep 20 '22 10:09 phlahut

I was not able to reproduce the issue. Having a reproducer would be useful to proceed here.

mp911de avatar Nov 22 '22 09:11 mp911de

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 30 days this issue will be closed.

github-actions[bot] avatar Apr 26 '24 00:04 github-actions[bot]