Skip to content

Commit

Permalink
[fix](statistics)Fix write all 0 to column stats bug when new partiti…
Browse files Browse the repository at this point in the history
…on created. (#38394)

Fix the bug where all-zero column stats were written when a new partition was created.
  • Loading branch information
Jibing-Li authored Jul 26, 2024
1 parent fe2fdf0 commit b8770ee
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 56 deletions.
57 changes: 31 additions & 26 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1197,44 +1197,49 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
.collect(Collectors.toSet()))) {
return true;
}
long rowCount = getRowCount();
if (rowCount > 0 && tblStats.rowCount == 0) {
// 1 Check row count.
long currentRowCount = getRowCount();
long lastAnalyzeRowCount = tblStats.rowCount;
// 1.1 Empty table -> non-empty table. Need analyze.
if (currentRowCount != 0 && lastAnalyzeRowCount == 0) {
return true;
}
// 1.2 Non-empty table -> empty table. Need analyze.
if (currentRowCount == 0 && lastAnalyzeRowCount != 0) {
return true;
}
// 1.3 Table is still empty. No need to analyze. lastAnalyzeRowCount == 0 is always true here.
if (currentRowCount == 0) {
return false;
}
// 1.4 If row count changed more than the threshold, need analyze.
// lastAnalyzeRowCount == 0 is always false here.
double changeRate =
((double) Math.abs(currentRowCount - lastAnalyzeRowCount) / lastAnalyzeRowCount) * 100.0;
if (changeRate > (100 - StatisticsUtil.getTableStatsHealthThreshold())) {
return true;
}

// 2. Check update rows.
long updateRows = tblStats.updatedRows.get();
int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows);
int tblHealth = StatisticsUtil.getTableHealth(currentRowCount, updateRows);
return tblHealth < StatisticsUtil.getTableStatsHealthThreshold();
}

@Override
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId());
Set<String> allPartitions = getPartitionNames().stream().map(this::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
if (tableStats == null) {
Map<String, Set<String>> ret = Maps.newHashMap();
for (Column col : getSchemaAllIndexes(false)) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
ret.put(col.getName(), allPartitions);
}
return ret;
}
Map<String, Set<String>> colToPart = new HashMap<>();
Set<String> partitions = Sets.newHashSet();
// No need to filter unchanged partitions, because it may bring unexpected behavior.
// Use dummy partition to skip it.
partitions.add("Dummy Partition");
Map<String, Set<String>> colToParts = new HashMap<>();
for (Column col : getSchemaAllIndexes(false)) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName());
Set<String> partitions = getPartitionNames().stream()
.map(this::getPartition)
.filter(Partition::hasData)
.filter(partition -> partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName)
.collect(Collectors.toSet());
colToPart.put(col.getName(), partitions);
}
return colToPart;
colToParts.put(col.getName(), partitions);
}
return colToParts;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ public enum ScheduleType {

@SerializedName("emptyJob")
public final boolean emptyJob;

@SerializedName("rowCount")
public final long rowCount;
/**
*
* Used to store the newest partition version of tbl when creating this job.
Expand All @@ -206,7 +209,8 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject) {
boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject,
long rowCount) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand Down Expand Up @@ -244,6 +248,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
this.tblUpdateTime = tblUpdateTime;
this.emptyJob = emptyJob;
this.userInject = userInject;
this.rowCount = rowCount;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class AnalysisInfoBuilder {
private long tblUpdateTime;
private boolean emptyJob;
private boolean userInject;
private long rowCount;

public AnalysisInfoBuilder() {
}
Expand Down Expand Up @@ -103,6 +104,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
tblUpdateTime = info.tblUpdateTime;
emptyJob = info.emptyJob;
userInject = info.userInject;
rowCount = info.rowCount;
}

public AnalysisInfoBuilder setJobId(long jobId) {
Expand Down Expand Up @@ -275,12 +277,17 @@ public AnalysisInfoBuilder setUserInject(boolean userInject) {
return this;
}

/**
 * Stores the given row count in this builder.
 *
 * @param rowCount the row count value to record
 * @return this builder, so that setter calls can be chained
 */
public AnalysisInfoBuilder setRowCount(long rowCount) {
this.rowCount = rowCount;
return this;
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject);
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -309,10 +310,13 @@ private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
Set<String> partitionNames, AnalysisType analysisType) throws DdlException {

Set<String> dummyPartitions = Sets.newHashSet();
// validateAndGetPartitions is to be deprecated. For now, use a dummy partition when no partition names are given.
dummyPartitions.add("Dummy Partition");
Map<String, Set<String>> columnToPartitions = columnNames.stream()
.collect(Collectors.toMap(
columnName -> columnName,
columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames)
columnName -> new HashSet<>(partitionNames == null ? dummyPartitions : partitionNames)
));

