diff --git a/internal/wrr/random.go b/internal/wrr/random.go index a43652dcb735..ccf5113e9f32 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -18,6 +18,7 @@ package wrr import ( + "fmt" "sync" "google.golang.org/grpc/internal/grpcrand" @@ -29,6 +30,10 @@ type weightedItem struct { Weight int64 } +func (w *weightedItem) String() string { + return fmt.Sprint(*w) +} + // randomWRR is a struct that contains weighted items implement weighted random algorithm. type randomWRR struct { mu sync.RWMutex @@ -68,3 +73,7 @@ func (rw *randomWRR) Add(item interface{}, weight int64) { rw.items = append(rw.items, rItem) rw.sumOfWeights += weight } + +func (rw *randomWRR) String() string { + return fmt.Sprint(rw.items) +} diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index d78122ca5b9d..4007a420f484 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -179,7 +179,10 @@ type Route struct { CaseInsensitive bool Headers []*HeaderMatcher Fraction *uint32 - Action map[string]uint32 // action is weighted clusters. + + // If the matchers above indicate a match, the below configuration is used. + Action map[string]uint32 // action is weighted clusters. + MaxStreamDuration time.Duration } // HeaderMatcher represents header matchers. diff --git a/xds/internal/client/client_rds_test.go b/xds/internal/client/client_rds_test.go index 5e9ee7758657..ab4737376e54 100644 --- a/xds/internal/client/client_rds_test.go +++ b/xds/internal/client/client_rds_test.go @@ -20,6 +20,7 @@ package client import ( "testing" + "time" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" v2routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" @@ -33,6 +34,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/xds/internal/version" + "google.golang.org/protobuf/types/known/durationpb" ) func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { @@ -290,6 +292,96 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { }, }, }, + { + name: "good-route-config-with-max-stream-duration", + rc: &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(time.Second)}, + }, + }, + }, + }, + }, + }, + }, + wantUpdate: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}}, + }, + }, + }, + }, + { + name: "good-route-config-with-grpc-timeout-header-max", + rc: &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{GrpcTimeoutHeaderMax: durationpb.New(time.Second)}, + }, + }, + }, + }, + }, + }, + }, + wantUpdate: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}}, + }, + }, + }, + }, + { + name: "good-route-config-with-both-timeouts", + rc: &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(2 * time.Second), GrpcTimeoutHeaderMax: durationpb.New(0)}, + }, + }, + }, + }, + }, + }, + }, + wantUpdate: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}}, + }, + }, + }, + }, } for _, test := range tests { diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 66376571546f..f31b6009b6e1 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -322,7 +322,8 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) } clusters := make(map[string]uint32) - switch a := r.GetRoute().GetClusterSpecifier().(type) { + action := r.GetRoute() + switch a := action.GetClusterSpecifier().(type) { case *v3routepb.RouteAction_Cluster: clusters[a.Cluster] = 1 case *v3routepb.RouteAction_WeightedClusters: @@ -341,6 +342,13 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) } route.Action = clusters + msd := action.GetMaxStreamDuration() + // Prefer grpc_timeout_header_max, if set. + if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil { + route.MaxStreamDuration = dur.AsDuration() + } else { + route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration() + } routesRet = append(routesRet, &route) } return routesRet, nil diff --git a/xds/internal/env/env.go b/xds/internal/env/env.go index c0fa0e65b7a3..c4b46bae171b 100644 --- a/xds/internal/env/env.go +++ b/xds/internal/env/env.go @@ -29,6 +29,7 @@ const ( bootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP" xdsV3SupportEnv = "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" + timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" ) var ( @@ -44,4 +45,8 @@ var ( // enabled, which can be done by setting the environment variable // "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "true". CircuitBreakingSupport = strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "true") + // TimeoutSupport indicates whether support for max_stream_duration in + // route actions is enabled. This can be enabled by setting the + // environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "true". + TimeoutSupport = strings.EqualFold(os.Getenv(timeoutSupportEnv), "true") ) diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 50514303e894..13d3f2a095fc 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -22,12 +22,14 @@ import ( "encoding/json" "fmt" "sync/atomic" + "time" "google.golang.org/grpc/codes" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/clustermanager" + "google.golang.org/grpc/xds/internal/env" ) const ( @@ -93,12 +95,13 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) { } type route struct { - action wrr.WRR - m *compositeMatcher // converted from route matchers + m *compositeMatcher // converted from route matchers + clusters wrr.WRR + maxStreamDuration time.Duration } func (r route) String() string { - return r.m.String() + "->" + fmt.Sprint(r.action) + return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration) } type configSelector struct { @@ -110,18 +113,18 @@ type configSelector struct { var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { - var action wrr.WRR + var rt *route // Loop through routes in order and select first match. - for _, rt := range cs.routes { - if rt.m.match(rpcInfo) { - action = rt.action + for _, r := range cs.routes { + if r.m.match(rpcInfo) { + rt = &r break } } - if action == nil { + if rt == nil || rt.clusters == nil { return nil, errNoMatchedRouteFound } - cluster, ok := action.Next().(string) + cluster, ok := rt.clusters.Next().(string) if !ok { return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) } @@ -129,7 +132,8 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP // it is committed. ref := &cs.clusters[cluster].refCount atomic.AddInt32(ref, 1) - return &iresolver.RPCConfig{ + + config := &iresolver.RPCConfig{ // Communicate to the LB policy the chosen cluster. Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster), OnCommitted: func() { @@ -144,7 +148,13 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP } } }, - }, nil + } + + if env.TimeoutSupport && rt.maxStreamDuration != 0 { + config.MethodConfig.Timeout = &rt.maxStreamDuration + } + + return config, nil } // incRefs increments refs of all clusters referenced by this config selector. @@ -196,9 +206,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } for i, rt := range su.Routes { - action := newWRR() + clusters := newWRR() for cluster, weight := range rt.Action { - action.Add(cluster, int64(weight)) + clusters.Add(cluster, int64(weight)) // Initialize entries in cs.clusters map, creating entries in // r.activeClusters as necessary. Set to zero as they will be @@ -210,14 +220,16 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } cs.clusters[cluster] = ci } - cs.routes[i].action = action + cs.routes[i].clusters = clusters var err error cs.routes[i].m, err = routeToMatcher(rt) if err != nil { return nil, err } + cs.routes[i].maxStreamDuration = rt.MaxStreamDuration } + return cs, nil } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index f3bdc57c0ba0..a3c0f2866353 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -41,6 +41,7 @@ import ( "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" + "google.golang.org/grpc/xds/internal/env" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) @@ -496,6 +497,103 @@ func (s) TestXDSResolverWRR(t *testing.T) { } } +func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { + defer func(old bool) { env.TimeoutSupport = old }(env.TimeoutSupport) + xdsC := fakeclient.NewClient() + xdsR, tcc, cancel := testSetup(t, setupOpts{ + xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, + }) + defer func() { + cancel() + xdsR.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + + defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) + newWRR = xdstestutils.NewTestWRR + + // Invoke the watchAPI callback with a good service update and wait for the + // UpdateState method to be called on the ClientConn. + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*client.Route{{ + Prefix: newStringP("/foo"), + Action: map[string]uint32{"A": 1}, + MaxStreamDuration: 5 * time.Second, + }, { + Prefix: newStringP("/bar"), + Action: map[string]uint32{"B": 1}, + MaxStreamDuration: time.Duration(0), + }, { + Prefix: newStringP(""), + Action: map[string]uint32{"C": 1}, + }}, + }, + }, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("ClientConn.UpdateState returned error: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + testCases := []struct { + method string + timeoutSupport bool + want *time.Duration + }{{ + method: "/foo/method", + timeoutSupport: true, + want: func() *time.Duration { x := 5 * time.Second; return &x }(), + }, { + method: "/foo/method", + timeoutSupport: false, + want: nil, + }, { + method: "/bar/method", + timeoutSupport: true, + want: nil, + }, { + method: "/baz/method", + timeoutSupport: true, + want: nil, + }} + + for _, tc := range testCases { + env.TimeoutSupport = tc.timeoutSupport + req := iresolver.RPCInfo{ + Method: tc.method, + Context: context.Background(), + } + res, err := cs.SelectConfig(req) + if err != nil { + t.Errorf("Unexpected error from cs.SelectConfig(%v): %v", req, err) + continue + } + res.OnCommitted() + got := res.MethodConfig.Timeout + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want) + } + } +} + // TestXDSResolverDelayedOnCommitted tests that clusters remain in service // config if RPCs are in flight. func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { diff --git a/xds/internal/testutils/wrr.go b/xds/internal/testutils/wrr.go index a4df5fc6050e..6c9486329d41 100644 --- a/xds/internal/testutils/wrr.go +++ b/xds/internal/testutils/wrr.go @@ -19,6 +19,7 @@ package testutils import ( + "fmt" "sync" "google.golang.org/grpc/internal/wrr" @@ -66,3 +67,7 @@ func (twrr *testWRR) Next() interface{} { twrr.mu.Unlock() return iww.item } + +func (twrr *testWRR) String() string { + return fmt.Sprint(twrr.itemsWithWeight) +}