Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into trace_predicate_p…
Browse files Browse the repository at this point in the history
…ush_down
  • Loading branch information
Yisaer committed Dec 23, 2021
2 parents 7d631cb + b9eb9f6 commit 8fd2774
Show file tree
Hide file tree
Showing 40 changed files with 958 additions and 125 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ coverage.out
*.iml
*.swp
*.log
*.test.bin
tags
profile.coverprofile
explain_test
Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ devgotest: failpoint-enable
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -check.p true > gotest.log || { $(FAILPOINT_DISABLE); grep -v '^\([[]20\|PASS:\|ok \)' 'gotest.log'; exit 1; }
@$(FAILPOINT_DISABLE)

ut: failpoint-enable tools/bin/ut
tools/bin/ut $(X);
@$(FAILPOINT_DISABLE)

gotest: failpoint-enable
@echo "Running in native mode."
@export log_level=info; export TZ='Asia/Shanghai'; \
Expand Down Expand Up @@ -220,6 +224,10 @@ failpoint-disable: tools/bin/failpoint-ctl
# Restoring gofail failpoints...
@$(FAILPOINT_DISABLE)

tools/bin/ut: tools/check/ut.go
cd tools/check; \
$(GO) build -o ../bin/ut ut.go

tools/bin/megacheck: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/megacheck honnef.co/go/tools/cmd/megacheck
Expand Down
15 changes: 9 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ type local struct {
duplicateDetection bool
duplicateDB *pebble.DB
errorMgr *errormanager.ErrorManager
}

var bufferPool = membuf.NewPool(1024, manual.Allocator{})
bufferPool *membuf.Pool
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
Expand Down Expand Up @@ -244,6 +244,8 @@ func NewLocalBackend(
checkTiKVAvaliable: cfg.App.CheckRequirements,
duplicateDB: duplicateDB,
errorMgr: errorMgr,

bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
}
local.conns = common.NewGRPCConns()
if err = local.checkMultiIngestSupport(ctx); err != nil {
Expand Down Expand Up @@ -423,6 +425,7 @@ func (local *local) Close() {
engine.unlock()
}
local.conns.Close()
local.bufferPool.Destroy()

if local.duplicateDB != nil {
// Check whether there are duplicates.
Expand Down Expand Up @@ -776,7 +779,7 @@ func (local *local) WriteToTiKV(
requests = append(requests, req)
}

bytesBuf := bufferPool.NewBuffer()
bytesBuf := local.bufferPool.NewBuffer()
defer bytesBuf.Destroy()
pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
count := 0
Expand Down Expand Up @@ -1664,14 +1667,14 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
}
engine := e.(*Engine)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64) (*Writer, error) {
func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
memtableSizeLimit: cacheSize,
kvBuffer: bufferPool.NewBuffer(),
kvBuffer: kvBuffer,
isKVSorted: cfg.IsKVSorted,
isWriteBatchSorted: true,
}
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore"
Expand Down Expand Up @@ -357,7 +358,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
f.wg.Add(1)
go f.ingestSSTLoop()
sorted := needSort && !partitialSort
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024)
pool := membuf.NewPool()
defer pool.Destroy()
kvBuffer := pool.NewBuffer()
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func newTestClient(
}
}

// ScatterRegions is the test client's stand-in for scattering regions in a batch.
// It ignores both arguments, performs no work and always returns nil.
func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error {
return nil
}

