Skip to content

Commit

Permalink
planner: support data compression in Broadcast / Passthrough exch…
Browse files Browse the repository at this point in the history
…ange operator; optimize process about choosing Broadcast Join; (#41968)

ref #40494
  • Loading branch information
solotzg authored Mar 21, 2023
1 parent b96c55d commit 88174d2
Show file tree
Hide file tree
Showing 18 changed files with 815 additions and 332 deletions.
57 changes: 57 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1649,3 +1649,60 @@ func TestDisaggregatedTiFlashGeneratedColumn(t *testing.T) {
test1(false)
test2()
}

func TestMppStoreCntWithErrors(t *testing.T) {
// mock non-root tasks return error
var mppStoreCountPDError = "github.com/pingcap/tidb/store/copr/mppStoreCountPDError"
var mppStoreCountSetMPPCnt = "github.com/pingcap/tidb/store/copr/mppStoreCountSetMPPCnt"
var mppStoreCountSetLastUpdateTime = "github.com/pingcap/tidb/store/copr/mppStoreCountSetLastUpdateTime"
var mppStoreCountSetLastUpdateTimeP2 = "github.com/pingcap/tidb/store/copr/mppStoreCountSetLastUpdateTimeP2"

store := testkit.CreateMockStore(t, withMockTiFlash(3))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
require.Equal(t, mppCnt, 3)
}
require.Nil(t, failpoint.Enable(mppStoreCountSetMPPCnt, `return(1000)`))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
// meet cache
require.Equal(t, mppCnt, 3)
}
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTime, `return("0")`))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
// update cache
require.Equal(t, mppCnt, 1000)
}
require.Nil(t, failpoint.Enable(mppStoreCountPDError, `return(true)`))
{
_, err := store.GetMPPClient().GetMPPStoreCount()
require.Error(t, err)
}
require.Nil(t, failpoint.Disable(mppStoreCountPDError))
require.Nil(t, failpoint.Enable(mppStoreCountSetMPPCnt, `return(2222)`))
// set last update time to the latest
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTime, fmt.Sprintf(`return("%d")`, time.Now().UnixMicro())))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
// still update cache
require.Equal(t, mppCnt, 2222)
}
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTime, `return("1")`))
// fail to get lock and old cache
require.Nil(t, failpoint.Enable(mppStoreCountSetLastUpdateTimeP2, `return("2")`))
require.Nil(t, failpoint.Enable(mppStoreCountPDError, `return(true)`))
{
mppCnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Nil(t, err)
require.Equal(t, mppCnt, 2222)
}
require.Nil(t, failpoint.Disable(mppStoreCountSetMPPCnt))
require.Nil(t, failpoint.Disable(mppStoreCountSetLastUpdateTime))
require.Nil(t, failpoint.Disable(mppStoreCountSetLastUpdateTimeP2))
require.Nil(t, failpoint.Disable(mppStoreCountPDError))
}
2 changes: 2 additions & 0 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration, tiflashcompute.DispatchPolicy) ([]MPPTaskMeta, error)
// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion, memTracker *memory.Tracker) Response
// GetMPPStoreCount returns number of TiFlash stores if there is no error, else return (0, error)
GetMPPStoreCount() (int, error)
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
115 changes: 115 additions & 0 deletions planner/core/casetest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,121 @@ func TestMPPHintsScope(t *testing.T) {
}
}

func TestMPPBCJModel(t *testing.T) {
/*
if there are 3 mpp stores, planner won't choose broadcast join enven if `tidb_prefer_broadcast_join_by_exchange_data_size` is ON
broadcast exchange size:
Build: 2 * sizeof(Data)
Probe: 0
exchange size: Build = 2 * sizeof(Data)
hash exchange size:
Build: sizeof(Data) * 2 / 3
Probe: sizeof(Data) * 2 / 3
exchange size: Build + Probe = 4/3 * sizeof(Data)
*/
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(3))
{
cnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Equal(t, cnt, 3)
require.Nil(t, err)
}
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int, b int, c int, index idx_a(a), index idx_b(b))")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)

var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}
planSuiteData := GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "UPDATE") {
tk.MustExec(tt)
continue
}
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}

func TestMPPBCJModelOneTiFlash(t *testing.T) {
/*
if there are 1 mpp stores, planner should choose broadcast join if `tidb_prefer_broadcast_join_by_exchange_data_size` is ON
broadcast exchange size:
Build: 0 * sizeof(Data)
Probe: 0
exchange size: Build = 0 * sizeof(Data)
hash exchange size:
Build: sizeof(Data) * 0 / 1
Probe: sizeof(Data) * 0 / 1
exchange size: Build + Probe = 0 * sizeof(Data)
*/
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(1))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int, b int, c int, index idx_a(a), index idx_b(b))")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
{
cnt, err := store.GetMPPClient().GetMPPStoreCount()
require.Equal(t, cnt, 1)
require.Nil(t, err)
}
{
tk.MustExecToErr("set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=-1")
tk.MustExecToErr("set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=2")
}
{
// no BCJ if `tidb_prefer_broadcast_join_by_exchange_data_size` is OFF
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size=0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count=0")
}

var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}
planSuiteData := GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "UPDATE") {
tk.MustExec(tt)
continue
}
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}

func TestHintScope(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
" └─Window 10000.00 mpp[tiflash] row_number()->Column#4 over(order by test.t.b rows between current row and current row)",
" └─Sort 10000.00 mpp[tiflash] test.t.b",
" └─ExchangeReceiver 10000.00 mpp[tiflash] ",
" └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough",
" └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough, Compression: FAST",
" └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo"
]
},
Expand Down
Loading

0 comments on commit 88174d2

Please sign in to comment.