Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](mtmv) Support variants rewrite by materialized view #37929

Merged
merged 9 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
queryTopPlan,
materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping,
true,
queryStructInfo.getTableBitSet());
boolean isRewrittenQueryExpressionValid = true;
if (!rewrittenQueryExpressions.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
queryStructInfo.getTopPlan(),
materializationContext.getShuttledExprToScanExprMapping(),
targetToSourceMapping,
true,
queryStructInfo.getTableBitSet()
);
// Can not rewrite, bail out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
Expand All @@ -56,6 +58,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.types.VariantType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.TypeUtils;
import org.apache.doris.qe.SessionVariable;
Expand Down Expand Up @@ -114,7 +117,7 @@ public List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext) {
continue;
}
// check mv plan is valid or not
if (!isMaterializationValid(cascadesContext, context)) {
if (!isMaterializationValid(queryPlan, cascadesContext, context)) {
continue;
}
// get query struct infos according to the view strut info, if valid query struct infos is empty, bail out
Expand Down Expand Up @@ -238,7 +241,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
// Try to rewrite compensate predicates by using mv scan
List<Expression> rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(),
queryPlan, materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet());
viewToQuerySlotMapping, queryStructInfo.getTableBitSet());
if (rewriteCompensatePredicates.isEmpty()) {
materializationContext.recordFailReason(queryStructInfo,
"Rewrite compensate predicate by view fail",
Expand Down Expand Up @@ -521,33 +524,20 @@ protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInf
* @param sourcePlan the source plan witch the source expression belong to
* @param targetExpressionMapping target expression mapping, if finding the expression in key set of the mapping
* then use the corresponding value of mapping to replace it
* @param targetExpressionNeedSourceBased if targetExpressionNeedSourceBased is true,
* we should make the target expression map key to source based,
* Note: the key expression in targetExpressionMapping should be shuttled. with the method
* ExpressionUtils.shuttleExpressionWithLineage.
* example as following:
* source target
* project(slot 1, 2) project(slot 3, 2, 1)
* scan(table) scan(table)
* then
* transform source to:
* project(slot 2, 1)
* target
*/
protected List<Expression> rewriteExpression(List<? extends Expression> sourceExpressionsToWrite, Plan sourcePlan,
ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping,
boolean targetExpressionNeedSourceBased, BitSet sourcePlanBitSet) {
ExpressionMapping targetExpressionMapping, SlotMapping targetToSourceMapping, BitSet sourcePlanBitSet) {
// Firstly, rewrite the target expression using source with inverse mapping
// then try to use the target expression to represent the query. if any of source expressions
// can not be represented by target expressions, return null.
// generate target to target replacement expression mapping, and change target expression to source based
List<? extends Expression> sourceShuttledExpressions = ExpressionUtils.shuttleExpressionWithLineage(
sourceExpressionsToWrite, sourcePlan, sourcePlanBitSet);
ExpressionMapping expressionMappingKeySourceBased = targetExpressionNeedSourceBased
? targetExpressionMapping.keyPermute(targetToSourceMapping) : targetExpressionMapping;
ExpressionMapping expressionMappingKeySourceBased = targetExpressionMapping.keyPermute(targetToSourceMapping);
// target to target replacement expression mapping, because mv is 1:1 so get first element
List<Map<Expression, Expression>> flattenExpressionMap = expressionMappingKeySourceBased.flattenMap();
Map<? extends Expression, ? extends Expression> targetToTargetReplacementMapping = flattenExpressionMap.get(0);
Map<Expression, Expression> targetToTargetReplacementMappingQueryBased =
flattenExpressionMap.get(0);

List<Expression> rewrittenExpressions = new ArrayList<>();
for (Expression expressionShuttledToRewrite : sourceShuttledExpressions) {
Expand All @@ -557,8 +547,13 @@ protected List<Expression> rewriteExpression(List<? extends Expression> sourceEx
}
final Set<Object> slotsToRewrite =
expressionShuttledToRewrite.collectToSet(expression -> expression instanceof Slot);

final Set<SlotReference> variants =
expressionShuttledToRewrite.collectToSet(expression -> expression instanceof SlotReference
&& ((SlotReference) expression).getDataType() instanceof VariantType);
extendMappingByVariant(variants, targetToTargetReplacementMappingQueryBased);
Expression replacedExpression = ExpressionUtils.replace(expressionShuttledToRewrite,
targetToTargetReplacementMapping);
targetToTargetReplacementMappingQueryBased);
if (replacedExpression.anyMatch(slotsToRewrite::contains)) {
// if contains any slot to rewrite, which means can not be rewritten by target, bail out
return ImmutableList.of();
Expand All @@ -568,6 +563,94 @@ protected List<Expression> rewriteExpression(List<? extends Expression> sourceEx
return rewrittenExpressions;
}

/**
* if query contains variant slot reference, extend the expression mapping for rewrte
* such as targetToTargetReplacementMappingQueryBased is
* id#0 -> id#8
* type#1 -> type#9
* payload#4 -> payload#10
* query variants is payload['issue']['number']#20
* then we can add payload['issue']['number']#20 -> element_at(element_at(payload#10, 'issue'), 'number')
* to targetToTargetReplacementMappingQueryBased
* */
private void extendMappingByVariant(Set<SlotReference> queryVariants,
Map<Expression, Expression> targetToTargetReplacementMappingQueryBased) {
if (queryVariants.isEmpty()) {
return;
}
Map<List<String>, Expression> viewNameToExprMap = new HashMap<>();
for (Map.Entry<Expression, Expression> targetExpressionEntry :
targetToTargetReplacementMappingQueryBased.entrySet()) {
if (targetExpressionEntry.getKey() instanceof SlotReference
&& ((SlotReference) targetExpressionEntry.getKey()).getDataType() instanceof VariantType) {
SlotReference targetSlotReference = (SlotReference) targetExpressionEntry.getKey();
List<String> nameIdentifier = new ArrayList<>(targetSlotReference.getQualifier());
nameIdentifier.add(targetSlotReference.getName());
nameIdentifier.addAll(targetSlotReference.getSubPath());
viewNameToExprMap.put(nameIdentifier, targetExpressionEntry.getValue());
}
}
if (viewNameToExprMap.isEmpty()) {
return;
}
Map<List<String>, SlotReference> queryNameAndExpressionMap = new HashMap<>();
for (SlotReference slotReference : queryVariants) {
List<String> nameIdentifier = new ArrayList<>(slotReference.getQualifier());
nameIdentifier.add(slotReference.getName());
nameIdentifier.addAll(slotReference.getSubPath());
queryNameAndExpressionMap.put(nameIdentifier, slotReference);
}
for (Map.Entry<List<String>, ? extends Expression> queryNameEntry : queryNameAndExpressionMap.entrySet()) {
Expression minExpr = null;
List<String> minCompensateName = null;
for (Map.Entry<List<String>, Expression> entry : viewNameToExprMap.entrySet()) {
if (!containsAllWithOrder(queryNameEntry.getKey(), entry.getKey())) {
continue;
}
List<String> removedQueryName = new ArrayList<>(queryNameEntry.getKey());
removedQueryName.removeAll(entry.getKey());
if (minCompensateName == null) {
minCompensateName = removedQueryName;
minExpr = entry.getValue();
}
if (removedQueryName.size() < minCompensateName.size()) {
minCompensateName = removedQueryName;
minExpr = entry.getValue();
}
}
if (minExpr != null) {
targetToTargetReplacementMappingQueryBased.put(queryNameEntry.getValue(),
constructElementAt(minExpr, minCompensateName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after rewrite, these new element_at will be push into scan node too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this will push into scan node again by RBO

}
}
}

private static Expression constructElementAt(Expression target, List<String> atList) {
Expression elementAt = target;
for (String at : atList) {
elementAt = new ElementAt(elementAt, new VarcharLiteral(at));
}
return elementAt;
}

// source names is contain all target with order or not
private static boolean containsAllWithOrder(List<String> sourceNames, List<String> targetNames) {
if (sourceNames.size() < targetNames.size()) {
return false;
}
for (int index = 0; index < targetNames.size(); index++) {
String sourceName = sourceNames.get(index);
String targetName = targetNames.get(index);
if (sourceName == null || targetName == null) {
return false;
}
if (!sourceName.equals(targetName)) {
return false;
}
}
return true;
}

/**
* Normalize expression with query, keep the consistency of exprId and nullable props with
* query
Expand Down Expand Up @@ -753,7 +836,8 @@ protected boolean checkIfRewritten(Plan plan, MaterializationContext context) {
}

// check mv plan is valid or not, this can use cache for performance
private boolean isMaterializationValid(CascadesContext cascadesContext, MaterializationContext context) {
private boolean isMaterializationValid(Plan queryPlan, CascadesContext cascadesContext,
MaterializationContext context) {
long materializationId = context.generateMaterializationIdentifier().hashCode();
Boolean cachedCheckResult = cascadesContext.getMemo().materializationHasChecked(this.getClass(),
materializationId);
Expand All @@ -764,6 +848,11 @@ private boolean isMaterializationValid(CascadesContext cascadesContext, Material
context.recordFailReason(context.getStructInfo(),
"View struct info is invalid", () -> String.format("view plan is %s",
context.getStructInfo().getOriginalPlan().treeString()));
// tmp to location question
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
+ "view plan is %s",
context.generateMaterializationIdentifier(), queryPlan.treeString(),
context.getStructInfo().getTopPlan().treeString()));
cascadesContext.getMemo().recordMaterializationCheckResult(this.getClass(), materializationId,
false);
return false;
Expand All @@ -775,12 +864,20 @@ private boolean isMaterializationValid(CascadesContext cascadesContext, Material
context.recordFailReason(context.getStructInfo(),
"View struct info is invalid", () -> String.format("view plan is %s",
context.getStructInfo().getOriginalPlan().treeString()));
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
+ "view plan is %s",
context.generateMaterializationIdentifier(), queryPlan.treeString(),
context.getStructInfo().getTopPlan().treeString()));
return false;
}
if (!context.getStructInfo().isValid()) {
context.recordFailReason(context.getStructInfo(),
"View struct info is invalid", () -> String.format("view plan is %s",
"View original struct info is invalid", () -> String.format("view plan is %s",
context.getStructInfo().getOriginalPlan().treeString()));
LOG.debug(String.format("View struct info is invalid, mv identifier is %s, query plan is %s,"
+ "view plan is %s",
context.generateMaterializationIdentifier(), queryPlan.treeString(),
context.getStructInfo().getTopPlan().treeString()));
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ protected Plan rewriteQueryByView(MatchMode matchMode,
queryStructInfo.getTopPlan(),
materializationContext.getShuttledExprToScanExprMapping(),
targetToSourceMapping,
true,
queryStructInfo.getTableBitSet()
);
// Can not rewrite, bail out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;

import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
Expand All @@ -41,13 +46,24 @@ public static final class MappedRelation {
public final RelationId relationId;
public final CatalogRelation belongedRelation;
// Generate eagerly, will be used to generate slot mapping
private final Map<String, Slot> slotNameToSlotMap = new HashMap<>();
private final Map<List<String>, Slot> slotNameToSlotMap = new HashMap<>();

/**
* Construct relation and slot map
*/
public MappedRelation(RelationId relationId, CatalogRelation belongedRelation) {
this.relationId = relationId;
this.belongedRelation = belongedRelation;
for (Slot slot : belongedRelation.getOutput()) {
slotNameToSlotMap.put(slot.getName(), slot);
if (slot instanceof SlotReference) {
// variant slot
List<String> slotNames = new ArrayList<>();
slotNames.add(slot.getName());
slotNames.addAll(((SlotReference) slot).getSubPath());
slotNameToSlotMap.put(slotNames, slot);
} else {
slotNameToSlotMap.put(ImmutableList.of(slot.getName()), slot);
}
}
}

Expand All @@ -63,7 +79,7 @@ public CatalogRelation getBelongedRelation() {
return belongedRelation;
}

public Map<String, Slot> getSlotNameToSlotMap() {
public Map<List<String>, Slot> getSlotNameToSlotMap() {
return slotNameToSlotMap;
}

Expand Down
Loading
Loading