Skip to content

Commit

Permalink
[improvement](mtmv) Support to union all original table when some par…
Browse files Browse the repository at this point in the history
…titions of mv which contains hive table are invalid
  • Loading branch information
seawinde committed Aug 20, 2024
1 parent 872aab7 commit 617a3d4
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,18 @@ public Map<String, PartitionItem> getAndCopyPartitionItems() {
return res;
}

/**
* Get hive partition name by id
*/
public String getPartitionNameById(long id) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
BiMap<Long, String> inverse = hivePartitionValues.getPartitionNameToIdMap().inverse();
return inverse.get(id);
}

private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,17 +428,14 @@ protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
}
// Collect the mv related base table partitions which query used
Map<BaseTableInfo, Set<Partition>> queryUsedBaseTablePartitions = new LinkedHashMap<>();
Map<BaseTableInfo, Set<String>> queryUsedBaseTablePartitions = new LinkedHashMap<>();
queryUsedBaseTablePartitions.put(relatedPartitionTable, new HashSet<>());
queryPlan.accept(new StructInfo.QueryScanPartitionsCollector(), queryUsedBaseTablePartitions);
// Bail out, not check invalid partition if not olap scan, support later
if (queryUsedBaseTablePartitions.isEmpty()) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
}
Set<String> queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable)
.stream()
.map(Partition::getName)
.collect(Collectors.toSet());
Set<String> queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable);

Collection<Partition> mvValidPartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
cascadesContext.getConnectContext(), System.currentTimeMillis(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
Expand Down Expand Up @@ -49,6 +50,8 @@
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
Expand Down Expand Up @@ -730,10 +733,10 @@ public Plan visitLogicalOlapScan(LogicalOlapScan olapScan,
* Collect partitions on base table
*/
public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan,
Map<BaseTableInfo, Set<Partition>>> {
Map<BaseTableInfo, Set<String>>> {
@Override
public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
Map<BaseTableInfo, Set<Partition>> targetTablePartitionMap) {
Map<BaseTableInfo, Set<String>> targetTablePartitionMap) {
TableIf table = catalogRelation.getTable();
BaseTableInfo relatedPartitionTable = new BaseTableInfo(table);
if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) {
Expand All @@ -742,13 +745,35 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
if (catalogRelation instanceof LogicalOlapScan) {
// Handle olap table
LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation;
Set<Partition> tablePartitions = targetTablePartitionMap.get(relatedPartitionTable);
Set<String> tablePartitions = targetTablePartitionMap.get(relatedPartitionTable);
for (Long partitionId : logicalOlapScan.getSelectedPartitionIds()) {
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId));
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId).getName());
}
} else if (catalogRelation instanceof LogicalFileScan
&& catalogRelation.getTable() instanceof HMSExternalTable) {
// support hive partition check
Set<String> tablePartitions = targetTablePartitionMap.get(relatedPartitionTable);
LogicalFileScan logicalFileScan = (LogicalFileScan) catalogRelation;
HMSExternalTable hiveExternalTable = (HMSExternalTable) logicalFileScan.getTable();
SelectedPartitions selectedPartitions = logicalFileScan.getSelectedPartitions();
Map<Long, PartitionItem> selectedPartitionsMap = selectedPartitions.selectedPartitions;
if (selectedPartitionsMap.isEmpty()) {
// logicalFileScan.getSelectedPartitions() can not get select partitions when without filter
// So query without filter would hit mv without check the data valid
targetTablePartitionMap.clear();
return catalogRelation;
}
for (Long partitionId : selectedPartitionsMap.keySet()) {
String partitionName = hiveExternalTable.getPartitionNameById(partitionId);
if (partitionName == null) {
// can not use the mv
tablePartitions.clear();
break;
}
tablePartitions.add(partitionName);
}
} else {
// todo Support other type partition table
// Not support to partition check now when query external catalog table, support later.
targetTablePartitionMap.clear();
}
return catalogRelation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

-- !after_modify_and_without_refresh_catalog_19 --
3 2 3 2023-10-19 2023-10-19
3 2 3 2023-10-19 2023-10-19

