Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support data compression in Broadcast / Passthrough exchange operator; optimize process about choosing Broadcast Join; #41968

Merged
merged 38 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2521a89
1
solotzg Mar 7, 2023
33bdbc1
2
solotzg Mar 7, 2023
2d8c81f
3
solotzg Mar 7, 2023
b6889a3
Revert "2"
solotzg Mar 7, 2023
b2ea880
4
solotzg Mar 7, 2023
8330a56
5
solotzg Mar 7, 2023
e40ad67
6
solotzg Mar 7, 2023
2b9d353
ut
solotzg Mar 8, 2023
780224d
Merge branch 'compress-bj-exchange' of github.com:solotzg/tidb into c…
solotzg Mar 8, 2023
50d3a42
7
solotzg Mar 8, 2023
66ff966
8
solotzg Mar 8, 2023
5190a5e
9
solotzg Mar 8, 2023
4ff16f4
10
solotzg Mar 8, 2023
d051c1f
Merge remote-tracking branch 'pingcap/master' into compress-bj-exchange
solotzg Mar 8, 2023
7e4e2c9
11
solotzg Mar 8, 2023
6aa00ab
comment
solotzg Mar 8, 2023
eadfaf5
12
solotzg Mar 8, 2023
a23d8c0
13
solotzg Mar 8, 2023
16c6ed3
add session var BroadcastJoinCostModelVersion;
solotzg Mar 9, 2023
e4921c5
add more ut
solotzg Mar 9, 2023
cfe3654
check bcj for more join type
solotzg Mar 9, 2023
c153b65
Merge remote-tracking branch 'pingcap/master' into compress-bj-exchange
solotzg Mar 10, 2023
20132e6
update bazel file
solotzg Mar 10, 2023
a03307b
fix tests
solotzg Mar 10, 2023
220d11a
add more tests about `mppStoreCnt`
solotzg Mar 10, 2023
af9d977
add more fail point tests
solotzg Mar 11, 2023
f92579e
bcj is there is one tiflash node
solotzg Mar 13, 2023
1ed2a7a
15
solotzg Mar 14, 2023
c85c9e5
Merge remote-tracking branch 'pingcap/master' into compress-bj-exchange
solotzg Mar 14, 2023
fa6d626
more ut
solotzg Mar 15, 2023
bb22066
Merge remote-tracking branch 'pingcap/master' into compress-bj-exchange
solotzg Mar 16, 2023
588d68d
16
solotzg Mar 16, 2023
78f8bb7
Merge remote-tracking branch 'pingcap/master' into compress-bj-exchange
solotzg Mar 20, 2023
55e71b9
17
solotzg Mar 20, 2023
959dce0
18
solotzg Mar 20, 2023
ea2c9b9
fix behavior of tpch q4
solotzg Mar 20, 2023
c1495a9
Merge remote-tracking branch 'pingcap/master' into compress-bj-exchange
solotzg Mar 20, 2023
3ab47aa
rename to `tidb_prefer_broadcast_join_by_exchange_data_size`
solotzg Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1649,3 +1649,60 @@ func TestDisaggregatedTiFlashGeneratedColumn(t *testing.T) {
test1(false)
test2()
}

