Skip to content

Commit

Permalink
[refact](fe) remove redundant code for light schema change. (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
SWJTU-ZhangLei authored and Lchangliang committed Jun 30, 2022
1 parent 4c3ad30 commit 9169123
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1240,138 +1240,4 @@ public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request
}
return result;
}

@Override
public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request);

TStatus status = new TStatus(TStatusCode.OK);
List<TColumnDef> allColumns = new ArrayList<TColumnDef>();

Catalog catalog = Catalog.getCurrentCatalog();
try {
if (!catalog.isMaster()) {
status.setStatusCode(TStatusCode.ILLEGAL_STATE);
status.addToErrorMsgs("retry rpc request to master.");
TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns);
LOG.debug("result: {}", result);
return result;
}
TableName tableName = catalog.getTableNameByTableId(request.getTableId());
if (tableName == null) {
throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist");
}

Database db = catalog.getDbNullable(tableName.getDb());
if (db == null) {
throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist");
}

List<TColumnDef> addColumns = request.getAddColumns();
if (addColumns == null || addColumns.size() == 0) {
throw new UserException("invalid request: addColumns empty.");
}

//rpc only olap table
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP);
olapTable.writeLockOrMetaException();

try {
//3.create AddColumnsClause
List<ColumnDef> ColumnDefs = new ArrayList<ColumnDef>();
for (TColumnDef tColumnDef : addColumns) {
String comment = tColumnDef.getComment();
if (comment == null || comment.length() == 0) {
Instant ins = Instant.ofEpochSecond(1568568760);
ZonedDateTime zdt = ins.atZone(ZoneId.systemDefault());
comment = "auto change " + zdt.toString();
}

TColumnDesc tColumnDesc = tColumnDef.getColumnDesc();

String columnName = tColumnDesc.getColumnName();
TPrimitiveType tPrimitiveType = tColumnDesc.getColumnType();
int columnLength = tColumnDesc.getColumnLength();
int columnPrecision = tColumnDesc.getColumnPrecision();
int columnScale = tColumnDesc.getColumnScale();
boolean isAllowNull = tColumnDesc.isIsAllowNull();
TypeDef typeDef = new TypeDef(ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType), columnLength, columnPrecision, columnScale));
ColumnDef columnDef = new ColumnDef(columnName, typeDef, false, null, isAllowNull, DefaultValue.NOT_SET, comment, true);
ColumnDefs.add(columnDef);
}

AddColumnsClause addColumnsClause = new AddColumnsClause(ColumnDefs, null, null);

// index id -> index schema
Map<Long, List<Column>> indexSchemaMap = new HashMap<>();
for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema(true).entrySet()) {
indexSchemaMap.put(entry.getKey(), new ArrayList<>(entry.getValue()));
}
//4. call schame change function, only for dynamic table feature.
SchemaChangeHandler schemaChangeHandler = new SchemaChangeHandler();
IntSupplier colUniqueIdSupplier = new IntSupplier() {
int pendingMaxColUniqueId = olapTable.getMaxColUniqueId();
@Override
public int getAsInt() {
pendingMaxColUniqueId++;
return pendingMaxColUniqueId;
}
};
boolean ligthSchemaChange = schemaChangeHandler.processAddColumns(addColumnsClause, olapTable, indexSchemaMap, true, colUniqueIdSupplier);
if (ligthSchemaChange) {
//for schema change add column optimize, direct modify table meta.
List<Index> newIndexes = olapTable.getCopiedIndexes();
long jobId = Catalog.getCurrentCatalog().getNextId();
Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false);
} else {
throw new MetaNotFoundException("table_id " + request.getTableId() + " cannot light schema change through rpc.");
}

//5. build all columns
for (Column column : olapTable.getBaseSchema()) {
TColumnDesc desc = new TColumnDesc(column.getName(), column.getDataType().toThrift());
Integer precision = column.getOriginType().getPrecision();
if (precision != null) {
desc.setColumnPrecision(precision);
}
Integer columnLength = column.getOriginType().getColumnSize();
if (columnLength != null) {
desc.setColumnLength(columnLength);
}
Integer decimalDigits = column.getOriginType().getDecimalDigits();
if (decimalDigits != null) {
desc.setColumnScale(decimalDigits);
}
desc.setIsAllowNull(column.isAllowNull());
desc.setColUniqueId(column.getUniqueId());
TColumnDef colDef = new TColumnDef(desc);
String comment = column.getComment();
if (comment != null) {
colDef.setComment(comment);
}
allColumns.add(colDef);
}

} catch (Exception e) {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getMessage());
} finally {
olapTable.writeUnlock();
}
} catch (MetaNotFoundException e) {
status.setStatusCode(TStatusCode.NOT_FOUND);
status.addToErrorMsgs(e.getMessage());
} catch (UserException e) {
status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
status.addToErrorMsgs(e.getMessage());
} catch(Exception e) {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getMessage());
}

TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns);
LOG.debug("result: {}", result);
return result;
}
}
14 changes: 0 additions & 14 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ struct TColumnDesc {
4: optional i32 columnPrecision
5: optional i32 columnScale
6: optional bool isAllowNull
7: optional i32 col_unique_id = -1
}

// A column definition; used by CREATE TABLE and DESCRIBE <table> statements. A column
Expand Down Expand Up @@ -742,17 +741,6 @@ struct TWaitingTxnStatusResult {
2: optional i32 txn_status_id
}

struct TAddColumnsRequest {
1: required i64 table_id
2: required list<TColumnDef> addColumns
}

struct TAddColumnsResult {
1: required Status.TStatus status
2: required i64 table_id
3: required list<TColumnDef> allColumns
}

service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
Expand Down Expand Up @@ -794,6 +782,4 @@ service FrontendService {
Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)

TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)

TAddColumnsResult addColumns(1: TAddColumnsRequest request)
}

0 comments on commit 9169123

Please sign in to comment.