Sentinel scan iterator
Description
This PR implements the scanIterator method for the RedisSentinel client.
Previously, scanIterator was not exposed on the Sentinel client, so there was no way to iterate over keys on the master node directly through the Sentinel interface. This implementation adds that capability, including handling for master failovers.
Issue: #2999
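For reference, here is a minimal usage sketch (the connection options are illustrative and follow the node-redis Sentinel docs; adjust the master set name and hosts to your deployment):

```ts
import { createSentinel } from 'redis';

const sentinel = await createSentinel({
  name: 'mymaster', // illustrative master set name
  sentinelRootNodes: [{ host: '127.0.0.1', port: 26379 }]
}).connect();

// Iterate over the master's keyspace in batches through the Sentinel client
for await (const keys of sentinel.scanIterator({ MATCH: 'user:*', COUNT: 100 })) {
  console.log(keys);
}
```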
Changes:
- Automatic Failover Handling: If a master failover occurs during iteration, the iterator detects the `MASTER_CHANGE` topology event and automatically restarts the scan from the beginning on the new master. This ensures all keys are eventually covered, even if the topology changes mid-scan.
- Reset-on-Failover Strategy: Since Redis cursors are stateful and specific to a node instance, the iterator deliberately restarts from cursor 0 (the beginning) upon detecting a master failover. This guarantees data completeness but may result in duplicate keys being yielded, as documented (see the sketch below).
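Since a restart can re-yield keys that were already seen, consumers that need each key at most once can deduplicate on their side. A minimal sketch (`seen` and `process` are illustrative, not part of this PR):

```ts
const seen = new Set<string>();

for await (const keys of sentinel.scanIterator()) {
  for (const key of keys) {
    if (seen.has(key)) continue; // duplicate from a post-failover restart
    seen.add(key);
    await process(key); // hypothetical per-key handler
  }
}
```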
Checklist
- [X] Does `npm test` pass with this change (including linting)?
- [X] Is the new or changed code fully tested?
- [X] Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?
Hi @harshrai654, thanks for the contribution! On first glance this looks good, but it looks like there are some failing tests. Can you please resolve those, so we can have a more thorough look at the final state of the code?
Hey @nkaradzhov, I have fixed the tests. A stopped node from a previous test in the describe block was causing issues in the next test, which requires healthy nodes in the sentinel. I added cleanup of the frame's nodes between these two tests.
@harshrai654 thank you so much! I will need to take a thorough look into this before passing it, and I am extremely busy with a big change that needs to happen very soon. So, apologies, but I will have to delay my full review here.
No worries, thanks for the update and for letting me know!
This seems to cause a deadlock when the consumer tries to do another operation during the scan and there aren't enough master clients available:
```ts
for await (const keys of sentinel.scanIterator()) {
  console.log(keys)

  // Hangs forever
  const values = await sentinel.mGet(keys)
  console.log(values)
}
```
Hey @TheIndra55, thanks for catching this! You are absolutely correct.
Acquiring the master connection for the entire lifespan of the iterator causes a resource starvation deadlock. Since `acquire` reserves a client from the pool and doesn't release it until the iterator finishes, any operation inside the consumer's loop (like your `mGet` example) that requires a connection will block indefinitely if the pool is exhausted (e.g., a single-connection pool).
Even with a larger pool, this implementation unnecessarily holds a connection idle while the user's code executes, reducing overall throughput.
The fix is to do an "Acquire -> Scan -> Release" loop: acquire the connection only for the scan operation itself and release it immediately, before yielding to the user. This keeps the pool free for other operations during the consumer's processing phase.
Here is the proposed implementation:
```ts
async *scanIterator(
  this: RedisSentinelType<M, F, S, RESP, TYPE_MAPPING>,
  options?: ScanOptions & ScanIteratorOptions
) {
  let cursor = options?.cursor ?? "0";
  let shouldRestart = false;

  // This listener persists for the entire iterator life to track failovers
  const handleTopologyChange = (event: RedisSentinelEvent) => {
    if (event.type === "MASTER_CHANGE") {
      shouldRestart = true;
    }
  };
  this.on("topology-change", handleTopologyChange);

  try {
    // `while (true)` rather than `do...while`: `continue` in a `do...while`
    // jumps to the exit condition, which would end the scan on a restart.
    while (true) {
      // 1. Acquire a lease JUST for the scan command
      const masterClient = await this.acquire();
      let reply;
      try {
        // If the master changed since the last iteration, restart from 0
        if (shouldRestart) {
          cursor = "0";
          shouldRestart = false;
        }
        reply = await masterClient.scan(cursor, options);
      } finally {
        // 2. Release IMMEDIATELY so the pool is available for the user
        masterClient.release();
      }

      // Edge case: the topology changed *during* the scan. The reply might
      // be from the old master, so discard this batch; the check at the
      // top of the loop restarts from cursor 0 on the new master.
      if (shouldRestart) {
        continue;
      }

      // 3. Yield to the user.
      // The lease is already released, so `await sentinel.mGet()` will succeed
      cursor = reply.cursor;
      yield reply.keys;

      // A cursor of "0" means the full keyspace has been covered
      if (cursor === "0") break;
    }
  } finally {
    this.removeListener("topology-change", handleTopologyChange);
  }
}
```
This ensures we respect the pool limits and handle topology changes correctly by resetting the cursor if a failover occurs between (or during) iterations.
What do you think?
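For completeness, here is how the repro from above should behave with this version (a sketch; the empty-batch guard is mine, since MGET requires at least one key):

```ts
for await (const keys of sentinel.scanIterator()) {
  if (keys.length === 0) continue; // MGET errors on an empty key list

  // The scan lease was released before yielding, so this no longer
  // deadlocks, even with a single-connection master pool.
  const values = await sentinel.mGet(keys);
  console.log(values);
}
```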