func TestMppStoreCntWithErrors(t *testing.T) {
// mock non-root tasks return error
var mppStoreCountPDError = "github.com/pingcap/tidb/store/copr/mppStoreCountPDError"
var mppStoreCountSetMPPCnt = "github.com/pingcap/tidb/store/copr/mppStoreCountSetMPPCnt"
var mppStoreCountSetLastUpdateTime = "github.com/pingcap/tidb/store/copr/mppStoreCountSetLastUpdateTime"
var mppStoreCountSetLastUpdateTimeP2 = "github.com/pingcap/tidb/store/copr/mppStoreCountSetLastUpdateTimeP2"

store := testkit.CreateMockStore(t, withMockTiFlash(3))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
require.Equal(t, mppCnt, 3)
}
require.Nil(t, failpoint.Enable(mppStoreCountSetMPPCnt, `return(1000)`))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
// meet cache
require.Equal(t, mppCnt, 3)
}
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTime, `return("0")`))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
// update cache
require.Equal(t, mppCnt, 1000)
}
require.Nil(t, failpoint.Enable(mppStoreCountPDError, `return(true)`))
{
_, err := store.GetMPPClient().GetMPPStoreCount()
require.Error(t, err)
}
require.Nil(t, failpoint.Disable(mppStoreCountPDError))
require.Nil(t, failpoint.Enable(mppStoreCountSetMPPCnt, `return(2222)`))
// set last update time to the latest
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTime, fmt.Sprintf(`return("%d")`, time.Now().UnixMicro())))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
// still update cache
require.Equal(t, mppCnt, 2222)
}
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTime, `return("1")`))
// fail to get lock and old cache
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTimeP2, `return("2")`))
require.Nil(t, failpoint.Enable(mppStoreCountPDError, `return(true)`))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
require.Equal(t, mppCnt, 2222)
}
require.Nil(t, failpoint.Disable(mppStoreCountSetMPPCnt))
require.Nil(t, failpoint.Disable(mppStoreCountSetLastUpdateTime))
require.Nil(t, failpoint.Disable(mppStoreCountSetLastUpdateTimeP2))
require.Nil(t, failpoint.Disable(mppStoreCountPDError))
}
2 changes: 2 additions & 0 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration, tiflashcompute.DispatchPolicy) ([]MPPTaskMeta, error)
// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion, memTracker *memory.Tracker) Response
// GetMPPStoreCount returns number of TiFlash stores if there is no error, else return (0, error)
GetMPPStoreCount() (int, error)
solotzg marked this conversation as resolved.
Show resolved Hide resolved
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
119 changes: 119 additions & 0 deletions planner/core/casetest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,125 @@ func TestMPPHintsScope(t *testing.T) {
}
}

func TestMPPBCJModel(t *testing.T) {
/*
if there are 3 mpp stores, planner won't choose broadcast join enven if `tidb_broadcast_join_cost_model_version` GE 1
broadcast exchange size:
Build: 2 * sizeof(Data)
Probe: 0
exchange size: Build = 2 * sizeof(Data)
hash exchange size:
Build: sizeof(Data) * 2 / 3
Probe: sizeof(Data) * 2 / 3
exchange size: Build + Probe = 4/3 * sizeof(Data)
*/
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(3))
{
cnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Equal(t, cnt, 3)
require.Nil(t, err)
}
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int, b int, c int, index idx_a(a), index idx_b(b))")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)

var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}
planSuiteData := GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "UPDATE") {
tk.MustExec(tt)
continue
}
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}

func TestMPPBCJModelOneTiFlash(t *testing.T) {
/*
if there are 1 mpp stores, planner should choose broadcast join if `tidb_broadcast_join_cost_model_version` GE 1
broadcast exchange size:
Build: 0 * sizeof(Data)
Probe: 0
exchange size: Build = 0 * sizeof(Data)
hash exchange size:
Build: sizeof(Data) * 0 / 1
Probe: sizeof(Data) * 0 / 1
exchange size: Build + Probe = 0 * sizeof(Data)
*/
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(1))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int, b int, c int, index idx_a(a), index idx_b(b))")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
{
cnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Equal(t, cnt, 1)
require.Nil(t, err)
}
{
tk.MustExec("set @@session.tidb_broadcast_join_cost_model_version=-1")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1292 Truncated incorrect tidb_broadcast_join_cost_model_version value: '-1'"))
tk.MustExec("set @@session.tidb_broadcast_join_cost_model_version=2")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1292 Truncated incorrect tidb_broadcast_join_cost_model_version value: '2'"))
}
{
// no BCJ if `tidb_broadcast_join_cost_model_version` EQ 0
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size=0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count=0")
}

var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}
planSuiteData := GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "UPDATE") {
tk.MustExec(tt)
continue
}
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}

func TestHintScope(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
" └─Window 10000.00 mpp[tiflash] row_number()->Column#4 over(order by test.t.b rows between current row and current row)",
" └─Sort 10000.00 mpp[tiflash] test.t.b",
" └─ExchangeReceiver 10000.00 mpp[tiflash] ",
" └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough",
" └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough, Compression: FAST",
" └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
]
},
Expand Down
Loading