lettuce
Multiple threads share one connection: async XADD writeAndFlush succeeds, but the Redis stream has no data
Bug Report
Current Behavior
Stack trace
One thread uses the shared connection's async commands and blocks on a RedisFuture to read data from a Redis stream.
Another thread uses the same shared connection's async commands to send XADD and write data to a Redis stream.
I captured the network traffic with Wireshark. It shows that the XADD command was sent to Redis successfully and that Redis responded with an ACK, but the Redis stream has no data.
Input Code
// your code here;
Expected behavior/code
Environment
- Lettuce version(s): [e.g. 5.0.0.RELEASE, 4.2.2.Final]
- Redis version: [e.g. 4.0.9]
Possible Solution
Additional context
Have you been able to reproduce the issue with redis-cli?
First: subscribe to one queue, GlobalConfig.streamRecvQueue. This succeeds.
Second: use redis-cli to run XADD and write data to GlobalConfig.streamRecvQueue.
Then: following the code logic, when data is received from GlobalConfig.streamRecvQueue, a task is submitted to a thread pool to send data to GlobalConfig.streamSendQueue, but GlobalConfig.streamSendQueue in Redis has no data.
Wireshark: shows that the command sending data to GlobalConfig.streamSendQueue succeeded.
redis-cli: shows that GlobalConfig.streamSendQueue has no data.
Send logic:
Finally: if I run XADD again to write data to GlobalConfig.streamRecvQueue, GlobalConfig.streamSendQueue in Redis has data, as in this image.
Note: GlobalConfig.streamRecvQueue and GlobalConfig.streamSendQueue are not the same queue.
Can you provide your Java code as text instead of screenshot? I do not want to type everything into my IDE to reproduce the case.
Many thanks. The code follows. You may need to modify some of it; for example, in the subscribe part, [handler.handle] can be replaced by a thread pool task that invokes [sendMessage].
1. Create the connection:
public RedisStreamMqClient(String url) {
    this.client = RedisClient.create(url);
    // A single connection, shared by every method below.
    this.connection = this.client.connect();
}
2. Create the queue:
public void createQueue(Queue queue, boolean existedOk) {
    RedisAsyncCommands<String, String> commands = this.connection.async();
    StringCodec codec = StringCodec.UTF8;
    // XGROUP CREATE <stream> <group> <id> [MKSTREAM]
    CommandArgs<String, String> args = new CommandArgs<>(codec)
            .add(CommandKeyword.CREATE)
            .add(queue.getName())
            .add(queue.getGroup())
            .add(queue.getConsumeId());
    if (queue.isMkStream()) {
        args.add("MKSTREAM");
    }
    RedisFuture<String> future = commands.dispatch(CommandType.XGROUP, new StatusOutput<>(codec), args);
    try {
        future.get();
    } catch (ExecutionException e) {
        // Ignore the error when the group already exists and that is acceptable.
        if (existedOk && StringUtils.containsIgnoreCase(e.getMessage(), SEARCH_MSG)) {
            return;
        }
        throw new RuntimeException(e);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
3. Subscribe to the queue:
public void subscribeQueue(Consumer consumer, XReadGroupHandler handler, ExceptionHandler exceptionHandler) {
    Queue queue = Queue.builder()
            .name(consumer.getQueue())
            .group(consumer.getGroup())
            .build();
    createQueue(queue, true);

    RedisAsyncCommands<String, String> commands = this.connection.async();
    StringCodec codec = StringCodec.UTF8;
    CommandArgs<String, String> args = new CommandArgs<>(codec)
            .add(CommandKeyword.GROUP)
            .add(consumer.getGroup())
            .add(consumer.getName());
    if (consumer.isNoAck()) {
        args.add(CommandKeyword.NOACK);
    }
    if (consumer.isBlock()) {
        // BLOCK 0 waits indefinitely for new entries.
        args.add(CommandKeyword.BLOCK).add(0);
    }
    args.add(CommandKeyword.COUNT)
            .add(consumer.getCount())
            .add("STREAMS")
            .add(consumer.getQueue())
            .add(consumer.getConsumeId());

    while (true) {
        try {
            RedisFuture<List<Object>> future =
                    commands.dispatch(CommandType.XREADGROUP, new NestedMultiOutput<>(codec), args);
            List<Object> consumedObjects = future.get();
            XReadGroupResponse response = XReadGroupResponse.fromOutput((List) consumedObjects.get(0));
            String[] messageIds = response.getMessageIds().toArray(new String[0]);
            ackMessage(consumer.getQueue(), consumer.getGroup(), messageIds);
            handler.handle(response);
            log.info("recv-----------------");
        } catch (InterruptedException | ExecutionException e) {
            exceptionHandler.handle(e);
            try {
                Thread.sleep(consumer.getDelay());
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
            createQueue(queue, true);
        }
    }
}
4. Send data:
public String sendMessage(Message message) {
    // Uses the same shared connection as subscribeQueue.
    RedisAsyncCommands<String, String> commands = this.connection.async();
    XAddArgs addArgs = XAddArgs.Builder
            .maxlen(message.getMaxLength())
            .approximateTrimming(message.isApproximateTrimming());
    List<Object> keysAndValues = new ArrayList<>(Arrays.asList(message.getKeysAndValues()));
    keysAndValues.add("request_id");
    keysAndValues.add(message.getRequestId());
    Object[] data = keysAndValues.toArray(new Object[0]);
    RedisFuture<String> future = commands.xadd(message.getQueue(), addArgs, data);
    try {
        String ret = future.get();
        return ret;
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}
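In the listing above, `subscribeQueue` and `sendMessage` both use `this.connection`. One possible explanation for the symptom, not confirmed anywhere in this thread, is head-of-line blocking. Redis executes the commands of one connection in order and does not process further commands from a client that is blocked, for example in XREADGROUP with BLOCK 0. An XADD sent on that connection can therefore reach the server and still not run until the read returns. The sketch below is a toy model in plain Java, not Lettuce or Redis code. `FakeConnection`, `writeCompletes`, and the two in-memory streams are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model of in-order command execution per connection. Not Lettuce or Redis code. */
class SharedConnectionDemo {

    /** Hypothetical stand-in for a connection: one worker thread runs commands strictly in order. */
    static final class FakeConnection {
        private final ExecutorService pipeline = Executors.newSingleThreadExecutor();

        <T> Future<T> dispatch(Callable<T> command) {
            return pipeline.submit(command);
        }

        void close() {
            pipeline.shutdownNow(); // interrupts a command that is still blocked
        }
    }

    /**
     * Issues a blocking read, then a write to a different stream, and reports
     * whether the write finished within 500 ms.
     */
    static boolean writeCompletes(boolean dedicatedReader, boolean unblockReader) {
        BlockingQueue<String> recvStream = new LinkedBlockingQueue<>();
        List<String> sendStream = new CopyOnWriteArrayList<>();
        FakeConnection writer = new FakeConnection();
        FakeConnection reader = dedicatedReader ? new FakeConnection() : writer;
        CountDownLatch readStarted = new CountDownLatch(1);
        try {
            // Stand-in for XREADGROUP ... BLOCK 0: waits until recvStream has an entry.
            reader.dispatch(() -> {
                readStarted.countDown();
                return recvStream.take();
            });
            readStarted.await();
            // Stand-in for XADD to the other stream while the read is pending.
            Future<Boolean> write = writer.dispatch(() -> sendStream.add("payload"));
            if (unblockReader) {
                recvStream.put("new entry"); // like one more XADD to the receive stream
            }
            return write.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false; // the write is still queued behind the blocked read
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            writer.close();
            reader.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared, reader still blocked: " + writeCompletes(false, false));
        System.out.println("shared, reader unblocked:     " + writeCompletes(false, true));
        System.out.println("dedicated read connection:    " + writeCompletes(true, false));
    }
}
```

In this model the write stalls only when it shares a connection with a read that is still blocked, and it goes through once the receive stream gets another entry, which resembles the behavior described in the report. If the same effect is at work in the real code, opening a second connection with `this.client.connect()` and using it only for the blocking XREADGROUP loop would be worth trying.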
Hi, I have provided the sample code. Is it useful?
I was not able to reproduce the issue. Having a reproducer would be useful to proceed here.
If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 30 days this issue will be closed.