From 6a318bb011c6613d6f1c98eb0b1b28edfe6b7c0d Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 8 Jan 2021 10:14:53 -0800 Subject: [PATCH] xds: add HTTP connection manager max_stream_duration support (#4122) --- xds/internal/client/client.go | 13 ++++- xds/internal/client/client_lds_test.go | 9 ++- xds/internal/client/client_rds_test.go | 10 +++- xds/internal/client/client_xds.go | 20 +++++-- xds/internal/resolver/serviceconfig.go | 10 +++- xds/internal/resolver/watch_service.go | 35 +++++++++-- xds/internal/resolver/watch_service_test.go | 65 +++++++++++++++++++-- xds/internal/resolver/xds_resolver_test.go | 51 +++++++++------- 8 files changed, 166 insertions(+), 47 deletions(-) diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 4007a420f484..10c45415a574 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -147,6 +147,10 @@ type ListenerUpdate struct { RouteConfigName string // SecurityCfg contains security configuration sent by the control plane. SecurityCfg *SecurityConfig + // MaxStreamDuration contains the HTTP connection manager's + // common_http_protocol_options.max_stream_duration field, or zero if + // unset. + MaxStreamDuration time.Duration } func (lu *ListenerUpdate) String() string { @@ -181,8 +185,13 @@ type Route struct { Fraction *uint32 // If the matchers above indicate a match, the below configuration is used. - Action map[string]uint32 // action is weighted clusters. - MaxStreamDuration time.Duration + Action map[string]uint32 // action is weighted clusters. + // If MaxStreamDuration is nil, it indicates neither of the route action's + // max_stream_duration fields (grpc_timeout_header_max nor + // max_stream_duration) were set. In this case, the ListenerUpdate's + // MaxStreamDuration field should be used. If MaxStreamDuration is set to + // an explicit zero duration, the application's deadline should be used. + MaxStreamDuration *time.Duration } // HeaderMatcher represents header matchers. diff --git a/xds/internal/client/client_lds_test.go b/xds/internal/client/client_lds_test.go index 09dd2eea4d84..5172def9a82f 100644 --- a/xds/internal/client/client_lds_test.go +++ b/xds/internal/client/client_lds_test.go @@ -21,6 +21,7 @@ package client import ( "strings" "testing" + "time" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" @@ -37,6 +38,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/xds/internal/version" + "google.golang.org/protobuf/types/known/durationpb" ) func (s) TestUnmarshalListener_ClientSide(t *testing.T) { @@ -87,6 +89,9 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { RouteConfigName: v3RouteConfigName, }, }, + CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ + MaxStreamDuration: durationpb.New(time.Second), + }, } mcm, _ := ptypes.MarshalAny(cm) lis := &v3listenerpb.Listener{ @@ -278,7 +283,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { name: "v3 listener resource", resources: []*anypb.Any{v3Lis}, wantUpdate: map[string]ListenerUpdate{ - v3LDSTarget: {RouteConfigName: v3RouteConfigName}, + v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second}, }, }, { @@ -286,7 +291,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { resources: []*anypb.Any{v2Lis, v3Lis}, wantUpdate: map[string]ListenerUpdate{ v2LDSTarget: {RouteConfigName: v2RouteConfigName}, - v3LDSTarget: {RouteConfigName: v3RouteConfigName}, + v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second}, }, }, } diff --git a/xds/internal/client/client_rds_test.go b/xds/internal/client/client_rds_test.go index 481030fee211..c91444d4bfb6 100644 --- a/xds/internal/client/client_rds_test.go +++ b/xds/internal/client/client_rds_test.go @@ -317,7 +317,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}}, }, }, }, @@ -347,7 +347,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}}, }, }, }, @@ -377,7 +377,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(0)}}, }, }, }, @@ -803,3 +803,7 @@ func newUInt32P(i uint32) *uint32 { func newBoolP(b bool) *bool { return &b } + +func newDurationP(d time.Duration) *time.Duration { + return &d +} diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 68f65c082412..e2a008200a6a 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -80,6 +80,8 @@ func processListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) { // processClientSideListener checks if the provided Listener proto meets // the expected criteria. If so, it returns a non-empty routeConfigName. func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) { + update := &ListenerUpdate{} + apiLisAny := lis.GetApiListener().GetApiListener() if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) { return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl()) @@ -98,7 +100,7 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err if name == "" { return nil, fmt.Errorf("xds: empty route_config_name in LDS response: %+v", lis) } - return &ListenerUpdate{RouteConfigName: name}, nil + update.RouteConfigName = name case *v3httppb.HttpConnectionManager_RouteConfig: // TODO: Add support for specifying the RouteConfiguration inline // in the LDS response. @@ -108,6 +110,10 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err default: return nil, fmt.Errorf("xds: unsupported type %T for RouteSpecifier in received LDS response", apiLis.RouteSpecifier) } + + update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration() + + return update, nil } func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) { @@ -346,12 +352,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) } route.Action = clusters + msd := action.GetMaxStreamDuration() // Prefer grpc_timeout_header_max, if set. - if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil { - route.MaxStreamDuration = dur.AsDuration() - } else { - route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration() + dur := msd.GetGrpcTimeoutHeaderMax() + if dur == nil { + dur = msd.GetMaxStreamDuration() + } + if dur != nil { + d := dur.AsDuration() + route.MaxStreamDuration = &d } routesRet = append(routesRet, &route) } diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 13d3f2a095fc..95c3c4221609 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -201,11 +201,11 @@ var newWRR = wrr.NewRandom func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) { cs := &configSelector{ r: r, - routes: make([]route, len(su.Routes)), + routes: make([]route, len(su.routes)), clusters: make(map[string]*clusterInfo), } - for i, rt := range su.Routes { + for i, rt := range su.routes { clusters := newWRR() for cluster, weight := range rt.Action { clusters.Add(cluster, int64(weight)) @@ -227,7 +227,11 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro if err != nil { return nil, err } - cs.routes[i].maxStreamDuration = rt.MaxStreamDuration + if rt.MaxStreamDuration == nil { + cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration + } else { + cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration + } } return cs, nil diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 01aad899e8be..79b83e95aa3c 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -22,17 +22,28 @@ import ( "fmt" "strings" "sync" + "time" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" ) -// serviceUpdate contains information received from the RDS responses which is -// of interested to the xds resolver. The RDS request is built by first making a -// LDS to get the RouteConfig name. +// serviceUpdate contains information received from the LDS/RDS responses which +// are of interest to the xds resolver. The RDS request is built by first +// making a LDS to get the RouteConfig name. type serviceUpdate struct { - // Routes contain matchers+actions to route RPCs. - Routes []*xdsclient.Route + // routes contain matchers+actions to route RPCs. + routes []*xdsclient.Route + // ldsConfig contains configuration that applies to all routes. + ldsConfig ldsConfig +} + +// ldsConfig contains information received from the LDS responses which are of +// interest to the xds resolver. +type ldsConfig struct { + // maxStreamDuration is from the HTTP connection manager's + // common_http_protocol_options field. + maxStreamDuration time.Duration } // watchService uses LDS and RDS to discover information about the provided @@ -61,6 +72,7 @@ type serviceUpdateWatcher struct { serviceName string ldsCancel func() serviceCb func(serviceUpdate, error) + lastUpdate serviceUpdate mu sync.Mutex closed bool @@ -84,6 +96,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er w.rdsCancel() w.rdsName = "" w.rdsCancel = nil + w.lastUpdate = serviceUpdate{} } // The other error cases still return early without canceling the // existing RDS watch. @@ -91,9 +104,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er return } + oldLDSConfig := w.lastUpdate.ldsConfig + w.lastUpdate.ldsConfig = ldsConfig{maxStreamDuration: update.MaxStreamDuration} + if w.rdsName == update.RouteConfigName { // If the new RouteConfigName is same as the previous, don't cancel and // restart the RDS watch. + if w.lastUpdate.ldsConfig != oldLDSConfig { + // The route name didn't change but the LDS data did; send it now. + // If the route name did change, then we will wait until the first + // RDS update before reporting this LDS config. + w.serviceCb(w.lastUpdate, nil) + } return } w.rdsName = update.RouteConfigName @@ -127,7 +149,8 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate, return } - w.serviceCb(serviceUpdate{Routes: matchVh.Routes}, nil) + w.lastUpdate.routes = matchVh.Routes + w.serviceCb(w.lastUpdate, nil) } func (w *serviceUpdateWatcher) close() { diff --git a/xds/internal/resolver/watch_service_test.go b/xds/internal/resolver/watch_service_test.go index 2291d485b8b2..55a20372c018 100644 --- a/xds/internal/resolver/watch_service_test.go +++ b/xds/internal/resolver/watch_service_test.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -138,7 +139,7 @@ func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantU return fmt.Errorf("timeout when waiting for service update: %v", err) } gotUpdate := u.(serviceUpdateErr) - if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) { + if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{})) { return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil), diff (-want +got):\n%s", gotUpdate.u, gotUpdate.err, wantUpdate, cmp.Diff(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty())) } return nil @@ -165,7 +166,7 @@ func (s) TestServiceWatch(t *testing.T) { xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { @@ -179,7 +180,7 @@ func (s) TestServiceWatch(t *testing.T) { } wantUpdate2 := serviceUpdate{ - Routes: []*xdsclient.Route{{ + routes: []*xdsclient.Route{{ Path: newStringP(""), Action: map[string]uint32{cluster: 1}, }}, @@ -219,7 +220,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { @@ -240,7 +241,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { waitForWatchRouteConfig(ctx, t, xdsC, routeStr+"2") // RDS update for the new name. - wantUpdate2 := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}} + wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { @@ -254,6 +255,58 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { } } +// TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS +// response, the second LDS response includes a new MaxStreamDuration. It also +// verifies this is reported in subsequent RDS updates. +func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) { + serviceUpdateCh := testutils.NewChannel() + xdsC := fakeclient.NewClient() + cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) { + serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) + }, nil) + defer cancelWatch() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + + wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, ldsConfig: ldsConfig{maxStreamDuration: time.Second}} + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + }, + }, + }, nil) + if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Another LDS update with the same RDS_name but different MaxStreamDuration (zero in this case). + wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { + t.Fatal(err) + } + + // RDS update. + wantUpdate3 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}} + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}, + }, + }, + }, nil) + if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate3); err != nil { + t.Fatal(err) + } +} + // TestServiceNotCancelRDSOnSameLDSUpdate covers the case that if the second LDS // update contains the same RDS name as the previous, the RDS watch isn't // canceled and restarted. @@ -271,7 +324,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index a3c0f2866353..13f17e513df0 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -511,7 +511,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) @@ -526,11 +526,11 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { Routes: []*client.Route{{ Prefix: newStringP("/foo"), Action: map[string]uint32{"A": 1}, - MaxStreamDuration: 5 * time.Second, + MaxStreamDuration: newDurationP(5 * time.Second), }, { Prefix: newStringP("/bar"), Action: map[string]uint32{"B": 1}, - MaxStreamDuration: time.Duration(0), + MaxStreamDuration: newDurationP(0), }, { Prefix: newStringP(""), Action: map[string]uint32{"C": 1}, @@ -554,43 +554,50 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { } testCases := []struct { + name string method string timeoutSupport bool want *time.Duration }{{ + name: "RDS setting", method: "/foo/method", timeoutSupport: true, - want: func() *time.Duration { x := 5 * time.Second; return &x }(), + want: newDurationP(5 * time.Second), }, { + name: "timeout support disabled", method: "/foo/method", timeoutSupport: false, want: nil, }, { + name: "explicit zero in RDS; ignore LDS", method: "/bar/method", timeoutSupport: true, want: nil, }, { + name: "no config in RDS; fallback to LDS", method: "/baz/method", timeoutSupport: true, - want: nil, + want: newDurationP(time.Second), }} for _, tc := range testCases { - env.TimeoutSupport = tc.timeoutSupport - req := iresolver.RPCInfo{ - Method: tc.method, - Context: context.Background(), - } - res, err := cs.SelectConfig(req) - if err != nil { - t.Errorf("Unexpected error from cs.SelectConfig(%v): %v", req, err) - continue - } - res.OnCommitted() - got := res.MethodConfig.Timeout - if !reflect.DeepEqual(got, tc.want) { - t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want) - } + t.Run(tc.name, func(t *testing.T) { + env.TimeoutSupport = tc.timeoutSupport + req := iresolver.RPCInfo{ + Method: tc.method, + Context: context.Background(), + } + res, err := cs.SelectConfig(req) + if err != nil { + t.Errorf("Unexpected error from cs.SelectConfig(%v): %v", req, err) + return + } + res.OnCommitted() + got := res.MethodConfig.Timeout + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want) + } + }) } } @@ -856,3 +863,7 @@ func replaceRandNumGenerator(start int64) func() { grpcrandInt63n = grpcrand.Int63n } } + +func newDurationP(d time.Duration) *time.Duration { + return &d +}