Skip to content

Commit

Permalink
planner: show accessed partition when explain mpp query over partitio…
Browse files Browse the repository at this point in the history
…n table (#30367)
  • Loading branch information
windtalker authored Dec 7, 2021
1 parent 7d4895f commit ffd59ec
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 3 deletions.
31 changes: 31 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1726,6 +1727,36 @@ func (s *partitionTableSuite) TestAddDropPartitions(c *C) {
tk.MustPartition(`select * from t where a < 20`, "p1,p2,p3").Sort().Check(testkit.Rows("12", "15", "7"))
}

func (s *partitionTableSuite) TestMPPQueryExplainInfo(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create database tiflash_partition_test")
tk.MustExec("use tiflash_partition_test")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")

tk.MustExec(`create table t(a int) partition by range(a) (
partition p0 values less than (5),
partition p1 values less than (10),
partition p2 values less than (15))`)
tb := testGetTableByName(c, tk.Se, "tiflash_partition_test", "t")
for _, partition := range tb.Meta().GetPartitionInfo().Definitions {
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, partition.ID, true)
c.Assert(err, IsNil)
}
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec(`insert into t values (2), (7), (12)`)
tk.MustExec("set tidb_enforce_mpp=1")
tk.MustPartition(`select * from t where a < 3`, "p0").Sort().Check(testkit.Rows("2"))
tk.MustPartition(`select * from t where a < 8`, "p0,p1").Sort().Check(testkit.Rows("2", "7"))
tk.MustPartition(`select * from t where a < 20`, "all").Sort().Check(testkit.Rows("12", "2", "7"))
tk.MustPartition(`select * from t where a < 5 union all select * from t where a > 10`, "p0").Sort().Check(testkit.Rows("12", "2"))
tk.MustPartition(`select * from t where a < 5 union all select * from t where a > 10`, "p2").Sort().Check(testkit.Rows("12", "2"))
}

func (s *partitionTableSuite) PartitionPruningInTransaction(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
Expand Down
49 changes: 46 additions & 3 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,7 @@ func (p *PhysicalTableReader) ExplainNormalizedInfo() string {
return ""
}

func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) string {
ts := p.TablePlans[0].(*PhysicalTableScan)
func getAccessObjectForTableScan(sctx sessionctx.Context, ts *PhysicalTableScan, partitionInfo PartitionInfo) string {
pi := ts.Table.GetPartitionInfo()
if pi == nil || !sctx.GetSessionVars().UseDynamicPartitionPrune() {
return ""
Expand All @@ -346,7 +345,51 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) string {
}
tbl := tmp.(table.PartitionedTable)

return partitionAccessObject(sctx, tbl, pi, &p.PartitionInfo)
return partitionAccessObject(sctx, tbl, pi, &partitionInfo)
}

func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) string {
if !sctx.GetSessionVars().UseDynamicPartitionPrune() {
return ""
}
if len(p.PartitionInfos) == 0 {
ts := p.TablePlans[0].(*PhysicalTableScan)
return getAccessObjectForTableScan(sctx, ts, p.PartitionInfo)
}
if len(p.PartitionInfos) == 1 {
return getAccessObjectForTableScan(sctx, p.PartitionInfos[0].tableScan, p.PartitionInfos[0].partitionInfo)
}
containsPartitionTable := false
for _, info := range p.PartitionInfos {
if info.tableScan.Table.GetPartitionInfo() != nil {
containsPartitionTable = true
break
}
}
if !containsPartitionTable {
return ""
}
var buffer bytes.Buffer
for index, info := range p.PartitionInfos {
if index > 0 {
buffer.WriteString(", ")
}

tblName := info.tableScan.Table.Name.O
if info.tableScan.TableAsName != nil && info.tableScan.TableAsName.O != "" {
tblName = info.tableScan.TableAsName.O
}

if info.tableScan.Table.GetPartitionInfo() == nil {
buffer.WriteString("table of ")
buffer.WriteString(tblName)
continue
}
buffer.WriteString(getAccessObjectForTableScan(sctx, info.tableScan, info.partitionInfo))
buffer.WriteString(" of ")
buffer.WriteString(tblName)
}
return buffer.String()
}

func partitionAccessObject(sctx sessionctx.Context, tbl table.PartitionedTable, pi *model.PartitionInfo, partTable *PartitionInfo) string {
Expand Down
7 changes: 7 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ var (
_ PhysicalPlan = &PhysicalTableSample{}
)

type tableScanAndPartitionInfo struct {
tableScan *PhysicalTableScan
partitionInfo PartitionInfo
}

// PhysicalTableReader is the table reader in tidb.
type PhysicalTableReader struct {
physicalSchemaProducer
Expand All @@ -86,6 +91,8 @@ type PhysicalTableReader struct {

// Used by partition table.
PartitionInfo PartitionInfo
// Used by MPP, because MPP plan may contain join/union/union all, it is possible that a physical table reader contains more than 1 table scan
PartitionInfos []tableScanAndPartitionInfo
}

// PartitionInfo indicates partition helper info in physical plan.
Expand Down
12 changes: 12 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,17 @@ func (t *mppTask) convertToRootTask(ctx sessionctx.Context) *rootTask {
return t.copy().(*mppTask).convertToRootTaskImpl(ctx)
}

func collectPartitionInfosFromMPPPlan(p *PhysicalTableReader, mppPlan PhysicalPlan) {
switch x := mppPlan.(type) {
case *PhysicalTableScan:
p.PartitionInfos = append(p.PartitionInfos, tableScanAndPartitionInfo{x, x.PartitionInfo})
default:
for _, ch := range mppPlan.Children() {
collectPartitionInfosFromMPPPlan(p, ch)
}
}
}

func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
sender := PhysicalExchangeSender{
ExchangeType: tipb.ExchangeType_PassThrough,
Expand All @@ -2266,6 +2277,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
StoreType: kv.TiFlash,
}.Init(ctx, t.p.SelectBlockOffset())
p.stats = t.p.statsInfo()
collectPartitionInfosFromMPPPlan(p, t.p)

cst := t.cst + t.count()*ctx.GetSessionVars().GetNetworkFactor(nil)
p.cost = cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
Expand Down

0 comments on commit ffd59ec

Please sign in to comment.