Skip to content

Commit

Permalink
[improve]improve group commit logic (apache#413)
Browse files Browse the repository at this point in the history
(cherry picked from commit ba21aab)
  • Loading branch information
vinlee19 authored and PeatBoy committed Jan 21, 2025
1 parent bd99ed3 commit 054a31e
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

Expand Down Expand Up @@ -122,7 +123,11 @@ public DorisBatchStreamLoad(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT);
this.enableGroupCommit =
loadProps.containsKey(GROUP_COMMIT)
&& !loadProps
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
Expand Down Expand Up @@ -283,7 +288,12 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
Throwable resEx = new Throwable();
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
LOG.info("stream load started for {} on host {}", label, hostPort);
if (enableGroupCommit) {
LOG.info("stream load started with group commit on host {}", hostPort);
} else {
LOG.info("stream load started for {} on host {}", label, hostPort);
}

try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
int statusCode = response.getStatusLine().getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

Expand Down Expand Up @@ -131,7 +132,11 @@ public DorisStreamLoad(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT);
this.enableGroupCommit =
streamLoadProp.containsKey(GROUP_COMMIT)
&& !streamLoadProp
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
loadBatchFirstRecord = true;
}

Expand Down Expand Up @@ -263,7 +268,16 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw

public RespContent stopLoad() throws IOException {
recordStream.endInput();
LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort);
if (enableGroupCommit) {
LOG.info("table {} stream load stopped with group commit on host {}", table, hostPort);
} else {
LOG.info(
"table {} stream load stopped for {} on host {}",
table,
currentLabel,
hostPort);
}

Preconditions.checkState(pendingLoadFuture != null);
try {
return handlePreCommitResponse(pendingLoadFuture.get());
Expand All @@ -285,7 +299,11 @@ public void startLoad(String label, boolean isResume) throws IOException {
loadBatchFirstRecord = !isResume;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput(isResume);
LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
if (enableGroupCommit) {
LOG.info("table {} stream load started with group commit on host {}", table, hostPort);
} else {
LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
}
this.currentLabel = label;
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
Expand All @@ -300,18 +318,26 @@ public void startLoad(String label, boolean isResume) throws IOException {
if (enable2PC) {
putBuilder.enable2PC();
}
String finalLabel = label;

String executeMessage;
if (enableGroupCommit) {
executeMessage = "table " + table + " start execute load with group commit";
} else {
executeMessage = "table " + table + " start execute load for label " + label;
}
pendingLoadFuture =
executorService.submit(
() -> {
LOG.info(
"table {} start execute load for label {}",
table,
finalLabel);
LOG.info(executeMessage);
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
String err = "failed to stream load data with label: " + label;
String err;
if (enableGroupCommit) {
err = "failed to stream load data with group commit";
} else {
err = "failed to stream load data with label: " + label;
}
LOG.warn(err, e);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public class LoadConstants {
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";
public static final String GROUP_COMMIT = "group_commit";
public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class DorisSinkITCase extends DorisTestBase {
static final String TABLE_JSON_TBL = "tbl_json_tbl";
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";

Expand Down Expand Up @@ -264,6 +265,56 @@ public void testDataStreamBatch() throws Exception {
checkResult(expected, query, 2);
}

@Test
public void testTableGroupCommit() throws Exception {
    initializeTable(TABLE_GROUP_COMMIT);
    // Single-parallelism batch-mode pipeline so the three rows land deterministically.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.setParallelism(1);
    final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Sink DDL with group commit enabled ('sync_mode') through stream-load properties.
    String labelPrefix = UUID.randomUUID().toString();
    String ddl =
            String.format(
                    "CREATE TABLE doris_group_commit_sink ("
                            + " name STRING,"
                            + " age INT"
                            + ") WITH ("
                            + " 'connector' = 'doris',"
                            + " 'fenodes' = '%s',"
                            + " 'table.identifier' = '%s',"
                            + " 'username' = '%s',"
                            + " 'password' = '%s',"
                            + " 'sink.label-prefix' = '%s',"
                            + " 'sink.properties.column_separator' = '\\x01',"
                            + " 'sink.properties.line_delimiter' = '\\x02',"
                            + " 'sink.properties.group_commit' = 'sync_mode',"
                            + " 'sink.ignore.update-before' = 'false',"
                            + " 'sink.enable.batch-mode' = 'true',"
                            + " 'sink.enable-delete' = 'true',"
                            + " 'sink.flush.queue-size' = '2',"
                            + " 'sink.buffer-flush.max-rows' = '3',"
                            + " 'sink.buffer-flush.max-bytes' = '5000',"
                            + " 'sink.buffer-flush.interval' = '10s'"
                            + ")",
                    getFenodes(),
                    DATABASE + "." + TABLE_GROUP_COMMIT,
                    USERNAME,
                    PASSWORD,
                    labelPrefix);
    tEnv.executeSql(ddl);
    tEnv.executeSql(
            "INSERT INTO doris_group_commit_sink SELECT 'doris',1 union all SELECT 'group_commit',2 union all SELECT 'flink',3");

    // Group-commit loads become visible asynchronously; give the backend time to flush.
    Thread.sleep(25000);
    String selectQuery =
            String.format(
                    "select name,age from %s.%s order by 1", DATABASE, TABLE_GROUP_COMMIT);
    List<String> expectedRows = Arrays.asList("doris,1", "flink,3", "group_commit,2");
    checkResult(expectedRows, selectQuery, 2);
}

@Test
public void testJobManagerFailoverSink() throws Exception {
initializeFailoverTable(TABLE_CSV_JM);
Expand Down

0 comments on commit 054a31e

Please sign in to comment.