Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/janardhanvissa/grpc-go in…
Browse files Browse the repository at this point in the history
…to stubserver-streamingcall
  • Loading branch information
janardhankrishna-sai committed Jan 8, 2025
2 parents 4298e86 + 724f450 commit 96cd53a
Show file tree
Hide file tree
Showing 48 changed files with 1,972 additions and 381 deletions.
8 changes: 1 addition & 7 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand All @@ -48,11 +46,7 @@ import (
var PickFirstConfig string

func init() {
name := pickfirst.Name
if !envconfig.NewPickFirstEnabled {
name = pickfirstleaf.Name
}
PickFirstConfig = fmt.Sprintf("[{%q: {}}]", name)
PickFirstConfig = fmt.Sprintf("[{%q: {}}]", pickfirstleaf.Name)
}

// ChildState is the balancer state of a child along with the endpoint which
Expand Down
39 changes: 21 additions & 18 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s) TestGRPCLB_Basic(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()

Expand Down Expand Up @@ -517,7 +517,7 @@ func (s) TestGRPCLB_Weighted(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()

Expand Down Expand Up @@ -597,7 +597,7 @@ func (s) TestGRPCLB_DropRequest(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -769,7 +769,7 @@ func (s) TestGRPCLB_BalancerDisconnects(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -940,7 +940,7 @@ func (s) TestGRPCLB_ExplicitFallback(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -1008,11 +1008,12 @@ func (s) TestGRPCLB_FallBackWithNoServerAddress(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
cc.Connect()
testC := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -1102,10 +1103,11 @@ func (s) TestGRPCLB_PickFirst(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend: %v", err)
}
cc.Connect()
defer cc.Close()

// Push a service config with grpclb as the load balancing policy and
Expand Down Expand Up @@ -1198,7 +1200,7 @@ func (s) TestGRPCLB_BackendConnectionErrorPropagation(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to create new client to the backend %v", err)
t.Fatalf("Failed to create a client for the backend: %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
Expand Down Expand Up @@ -1241,10 +1243,11 @@ func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
cc.Connect()
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)

Expand Down Expand Up @@ -1311,15 +1314,16 @@ func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
// Push the backend address to the remote balancer.
tss.ls.sls <- sl

cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName,
grpc.WithResolvers(r),
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
grpc.WithUserAgent(testUserAgent))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
cc.Connect()
testC := testgrpc.NewTestServiceClient(cc)

// Push a resolver update with grpclb configuration which does not contain the
Expand Down Expand Up @@ -1418,15 +1422,14 @@ func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats,
tss.ls.statsDura = 100 * time.Millisecond
creds := serverNameCheckCreds{}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds),
grpc.WithPerRPCCredentials(failPreRPCCred{}),
grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
cc.Connect()
defer cc.Close()

rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
Expand Down
27 changes: 24 additions & 3 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,18 @@ func init() {
balancer.Register(pickfirstBuilder{})
}

// enableHealthListenerKeyType is a unique key type used in resolver attributes
// to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}
type (
// enableHealthListenerKeyType is a unique key type used in resolver
// attributes to indicate whether the health listener usage is enabled.
enableHealthListenerKeyType struct{}
// managedByPickfirstKeyType is an attribute key type to inform Outlier
// Detection that the generic health listener is being used.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when
// implementing the dualstack design. This is a hack. Once Dualstack is
// completed, outlier detection will stop sending ejection updates through
// the connectivity listener.
managedByPickfirstKeyType struct{}
)

var (
logger = grpclog.Component("pick-first-leaf-lb")
Expand Down Expand Up @@ -140,6 +149,17 @@ func EnableHealthListener(state resolver.State) resolver.State {
return state
}

// IsManagedByPickfirst returns whether an address belongs to a SubConn
// managed by the pickfirst LB policy.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable
// outlier_detection via the with connectivity listener when using pick_first.
// Once Dualstack changes are complete, all SubConns will be created by
// pick_first and outlier detection will only use the health listener for
// ejection. This hack can then be removed.
func IsManagedByPickfirst(addr resolver.Address) bool {
return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil
}

type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

Expand All @@ -166,6 +186,7 @@ type scData struct {
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true)
sd := &scData{
rawConnectivityState: connectivity.Idle,
effectiveState: connectivity.Idle,
Expand Down
Loading

0 comments on commit 96cd53a

Please sign in to comment.