Skip to content

Commit

Permalink
Merge pull request pingcap#13 from tangenta/new_writer-3
Browse files Browse the repository at this point in the history
split subtasks according to stats
  • Loading branch information
wjhuang2016 authored Jul 4, 2023
2 parents 88cc14c + b7c55bf commit 361c752
Show file tree
Hide file tree
Showing 16 changed files with 2,395 additions and 146 deletions.
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type importClientFactoryImpl struct {
compressionType config.CompressionType
}

func newImportClientFactoryImpl(
func NewImportClientFactoryImpl(
splitCli split.SplitClient,
tls *common.TLS,
tcpConcurrency int,
Expand Down Expand Up @@ -559,16 +559,16 @@ func NewBackend(
if err != nil {
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
importClientFactory := NewImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
keyAdapter := KeyAdapter(NoopKeyAdapter{})
if config.DupeDetectEnabled {
keyAdapter = dupDetectKeyAdapter{}
}
var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit)
writeLimiter = NewStoreWriteLimiter(config.StoreWriteBWLimit)
} else {
writeLimiter = noopStoreWriteLimiter{}
writeLimiter = NoopStoreWriteLimiter{}
}
alloc := manual.Allocator{}
if RunInTest {
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ func TestCheckPeersBusy(t *testing.T) {
},
},
logger: log.L(),
writeLimiter: noopStoreWriteLimiter{},
writeLimiter: NoopStoreWriteLimiter{},
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
BackendConfig: BackendConfig{
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
},
},
logger: log.L(),
writeLimiter: noopStoreWriteLimiter{},
writeLimiter: NoopStoreWriteLimiter{},
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
BackendConfig: BackendConfig{
Expand Down Expand Up @@ -1385,7 +1385,7 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
},
},
logger: log.L(),
writeLimiter: noopStoreWriteLimiter{},
writeLimiter: NoopStoreWriteLimiter{},
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
tikvCodec: keyspace.CodecV1,
Expand Down Expand Up @@ -1492,7 +1492,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
},
},
logger: log.L(),
writeLimiter: noopStoreWriteLimiter{},
writeLimiter: NoopStoreWriteLimiter{},
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
tikvCodec: keyspace.CodecV1,
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ type storeWriteLimiter struct {
burst int
}

func newStoreWriteLimiter(limit int) *storeWriteLimiter {
func NewStoreWriteLimiter(limit int) *storeWriteLimiter {
var burst int
// Allow burst of at most 20% of the limit.
if limit <= math.MaxInt-limit/5 {
Expand Down Expand Up @@ -699,13 +699,13 @@ func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter {
return limiter
}

type noopStoreWriteLimiter struct{}
type NoopStoreWriteLimiter struct{}

func (noopStoreWriteLimiter) WaitN(_ context.Context, _ uint64, _ int) error {
func (NoopStoreWriteLimiter) WaitN(_ context.Context, _ uint64, _ int) error {
return nil
}

func (noopStoreWriteLimiter) Limit() int {
func (NoopStoreWriteLimiter) Limit() int {
return math.MaxInt
}

Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,20 +904,20 @@ func TestNeedSplit(t *testing.T) {

func TestStoreWriteLimiter(t *testing.T) {
// Test create store write limiter with limit math.MaxInt.
limiter := newStoreWriteLimiter(math.MaxInt)
limiter := NewStoreWriteLimiter(math.MaxInt)
err := limiter.WaitN(context.Background(), 1, 1024)
require.NoError(t, err)

// Test WaitN exceeds the burst.
limiter = newStoreWriteLimiter(100)
limiter = NewStoreWriteLimiter(100)
start := time.Now()
// 120 is the initial burst, 150 is the number of new tokens.
err = limiter.WaitN(context.Background(), 1, 120+120)
require.NoError(t, err)
require.Greater(t, time.Since(start), time.Second)

// Test WaitN with different store id.
limiter = newStoreWriteLimiter(100)
limiter = NewStoreWriteLimiter(100)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
Expand Down
Loading

0 comments on commit 361c752

Please sign in to comment.