Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] [Engine] Improve Engine performance. #3216

Merged
merged 13 commits into from
Nov 3, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void assignPendingSplits() {
// Mark pending splits as already assigned
assignedSplits.addAll(pendingAssignmentForReader);
// Assign pending splits to reader
LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
LOG.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader);
enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
enumeratorContext.signalNoMoreSplits(pendingReader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;

import org.slf4j.Logger;
Expand All @@ -47,7 +48,7 @@

public class JdbcInputFormat implements Serializable {

protected static final long serialVersionUID = 2L;
private static final long serialVersionUID = 2L;
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);

protected JdbcConnectionProvider connectionProvider;
Expand All @@ -63,19 +64,22 @@ public class JdbcInputFormat implements Serializable {

protected boolean hasNext;

protected JdbcDialect jdbcDialect;

public JdbcInputFormat(JdbcConnectionProvider connectionProvider,
JdbcRowConverter jdbcRowConverter,
JdbcDialect jdbcDialect,
SeaTunnelRowType typeInfo,
String queryTemplate,
int fetchSize,
Boolean autoCommit
) {
this.connectionProvider = connectionProvider;
this.jdbcRowConverter = jdbcRowConverter;
this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.typeInfo = typeInfo;
this.queryTemplate = queryTemplate;
this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
this.jdbcDialect = jdbcDialect;
}

public void openInputFormat() {
Expand All @@ -89,10 +93,7 @@ public void openInputFormat() {
dbConn.setAutoCommit(autoCommit);
}

statement = dbConn.prepareStatement(queryTemplate);
if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
statement.setFetchSize(fetchSize);
}
statement = jdbcDialect.creatPreparedStatement(dbConn, queryTemplate, fetchSize);
} catch (SQLException se) {
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
} catch (ClassNotFoundException cnfe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunn
for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
Object seatunnelField;
SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
if (null == rs.getObject(i)) {
seatunnelField = null;
}
else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getBoolean(i);
} else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getByte(i);
Expand Down Expand Up @@ -88,7 +85,9 @@ else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
} else {
throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
}

