kvrocks
kvrocks copied to clipboard
Blocking commands may block permanently due to lost wakeup
Search before asking
- [X] I had searched in the issues and found no similar issues.
Version
unstable
Minimal reproduce step
I discovered this issue while testing a PR(https://github.com/apache/kvrocks/pull/2332), even though the PR increased the likelihood of it occurring, the problem actually existed beforehand.
It is difficult to reproduce manually without modifying the code, and it almost exclusively occurs during Go's integration tests.
Here's an example with TestRegression
:
func TestRegression(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()
ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
c := srv.NewTCPClient()
defer func() { require.NoError(t, c.Close()) }()
proto := "*3\r\n$5\r\nBLPOP\r\n$6\r\nhandle\r\n$1\r\n0\r\n"
require.NoError(t, c.Write(fmt.Sprintf("%s%s", proto, proto)))
resList := []string{"*2", "$6", "handle", "$1", "a"}
v := rdb.RPush(ctx, "handle", "a")
require.EqualValues(t, 1, v.Val())
for _, res := range resList {
c.MustRead(t, res)
}
v = rdb.RPush(ctx, "handle", "a")
require.EqualValues(t, 1, v.Val())
for _, res := range resList {
c.MustRead(t, res)
}
}
By printing some information, I was able to trace the error:
Start Execute BPOP
TryPopFormList
CommandPush::Execute
WakeupBlockingConns: handle, n_conns: 1
CommandPush::Execute OK
BlockingCommander::StartBlocking
BlockKeys
The issue occurs here:
// CommandBPop
Status Execute(Server *srv, Connection *conn, std::string *output) override {
srv_ = srv;
InitConnection(conn);
auto s = TryPopFromList();
if (s.ok() || !s.IsNotFound()) {
return Status::OK(); // error has already output in TryPopFromList
}
// <==HERE: Push is done after TryPopFromList, but not yet Blocking
return StartBlocking(timeout_, output);
}
To manually reproduce the issue consistently, you can add a sleep and complete a PUSH within 5 seconds:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
srv_ = srv;
InitConnection(conn);
auto s = TryPopFromList();
if (s.ok() || !s.IsNotFound()) {
return Status::OK(); // error has already output in TryPopFromList
}
std::cout << "Sleep for 5 seconds" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "Wake up" << std::endl;
return StartBlocking(timeout_, output);
}
What did you expect to see?
Blocking commands will not be blocked after new key values are added.
What did you see instead?
Blocking command blocks permanently.
Anything Else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!