[flink] Immediately throw Exception in mailbox executor rather than wait until next record in flink sink writer. #736
…ait until next record.
@wuchong @swuferhong, CC
@@ -123,6 +130,18 @@ public void write(RowData value, Context context) throws IOException, Interrupte
InternalRow internalRow = sinkRow.replace(value);
CompletableFuture<?> writeFuture = writeRow(value.getRowKind(), internalRow);
writeFuture.whenComplete(
Just wondering: what is the problem with calling checkAsyncException only when writing new rows?
Two reasons:
- case 1: The KafkaWriter flush has to wait until all in-flight records are sent successfully. If one record gets stuck for some reason, the whole job gets stuck.
- case 2: If there is little data, the failover is only triggered much later, when the next record arrives. (If the upstream operator is a window, a heavy computation has to be replayed.)
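The idea behind the change can be sketched as follows. This is a minimal illustration, not the actual FlinkSinkWriter: the class `EagerFailureSketch`, its `mailbox` queue, and `drainMailbox` are hypothetical stand-ins for the writer, Flink's mailbox executor, and the task thread. The callback registered with `whenComplete` enqueues the exception check as soon as a write fails, so the failure surfaces without waiting for another record.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for a sink writer; not the actual FlinkSinkWriter. */
class EagerFailureSketch {
    // Stand-in for the task mailbox: queued actions run later on the task thread.
    final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    // First failure reported by an asynchronous write, if any.
    volatile Throwable asyncWriterException;

    /** Registers a callback that schedules the exception check as soon as a write fails. */
    void write(CompletableFuture<?> writeFuture) {
        writeFuture.whenComplete(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        if (asyncWriterException == null) {
                            asyncWriterException = throwable;
                        }
                        // Do not wait for the next record: enqueue the check right away.
                        mailbox.add(this::checkAsyncException);
                    }
                });
    }

    /** Rethrows a previously recorded asynchronous failure on the calling thread. */
    void checkAsyncException() {
        Throwable t = asyncWriterException;
        if (t != null) {
            throw new RuntimeException("One or more asynchronous writes failed", t);
        }
    }

    /** Simulates the task thread draining its mailbox between records. */
    void drainMailbox() {
        Runnable action;
        while ((action = mailbox.poll()) != null) {
            action.run();
        }
    }
}
```

In this sketch, a single failed write is enough to make the next mailbox drain throw, which corresponds to the job failing over even if no further rows arrive.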
Please add an IT case that inserts only one row into a shut-down cluster and expects the job to fail over.
@@ -16,7 +16,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.level = DEBUG
Please revert this.
}
// Checking for exceptions from previous writes
// Checking for exceptions from previous writes
Remove the duplicated comment.
// Checking for exceptions from previous writes
mailboxExecutor.execute(this::checkAsyncException, "Update error metric");
}
});
writeFuture.exceptionally(
This can be removed now?
}
private void testExceptionWhenFlussUnavailable(
BiConsumer<FlinkSinkWriter, MailboxExecutor> actionAfterFlussUnavailable)
The `actionAfterFlussUnavailable` parameter is not used.
BiConsumer<FlinkSinkWriter, MailboxExecutor> actionAfterFlussUnavailable)
throws Exception {
FlussClusterExtension flussClusterExtension = FlussClusterExtension.builder().build();
flussClusterExtension.start();
Use `try ... catch ... finally` to make sure the `flussClusterExtension` is closed if any exception is thrown in the middle.
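A minimal sketch of the suggested cleanup pattern, under stated assumptions: `FakeCluster` is a hypothetical stand-in for the test cluster (its `start` and `close` methods are illustrative, not the real FlussClusterExtension API), and `runWithCluster` is a hypothetical helper. The point is only that the `finally` block runs whether or not the test body throws.

```java
/** Hypothetical stand-in for a test cluster; not the real FlussClusterExtension. */
class FakeCluster implements AutoCloseable {
    boolean closed;

    void start() {
        // A real extension would boot the cluster here.
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ClusterCleanupSketch {
    /** Starts the cluster, runs the test body, and always closes the cluster afterwards. */
    static void runWithCluster(FakeCluster cluster, Runnable testBody) {
        cluster.start();
        try {
            testBody.run();
        } finally {
            // Runs on both normal completion and when testBody throws.
            cluster.close();
        }
    }
}
```

The exception from the test body still propagates to the caller; the `finally` block only guarantees that the cluster is released first.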
testExceptionWhenFlussUnavailable(
(writer, mailboxExecutor) -> {
writer.getTableWriter().flush();
writer.getTableWriter().flush();
Remove the duplicated flush.
testExceptionWhenFlussUnavailable(
(writer, mailboxExecutor) -> {
writer.getTableWriter().flush();
assertThatThrownBy(() -> writer.flush(true))
Add a comment to the tests explaining why these lines throw an exception while `tableWriter#flush()` doesn't.
Purpose
Linked issue: close #735
Brief change log
Tests
Add failover tests in FlinkSinkWriterTest.
API and Format
Documentation