Commit

tmp
bobhan1 committed Jan 2, 2025
1 parent b295f0c commit e3eff8f
Showing 9 changed files with 84 additions and 17 deletions.
@@ -128,6 +128,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;

@@ -155,6 +156,9 @@ public class SchemaChangeHandler extends AlterHandler {

public final Map<Long, IndexChangeJob> runnableIndexChangeJob = Maps.newConcurrentMap();

// a schema change job should not make any progress while its table is in `blockedTableIds`
public final ConcurrentMap<Long, AtomicInteger> blockedTableIds = Maps.newConcurrentMap();

public int cycleCount = 0;

public SchemaChangeHandler() {
@@ -3422,4 +3426,61 @@ private void checkOrder(List<Column> targetIndexSchema, List<String> orderedColN
nameSet.add(colName);
}
}

public boolean cancelSchemaChangeJobByTableIds(List<Long> tableIdList, String msg) {
boolean hasUnfinishedAlterJobs = false;
for (long tableId : tableIdList) {
if (cancelSchemaChangeJobByTableId(tableId, msg)) {
hasUnfinishedAlterJobs = true;
}
}
return hasUnfinishedAlterJobs;
}

// NOTE: job.cancel() takes the table's write lock, so this method must be called outside the table's lock
public boolean cancelSchemaChangeJobByTableId(long tableId, String msg) {
boolean hasUnfinishedAlterJobs = false;
List<AlterJobV2> unfinishedAlterJobs = getUnfinishedAlterJobV2ByTableId(tableId);
for (AlterJobV2 job : unfinishedAlterJobs) {
if (job.cancel(msg)) {
hasUnfinishedAlterJobs = true;
}
}
return hasUnfinishedAlterJobs;
}

public void blockTable(long tableId) {
LOG.info("block schema change job for tableId={} when doing partial update", tableId);
blockedTableIds.compute(tableId, (k, count) -> {
if (count == null) {
count = new AtomicInteger(0);
}
count.incrementAndGet();
return count;
});
}

public void unblockTable(long tableId) {
AtomicInteger count = blockedTableIds.get(tableId);
if (count == null) {
return;
}

int current = count.decrementAndGet();
if (current == 0) {
blockedTableIds.remove(tableId);
}
LOG.info("unblock schema change job for table={} after partial update", tableId);
}

public void unblockTables(List<Long> tableIds) {
for (long tableId : tableIds) {
unblockTable(tableId);
}
}

public boolean isTableBlocked(long tableId) {
AtomicInteger count = blockedTableIds.get(tableId);
return count != null && count.get() > 0;
}
}
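
For reference, a minimal self-contained sketch of the reference-counting pattern behind `blockedTableIds` (the class and method names below are illustrative stand-ins, not Doris APIs): each in-flight partial update on a table increments that table's counter, and the table only becomes unblocked once every one of them has decremented it.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the blockedTableIds bookkeeping above; not Doris code.
public class TableBlockerSketch {
    private final Map<Long, AtomicInteger> blocked = new ConcurrentHashMap<>();

    public void block(long tableId) {
        // one counter per table: each in-flight partial update adds a reference
        blocked.computeIfAbsent(tableId, k -> new AtomicInteger()).incrementAndGet();
    }

    public void unblock(long tableId) {
        AtomicInteger count = blocked.get(tableId);
        if (count != null && count.decrementAndGet() <= 0) {
            blocked.remove(tableId);
        }
    }

    public boolean isBlocked(long tableId) {
        AtomicInteger count = blocked.get(tableId);
        return count != null && count.get() > 0;
    }

    public static void main(String[] args) {
        TableBlockerSketch blocker = new TableBlockerSketch();
        blocker.block(42L);                         // first partial update starts
        blocker.block(42L);                         // second partial update starts
        blocker.unblock(42L);                       // first one finishes
        System.out.println(blocker.isBlocked(42L)); // true: one load is still in flight
        blocker.unblock(42L);                       // second one finishes
        System.out.println(blocker.isBlocked(42L)); // false: schema changes may resume
    }
}

A counter rather than a plain set matters when several partial-update loads on the same table overlap: the table stays blocked until the last of them finishes.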
@@ -406,6 +406,11 @@ protected void createShadowIndexReplica() throws AlterCancelException {
@Override
protected void runPendingJob() throws Exception {
Preconditions.checkState(jobState == JobState.PENDING, jobState);

// a partial update is in flight on this table; return without changing jobState,
// so the alter handler will re-check this job on its next scheduling cycle
if (env.getSchemaChangeHandler().isTableBlocked(tableId)) {
return;
}

LOG.info("begin to send create replica tasks. job: {}", jobId);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
@@ -459,6 +464,11 @@ protected void addShadowIndexToCatalog(OlapTable tbl) {
@Override
protected void runWaitingTxnJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState);

if (env.getSchemaChangeHandler().isTableBlocked(tableId)) {
return;
}

try {
if (!checkFailedPreviousLoadAndAbort()) {
LOG.info("wait transactions before {} to be finished, schema change job: {}", watershedTxnId, jobId);
@@ -571,6 +581,10 @@ protected void runWaitingTxnJob() throws AlterCancelException {
protected void runRunningJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.RUNNING, jobState);

if (env.getSchemaChangeHandler().isTableBlocked(tableId)) {
return;
}

// must check if db or table still exist first.
// or if table is dropped, the tasks will never be finished,
// and the job will be in RUNNING state forever.
@@ -1105,7 +1105,7 @@ public void complete() throws UserException {
txnState.addTableIndexes((OlapTable) targetTable);
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
txnState.cancelOrWaitForSchemaChange();
txnState.cancelSchemaChangeJob();
}
}
}
@@ -254,7 +254,7 @@ public void generatePlan(OlapTable table) throws UserException {
if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS
|| uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
txnState.setSchemaForPartialUpdate(table);
txnState.cancelOrWaitForSchemaChange();
txnState.cancelSchemaChangeJob();
}
}
} finally {
@@ -279,7 +279,7 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
txnState.addTableIndexes(table);
if (isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
txnState.cancelOrWaitForSchemaChange();
txnState.cancelSchemaChangeJob();
}
}
} finally {
@@ -994,7 +994,7 @@ public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId,
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
txnState.cancelOrWaitForSchemaChange();
txnState.cancelSchemaChangeJob();
}

return planParams;
@@ -106,7 +106,7 @@ public void beginTransaction() {
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.cancelOrWaitForSchemaChange();
state.cancelSchemaChangeJob();
}

@Override
@@ -1751,6 +1751,7 @@ public void abortTransaction(long transactionId, String reason, TxnCommitAttachm
if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
clearBackendTransactions(transactionState);
}
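// the transaction is aborted, so unblock schema change jobs for all tables it involved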
Env.getCurrentEnv().getSchemaChangeHandler().unblockTables(transactionState.getTableIdList());

LOG.info("abort transaction: {} successfully", transactionState);
}
@@ -874,25 +874,16 @@ public boolean checkSchemaCompatibility(OlapTable olapTable) {
return true;
}

public void cancelOrWaitForSchemaChange() {
public void cancelSchemaChangeJob() {
// When partial update load meets schema change on dst table, use this method to cancel
// the schema change job. This method should be called before generating plan and after begin transaction.

// NOTE: job.cancel() will take table's write lock, so this method should be outside table's lock
boolean hasUnfinishedAlterJobs = false;
String msg = "cancel schema change job because of conflicting partial update,"
+ " transactionId: " + transactionId;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (long tableId : tableIdList) {
List<AlterJobV2> unfinishedAlterJobs = Env.getCurrentEnv().getAlterInstance().getSchemaChangeHandler()
.getUnfinishedAlterJobV2ByTableId(tableId);
for (AlterJobV2 job : unfinishedAlterJobs) {
if (job.cancel(msg)) {
hasUnfinishedAlterJobs = true;
}
}
}
boolean hasUnfinishedAlterJobs = Env.getCurrentEnv().getAlterInstance().getSchemaChangeHandler()
.cancelSchemaChangeJobByTableIds(tableIdList, msg);
stopWatch.stop();
if (hasUnfinishedAlterJobs) {
LOG.info("wait for cancelling unfinished schema change task before partial update,"
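
The comment in `cancelSchemaChangeJob` above fixes an ordering contract: cancel conflicting jobs after the transaction has begun but before the load plan is generated. A minimal, runnable restatement of that sequencing (every name below is a placeholder for illustration, not a Doris call site; the reasons in the comments are inferred):

// Hypothetical sequencing sketch for a partial update load; not Doris code.
public class PartialUpdateOrderingSketch {
    static void beginTransaction()           { System.out.println("1. begin transaction"); }
    static void cancelConflictingAlterJobs() { System.out.println("2. cancel unfinished schema change jobs"); }
    static void planAndLoad()                { System.out.println("3. generate plan and execute the load"); }
    static void commitOrAbort()              { System.out.println("4. commit, or abort (which unblocks the tables)"); }

    public static void main(String[] args) {
        beginTransaction();           // first, so the cancel message can carry the transaction id
        cancelConflictingAlterJobs(); // before planning, so the plan is built against a stable schema
        planAndLoad();
        commitOrAbort();
    }
}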
