Skip to content

Commit

Permalink
copr: increase extra concurrency for small coprocessor tasks (#37725)
Browse files Browse the repository at this point in the history
close #37724
  • Loading branch information
you06 authored Oct 14, 2022
1 parent 50425f8 commit 61eed5c
Show file tree
Hide file tree
Showing 13 changed files with 484 additions and 110 deletions.
9 changes: 6 additions & 3 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon
// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
// "handles" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
builder.Request.KeyRanges = TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles)
return builder
}

Expand Down Expand Up @@ -515,8 +515,9 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc

// TableHandlesToKVRanges converts sorted handle to kv ranges.
// For continuous handles, we should merge them to a single key range.
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) []kv.KeyRange {
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hint := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
Expand All @@ -525,6 +526,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) []kv.KeyRange {
EndKey: tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()),
}
krs = append(krs, ran)
hint = append(hint, 1)
i++
continue
}
Expand All @@ -540,9 +542,10 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) []kv.KeyRange {
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hint = append(hint, j-i)
i = j
}
return krs
return krs, hint
}

// PartitionHandlesToKVRanges convert ParitionHandles to kv ranges.
Expand Down
21 changes: 11 additions & 10 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestTableHandlesToKVRanges(t *testing.T) {

// Build key ranges.
expect := getExpectedRanges(1, hrs)
actual := TableHandlesToKVRanges(1, handles)
actual, _ := TableHandlesToKVRanges(1, handles)

// Compare key ranges and expected key ranges.
require.Equal(t, len(expect), len(actual))
Expand Down Expand Up @@ -396,15 +396,16 @@ func TestRequestBuilder3(t *testing.T) {
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
},
},
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
FixedRowCountHint: []int{1, 4, 2, 1},
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
Expand Down
5 changes: 2 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4152,12 +4152,12 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
continue
}
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
}
} else {
for _, p := range usedPartitionList {
tmp := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, tmp...)
}
}
Expand Down Expand Up @@ -4291,7 +4291,6 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
b.SetTableHandles(getPhysicalTableID(e.table), handles)
}
}

return builder.buildTableReaderBase(ctx, e, b)
}

Expand Down
6 changes: 4 additions & 2 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,8 @@ func (m *memIndexLookUpReader) getMemRows(ctx context.Context) ([][]types.Datum,
continue
}
numHandles += len(handles)
tblKVRanges = append(tblKVRanges, distsql.TableHandlesToKVRanges(getPhysicalTableID(tbl), handles)...)
ranges, _ := distsql.TableHandlesToKVRanges(getPhysicalTableID(tbl), handles)
tblKVRanges = append(tblKVRanges, ranges...)
}
if numHandles == 0 {
return nil, nil
Expand Down Expand Up @@ -687,7 +688,8 @@ func (m *memIndexMergeReader) getMemRows(ctx context.Context) ([][]types.Datum,
continue
}
numHandles += len(handles)
tblKVRanges = append(tblKVRanges, distsql.TableHandlesToKVRanges(getPhysicalTableID(tbl), handles)...)
ranges, _ := distsql.TableHandlesToKVRanges(getPhysicalTableID(tbl), handles)
tblKVRanges = append(tblKVRanges, ranges...)
}

if numHandles == 0 {
Expand Down
1 change: 1 addition & 0 deletions executor/oomtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_test(
deps = [
"//testkit",
"//testkit/testsetup",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
Expand Down
5 changes: 5 additions & 0 deletions executor/oomtest/oom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testsetup"
Expand Down Expand Up @@ -165,6 +166,10 @@ func TestMemTracker4DeleteExec(t *testing.T) {

oom.SetTracker("")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/disableFixedRowCountHint", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/disableFixedRowCountHint"))
}()
tk.Session().GetSessionVars().EnabledRateLimitAction = true
tk.Session().GetSessionVars().MemQuotaQuery = 10000
tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a")
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ type Request struct {
}
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
FixedRowCountHint []int
}

// CoprRequestAdjuster is used to check and adjust a copr request according to specific rules.
Expand Down
1 change: 0 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2262,7 +2262,6 @@ type Concurrency struct {
indexLookupJoinConcurrency int

// distSQLScanConcurrency is the number of concurrent dist SQL scan worker.
// distSQLScanConcurrency is deprecated, use ExecutorConcurrency instead.
distSQLScanConcurrency int

// hashJoinConcurrency is the number of concurrent hash join outer worker.
Expand Down
21 changes: 21 additions & 0 deletions store/copr/copr_test/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "copr_test_test",
srcs = [
"coprocessor_test.go",
"main_test.go",
],
deps = [
"//config",
"//kv",
"//store/copr",
"//store/mockstore",
"//testkit/testmain",
"//testkit/testsetup",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
],
)
100 changes: 100 additions & 0 deletions store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package copr_test

import (
"context"
"testing"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/mockstore"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
)

func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
store, err := mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c testutils.Cluster) {
mockstore.BootstrapWithMultiRegions(c, []byte("g"), []byte("n"), []byte("t"))
}),
)
require.NoError(t, err)
defer require.NoError(t, store.Close())
copClient := store.GetClient().(*copr.CopClient)
ctx := context.Background()
killed := uint32(0)
vars := kv.NewVariables(&killed)
opt := &kv.ClientSendOption{}

req := &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z"),
FixedRowCountHint: []int{1, 1, 3, copr.CopSmallTaskRow},
Concurrency: 15,
}
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
conc, smallConc := it.GetConcurrency()
rateLimit := it.GetSendRate()
require.Equal(t, conc, 1)
require.Equal(t, smallConc, 1)
require.Equal(t, rateLimit.GetCapacity(), 2)

req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z"),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
conc, smallConc = it.GetConcurrency()
rateLimit = it.GetSendRate()
require.Equal(t, conc, 1)
require.Equal(t, smallConc, 2)
require.Equal(t, rateLimit.GetCapacity(), 3)

// cross-region long range
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: copr.BuildKeyRanges("a", "z"),
FixedRowCountHint: []int{10},
Concurrency: 15,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
conc, smallConc = it.GetConcurrency()
rateLimit = it.GetSendRate()
require.Equal(t, conc, 1)
require.Equal(t, smallConc, 2)
require.Equal(t, rateLimit.GetCapacity(), 3)

req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: copr.BuildKeyRanges("a", "z"),
FixedRowCountHint: []int{copr.CopSmallTaskRow + 1},
Concurrency: 15,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
conc, smallConc = it.GetConcurrency()
rateLimit = it.GetSendRate()
require.Equal(t, conc, 4)
require.Equal(t, smallConc, 0)
require.Equal(t, rateLimit.GetCapacity(), 4)
}
60 changes: 60 additions & 0 deletions store/copr/copr_test/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package copr_test

import (
"flag"
"testing"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/testkit/testmain"
"github.com/pingcap/tidb/testkit/testsetup"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testmain.ShortCircuitForBench(m)

testsetup.SetupForCommonTest()

flag.Parse()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})
tikv.EnableFailpoints()
opts := []goleak.Option{
// TODO: figure the reason and shorten this list
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/internal/retry.newBackoffFn.func1"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/v3.waitRetryBackoff"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ccBalancerWrapper).watcher"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Client).keepalive"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"),
}
callback := func(i int) int {
// wait for MVCCLevelDB to close, MVCCLevelDB will be closed in one second
time.Sleep(time.Second)
return i
}
goleak.VerifyTestMain(testmain.WrapTestingM(m, callback), opts...)
}
Loading

0 comments on commit 61eed5c

Please sign in to comment.