Skip to content

Commit

Permalink
[FLINK-37217][mysql] Fix MySqlErrorHandler TableNotFoundException U…
Browse files Browse the repository at this point in the history
…nable to obtain table correctly
  • Loading branch information
huyuanfeng committed Jan 24, 2025
1 parent 3e16a66 commit a10ecd4
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -60,12 +61,9 @@ public MySqlErrorHandler(

@Override
public void setProducerThrowable(Throwable producerThrowable) {
if (isTableNotFoundException(producerThrowable)) {
Matcher matcher =
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
TableId tableId = new TableId(databaseName, null, tableName);
Optional<TableId> notFoundTable = findNotFoundTable(producerThrowable);
if (notFoundTable.isPresent()) {
TableId tableId = notFoundTable.get();
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
Expand All @@ -86,14 +84,20 @@ public void setProducerThrowable(Throwable producerThrowable) {
super.setProducerThrowable(producerThrowable);
}

private boolean isTableNotFoundException(Throwable t) {
private Optional<TableId> findNotFoundTable(Throwable t) {
if (!(t.getCause() instanceof DebeziumException)) {
return false;
return Optional.empty();
}
DebeziumException e = (DebeziumException) t.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
return matcher.find();
if (matcher.find()) {
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
return Optional.of(new TableId(databaseName, null, tableName));
} else {
return Optional.empty();
}
}

private boolean isSchemaOutOfSyncException(Throwable t) {
Expand Down

0 comments on commit a10ecd4

Please sign in to comment.