Skip to content

Commit

Permalink
server: add split support.
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed Mar 16, 2016
1 parent 531d231 commit 90c9071
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 3 deletions.
96 changes: 96 additions & 0 deletions server/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func (c *raftCluster) handleJob(job *pd_jobpd.Job) error {
switch req.AdminRequest.GetCmdType() {
case raft_cmdpb.AdminCommandType_ChangePeer:
return c.handleChangePeer(job)
case raft_cmdpb.AdminCommandType_Split:
return c.handleSplit(job)
default:
log.Errorf("invalid job command %v, ignore", req)
return nil
Expand Down Expand Up @@ -363,6 +365,100 @@ func (c *raftCluster) handleChangePeer(job *pd_jobpd.Job) error {
return nil
}

func (c *raftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) error {
newRegionID, err := c.s.idAlloc.Alloc()
if err != nil {
return errors.Trace(err)
}

peerIDs := make([]uint64, len(request.Region.Peers))
for i := 0; i < len(peerIDs); i++ {
if peerIDs[i], err = c.s.idAlloc.Alloc(); err != nil {
return errors.Trace(err)
}
}

split := &raft_cmdpb.AdminRequest{
CmdType: raft_cmdpb.AdminCommandType_Split.Enum(),
Split: &raft_cmdpb.SplitRequest{
NewRegionId: proto.Uint64(newRegionID),
NewPeerIds: peerIDs,
SplitKey: request.SplitKey,
},
}

req := &raft_cmdpb.RaftCommandRequest{
Header: &raft_cmdpb.RaftRequestHeader{
RegionId: request.Region.RegionId,
Peer: request.Leader,
},
AdminRequest: split,
}

return c.postJob(req)
}

func (c *raftCluster) handleSplit(job *pd_jobpd.Job) error {
response, err := c.sendRaftCommand(job.Request)
if err != nil {
return errors.Trace(err)
}

if response.Header != nil && response.Header.Error != nil {
log.Errorf("handle %v but failed with response %v, cancel it", job.Request, response.Header.Error)
return nil
}

// Must be split response here
// TODO: check this error later.
left := response.AdminResponse.Split.Left
right := response.AdminResponse.Split.Right

// Update region
leftSearchPath := makeRegionSearchKey(c.clusterRoot, left.GetEndKey())
rightSearchPath := makeRegionSearchKey(c.clusterRoot, right.GetEndKey())

leftValue, err := proto.Marshal(left)
if err != nil {
return errors.Trace(err)
}

rightValue, err := proto.Marshal(right)
if err != nil {
return errors.Trace(err)
}

var ops []clientv3.Op

leftPath := makeRegionKey(c.clusterRoot, left.GetRegionId())
rightPath := makeRegionKey(c.clusterRoot, right.GetRegionId())

ops = append(ops, clientv3.OpPut(leftPath, encodeRegionSearchKey(left.GetEndKey())))
ops = append(ops, clientv3.OpPut(rightPath, encodeRegionSearchKey(right.GetEndKey())))
ops = append(ops, clientv3.OpPut(leftSearchPath, string(leftValue)))
ops = append(ops, clientv3.OpPut(rightSearchPath, string(rightValue)))

var cmps []clientv3.Cmp
cmps = append(cmps, c.s.leaderCmp())
// new left search path must not exist
cmps = append(cmps, clientv3.Compare(clientv3.CreatedRevision(leftSearchPath), "=", 0))
// new right search path must exist, because it is the same as origin split path.
cmps = append(cmps, clientv3.Compare(clientv3.CreatedRevision(rightSearchPath), ">", 0))
cmps = append(cmps, clientv3.Compare(clientv3.CreatedRevision(rightPath), "=", 0))

resp, err := c.s.client.Txn(context.TODO()).
If(cmps...).
Then(ops...).
Commit()
if err != nil {
return errors.Trace(err)
} else if !resp.Succeeded {
return errors.New("update split region failed")
}

return nil
}

func (c *raftCluster) sendRaftCommand(request *raft_cmdpb.RaftCommandRequest) (*raft_cmdpb.RaftCommandResponse, error) {
nodeID := request.Header.Peer.GetNodeId()

Expand Down
110 changes: 107 additions & 3 deletions server/cluster_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,62 @@ func (s *mockRaftStore) handleChangePeer(c *C, req *raft_cmdpb.RaftCommandReques
}

func (s *mockRaftStore) handleSplit(c *C, req *raft_cmdpb.RaftCommandRequest) *raft_cmdpb.RaftCommandResponse {
// TODO later
return newErrorCmdResponse(errors.Errorf("unsupported request %v", req))
split := req.AdminRequest.Split
raftPeer := s.peers[req.Header.GetRegionId()]
splitKey := split.SplitKey
newRegionID := split.GetNewRegionId()
newPeerIDs := split.GetNewPeerIds()

region := raftPeer.region

c.Assert(newPeerIDs, HasLen, len(region.Peers))

c.Assert(string(splitKey), Greater, string(region.GetStartKey()))
if len(region.GetEndKey()) > 0 {
c.Assert(string(splitKey), Less, string(region.GetEndKey()))
}

newRegion := &metapb.Region{
RegionId: proto.Uint64(newRegionID),
Peers: make([]*metapb.Peer, len(newPeerIDs)),
StartKey: splitKey,
EndKey: append([]byte(nil), region.GetEndKey()...),
}

var newPeer metapb.Peer

maxPeerID := uint64(0)
for i, id := range newPeerIDs {
peer := *region.Peers[i]
if peer.GetStoreId() == s.storeIdent.GetStoreId() {
newPeer = peer
}

peer.PeerId = proto.Uint64(id)
if id > maxPeerID {
maxPeerID = id
}
newRegion.Peers[i] = &peer
}

newRegion.MaxPeerId = proto.Uint64(maxPeerID)
region.EndKey = append([]byte(nil), splitKey...)

raftPeer.region = region
s.peers[newRegionID] = &mockRaftPeer{
peer: newPeer,
region: *newRegion,
}

resp := &raft_cmdpb.RaftCommandResponse{
AdminResponse: &raft_cmdpb.AdminResponse{
Split: &raft_cmdpb.SplitResponse{
Left: &region,
Right: newRegion,
},
},
}
return resp
}

func (s *testClusterWorkerSuite) SetUpSuite(c *C) {
Expand Down Expand Up @@ -377,7 +431,7 @@ func (s *testClusterWorkerSuite) checkRegionPeerNumber(c *C, regionKey []byte, e
return region
}

func (s *testClusterWorkerSuite) TestBaseChangePeer(c *C) {
func (s *testClusterWorkerSuite) TestChangePeer(c *C) {
cluster, err := s.svr.getCluster(s.clusterID)
c.Assert(err, IsNil)

Expand All @@ -391,6 +445,7 @@ func (s *testClusterWorkerSuite) TestBaseChangePeer(c *C) {

c.Assert(region.Peers, HasLen, 1)

// Now we treat the first peer in region as leader.
leaderPeer := *region.Peers[0]
leaderPd := mustGetLeader(c, s.client, s.getRootPath())

Expand Down Expand Up @@ -442,3 +497,52 @@ func (s *testClusterWorkerSuite) TestBaseChangePeer(c *C) {

s.checkRegionPeerNumber(c, regionKey, 3)
}

func (s *testClusterWorkerSuite) TestSplit(c *C) {
cluster, err := s.svr.getCluster(s.clusterID)
c.Assert(err, IsNil)

regionKey := []byte("a")
region, err := cluster.GetRegion(regionKey)
c.Assert(err, IsNil)
c.Assert(region.GetStartKey(), BytesEquals, []byte(""))
c.Assert(region.GetEndKey(), BytesEquals, []byte(""))

// Now we treat the first peer in region as leader.
leaderPeer := *region.Peers[0]
leaderPd := mustGetLeader(c, s.client, s.getRootPath())

conn, err := net.Dial("tcp", leaderPd.GetAddr())
c.Assert(err, IsNil)
defer conn.Close()

askSplit := &pdpb.Request{
Header: newRequestHeader(s.clusterID),
CmdType: pdpb.CommandType_AskSplit.Enum(),
AskSplit: &pdpb.AskSplitRequest{
Region: region,
Leader: &leaderPeer,
SplitKey: []byte("b"),
},
}

sendRequest(c, conn, 0, askSplit)
_, resp := recvResponse(c, conn)
c.Assert(resp.GetCmdType(), Equals, pdpb.CommandType_AskSplit)

time.Sleep(500 * time.Millisecond)
left, err := cluster.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(left.GetStartKey(), BytesEquals, []byte(""))
c.Assert(left.GetEndKey(), BytesEquals, []byte("b"))
c.Assert(left.GetRegionId(), Equals, region.GetRegionId())

right, err := cluster.GetRegion([]byte("b"))
c.Assert(err, IsNil)
c.Assert(right.GetStartKey(), BytesEquals, []byte("b"))
c.Assert(right.GetEndKey(), BytesEquals, []byte(""))

region, err = cluster.GetRegion([]byte("c"))
c.Assert(err, IsNil)
c.Assert(region.GetRegionId(), Equals, right.GetRegionId())
}
26 changes: 26 additions & 0 deletions server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,29 @@ func (c *conn) handleAskChangePeer(req *pdpb.Request) (*pdpb.Response, error) {
AskChangePeer: &pdpb.AskChangePeerResponse{},
}, nil
}

func (c *conn) handleAskSplit(req *pdpb.Request) (*pdpb.Response, error) {
request := req.GetAskSplit()
if request == nil {
return nil, errors.Errorf("invalid ask split command, but %v", req)
} else if request.Region == nil {
return nil, errors.Errorf("missing region for split")
} else if request.Leader == nil {
return nil, errors.Errorf("missing leader for split")
} else if request.SplitKey == nil {
return nil, errors.Errorf("missing split key for split")
}

cluster, err := c.getCluster(req)
if err != nil {
return nil, errors.Trace(err)
}

if err = cluster.HandleAskSplit(request); err != nil {
return nil, errors.Trace(err)
}

return &pdpb.Response{
AskSplit: &pdpb.AskSplitResponse{},
}, nil
}
2 changes: 2 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (c *conn) handleRequest(req *pdpb.Request) (*pdpb.Response, error) {
return c.handlePutMeta(req)
case pdpb.CommandType_AskChangePeer:
return c.handleAskChangePeer(req)
case pdpb.CommandType_AskSplit:
return c.handleAskSplit(req)
default:
return nil, errors.Errorf("unsupported command %s", req)
}
Expand Down

0 comments on commit 90c9071

Please sign in to comment.