[Question] Stream loop: XREAD and XADD in the same process?
Hello, I've run into a problem with ioredis. I need to listen to a stream continuously and occasionally add data to another stream. To illustrate the problem, I wrote this code:
```js
const Redis = require("ioredis");
const redis = new Redis({
  enableAutoPipelining: true,
});
const { setTimeout } = require("timers/promises");

const processMessage = (message) => {
  // Each message is [id, fields]; log the field/value array.
  const data = message[1];
  console.log("DEBUG", data);
};

// const redisListener = redis.duplicate();

async function listenForMessage(lastId = "$") {
  // `results` is an array, each element of which corresponds to a key.
  // Because we only listen to one key (test-stream) here, `results` only contains
  // a single element. See more: https://redis.io/commands/xread#return-value
  const results = await redis.xread(
    "BLOCK",
    0,
    "STREAMS",
    "test-stream",
    lastId
  );
  const [key, messages] = results[0]; // `key` equals "test-stream"
  messages.forEach(processMessage);
  // Pass the last id of the results to the next round.
  await listenForMessage(messages[messages.length - 1][0]);
}

async function run() {
  await setTimeout(1000);
  // In the real case this would be a different stream from the one being
  // listened to; the same stream is used here to simplify the test.
  redis.xadd("test-stream", "*", "message", "hello");
  console.log("publish");
  await setTimeout(1000);
  redis.xadd("test-stream", "*", "message", "hello2");
  await setTimeout(1000);
  redis.xadd("test-stream", "*", "message", "hello3");
}

listenForMessage();
run();
```
In this case, while `listenForMessage` is running, the `xadd` calls in `run` do nothing. The only solution I have found for having both a listener and a publisher is to duplicate the Redis connection, but I'm not sure that is the right way. Can anyone help me improve my code, please?
That's how blocking queries work in Redis.
If you use BLOCK 0 with `xread`, you can't call `xadd` on that connection, because the connection is in use by `xread`. BLOCK 0 waits with no timeout until a message arrives, so the `xadd` can never run on the same connection.
Either block for a shorter time (for example BLOCK 500), so that `xadd` can run once the `xread` has returned, or duplicate the connection.
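To illustrate the short-timeout option, here is a minimal sketch of a single bounded poll. `pollOnce` is a hypothetical helper, not part of ioredis, and `client` is assumed to be an ioredis instance. Note that it keeps `lastId` unchanged on a timeout, so if you start from `"$"`, entries added between two polls can be missed.

```js
// Hypothetical helper: one bounded XREAD call.
// Returns the messages received (possibly none) and the ID to pass to the next call.
async function pollOnce(client, streamKey, lastId, blockMs = 500) {
  // XREAD resolves to null when BLOCK times out without new entries.
  const results = await client.xread(
    "BLOCK",
    blockMs,
    "STREAMS",
    streamKey,
    lastId
  );
  if (!results) {
    return { messages: [], lastId };
  }
  // Only one stream is read, so `results` has a single [key, messages] pair.
  const [, messages] = results[0];
  return { messages, lastId: messages[messages.length - 1][0] };
}
```

Calling `pollOnce` in a loop frees the connection at least every `blockMs` milliseconds, so other commands on that connection are delayed by at most roughly that long rather than indefinitely.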
Pipelining isn't multiplexing: commands on one connection are processed in order, so Redis can't handle two commands simultaneously on the same connection.
As a best practice, use one global Redis instance that is used only for non-blocking commands, plus a separate connection for one or more blocking queries.
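A minimal sketch of that split, assuming the blocking client is a separate connection (for example one created with `redis.duplicate()`). `startListener` and `onMessage` are hypothetical names used only for illustration.

```js
// Hypothetical helper: runs a blocking XREAD loop on a dedicated connection.
// `blockingClient` must not be shared with ordinary commands.
function startListener(blockingClient, streamKey, onMessage, startId = "$") {
  let running = true;
  const done = (async () => {
    let lastId = startId;
    while (running) {
      const results = await blockingClient.xread(
        "BLOCK",
        0,
        "STREAMS",
        streamKey,
        lastId
      );
      if (!results) continue;
      // Only one stream is read, so take the messages of the first entry.
      for (const [id, fields] of results[0][1]) {
        onMessage(id, fields);
        lastId = id; // resume after the last message seen
      }
    }
  })();
  // `stop` only takes effect after the current XREAD call returns.
  return { done, stop: () => { running = false; } };
}
```

With the code from the question, this could be used as `startListener(redis.duplicate(), "test-stream", (id, fields) => console.log("DEBUG", fields))`, while `redis.xadd(...)` keeps running on the original `redis` connection.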