func (c *testClient) GetAllRegions() map[uint64]*restore.RegionInfo {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
81 changes: 63 additions & 18 deletions br/pkg/membuf/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

package membuf

const bigValueSize = 1 << 16 // 64K

var allocBufLen = 1 << 20 // 1M
// Defaults applied by NewPool when the corresponding Option is not supplied.
const (
// defaultPoolSize is the capacity of the pool's block cache, i.e. how many
// free blocks are kept around for reuse.
defaultPoolSize = 1024
// defaultBlockSize is the size in bytes requested from the allocator for each block.
defaultBlockSize = 1 << 20 // 1M
// defaultLargeAllocThreshold is the request size above which Buffer.AllocBytes
// bypasses the pooled blocks and allocates with make directly.
defaultLargeAllocThreshold = 1 << 16 // 64K
)

// Allocator is the abstract interface for allocating and freeing memory.
type Allocator interface {
Expand All @@ -38,30 +40,71 @@ func (stdAllocator) Free(_ []byte) {}
// garbage collector which always releases the memory so late. Using a fixed-size chan for reuse
// can decrease the memory usage to 1/3 compared with sync.Pool.
type Pool struct {
allocator Allocator
recycleCh chan []byte
allocator Allocator
blockSize int
blockCache chan []byte
largeAllocThreshold int
}

// Option configures a pool. NewPool applies each Option, in order, to the
// Pool after its default fields have been set.
type Option func(p *Pool)

// WithPoolSize sets the capacity of the pool's block cache, that is, the
// maximum number of free blocks the pool keeps for reuse.
func WithPoolSize(size int) Option {
	return func(pool *Pool) {
		// A fresh channel replaces whatever cache the pool had before.
		pool.blockCache = make(chan []byte, size)
	}
}

// WithBlockSize sets the size, in bytes, of every block the pool requests
// from its allocator.
func WithBlockSize(size int) Option {
	return func(pool *Pool) {
		pool.blockSize = size
	}
}

// WithAllocator sets the allocator the pool calls to obtain new blocks and
// to free blocks it no longer caches.
func WithAllocator(allocator Allocator) Option {
	return func(pool *Pool) {
		pool.allocator = allocator
	}
}

// WithLargeAllocThreshold sets the large-allocation threshold used by Buffers
// created from this pool. A request larger than the threshold is served
// directly by the built-in make function instead of from a pooled block, so
// those bytes are not tracked by the pool.
func WithLargeAllocThreshold(threshold int) Option {
	return func(pool *Pool) {
		pool.largeAllocThreshold = threshold
	}
}

// NewPool creates a new pool.
func NewPool(size int, allocator Allocator) *Pool {
return &Pool{
allocator: allocator,
recycleCh: make(chan []byte, size),
func NewPool(opts ...Option) *Pool {
p := &Pool{
allocator: stdAllocator{},
blockSize: defaultBlockSize,
blockCache: make(chan []byte, defaultPoolSize),
largeAllocThreshold: defaultLargeAllocThreshold,
}
for _, opt := range opts {
opt(p)
}
return p
}

func (p *Pool) acquire() []byte {
select {
case b := <-p.recycleCh:
case b := <-p.blockCache:
return b
default:
return p.allocator.Alloc(allocBufLen)
return p.allocator.Alloc(p.blockSize)
}
}

func (p *Pool) release(b []byte) {
select {
case p.recycleCh <- b:
case p.blockCache <- b:
default:
p.allocator.Free(b)
}
Expand All @@ -72,10 +115,12 @@ func (p *Pool) NewBuffer() *Buffer {
return &Buffer{pool: p, bufs: make([][]byte, 0, 128), curBufIdx: -1}
}

var globalPool = NewPool(1024, stdAllocator{})

// NewBuffer creates a new buffer in global pool.
func NewBuffer() *Buffer { return globalPool.NewBuffer() }
// Destroy closes the block cache and hands every block still cached in it
// back to the allocator. The cache channel stays closed afterwards, so a
// later release of a block to this pool would send on a closed channel.
func (p *Pool) Destroy() {
	close(p.blockCache)
	for {
		// Receiving from the closed channel drains the buffered blocks first
		// and then reports open == false.
		block, open := <-p.blockCache
		if !open {
			return
		}
		p.allocator.Free(block)
	}
}

// Buffer represents the reuse buffer.
type Buffer struct {
Expand Down Expand Up @@ -123,12 +168,12 @@ func (b *Buffer) Destroy() {

// TotalSize represents the total memory size of this Buffer.
func (b *Buffer) TotalSize() int64 {
return int64(len(b.bufs) * allocBufLen)
return int64(len(b.bufs) * b.pool.blockSize)
}

// AllocBytes allocates bytes with the given length.
func (b *Buffer) AllocBytes(n int) []byte {
if n > bigValueSize {
if n > b.pool.largeAllocThreshold {
return make([]byte, n)
}
if b.curIdx+n > b.curBufLen {
Expand Down
20 changes: 14 additions & 6 deletions br/pkg/membuf/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ import (
"github.com/stretchr/testify/require"
)

func init() {
allocBufLen = 1024
}

type testAllocator struct {
allocs int
frees int
Expand All @@ -41,7 +37,13 @@ func (t *testAllocator) Free(_ []byte) {

func TestBufferPool(t *testing.T) {
allocator := &testAllocator{}
pool := NewPool(2, allocator)
pool := NewPool(
WithPoolSize(2),
WithAllocator(allocator),
WithBlockSize(1024),
WithLargeAllocThreshold(512),
)
defer pool.Destroy()

bytesBuf := pool.NewBuffer()
bytesBuf.AllocBytes(256)
Expand All @@ -53,6 +55,10 @@ func TestBufferPool(t *testing.T) {
bytesBuf.AllocBytes(767)
require.Equal(t, 2, allocator.allocs)

largeBytes := bytesBuf.AllocBytes(513)
require.Equal(t, 513, len(largeBytes))
require.Equal(t, 2, allocator.allocs)

require.Equal(t, 0, allocator.frees)
bytesBuf.Destroy()
require.Equal(t, 0, allocator.frees)
Expand All @@ -67,7 +73,9 @@ func TestBufferPool(t *testing.T) {
}

func TestBufferIsolation(t *testing.T) {
bytesBuf := NewBuffer()
pool := NewPool(WithBlockSize(1024))
defer pool.Destroy()
bytesBuf := pool.NewBuffer()
defer bytesBuf.Destroy()

b1 := bytesBuf.AllocBytes(16)
Expand Down
59 changes: 50 additions & 9 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// Constants for split retry machinery.
Expand Down Expand Up @@ -112,6 +114,7 @@ SplitRegions:
regionMap[region.Region.GetId()] = region
}
for regionID, keys := range splitKeyMap {
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
var newRegions []*RegionInfo
region := regionMap[regionID]
log.Info("split regions",
Expand Down Expand Up @@ -142,6 +145,7 @@ SplitRegions:
logutil.Keys(keys), rtree.ZapRanges(ranges))
continue SplitRegions
}
log.Info("scattered regions", zap.Int("count", len(newRegions)))
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
Expand Down Expand Up @@ -294,8 +298,6 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe
log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet)))
var errs error
for _, region := range newRegionSet {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
err := rs.client.ScatterRegion(ctx, region)
if err == nil {
// it is safe according to the Go language spec.
Expand Down Expand Up @@ -328,15 +330,54 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe

}

// isUnsupportedError reports whether err means the batch scatter API is not
// available, in which case the caller should fall back to the ScatterRegion API.
//
// Only gRPC errors qualify, and only in two situations:
//
//  1. The RPC endpoint returns UNIMPLEMENTED. (This mostly exists so that test
//     cases do not have to rely on a magic message.)
//  2. The message contains "region 0 not found". PD reuses the gRPC endpoint
//     `ScatterRegion` for the batch version of scattering: a request carrying
//     the field `regionIDs` takes the batch path, otherwise the region named
//     by `regionID` is scattered. When BR (which uses v5.x PD clients and
//     calls `ScatterRegions`) talks to a 4.x PD, the `regionIDs` it sets is
//     ignored by protocol buffers and `regionID` is left as zero. The older
//     PD then searches for the region with ID 0, which consistently fails
//     with "region 0 not found".
func isUnsupportedError(err error) bool {
	grpcStatus, isGRPC := status.FromError(errors.Cause(err))
	if !isGRPC {
		// Not a gRPC error; something else went wrong.
		return false
	}
	if grpcStatus.Code() == codes.Unimplemented {
		return true
	}
	return strings.Contains(grpcStatus.Message(), "region 0 not found")
}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
}

err := utils.WithRetry(ctx, func() error {
err := rs.client.ScatterRegions(ctx, newRegions)
if isUnsupportedError(err) {
log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err))
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
return nil
}
return err
// the retry is for the temporary network errors during sending request.
}, &exponentialBackoffer{attempt: 3, baseBackoff: 500 * time.Millisecond})

if err != nil {
log.Warn("failed to batch scatter region", logutil.ShortError(err))
}
}

func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
Expand Down
Loading

0 comments on commit 8fd2774

Please sign in to comment.