if (analysisType == AnalysisType.HISTOGRAM) {
Expand Down Expand Up @@ -405,6 +409,8 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
infoBuilder.setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0);
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount();
infoBuilder.setRowCount(rowCount);
return infoBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
Expand All @@ -37,7 +38,6 @@

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -69,15 +69,17 @@ public void doExecute() throws Exception {
return;
}
Set<String> partitionNames = info.colToPartitions.get(info.colName);
if (StatisticsUtil.isEmptyTable(tbl, info.analysisMethod)
|| partitionNames == null || partitionNames.isEmpty()) {
if (partitionNames == null) {
LOG.warn("Table {}.{}.{}, partitionNames for column {} is null. ColToPartitions:[{}]",
info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions);
}
if (partitionNames == null || partitionNames.isEmpty()) {
LOG.warn("Table {}.{}.{}, partitionNames for column {} is null or empty. ColToPartitions:[{}]",
info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions);
throw new RuntimeException();
}
if (info.rowCount == 0 && tableSample != null) {
StatsId statsId = new StatsId(concatColumnStatsId(), info.catalogId, info.dbId,
info.tblId, info.indexId, info.colName, null);
job.appendBuf(this, Arrays.asList(new ColStatsData(statsId)));
ColStatsData colStatsData = new ColStatsData(statsId);
Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData);
job.appendBuf(this, Collections.singletonList(colStatsData));
return;
}
if (tableSample != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
Expand All @@ -33,6 +32,7 @@
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.collect.Sets;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -181,6 +181,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
List<AnalysisInfo> analysisInfos, TableIf table) {
AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount();
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
.setCatalogId(db.getCatalog().getId())
Expand All @@ -203,6 +204,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
.setJobType(JobType.SYSTEM)
.setTblUpdateTime(table.getUpdateTime())
.setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0)
.setRowCount(rowCount)
.build();
analysisInfos.add(jobInfo);
}
Expand All @@ -228,10 +230,12 @@ protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
Set<String> partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream()
.map(Column::getName).collect(Collectors.toSet());
colNames = partitionColumnNames.stream().collect(Collectors.joining(","));
Set<String> partitionNames = olapTable.getAllPartitions().stream()
.map(Partition::getName).collect(Collectors.toSet());
Set<String> partitions = Sets.newHashSet();
// No need to filter unchanged partitions, because it may bring unexpected behavior.
// Use dummy partition to skip it.
partitions.add("Dummy Partition");
for (String column : partitionColumnNames) {
needRunPartitions.put(column, partitionNames);
needRunPartitions.put(column, partitions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
jobType = analyzedJob.jobType;
if (tableIf != null) {
if (tableIf instanceof OlapTable) {
rowCount = analyzedJob.emptyJob ? 0 : tableIf.getRowCount();
rowCount = analyzedJob.rowCount;
}
if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet()
.containsAll(tableIf.getBaseSchema().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public void testReAnalyze() {
new MockUp<OlapTable>() {

int count = 0;
int[] rowCount = new int[]{100, 100, 200, 200, 1, 1};
int[] rowCount = new int[]{100, 200, 1, 0, 0, 100};

final Column c = new Column("col1", PrimitiveType.INT);
@Mock
Expand All @@ -298,20 +298,35 @@ public List<Column> getColumns() {
OlapTable olapTable = new OlapTable();
TableStatsMeta stats1 = new TableStatsMeta(
50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>())
.setColName("col1").build(), olapTable);
stats1.updatedRows.addAndGet(50);
.setColName("col1").setRowCount(100).build(), olapTable);
stats1.updatedRows.addAndGet(70);

Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
TableStatsMeta stats2 = new TableStatsMeta(
190, new AnalysisInfoBuilder()
.setColToPartitions(new HashMap<>()).setColName("col1").build(), olapTable);
.setColToPartitions(new HashMap<>()).setColName("col1").setRowCount(200).build(), olapTable);
stats2.updatedRows.addAndGet(20);
Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2));

TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder()
.setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1").build(), olapTable);
.setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1")
.setRowCount(0).build(), olapTable);
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3));

TableStatsMeta stats4 = new TableStatsMeta(0, new AnalysisInfoBuilder()
.setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1")
.setRowCount(1).build(), olapTable);
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats4));

TableStatsMeta stats5 = new TableStatsMeta(0, new AnalysisInfoBuilder()
.setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1")
.setRowCount(0).build(), olapTable);
Assertions.assertFalse(olapTable.needReAnalyzeTable(stats5));

TableStatsMeta stats6 = new TableStatsMeta(0, new AnalysisInfoBuilder()
.setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1")
.setRowCount(30).build(), olapTable);
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats6));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.doris.catalog.OlapTable;

import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -31,15 +29,9 @@ class TableStatsMetaTest {

@Test
void update(@Mocked OlapTable table) {
new MockUp<OlapTable>() {
@Mock
public long getRowCount() {
return 4;
}
};
TableStatsMeta tableStatsMeta = new TableStatsMeta();
AnalysisInfo jobInfo = new AnalysisInfoBuilder().setColToPartitions(new HashMap<>())
.setColName("col1").build();
.setColName("col1").setRowCount(4).build();
tableStatsMeta.update(jobInfo, table);
Assertions.assertEquals(4, tableStatsMeta.rowCount);
}
Expand Down
27 changes: 27 additions & 0 deletions regression-test/suites/statistics/analyze_stats.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -2825,6 +2825,33 @@ PARTITION `p599` VALUES IN (599)
assertEquals("521779.0", alter_result[0][5])
assertEquals("7.142863009760572", alter_result[0][6])

// Test analyze after new empty partition created.
sql """CREATE TABLE `part` (
`id` INT NULL,
`colint` INT NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
PARTITION BY RANGE(`id`)
(PARTITION p1 VALUES [("-2147483648"), ("10000")),
PARTITION p2 VALUES [("10000"), ("20000")))
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

sql """analyze table part with sync;"""
sql """Insert into part values (1, 1), (10001, 10001);"""
sql """analyze table part with sync;"""
sleep(1000)
sql """alter table part add partition p3 VALUES [("20000"), ("30000"));"""
sql """analyze table part with sync;"""
sql """analyze table part with sync;"""
def new_part_result = sql """show column stats part(id)"""
assertEquals("2.0", new_part_result[0][2])
new_part_result = sql """show column stats part(colint)"""
assertEquals("2.0", new_part_result[0][2])

sql """DROP DATABASE IF EXISTS trigger"""
}
Expand Down

0 comments on commit b8770ee

Please sign in to comment.