Skip to content

Commit

Permalink
[refactor](schema change) refactor FE light schema change. (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei authored and Lchangliang committed Jul 6, 2022
1 parent 4ad2baa commit 4919302
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 185 deletions.
7 changes: 6 additions & 1 deletion be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ Status DataDir::load() {
<< " schema hash: " << rowset_meta->tablet_schema_hash()
<< " for txn: " << rowset_meta->txn_id();
}
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
Status publish_status = tablet->add_rowset(rowset);
Expand Down Expand Up @@ -522,7 +527,7 @@ Status DataDir::load() {
tablet->tablet_meta());
}
}

// At startup, we only count these invalid rowsets, but do not actually delete them.
// The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
// which is cleaned up uniformly by the background cleanup thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
Expand All @@ -70,6 +71,7 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
Expand Down Expand Up @@ -1727,8 +1729,7 @@ public int getAsInt() {
if (ligthSchemaChange) {
long jobId = Catalog.getCurrentCatalog().getNextId();
//for schema change add/drop value column optimize, direct modify table meta.
Catalog.getCurrentCatalog()
.modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false);
modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false);
return;
} else {
createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
Expand Down Expand Up @@ -2084,4 +2085,179 @@ public void replayAlterJobV2(AlterJobV2 alterJob) {
}
super.replayAlterJobV2(alterJob);
}

// the invoker should keep table's write lock
public void modifyTableAddOrDropColumns(Database db, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap,
List<Index> indexes, long jobId, boolean isReplay) throws DdlException {

LOG.debug("indexSchemaMap:{}, indexes:{}", indexSchemaMap, indexes);
if (olapTable.getState() == OlapTableState.ROLLUP) {
throw new DdlException("Table[" + olapTable.getName() + "]'s is doing ROLLUP job");
}

// for now table's state can only be NORMAL
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name());

// for bitmapIndex
boolean hasIndexChange = false;
Set<Index> newSet = new HashSet<>(indexes);
Set<Index> oriSet = new HashSet<>(olapTable.getIndexes());
if (!newSet.equals(oriSet)) {
hasIndexChange = true;
}

// begin checking each table
// ATTN: DO NOT change any meta in this loop
Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
for (Long alterIndexId : indexSchemaMap.keySet()) {
// Must get all columns including invisible columns.
// Because in alter process, all columns must be considered.
List<Column> alterSchema = indexSchemaMap.get(alterIndexId);

LOG.debug("index[{}] is changed. start checking...", alterIndexId);
// 1. check order: a) has key; b) value after key
boolean meetValue = false;
boolean hasKey = false;
for (Column column : alterSchema) {
if (column.isKey() && meetValue) {
throw new DdlException(
"Invalid column order. value should be after key. index[" + olapTable.getIndexNameById(
alterIndexId) + "]");
}
if (!column.isKey()) {
meetValue = true;
} else {
hasKey = true;
}
}
if (!hasKey) {
throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
}

// 2. check partition key
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
List<Column> partitionColumns = partitionInfo.getPartitionColumns();
for (Column partitionCol : partitionColumns) {
boolean found = false;
for (Column alterColumn : alterSchema) {
if (alterColumn.nameEquals(partitionCol.getName(), true)) {
found = true;
break;
}
} // end for alterColumns

if (!found && alterIndexId == olapTable.getBaseIndexId()) {
// 2.1 partition column cannot be deleted.
throw new DdlException(
"Partition column[" + partitionCol.getName() + "] cannot be dropped. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
} // end for partitionColumns
}

// 3. check distribution key:
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
if (distributionInfo.getType() == DistributionInfoType.HASH) {
List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
for (Column distributionCol : distributionColumns) {
boolean found = false;
for (Column alterColumn : alterSchema) {
if (alterColumn.nameEquals(distributionCol.getName(), true)) {
found = true;
break;
}
} // end for alterColumns
if (!found && alterIndexId == olapTable.getBaseIndexId()) {
// 2.2 distribution column cannot be deleted.
throw new DdlException(
"Distribution column[" + distributionCol.getName() + "] cannot be dropped. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
} // end for distributionCols
}

// 5. store the changed columns for edit log
changedIndexIdToSchema.put(alterIndexId, alterSchema);

LOG.debug("schema change[{}-{}-{}] check pass.", db.getId(), olapTable.getId(), alterIndexId);
} // end for indices

if (changedIndexIdToSchema.isEmpty() && !hasIndexChange) {
throw new DdlException("Nothing is changed. please check your alter stmt.");
}

//update base index schema
long baseIndexId = olapTable.getBaseIndexId();
List<Long> indexIds = new ArrayList<Long>();
indexIds.add(baseIndexId);
indexIds.addAll(olapTable.getIndexIdListExceptBaseIndex());
for (int i = 0; i < indexIds.size(); i++) {
List<Column> indexSchema = indexSchemaMap.get(indexIds.get(i));
MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(indexIds.get(i));
currentIndexMeta.setSchema(indexSchema);

int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
int newSchemaVersion = currentSchemaVersion + 1;
currentIndexMeta.setSchemaVersion(newSchemaVersion);
// generate schema hash for new index has to generate a new schema hash not equal to current schema hash
int currentSchemaHash = currentIndexMeta.getSchemaHash();
int newSchemaHash = Util.generateSchemaHash();
while (currentSchemaHash == newSchemaHash) {
newSchemaHash = Util.generateSchemaHash();
}
currentIndexMeta.setSchemaHash(newSchemaHash);
}
olapTable.setIndexes(indexes);
olapTable.rebuildFullSchema();

//update max column unique id
int maxColUniqueId = olapTable.getMaxColUniqueId();
for (Column column : indexSchemaMap.get(olapTable.getBaseIndexId())) {
if (column.getUniqueId() > maxColUniqueId) {
maxColUniqueId = column.getUniqueId();
}
}
olapTable.setMaxColUniqueId(maxColUniqueId);

if (!isReplay) {
TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(db.getId(), olapTable.getId(),
indexSchemaMap, indexes, jobId);
LOG.debug("logModifyTableAddOrDropColumns info:{}", info);
Catalog.getCurrentCatalog().getEditLog().logModifyTableAddOrDropColumns(info);
}

//for compatibility, we need create a finished state schema change job v2

SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, db.getId(), olapTable.getId(),
olapTable.getName(), 1000);
schemaChangeJob.setJobState(AlterJobV2.JobState.FINISHED);
schemaChangeJob.setFinishedTimeMs(System.currentTimeMillis());
this.addAlterJobV2(schemaChangeJob);

LOG.info("finished modify table's add or drop columns. table: {}, is replay: {}", olapTable.getName(),
isReplay);
}

public void replayModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) throws MetaNotFoundException {
LOG.debug("info:{}", info);
long dbId = info.getDbId();
long tableId = info.getTableId();
Map<Long, LinkedList<Column>> indexSchemaMap = info.getIndexSchemaMap();
List<Index> indexes = info.getIndexes();
long jobId = info.getJobId();

Database db = Catalog.getCurrentCatalog().getInternalDataSource().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, indexes, jobId, true);
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table add or drop columns", e);
} finally {
olapTable.writeUnlock();
}
}
}
Loading

0 comments on commit 4919302

Please sign in to comment.