Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#4798
Browse files Browse the repository at this point in the history
ref tikv#4797

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx authored and ti-chi-bot committed Jun 22, 2022
1 parent 0204bfa commit 4ba4ee4
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 13 deletions.
94 changes: 81 additions & 13 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,61 @@ import (
"google.golang.org/grpc/status"
)

// GrpcServer wraps Server to provide grpc service.
// It embeds *Server, so every Server method and field is promoted onto
// GrpcServer and can be used directly by the gRPC handlers.
type GrpcServer struct {
*Server
}
// Timeouts used on the heartbeat and store paths.
const (
	// heartbeatSendTimeout bounds how long a heartbeat stream send may block
	// before it is abandoned with a timeout error.
	heartbeatSendTimeout = 5 * time.Second
	// storeReadyWaitTime is a store-related wait duration.
	// NOTE(review): its consumer is not visible in this hunk — confirm usage.
	storeReadyWaitTime = 5 * time.Second
)

// Limits used by the TSO service.
const (
	// maxMergeTSORequests is presumably the cap on TSO requests merged into a
	// single batch — confirm against the Tso handler.
	maxMergeTSORequests = 10000
	// defaultTSOProxyTimeout is presumably the default timeout applied when
	// TSO requests are proxied — confirm against the Tso handler.
	defaultTSOProxyTimeout = 3 * time.Second
)

// globalConfigPath is the path prefix used for global config entries.
const globalConfigPath = "/global/config/"

// gRPC errors
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
<<<<<<< HEAD
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
)

=======
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
)

// GrpcServer wraps Server to provide grpc service.
// It embeds *Server, so every Server method and field is promoted onto
// GrpcServer and can be used directly by the gRPC handlers.
type GrpcServer struct {
*Server
}

// forwardFn is the callback unaryMiddleware invokes, with the delegate client
// connection, when a request is not local and has to be forwarded. Whatever it
// returns is handed back to the caller of unaryMiddleware unchanged.
type forwardFn func(ctx context.Context, client *grpc.ClientConn) (interface{}, error)

// unaryMiddleware decides whether a unary request is forwarded or handled by
// this server.
//
// If the forwarded host carried in ctx is not a local request, it obtains a
// delegate client for that host, resets the forward context and returns the
// result of fn. Otherwise it validates header with validateRequest and returns
// (nil, err) on failure or (nil, nil) on success.
// NOTE(review): callers presumably treat (nil, nil) as "handle locally" —
// confirm at the call sites.
func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHeader, fn forwardFn) (rsp interface{}, err error) {
	// Test hook: when the "customTimeout" failpoint is enabled, delay the request.
	failpoint.Inject("customTimeout", func() {
		time.Sleep(5 * time.Second)
	})

	host := getForwardedHost(ctx)
	if s.isLocalRequest(host) {
		// Local request: only the header needs to be validated here.
		return nil, s.validateRequest(header)
	}

	// Remote request: hand it over to the delegate connection.
	delegate, delegateErr := s.getDelegateClient(ctx, host)
	if delegateErr != nil {
		return nil, delegateErr
	}
	return fn(grpcutil.ResetForwardContext(ctx), delegate)
}

>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
// GetMembers implements gRPC PDServer.
func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
Expand Down Expand Up @@ -99,11 +141,6 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb
}, nil
}

// Limits used by the TSO service.
const (
// maxMergeTSORequests is presumably the cap on TSO requests merged into a
// single batch — TODO confirm against the Tso handler.
maxMergeTSORequests = 10000
// defaultTSOProxyTimeout is presumably the default timeout applied when TSO
// requests are proxied — TODO confirm against the Tso handler.
defaultTSOProxyTimeout = 3 * time.Second
)

// Tso implements gRPC PDServer.
func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
var (
Expand Down Expand Up @@ -629,7 +666,36 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
return resp, nil
}

<<<<<<< HEAD
// regionHeartbeatSendTimeout bounds how long heartbeatServer.Send waits for a
// region heartbeat response to be sent before giving up and marking the
// stream closed.
const regionHeartbeatSendTimeout = 5 * time.Second
=======
// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
// occurs on SendAndClose() or Recv(), both endpoints will be closed.
type bucketHeartbeatServer struct {
// stream is the underlying gRPC stream the response is written to.
stream pdpb.PD_ReportBucketsServer
// closed is accessed atomically; it is set to 1 once a send fails or times
// out, after which Send refuses further work.
closed int32
}

