Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] improve batch mode when streamload failed #560

Merged
merged 1 commit into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public DorisBatchStreamLoad(
public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);

getLock(bufferKey).readLock().lock();
BatchRecordBuffer buffer =
bufferMap.computeIfAbsent(
Expand All @@ -198,6 +199,7 @@ public void writeRecord(String database, String table, byte[] record) {
int bytes = buffer.insert(record);
currentCacheBytes.addAndGet(bytes);
getLock(bufferKey).readLock().unlock();

if (currentCacheBytes.get() > maxBlockedBytes) {
lock.lock();
try {
Expand Down Expand Up @@ -258,8 +260,9 @@ private synchronized boolean doFlush(
}

private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
if (bufferMap.isEmpty()) {
if (!waitUtilDone && bufferMap.isEmpty()) {
// bufferMap may have been flushed by other threads
LOG.info("bufferMap is empty, no need to flush {}", bufferKey);
return false;
}
if (null == bufferKey) {
Expand Down Expand Up @@ -295,6 +298,7 @@ private synchronized void flushBuffer(String bufferKey) {
getLock(bufferKey).writeLock().unlock();
}
if (buffer == null) {
LOG.info("buffer key is not exist {}, skipped", bufferKey);
return;
}
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
Expand All @@ -312,6 +316,9 @@ private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
} catch (InterruptedException e) {
throw new RuntimeException("Failed to put record buffer to flush queue");
}
// When the load thread reports an error, the flushQueue is cleared,
// so we must force a check for the exception here.
checkFlushException();
}

private void checkFlushException() {
Expand All @@ -321,7 +328,9 @@ private void checkFlushException() {
}

private void waitAsyncLoadFinish() {
for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) {
// The queue is also subject to a drainTo operation, so the number of eof buffers is doubled.
for (int i = 0; i < executionOptions.getFlushQueueSize() * 2 + 1; i++) {
// eof buffer
BatchRecordBuffer empty = new BatchRecordBuffer();
putRecordToFlushQueue(empty);
}
Expand All @@ -335,8 +344,6 @@ public void close() {
// close async executor
this.loadExecutorService.shutdown();
this.started.set(false);
// clear buffer
this.flushQueue.clear();
}

@VisibleForTesting
Expand Down Expand Up @@ -407,10 +414,14 @@ public void run() {
recordList.clear();
try {
BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
if (buffer == null || buffer.getLabelName() == null) {
// label is empty and does not need to load. It is the flag of waitUtilDone
if (buffer == null) {
continue;
}
if (buffer.getLabelName() == null) {
// When the label is empty, it is the eof buffer for checkpoint flush.
continue;
}

recordList.add(buffer);
boolean merge = false;
if (!flushQueue.isEmpty()) {
Expand All @@ -424,6 +435,7 @@ public void run() {
if (!merge) {
for (BatchRecordBuffer bf : recordList) {
if (bf == null || bf.getLabelName() == null) {
// When the label is empty, it's eof buffer for checkpointFlush.
continue;
}
load(bf.getLabelName(), bf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.batch;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
Expand Down Expand Up @@ -67,6 +68,12 @@ public DorisBatchWriter(
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions) {

long restoreCheckpointId =
initContext
.getRestoredCheckpointId()
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
LOG.info("restore from checkpointId {}", restoreCheckpointId);
if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Preconditions.checkState(
Expand All @@ -75,6 +82,7 @@ public DorisBatchWriter(
this.database = tableInfo[0];
this.table = tableInfo[1];
}

LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.subtaskId = initContext.getSubtaskId();
this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
Expand Down Expand Up @@ -130,12 +138,13 @@ public void flush(boolean flush) throws IOException, InterruptedException {

/**
 * Produces the committables for the current checkpoint.
 *
 * <p>This writer never hands anything to a committer, so the result is always an empty
 * collection. {@code checkFlushException()} is invoked first; presumably it surfaces a failure
 * recorded by the asynchronous load path (its body is not visible here -- confirm).
 *
 * @return an empty collection
 * @throws IOException declared by the interface contract
 * @throws InterruptedException declared by the interface contract
 */
@Override
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
    checkFlushException();
    // Nothing to commit in batch mode.
    final Collection<DorisCommittable> nothingToCommit = Collections.emptyList();
    return nothingToCommit;
}

/**
 * Snapshots the writer state for a checkpoint.
 *
 * <p>No state is recorded: the method returns a fresh, empty, mutable list.
 * {@code checkFlushException()} is invoked first; presumably it surfaces a failure recorded by
 * the asynchronous load path (its body is not visible here -- confirm).
 *
 * @param checkpointId id of the checkpoint being taken; not used by this method
 * @return a new empty list
 * @throws IOException declared by the interface contract
 */
@Override
public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
    checkFlushException();
    final List<DorisWriterState> noState = new ArrayList<>();
    return noState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -124,17 +123,10 @@ public void testLoadFail() throws Exception {
when(httpClientBuilder.build()).thenReturn(httpClient);
when(httpClient.execute(any())).thenReturn(response);
loader.writeRecord("db", "tbl", "1,data".getBytes());
loader.checkpointFlush();

TestUtil.waitUntilCondition(
() -> !loader.isLoadThreadAlive(),
Deadline.fromNow(Duration.ofSeconds(20)),
100L,
"testLoadFail wait loader exit failed." + loader.isLoadThreadAlive());
AtomicReference<Throwable> exception = loader.getException();
Assert.assertTrue(exception.get() instanceof Exception);
Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
LOG.info("testLoadFail end");
thrown.expect(Exception.class);
thrown.expectMessage("stream load error");
loader.checkpointFlush();
}

@Test
Expand Down Expand Up @@ -175,17 +167,10 @@ public void testLoadError() throws Exception {
when(httpClientBuilder.build()).thenReturn(httpClient);
when(httpClient.execute(any())).thenReturn(response);
loader.writeRecord("db", "tbl", "1,data".getBytes());
loader.checkpointFlush();

TestUtil.waitUntilCondition(
() -> !loader.isLoadThreadAlive(),
Deadline.fromNow(Duration.ofSeconds(20)),
100L,
"testLoadError wait loader exit failed." + loader.isLoadThreadAlive());
AtomicReference<Throwable> exception = loader.getException();
Assert.assertTrue(exception.get() instanceof Exception);
Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
LOG.info("testLoadError end");
thrown.expect(Exception.class);
thrown.expectMessage("stream load error");
loader.checkpointFlush();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void testInit() {
.build();
thrown.expect(IllegalStateException.class);
thrown.expectMessage("tableIdentifier input error");
DorisBatchWriter batchWriter = new DorisBatchWriter(null, null, options, null, null);
Sink.InitContext initContext = mock(Sink.InitContext.class);
DorisBatchWriter batchWriter = new DorisBatchWriter(initContext, null, options, null, null);
}

@Test
Expand Down