Commit

Merge remote-tracking branch 'upstream/master' into improve_group_commit
vinlee19 committed Jul 2, 2024
2 parents 72feac5 + a24898c commit 44e3b3c
Showing 7 changed files with 93 additions and 50 deletions.

@@ -427,4 +427,9 @@ public Thread newThread(Runnable r) {
     public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
         this.httpClientBuilder = httpClientBuilder;
     }
+
+    @VisibleForTesting
+    public boolean isLoadThreadAlive() {
+        return loadThreadAlive;
+    }
 }

@@ -197,7 +197,7 @@ private void checkFlushException() {
     }
 
     @VisibleForTesting
-    public void setBatchStageLoad(BatchStageLoad batchStageLoad) {
-        this.batchStageLoad = batchStageLoad;
+    public BatchStageLoad getBatchStageLoad() {
+        return batchStageLoad;
     }
 }

@@ -133,7 +133,8 @@ public List<SourceSchema> getSchemaList() throws Exception {
     private ArrayList<Document> sampleData(MongoCollection<Document> collection, Long sampleNum) {
         ArrayList<Document> query = new ArrayList<>();
         query.add(new Document("$sample", new Document("size", sampleNum)));
-        return collection.aggregate(query).into(new ArrayList<>());
+        // allowDiskUse to avoid mongo 'Sort exceeded memory limit' error
+        return collection.aggregate(query).allowDiskUse(true).into(new ArrayList<>());
     }
 
     private static String buildConnectionString(
@@ -159,6 +160,8 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
         String username = config.get(MongoDBSourceOptions.USERNAME);
         String password = config.get(MongoDBSourceOptions.PASSWORD);
         String database = config.get(MongoDBSourceOptions.DATABASE);
+        // note: just to unify job name, no other use.
+        config.setString("database-name", database);
         String collection = config.get(MongoDBSourceOptions.COLLECTION);
         if (StringUtils.isBlank(collection)) {
            collection = config.get(TABLE_NAME);
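
For context on the `$sample` change in this file, here is a minimal standalone sketch of the same aggregation pattern with the MongoDB Java sync driver. The connection string, database, collection, and sample size are placeholders, not values from this commit.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

public class SampleWithDiskUse {
    public static void main(String[] args) {
        // Placeholder connection string and namespace, for illustration only.
        try (MongoClient client = MongoClients.create("mongodb://127.0.0.1:27017")) {
            MongoCollection<Document> collection =
                    client.getDatabase("test_db").getCollection("test_collection");

            // $sample picks N random documents; on large collections the server may
            // need to sort/spill internally, which can fail with
            // 'Sort exceeded memory limit' unless allowDiskUse(true) lets the
            // aggregation use temporary files on disk.
            List<Document> pipeline = new ArrayList<>();
            pipeline.add(new Document("$sample", new Document("size", 1000L)));

            List<Document> sampled =
                    collection.aggregate(pipeline).allowDiskUse(true).into(new ArrayList<>());
            System.out.println("sampled " + sampled.size() + " documents");
        }
    }
}
```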

@@ -126,7 +126,13 @@ public Map<String, Object> extractBeforeRow(JsonNode record) {
     public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
         JsonNode dataNode = recordRoot.get(FIELD_DATA);
         Map<String, Object> rowMap = extractRow(dataNode);
-        String objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
+        String objectId;
+        // if user specifies the `_id` field manually, the $oid field may not exist
+        if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) {
+            objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
+        } else {
+            objectId = rowMap.get(ID_FIELD).toString();
+        }
         rowMap.put(ID_FIELD, objectId);
         return rowMap;
     }
@@ -135,7 +141,13 @@ private Map<String, Object> extractDeleteRow(JsonNode recordRoot)
             throws JsonProcessingException {
         String documentKey = extractJsonNode(recordRoot, FIELD_DOCUMENT_KEY);
         JsonNode jsonNode = objectMapper.readTree(documentKey);
-        String objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD);
+        String objectId;
+        // if user specifies the `_id` field manually, the $oid field may not exist
+        if (jsonNode.get(ID_FIELD).has(OID_FIELD)) {
+            objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD);
+        } else {
+            objectId = jsonNode.get(ID_FIELD).asText();
+        }
         Map<String, Object> row = new HashMap<>();
         row.put(ID_FIELD, objectId);
         return row;
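
The two `_id` branches above handle MongoDB extended JSON: an auto-generated ObjectId arrives as a nested `{"$oid": "..."}` document, while a user-supplied `_id` (for example a plain string) has no `$oid` wrapper. A minimal Jackson sketch of that distinction, with made-up field values:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ObjectIdShapeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Auto-generated ObjectId: extended JSON wraps the hex value in "$oid".
        JsonNode generated =
                mapper.readTree("{\"_id\": {\"$oid\": \"66831f2ab7f3c55a1c0e4d9a\"}, \"name\": \"a\"}");
        // User-specified _id: no "$oid" wrapper, just the raw value.
        JsonNode manual = mapper.readTree("{\"_id\": \"order-1001\", \"name\": \"b\"}");

        System.out.println(extractId(generated)); // 66831f2ab7f3c55a1c0e4d9a
        System.out.println(extractId(manual));    // order-1001
    }

    // Same branching idea as the serializer change: prefer "$oid" when present.
    static String extractId(JsonNode record) {
        JsonNode id = record.get("_id");
        return id.has("$oid") ? id.get("$oid").asText() : id.asText();
    }
}
```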

@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.util.concurrent.TimeoutException;
+
+public class TestUtil {
+
+    public static void waitUntilCondition(
+            SupplierWithException<Boolean, Exception> condition,
+            Deadline timeout,
+            long retryIntervalMillis,
+            String errorMsg)
+            throws Exception {
+        while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
+            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
+            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
+        }
+
+        if (!timeout.hasTimeLeft()) {
+            throw new TimeoutException(errorMsg);
+        }
+    }
+}
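
The test diffs below call this helper to wait for the async load thread. As a quick illustration of the polling contract, here is a minimal self-contained sketch; the demo class, worker thread, and sleep values are made up and are not part of the commit.

```java
import org.apache.flink.api.common.time.Deadline;

import org.apache.doris.flink.sink.TestUtil;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

public class WaitUntilConditionDemo {
    public static void main(String[] args) throws Exception {
        AtomicBoolean ready = new AtomicBoolean(false);

        // Simulate an async worker (e.g. a load thread) that becomes ready later.
        Thread worker =
                new Thread(
                        () -> {
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException ignored) {
                            }
                            ready.set(true);
                        });
        worker.start();

        // Polls every 100 ms until the condition holds or the deadline expires,
        // then throws TimeoutException with the given message on timeout.
        TestUtil.waitUntilCondition(
                ready::get,
                Deadline.fromNow(Duration.ofSeconds(10)),
                100L,
                "worker did not become ready in time");

        worker.join();
        System.out.println("condition met");
    }
}
```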

@@ -18,13 +18,13 @@
 package org.apache.doris.flink.sink.batch;
 
 import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.HttpTestUtil;
+import org.apache.doris.flink.sink.TestUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -42,7 +42,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -97,7 +96,7 @@ public void testLoadFail() throws Exception {
         DorisBatchStreamLoad loader =
                 new DorisBatchStreamLoad(
                         options, readOptions, executionOptions, new LabelGenerator("label", false));
-        waitUntilCondition(
+        TestUtil.waitUntilCondition(
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
@@ -137,7 +136,7 @@ public void testLoadError() throws Exception {
                 new DorisBatchStreamLoad(
                         options, readOptions, executionOptions, new LabelGenerator("label", false));
 
-        waitUntilCondition(
+        TestUtil.waitUntilCondition(
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
@@ -168,20 +167,4 @@ public void after() {
             backendUtilMockedStatic.close();
         }
     }
-
-    public static void waitUntilCondition(
-            SupplierWithException<Boolean, Exception> condition,
-            Deadline timeout,
-            long retryIntervalMillis,
-            String errorMsg)
-            throws Exception {
-        while (timeout.hasTimeLeft() && !(Boolean) condition.get()) {
-            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
-            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
-        }
-
-        if (!timeout.hasTimeLeft()) {
-            throw new TimeoutException(errorMsg);
-        }
-    }
 }

@@ -17,15 +17,16 @@
 
 package org.apache.doris.flink.sink.copy;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.connector.sink2.Sink;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
+import org.apache.doris.flink.sink.TestUtil;
 import org.apache.doris.flink.sink.writer.DorisWriterState;
-import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -36,8 +37,10 @@
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -62,18 +65,9 @@ public void testPrepareCommit() throws Exception {
         HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         when(httpClientBuilder.build()).thenReturn(httpClient);
-        BatchStageLoad stageLoad =
-                new BatchStageLoad(
-                        dorisOptions,
-                        readOptions,
-                        executionOptions,
-                        new LabelGenerator("label", true));
-        stageLoad.setHttpClientBuilder(httpClientBuilder);
-
         CloseableHttpResponse uploadResponse = HttpTestUtil.getResponse("", false, true);
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse("", true, false);
         when(httpClient.execute(any())).thenReturn(uploadResponse).thenReturn(preCommitResponse);
-
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         // when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisCopyWriter<String> copyWriter =
@@ -83,8 +77,15 @@ public void testPrepareCommit() throws Exception {
                         dorisOptions,
                         readOptions,
                         executionOptions);
-        copyWriter.setBatchStageLoad(stageLoad);
-        stageLoad.setCurrentCheckpointID(1);
+        copyWriter.getBatchStageLoad().setHttpClientBuilder(httpClientBuilder);
+        copyWriter.getBatchStageLoad().setCurrentCheckpointID(1);
+
+        TestUtil.waitUntilCondition(
+                () -> copyWriter.getBatchStageLoad().isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                100L,
+                "Condition was not met in given timeout.");
+        Assert.assertTrue(copyWriter.getBatchStageLoad().isLoadThreadAlive());
         // no data
         Collection<DorisCopyCommittable> committableList = copyWriter.prepareCommit();
         Assert.assertEquals(0, committableList.size());
@@ -98,18 +99,16 @@
         DorisCopyCommittable committable = committableList.toArray(new DorisCopyCommittable[0])[0];
         Assert.assertEquals("127.0.0.1:8030", committable.getHostPort());
 
+        Pattern copySql =
+                Pattern.compile(
+                        "COPY INTO `db`.`table` FROM @~\\('.doris_[0-9a-f]{32}_table_0_1_0}'\\)");
         // todo: compare properties
-        Assert.assertTrue(
-                committable
-                        .getCopySQL()
-                        .startsWith("COPY INTO `db`.`table` FROM @~('{label_table_0_1_0}')"));
+        Assert.assertTrue(copySql.matcher(committable.getCopySQL()).find());
         copyWriter.close();
     }
 
     @Test
     public void testSnapshot() throws Exception {
-        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
-
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         // when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisCopyWriter<String> copyWriter =
@@ -119,13 +118,12 @@ public void testSnapshot() throws Exception {
                         dorisOptions,
                         readOptions,
                         executionOptions);
-        BatchStageLoad stageLoad =
-                new BatchStageLoad(
-                        dorisOptions,
-                        readOptions,
-                        executionOptions,
-                        new LabelGenerator("label", true));
-        copyWriter.setBatchStageLoad(stageLoad);
+        TestUtil.waitUntilCondition(
+                () -> copyWriter.getBatchStageLoad().isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                100L,
+                "Condition was not met in given timeout.");
+        Assert.assertTrue(copyWriter.getBatchStageLoad().isLoadThreadAlive());
         List<DorisWriterState> writerStates = copyWriter.snapshotState(1);
         Assert.assertTrue(writerStates.isEmpty());
         copyWriter.close();
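
On the assertion change in `testPrepareCommit` above: the new regex expects a label of the form `doris_<32 hex chars>_table_0_1_0`, so the old fixed `startsWith("...{label_table_0_1_0}...")` check can no longer match the generated COPY SQL. A small sketch with a made-up SQL string (the 32-hex id and the sample SQL are illustrative, not taken from the commit):

```java
import java.util.regex.Pattern;

public class CopySqlPatternDemo {
    public static void main(String[] args) {
        // Same pattern as the updated test assertion.
        Pattern copySql =
                Pattern.compile(
                        "COPY INTO `db`.`table` FROM @~\\('.doris_[0-9a-f]{32}_table_0_1_0}'\\)");

        // Made-up COPY SQL whose label embeds a 32-hex-char id.
        String generated =
                "COPY INTO `db`.`table` FROM @~('{doris_0123456789abcdef0123456789abcdef_table_0_1_0}')";
        String oldPrefix = "COPY INTO `db`.`table` FROM @~('{label_table_0_1_0}')";

        System.out.println(copySql.matcher(generated).find()); // true
        System.out.println(generated.startsWith(oldPrefix));   // false: the label is no longer fixed
    }
}
```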
