[improve] improve group commit logic #413

Merged: 7 commits, merged on Jul 2, 2024
DorisBatchStreamLoad.java
@@ -68,6 +68,7 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

@@ -122,7 +123,11 @@ public DorisBatchStreamLoad(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT);
this.enableGroupCommit =
loadProps.containsKey(GROUP_COMMIT)
&& !loadProps
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
@@ -283,7 +288,12 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
Throwable resEx = new Throwable();
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
LOG.info("stream load started for {} on host {}", label, hostPort);
if (enableGroupCommit) {
LOG.info("stream load started with group commit on host {}", hostPort);
} else {
LOG.info("stream load started for {} on host {}", label, hostPort);
}

try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
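The behavioral change in DorisBatchStreamLoad (mirrored in DorisStreamLoad below) is that group commit now counts as enabled only when the group_commit load property is present and its value is not off_mode; previously, setting group_commit = off_mode was still treated as enabled because only containsKey was checked. A minimal standalone sketch of that predicate, with the class name GroupCommitCheckSketch chosen here purely for illustration:

import java.util.Properties;

public class GroupCommitCheckSketch {
    private static final String GROUP_COMMIT = "group_commit";
    private static final String GROUP_COMMIT_OFF_MODE = "off_mode";

    // Mirrors the predicate introduced above: the property must be present
    // and must not be "off_mode" (case-insensitive) for group commit to be on.
    static boolean isGroupCommitEnabled(Properties loadProps) {
        return loadProps.containsKey(GROUP_COMMIT)
                && !loadProps.getProperty(GROUP_COMMIT).equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
    }

    public static void main(String[] args) {
        Properties absent = new Properties();              // key missing   -> disabled
        Properties off = new Properties();
        off.setProperty(GROUP_COMMIT, "OFF_MODE");         // explicit off  -> disabled
        Properties sync = new Properties();
        sync.setProperty(GROUP_COMMIT, "sync_mode");       // sync_mode     -> enabled
        System.out.println(isGroupCommitEnabled(absent));  // false
        System.out.println(isGroupCommitEnabled(off));     // false
        System.out.println(isGroupCommitEnabled(sync));    // true
    }
}

Running the sketch prints false, false, true for a missing key, an explicit off_mode, and sync_mode respectively.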
DorisStreamLoad.java
@@ -59,6 +59,7 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

@@ -131,7 +132,11 @@ public DorisStreamLoad(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT);
this.enableGroupCommit =
streamLoadProp.containsKey(GROUP_COMMIT)
&& !streamLoadProp
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
loadBatchFirstRecord = true;
}

@@ -263,7 +268,16 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw

public RespContent stopLoad() throws IOException {
recordStream.endInput();
LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort);
if (enableGroupCommit) {
LOG.info("table {} stream load stopped with group commit on host {}", table, hostPort);
} else {
LOG.info(
"table {} stream load stopped for {} on host {}",
table,
currentLabel,
hostPort);
}

Preconditions.checkState(pendingLoadFuture != null);
try {
return handlePreCommitResponse(pendingLoadFuture.get());
@@ -285,7 +299,11 @@ public void startLoad(String label, boolean isResume) throws IOException {
loadBatchFirstRecord = !isResume;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput(isResume);
LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
if (enableGroupCommit) {
LOG.info("table {} stream load started with group commit on host {}", table, hostPort);
} else {
LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
}
this.currentLabel = label;
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
@@ -300,18 +318,26 @@ public void startLoad(String label, boolean isResume) throws IOException {
if (enable2PC) {
putBuilder.enable2PC();
}
String finalLabel = label;

String executeMessage;
if (enableGroupCommit) {
executeMessage = "table " + table + " start execute load with group commit";
} else {
executeMessage = "table " + table + " start execute load for label " + label;
}
pendingLoadFuture =
executorService.submit(
() -> {
LOG.info(
"table {} start execute load for label {}",
table,
finalLabel);
LOG.info(executeMessage);
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
String err = "failed to stream load data with label: " + label;
String err;
if (enableGroupCommit) {
err = "failed to stream load data with group commit";
} else {
err = "failed to stream load data with label: " + label;
}
LOG.warn(err, e);
throw e;
}
LoadConstants.java
@@ -32,4 +32,5 @@ public class LoadConstants {
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";
public static final String GROUP_COMMIT = "group_commit";
public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
}
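The new GROUP_COMMIT_OFF_MODE constant gives users an explicit opt-out value for the group_commit stream-load property. A hedged configuration sketch follows; it assumes the connector's DorisExecutionOptions builder and its setStreamLoadProp method (neither appears in this diff), and the class name GroupCommitConfigSketch is illustrative only:

import java.util.Properties;

import org.apache.doris.flink.cfg.DorisExecutionOptions;

public class GroupCommitConfigSketch {
    public static void main(String[] args) {
        // Stream-load properties are forwarded to Doris; group_commit selects the mode.
        Properties props = new Properties();
        props.setProperty("format", "csv");
        props.setProperty("group_commit", "sync_mode"); // "off_mode" now disables group commit

        // Assumed builder API from the connector; the properties above are what
        // DorisBatchStreamLoad / DorisStreamLoad inspect in their constructors.
        DorisExecutionOptions executionOptions =
                DorisExecutionOptions.builder()
                        .setStreamLoadProp(props)
                        .build();
        System.out.println("group_commit = " + props.getProperty("group_commit"));
    }
}

The SQL equivalent in the test below is 'sink.properties.group_commit' = 'sync_mode'; with this change, switching that value to 'off_mode' disables group commit instead of enabling it.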
DorisSinkITCase.java
@@ -58,6 +58,7 @@ public class DorisSinkITCase extends DorisTestBase {
static final String TABLE_JSON_TBL = "tbl_json_tbl";
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";

@@ -264,6 +265,56 @@ public void testDataStreamBatch() throws Exception {
checkResult(expected, query, 2);
}

@Test
public void testTableGroupCommit() throws Exception {
initializeTable(TABLE_GROUP_COMMIT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sinkDDL =
String.format(
"CREATE TABLE doris_group_commit_sink ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'sink.label-prefix' = '"
+ UUID.randomUUID()
+ "',"
+ " 'sink.properties.column_separator' = '\\x01',"
+ " 'sink.properties.line_delimiter' = '\\x02',"
+ " 'sink.properties.group_commit' = 'sync_mode',"
+ " 'sink.ignore.update-before' = 'false',"
+ " 'sink.enable.batch-mode' = 'true',"
+ " 'sink.enable-delete' = 'true',"
+ " 'sink.flush.queue-size' = '2',"
+ " 'sink.buffer-flush.max-rows' = '3',"
+ " 'sink.buffer-flush.max-bytes' = '5000',"
+ " 'sink.buffer-flush.interval' = '10s'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_GROUP_COMMIT,
USERNAME,
PASSWORD);
tEnv.executeSql(sinkDDL);
tEnv.executeSql(
"INSERT INTO doris_group_commit_sink SELECT 'doris',1 union all SELECT 'group_commit',2 union all SELECT 'flink',3");

Thread.sleep(25000);
List<String> expected = Arrays.asList("doris,1", "flink,3", "group_commit,2");
String query =
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_GROUP_COMMIT);
//
checkResult(expected, query, 2);
}

@Test
public void testJobManagerFailoverSink() throws Exception {
initializeFailoverTable(TABLE_CSV_JM);