Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refact](fe) remove redundant code for light schema change. #4

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1240,138 +1240,4 @@ public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request
}
return result;
}

@Override
public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request);

TStatus status = new TStatus(TStatusCode.OK);
List<TColumnDef> allColumns = new ArrayList<TColumnDef>();

Catalog catalog = Catalog.getCurrentCatalog();
try {
if (!catalog.isMaster()) {
status.setStatusCode(TStatusCode.ILLEGAL_STATE);
status.addToErrorMsgs("retry rpc request to master.");
TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns);
LOG.debug("result: {}", result);
return result;
}
TableName tableName = catalog.getTableNameByTableId(request.getTableId());
if (tableName == null) {
throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist");
}

Database db = catalog.getDbNullable(tableName.getDb());
if (db == null) {
throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist");
}

List<TColumnDef> addColumns = request.getAddColumns();
if (addColumns == null || addColumns.size() == 0) {
throw new UserException("invalid request: addColumns empty.");
}

//rpc only olap table
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP);
olapTable.writeLockOrMetaException();

try {
//3.create AddColumnsClause
List<ColumnDef> ColumnDefs = new ArrayList<ColumnDef>();
for (TColumnDef tColumnDef : addColumns) {
String comment = tColumnDef.getComment();
if (comment == null || comment.length() == 0) {
Instant ins = Instant.ofEpochSecond(1568568760);
ZonedDateTime zdt = ins.atZone(ZoneId.systemDefault());
comment = "auto change " + zdt.toString();
}

TColumnDesc tColumnDesc = tColumnDef.getColumnDesc();

String columnName = tColumnDesc.getColumnName();
TPrimitiveType tPrimitiveType = tColumnDesc.getColumnType();
int columnLength = tColumnDesc.getColumnLength();
int columnPrecision = tColumnDesc.getColumnPrecision();
int columnScale = tColumnDesc.getColumnScale();
boolean isAllowNull = tColumnDesc.isIsAllowNull();
TypeDef typeDef = new TypeDef(ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType), columnLength, columnPrecision, columnScale));
ColumnDef columnDef = new ColumnDef(columnName, typeDef, false, null, isAllowNull, DefaultValue.NOT_SET, comment, true);
ColumnDefs.add(columnDef);
}

AddColumnsClause addColumnsClause = new AddColumnsClause(ColumnDefs, null, null);

// index id -> index schema
Map<Long, List<Column>> indexSchemaMap = new HashMap<>();
for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema(true).entrySet()) {
indexSchemaMap.put(entry.getKey(), new ArrayList<>(entry.getValue()));
}
//4. call schame change function, only for dynamic table feature.
SchemaChangeHandler schemaChangeHandler = new SchemaChangeHandler();
IntSupplier colUniqueIdSupplier = new IntSupplier() {
int pendingMaxColUniqueId = olapTable.getMaxColUniqueId();
@Override
public int getAsInt() {
pendingMaxColUniqueId++;
return pendingMaxColUniqueId;
}
};
boolean ligthSchemaChange = schemaChangeHandler.processAddColumns(addColumnsClause, olapTable, indexSchemaMap, true, colUniqueIdSupplier);
if (ligthSchemaChange) {
//for schema change add column optimize, direct modify table meta.
List<Index> newIndexes = olapTable.getCopiedIndexes();
long jobId = Catalog.getCurrentCatalog().getNextId();
Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false);
} else {
throw new MetaNotFoundException("table_id " + request.getTableId() + " cannot light schema change through rpc.");
}

//5. build all columns
for (Column column : olapTable.getBaseSchema()) {
TColumnDesc desc = new TColumnDesc(column.getName(), column.getDataType().toThrift());
Integer precision = column.getOriginType().getPrecision();
if (precision != null) {
desc.setColumnPrecision(precision);
}
Integer columnLength = column.getOriginType().getColumnSize();
if (columnLength != null) {
desc.setColumnLength(columnLength);
}
Integer decimalDigits = column.getOriginType().getDecimalDigits();
if (decimalDigits != null) {
desc.setColumnScale(decimalDigits);
}
desc.setIsAllowNull(column.isAllowNull());
desc.setColUniqueId(column.getUniqueId());
TColumnDef colDef = new TColumnDef(desc);
String comment = column.getComment();
if (comment != null) {
colDef.setComment(comment);
}
allColumns.add(colDef);
}

} catch (Exception e) {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getMessage());
} finally {
olapTable.writeUnlock();
}
} catch (MetaNotFoundException e) {
status.setStatusCode(TStatusCode.NOT_FOUND);
status.addToErrorMsgs(e.getMessage());
} catch (UserException e) {
status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
status.addToErrorMsgs(e.getMessage());
} catch(Exception e) {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getMessage());
}

TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns);
LOG.debug("result: {}", result);
return result;
}
}
14 changes: 0 additions & 14 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ struct TColumnDesc {
4: optional i32 columnPrecision
5: optional i32 columnScale
6: optional bool isAllowNull
7: optional i32 col_unique_id = -1
}

// A column definition; used by CREATE TABLE and DESCRIBE <table> statements. A column
Expand Down Expand Up @@ -742,17 +741,6 @@ struct TWaitingTxnStatusResult {
2: optional i32 txn_status_id
}

struct TAddColumnsRequest {
1: required i64 table_id
2: required list<TColumnDef> addColumns
}

struct TAddColumnsResult {
1: required Status.TStatus status
2: required i64 table_id
3: required list<TColumnDef> allColumns
}

service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
Expand Down Expand Up @@ -794,6 +782,4 @@ service FrontendService {
Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)

TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)

TAddColumnsResult addColumns(1: TAddColumnsRequest request)
}