Merge remote-tracking branch 'origin/master' into disable-label-check
nolouch committed Jun 10, 2019
2 parents 820094d + ebf6cf2 commit 36ec3b1
Showing 31 changed files with 1,178 additions and 1,099 deletions.
5 changes: 3 additions & 2 deletions client/client_test.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/pd/pkg/testutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/mock"
"google.golang.org/grpc"
)

@@ -39,7 +40,7 @@ func TestClient(t *testing.T) {
var _ = Suite(&testClientSuite{})

type idAllocator struct {
- allocator *core.MockIDAllocator
+ allocator *mock.IDAllocator
}

func (i *idAllocator) alloc() uint64 {
@@ -48,7 +49,7 @@ func (i *idAllocator) alloc() uint64 {
}

var (
- regionIDAllocator = &idAllocator{allocator: &core.MockIDAllocator{}}
+ regionIDAllocator = &idAllocator{allocator: &mock.IDAllocator{}}
// Note: IDs below are entirely arbitrary. They are only for checking
// whether GetRegion/GetStore works.
// If we alloc ID in client in the future, these IDs must be updated.
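
The tests above now take their ID allocator from the new server/mock package instead of server/core. That package is not among the files shown on this page; the following is a minimal sketch of what it presumably exports, assuming mock.IDAllocator keeps the old core.MockIDAllocator behavior of handing out monotonically increasing IDs (fields and method set are inferred from the call sites above, not taken from the actual file):

package mock

import "sync/atomic"

// IDAllocator is a sketch of the relocated test allocator. Its zero
// value must be usable, since the tests construct it as &mock.IDAllocator{}.
type IDAllocator struct {
	base uint64
}

// NewIDAllocator mirrors the constructor called in the other test files.
func NewIDAllocator() *IDAllocator {
	return &IDAllocator{}
}

// Alloc returns the next ID in sequence and never fails, which is all
// the idAllocator wrapper above needs.
func (a *IDAllocator) Alloc() (uint64, error) {
	return atomic.AddUint64(&a.base, 1), nil
}
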
8 changes: 3 additions & 5 deletions server/cluster_info.go
@@ -24,15 +24,13 @@ import (
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/statistics"
"go.uber.org/zap"
)

type clusterInfo struct {
sync.RWMutex
- core *schedule.BasicCluster
-
+ core *core.BasicCluster
id core.IDAllocator
kv *core.KV
meta *metapb.Cluster
@@ -48,7 +46,7 @@ var defaultChangedRegionsLimit = 10000

func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo {
return &clusterInfo{
- core: schedule.NewBasicCluster(),
+ core: core.NewBasicCluster(),
id: id,
opt: opt,
kv: kv,
@@ -680,7 +678,7 @@ func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.Regi
return c.regionStats.getRegionStatsByType(typ)
}

- func (c *clusterInfo) GetOpt() schedule.NamespaceOptions {
+ func (c *clusterInfo) GetOpt() namespace.ScheduleOptions {
return c.opt
}

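
With this change BasicCluster lives in core rather than schedule. Only the constructor call is visible in this diff; the sketch below of the relocated type is an assumption based on that call site and on how clusterInfo uses its core field (the exact field set is hypothetical):

package core

// BasicCluster is assumed to bundle the store and region bookkeeping
// that previously lived in schedule.BasicCluster.
type BasicCluster struct {
	Stores  *StoresInfo
	Regions *RegionsInfo
}

// NewBasicCluster returns an empty cluster, as called from newClusterInfo.
func NewBasicCluster() *BasicCluster {
	return &BasicCluster{
		Stores:  NewStoresInfo(),
		Regions: NewRegionsInfo(),
	}
}

Together with the GetOpt change, the move removes this file's last reference to the schedule package, matching the deleted import at the top of the hunk.
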
9 changes: 5 additions & 4 deletions server/cluster_info_test.go
@@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/mock"
)

var _ = Suite(&testStoresInfoSuite{})
@@ -301,7 +302,7 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) {
func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
- cluster := newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV()))
+ cluster := newClusterInfo(mock.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV()))

n, np := uint64(3), uint64(3)
stores := newTestStores(n)
@@ -347,7 +348,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
- cluster := newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV()))
+ cluster := newClusterInfo(mock.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV()))

n, np := uint64(3), uint64(3)

@@ -562,7 +563,7 @@ func heartbeatRegions(c *C, cluster *clusterInfo, regions []*metapb.Region) {
func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
- cluster := newClusterInfo(core.NewMockIDAllocator(), opt, nil)
+ cluster := newClusterInfo(mock.NewIDAllocator(), opt, nil)

// 1: [nil, nil)
region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
@@ -601,7 +602,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
- cluster := newClusterInfo(core.NewMockIDAllocator(), opt, nil)
+ cluster := newClusterInfo(mock.NewIDAllocator(), opt, nil)

regions := []*metapb.Region{
{
3 changes: 2 additions & 1 deletion server/cluster_test.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/mock"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
@@ -591,7 +592,7 @@ type testGetStoresSuite struct {
func (s *testGetStoresSuite) SetUpSuite(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
- s.cluster = newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV()))
+ s.cluster = newClusterInfo(mock.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV()))

stores := newTestStores(200)

56 changes: 14 additions & 42 deletions server/coordinator_test.go
@@ -25,10 +25,10 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/pkg/testutil"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/mock"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedulers"
"github.com/pkg/errors"
)

func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind) *schedule.Operator {
@@ -52,7 +52,7 @@ type testClusterInfo struct {

func newTestClusterInfo(opt *scheduleOption) *testClusterInfo {
return &testClusterInfo{clusterInfo: newClusterInfo(
- core.NewMockIDAllocator(),
+ mock.NewIDAllocator(),
opt,
core.NewKV(core.NewMemoryKV()),
)}
@@ -171,34 +171,6 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
c.Assert(oc.GetOperator(1).RegionID(), Equals, op2.RegionID())
}

- type mockHeartbeatStream struct {
- ch chan *pdpb.RegionHeartbeatResponse
- }
-
- func (s *mockHeartbeatStream) Send(m *pdpb.RegionHeartbeatResponse) error {
- select {
- case <-time.After(time.Second):
- return errors.New("timeout")
- case s.ch <- m:
- }
- return nil
- }
-
- func (s *mockHeartbeatStream) Recv() *pdpb.RegionHeartbeatResponse {
- select {
- case <-time.After(time.Millisecond * 10):
- return nil
- case res := <-s.ch:
- return res
- }
- }
-
- func newMockHeartbeatStream() *mockHeartbeatStream {
- return &mockHeartbeatStream{
- ch: make(chan *pdpb.RegionHeartbeatResponse),
- }
- }
-
func (s *testCoordinatorSuite) TestDispatch(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
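
The mockHeartbeatStream type deleted above has moved into the server/mock package, and the helpers below now accept a mock.HeartbeatStream value rather than a *mockHeartbeatStream pointer, which suggests the mock package exposes an interface. The package itself is not shown on this page; this sketch reconstructs its presumable shape from the deleted code and the call sites (the interface form and the unexported names are assumptions):

package mock

import (
	"time"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pkg/errors"
)

// HeartbeatStream is assumed to be the interface behind the non-pointer
// stream parameters in coordinator_test.go.
type HeartbeatStream interface {
	Send(*pdpb.RegionHeartbeatResponse) error
	Recv() *pdpb.RegionHeartbeatResponse
}

// heartbeatStream mirrors the deleted mockHeartbeatStream: an unbuffered
// channel with a timeout on each side.
type heartbeatStream struct {
	ch chan *pdpb.RegionHeartbeatResponse
}

// NewHeartbeatStream matches the constructor the tests below call.
func NewHeartbeatStream() HeartbeatStream {
	return &heartbeatStream{ch: make(chan *pdpb.RegionHeartbeatResponse)}
}

// Send blocks until the response is consumed, or gives up after a second.
func (s *heartbeatStream) Send(m *pdpb.RegionHeartbeatResponse) error {
	select {
	case <-time.After(time.Second):
		return errors.New("timeout")
	case s.ch <- m:
	}
	return nil
}

// Recv returns the next response, or nil after 10ms of silence; the
// waitNoResponse helper at the bottom of this file depends on that nil.
func (s *heartbeatStream) Recv() *pdpb.RegionHeartbeatResponse {
	select {
	case <-time.After(time.Millisecond * 10):
		return nil
	case res := <-s.ch:
		return res
	}
}

Accepting the interface instead of a concrete pointer lets the wait helpers work with any stream implementation the mock package provides, not just this one.
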
@@ -233,7 +205,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
testutil.CheckTransferLeader(c, co.opController.GetOperator(2), schedule.OpBalance, 4, 2)
c.Assert(co.removeScheduler("balance-leader-scheduler"), IsNil)

- stream := newMockHeartbeatStream()
+ stream := mock.NewHeartbeatStream()

// Transfer peer.
region := tc.GetRegion(1).Clone()
@@ -255,7 +227,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
waitNoResponse(c, stream)
}

- func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) error {
+ func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream mock.HeartbeatStream) error {
co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream)
if err := co.cluster.putRegion(region.Clone()); err != nil {
return err
@@ -365,7 +337,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {
c.Assert(tc.addRegionStore(3, 3), IsNil)
c.Assert(tc.addRegionStore(4, 4), IsNil)

- stream := newMockHeartbeatStream()
+ stream := mock.NewHeartbeatStream()

// Add peer to store 1.
c.Assert(tc.addLeaderRegion(1, 2, 3), IsNil)
@@ -430,7 +402,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil)

- stream := newMockHeartbeatStream()
+ stream := mock.NewHeartbeatStream()

// Wait for schedule.
waitOperator(c, co, 1)
@@ -578,7 +550,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
c.Assert(co.removeScheduler("label-scheduler"), IsNil)
c.Assert(co.schedulers, HasLen, 0)

- stream := newMockHeartbeatStream()
+ stream := mock.NewHeartbeatStream()

// Add stores 1,2,3
c.Assert(tc.addLeaderStore(1, 1), IsNil)
@@ -731,7 +703,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
// Add 1 replica on store 2.
co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
co.run()
- stream := newMockHeartbeatStream()
+ stream := mock.NewHeartbeatStream()
c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil)
region = waitAddLearner(c, stream, region, 2)
c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil)
@@ -764,7 +736,7 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
- hbStreams := schedule.NewMockHeartbeatStreams(tc.clusterInfo.getClusterID())
+ hbStreams := mock.NewHeartbeatStreams(tc.clusterInfo.getClusterID())

oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams)
c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(0))
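
TestOperatorCount likewise switches from schedule.NewMockHeartbeatStreams to mock.NewHeartbeatStreams. Only the constructor call appears in this diff; below is a hedged sketch, assuming the relocated helper keeps the old shape of a non-blocking, cluster-tagged message sink that the operator controller can push heartbeat responses into (fields and the SendMsg signature are assumptions):

package mock

import (
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/pd/server/core"
)

// HeartbeatStreams is a sketch of the test double handed to
// schedule.NewOperatorController above.
type HeartbeatStreams struct {
	clusterID uint64
	msgCh     chan *pdpb.RegionHeartbeatResponse
}

// NewHeartbeatStreams matches the constructor called in this test.
func NewHeartbeatStreams(clusterID uint64) *HeartbeatStreams {
	return &HeartbeatStreams{
		clusterID: clusterID,
		msgCh:     make(chan *pdpb.RegionHeartbeatResponse, 1024),
	}
}

// SendMsg stamps the response with the cluster and region identity and
// queues it without ever blocking the scheduling goroutine.
func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
	msg.Header = &pdpb.ResponseHeader{ClusterId: s.clusterID}
	msg.RegionId = region.GetID()
	select {
	case s.msgCh <- msg:
	default:
	}
}
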
@@ -909,7 +881,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
}
}

- func waitAddLearner(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+ func waitAddLearner(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
if res = stream.Recv(); res != nil {
@@ -925,7 +897,7 @@ func waitAddLearner(c *C, stream *mockHeartbeatStream, region *core.RegionInfo,
)
}

- func waitPromoteLearner(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+ func waitPromoteLearner(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
if res = stream.Recv(); res != nil {
@@ -942,7 +914,7 @@ func waitPromoteLearner(c *C, stream *mockHeartbeatStream, region *core.RegionIn
)
}

- func waitRemovePeer(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+ func waitRemovePeer(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
if res = stream.Recv(); res != nil {
@@ -958,7 +930,7 @@ func waitRemovePeer(c *C, stream *mockHeartbeatStream, region *core.RegionInfo,
)
}

- func waitTransferLeader(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+ func waitTransferLeader(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
if res = stream.Recv(); res != nil {
@@ -971,7 +943,7 @@ func waitTransferLeader(c *C, stream *mockHeartbeatStream, region *core.RegionIn
)
}

- func waitNoResponse(c *C, stream *mockHeartbeatStream) {
+ func waitNoResponse(c *C, stream mock.HeartbeatStream) {
testutil.WaitUntil(c, func(c *C) bool {
res := stream.Recv()
return res == nil
(The remaining 26 of the 31 changed files are not shown here.)