if (rs.wasNull()) {
seatunnelField = null;
}
fields.add(seatunnelField);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
* Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable
Expand All @@ -45,8 +49,22 @@ public interface JdbcDialect extends Serializable {

/**
* get jdbc meta-information type to seatunnel data type mapper.
*
* @return a type mapper for the database
*/
JdbcDialectTypeMapper getJdbcDialectTypeMapper();

/**
* Different dialects optimize their PreparedStatement
*
* @return The logic about optimize PreparedStatement
*/
default PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException {
PreparedStatement statement = connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
statement.setFetchSize(fetchSize);
}
return statement;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MysqlDialect implements JdbcDialect {
@Override
public String dialectName() {
Expand All @@ -36,4 +41,11 @@ public JdbcRowConverter getRowConverter() {
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new MySqlTypeMapper();
}

@Override
public PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException {
PreparedStatement statement = connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(Integer.MIN_VALUE);
return statement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {

inputFormat = new JdbcInputFormat(
jdbcConnectionProvider,
jdbcDialect.getRowConverter(),
jdbcDialect,
typeInfo,
query,
0,
Expand Down Expand Up @@ -147,7 +147,7 @@ private SeaTunnelRowType initTableField(Connection conn) {
} catch (Exception e) {
LOG.warn("get row type info exception", e);
}
return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
return new SeaTunnelRowType(fieldNames.toArray(new String[0]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
}

private PartitionParameter initPartitionParameter(String columnName, Connection connection) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedExc
* @param jobMode jobMode
* @param rowNumber row.num per FakeSource parallelism
* @param parallelism FakeSource parallelism
* @return
*/
private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName, @NonNull JobMode jobMode,
long rowNumber, int parallelism) {
Expand Down Expand Up @@ -216,7 +215,7 @@ public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedEx
return clientJobProxy.waitForJobComplete();
});

Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
Awaitility.await().atMost(3, TimeUnit.MINUTES)
.untilAsserted(() -> {
Thread.sleep(2000);
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public class SeatunnelChildFirstClassLoader extends SeatunnelBaseClassLoader {
private final String[] alwaysParentFirstPatterns;
private static final String[] DEFAULT_PARENT_FIRST_PATTERNS = new String[]{
"java.",
"javax.xml",
"org.xml",
"org.w3c",
"org.apache.hadoop",
"scala.",
"org.apache.seatunnel.",
"javax.annotation.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public ImmutablePair<List<Action>, Set<URL>> parse() {
complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
}
actions.forEach(this::addCommonPluginJarsToAction);
jarUrlsSet.addAll(commonPluginJars);
return new ImmutablePair<>(actions, jarUrlsSet);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public PassiveCompletableFuture<TaskExecutionState> deployTask(
return new PassiveCompletableFuture<>(resultFuture);
}

@Deprecated
public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
@NonNull TaskGroup taskGroup,
@NonNull CompletableFuture<TaskExecutionState> resultFuture) {
Expand Down Expand Up @@ -272,7 +273,8 @@ private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {

@Override
public void run() {
ClassLoader classLoader = executionContexts.get(tracker.taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker = tracker.taskGroupExecutionTracker;
ClassLoader classLoader = executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
final Task t = tracker.task;
Expand All @@ -283,12 +285,12 @@ public void run() {
do {
result = t.call();
} while (!result.isDone() && isRunning &&
!tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
!taskGroupExecutionTracker.executionCompletedExceptionally());
} catch (Throwable e) {
logger.warning("Exception in " + t, e);
tracker.taskGroupExecutionTracker.exception(e);
taskGroupExecutionTracker.exception(e);
} finally {
tracker.taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone();
}
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.hazelcast.cluster.Address;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
Expand Down Expand Up @@ -218,8 +217,6 @@ public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
}

protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
Address address =
jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId());
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation,
jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ protected void readInternal(ObjectDataInput in) throws IOException {
public void run() throws Exception {
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
Task task = server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
.getTaskGroup().getTask(taskLocation.getTaskID());
try {
Task task = server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
.getTaskGroup().getTask(taskLocation.getTaskID());
if (successful) {
task.notifyCheckpointComplete(checkpointId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,22 @@ public int getClassId() {
public void writeData(ObjectDataOutput out) throws IOException {
out.writeObject(taskGroupLocation);
out.writeLong(taskID);
out.writeInt(index);
}

@Override
public void readData(ObjectDataInput in) throws IOException {
taskGroupLocation = in.readObject();
taskID = in.readLong();
index = in.readInt();
}

@Override
public String toString() {
return "TaskLocation{" +
"taskGroupLocation=" + taskGroupLocation +
", taskID=" + taskID +
", index=" + index +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public ProgressState call() throws Exception {
return progress.toState();
}

@SuppressWarnings("checkstyle:MagicNumber")
protected void stateProcess() throws Exception {
switch (currState) {
case INIT:
Expand All @@ -142,6 +143,8 @@ protected void stateProcess() throws Exception {
case RUNNING:
if (prepareCloseStatus) {
currState = PREPARE_CLOSE;
} else {
Thread.sleep(100);
}
break;
case PREPARE_CLOSE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public void readerFinished(long taskID) {
}
}

@SuppressWarnings("checkstyle:MagicNumber")
private void stateProcess() throws Exception {
switch (currState) {
case INIT:
Expand All @@ -233,8 +234,11 @@ private void stateProcess() throws Exception {
case RUNNING:
// The reader closes automatically after reading
if (prepareCloseStatus) {
// TODO we should trigger this after CheckpointCoordinator done
triggerBarrier(Barrier.completedBarrier());
currState = PREPARE_CLOSE;
} else {
Thread.sleep(100);
}
break;
case PREPARE_CLOSE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle implements OneInputFlowLifeCycle<Record<?>>,
OneOutputFlowLifeCycle<Record<?>> {
Expand All @@ -48,10 +49,11 @@ public void received(Record<?> record) {
}
}

@SuppressWarnings("checkstyle:MagicNumber")
@Override
public void collect(Collector<Record<?>> collector) throws Exception {
while (true) {
Record<?> record = queue.poll();
Record<?> record = queue.poll(100, TimeUnit.MILLISECONDS);
if (record != null) {
handleRecord(record, collector::collect);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ public void received(Record<?> record) {
writer.abortPrepare();
throw e;
}
List<StateT> states = writer.snapshotState(barrier.getId());
if (!writerStateSerializer.isPresent()) {
runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList());
} else {
List<StateT> states = writer.snapshotState(barrier.getId());
runningTask.addState(barrier, sinkAction.getId(), serializeStates(writerStateSerializer.get(), states));
}
if (containAggCommitter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hazelcast.spi.impl.operationservice.Operation;

import java.io.IOException;
import java.util.Objects;

public class GetTaskGroupAddressOperation extends Operation implements IdentifiedDataSerializable {

Expand All @@ -50,7 +51,7 @@ public void run() throws Exception {
response = RetryUtils.retryWithException(() -> server.getCoordinatorService().getJobMaster(taskLocation.getJobId())
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
exception -> exception instanceof Exception, Constant.OPERATION_RETRY_SLEEP));
Objects::nonNull, Constant.OPERATION_RETRY_SLEEP));
}

@Override
Expand Down
Loading