diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
old mode 100644
new mode 100755
index 11b44a6ba19c8..13773dc6d2ee4
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -232,8 +232,9 @@ type local struct {
 	errorMgr            *errormanager.ErrorManager
 	importClientFactory ImportClientFactory
 
-	bufferPool *membuf.Pool
-	metrics    *metric.Metrics
+	bufferPool   *membuf.Pool
+	metrics      *metric.Metrics
+	writeLimiter StoreWriteLimiter
 }
 
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
@@ -308,6 +309,12 @@ func NewLocalBackend(
 	if duplicateDetection {
 		keyAdapter = dupDetectKeyAdapter{}
 	}
+	var writeLimiter StoreWriteLimiter
+	if cfg.TikvImporter.StoreWriteBWLimit > 0 {
+		writeLimiter = newStoreWriteLimiter(int(cfg.TikvImporter.StoreWriteBWLimit))
+	} else {
+		writeLimiter = noopStoreWriteLimiter{}
+	}
 	local := &local{
 		engines: sync.Map{},
 		pdCtl:   pdCtl,
@@ -334,6 +341,7 @@ func NewLocalBackend(
 		errorMgr:            errorMgr,
 		importClientFactory: importClientFactory,
 		bufferPool:          membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
+		writeLimiter:        writeLimiter,
 	}
 	if m, ok := metric.FromContext(ctx); ok {
 		local.metrics = m
@@ -784,6 +792,7 @@ func (local *local) WriteToTiKV(
 
 	leaderID := region.Leader.GetId()
 	clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
+	storeIDs := make([]uint64, 0, len(region.Region.GetPeers()))
 	requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
 	for _, peer := range region.Region.GetPeers() {
 		cli, err := local.getImportClient(ctx, peer.StoreId)
@@ -812,6 +821,7 @@
 		}
 		clients = append(clients, wstream)
 		requests = append(requests, req)
+		storeIDs = append(storeIDs, peer.StoreId)
 	}
 
 	bytesBuf := local.bufferPool.NewBuffer()
@@ -819,43 +829,57 @@
 	pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
 	count := 0
 	size := int64(0)
+	totalSize := int64(0)
 	totalCount := int64(0)
-	firstLoop := true
 	// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
 	// because the range-properties is not 100% accurate
 	regionMaxSize := regionSplitSize
 	if regionSplitSize <= int64(config.SplitRegionSize) {
 		regionMaxSize = regionSplitSize * 4 / 3
 	}
+	// Set a lower flush limit to make the write speed smoother.
+	flushLimit := int64(local.writeLimiter.Limit() / 10)
+
+	flushKVs := func() error {
+		for i := range clients {
+			if err := local.writeLimiter.WaitN(ctx, storeIDs[i], int(size)); err != nil {
+				return errors.Trace(err)
+			}
+			requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
+			if err := clients[i].Send(requests[i]); err != nil {
+				return errors.Trace(err)
+			}
+		}
+		return nil
+	}
+
 	for iter.First(); iter.Valid(); iter.Next() {
-		size += int64(len(iter.Key()) + len(iter.Value()))
+		kvSize := int64(len(iter.Key()) + len(iter.Value()))
 		// here we reuse the `*sst.Pair`s to optimize object allocation
-		if firstLoop {
+		if count < len(pairs) {
+			pairs[count].Key = bytesBuf.AddBytes(iter.Key())
+			pairs[count].Value = bytesBuf.AddBytes(iter.Value())
+		} else {
 			pair := &sst.Pair{
 				Key:   bytesBuf.AddBytes(iter.Key()),
 				Value: bytesBuf.AddBytes(iter.Value()),
 			}
 			pairs = append(pairs, pair)
-		} else {
-			pairs[count].Key = bytesBuf.AddBytes(iter.Key())
-			pairs[count].Value = bytesBuf.AddBytes(iter.Value())
 		}
 		count++
 		totalCount++
+		size += kvSize
+		totalSize += kvSize
 
-		if count >= local.batchWriteKVPairs {
-			for i := range clients {
-				requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
-				if err := clients[i].Send(requests[i]); err != nil {
-					return nil, Range{}, stats, errors.Trace(err)
-				}
+		if count >= local.batchWriteKVPairs || size >= flushLimit {
+			if err := flushKVs(); err != nil {
+				return nil, Range{}, stats, err
 			}
 			count = 0
+			size = 0
 			bytesBuf.Reset()
-			firstLoop = false
 		}
-		if size >= regionMaxSize || totalCount >= regionSplitKeys {
+		if totalSize >= regionMaxSize || totalCount >= regionSplitKeys {
 			break
 		}
 	}
@@ -865,12 +889,12 @@
 	}
 
 	if count > 0 {
-		for i := range clients {
-			requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
-			if err := clients[i].Send(requests[i]); err != nil {
-				return nil, Range{}, stats, errors.Trace(err)
-			}
+		if err := flushKVs(); err != nil {
+			return nil, Range{}, stats, err
 		}
+		count = 0
+		size = 0
+		bytesBuf.Reset()
 	}
 
 	var leaderPeerMetas []*sst.SSTMeta
@@ -913,7 +937,7 @@
 			logutil.Region(region.Region), logutil.Leader(region.Leader))
 	}
 	stats.count = totalCount
-	stats.totalBytes = size
+	stats.totalBytes = totalSize
 
 	return leaderPeerMetas, finishedRange, stats, nil
 }
diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go
index 98413b20e71e0..c4aaae30db37b 100644
--- a/br/pkg/lightning/backend/local/localhelper.go
+++ b/br/pkg/lightning/backend/local/localhelper.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"database/sql"
+	"math"
 	"regexp"
 	"runtime"
 	"sort"
@@ -40,6 +41,7 @@ import (
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
+	"golang.org/x/time/rate"
 )
 
 const (
@@ -592,3 +594,75 @@ func intersectRange(region *metapb.Region, rg Range) Range {
 
 	return Range{start: startKey, end: endKey}
 }
+
+type StoreWriteLimiter interface {
+	WaitN(ctx context.Context, storeID uint64, n int) error
+	Limit() int
+}
+
+type storeWriteLimiter struct {
+	rwm      sync.RWMutex
+	limiters map[uint64]*rate.Limiter
+	limit    int
+	burst    int
+}
+
+func newStoreWriteLimiter(limit int) *storeWriteLimiter {
+	var burst int
+	// Allow a burst of up to 20% above the limit.
+	if limit <= math.MaxInt-limit/5 {
+		burst = limit + limit/5
+	} else {
+		// If that overflows, set burst to math.MaxInt.
+		burst = math.MaxInt
+	}
+	return &storeWriteLimiter{
+		limiters: make(map[uint64]*rate.Limiter),
+		limit:    limit,
+		burst:    burst,
+	}
+}
+
+func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
+	limiter := s.getLimiter(storeID)
+	// rate.Limiter.WaitN doesn't allow n > burst,
+	// so we call WaitN multiple times with at most burst tokens each.
+	for n > limiter.Burst() {
+		if err := limiter.WaitN(ctx, limiter.Burst()); err != nil {
+			return err
+		}
+		n -= limiter.Burst()
+	}
+	return limiter.WaitN(ctx, n)
+}
+
+func (s *storeWriteLimiter) Limit() int {
+	return s.limit
+}
+
+func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter {
+	s.rwm.RLock()
+	limiter, ok := s.limiters[storeID]
+	s.rwm.RUnlock()
+	if ok {
+		return limiter
+	}
+	s.rwm.Lock()
+	defer s.rwm.Unlock()
+	limiter, ok = s.limiters[storeID]
+	if !ok {
+		limiter = rate.NewLimiter(rate.Limit(s.limit), s.burst)
+		s.limiters[storeID] = limiter
+	}
+	return limiter
+}
+
+type noopStoreWriteLimiter struct{}
+
+func (noopStoreWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
+	return nil
+}
+
+func (noopStoreWriteLimiter) Limit() int {
+	return math.MaxInt
+}
diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go
index 48ce64da5e3b6..767829e9c857f 100644
--- a/br/pkg/lightning/backend/local/localhelper_test.go
+++ b/br/pkg/lightning/backend/local/localhelper_test.go
@@ -770,3 +770,46 @@ func TestNeedSplit(t *testing.T) {
 		}
 	}
 }
+
+func TestStoreWriteLimiter(t *testing.T) {
+	// Test creating a store write limiter with limit math.MaxInt.
+	limiter := newStoreWriteLimiter(math.MaxInt)
+	err := limiter.WaitN(context.Background(), 1, 1024)
+	require.NoError(t, err)
+
+	// Test WaitN exceeds the burst.
+	limiter = newStoreWriteLimiter(100)
+	start := time.Now()
+	// The initial burst is 120, so the remaining 120 tokens are generated at 100 tokens/s, which takes over 1 second.
+	err = limiter.WaitN(context.Background(), 1, 120+120)
+	require.NoError(t, err)
+	require.Greater(t, time.Since(start), time.Second)
+
+	// Test WaitN with different store IDs.
+	limiter = newStoreWriteLimiter(100)
+	var wg sync.WaitGroup
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+	defer cancel()
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(storeID uint64) {
+			defer wg.Done()
+			start := time.Now()
+			var gotTokens int
+			for {
+				n := rand.Intn(50)
+				if limiter.WaitN(ctx, storeID, n) != nil {
+					break
+				}
+				gotTokens += n
+			}
+			elapsed := time.Since(start)
+			maxTokens := 120 + int(float64(elapsed)/float64(time.Second)*100)
+			// In theory, gotTokens should be less than or equal to maxTokens.
+			// But we allow a small margin of error to avoid making the test flaky.
+			require.LessOrEqual(t, gotTokens, maxTokens+1)
+
+		}(uint64(i))
+	}
+	wg.Wait()
+}
diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index fee2aaf29deb2..b0ffe32fa3cd5 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -532,6 +532,7 @@ type TikvImporter struct {
 
 	EngineMemCacheSize      ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
 	LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
+	StoreWriteBWLimit       ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"`
 }
 
 type Checkpoint struct {
diff --git a/br/tests/lightning_write_limit/config.toml b/br/tests/lightning_write_limit/config.toml
new file mode 100644
index 0000000000000..e45e694126964
--- /dev/null
+++ b/br/tests/lightning_write_limit/config.toml
@@ -0,0 +1,5 @@
+[tikv-importer]
+store-write-bwlimit = "1Mi"
+
+[mydumper.csv]
+header = false
diff --git a/br/tests/lightning_write_limit/run.sh b/br/tests/lightning_write_limit/run.sh
new file mode 100644
index 0000000000000..b48d34e79a58d
--- /dev/null
+++ b/br/tests/lightning_write_limit/run.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+#
+# Copyright 2022 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eux
+
+mkdir -p "$TEST_DIR/data"
+
+cat <<EOF >"$TEST_DIR/data/test-schema-create.sql"
+CREATE DATABASE test;
+EOF
+cat <<EOF >"$TEST_DIR/data/test.t-schema.sql"
+CREATE TABLE test.t (
+  id int,
+  a int,
+  b int,
+  c int
+);
+EOF
+
+# Generate 200k rows. The total size is about 5MiB.
+set +x
+for i in {1..200000}; do
+  echo "$i,$i,$i,$i" >>"$TEST_DIR/data/test.t.0.csv"
+done
+set -x
+
+start=$(date +%s)
+run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config.toml"
+end=$(date +%s)
+take=$((end - start))
+
+# The encoded KV size is about 10MiB. With the 1MiB/s per-store write limit, the import should take more than 10s.
+if [ $take -lt 10 ]; then
+  echo "Lightning runs too fast. The write limiter doesn't work."
+  exit 1
+fi
diff --git a/br/tidb-lightning.toml b/br/tidb-lightning.toml
index 8840eba06bb1d..a33eb46500104 100644
--- a/br/tidb-lightning.toml
+++ b/br/tidb-lightning.toml
@@ -136,6 +136,8 @@ addr = "127.0.0.1:8287"
 # The memory cache used in for local sorting during the encode-KV phase before flushing into the engines. The memory
 # usage is bound by region-concurrency * local-writer-mem-cache-size.
 #local-writer-mem-cache-size = '128MiB'
+# Limit the write bandwidth to each TiKV store. The unit is bytes per second. 0 means no limit.
+#store-write-bwlimit = 0
 
 [mydumper]
 # block size of file reading
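Reviewer note (not part of the patch): the chunked loop in storeWriteLimiter.WaitN exists because golang.org/x/time/rate returns an error whenever a single WaitN call asks for more tokens than the limiter's burst. Below is a minimal standalone sketch of that behavior; the limit/burst numbers (100 tokens/s, burst 120) simply mirror newStoreWriteLimiter(100) above and are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 100 tokens/s with a burst of 120, mirroring newStoreWriteLimiter(100).
	limiter := rate.NewLimiter(rate.Limit(100), 120)

	// Asking for more than the burst in a single call fails immediately.
	err := limiter.WaitN(context.Background(), 240)
	fmt.Println(err) // e.g. "rate: Wait(n=240) exceeds limiter's burst 120"

	// Splitting the request into burst-sized chunks succeeds, but has to wait
	// for new tokens, which is what storeWriteLimiter.WaitN does per store.
	start := time.Now()
	for n := 240; n > 0; {
		chunk := n
		if chunk > limiter.Burst() {
			chunk = limiter.Burst()
		}
		if err := limiter.WaitN(context.Background(), chunk); err != nil {
			panic(err)
		}
		n -= chunk
	}
	// Roughly 1.2s: 120 tokens from the full bucket, then 120 more at 100 tokens/s.
	fmt.Println(time.Since(start))
}

This is also why TestStoreWriteLimiter expects the 120+120 request to take a little more than one second.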