[flink] Immediately throw Exception in mailbox executor rather than wait until next record in flink sink writer. #736

Open

loserwang1024
Collaborator

Purpose

Linked issue: close #735

Brief change log

Tests

Add failover tests to FlinkSinkWriterTest.

API and Format

Documentation

@loserwang1024
Collaborator Author

CC @wuchong @swuferhong

@@ -123,6 +130,18 @@ public void write(RowData value, Context context) throws IOException, Interrupte

InternalRow internalRow = sinkRow.replace(value);
CompletableFuture<?> writeFuture = writeRow(value.getRowKind(), internalRow);
writeFuture.whenComplete(
Collaborator

Just wondering: what is the problem with calling checkAsyncException only when writing new rows?

Collaborator Author

Two reasons:

  1. Case 1: Flushing in KafkaWriter has to wait until all in-flight records are sent successfully. If one record gets stuck for some reason, the job gets stuck too.
  2. Case 2: If there is little data, the failure is thrown only much later, when the next record arrives. (If the upstream operator is a window, a heavy computation will be replayed.)
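
The second case can be illustrated with a small, self-contained sketch. It uses no Flink or Fluss classes: `Mailbox` and `Writer` are hypothetical stand-ins that only mimic the idea of enqueuing `checkAsyncException` on the task thread as soon as the write future fails, so the error surfaces without waiting for another `write` call.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class MailboxFailFastSketch {

    /** Hypothetical stand-in for a task mailbox: queued actions run on the thread that drains it. */
    static final class Mailbox {
        private final Queue<Runnable> actions = new ArrayDeque<>();

        synchronized void execute(Runnable action) {
            actions.add(action);
        }

        private synchronized Runnable poll() {
            return actions.poll();
        }

        /** Runs all queued actions on the caller's thread; their exceptions propagate to the caller. */
        void drain() {
            Runnable next;
            while ((next = poll()) != null) {
                next.run();
            }
        }
    }

    /** Hypothetical stand-in for a sink writer with asynchronous writes. */
    static final class Writer {
        private final Mailbox mailbox;
        private volatile Throwable asyncError;

        Writer(Mailbox mailbox) {
            this.mailbox = mailbox;
        }

        void write(CompletableFuture<Void> writeFuture) {
            // Check on the next record: this alone fires only if another record arrives.
            checkAsyncException();
            writeFuture.whenComplete(
                    (ignored, error) -> {
                        if (error != null) {
                            asyncError = error;
                            // Fail fast: schedule the check on the task thread right away.
                            mailbox.execute(this::checkAsyncException);
                        }
                    });
        }

        void checkAsyncException() {
            Throwable error = asyncError;
            if (error != null) {
                throw new IllegalStateException("Asynchronous write failed", error);
            }
        }
    }

    /** Returns true if the failure surfaces from the mailbox without a second write() call. */
    static boolean failureSurfacesWithoutNextRecord() {
        Mailbox mailbox = new Mailbox();
        Writer writer = new Writer(mailbox);
        CompletableFuture<Void> future = new CompletableFuture<>();
        writer.write(future);
        future.completeExceptionally(new RuntimeException("cluster unavailable"));
        try {
            mailbox.drain();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failureSurfacesWithoutNextRecord());
    }
}
```

In the sketch only one record is ever written, yet draining the mailbox throws. This is the behaviour that the one-row failover scenario relies on.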

Member

@wuchong left a comment

Please add an IT case that inserts only one row into a shut-down cluster and expects the job to fail over.

@@ -16,7 +16,7 @@

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = DEBUG
Member

Please revert this.

}

// Checking for exceptions from previous writes
// Checking for exceptions from previous writes
Member

Remove the duplicate comment.

// Checking for exceptions from previous writes
mailboxExecutor.execute(this::checkAsyncException, "Update error metric");
}
});
writeFuture.exceptionally(
Member

Can this be removed now?
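
One possible reading of this question is that the `whenComplete` callback already observes failures, which would make a separate `exceptionally` stage redundant. The sketch below uses only `java.util.concurrent.CompletableFuture`. The `observe` helper is hypothetical and not part of the PR; it records what a single `whenComplete` callback sees for a successful and for a failed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class WhenCompleteSketch {

    /** Hypothetical helper: reports what one whenComplete callback sees for an already completed future. */
    static String observe(CompletableFuture<String> future) {
        AtomicReference<String> seen = new AtomicReference<>("pending");
        future.whenComplete(
                (value, error) ->
                        seen.set(
                                error != null
                                        ? "failed: " + error.getMessage()
                                        : "ok: " + value));
        return seen.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));

        System.out.println(observe(CompletableFuture.completedFuture("row")));
        System.out.println(observe(failed));
    }
}
```

Because `whenComplete` receives both the result and the throwable, error handling can live in one callback. Whether the `exceptionally` call in this PR is in fact redundant depends on code not shown in the diff excerpt.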

}

private void testExceptionWhenFlussUnavailable(
BiConsumer<FlinkSinkWriter, MailboxExecutor> actionAfterFlussUnavailable)
Member

The actionAfterFlussUnavailable parameter is not used.

BiConsumer<FlinkSinkWriter, MailboxExecutor> actionAfterFlussUnavailable)
throws Exception {
FlussClusterExtension flussClusterExtension = FlussClusterExtension.builder().build();
flussClusterExtension.start();
Member

Use try ... catch ... finally to make sure the flussClusterExtension is closed if an exception is thrown midway.
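
A minimal sketch of the suggested pattern follows. `FakeCluster` is a hypothetical stand-in with a `start`/`close` lifecycle; the actual shutdown method of `FlussClusterExtension` is not shown in this excerpt, so the call in the `finally` block would need to be adapted.

```java
public class TryFinallyCleanupSketch {

    /** Hypothetical stand-in for a test cluster; not the real FlussClusterExtension. */
    static final class FakeCluster implements AutoCloseable {
        boolean started;
        boolean closed;

        void start() {
            started = true;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Starts the cluster, runs the test body, and always closes the cluster afterwards. */
    static void runWithCluster(FakeCluster cluster, Runnable testBody) {
        cluster.start();
        try {
            testBody.run();
        } finally {
            // Runs on both the normal and the exceptional path.
            cluster.close();
        }
    }

    /** Returns true if the cluster is closed even though the test body throws. */
    static boolean closedAfterFailure() {
        FakeCluster cluster = new FakeCluster();
        try {
            runWithCluster(
                    cluster,
                    () -> {
                        throw new IllegalStateException("simulated test failure");
                    });
        } catch (IllegalStateException expected) {
            // The failure still propagates to the caller; only the cleanup is guaranteed.
        }
        return cluster.closed;
    }

    public static void main(String[] args) {
        System.out.println(closedAfterFailure());
    }
}
```

If the cluster type implements `AutoCloseable`, a try-with-resources statement would give the same guarantee with less code.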

testExceptionWhenFlussUnavailable(
(writer, mailboxExecutor) -> {
writer.getTableWriter().flush();
writer.getTableWriter().flush();
Member

Remove the duplicate flush.

testExceptionWhenFlussUnavailable(
(writer, mailboxExecutor) -> {
writer.getTableWriter().flush();
assertThatThrownBy(() -> writer.flush(true))
Member

Add a comment to the tests explaining why these lines throw an exception but tableWriter#flush() doesn't.
