From b8770ee1d88577ce454ed0fe05d3b770b955ddf8 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Fri, 26 Jul 2024 23:03:35 +0800 Subject: [PATCH] [fix](statistics)Fix write all 0 to column stats bug when new partition created. (#38394) Fix a bug where all-zero column stats are written when a new partition is created. --- .../org/apache/doris/catalog/OlapTable.java | 57 ++++++++++--------- .../apache/doris/statistics/AnalysisInfo.java | 7 ++- .../doris/statistics/AnalysisInfoBuilder.java | 9 ++- .../doris/statistics/AnalysisManager.java | 8 ++- .../doris/statistics/OlapAnalysisTask.java | 18 +++--- .../statistics/StatisticsAutoCollector.java | 12 ++-- .../doris/statistics/TableStatsMeta.java | 2 +- .../doris/statistics/AnalysisManagerTest.java | 25 ++++++-- .../doris/statistics/TableStatsMetaTest.java | 10 +--- .../suites/statistics/analyze_stats.groovy | 27 +++++++++ 10 files changed, 119 insertions(+), 56 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index d357b9f93d60a4..099935e387a605 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1197,44 +1197,49 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) { .collect(Collectors.toSet()))) { return true; } - long rowCount = getRowCount(); - if (rowCount > 0 && tblStats.rowCount == 0) { + // 1. Check row count. + long currentRowCount = getRowCount(); + long lastAnalyzeRowCount = tblStats.rowCount; + // 1.1 Empty table -> non-empty table. Need analyze. + if (currentRowCount != 0 && lastAnalyzeRowCount == 0) { return true; } + // 1.2 Non-empty table -> empty table. Need analyze. + if (currentRowCount == 0 && lastAnalyzeRowCount != 0) { + return true; + } + // 1.3 Table is still empty. No need to analyze. lastAnalyzeRowCount == 0 is always true here.
+ if (currentRowCount == 0) { + return false; + } + // 1.4 If row count changed more than the threshold, need analyze. + // lastAnalyzeRowCount == 0 is always false here. + double changeRate = + ((double) Math.abs(currentRowCount - lastAnalyzeRowCount) / lastAnalyzeRowCount) * 100.0; + if (changeRate > (100 - StatisticsUtil.getTableStatsHealthThreshold())) { + return true; + } + + // 2. Check update rows. long updateRows = tblStats.updatedRows.get(); - int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows); + int tblHealth = StatisticsUtil.getTableHealth(currentRowCount, updateRows); return tblHealth < StatisticsUtil.getTableStatsHealthThreshold(); } @Override public Map<String, Set<String>> findReAnalyzeNeededPartitions() { - TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId()); - Set<String> allPartitions = getPartitionNames().stream().map(this::getPartition) - .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet()); - if (tableStats == null) { - Map<String, Set<String>> ret = Maps.newHashMap(); - for (Column col : getSchemaAllIndexes(false)) { - if (StatisticsUtil.isUnsupportedType(col.getType())) { - continue; - } - ret.put(col.getName(), allPartitions); - } - return ret; - } - Map<String, Set<String>> colToPart = new HashMap<>(); + Set<String> partitions = Sets.newHashSet(); + // No need to filter unchanged partitions, because it may bring unexpected behavior. + // Use dummy partition to skip it.
+ partitions.add("Dummy Partition"); + Map<String, Set<String>> colToParts = new HashMap<>(); for (Column col : getSchemaAllIndexes(false)) { if (StatisticsUtil.isUnsupportedType(col.getType())) { continue; } - long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName()); - Set<String> partitions = getPartitionNames().stream() - .map(this::getPartition) - .filter(Partition::hasData) - .filter(partition -> partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName) - .collect(Collectors.toSet()); - colToPart.put(col.getName(), partitions); - } - return colToPart; + colToParts.put(col.getName(), partitions); + } + return colToParts; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index c707107e0e0fb6..461528b151ff6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -190,6 +190,9 @@ public enum ScheduleType { @SerializedName("emptyJob") public final boolean emptyJob; + + @SerializedName("rowCount") + public final long rowCount; /** * * Used to store the newest partition version of tbl when creating this job.
@@ -206,7 +209,8 @@ public AnalysisInfo(long jobId, long taskId, List taskIds, long catalogId, long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition, boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull, - boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject) { + boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject, + long rowCount) { this.jobId = jobId; this.taskId = taskId; this.taskIds = taskIds; @@ -244,6 +248,7 @@ public AnalysisInfo(long jobId, long taskId, List taskIds, long catalogId, this.tblUpdateTime = tblUpdateTime; this.emptyJob = emptyJob; this.userInject = userInject; + this.rowCount = rowCount; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index 22f3d22b3ce77c..ddef30ee4de28b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -64,6 +64,7 @@ public class AnalysisInfoBuilder { private long tblUpdateTime; private boolean emptyJob; private boolean userInject; + private long rowCount; public AnalysisInfoBuilder() { } @@ -103,6 +104,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) { tblUpdateTime = info.tblUpdateTime; emptyJob = info.emptyJob; userInject = info.userInject; + rowCount = info.rowCount; } public AnalysisInfoBuilder setJobId(long jobId) { @@ -275,12 +277,17 @@ public AnalysisInfoBuilder setUserInject(boolean userInject) { return this; } + public AnalysisInfoBuilder setRowCount(long rowCount) { + this.rowCount = rowCount; + return this; + } + public AnalysisInfo build() { return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, 
colToPartitions, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType, externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount, - cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject); + cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 347b1c6b047dc5..36cc57ee3815e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -66,6 +66,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; @@ -309,10 +310,13 @@ private void sendJobId(List analysisInfos, boolean proxy) { private Map> validateAndGetPartitions(TableIf table, Set columnNames, Set partitionNames, AnalysisType analysisType) throws DdlException { + Set dummyPartitions = Sets.newHashSet(); + // validateAndGetPartitions is to be deprecated, for now, use dummy partition for empty partitions. + dummyPartitions.add("Dummy Partition"); Map> columnToPartitions = columnNames.stream() .collect(Collectors.toMap( columnName -> columnName, - columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames) + columnName -> new HashSet<>(partitionNames == null ? 
dummyPartitions : partitionNames) )); if (analysisType == AnalysisType.HISTOGRAM) { @@ -405,6 +409,8 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio infoBuilder.setTaskIds(Lists.newArrayList()); infoBuilder.setTblUpdateTime(table.getUpdateTime()); infoBuilder.setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0); + long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount(); + infoBuilder.setRowCount(rowCount); return infoBuilder.build(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 073000d9eb2b16..1eec0ee93f94f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.CreateMaterializedViewStmt; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndexMeta; @@ -37,7 +38,6 @@ import java.security.SecureRandom; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -69,15 +69,17 @@ public void doExecute() throws Exception { return; } Set partitionNames = info.colToPartitions.get(info.colName); - if (StatisticsUtil.isEmptyTable(tbl, info.analysisMethod) - || partitionNames == null || partitionNames.isEmpty()) { - if (partitionNames == null) { - LOG.warn("Table {}.{}.{}, partitionNames for column {} is null. ColToPartitions:[{}]", - info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions); - } + if (partitionNames == null || partitionNames.isEmpty()) { + LOG.warn("Table {}.{}.{}, partitionNames for column {} is null or empty. 
ColToPartitions:[{}]", + info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions); + throw new RuntimeException(); + } + if (info.rowCount == 0 && tableSample != null) { StatsId statsId = new StatsId(concatColumnStatsId(), info.catalogId, info.dbId, info.tblId, info.indexId, info.colName, null); - job.appendBuf(this, Arrays.asList(new ColStatsData(statsId))); + ColStatsData colStatsData = new ColStatsData(statsId); + Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData); + job.appendBuf(this, Collections.singletonList(colStatsData)); return; } if (tableSample != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 4dd8960b6fb389..85913f5fd48354 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; @@ -33,6 +32,7 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.collect.Sets; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -181,6 +181,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, List analysisInfos, TableIf table) { AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; + long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 
0 : table.getRowCount(); AnalysisInfo jobInfo = new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) .setCatalogId(db.getCatalog().getId()) @@ -203,6 +204,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, .setJobType(JobType.SYSTEM) .setTblUpdateTime(table.getUpdateTime()) .setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0) + .setRowCount(rowCount) .build(); analysisInfos.add(jobInfo); } @@ -228,10 +230,12 @@ protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { Set partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream() .map(Column::getName).collect(Collectors.toSet()); colNames = partitionColumnNames.stream().collect(Collectors.joining(",")); - Set partitionNames = olapTable.getAllPartitions().stream() - .map(Partition::getName).collect(Collectors.toSet()); + Set partitions = Sets.newHashSet(); + // No need to filter unchanged partitions, because it may bring unexpected behavior. + // Use dummy partition to skip it. + partitions.add("Dummy Partition"); for (String column : partitionColumnNames) { - needRunPartitions.put(column, partitionNames); + needRunPartitions.put(column, partitions); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 9231c6a2bc7cd1..4a37106cb6d6bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -157,7 +157,7 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) { jobType = analyzedJob.jobType; if (tableIf != null) { if (tableIf instanceof OlapTable) { - rowCount = analyzedJob.emptyJob ? 
0 : tableIf.getRowCount(); + rowCount = analyzedJob.rowCount; } if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet() .containsAll(tableIf.getBaseSchema().stream() diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index dc8fb70bc05e47..ebe9cdf93b6bf3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -276,7 +276,7 @@ public void testReAnalyze() { new MockUp() { int count = 0; - int[] rowCount = new int[]{100, 100, 200, 200, 1, 1}; + int[] rowCount = new int[]{100, 200, 1, 0, 0, 100}; final Column c = new Column("col1", PrimitiveType.INT); @Mock @@ -298,20 +298,35 @@ public List getColumns() { OlapTable olapTable = new OlapTable(); TableStatsMeta stats1 = new TableStatsMeta( 50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>()) - .setColName("col1").build(), olapTable); - stats1.updatedRows.addAndGet(50); + .setColName("col1").setRowCount(100).build(), olapTable); + stats1.updatedRows.addAndGet(70); Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1)); TableStatsMeta stats2 = new TableStatsMeta( 190, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setColName("col1").build(), olapTable); + .setColToPartitions(new HashMap<>()).setColName("col1").setRowCount(200).build(), olapTable); stats2.updatedRows.addAndGet(20); Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2)); TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1").build(), olapTable); + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(0).build(), olapTable); Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3)); + TableStatsMeta stats4 = new TableStatsMeta(0, new 
AnalysisInfoBuilder() + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(1).build(), olapTable); + Assertions.assertTrue(olapTable.needReAnalyzeTable(stats4)); + + TableStatsMeta stats5 = new TableStatsMeta(0, new AnalysisInfoBuilder() + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(0).build(), olapTable); + Assertions.assertFalse(olapTable.needReAnalyzeTable(stats5)); + + TableStatsMeta stats6 = new TableStatsMeta(0, new AnalysisInfoBuilder() + .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1") + .setRowCount(30).build(), olapTable); + Assertions.assertTrue(olapTable.needReAnalyzeTable(stats6)); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java index b5e73ba09da728..7532884f4004f7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/TableStatsMetaTest.java @@ -19,8 +19,6 @@ import org.apache.doris.catalog.OlapTable; -import mockit.Mock; -import mockit.MockUp; import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -31,15 +29,9 @@ class TableStatsMetaTest { @Test void update(@Mocked OlapTable table) { - new MockUp() { - @Mock - public long getRowCount() { - return 4; - } - }; TableStatsMeta tableStatsMeta = new TableStatsMeta(); AnalysisInfo jobInfo = new AnalysisInfoBuilder().setColToPartitions(new HashMap<>()) - .setColName("col1").build(); + .setColName("col1").setRowCount(4).build(); tableStatsMeta.update(jobInfo, table); Assertions.assertEquals(4, tableStatsMeta.rowCount); } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index d2ef7b14ff7199..be032a359c98de 100644 --- 
a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -2825,6 +2825,33 @@ PARTITION `p599` VALUES IN (599) assertEquals("521779.0", alter_result[0][5]) assertEquals("7.142863009760572", alter_result[0][6]) + // Test analyze after new empty partition created. + sql """CREATE TABLE `part` ( + `id` INT NULL, + `colint` INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + PARTITION BY RANGE(`id`) + (PARTITION p1 VALUES [("-2147483648"), ("10000")), + PARTITION p2 VALUES [("10000"), ("20000"))) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """analyze table part with sync;""" + sql """Insert into part values (1, 1), (10001, 10001);""" + sql """analyze table part with sync;""" + sleep(1000) + sql """alter table part add partition p3 VALUES [("20000"), ("30000"));""" + sql """analyze table part with sync;""" + sql """analyze table part with sync;""" + def new_part_result = sql """show column stats part(id)""" + assertEquals("2.0", new_part_result[0][2]) + new_part_result = sql """show column stats part(colint)""" + assertEquals("2.0", new_part_result[0][2]) sql """DROP DATABASE IF EXISTS trigger""" }