diff --git a/server/config.go b/server/config.go index c7429ae3882..39b15213638 100644 --- a/server/config.go +++ b/server/config.go @@ -97,6 +97,8 @@ type Config struct { // Only test can change them. nextRetryDelay time.Duration disableStrictReconfigCheck bool + + heartbeatStreamBindInterval typeutil.Duration } // NewConfig creates a new config. @@ -144,6 +146,8 @@ const ( defaultTickInterval = 500 * time.Millisecond // embed etcd has a check that `5 * tick > election` defaultElectionInterval = 3000 * time.Millisecond + + defaultHeartbeatStreamRebindInterval = time.Minute ) func adjustString(v *string, defValue string) { @@ -276,6 +280,8 @@ func (c *Config) adjust() error { c.Schedule.adjust() c.Replication.adjust() + + adjustDuration(&c.heartbeatStreamBindInterval, defaultHeartbeatStreamRebindInterval) return nil } diff --git a/server/grpc_service.go b/server/grpc_service.go index ac574d4f55b..42f3a6495a1 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -290,7 +290,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { return errors.Trace(err) } - isNew := true + var lastBind time.Time for { request, err := server.Recv() if err == io.EOF { @@ -311,9 +311,10 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { hbStreams := cluster.coordinator.hbStreams - if isNew { + if time.Since(lastBind) > s.cfg.heartbeatStreamBindInterval.Duration { + regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "bind").Inc() hbStreams.bindStream(storeID, server) - isNew = false + lastBind = time.Now() } region := core.NewRegionInfo(request.GetRegion(), request.GetLeader()) diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go new file mode 100644 index 00000000000..9071f55f3e6 --- /dev/null +++ b/server/heartbeat_stream_test.go @@ -0,0 +1,86 @@ +// Copyright 2018 PingCAP, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"time"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/pd/pkg/testutil"
+	"github.com/pingcap/pd/pkg/typeutil"
+)
+
+var _ = Suite(&testHeartbeatStreamSuite{})
+
+// testHeartbeatStreamSuite checks which region heartbeat stream the server
+// uses to answer a store when that store has more than one stream open.
+type testHeartbeatStreamSuite struct {
+	testClusterBaseSuite
+	// region is the region created by the bootstrap request in SetUpSuite.
+	region *metapb.Region
+}
+
+// SetUpSuite starts a test server whose heartbeat stream bind interval is
+// shortened to one second, waits for a leader, connects a gRPC client and
+// bootstraps the cluster, remembering the bootstrap region for the tests.
+func (s *testHeartbeatStreamSuite) SetUpSuite(c *C) {
+	s.svr, s.cleanup = newTestServer(c)
+	// Must be set before Run so the rebind in TestActivity happens quickly.
+	s.svr.cfg.heartbeatStreamBindInterval = typeutil.NewDuration(time.Second)
+	err := s.svr.Run()
+	c.Assert(err, IsNil)
+	mustWaitLeader(c, []*Server{s.svr})
+	s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
+
+	bootstrapReq := s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")
+	_, err = s.svr.bootstrapCluster(bootstrapReq)
+	c.Assert(err, IsNil)
+	s.region = bootstrapReq.Region
+}
+
+// TearDownSuite releases the resources acquired by newTestServer.
+func (s *testHeartbeatStreamSuite) TearDownSuite(c *C) {
+	s.cleanup()
+}
+
+// TestActivity opens two heartbeat streams and verifies that responses are
+// delivered on the stream that most recently got bound: first stream1, then
+// stream2, and finally stream1 again once it keeps sending heartbeats.
+func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
+	// Add a new store and an addPeer operator.
+	storeID, err := s.svr.idAlloc.Alloc()
+	c.Assert(err, IsNil)
+	putStore(c, s.grpcPDClient, s.svr.clusterID, &metapb.Store{Id: storeID, Address: "127.0.0.1:1"})
+	// NOTE(review): the result of AddAddPeerOperator (if any) is not checked
+	// here; confirm its signature and assert on it if it reports an error.
+	newHandler(s.svr).AddAddPeerOperator(s.region.GetId(), storeID)
+
+	stream1 := newRegionheartbeatClient(c, s.grpcPDClient)
+	stream2 := newRegionheartbeatClient(c, s.grpcPDClient)
+	// checkActiveStream reports which stream delivers the next response:
+	// 1 for stream1, 2 for stream2, or 0 if neither does within one second.
+	checkActiveStream := func() int {
+		select {
+		case <-stream1.respCh:
+			return 1
+		case <-stream2.respCh:
+			return 2
+		case <-time.After(time.Second):
+			return 0
+		}
+	}
+
+	req := &pdpb.RegionHeartbeatRequest{
+		Header: newRequestHeader(s.svr.clusterID),
+		Leader: s.region.Peers[0],
+		Region: s.region,
+	}
+	// Every Send is asserted so that a broken stream fails the test at the
+	// call site instead of surfacing later as a wrong active stream.
+
+	// Active stream is stream1.
+	c.Assert(stream1.stream.Send(req), IsNil)
+	c.Assert(checkActiveStream(), Equals, 1)
+	// Rebind to stream2.
+	c.Assert(stream2.stream.Send(req), IsNil)
+	c.Assert(checkActiveStream(), Equals, 2)
+	// Rebind to stream1 if no more heartbeats sent through stream2.
+	// The heartbeat is resent on every poll because stream1 is only bound
+	// again after the configured bind interval has elapsed.
+	testutil.WaitUntil(c, func(c *C) bool {
+		c.Assert(stream1.stream.Send(req), IsNil)
+		return checkActiveStream() == 1
+	})
+}