etcd
Fixed unexpectedly cancelled watch with WatchID=0.
etcdserver: Fixed unexpectedly cancelled watch with WatchID=0.
I ran into the same problem as https://github.com/etcd-io/etcd/issues/12385. I found that the error causes the watch with `WatchID=0` to become invalid, so I made `WatchID` start at 1 to fix it.
First of all, `WatchID=0` has a special meaning: it is `AutoWatchID`.
Secondly, when the token has expired and a new watch request is sent to the server, `serverWatchStream.isWatchPermitted` returns false, so a `WatchResponse` with `Canceled=true` is sent to `serverWatchStream.ctrlStream`. However, `WatchResponse.WatchID` is set to `creq.WatchId`, which defaults to 0. As a result, the watch with `WatchID=0` is unexpectedly cancelled on the server side, and the client never receives its messages again.
```go
if !sws.isWatchPermitted(creq) {
	wr := &pb.WatchResponse{
		Header:       sws.newResponseHeader(sws.watchStream.Rev()),
		WatchId:      creq.WatchId, // default to 0
		Canceled:     true,         // will delete 0 from ids
		Created:      true,
		CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
	}
	select {
	case sws.ctrlStream <- wr: // send to ctrlStream
		continue
	case <-sws.closec:
		return nil
	}
}
...
case c, ok := <-sws.ctrlStream: // receive from ctrlStream
	if !ok {
		return
	}
	...
	// track id creation
	wid := mvcc.WatchID(c.WatchId)
	if c.Canceled {
		delete(ids, wid) // delete from ids and client can never receive messages
		continue
	}
```
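To make the failure mode concrete, here is a simplified, self-contained model of the `ids` bookkeeping from the excerpt above. It is not etcd code: `watchResponse`, `sendLoopModel`, `onCtrl` and `onEvent` are hypothetical names, and the model assumes that events for an ID missing from `ids` are held back rather than delivered, which is how the client ends up never seeing them.

```go
package main

import "fmt"

// watchResponse is a hypothetical, stripped-down stand-in for pb.WatchResponse.
type watchResponse struct {
	WatchID  int64
	Created  bool
	Canceled bool
}

// sendLoopModel is a simplified model of the send loop's ID tracking (not etcd code).
type sendLoopModel struct {
	ids     map[int64]bool     // watch IDs announced to the client
	pending map[int64][]string // events held back for IDs that are not tracked
	sent    []string           // events delivered to the client
}

func newSendLoopModel() *sendLoopModel {
	return &sendLoopModel{ids: map[int64]bool{}, pending: map[int64][]string{}}
}

// onCtrl mirrors the "track id creation" branch: any canceled response
// deletes its WatchID, even if Created is also true.
func (m *sendLoopModel) onCtrl(c watchResponse) {
	if c.Canceled {
		delete(m.ids, c.WatchID)
		return
	}
	if c.Created {
		m.ids[c.WatchID] = true
	}
}

// onEvent delivers an event only while its watch ID is tracked and
// reports whether it was delivered.
func (m *sendLoopModel) onEvent(id int64, ev string) bool {
	if !m.ids[id] {
		m.pending[id] = append(m.pending[id], ev)
		return false
	}
	m.sent = append(m.sent, ev)
	return true
}

func main() {
	m := newSendLoopModel()

	// The first watch on the stream is auto-assigned ID 0 and announced.
	m.onCtrl(watchResponse{WatchID: 0, Created: true})

	// A second, non-permitted watch is rejected; the response echoes
	// creq.WatchId, which was never set and is therefore 0.
	m.onCtrl(watchResponse{WatchID: 0, Created: true, Canceled: true})

	// A Put on the first watch's key now produces an event for ID 0.
	delivered := m.onEvent(0, "PUT k1")
	fmt.Println("delivered:", delivered, "pending:", len(m.pending[0]))
	// prints "delivered: false pending: 1"
}
```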
I found some comments saying that watch IDs start at 0, so I don't know whether this change has any other effects.
Thanks @kafuu-chino for the PR, but I don't understand what issue you are fixing. The PR is also causing some test failures.
@ahrtr Let me explain in detail:
- First, `WatchCreateRequest` does not set the value of `WatchID`, so `WatchID = 0`.
```go
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}
```
- On the server side, `WatchID = 0` means the `WatchID` is generated automatically; see `AutoWatchID`.
```go
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
const AutoWatchID WatchID = 0
```
Because `ws.nextID` starts at 0, the first watch gets `WatchID = 0`.
```go
if id == AutoWatchID {
	for ws.watchers[ws.nextID] != nil {
		ws.nextID++
	}
	id = ws.nextID
	ws.nextID++
}
```
- Watch once, so that a watch is initialized with `WatchID = 0`, and wait for the token to expire. Then watch again. Because the token has expired, `sws.isWatchPermitted` returns false, and the response is built with `creq.WatchId = 0`, `Canceled = true` and `Created = true`. This `WatchResponse` is then sent to `sws.ctrlStream`.
```go
if !sws.isWatchPermitted(creq) {
	wr := &pb.WatchResponse{
		Header:       sws.newResponseHeader(sws.watchStream.Rev()),
		WatchId:      creq.WatchId, // = 0
		Canceled:     true,
		Created:      true,
		CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
	}
	select {
	case sws.ctrlStream <- wr:
		continue
	case <-sws.closec:
		return nil
	}
}
```
- Next, after the `WatchResponse` is sent to the client, the server handles it: since `Canceled == true`, it calls `delete(ids, wid)` with `wid = 0`. This makes the server consider the watch with `WatchID = 0` to be no longer active.
```go
case c, ok := <-sws.ctrlStream:
	if !ok {
		return
	}
	if err := sws.gRPCStream.Send(c); err != nil {
		if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
			sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
		} else {
			sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
			streamFailures.WithLabelValues("send", "watch").Inc()
		}
		return
	}
	// track id creation
	wid := mvcc.WatchID(c.WatchId)
	if c.Canceled {
		delete(ids, wid)
		continue
	}
```
- The client receives the `WatchResponse` and enters the `case pbresp.Created:` branch of the `switch`. The new watch returns an error, but the old watch with `WatchID = 0` stops responding and no longer receives messages from the server. This is not what we want: the new watch should fail, but the previous watch should not become unresponsive on the server side.
```go
case pbresp := <-w.respc:
	if cur == nil || pbresp.Created || pbresp.Canceled {
		cur = pbresp
	} else if cur != nil && cur.WatchId == pbresp.WatchId {
		// merge new events
		cur.Events = append(cur.Events, pbresp.Events...)
		// update "Fragment" field; last response with "Fragment" == false
		cur.Fragment = pbresp.Fragment
	}
	switch {
	case pbresp.Created:
```
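The chain of defaults described in the list above can be reduced to a few lines. This is a sketch under assumptions: `createRequest` and `watchStream` are hypothetical stand-ins for `pb.WatchCreateRequest` and the mvcc watch stream, and only the auto-assignment loop quoted above is modeled (duplicate checks for user-provided IDs are omitted). It shows that an unset `WatchId` equals `AutoWatchID`, that the first auto-assigned ID is 0, and that a denied response echoing the request's own field therefore names the live watch.

```go
package main

import "fmt"

// WatchID and AutoWatchID follow the mvcc declarations quoted above.
type WatchID int64

const AutoWatchID WatchID = 0

// createRequest is a hypothetical stand-in for pb.WatchCreateRequest.
// As in toPB(), nothing ever assigns WatchId, so it keeps Go's zero value.
type createRequest struct {
	Key     string
	WatchId int64
}

// watchStream models only the auto-assignment loop quoted above;
// duplicate checks for user-provided IDs are omitted.
type watchStream struct {
	nextID   WatchID
	watchers map[WatchID]bool
}

func (ws *watchStream) watch(id WatchID) WatchID {
	if id == AutoWatchID {
		for ws.watchers[ws.nextID] {
			ws.nextID++
		}
		id = ws.nextID
		ws.nextID++
	}
	ws.watchers[id] = true
	return id
}

func main() {
	ws := &watchStream{watchers: map[WatchID]bool{}}

	first := createRequest{Key: "k1"}               // WatchId left unset
	second := createRequest{Key: "non-allowed-key"} // WatchId left unset

	firstID := ws.watch(WatchID(first.WatchId))
	fmt.Println("first watch ID:", firstID) // prints "first watch ID: 0"

	// The second request is rejected before any ID is allocated for it,
	// so its cancel response can only echo the request's own field.
	echoed := WatchID(second.WatchId)
	fmt.Println("collides with first watch:", echoed == firstID) // prints "collides with first watch: true"
}
```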
I looked at the commit history and restored creq.WatchId to -1.
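A minimal sketch of why a -1 default avoids the collision, assuming the denied response carries -1 instead of echoing the request's zero-valued field. `invalidWatchID`, `deniedResponse` and `trackIDs` are hypothetical names chosen for illustration; the point is only that the allocator counts up from 0, so `delete(ids, -1)` can never remove a live watch.

```go
package main

import "fmt"

// invalidWatchID is a hypothetical name for the -1 default discussed above.
// The allocator counts up from 0, so it never hands out this value.
const invalidWatchID int64 = -1

// watchResponse is a hypothetical, stripped-down stand-in for pb.WatchResponse.
type watchResponse struct {
	WatchID  int64
	Created  bool
	Canceled bool
}

// deniedResponse builds the permission-denied reply with the sentinel ID
// instead of echoing the request's unset (zero) WatchId.
func deniedResponse() watchResponse {
	return watchResponse{WatchID: invalidWatchID, Created: true, Canceled: true}
}

// trackIDs applies the send loop's bookkeeping unchanged: a canceled
// response deletes its ID, which is harmless for invalidWatchID.
func trackIDs(ids map[int64]bool, c watchResponse) {
	if c.Canceled {
		delete(ids, c.WatchID)
		return
	}
	if c.Created {
		ids[c.WatchID] = true
	}
}

func main() {
	ids := map[int64]bool{}
	trackIDs(ids, watchResponse{WatchID: 0, Created: true}) // first watch gets ID 0
	trackIDs(ids, deniedResponse())                         // second watch is denied
	fmt.Println("watch 0 still tracked:", ids[0])           // prints "watch 0 still tracked: true"
}
```

In this sketch the bookkeeping code does not change at all; choosing a value outside the allocator's range is enough, because 0 is a legitimate auto-assigned ID and cannot double as "no ID".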
@ahrtr @mitake Could you help review it when you have time?
@kafuu-chino Could you squash the 2 commits into one, and change the commit title like this? *: avoid closing a watch with ID 0 incorrectly
~I think the test failure isn’t related to your change, so I'm running the tests on my local machine.~ TestKVCompact is failing because of the assert.
@mitake Sure, I've squashed the commits and rebased onto the latest commit.
@kafuu-chino The change looks good to me. But please rebase this PR instead of merging main into your dev branch. First, sync your main branch with upstream etcd. Then run commands like the following:
```shell
$ git checkout main
$ git pull
$ git checkout fix
$ git rebase -i main # keep the "pick" in the first line, and change all the following lines' "pick" to "s"
```
@ahrtr Like this?
@mitake Here's the idea behind my local test:
- Watch a key.
- Watch a new key so that the server enters this code: https://github.com/etcd-io/etcd/blob/main/server/etcdserver/api/v3rpc/watch.go#L271. To get there, I set the server's `--auth-token-ttl` and wait for the token to expire.
- Set a new value for the key from step 1, and check whether the watch from step 1 still receives the event normally.
I need to read the tests package code first, so it will take some time to work out the test code.
@kafuu-chino I'm writing a test case in https://github.com/etcd-io/etcd/pull/14322/commits/ff9c6612f7dff2dd730ee4be0c4ab167ba042e6f for https://github.com/etcd-io/etcd/pull/14322. The test case is still in progress (for some reason the unary interceptor refreshes the token, so the mechanism I'm adding isn't used), but it might be useful for this PR too. Feel free to use it as a starting point if you like.
@kafuu-chino @ahrtr I finalized the test for my PR in https://github.com/etcd-io/etcd/pull/14322/commits/94fd1612949e5b4279e923efdec7b2de67e5d3bc. I think it can be extended to cover the change in this PR. Let me think about it. I'll probably be able to share an update sometime early next week.
@kafuu-chino On second thought, I think this change is not essentially limited to the auth token TTL. I wrote the integration test below and checked that it fails on the main branch and passes on your branch. Could you check whether it's a suitable test case for your change? If it is, could you include it in this PR? (The path is tests/integration/v3_auth_test.go.)
```go
func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
	integration.BeforeTest(t)
	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
	defer cancel()

	users := []user{
		{
			name:     "user1",
			password: "user1-123",
			role:     "role1",
			key:      "k1",
			end:      "k2",
		},
	}
	authSetupUsers(t, integration.ToGRPC(clus.Client(0)).Auth, users)
	authSetupRoot(t, integration.ToGRPC(clus.Client(0)).Auth)

	c, cerr := integration.NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
	if cerr != nil {
		t.Fatal(cerr)
	}
	defer c.Close()

	watchEndCh := make(chan interface{})

	go func() {
		wChan := c.Watch(ctx, "k1", clientv3.WithRev(1))
		watchResponse := <-wChan
		t.Logf("watch response from k1: %v", watchResponse)
		testutil.AssertTrue(t, len(watchResponse.Events) != 0)
		watchEndCh <- struct{}{}
	}()

	// Sleep to make sure that the goroutine above invokes Watch() first,
	// so that its Watch() gets watch ID = 0.
	time.Sleep(1 * time.Second)

	wChan := c.Watch(ctx, "non-allowed-key", clientv3.WithRev(1))
	watchResponse := <-wChan
	testutil.AssertNotNil(t, watchResponse.Err()) // permission denied

	_, err := c.Put(ctx, "k1", "val")
	if err != nil {
		t.Fatalf("Unexpected error from Put: %v", err)
	}

	<-watchEndCh
}
```
@mitake Ok, I'll try the test later.
@mitake I think the test is fine, and I've added it to the PR.
@kafuu-chino Could you rebase this PR based on the latest main? My PR introduced a conflict in the integration test file, sorry.
@mitake Rebase finished.
@mitake @ahrtr I've committed the test modification, please review.
Codecov Report
Merging #14296 (f1d4935) into main (434c7c4) will decrease coverage by 0.10%. The diff coverage is 100.00%.
```diff
@@            Coverage Diff             @@
##             main   #14296      +/-   ##
==========================================
- Coverage   75.35%   75.24%   -0.11%
==========================================
  Files         457      457
  Lines       37190    37193       +3
==========================================
- Hits        28023    27986      -37
- Misses       7408     7442      +34
- Partials     1759     1765       +6
```
| Flag | Coverage Δ | |
|---|---|---|
| all | 75.24% <100.00%> (-0.11%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| client/v3/watch.go | 91.81% <100.00%> (-0.59%) | :arrow_down: |
| server/etcdserver/api/v3rpc/watch.go | 86.03% <100.00%> (+0.45%) | :arrow_up: |
| server/proxy/grpcproxy/watch.go | 92.48% <100.00%> (-1.16%) | :arrow_down: |
| server/storage/mvcc/watcher.go | 100.00% <100.00%> (ø) | |
| client/pkg/v3/tlsutil/tlsutil.go | 83.33% <0.00%> (-8.34%) | :arrow_down: |
| server/storage/mvcc/watchable_store.go | 84.42% <0.00%> (-7.98%) | :arrow_down: |
| client/v3/experimental/recipes/queue.go | 58.62% <0.00%> (-6.90%) | :arrow_down: |
| client/v3/leasing/util.go | 95.00% <0.00%> (-3.34%) | :arrow_down: |
| server/etcdserver/api/v3rpc/util.go | 70.96% <0.00%> (-3.23%) | :arrow_down: |
| server/etcdserver/api/v3rpc/member.go | 93.54% <0.00%> (-3.23%) | :arrow_down: |
| ... and 16 more | | |
Overall looks good to me. Thanks!
@kafuu-chino Thanks a lot for working on this! Is it possible for you to open PRs which backport this change to 3.4 and 3.5 branches?
@kafuu-chino please let me know if you don’t have time to backport the change to release-3.4 and 3.5. Probably I’ll be able to help.
Sorry, I just got back from my vacation, I'll handle it today.