lightning: add store write limiter #35193

Merged · 10 commits · Jun 21, 2022
68 changes: 46 additions & 22 deletions br/pkg/lightning/backend/local/local.go
100644 → 100755
@@ -232,8 +232,9 @@ type local struct {
errorMgr *errormanager.ErrorManager
importClientFactory ImportClientFactory

bufferPool *membuf.Pool
metrics *metric.Metrics
bufferPool *membuf.Pool
metrics *metric.Metrics
writeLimiter StoreWriteLimiter
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
@@ -308,6 +309,12 @@ func NewLocalBackend(
if duplicateDetection {
keyAdapter = dupDetectKeyAdapter{}
}
var writeLimiter StoreWriteLimiter
if cfg.TikvImporter.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(int(cfg.TikvImporter.StoreWriteBWLimit))
} else {
writeLimiter = noopStoreWriteLimiter{}
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
@@ -334,6 +341,7 @@ func NewLocalBackend(
errorMgr: errorMgr,
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
writeLimiter: writeLimiter,
}
if m, ok := metric.FromContext(ctx); ok {
local.metrics = m
@@ -784,6 +792,7 @@ func (local *local) WriteToTiKV(

leaderID := region.Leader.GetId()
clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
storeIDs := make([]uint64, 0, len(region.Region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer.StoreId)
@@ -812,50 +821,65 @@ }
}
clients = append(clients, wstream)
requests = append(requests, req)
storeIDs = append(storeIDs, peer.StoreId)
}

bytesBuf := local.bufferPool.NewBuffer()
defer bytesBuf.Destroy()
pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
count := 0
size := int64(0)
totalSize := int64(0)
totalCount := int64(0)
firstLoop := true
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= int64(config.SplitRegionSize) {
regionMaxSize = regionSplitSize * 4 / 3
}
// Set a lower flush limit to make the speed of write more smooth.
flushLimit := int64(local.writeLimiter.Limit() / 10)

flushKVs := func() error {
for i := range clients {
if err := local.writeLimiter.WaitN(ctx, storeIDs[i], int(size)); err != nil {
return errors.Trace(err)
}
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return errors.Trace(err)
}
}
return nil
}

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
kvSize := int64(len(iter.Key()) + len(iter.Value()))
// here we reuse the `*sst.Pair`s to optimize object allocation
if firstLoop {
if count < len(pairs) {
pairs[count].Key = bytesBuf.AddBytes(iter.Key())
pairs[count].Value = bytesBuf.AddBytes(iter.Value())
} else {
pair := &sst.Pair{
Key: bytesBuf.AddBytes(iter.Key()),
Value: bytesBuf.AddBytes(iter.Value()),
}
pairs = append(pairs, pair)
} else {
pairs[count].Key = bytesBuf.AddBytes(iter.Key())
pairs[count].Value = bytesBuf.AddBytes(iter.Value())
}
count++
totalCount++
size += kvSize
totalSize += kvSize

if count >= local.batchWriteKVPairs {
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, Range{}, stats, errors.Trace(err)
}
if count >= local.batchWriteKVPairs || size >= flushLimit {
if err := flushKVs(); err != nil {
return nil, Range{}, stats, err
}
count = 0
size = 0
bytesBuf.Reset()
firstLoop = false
}
if size >= regionMaxSize || totalCount >= regionSplitKeys {
if totalSize >= regionMaxSize || totalCount >= regionSplitKeys {
break
}
}
@@ -865,12 +889,12 @@ func (local *local) WriteToTiKV(
}

if count > 0 {
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, Range{}, stats, errors.Trace(err)
}
if err := flushKVs(); err != nil {
return nil, Range{}, stats, err
}
count = 0
size = 0
bytesBuf.Reset()
}

var leaderPeerMetas []*sst.SSTMeta
@@ -913,7 +937,7 @@ func (local *local) WriteToTiKV(
logutil.Region(region.Region), logutil.Leader(region.Leader))
}
stats.count = totalCount
stats.totalBytes = size
stats.totalBytes = totalSize

return leaderPeerMetas, finishedRange, stats, nil
}
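A minimal, self-contained sketch of the flush pattern used in WriteToTiKV above: buffer KV pairs, flush once roughly a tenth of the per-store limit is pending, and block on the limiter before each send. The 1 MiB/s limit, the 64 KiB batch size, and the send stand-in are assumptions for illustration, not code from this PR.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    const limit = 1 << 20 // assumed: 1 MiB/s per store, i.e. store-write-bwlimit = "1Mi"
    // Burst is limit + 20%, matching newStoreWriteLimiter in this PR.
    limiter := rate.NewLimiter(rate.Limit(limit), limit+limit/5)
    // Flush whenever a tenth of the limit is buffered, so each wait stays short
    // and the write speed stays smooth instead of stalling once per second.
    flushLimit := limit / 10

    send := func(n int) {} // stand-in for clients[i].Send(requests[i])

    start := time.Now()
    pending := 0
    for i := 0; i < 32; i++ {
        pending += 64 << 10 // pretend the iterator produced another 64 KiB of KV pairs
        if pending >= flushLimit {
            // Block until this store's token bucket can absorb the chunk.
            if err := limiter.WaitN(context.Background(), pending); err != nil {
                panic(err)
            }
            send(pending)
            pending = 0
        }
    }
    fmt.Printf("wrote 2 MiB in %v\n", time.Since(start))
}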
74 changes: 74 additions & 0 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"database/sql"
"math"
"regexp"
"runtime"
"sort"
@@ -40,6 +41,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

const (
@@ -592,3 +594,75 @@ func intersectRange(region *metapb.Region, rg Range) Range {

return Range{start: startKey, end: endKey}
}

type StoreWriteLimiter interface {
WaitN(ctx context.Context, storeID uint64, n int) error
Limit() int
}

type storeWriteLimiter struct {
rwm sync.RWMutex
limiters map[uint64]*rate.Limiter
limit int
burst int
}

func newStoreWriteLimiter(limit int) *storeWriteLimiter {
var burst int
// Allow burst of at most 20% of the limit.
if limit <= math.MaxInt-limit/5 {
burst = limit + limit/5
} else {
// If overflowed, set burst to math.MaxInt.
burst = math.MaxInt
Contributor: maybe adjust them during config.adjust, and add a log

Contributor (author): This is an internal implementation. burst is not configurable.

}
return &storeWriteLimiter{
limiters: make(map[uint64]*rate.Limiter),
limit: limit,
burst: burst,
}
}

func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
limiter := s.getLimiter(storeID)
// The original WaitN doesn't allow n > burst,
// so we call WaitN with burst multiple times.
for n > limiter.Burst() {
if err := limiter.WaitN(ctx, limiter.Burst()); err != nil {
return err
}
n -= limiter.Burst()
}
return limiter.WaitN(ctx, n)
}

func (s *storeWriteLimiter) Limit() int {
return s.limit
}

func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter {
s.rwm.RLock()
limiter, ok := s.limiters[storeID]
s.rwm.RUnlock()
if ok {
return limiter
}
s.rwm.Lock()
defer s.rwm.Unlock()
limiter, ok = s.limiters[storeID]
if !ok {
limiter = rate.NewLimiter(rate.Limit(s.limit), s.burst)
s.limiters[storeID] = limiter
}
return limiter
}

type noopStoreWriteLimiter struct{}

func (noopStoreWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
return nil
}

func (noopStoreWriteLimiter) Limit() int {
return math.MaxInt
}
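The loop in storeWriteLimiter.WaitN above exists because rate.Limiter rejects a single WaitN call larger than its burst, so oversized requests are split into burst-sized chunks. A standalone sketch of that chunking, using golang.org/x/time/rate directly rather than the PR's types, shows the expected blocking time; the concrete numbers are assumptions, and the "WaitN exceeds the burst" case in TestStoreWriteLimiter below exercises the same behavior.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

// waitN splits a request larger than the burst into burst-sized chunks,
// mirroring storeWriteLimiter.WaitN.
func waitN(ctx context.Context, l *rate.Limiter, n int) error {
    for n > l.Burst() {
        if err := l.WaitN(ctx, l.Burst()); err != nil {
            return err
        }
        n -= l.Burst()
    }
    return l.WaitN(ctx, n)
}

func main() {
    // limit = 100 tokens/s, burst = 120 (limit + 20%), as newStoreWriteLimiter(100) would build.
    l := rate.NewLimiter(rate.Limit(100), 120)
    start := time.Now()
    // The first 120 tokens come from the initial burst; the remaining 120
    // must be generated at 100 tokens/s, so this blocks for about 1.2s.
    if err := waitN(context.Background(), l, 240); err != nil {
        panic(err)
    }
    fmt.Println("waited", time.Since(start))
}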
43 changes: 43 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
@@ -770,3 +770,46 @@ func TestNeedSplit(t *testing.T) {
}
}
}

func TestStoreWriteLimiter(t *testing.T) {
// Test create store write limiter with limit math.MaxInt.
limiter := newStoreWriteLimiter(math.MaxInt)
err := limiter.WaitN(context.Background(), 1, 1024)
require.NoError(t, err)

// Test WaitN exceeds the burst.
limiter = newStoreWriteLimiter(100)
start := time.Now()
// 120 is the initial burst, 150 is the number of new tokens.
err = limiter.WaitN(context.Background(), 1, 120+150)
require.NoError(t, err)
require.Greater(t, time.Since(start), time.Second)

// Test WaitN with different store id.
limiter = newStoreWriteLimiter(100)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
for i := 0; i < 10; i++ {
wg.Add(1)
go func(storeID uint64) {
defer wg.Done()
start := time.Now()
var gotTokens int
for {
n := rand.Intn(50)
if limiter.WaitN(ctx, storeID, n) != nil {
break
}
gotTokens += n
}
elapsed := time.Since(start)
maxTokens := 120 + int(float64(elapsed)/float64(time.Second)*100)
// In theory, gotTokens should be less than or equal to maxTokens.
// But we allow a small margin of error to avoid making the test flaky.
require.LessOrEqual(t, gotTokens, maxTokens+1)

}(uint64(i))
}
wg.Wait()
}
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
@@ -532,6 +532,7 @@ type TikvImporter struct {

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"`
Contributor: what's bw? batch write? maybe max-store-write-rate?

Contributor: @sunzhaoyang ptal too

Contributor (author): bw means bandwidth. It is similar to rsync --bwlimit.

}

type Checkpoint struct {
5 changes: 5 additions & 0 deletions br/tests/lightning_write_limit/config.toml
@@ -0,0 +1,5 @@
[tikv-importer]
store-write-bwlimit = "1Mi"

[mydumper.csv]
header = false
49 changes: 49 additions & 0 deletions br/tests/lightning_write_limit/run.sh
@@ -0,0 +1,49 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

mkdir -p "$TEST_DIR/data"

cat <<EOF >"$TEST_DIR/data/test-schema-create.sql"
CREATE DATABASE test;
EOF
cat <<EOF >"$TEST_DIR/data/test.t-schema.sql"
CREATE TABLE test.t (
id int,
a int,
b int,
c int
);
EOF

# Generate 200k rows. Total size is about 5MiB.
set +x
for i in {1..200000}; do
echo "$i,$i,$i,$i" >>"$TEST_DIR/data/test.t.0.csv"
done
set -x

start=$(date +%s)
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config.toml"
end=$(date +%s)
take=$((end - start))

# The encoded kv size is 10MiB. Usually it should take more than 10s.
if [ $take -lt 10 ]; then
echo "Lightning runs too fast. The write limiter doesn't work."
exit 1
fi
3 changes: 3 additions & 0 deletions br/tidb-lightning.toml
@@ -136,6 +136,9 @@ addr = "127.0.0.1:8287"
# The memory cache used in for local sorting during the encode-KV phase before flushing into the engines. The memory
# usage is bound by region-concurrency * local-writer-mem-cache-size.
#local-writer-mem-cache-size = '128MiB'
# Limit the write bandwidth to each tikv store. The unit is 'Bytes per second'. 0 means no limit.
#store-write-bwlimit = 0

[mydumper]
# block size of file reading