Skip to content

Commit

Permalink
resolve the conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jun 22, 2022
1 parent 4ba4ee4 commit c60d162
Showing 1 changed file with 0 additions and 67 deletions.
67 changes: 0 additions & 67 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ import (
)

const (
heartbeatSendTimeout = 5 * time.Second
// store config
storeReadyWaitTime = 5 * time.Second

// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second
Expand All @@ -62,12 +58,6 @@ const (
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
<<<<<<< HEAD
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
)

=======
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
Expand All @@ -78,28 +68,6 @@ type GrpcServer struct {
*Server
}

type forwardFn func(ctx context.Context, client *grpc.ClientConn) (interface{}, error)

func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHeader, fn forwardFn) (rsp interface{}, err error) {
failpoint.Inject("customTimeout", func() {
time.Sleep(5 * time.Second)
})
forwardedHost := getForwardedHost(ctx)
if !s.isLocalRequest(forwardedHost) {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
return nil, err
}
ctx = grpcutil.ResetForwardContext(ctx)
return fn(ctx, client)
}
if err := s.validateRequest(header); err != nil {
return nil, err
}
return nil, nil
}

>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
// GetMembers implements gRPC PDServer.
func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
Expand Down Expand Up @@ -666,38 +634,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
return resp, nil
}

<<<<<<< HEAD
const regionHeartbeatSendTimeout = 5 * time.Second
=======
// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
// occurs on SendAndClose() or Recv(), both endpoints will be closed.
type bucketHeartbeatServer struct {
stream pdpb.PD_ReportBucketsServer
closed int32
}

func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
if atomic.LoadInt32(&b.closed) == 1 {
return status.Errorf(codes.Canceled, "stream is closed")
}
done := make(chan error, 1)
go func() {
done <- b.stream.SendAndClose(bucket)
}()
select {
case err := <-done:
if err != nil {
atomic.StoreInt32(&b.closed, 1)
}
return err
case <-time.After(heartbeatSendTimeout):
atomic.StoreInt32(&b.closed, 1)
return ErrSendHeartbeatTimeout
}
}
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))

var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")

// heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error
// occurs on Send() or Recv(), both endpoints will be closed.
Expand All @@ -720,11 +657,7 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
return errors.WithStack(err)
case <-time.After(regionHeartbeatSendTimeout):
atomic.StoreInt32(&s.closed, 1)
<<<<<<< HEAD
return errors.WithStack(errSendRegionHeartbeatTimeout)
=======
return ErrSendHeartbeatTimeout
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
}
}

Expand Down

0 comments on commit c60d162

Please sign in to comment.