-- !after_modify_and_without_refresh_catalog_18 --
2 2 2 2023-10-18 2023-10-18
Expand All @@ -27,6 +28,7 @@

-- !after_modify_and_refresh_catalog_19 --
3 2 3 2023-10-19 2023-10-19
3 2 3 2023-10-19 2023-10-19

-- !after_modify_and_refresh_catalog_18 --
2 2 2 2023-10-18 2023-10-18
Expand All @@ -48,6 +50,7 @@
3 2 3 2023-10-19 2023-10-19

-- !after_add_and_without_refresh_catalog_20 --
\N \N 7 \N 2023-10-20

-- !after_add_data_with_refresh_catalog --
1 2 1 2023-10-17 2023-10-17
Expand All @@ -60,6 +63,7 @@
3 2 3 2023-10-19 2023-10-19

-- !after_add_and_refresh_catalog_20 --
\N \N 7 \N 2023-10-20

-- !after_add_data_and_refresh_catalog_and_mv --
\N \N 7 \N 2023-10-20
Expand Down
20 changes: 20 additions & 0 deletions regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,23 @@
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50

-- !query_21_0_before --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50
2023-10-21 2023-10-21 \N 2 3 \N
2023-11-22 2023-11-22 \N 2 3 \N

-- !query_21_0_after --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50
2023-10-21 2023-10-21 \N 2 3 \N
2023-11-22 2023-11-22 \N 2 3 \N

-- !query_22_0_before --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50

-- !query_22_0_after --
2023-10-18 2023-10-18 2023-10-18 2 3 109.20
2023-10-19 2023-10-19 2023-10-19 2 3 99.50

Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ suite("part_partition_invalid", "p0,external") {
AS ${query_sql}
"""

sql """REFRESH MATERIALIZED VIEW ${mv_name} complete"""
sql """REFRESH MATERIALIZED VIEW ${mv_name} auto"""
waitingMTMVTaskFinished(getJobName(olap_db, mv_name))
order_qt_query_mv_directly """select * from ${mv_name};"""

Expand All @@ -151,11 +151,12 @@ suite("part_partition_invalid", "p0,external") {
contains("${mv_name}(${mv_name})")
}

// data change in external table doesn't influence query rewrite,
// if want to use new data in external table should be refresh manually
// partition data change in external hive table influence query rewrite,
sql """insert into ${hive_catalog_name}.${hive_database}.${hive_table} values(3, 3, 'ok', 99.5, 'a', 'b', 1, 'yy', '2023-10-19');"""
explain {
sql(""" ${query_sql}""")
// logicalFileScan.getSelectedPartitions() can not get select partitions without filter
// So query without filter would hit mv without check the data valid
contains("${mv_name}(${mv_name})")
}
order_qt_after_modify_data_without_refresh_catalog """ ${query_sql}"""
Expand All @@ -165,7 +166,7 @@ suite("part_partition_invalid", "p0,external") {
${query_sql} where o_orderdate = '2023-10-19';
""")
// query invalid partition data, should hit mv, because not check now.
contains("${mv_name}(${mv_name})")
notContains("${mv_name}(${mv_name})")
}
order_qt_after_modify_and_without_refresh_catalog_19 """ ${query_sql} where o_orderdate = '2023-10-19';"""