// Send writes bucket to the wrapped stream via SendAndClose, waiting at most
// heartbeatSendTimeout for it to finish.
//
// It returns a codes.Canceled status error if the server was already marked
// closed, the error from SendAndClose if the send fails, and
// ErrSendHeartbeatTimeout if the send does not complete in time. Any failure
// or timeout marks the server as closed.
func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
	if atomic.LoadInt32(&b.closed) == 1 {
		return status.Errorf(codes.Canceled, "stream is closed")
	}

	// Buffered so the sending goroutine can always deliver its result and exit,
	// even when nobody is waiting any more after a timeout.
	result := make(chan error, 1)
	go func() { result <- b.stream.SendAndClose(bucket) }()

	var sendErr error
	select {
	case sendErr = <-result:
		if sendErr == nil {
			return nil
		}
	case <-time.After(heartbeatSendTimeout):
		sendErr = ErrSendHeartbeatTimeout
	}

	// Only failures and timeouts reach this point.
	atomic.StoreInt32(&b.closed, 1)
	return sendErr
}
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))

// errSendRegionHeartbeatTimeout is the error heartbeatServer.Send reports when
// sending a region heartbeat response does not finish within
// regionHeartbeatSendTimeout.
var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")

Expand All @@ -654,7 +720,11 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
return errors.WithStack(err)
case <-time.After(regionHeartbeatSendTimeout):
atomic.StoreInt32(&s.closed, 1)
<<<<<<< HEAD
return errors.WithStack(errSendRegionHeartbeatTimeout)
=======
return ErrSendHeartbeatTimeout
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
}
}

Expand Down Expand Up @@ -1418,7 +1488,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
// TODO: Call it in gRPC interceptor.
func (s *GrpcServer) validateRequest(header *pdpb.RequestHeader) error {
if s.IsClosed() || !s.member.IsLeader() {
return errors.WithStack(ErrNotLeader)
return ErrNotLeader
}
if header.GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
Expand Down Expand Up @@ -1603,7 +1673,7 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
// the gRPC communication between PD servers internally.
func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
if s.IsClosed() {
return errors.WithStack(ErrNotStarted)
return ErrNotStarted
}
// If onlyAllowLeader is true, check whether the sender is PD leader.
if onlyAllowLeader {
Expand Down Expand Up @@ -1701,8 +1771,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
<-done
}

// globalConfigPath is the path prefix used for global config entries.
// NOTE(review): presumably the etcd key prefix used by StoreGlobalConfig and
// its siblings — confirm against their bodies.
const globalConfigPath = "/global/config/"

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
Expand Down
21 changes: 21 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
syncer "github.com/tikv/pd/server/region_syncer"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/tests"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -408,6 +410,25 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) {
c.Assert(resp.GetMembers(), Not(HasLen), 0)
}

// TestNotLeader checks that an AllocID request sent to a follower is rejected
// with a plain gRPC status: code Unavailable and message "not leader".
func (s *clusterTestSuite) TestNotLeader(c *C) {
	tc, err := tests.NewTestCluster(s.ctx, 2)
	// Assert before registering the cleanup: if construction failed, tc may be
	// nil, and a deferred Destroy on it could panic and hide the real error.
	c.Assert(err, IsNil)
	defer tc.Destroy()
	c.Assert(tc.RunInitialServers(), IsNil)

	tc.WaitLeader()
	followerServer := tc.GetServer(tc.GetFollower())
	grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
	clusterID := followerServer.GetClusterID()
	req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(clusterID)}
	resp, err := grpcPDClient.AllocID(context.Background(), req)
	c.Assert(resp, IsNil)

	// The error must convert cleanly to a gRPC status rather than being an
	// opaque wrapped error.
	grpcStatus, ok := status.FromError(err)
	c.Assert(ok, IsTrue)
	c.Assert(grpcStatus.Code(), Equals, codes.Unavailable)
	c.Assert(grpcStatus.Message(), Equals, "not leader")
}

func (s *clusterTestSuite) TestStoreVersionChange(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 1)
defer tc.Destroy()
Expand Down

0 comments on commit 4ba4ee4

Please sign in to comment.