From 4aaa65744b2dc5dba22da2d0e511d3e87f342147 Mon Sep 17 00:00:00 2001 From: morrySnow Date: Fri, 24 Jan 2025 14:56:28 +0800 Subject: [PATCH] [fix](Nereids) Use the schema saved during planning as the schema of the original target table (#47337) ### What problem does this PR solve? Related PR: #47033 #45045 Problem Summary: because schema change does not involve recreating the table object, but rather rebuilding the full schema. So, we should use the schema saved during planning as the schema of the original target table. --- .../apache/doris/nereids/StatementContext.java | 7 +++++++ .../generator/PlanPatternGeneratorAnalyzer.java | 4 +--- .../nereids/rules/analysis/CollectRelation.java | 14 +++++++++++--- .../commands/insert/InsertIntoTableCommand.java | 15 ++++++--------- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 597cef2d47e8c1..75353f446a4433 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.catalog.constraint.TableIdentifier; @@ -178,6 +179,8 @@ public enum TableFrom { private final Map, TableIf> insertTargetTables = Maps.newHashMap(); // save view's def and sql mode to avoid them change before lock private final Map, Pair> viewInfos = Maps.newHashMap(); + // save insert into schema to avoid schema changed between two read locks + private final List insertTargetSchema = new ArrayList<>(); // for create view support in nereids // key is the start and end position of the sql substring that needs to be replaced, @@ -281,6 +284,10 @@ public Map, TableIf> getTables() { return tables; } + public List getInsertTargetSchema() { + return insertTargetSchema; + } + public void setTables(Map, TableIf> tables) { this.tables.clear(); this.tables.putAll(tables); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java index 99d7c308dacf0d..23e7b5eca762ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java @@ -19,7 +19,6 @@ import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration; -import java.lang.reflect.Modifier; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,8 +44,7 @@ public String generatePatterns(String className, String parentClassName, boolean Map> planClassMap = analyzer.getParentClassMap().entrySet().stream() .filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan")) .filter(kv -> !kv.getKey().name.equals("GroupPlan")) - .filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod) - && kv.getKey() instanceof ClassDeclaration) + .filter(kv -> kv.getKey() instanceof ClassDeclaration) .collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue())); List generators = planClassMap.entrySet() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java index 9426ab4d382b77..92a4fb76d49aaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.util.RelationUtil; import com.google.common.collect.ImmutableList; @@ -75,8 +76,8 @@ public List buildRules() { unboundRelation() .thenApply(this::collectFromUnboundRelation) .toRule(RuleType.COLLECT_TABLE_FROM_RELATION), - unboundTableSink() - .thenApply(this::collectFromUnboundTableSink) + unboundLogicalSink() + .thenApply(this::collectFromUnboundSink) .toRule(RuleType.COLLECT_TABLE_FROM_SINK), any().whenNot(UnboundRelation.class::isInstance) .whenNot(UnboundTableSink.class::isInstance) @@ -124,7 +125,7 @@ private Plan collectFromAny(MatchingContext ctx) { return null; } - private Plan collectFromUnboundTableSink(MatchingContext> ctx) { + private Plan collectFromUnboundSink(MatchingContext> ctx) { List nameParts = ctx.root.getNameParts(); switch (nameParts.size()) { case 1: @@ -182,6 +183,13 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext, if (tableFrom == TableFrom.QUERY) { collectMTMVCandidates(table, cascadesContext); } + if (tableFrom == TableFrom.INSERT_TARGET) { + if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) { + LOG.warn("collect insert target table '{}' more than once.", tableQualifier); + } + cascadesContext.getStatementContext().getInsertTargetSchema().clear(); + cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema()); + } if (table instanceof View) { parseAndCollectFromView(tableQualifier, (View) table, cascadesContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 76c72f82f90552..39c5909d4f553d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -28,7 +28,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.jdbc.JdbcExternalTable; @@ -73,7 +72,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -186,9 +184,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec // lock after plan and check does table's schema changed to ensure we lock table order by id. TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv()); - List targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf); - targetTables.sort(Comparator.comparing(TableIf::getId)); - MetaLockUtils.readLockTables(targetTables); + newestTargetTableIf.readLock(); try { if (targetTableIf.getId() != newestTargetTableIf.getId()) { LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}", @@ -196,10 +192,11 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec targetTableIf.getId(), newestTargetTableIf.getId()); continue; } - if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) { + // Use the schema saved during planning as the schema of the original target table. + if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) { LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}", retryTimes, DebugUtil.printId(ctx.queryId()), - targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema()); + ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema()); continue; } if (!insertExecutor.isEmptyInsert()) { @@ -209,9 +206,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec buildResult.physicalSink ); } - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); } catch (Throwable e) { - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); // the abortTxn in onFail need to acquire table write lock if (insertExecutor != null) { insertExecutor.onFail(e);