Skip to content

Commit

Permalink
region-syncer: use history buffer to record changed regions (tikv#1293)
Browse files Browse the repository at this point in the history
* region-syncer: use history buffer to record changed regions
  • Loading branch information
nolouch authored and rleungx committed Nov 12, 2018
1 parent db2b11b commit 1e7b911
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 4 deletions.
5 changes: 5 additions & 0 deletions server/core/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func (kv *KV) SetRegionKV(regionKV *RegionKV) *KV {
return kv
}

// GetRegionKV gets the region storage.
func (kv *KV) GetRegionKV() *RegionKV {
return kv.regionKV
}

// SwitchToRegionStorage switches to the region storage.
func (kv *KV) SwitchToRegionStorage() {
atomic.StoreInt32(&kv.useRegionKV, 1)
Expand Down
6 changes: 5 additions & 1 deletion server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -114,7 +115,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
break
}
for _, r := range resp.GetRegions() {
s.server.GetStorage().SaveRegion(r)
err = s.server.GetStorage().SaveRegion(r)
if err != nil {
s.history.record(core.NewRegionInfo(r, nil))
}
}
}
}
Expand Down
110 changes: 110 additions & 0 deletions server/region_syncer/history_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package syncer

import (
"strconv"

"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
)

const (
historyKey = "historyIndex"
defaultFlushCount = 100
)

type historyBuffer struct {
index uint64
records []*core.RegionInfo
head int
tail int
size int
kv core.KVBase
flushCount int
}

func newHistoryBuffer(size int, kv core.KVBase) *historyBuffer {
// use an empty space to simplify operation
size++
if size < 2 {
size = 2
}
records := make([]*core.RegionInfo, size)
h := &historyBuffer{
records: records,
size: size,
kv: kv,
flushCount: defaultFlushCount,
}
h.reload()
return h
}

func (h *historyBuffer) len() int {
if h.tail < h.head {
return h.tail + h.size - h.head
}
return h.tail - h.head
}

func (h *historyBuffer) nextIndex() uint64 {
return h.index
}

func (h *historyBuffer) firstIndex() uint64 {
return h.index - uint64(h.len())
}

func (h *historyBuffer) record(r *core.RegionInfo) {
h.records[h.tail] = r
h.tail = (h.tail + 1) % h.size
if h.tail == h.head {
h.head = (h.head + 1) % h.size
}
h.index++
h.flushCount--
if h.flushCount <= 0 {
h.persist()
h.flushCount = defaultFlushCount
}
}

func (h *historyBuffer) get(index uint64) *core.RegionInfo {
if index < h.nextIndex() && index >= h.firstIndex() {
pos := (h.head + int(index-h.firstIndex())) % h.size
return h.records[pos]
}
return nil
}

func (h *historyBuffer) reload() {
v, err := h.kv.Load(historyKey)
if err != nil {
log.Warnf("load history index failed: %s", err)
}
if v != "" {
h.index, err = strconv.ParseUint(v, 10, 64)
if err != nil {
log.Fatalf("load history index failed: %s", err)
}
}
}

func (h *historyBuffer) persist() {
err := h.kv.Save(historyKey, strconv.FormatUint(h.nextIndex(), 10))
if err != nil {
log.Warnf("persist history index (%d) failed: %v", h.nextIndex(), err)
}
}
85 changes: 85 additions & 0 deletions server/region_syncer/history_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package syncer

import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testHistoryBuffer{})

type testHistoryBuffer struct{}

func Test(t *testing.T) {
TestingT(t)
}

func (t *testHistoryBuffer) TestBufferSize(c *C) {
var regions []*core.RegionInfo
for i := 0; i <= 100; i++ {
regions = append(regions, core.NewRegionInfo(&metapb.Region{Id: uint64(i)}, nil))
}

// size equals 1
h := newHistoryBuffer(1, core.NewMemoryKV())
c.Assert(h.len(), Equals, 0)
for _, r := range regions {
h.record(r)
}
c.Assert(h.len(), Equals, 1)
c.Assert(h.get(100), Equals, regions[h.nextIndex()-1])
c.Assert(h.get(99), IsNil)

// size equals 2
h = newHistoryBuffer(2, core.NewMemoryKV())
for _, r := range regions {
h.record(r)
}
c.Assert(h.len(), Equals, 2)
c.Assert(h.get(100), Equals, regions[h.nextIndex()-1])
c.Assert(h.get(99), Equals, regions[h.nextIndex()-2])
c.Assert(h.get(98), IsNil)

// size eqauls 100
kv := core.NewMemoryKV()
h1 := newHistoryBuffer(100, kv)
for i := 0; i < 6; i++ {
h1.record(regions[i])
}
c.Assert(h1.len(), Equals, 6)
c.Assert(h1.nextIndex(), Equals, uint64(6))
h1.persist()

// restart the buffer
h2 := newHistoryBuffer(100, kv)
c.Assert(h2.nextIndex(), Equals, uint64(6))
c.Assert(h2.get(h.nextIndex()-1), IsNil)
c.Assert(h2.len(), Equals, 0)
for _, r := range regions {
index := h2.nextIndex()
h2.record(r)
c.Assert(h2.get(uint64(index)), Equals, r)
}

c.Assert(h2.nextIndex(), Equals, uint64(107))
c.Assert(h2.get(h2.nextIndex()), IsNil)
s, err := h2.kv.Load(historyKey)
c.Assert(err, IsNil)
// flush in index 106
c.Assert(s, Equals, "106")
}
11 changes: 8 additions & 3 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
)

const (
msgSize = 8 * 1024 * 1024
maxSyncRegionBatchSize = 100
syncerKeepAliveInterval = 10 * time.Second
msgSize = 8 * 1024 * 1024
maxSyncRegionBatchSize = 100
syncerKeepAliveInterval = 10 * time.Second
defaultHistoryBufferSize = 10000
)

// ClientStream is the client side of the region syncer.
Expand Down Expand Up @@ -59,6 +60,7 @@ type RegionSyncer struct {
server Server
closed chan struct{}
wg sync.WaitGroup
history *historyBuffer
}

// NewRegionSyncer returns a region syncer.
Expand All @@ -71,6 +73,7 @@ func NewRegionSyncer(s Server) *RegionSyncer {
streams: make(map[string]ServerStream),
server: s,
closed: make(chan struct{}),
history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionKV()),
}
}

Expand All @@ -86,10 +89,12 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
return
case first := <-regionNotifier:
requests = append(requests, first.GetMeta())
s.history.record(first)
pending := len(regionNotifier)
for i := 0; i < pending && i < maxSyncRegionBatchSize; i++ {
region := <-regionNotifier
requests = append(requests, region.GetMeta())
s.history.record(region)
}
regions := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
Expand Down

0 comments on commit 1e7b911

Please sign in to comment.