
Resource not re-sent if clients unsubscribe and then resubscribe

Open menghanl opened this issue 4 years ago • 15 comments

The scenario:

Client is watching resource A

req(A, "")
resp(A, v1)
req(A, v1) // this is ACK

user unsubscribes from A

req([], v1) // this removes A from the requested resources

user resubscribes to A

req(A, v1)

It's expected that the control plane will resend the resource for A to the client, even though there's been no update to the resource, because of the unsubscribe and resubscribe.

But go-control-plane doesn't re-send the resource.

menghanl avatar May 11 '21 20:05 menghanl

The spec doesn't talk about this scenario explicitly, but it does mention that the client can update the resource_names.

In the Unsubscribing From Resources section

unsubscribing to a set of resources is done by sending a new request containing all resource names that are still being subscribed to but not containing the resource names being unsubscribed to

In the Resource updates section

Envoy may update the list of resource_names it presents to the management server in each DiscoveryRequest that ACK/NACKs a specific DiscoveryResponse. In addition, Envoy may later issue additional DiscoveryRequests at a given version_info to update the management server with new resource hints

menghanl avatar May 11 '21 20:05 menghanl

Code to reproduce:
// Note: a package clause is needed for this to compile; the name is arbitrary.
package repro_test

import (
	"context"
	"fmt"
	"net"
	"testing"

	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/wrapperspb"

	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

func testEndpoint(clusterName string, host string, port uint32) *v3endpointpb.ClusterLoadAssignment {
	return &v3endpointpb.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
			Locality: &v3corepb.Locality{SubZone: "subzone"},
			LbEndpoints: []*v3endpointpb.LbEndpoint{{
				HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
					Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
						SocketAddress: &v3corepb.SocketAddress{
							Protocol:      v3corepb.SocketAddress_TCP,
							Address:       host,
							PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: uint32(port)}},
					}},
				}},
			}},
			LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
			Priority:            0,
		}},
	}
}

func TestGCPFailure(t *testing.T) {
	// Start go-control-plane server.
	cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, nil)
	fmt.Println("Created new snapshot cache...")
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		panic(err.Error())
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	xs := v3server.NewServer(ctx, cache, v3server.CallbackFuncs{})
	gs := grpc.NewServer()
	v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs)
	fmt.Println("Registered Aggregated Discovery Service (ADS)...")
	go gs.Serve(lis)
	defer gs.Stop()

	// Start a client and an ADS stream.
	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		panic(err.Error())
	}
	adsStream, err := v3discoverypb.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(context.Background(), grpc.WaitForReady(true))
	if err != nil {
		panic(err.Error())
	}

	const (
		nodeID         = "node-ID"
		clusterName    = "cluster-name"
		V3EndpointsURL = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
	)

	// Send the first EDS req.
	fmt.Println("sending first EDS req")
	if err := adsStream.Send(&v3discoverypb.DiscoveryRequest{
		Node:          &v3corepb.Node{Id: nodeID},
		TypeUrl:       V3EndpointsURL,
		ResourceNames: []string{clusterName},
		VersionInfo:   "",
		ResponseNonce: "",
	}); err != nil {
		panic(err.Error())
	}

	version1 := "version-1"
	endpoints1 := []types.Resource{
		testEndpoint(clusterName, "127.0.0.1", 9527),
	}
	snapshot1 := v3cache.NewSnapshot(version1, endpoints1, nil, nil, nil, nil, nil)
	if err := cache.SetSnapshot(nodeID, snapshot1); err != nil {
		panic(err.Error())
	}

	// Receive the first resp.
	resp, err := adsStream.Recv()
	if err != nil {
		panic(err.Error())
	}
	fmt.Println(resp)
	nonce1 := resp.Nonce

	// Send a request to unsubscribe the EDS resource.
	fmt.Println("unsubscribe")
	if err := adsStream.Send(&v3discoverypb.DiscoveryRequest{
		Node:          &v3corepb.Node{Id: nodeID},
		TypeUrl:       V3EndpointsURL,
		ResourceNames: []string{},
		VersionInfo:   version1,
		ResponseNonce: nonce1,
	}); err != nil {
		panic(err.Error())
	}

	// Send a request to resubscribe the EDS resource.
	fmt.Println("resubscribe")
	if err := adsStream.Send(&v3discoverypb.DiscoveryRequest{
		Node:          &v3corepb.Node{Id: nodeID},
		TypeUrl:       V3EndpointsURL,
		ResourceNames: []string{clusterName},
		VersionInfo:   version1,
		ResponseNonce: nonce1,
	}); err != nil {
		panic(err.Error())
	}

	// This Recv blocks forever.
	//
	// But the server is expected to send the EDS resource to the client
	// again, even though there's no update to the resource, because of the
	// unsubscribe and resubscribe.
	resp2, err := adsStream.Recv()
	if err != nil {
		panic(err.Error())
	}
	fmt.Println(resp2)
}

The last Recv() on the stream after resubscribing blocks forever.

menghanl avatar May 11 '21 20:05 menghanl

cc @markdroth @dfawley

menghanl avatar May 11 '21 20:05 menghanl

This seems like a bug to me. I assume this happens because the code right now assumes that if the requested version matches the server version then nothing needs to be done, but it sounds like we need to track subscriptions even in SotW to re-push resources for these kinds of subscription changes.

@htuch can you confirm that what this issue talks about is the expected behavior?

@alecholmez We might need some kind of subscription state for SotW as well based on this issue, maybe there are options for consolidating the new delta state with sotw.
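
For illustration, per-stream subscription tracking along these lines could look roughly like the sketch below. Even when the requested version matches the snapshot version, a non-empty list of newly subscribed names means the server has to respond. The type and method names here are hypothetical, not the actual go-control-plane API.

package sketch

// subscriptionState is hypothetical per-stream state, not a real go-control-plane type.
type subscriptionState struct {
	// Resource names already sent to the client, keyed by type URL.
	known map[string]map[string]struct{}
}

// newlySubscribed returns the resource names in the current request that were
// not part of the previous subscription set, and records the new set. Replacing
// the set (instead of only adding to it) is what makes an unsubscribe followed
// by a resubscribe show up as "new" again.
func (s *subscriptionState) newlySubscribed(typeURL string, names []string) []string {
	if s.known == nil {
		s.known = make(map[string]map[string]struct{})
	}
	prev := s.known[typeURL]
	next := make(map[string]struct{}, len(names))
	var added []string
	for _, n := range names {
		next[n] = struct{}{}
		if _, ok := prev[n]; !ok {
			added = append(added, n)
		}
	}
	s.known[typeURL] = next
	return added
}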

snowp avatar May 19 '21 14:05 snowp

Yes, I think this is a bug. The xDS server definitely needs to explicitly track the set of resources that the client is subscribed to.

Another relevant part of the spec is Knowing When a Requested Resource Does Not Exist:

Note that even if a requested resource does not exist at the moment when the client requests it, that resource could be created at any time. Management servers must remember the set of resources being requested by the client, and if one of those resources springs into existence later, the server must send an update to the client informing it of the new resource. Clients that initially see a resource that does not exist must be prepared for the resource to be created at any time.

The way I think of an xDS server, its job is to basically match up the states of a database containing a set of resources with the states of the individual client streams. Each client stream has an associated list of resource names that it is subscribed to, along with the corresponding version of each resource that has already been sent to the client. Whenever the client changes the set of resources it is subscribing to, new resources may need to be sent to the client. Whenever a resource in the database gets updated (where creating and removing a resource are just special cases), that change needs to be propagated to any streams that are subscribing to that resource. (In this sense, the version tracking in SotW is IMHO actually harder to implement than the state tracking for the incremental protocol, because the semantics around the resource type instance version are actually much harder to understand.)
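
A rough sketch of that matching step, assuming a simple per-stream record of subscribed names and already-sent versions (the names here are illustrative, not go-control-plane internals):

package sketch

// resourcesToSend illustrates the matching described above: for each resource
// the stream is subscribed to, send it if this stream has never been sent it
// (newly subscribed, or newly created in the database) or if the database now
// holds a different version than the one already sent on this stream.
func resourcesToSend(subscribed []string, dbVersions, sentVersions map[string]string) []string {
	var out []string
	for _, name := range subscribed {
		dbVer, exists := dbVersions[name]
		if !exists {
			// The resource doesn't exist yet; the server must keep the
			// subscription and push the resource if it is created later.
			continue
		}
		if sentVer, seen := sentVersions[name]; !seen || sentVer != dbVer {
			out = append(out, name)
		}
	}
	return out
}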

Note that in general, the server cannot rely on the version sent by the client to decide what set of resources it needs to send; it should always pay attention to changes in the set of resources that the client is subscribing to in its requests. The only case where the server may not send a resource requested by the client is upon stream reconnection, and even then this optimization works only in a very narrow set of circumstances, as per the ACK/NACK and resource type instance version section of the spec:

Note that the version for a resource type is not a property of an individual xDS stream but rather a property of the resources themselves. If the stream becomes broken and the client creates a new stream, the client’s initial request on the new stream should indicate the most recent version seen by the client on the previous stream. Servers may decide to optimize by not resending resources that the client had already seen on the previous stream, but only if they know that the client is not subscribing to a new resource that it was not previously subscribed to. For example, it is generally safe for servers to do this optimization for wildcard LDS and CDS requests, and it is safe to do in environments where the clients will always subscribe to exactly the same set of resources.

markdroth avatar May 19 '21 18:05 markdroth

@snowp I think we can certainly abstract a pattern out of the delta logic that we could apply within SOTW. It might even be safe to say that if this is a bug in SOTW gRPC code, it's probably a bug in the REST logic too since they both follow the same protocol.

Luckily I defined the StreamState as a global server package so we could internally reference that pattern throughout the code.

alecholmez avatar May 20 '21 20:05 alecholmez

This is a big bug and I think I will go ahead and take this

alecholmez avatar Jan 06 '22 20:01 alecholmez

any update on this?

steeling avatar Aug 02 '22 20:08 steeling

@alecholmez Any plan to fix this?

tony612 avatar Oct 21 '22 08:10 tony612

@alecholmez any update on this?

bayoumymac avatar Feb 06 '23 20:02 bayoumymac

@alecholmez any update on this?

duxin40 avatar Mar 05 '24 11:03 duxin40

I know there is a PR that will address the issue. Meanwhile I made a patch for v0.12.0 that seems to solve the issue on the current codebase:

diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go
index ebf63f5b6..6dbcf6398 100644
--- a/pkg/cache/v3/simple.go
+++ b/pkg/cache/v3/simple.go
@@ -411,11 +411,19 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
 	if exists {
 		knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl())
 		diff := []string{}
+		subscribed := make(map[string]bool)
 		for _, r := range request.GetResourceNames() {
+			subscribed[r] = true
 			if _, ok := knownResourceNames[r]; !ok {
 				diff = append(diff, r)
 			}
 		}
+		for k := range knownResourceNames {
+			if _, ok := subscribed[k]; !ok {
+				delete(knownResourceNames, k)
+			}
+		}
+		streamState.SetKnownResourceNames(request.GetTypeUrl(), knownResourceNames)
 
 		cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID,
 			request.GetTypeUrl(), request.GetResourceNames(), knownResourceNames, diff)

kkalin68 avatar Mar 26 '24 17:03 kkalin68

@kkalin68 thanks for this!

I can confirm that your patch works. I am able to reliably recreate this issue using go grpc xds clients, and applying this patch fixes it! I'm wondering if there is a downside to using this fix over the fixes and changes proposed in the PRs attached to this issue. This seems like a much smaller and more focused fix.


For those who are curious, it's actually fairly easy to recreate this locally:

  • Create two gRPC clients in one process
  • The first uses ResourceA and has a very short idle timeout (I used 250ms)
  • The second uses ResourceB and has a normal timeout

Then have each client do a gRPC call every second. The first client will basically become blocked instantly and never work; it will simply always report "Received error from the name resolver: produced zero addresses". The broken client will then stay in this state until an xds update happens and the watches are recreated.
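
For reference, a rough sketch of that setup with grpc-go xDS clients; the target names and the per-second call are placeholders, and an xDS bootstrap file (GRPC_XDS_BOOTSTRAP) pointing at the management server is assumed:

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	_ "google.golang.org/grpc/xds" // registers the xds:/// resolver
)

func main() {
	// Client one: very short idle timeout, so it drops and re-adds its EDS
	// watch between calls.
	ccA, err := grpc.Dial("xds:///resource-a",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithIdleTimeout(250*time.Millisecond))
	if err != nil {
		log.Fatal(err)
	}
	defer ccA.Close()

	// Client two: default idle timeout.
	ccB, err := grpc.Dial("xds:///resource-b",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer ccB.Close()

	for range time.Tick(time.Second) {
		// Issue one RPC on each channel here (e.g. a unary echo call). With
		// the bug present, client one soon fails with "produced zero
		// addresses" until a new xDS update recreates the watch.
	}
}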

carsonoid avatar Mar 27 '24 18:03 carsonoid

This PR in our fork has been implemented to address this issue. We are currently validating the impact on grpc and envoy clients. Other issues are also being addressed in those PRs to properly handle grpc clients. There is also a code fix in upstream grpc (for Go) to address an improper wildcard subscription, which will be released in grpc-go 1.63.0.

valerian-roche avatar Mar 27 '24 22:03 valerian-roche