Skip to content

Commit

Permalink
[Enhancement] Exit getLabelState() if retry for more than a certain t…
Browse files Browse the repository at this point in the history
…ime (#212)

Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
  • Loading branch information
banmoy committed May 5, 2023
1 parent ec217b4 commit c053349
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

package com.starrocks.connector.flink.manager;

import com.alibaba.fastjson.JSON;
import com.starrocks.connector.flink.row.sink.StarRocksDelimiterParser;
import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

import com.alibaba.fastjson.JSON;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
Expand Down Expand Up @@ -60,6 +60,7 @@ public class StarRocksStreamLoadVisitor implements Serializable {
private final String[] fieldNames;
private long pos;
private boolean __opAutoProjectionInJson;
private long checkLabelTimeoutSecond;
private static final String RESULT_FAILED = "Fail";
private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
Expand All @@ -72,6 +73,12 @@ public StarRocksStreamLoadVisitor(StarRocksSinkOptions sinkOptions, String[] fie
this.fieldNames = fieldNames;
this.sinkOptions = sinkOptions;
this.__opAutoProjectionInJson = __opAutoProjectionInJson;
String configuredTimeout = sinkOptions.getSinkStreamLoadProperties().get("timeout");
if (configuredTimeout != null) {
this.checkLabelTimeoutSecond = Integer.parseInt(configuredTimeout);
} else {
this.checkLabelTimeoutSecond = 600;
}
}

public Map<String, Object> doStreamLoad(StarRocksSinkBufferEntity bufferEntity) throws IOException {
Expand Down Expand Up @@ -112,10 +119,17 @@ public Map<String, Object> doStreamLoad(StarRocksSinkBufferEntity bufferEntity)

@SuppressWarnings("unchecked")
private void checkLabelState(String host, String label) throws IOException {
int idx = 0;
while(true) {
int totalSleepSecond = 0;
String lastState = null;
for (int sleepSecond = 0;;sleepSecond++) {
if (totalSleepSecond >= checkLabelTimeoutSecond) {
LOG.error("Fail to get expected load state because of timeout, label: {}, current state {}", label, lastState);
throw new StreamLoadFailException(String.format("Could not get expected load state because of timeout, " +
"label: %s, current state: %s", label, lastState));
}
totalSleepSecond += sleepSecond;
try {
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
TimeUnit.SECONDS.sleep(Math.min(sleepSecond, 5));
} catch (InterruptedException ex) {
break;
}
Expand All @@ -137,6 +151,7 @@ private void checkLabelState(String host, String label) throws IOException {
"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
}
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
lastState = labelState;
switch(labelState) {
case LAEBL_STATE_VISIBLE:
case LAEBL_STATE_COMMITTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, Tab
streamLoadResponse.setCostNanoTime(System.nanoTime() - startNanoTime);
region.complete(streamLoadResponse);
} else if (StreamLoadConstants.RESULT_STATUS_LABEL_EXISTED.equals(status)) {
String labelState = getLabelState(host, region.getDatabase(), label, Collections.singleton(StreamLoadConstants.LABEL_STATE_PREPARE));
String labelState = getLabelState(host, region.getDatabase(), region.getTable(), label, Collections.singleton(StreamLoadConstants.LABEL_STATE_PREPARE));
if (StreamLoadConstants.LABEL_STATE_COMMITTED.equals(labelState) || StreamLoadConstants.LABEL_STATE_VISIBLE.equals(labelState)) {
streamLoadResponse.setCostNanoTime(System.nanoTime() - startNanoTime);
region.complete(streamLoadResponse);
Expand Down Expand Up @@ -369,10 +369,18 @@ protected String parseHttpResponse(String requestType, String db, String table,
}
}

protected String getLabelState(String host, String database, String label, Set<String> retryStates) throws Exception {
int idx = 0;
for (;;) {
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
protected String getLabelState(String host, String database, String table, String label, Set<String> retryStates) throws Exception {
int totalSleepSecond = 0;
String lastState = null;
for (int sleepSecond = 0;;sleepSecond++) {
if (totalSleepSecond >= 60) {
log.error("Fail to get expected load state because of timeout, db: {}, table: {}, label: {}, current state {}",
database, table, label, lastState);
throw new StreamLoadFailException(String.format("Could not get expected load state because of timeout, " +
"db: %s, table: %s, label: %s", database, table, label));
}
TimeUnit.SECONDS.sleep(Math.min(sleepSecond, 5));
totalSleepSecond += sleepSecond;
try (CloseableHttpClient client = HttpClients.createDefault()) {
String url = host + "/api/" + database + "/get_load_state?label=" + label;
HttpGet httpGet = new HttpGet(url);
Expand All @@ -397,6 +405,7 @@ protected String getLabelState(String host, String database, String label, Set<S
"label: %s, load information: %s", label, entityContent));
}

lastState = state;
if (retryStates.contains(state)) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public boolean prepare(StreamLoadSnapshot.Transaction transaction) {
case StreamLoadConstants.RESULT_STATUS_TRANSACTION_NOT_EXISTED: {
// currently this could happen after timeout which is specified in http header,
// but as a protection we check the state again
String labelState = getLabelState(host, transaction.getDatabase(), transaction.getLabel(),
String labelState = getLabelState(host, transaction.getDatabase(), transaction.getTable(), transaction.getLabel(),
Collections.singleton(StreamLoadConstants.LABEL_STATE_PREPARE));
if (!StreamLoadConstants.LABEL_STATE_PREPARED.equals(labelState)) {
String errMsg = String.format("Transaction prepare failed because of unexpected state, " +
Expand Down Expand Up @@ -249,7 +249,7 @@ public boolean commit(StreamLoadSnapshot.Transaction transaction) {
// that commit the transaction repeatedly because flink job continues failover for some reason , but
// the transaction actually success, and this commit should be successful
// To reduce the dependency for the returned status type, always check the label state
String labelState = getLabelState(host, transaction.getDatabase(), transaction.getLabel(), Collections.emptySet());
String labelState = getLabelState(host, transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), Collections.emptySet());
if (StreamLoadConstants.LABEL_STATE_COMMITTED.equals(labelState) || StreamLoadConstants.LABEL_STATE_VISIBLE.equals(labelState)) {
return true;
}
Expand Down

0 comments on commit c053349

Please sign in to comment.