Skip to content

Commit

Permalink
Renamed PartitionInfo to [Phys]PlanPartInfo to distinguish from model…
Browse files Browse the repository at this point in the history
….PartitionInfo
  • Loading branch information
mjonss committed Jan 2, 2024
1 parent 999f599 commit 5757002
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 74 deletions.
45 changes: 21 additions & 24 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3453,7 +3453,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e

tmp, _ := b.is.TableByID(ts.Table.ID)
tbl := tmp.(table.PartitionedTable)
partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
partitions, err := partitionPruning(b.ctx, tbl, &v.PlanPartInfo)
if err != nil {
b.err = err
return nil
Expand Down Expand Up @@ -3539,14 +3539,14 @@ func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []i
return keyColOffsets
}

func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table, partitionInfo *plannercore.PartitionInfo,
func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table, physPlanPartInfo *plannercore.PhysPlanPartInfo,
lookUpContent []*indexJoinLookUpContent) (usedPartition []table.PhysicalTable, canPrune bool, contentPos []int64, err error) {
partitionTbl := tbl.(table.PartitionedTable)

// In index join, this is called by multiple goroutines simultaneously, but partitionPruning is not thread-safe.
// Use once.Do to avoid DATA RACE here.
// TODO: condition based pruning can be do in advance.
condPruneResult, err := builder.partitionPruning(partitionTbl, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
condPruneResult, err := builder.partitionPruning(partitionTbl, physPlanPartInfo)
if err != nil {
return nil, false, nil, err
}
Expand Down Expand Up @@ -3692,7 +3692,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) e
b.err = exeerrors.ErrBuildExecutor
return nil
}
ret.partitionIDMap, err = getPartitionIdsAfterPruning(b.ctx, tbl, &v.PartitionInfo)
ret.partitionIDMap, err = getPartitionIdsAfterPruning(b.ctx, tbl, &v.PlanPartInfo)
if err != nil {
b.err = err
return nil
Expand All @@ -3702,7 +3702,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) e

tmp, _ := b.is.TableByID(is.Table.ID)
tbl := tmp.(table.PartitionedTable)
partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
partitions, err := partitionPruning(b.ctx, tbl, &v.PlanPartInfo)
if err != nil {
b.err = err
return nil
Expand Down Expand Up @@ -3889,7 +3889,7 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
b.err = exeerrors.ErrBuildExecutor
return nil
}
ret.partitionIDMap, err = getPartitionIdsAfterPruning(b.ctx, tbl, &v.PartitionInfo)
ret.partitionIDMap, err = getPartitionIdsAfterPruning(b.ctx, tbl, &v.PlanPartInfo)
if err != nil {
b.err = err
return nil
Expand All @@ -3906,7 +3906,7 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo

tmp, _ := b.is.TableByID(is.Table.ID)
tbl := tmp.(table.PartitionedTable)
partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
partitions, err := partitionPruning(b.ctx, tbl, &v.PlanPartInfo)
if err != nil {
b.err = err
return nil
Expand Down Expand Up @@ -4036,7 +4036,7 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
}

tmp, _ := b.is.TableByID(ts.Table.ID)
partitions, err := partitionPruning(b.ctx, tmp.(table.PartitionedTable), v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
partitions, err := partitionPruning(b.ctx, tmp.(table.PartitionedTable), &v.PlanPartInfo)
if err != nil {
b.err = err
return nil
Expand Down Expand Up @@ -4163,8 +4163,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
partitionInfo := &v.PartitionInfo
usedPartitionList, err := builder.partitionPruning(pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
usedPartitionList, err := builder.partitionPruning(pt, &v.PlanPartInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4428,7 +4427,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
if !ok {
return nil, exeerrors.ErrBuildExecutor
}
e.partitionIDMap, err = getPartitionIdsAfterPruning(builder.ctx, tbl, &v.PartitionInfo)
e.partitionIDMap, err = getPartitionIdsAfterPruning(builder.ctx, tbl, &v.PlanPartInfo)
if err != nil {
return nil, err
}
Expand All @@ -4443,7 +4442,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}

tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID)
usedPartition, canPrune, contentPos, err := builder.prunePartitionForInnerExecutor(tbl, &v.PartitionInfo, lookUpContents)
usedPartition, canPrune, contentPos, err := builder.prunePartitionForInnerExecutor(tbl, &v.PlanPartInfo, lookUpContents)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4503,7 +4502,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
if !ok {
return nil, exeerrors.ErrBuildExecutor
}
e.partitionIDMap, err = getPartitionIdsAfterPruning(builder.ctx, tbl, &v.PartitionInfo)
e.partitionIDMap, err = getPartitionIdsAfterPruning(builder.ctx, tbl, &v.PlanPartInfo)
if err != nil {
return nil, err
}
Expand All @@ -4518,7 +4517,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}

tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID)
usedPartition, canPrune, contentPos, err := builder.prunePartitionForInnerExecutor(tbl, &v.PartitionInfo, lookUpContents)
usedPartition, canPrune, contentPos, err := builder.prunePartitionForInnerExecutor(tbl, &v.PlanPartInfo, lookUpContents)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -5154,19 +5153,17 @@ func (b *executorBuilder) buildAdminResetTelemetryID(v *plannercore.AdminResetTe
return &AdminResetTelemetryIDExec{BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID())}
}

