resolve the conflict
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Jun 27, 2022
1 parent 4ca09db commit 3a2bdd4
Showing 1 changed file with 1 addition and 323 deletions.
324 changes: 1 addition & 323 deletions server/grpc_service.go
@@ -42,33 +42,15 @@ import (
"google.golang.org/grpc/status"
)

<<<<<<< HEAD
const slowThreshold = 5 * time.Millisecond
=======
const (
heartbeatSendTimeout = 5 * time.Second
// store config
storeReadyWaitTime = 5 * time.Second

// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second

// global config
globalConfigPath = "/global/config/"
slowThreshold = 5 * time.Millisecond
)
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))

// gRPC errors
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
<<<<<<< HEAD
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
)

=======
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
Expand All @@ -79,28 +61,6 @@ type GrpcServer struct {
*Server
}

type forwardFn func(ctx context.Context, client *grpc.ClientConn) (interface{}, error)

func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHeader, fn forwardFn) (rsp interface{}, err error) {
failpoint.Inject("customTimeout", func() {
time.Sleep(5 * time.Second)
})
forwardedHost := getForwardedHost(ctx)
if !s.isLocalRequest(forwardedHost) {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
return nil, err
}
ctx = grpcutil.ResetForwardContext(ctx)
return fn(ctx, client)
}
if err := s.validateRequest(header); err != nil {
return nil, err
}
return nil, nil
}

>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
// GetMembers implements gRPC PDServer.
func (s *Server) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
if s.IsClosed() {
@@ -507,51 +467,6 @@ func (s *Server) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHeartbea
}, nil
}

<<<<<<< HEAD
const regionHeartbeatSendTimeout = 5 * time.Second

var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")
=======
// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
// occurs on SendAndClose() or Recv(), both endpoints will be closed.
type bucketHeartbeatServer struct {
stream pdpb.PD_ReportBucketsServer
closed int32
}

func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
if atomic.LoadInt32(&b.closed) == 1 {
return status.Errorf(codes.Canceled, "stream is closed")
}
done := make(chan error, 1)
go func() {
done <- b.stream.SendAndClose(bucket)
}()
select {
case err := <-done:
if err != nil {
atomic.StoreInt32(&b.closed, 1)
}
return err
case <-time.After(heartbeatSendTimeout):
atomic.StoreInt32(&b.closed, 1)
return ErrSendHeartbeatTimeout
}
}

func (b *bucketHeartbeatServer) Recv() (*pdpb.ReportBucketsRequest, error) {
if atomic.LoadInt32(&b.closed) == 1 {
return nil, io.EOF
}
req, err := b.stream.Recv()
if err != nil {
atomic.StoreInt32(&b.closed, 1)
return nil, errors.WithStack(err)
}
return req, nil
}
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))

// heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error
// occurs on Send() or Recv(), both endpoints will be closed.
type heartbeatServer struct {
@@ -567,8 +482,6 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
go func() { done <- s.stream.Send(m) }()
select {
case err := <-done:
<<<<<<< HEAD
=======
if err != nil {
atomic.StoreInt32(&s.closed, 1)
}
@@ -591,103 +504,6 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
return req, nil
}

// ReportBuckets implements gRPC PDServer
func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
var (
server = &bucketHeartbeatServer{stream: stream}
forwardStream pdpb.PD_ReportBucketsClient
cancel context.CancelFunc
lastForwardedHost string
errCh chan error
)
defer func() {
if cancel != nil {
cancel()
}
}()
for {
request, err := server.Recv()
if err == io.EOF {
return nil
}
forwardedHost := getForwardedHost(stream.Context())
if !s.isLocalRequest(forwardedHost) {
if forwardStream == nil || lastForwardedHost != forwardedHost {
if cancel != nil {
cancel()
}
client, err := s.getDelegateClient(s.ctx, forwardedHost)
if err != nil {
return err
}
log.Info("create bucket report forward stream", zap.String("forwarded-host", forwardedHost))
forwardStream, cancel, err = s.createReportBucketsForwardStream(client)
if err != nil {
return err
}
lastForwardedHost = forwardedHost
errCh = make(chan error, 1)
go forwardReportBucketClientToServer(forwardStream, server, errCh)
}
if err := forwardStream.Send(request); err != nil {
return errors.WithStack(err)
}

select {
case err := <-errCh:
return err
default:
}
continue
}
rc := s.GetRaftCluster()
if rc == nil {
resp := &pdpb.ReportBucketsResponse{
Header: s.notBootstrappedHeader(),
}
err := server.Send(resp)
return errors.WithStack(err)
}
if err := s.validateRequest(request.GetHeader()); err != nil {
return err
}
buckets := request.GetBuckets()
if buckets == nil || len(buckets.Keys) == 0 {
continue
}
store := rc.GetLeaderStoreByRegionID(buckets.GetRegionId())
if store == nil {
return errors.Errorf("the store of the bucket in region %v is not found ", buckets.GetRegionId())
}
storeLabel := strconv.FormatUint(store.GetID(), 10)
storeAddress := store.GetAddress()
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "recv").Inc()

start := time.Now()
err = rc.HandleReportBuckets(buckets)
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
if err != nil {
atomic.StoreInt32(&s.closed, 1)
}
return errors.WithStack(err)
case <-time.After(regionHeartbeatSendTimeout):
atomic.StoreInt32(&s.closed, 1)
return errors.WithStack(errSendRegionHeartbeatTimeout)
}
}

func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
if atomic.LoadInt32(&s.closed) == 1 {
return nil, io.EOF
}
req, err := s.stream.Recv()
if err != nil {
atomic.StoreInt32(&s.closed, 1)
return nil, errors.WithStack(err)
}
return req, nil
}

// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
var (
@@ -1709,141 +1525,3 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}
<-done
}
<<<<<<< HEAD
=======

// StoreGlobalConfig stores global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
value := item.GetValue()
ops[i] = clientv3.OpPut(name, value)
}
res, err :=
kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
if err != nil {
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}, err
}
if !res.Succeeded {
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: "failed to execute StoreGlobalConfig transaction"}}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
}
return &pdpb.StoreGlobalConfigResponse{}, err
}

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
names := request.Names
res := make([]*pdpb.GlobalConfigItem, len(names))
for i, name := range names {
r, err := s.client.Get(ctx, globalConfigPath+name)
if err != nil {
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
} else if len(r.Kvs) == 0 {
msg := "key " + name + " not found"
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
}
}
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}

// WatchGlobalConfig reconnects the watch whenever the WatchGlobalConfig
// connection ends or is stopped for whatever reason.
func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
if err != nil {
return err
}
watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
for {
select {
case <-ctx.Done():
return nil
case res := <-watchChan:
cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
for _, e := range res.Events {
if e.Type != clientv3.EventTypePut {
continue
}
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
}
if len(cfgs) > 0 {
err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
if err != nil {
return err
}
}
}
}
}

func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
if err != nil {
return err
}
ls := make([]*pdpb.GlobalConfigItem, configList.Count)
for i, kv := range configList.Kvs {
ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)}
}
err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls})
return err
}

// Evict the leaders when the store is damaged. Damaged regions are emergency errors
// and require the user to manually remove the `evict-leader-scheduler` with pd-ctl.
func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) error {
// TODO: regions have no special process for the time being
// and need to be removed in the future
damagedRegions := stats.GetDamagedRegionsId()
if len(damagedRegions) == 0 {
return nil
}

log.Error("store damaged and leaders will be evicted, you might fix the store and remove evict-leader-scheduler manually",
zap.Uint64("store-id", stats.GetStoreId()),
zap.Uint64s("region-ids", damagedRegions))

// TODO: reimplement add scheduler logic to avoid repeating the introduction HTTP requests inside `server/api`.
return s.GetHandler().AddEvictOrGrant(float64(stats.GetStoreId()), schedulers.EvictLeaderName)
}

// ReportMinResolvedTS implements gRPC PDServer.
func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.ReportMinResolvedTsRequest) (*pdpb.ReportMinResolvedTsResponse, error) {
forwardedHost := getForwardedHost(ctx)
if !s.isLocalRequest(forwardedHost) {
client, err := s.getDelegateClient(ctx, forwardedHost)
if err != nil {
return nil, err
}
ctx = grpcutil.ResetForwardContext(ctx)
return pdpb.NewPDClient(client).ReportMinResolvedTS(ctx, request)
}

if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil
}

storeID := request.StoreId
minResolvedTS := request.MinResolvedTs
if err := rc.SetMinResolvedTS(storeID, minResolvedTS); err != nil {
return nil, err
}
log.Debug("updated min resolved-ts",
zap.Uint64("store", storeID),
zap.Uint64("min resolved-ts", minResolvedTS))
return &pdpb.ReportMinResolvedTsResponse{
Header: s.header(),
}, nil
}
>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
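
For reference, the LoadGlobalConfig handler in the hunk above answers with one GlobalConfigItem per requested name and flags a missing key with a GLOBAL_CONFIG_NOT_FOUND error item instead of failing the whole RPC. Below is a minimal client-side sketch of calling that RPC through the generated pdpb client; the PD address, the key names, and the insecure dial option are illustrative assumptions, not part of this commit.

// Minimal, illustrative LoadGlobalConfig client; not taken from this commit.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"google.golang.org/grpc"
)

func main() {
	// Dial a PD endpoint; 127.0.0.1:2379 is a placeholder address.
	conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := pdpb.NewPDClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Request two hypothetical keys; per the handler above, a missing key comes
	// back as an item with a GLOBAL_CONFIG_NOT_FOUND error rather than an RPC error.
	resp, err := client.LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{
		Names: []string{"source_id", "sink_id"},
	})
	if err != nil {
		panic(err)
	}
	for _, item := range resp.GetItems() {
		fmt.Println(item.GetName(), item.GetValue(), item.GetError())
	}
}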