Expand All @@ -182,6 +183,8 @@ suite("part_partition_invalid", "p0,external") {
sql """ REFRESH CATALOG ${hive_catalog_name} PROPERTIES("invalid_cache" = "true"); """
explain {
sql(""" ${query_sql}""")
// logicalFileScan.getSelectedPartitions() can not get select partitions without filter
// So query without filter would hit mv without check the data valid
contains("${mv_name}(${mv_name})")
}
order_qt_after_modify_data_and_refresh_catalog """ ${query_sql}"""
Expand All @@ -190,8 +193,8 @@ suite("part_partition_invalid", "p0,external") {
sql("""
${query_sql} where o_orderdate = '2023-10-19';
""")
// query invalid partition data, should hit mv, because not check now.
contains("${mv_name}(${mv_name})")
// query invalid partition data, should not hit mv.
notContains("${mv_name}(${mv_name})")
}
order_qt_after_modify_and_refresh_catalog_19 """ ${query_sql} where o_orderdate = '2023-10-19';"""

Expand All @@ -218,13 +221,15 @@ suite("part_partition_invalid", "p0,external") {
sql """insert into ${hive_catalog_name}.${hive_database}.${hive_table} values(6, 7, 'ok', 29.5, 'x', 'y', 6, 'ss', '2023-10-20');"""
explain {
sql(""" ${query_sql}""")
// logicalFileScan.getSelectedPartitions() can not get select partitions without filter
// So query without filter would hit mv without check the data valid
contains("${mv_name}(${mv_name})")
}
order_qt_after_add_data_without_refresh_catalog """ ${query_sql}"""

explain {
sql("""
${query_sql}
${query_sql} where o_orderdate = '2023-10-19';
""")
// query invalid partition data, should hit mv, because not check now.
contains("${mv_name}(${mv_name})")
Expand All @@ -235,7 +240,7 @@ suite("part_partition_invalid", "p0,external") {
sql("""
${query_sql} where o_orderdate = '2023-10-20';
""")
// query valid partition data, should hit mv
// query invalid partition data, should hit mv
notContains("${mv_name}(${mv_name})")
}
order_qt_after_add_and_without_refresh_catalog_20 """ ${query_sql} where o_orderdate = '2023-10-20';"""
Expand All @@ -244,6 +249,8 @@ suite("part_partition_invalid", "p0,external") {
sql """ REFRESH CATALOG ${hive_catalog_name} PROPERTIES("invalid_cache" = "true"); """
explain {
sql(""" ${query_sql}""")
// logicalFileScan.getSelectedPartitions() can not get select partitions without filter
// So query without filter would hit mv without check the data valid
contains("${mv_name}(${mv_name})")
}
order_qt_after_add_data_with_refresh_catalog """ ${query_sql}"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,23 +543,23 @@ suite("partition_mv_rewrite") {

// enable union rewrite
// this depends on getting corret partitions when base table delete partition, tmp comment
// sql "SET enable_materialized_view_rewrite=false"
// order_qt_query_21_0_before "${roll_up_all_partition_sql}"
// sql "SET enable_materialized_view_rewrite=true"
// explain {
// sql("${roll_up_all_partition_sql}")
// // should rewrite successful when union rewrite enalbe if base table add new partition
// contains("mv_10086(mv_10086)")
// }
// order_qt_query_21_0_after "${roll_up_all_partition_sql}"
//
// sql "SET enable_materialized_view_rewrite=false"
// order_qt_query_22_0_before "${roll_up_partition_sql}"
// sql "SET enable_materialized_view_rewrite=true"
// explain {
// sql("${roll_up_partition_sql}")
// // should rewrite successfully when union rewrite enable if doesn't query new partition
// contains("mv_10086(mv_10086)")
// }
// order_qt_query_22_0_after "${roll_up_partition_sql}"
sql "SET enable_materialized_view_rewrite=false"
order_qt_query_21_0_before "${roll_up_all_partition_sql}"
sql "SET enable_materialized_view_rewrite=true"
explain {
sql("${roll_up_all_partition_sql}")
// should rewrite successful when union rewrite enalbe if base table add new partition
contains("mv_10086(mv_10086)")
}
order_qt_query_21_0_after "${roll_up_all_partition_sql}"

sql "SET enable_materialized_view_rewrite=false"
order_qt_query_22_0_before "${roll_up_partition_sql}"
sql "SET enable_materialized_view_rewrite=true"
explain {
sql("${roll_up_partition_sql}")
// should rewrite successfully when union rewrite enable if doesn't query new partition
contains("mv_10086(mv_10086)")
}
order_qt_query_22_0_after "${roll_up_partition_sql}"
}

0 comments on commit 617a3d4

Please sign in to comment.