Skip to content

Commit

Permalink
Handle Hanyang's feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Mar 27, 2023
1 parent fd2433b commit 19c05ba
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
5 changes: 3 additions & 2 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string
if err != nil {
return nil, err
}
existing, loaded := clientConns.LoadOrStore(addr, cc)
conn, loaded := clientConns.LoadOrStore(addr, cc)
if !loaded {
// Successfully stored the connection.
return cc, nil
}
cc.Close()
cc = conn.(*grpc.ClientConn)
log.Debug("use existing connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String()))
return existing.(*grpc.ClientConn), nil
return cc, nil
}
27 changes: 16 additions & 11 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,9 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}

streamCtx := stream.Context()
forwardedHost := grpcutil.GetForwardedHost(streamCtx)
if s.IsAPIServiceMode() || !s.isLocalRequest(forwardedHost) {
if s.IsAPIServiceMode() {
var ok bool
forwardedHost, ok = s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || len(forwardedHost) == 0 {
return ErrNotFoundTSOAddr
}
}

if forwardedHost, err := s.getForwardedHost(ctx, stream.Context()); err != nil {
return err
} else if len(forwardedHost) > 0 {
clientConn, err := s.getDelegateClient(s.ctx, forwardedHost)
if err != nil {
return errors.WithStack(err)
Expand Down Expand Up @@ -261,6 +253,19 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
}
}

func (s *GrpcServer) getForwardedHost(ctx, streamCtx context.Context) (forwardedHost string, err error) {
if s.IsAPIServiceMode() {
var ok bool
forwardedHost, ok = s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || len(forwardedHost) == 0 {
return "", ErrNotFoundTSOAddr
}
} else if fh := grpcutil.GetForwardedHost(streamCtx); !s.isLocalRequest(fh) {
forwardedHost = fh
}
return forwardedHost, nil
}

// Bootstrap implements gRPC PDServer.
func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapRequest) (*pdpb.BootstrapResponse, error) {
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
Expand Down

0 comments on commit 19c05ba

Please sign in to comment.