func (builder *dataReaderBuilder) partitionPruning(tbl table.PartitionedTable, conds []expression.Expression, partitionNames []model.CIStr,
columns []*expression.Column, columnNames types.NameSlice) ([]table.PhysicalTable, error) {
func (builder *dataReaderBuilder) partitionPruning(tbl table.PartitionedTable, planPartInfo *plannercore.PhysPlanPartInfo) ([]table.PhysicalTable, error) {
builder.once.Do(func() {
condPruneResult, err := partitionPruning(builder.executorBuilder.ctx, tbl, conds, partitionNames, columns, columnNames)
condPruneResult, err := partitionPruning(builder.executorBuilder.ctx, tbl, planPartInfo)
builder.once.condPruneResult = condPruneResult
builder.once.err = err
})
return builder.once.condPruneResult, builder.once.err
}

func partitionPruning(ctx sessionctx.Context, tbl table.PartitionedTable, conds []expression.Expression, partitionNames []model.CIStr,
columns []*expression.Column, columnNames types.NameSlice) ([]table.PhysicalTable, error) {
idxArr, err := plannercore.PartitionPruning(ctx, tbl, conds, partitionNames, columns, columnNames)
func partitionPruning(ctx sessionctx.Context, tbl table.PartitionedTable, planPartInfo *plannercore.PhysPlanPartInfo) ([]table.PhysicalTable, error) {
idxArr, err := plannercore.PartitionPruning(ctx, tbl, planPartInfo.PruningConds, planPartInfo.PartitionNames, planPartInfo.Columns, planPartInfo.ColumnNames)
if err != nil {
return nil, err
}
Expand All @@ -5190,11 +5187,11 @@ func partitionPruning(ctx sessionctx.Context, tbl table.PartitionedTable, conds
return ret, nil
}

func getPartitionIdsAfterPruning(ctx sessionctx.Context, tbl table.PartitionedTable, partInfo *plannercore.PartitionInfo) (map[int64]struct{}, error) {
if partInfo == nil {
return nil, errors.New("partInfo in getPartitionIdsAfterPruning must not be nil")
func getPartitionIdsAfterPruning(ctx sessionctx.Context, tbl table.PartitionedTable, physPlanPartInfo *plannercore.PhysPlanPartInfo) (map[int64]struct{}, error) {
if physPlanPartInfo == nil {
return nil, errors.New("physPlanPartInfo in getPartitionIdsAfterPruning must not be nil")
}
idxArr, err := plannercore.PartitionPruning(ctx, tbl, partInfo.PruningConds, partInfo.PartitionNames, partInfo.Columns, partInfo.ColumnNames)
idxArr, err := plannercore.PartitionPruning(ctx, tbl, physPlanPartInfo.PruningConds, physPlanPartInfo.PartitionNames, physPlanPartInfo.Columns, physPlanPartInfo.ColumnNames)
if err != nil {
return nil, err
}
Expand Down
25 changes: 12 additions & 13 deletions pkg/planner/core/access_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (p *BatchPointGetPlan) AccessObject() AccessObject {
return res
}

func getDynamicAccessPartition(sctx sessionctx.Context, tblInfo *model.TableInfo, partitionInfo *PartitionInfo, asName string) (res *DynamicPartitionAccessObject) {
func getDynamicAccessPartition(sctx sessionctx.Context, tblInfo *model.TableInfo, physPlanPartInfo *PhysPlanPartInfo, asName string) (res *DynamicPartitionAccessObject) {
pi := tblInfo.GetPartitionInfo()
if pi == nil || !sctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
return nil
Expand All @@ -377,7 +377,7 @@ func getDynamicAccessPartition(sctx sessionctx.Context, tblInfo *model.TableInfo
}
tbl := tmp.(table.PartitionedTable)

idxArr, err := PartitionPruning(sctx, tbl, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
idxArr, err := PartitionPruning(sctx, tbl, physPlanPartInfo.PruningConds, physPlanPartInfo.PartitionNames, physPlanPartInfo.Columns, physPlanPartInfo.ColumnNames)
if err != nil {
res.err = "partition pruning error:" + err.Error()
return res
Expand All @@ -398,7 +398,7 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) AccessObject
if !sctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
return DynamicPartitionAccessObjects(nil)
}
if len(p.PartitionInfos) == 0 {
if len(p.TableScanAndPartitionInfos) == 0 {
ts, ok := p.TablePlans[0].(*PhysicalTableScan)
if !ok {
return OtherAccessObject("")
Expand All @@ -407,15 +407,15 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) AccessObject
if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
asName = ts.TableAsName.O
}
res := getDynamicAccessPartition(sctx, ts.Table, &p.PartitionInfo, asName)
res := getDynamicAccessPartition(sctx, ts.Table, &p.PlanPartInfo, asName)
if res == nil {
return DynamicPartitionAccessObjects(nil)
}
return DynamicPartitionAccessObjects{res}
}
if len(p.PartitionInfos) == 1 {
ts := p.PartitionInfos[0].tableScan
partInfo := p.PartitionInfos[0].partitionInfo
if len(p.TableScanAndPartitionInfos) == 1 {
ts := p.TableScanAndPartitionInfos[0].tableScan
partInfo := p.TableScanAndPartitionInfos[0].physPlanPartInfo
asName := ""
if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
asName = ts.TableAsName.O
Expand All @@ -428,17 +428,16 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) AccessObject
}

res := make(DynamicPartitionAccessObjects, 0)
for _, info := range p.PartitionInfos {
for _, info := range p.TableScanAndPartitionInfos {
if info.tableScan.Table.GetPartitionInfo() == nil {
continue
}
ts := info.tableScan
partInfo := info.partitionInfo
asName := ""
if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
asName = ts.TableAsName.O
}
accessObj := getDynamicAccessPartition(sctx, ts.Table, &partInfo, asName)
accessObj := getDynamicAccessPartition(sctx, ts.Table, &info.physPlanPartInfo, asName)
if accessObj != nil {
res = append(res, accessObj)
}
Expand All @@ -458,7 +457,7 @@ func (p *PhysicalIndexReader) accessObject(sctx sessionctx.Context) AccessObject
if is.TableAsName != nil && len(is.TableAsName.O) > 0 {
asName = is.TableAsName.O
}
res := getDynamicAccessPartition(sctx, is.Table, &p.PartitionInfo, asName)
res := getDynamicAccessPartition(sctx, is.Table, &p.PlanPartInfo, asName)
if res == nil {
return DynamicPartitionAccessObjects(nil)
}
Expand All @@ -474,7 +473,7 @@ func (p *PhysicalIndexLookUpReader) accessObject(sctx sessionctx.Context) Access
if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
asName = ts.TableAsName.O
}
res := getDynamicAccessPartition(sctx, ts.Table, &p.PartitionInfo, asName)
res := getDynamicAccessPartition(sctx, ts.Table, &p.PlanPartInfo, asName)
if res == nil {
return DynamicPartitionAccessObjects(nil)
}
Expand All @@ -490,7 +489,7 @@ func (p *PhysicalIndexMergeReader) accessObject(sctx sessionctx.Context) AccessO
if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
asName = ts.TableAsName.O
}
res := getDynamicAccessPartition(sctx, ts.Table, &p.PartitionInfo, asName)
res := getDynamicAccessPartition(sctx, ts.Table, &p.PlanPartInfo, asName)
if res == nil {
return DynamicPartitionAccessObjects(nil)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,13 +1108,13 @@ func (p *LogicalJoin) constructInnerTableScanTask(
tblColHists: ds.TblColHists,
keepOrder: ts.KeepOrder,
}
copTask.partitionInfo = PartitionInfo{
copTask.physPlanPartInfo = PhysPlanPartInfo{
PruningConds: ds.allConds,
PartitionNames: ds.partitionNames,
Columns: ds.TblCols,
ColumnNames: ds.names,
}
ts.PartitionInfo = copTask.partitionInfo
ts.PlanPartInfo = copTask.physPlanPartInfo
selStats := ts.StatsInfo().Scale(selectivity)
ts.addPushedDownSelection(copTask, selStats)
t := copTask.convertToRootTask(ds.SCtx())
Expand Down Expand Up @@ -1272,7 +1272,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
tblCols: ds.TblCols,
keepOrder: is.KeepOrder,
}
cop.partitionInfo = PartitionInfo{
cop.physPlanPartInfo = PhysPlanPartInfo{
PruningConds: ds.allConds,
PartitionNames: ds.partitionNames,
Columns: ds.TblCols,
Expand Down
10 changes: 5 additions & 5 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,7 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c
indexPlanFinished: false,
tblColHists: ds.TblColHists,
}
cop.partitionInfo = PartitionInfo{
cop.physPlanPartInfo = PhysPlanPartInfo{
PruningConds: pushDownNot(ds.SCtx(), ds.allConds),
PartitionNames: ds.partitionNames,
Columns: ds.TblCols,
Expand Down Expand Up @@ -1757,7 +1757,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty,
tblCols: ds.TblCols,
expectCnt: uint64(prop.ExpectedCnt),
}
cop.partitionInfo = PartitionInfo{
cop.physPlanPartInfo = PhysPlanPartInfo{
PruningConds: pushDownNot(ds.SCtx(), ds.allConds),
PartitionNames: ds.partitionNames,
Columns: ds.TblCols,
Expand Down Expand Up @@ -2179,7 +2179,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
partTp: property.AnyType,
tblColHists: ds.TblColHists,
}
ts.PartitionInfo = PartitionInfo{
ts.PlanPartInfo = PhysPlanPartInfo{
PruningConds: pushDownNot(ds.SCtx(), ds.allConds),
PartitionNames: ds.partitionNames,
Columns: ds.TblCols,
Expand Down Expand Up @@ -2213,13 +2213,13 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
indexPlanFinished: true,
tblColHists: ds.TblColHists,
}
copTask.partitionInfo = PartitionInfo{
copTask.physPlanPartInfo = PhysPlanPartInfo{
PruningConds: pushDownNot(ds.SCtx(), ds.allConds),
PartitionNames: ds.partitionNames,
Columns: ds.TblCols,
ColumnNames: ds.names,
}
ts.PartitionInfo = copTask.partitionInfo
ts.PlanPartInfo = copTask.physPlanPartInfo
task = copTask
if candidate.isMatchProp {
copTask.keepOrder = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
tbl := tmp.(table.PartitionedTable)
if !tiFlashStaticPrune {
var partitions []table.PhysicalTable
partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames)
partitions, err = partitionPruning(e.ctx, tbl, ts.PlanPartInfo.PruningConds, ts.PlanPartInfo.PartitionNames, ts.PlanPartInfo.Columns, ts.PlanPartInfo.ColumnNames)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 5757002

Please sign in to comment.