Fixed unexpected cancelled watch with WatchID=0.

Open kafuu-chino opened this issue 3 years ago • 17 comments

etcdserver: Fixed unexpected cancelled watch with WatchID=0.

I ran into the same problem as https://github.com/etcd-io/etcd/issues/12385. The error invalidates the watch with WatchID=0, so I changed WatchID to start at 1 to fix it.

First, WatchID=0 has a special meaning: it is AutoWatchID.

Second, when the token has expired and the client sends a new watch request, serverWatchStream.isWatchPermitted returns false and the server sends a WatchResponse with Canceled=true to serverWatchStream.ctrlStream. That response sets WatchResponse.WatchID=creq.WatchId, and creq.WatchId defaults to 0, so the existing watch with WatchID=0 is unexpectedly cancelled on the server side and the client never receives messages for WatchID=0 again.

if !sws.isWatchPermitted(creq) {
    wr := &pb.WatchResponse{
        Header:       sws.newResponseHeader(sws.watchStream.Rev()),
        WatchId:      creq.WatchId, // default to 0
        Canceled:     true, // will delete 0 from ids
        Created:      true,
        CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
    }

    select {
    case sws.ctrlStream <- wr: // send to ctrlStream
        continue
    case <-sws.closec:
        return nil
    }
}

...

case c, ok := <-sws.ctrlStream: // receive from ctrlStream
    if !ok {
        return
    }

    ...

    // track id creation
    wid := mvcc.WatchID(c.WatchId)
    if c.Canceled {
        delete(ids, wid) // delete from ids and client can never receive messages
        continue
    }
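
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, not etcd code: `ctrlResponse`, `applyControl`, and `isActive` are hypothetical stand-ins for `pb.WatchResponse` and the ID tracking in the send loop, and the `Created` branch is an assumption about how IDs get registered.

```go
package main

import "fmt"

// ctrlResponse is a hypothetical stand-in for the fields of
// pb.WatchResponse that matter for ID tracking.
type ctrlResponse struct {
	WatchID  int64
	Created  bool
	Canceled bool
}

// applyControl mimics the "track id creation" step: a canceled response
// removes its ID from the active set; otherwise a created response adds it
// (the add step is assumed here, it is not shown in the snippet above).
func applyControl(ids map[int64]struct{}, c ctrlResponse) {
	if c.Canceled {
		delete(ids, c.WatchID)
		return
	}
	if c.Created {
		ids[c.WatchID] = struct{}{}
	}
}

// isActive reports whether the given watch ID is still tracked.
func isActive(ids map[int64]struct{}, id int64) bool {
	_, ok := ids[id]
	return ok
}

func main() {
	ids := make(map[int64]struct{})

	// The first watch succeeds and is assigned ID 0.
	applyControl(ids, ctrlResponse{WatchID: 0, Created: true})

	// The second watch is denied. The request never set WatchId, so the
	// response carries the zero value 0 together with Canceled=true.
	applyControl(ids, ctrlResponse{WatchID: 0, Created: true, Canceled: true})

	fmt.Println("watch 0 still active:", isActive(ids, 0))
}
```

In this model the denied request removes the unrelated, healthy watch 0 from the active set, which matches the symptom reported in the issue.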

kafuu-chino avatar Aug 02 '22 11:08 kafuu-chino

I found some comments saying that watchID starts at 0, so I don't know whether this change has any other effects.

kafuu-chino avatar Aug 08 '22 03:08 kafuu-chino

Thanks @kafuu-chino for the PR, but I did not get what's the issue you are fixing. The PR is also causing some test failures.

ahrtr avatar Aug 08 '22 20:08 ahrtr

@ahrtr Let me explain in detail:

  1. First, the client's WatchCreateRequest does not set WatchID, so WatchID = 0.
func (wr *watchRequest) toPB() *pb.WatchRequest {
    req := &pb.WatchCreateRequest{
        StartRevision:  wr.rev,
        Key:            []byte(wr.key),
        RangeEnd:       []byte(wr.end),
        ProgressNotify: wr.progressNotify,
        Filters:        wr.filters,
        PrevKv:         wr.prevKV,
        Fragment:       wr.fragment,
    }
    cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
    return &pb.WatchRequest{RequestUnion: cr}
}
  2. On the server side, WatchID = 0 means the WatchID is generated automatically; see AutoWatchID.
// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
// user-provided ID is available. If pass, an ID will automatically be assigned.
const AutoWatchID WatchID = 0

Because ws.nextID starts at 0, the first watch is assigned WatchID = 0.

if id == AutoWatchID {
    for ws.watchers[ws.nextID] != nil {
        ws.nextID++
    }
    id = ws.nextID
    ws.nextID++
}
  3. Watch once, which creates a watch with WatchID = 0, and wait for the token to expire. Then watch again. Because the token has expired, sws.isWatchPermitted returns false, and the server sends a WatchResponse with WatchId = creq.WatchId = 0, Canceled = true and Created = true to sws.ctrlStream.
if !sws.isWatchPermitted(creq) {
    wr := &pb.WatchResponse{
        Header:       sws.newResponseHeader(sws.watchStream.Rev()),
        WatchId:      creq.WatchId, // = 0
        Canceled:     true,
        Created:      true,
        CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
    }

    select {
    case sws.ctrlStream <- wr:
        continue
    case <-sws.closec:
        return nil
    }
}
  4. Next, the WatchResponse is handled after Send: if Canceled == true, delete(ids, wid) runs, and wid = 0 at this point. As a result, the server considers the watch with WatchID = 0 to be no longer active.
case c, ok := <-sws.ctrlStream:
    if !ok {
        return
    }

    if err := sws.gRPCStream.Send(c); err != nil {
        if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
            sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
        } else {
            sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
            streamFailures.WithLabelValues("send", "watch").Inc()
        }
        return
    }

    // track id creation
    wid := mvcc.WatchID(c.WatchId)
    if c.Canceled {
        delete(ids, wid)
        continue
    }
  5. The client receives the WatchResponse and enters the switch case pbresp.Created:. The new watch returns an error, but the old watch with WatchID = 0 stops responding and no longer receives messages from the server. This is not what we want: only the new watch should fail, and the previous watch should not become unresponsive on the server side.
case pbresp := <-w.respc:
    if cur == nil || pbresp.Created || pbresp.Canceled {
        cur = pbresp
    } else if cur != nil && cur.WatchId == pbresp.WatchId {
        // merge new events
        cur.Events = append(cur.Events, pbresp.Events...)
        // update "Fragment" field; last response with "Fragment" == false
        cur.Fragment = pbresp.Fragment
    }

    switch {
    case pbresp.Created:
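
The ID assignment from step 2 can be modeled on its own to show why the first watch ends up with the same value as an unset WatchId. This is a simplified sketch under stated assumptions: `allocator`, `newAllocator`, `assign`, and `autoID` are hypothetical names, and handling of duplicate user-provided IDs is omitted.

```go
package main

import "fmt"

// autoID is a hypothetical stand-in for mvcc.AutoWatchID.
const autoID int64 = 0

// allocator is a simplified model of the watch stream's ID bookkeeping;
// it is not the etcd type.
type allocator struct {
	nextID   int64
	watchers map[int64]bool
}

// newAllocator returns an allocator whose automatic IDs begin at start.
func newAllocator(start int64) *allocator {
	return &allocator{nextID: start, watchers: make(map[int64]bool)}
}

// assign returns id unchanged unless it equals autoID, in which case the
// next unused ID is chosen, following the loop quoted in step 2.
func (a *allocator) assign(id int64) int64 {
	if id == autoID {
		for a.watchers[a.nextID] {
			a.nextID++
		}
		id = a.nextID
		a.nextID++
	}
	a.watchers[id] = true
	return id
}

func main() {
	fromZero := newAllocator(0)
	fmt.Println(fromZero.assign(autoID), fromZero.assign(autoID)) // 0 1

	// Starting at 1 was the first idea in this PR: no real watch then
	// shares the zero value of an unset WatchId field.
	fromOne := newAllocator(1)
	fmt.Println(fromOne.assign(autoID), fromOne.assign(autoID)) // 1 2
}
```

With a start value of 0, the first real watch is indistinguishable from a response whose WatchId was never set, which is the collision the five steps describe.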

kafuu-chino avatar Aug 09 '22 09:08 kafuu-chino

I looked at the commit history and restored creq.WatchId to -1.
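
A rough sketch of why a sentinel of -1 avoids the problem, assuming that no real watch is ever assigned a negative ID. The names `invalidWatchID` and `cancelTracked` are hypothetical and only model the `delete(ids, wid)` step of the send loop.

```go
package main

import "fmt"

// invalidWatchID is an assumed sentinel that no real watch ever uses.
const invalidWatchID int64 = -1

// cancelTracked mimics the send loop's delete(ids, wid) step and reports
// whether an active watch was actually removed.
func cancelTracked(ids map[int64]struct{}, wid int64) bool {
	_, existed := ids[wid]
	delete(ids, wid)
	return existed
}

func main() {
	ids := map[int64]struct{}{0: {}} // watch 0 is active

	// A denied create request is answered with the sentinel instead of
	// the zero value copied from the request.
	removed := cancelTracked(ids, invalidWatchID)
	_, stillActive := ids[0]

	fmt.Println("removed an active watch:", removed)  // false
	fmt.Println("watch 0 still active:", stillActive) // true
}
```

Deleting a key that is not in the map is a no-op in Go, so the denied request no longer disturbs watch 0.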

kafuu-chino avatar Aug 09 '22 12:08 kafuu-chino

@ahrtr @mitake Please help review this when you have time.

kafuu-chino avatar Aug 22 '22 03:08 kafuu-chino

@kafuu-chino Could you squash the 2 commits into one, and change the commit title like this?

*: avoid closing a watch with ID 0 incorrectly

~I think the test failure isn’t related to your change, so running tests on my local machine.~ TestKVCompact is failing because of the assert.

mitake avatar Aug 22 '22 14:08 mitake

@mitake Sure, I've squashed the commits and rebased onto the latest commit.

kafuu-chino avatar Aug 23 '22 10:08 kafuu-chino

@kafuu-chino The change looks good to me. But please rebase this PR instead of merging main into your dev branch. First, sync your main branch with upstream etcd. Then run something like the commands below:

$ git checkout main
$ git pull
$ git checkout fix
$ git rebase -i main  # keep the "pick" in the first line, and change all the following lines' "pick" to "s"

ahrtr avatar Aug 25 '22 11:08 ahrtr

@ahrtr Like this?

kafuu-chino avatar Aug 25 '22 11:08 kafuu-chino

@mitake This is the idea behind my local test:

  1. Watch a key.
  2. Watch a new key so that the server enters this code: https://github.com/etcd-io/etcd/blob/main/server/etcdserver/api/v3rpc/watch.go#L271. To get there, I set --auth-token-ttl on the server and wait for the token to expire.
  3. Set a new value for the key from step 1, and check whether the watch still receives the message normally.

I need to read the tests package code first, and it will take some time to work out the test code.

kafuu-chino avatar Sep 09 '22 10:09 kafuu-chino

@kafuu-chino I'm writing a test case in https://github.com/etcd-io/etcd/pull/14322/commits/ff9c6612f7dff2dd730ee4be0c4ab167ba042e6f for https://github.com/etcd-io/etcd/pull/14322. For now the test case is still in progress (for some reason the unary interceptor refreshes the token, so the mechanism I'm adding isn't used), but it might be useful for this PR too. If you want to try it as a starting point, feel free to use it.

mitake avatar Sep 09 '22 16:09 mitake

@kafuu-chino @ahrtr I finalized the test for my PR in https://github.com/etcd-io/etcd/pull/14322/commits/94fd1612949e5b4279e923efdec7b2de67e5d3bc. I think it can be extended to cover the change in this PR. Let me think about it. I'll probably be able to share my update sometime early next week.

mitake avatar Sep 11 '22 14:09 mitake

@kafuu-chino On second thought, I think this change is essentially not limited to auth token TTL. I wrote the integration test below and checked that it fails on the main branch and passes with your branch. Could you check whether it's a suitable test case for your change? If it's OK, could you include it in this PR? (The path is tests/integration/v3_auth_test.go.)

func TestV3AuthWatchErrorAndWatchId0(t *testing.T) {
	integration.BeforeTest(t)
	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
	defer cancel()

	users := []user{
		{
			name:     "user1",
			password: "user1-123",
			role:     "role1",
			key:      "k1",
			end:      "k2",
		},
	}
	authSetupUsers(t, integration.ToGRPC(clus.Client(0)).Auth, users)

	authSetupRoot(t, integration.ToGRPC(clus.Client(0)).Auth)

	c, cerr := integration.NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints(), Username: "user1", Password: "user1-123"})
	if cerr != nil {
		t.Fatal(cerr)
	}
	defer c.Close()

	watchEndCh := make(chan interface{})

	go func() {
		wChan := c.Watch(ctx, "k1", clientv3.WithRev(1))
		watchResponse := <-wChan
		fmt.Printf("watch response from k1: %v\n", watchResponse)
		testutil.AssertTrue(t, len(watchResponse.Events) != 0)
		watchEndCh <- struct{}{}
	}()

	// Sleep for making sure that the above goroutine invokes Watch()
	// So the above Watch() can get watch ID = 0
	time.Sleep(1 * time.Second)

	wChan := c.Watch(ctx, "non-allowed-key", clientv3.WithRev(1))
	watchResponse := <-wChan
	testutil.AssertNotNil(t, watchResponse.Err()) // permission denied

	_, err := c.Put(ctx, "k1", "val")
	if err != nil {
		t.Fatalf("Unexpected error from Put: %v", err)
	}

	<-watchEndCh
}

mitake avatar Sep 14 '22 14:09 mitake

@mitake Ok, I'll try the test later.

kafuu-chino avatar Sep 19 '22 13:09 kafuu-chino

@mitake I think the test is fine, and I've added it to the PR.

kafuu-chino avatar Sep 19 '22 14:09 kafuu-chino

@kafuu-chino Could you rebase this PR based on the latest main? My PR introduced a conflict in the integration test file, sorry.

mitake avatar Sep 20 '22 13:09 mitake

@mitake Rebase finished.

kafuu-chino avatar Sep 21 '22 03:09 kafuu-chino

@mitake @ahrtr The test modification has been committed; please review.

kafuu-chino avatar Sep 26 '22 12:09 kafuu-chino

Codecov Report

Merging #14296 (f1d4935) into main (434c7c4) will decrease coverage by 0.10%. The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main   #14296      +/-   ##
==========================================
- Coverage   75.35%   75.24%   -0.11%     
==========================================
  Files         457      457              
  Lines       37190    37193       +3     
==========================================
- Hits        28023    27986      -37     
- Misses       7408     7442      +34     
- Partials     1759     1765       +6     
Flag  Coverage Δ
all   75.24% <100.00%> (-0.11%) :arrow_down:

Flags with carried forward coverage won't be shown.

Impacted Files                           Coverage Δ
client/v3/watch.go                       91.81% <100.00%> (-0.59%) :arrow_down:
server/etcdserver/api/v3rpc/watch.go     86.03% <100.00%> (+0.45%) :arrow_up:
server/proxy/grpcproxy/watch.go          92.48% <100.00%> (-1.16%) :arrow_down:
server/storage/mvcc/watcher.go           100.00% <100.00%> (ø)
client/pkg/v3/tlsutil/tlsutil.go         83.33% <0.00%> (-8.34%) :arrow_down:
server/storage/mvcc/watchable_store.go   84.42% <0.00%> (-7.98%) :arrow_down:
client/v3/experimental/recipes/queue.go  58.62% <0.00%> (-6.90%) :arrow_down:
client/v3/leasing/util.go                95.00% <0.00%> (-3.34%) :arrow_down:
server/etcdserver/api/v3rpc/util.go      70.96% <0.00%> (-3.23%) :arrow_down:
server/etcdserver/api/v3rpc/member.go    93.54% <0.00%> (-3.23%) :arrow_down:
... and 16 more

codecov-commenter avatar Sep 26 '22 13:09 codecov-commenter

Overall looks good to me. Thanks!

ahrtr avatar Sep 26 '22 17:09 ahrtr

@kafuu-chino Thanks a lot for working on this! Is it possible for you to open PRs which backport this change to 3.4 and 3.5 branches?

mitake avatar Sep 27 '22 14:09 mitake

@kafuu-chino please let me know if you don’t have time to backport the change to release-3.4 and 3.5. Probably I’ll be able to help.

mitake avatar Oct 05 '22 12:10 mitake

Sorry, I just got back from my vacation, I'll handle it today.

kafuu-chino avatar Oct 08 '22 03:10 kafuu-chino