From 71105a15fbc184084b739b3ad055518b20d2ed92 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Fri, 21 Oct 2022 19:58:09 +0800 Subject: [PATCH 01/10] [Engine] [Test] improve engine performance --- .../jdbc/internal/JdbcInputFormat.java | 10 ++++++++-- .../jdbc/internal/dialect/JdbcDialect.java | 13 +++++++++++++ .../internal/dialect/mysql/MysqlDialect.java | 17 +++++++++++++++++ .../seatunnel/jdbc/source/JdbcSource.java | 3 ++- .../loader/SeatunnelChildFirstClassLoader.java | 4 ++++ .../engine/core/parse/JobConfigParser.java | 1 + .../engine/server/TaskExecutionService.java | 1 + 7 files changed, 46 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java index 1baf1725621..0f97724828b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java @@ -38,6 +38,7 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; +import java.util.function.Consumer; /** * InputFormat to read data from a database and generate Rows. The InputFormat has to be configured @@ -63,12 +64,15 @@ public class JdbcInputFormat implements Serializable { protected boolean hasNext; + protected Consumer customPreparedStatement; + public JdbcInputFormat(JdbcConnectionProvider connectionProvider, JdbcRowConverter jdbcRowConverter, SeaTunnelRowType typeInfo, String queryTemplate, int fetchSize, - Boolean autoCommit + Boolean autoCommit, + Consumer customPreparedStatement ) { this.connectionProvider = connectionProvider; this.jdbcRowConverter = jdbcRowConverter; @@ -76,6 +80,7 @@ public JdbcInputFormat(JdbcConnectionProvider connectionProvider, this.queryTemplate = queryTemplate; this.fetchSize = fetchSize; this.autoCommit = autoCommit; + this.customPreparedStatement = customPreparedStatement; } public void openInputFormat() { @@ -89,10 +94,11 @@ public void openInputFormat() { dbConn.setAutoCommit(autoCommit); } - statement = dbConn.prepareStatement(queryTemplate); + statement = dbConn.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) { statement.setFetchSize(fetchSize); } + customPreparedStatement.accept(statement); } catch (SQLException se) { throw new IllegalArgumentException("open() failed." + se.getMessage(), se); } catch (ClassNotFoundException cnfe) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 155d8d4c2e9..c953b60cb69 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import java.io.Serializable; +import java.sql.PreparedStatement; +import java.util.function.Consumer; /** * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable @@ -45,8 +47,19 @@ public interface JdbcDialect extends Serializable { /** * get jdbc meta-information type to seatunnel data type mapper. + * * @return a type mapper for the database */ JdbcDialectTypeMapper getJdbcDialectTypeMapper(); + /** + * Different dialects optimize their PreparedStatement + * + * @return The logic about optimize PreparedStatement + */ + default Consumer customPreparedStatement() { + return preparedStatement -> { + }; + } + } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 3e4d7715854..8c3ec7ff3be 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -17,10 +17,15 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.function.Consumer; + public class MysqlDialect implements JdbcDialect { @Override public String dialectName() { @@ -36,4 +41,16 @@ public JdbcRowConverter getRowConverter() { public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { return new MySqlTypeMapper(); } + + @Override + public Consumer customPreparedStatement() { + return preparedStatement -> { + try { + // enable mysql streaming mode + preparedStatement.setFetchSize(Integer.MIN_VALUE); + } catch (SQLException e) { + throw new SeaTunnelException(e); + } + }; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 196b4dcac03..566de8bec67 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -90,7 +90,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException { typeInfo, query, 0, - true + true, + jdbcDialect.customPreparedStatement() ); } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java index 1dc0cceb756..22eac368769 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java @@ -29,6 +29,10 @@ public class SeatunnelChildFirstClassLoader extends SeatunnelBaseClassLoader { private final String[] alwaysParentFirstPatterns; private static final String[] DEFAULT_PARENT_FIRST_PATTERNS = new String[]{ "java.", + "javax.xml", + "org.xml", + "org.w3c", + "org.apache.hadoop", "scala.", "org.apache.seatunnel.", "javax.annotation.", diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java index 92041823b21..ba4c68cf04f 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java @@ -121,6 +121,7 @@ public ImmutablePair, Set> parse() { complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs); } actions.forEach(this::addCommonPluginJarsToAction); + jarUrlsSet.addAll(commonPluginJars); return new ImmutablePair<>(actions, jarUrlsSet); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index fea05ddd465..35eeeab1a3e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -176,6 +176,7 @@ public PassiveCompletableFuture deployTask( return new PassiveCompletableFuture<>(resultFuture); } + @Deprecated public PassiveCompletableFuture deployLocalTask( @NonNull TaskGroup taskGroup, @NonNull CompletableFuture resultFuture) { From 81bee4a397cd2319e40e450c6bcb7e8375a50f41 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 25 Oct 2022 10:26:19 +0800 Subject: [PATCH 02/10] [Engine] [Test] improve engine performance --- .../jdbc/internal/JdbcInputFormat.java | 21 +++++++------------ .../jdbc/internal/dialect/JdbcDialect.java | 13 ++++++++---- .../internal/dialect/mysql/MysqlDialect.java | 17 ++++++--------- .../seatunnel/jdbc/source/JdbcSource.java | 7 +++---- .../GetTaskGroupAddressOperation.java | 3 ++- 5 files changed, 28 insertions(+), 33 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java index 0f97724828b..f9fe6fdfb1d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit; import org.slf4j.Logger; @@ -38,7 +39,6 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; -import java.util.function.Consumer; /** * InputFormat to read data from a database and generate Rows. The InputFormat has to be configured @@ -48,7 +48,7 @@ public class JdbcInputFormat implements Serializable { - protected static final long serialVersionUID = 2L; + private static final long serialVersionUID = 2L; protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class); protected JdbcConnectionProvider connectionProvider; @@ -64,23 +64,22 @@ public class JdbcInputFormat implements Serializable { protected boolean hasNext; - protected Consumer customPreparedStatement; + protected JdbcDialect jdbcDialect; public JdbcInputFormat(JdbcConnectionProvider connectionProvider, - JdbcRowConverter jdbcRowConverter, + JdbcDialect jdbcDialect, SeaTunnelRowType typeInfo, String queryTemplate, int fetchSize, - Boolean autoCommit, - Consumer customPreparedStatement + Boolean autoCommit ) { this.connectionProvider = connectionProvider; - this.jdbcRowConverter = jdbcRowConverter; + this.jdbcRowConverter = jdbcDialect.getRowConverter(); this.typeInfo = typeInfo; this.queryTemplate = queryTemplate; this.fetchSize = fetchSize; this.autoCommit = autoCommit; - this.customPreparedStatement = customPreparedStatement; + this.jdbcDialect = jdbcDialect; } public void openInputFormat() { @@ -94,11 +93,7 @@ public void openInputFormat() { dbConn.setAutoCommit(autoCommit); } - statement = dbConn.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) { - statement.setFetchSize(fetchSize); - } - customPreparedStatement.accept(statement); + statement = jdbcDialect.creatPreparedStatement(dbConn, queryTemplate, fetchSize); } catch (SQLException se) { throw new IllegalArgumentException("open() failed." + se.getMessage(), se); } catch (ClassNotFoundException cnfe) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index c953b60cb69..12fc2cd8c19 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -20,8 +20,10 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import java.io.Serializable; +import java.sql.Connection; import java.sql.PreparedStatement; -import java.util.function.Consumer; +import java.sql.ResultSet; +import java.sql.SQLException; /** * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable @@ -57,9 +59,12 @@ public interface JdbcDialect extends Serializable { * * @return The logic about optimize PreparedStatement */ - default Consumer customPreparedStatement() { - return preparedStatement -> { - }; + default PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException { + PreparedStatement statement = connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) { + statement.setFetchSize(fetchSize); + } + return statement; } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 8c3ec7ff3be..7d38fc3253a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -17,14 +17,14 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; -import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; -import java.util.function.Consumer; public class MysqlDialect implements JdbcDialect { @Override @@ -43,14 +43,9 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { } @Override - public Consumer customPreparedStatement() { - return preparedStatement -> { - try { - // enable mysql streaming mode - preparedStatement.setFetchSize(Integer.MIN_VALUE); - } catch (SQLException e) { - throw new SeaTunnelException(e); - } - }; + public PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException { + PreparedStatement statement = connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(Integer.MIN_VALUE); + return statement; } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 566de8bec67..9765980990f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -86,12 +86,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { inputFormat = new JdbcInputFormat( jdbcConnectionProvider, - jdbcDialect.getRowConverter(), + jdbcDialect, typeInfo, query, 0, - true, - jdbcDialect.customPreparedStatement() + true ); } @@ -148,7 +147,7 @@ private SeaTunnelRowType initTableField(Connection conn) { } catch (Exception e) { LOG.warn("get row type info exception", e); } - return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[seaTunnelDataTypes.size()])); + return new SeaTunnelRowType(fieldNames.toArray(new String[0]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[0])); } private PartitionParameter initPartitionParameter(String columnName, Connection connection) throws SQLException { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java index 7f69773b1f7..088fabd0ba1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java @@ -30,6 +30,7 @@ import com.hazelcast.spi.impl.operationservice.Operation; import java.io.IOException; +import java.util.Objects; public class GetTaskGroupAddressOperation extends Operation implements IdentifiedDataSerializable { @@ -50,7 +51,7 @@ public void run() throws Exception { response = RetryUtils.retryWithException(() -> server.getCoordinatorService().getJobMaster(taskLocation.getJobId()) .queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()), new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true, - exception -> exception instanceof Exception, Constant.OPERATION_RETRY_SLEEP)); + Objects::nonNull, Constant.OPERATION_RETRY_SLEEP)); } @Override From 9e593064f275c22291756fec3b0104f76dc6c056 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Wed, 26 Oct 2022 16:53:09 +0800 Subject: [PATCH 03/10] [Engine] [Improve] engine performance improve --- .../internal/converter/AbstractJdbcRowConverter.java | 9 ++++----- .../operation/CheckpointFinishedOperation.java | 4 ++-- .../server/task/flow/IntermediateQueueFlowLifeCycle.java | 4 +++- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java index c1091293ca7..5f5c557f64a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java @@ -53,10 +53,7 @@ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunn for (int i = 1; i <= seaTunnelDataTypes.length; i++) { Object seatunnelField; SeaTunnelDataType seaTunnelDataType = seaTunnelDataTypes[i - 1]; - if (null == rs.getObject(i)) { - seatunnelField = null; - } - else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { + if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { seatunnelField = rs.getBoolean(i); } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) { seatunnelField = rs.getByte(i); @@ -88,7 +85,9 @@ else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { } else { throw new IllegalStateException("Unexpected value: " + seaTunnelDataType); } - + if (rs.wasNull()) { + seatunnelField = null; + } fields.add(seatunnelField); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java index 422190a7d0d..ff5c92300b3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java @@ -76,9 +76,9 @@ protected void readInternal(ObjectDataInput in) throws IOException { public void run() throws Exception { SeaTunnelServer server = getService(); RetryUtils.retryWithException(() -> { - Task task = server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation()) - .getTaskGroup().getTask(taskLocation.getTaskID()); try { + Task task = server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation()) + .getTaskGroup().getTask(taskLocation.getTaskID()); if (successful) { task.notifyCheckpointComplete(checkpointId); } else { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java index bd885916b68..ae9b55e9065 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java @@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle implements OneInputFlowLifeCycle>, OneOutputFlowLifeCycle> { @@ -48,10 +49,11 @@ public void received(Record record) { } } + @SuppressWarnings("checkstyle:MagicNumber") @Override public void collect(Collector> collector) throws Exception { while (true) { - Record record = queue.poll(); + Record record = queue.poll(100, TimeUnit.MILLISECONDS); if (record != null) { handleRecord(record, collector::collect); } else { From afb5396f219f9e0dddcf0dcf0c01d821b67a7466 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Wed, 26 Oct 2022 19:48:59 +0800 Subject: [PATCH 04/10] [Engine] [Improve] engine performance improve --- .../seatunnel/engine/server/checkpoint/CheckpointManager.java | 3 --- .../engine/server/task/SourceSplitEnumeratorTask.java | 1 + .../seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index e7ff7ae2049..78829e4dcd5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -41,7 +41,6 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; -import com.hazelcast.cluster.Address; import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; @@ -218,8 +217,6 @@ public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) { } protected InvocationFuture sendOperationToMemberNode(TaskOperation operation) { - Address address = - jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()); return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId())); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index be5927f1058..76ea8738f74 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -233,6 +233,7 @@ private void stateProcess() throws Exception { case RUNNING: // The reader closes automatically after reading if (prepareCloseStatus) { + // TODO we should trigger this after CheckpointCoordinator done triggerBarrier(Barrier.completedBarrier()); currState = PREPARE_CLOSE; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index eeba5588bc1..3a548105d86 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -130,10 +130,10 @@ public void received(Record record) { writer.abortPrepare(); throw e; } + List states = writer.snapshotState(barrier.getId()); if (!writerStateSerializer.isPresent()) { runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList()); } else { - List states = writer.snapshotState(barrier.getId()); runningTask.addState(barrier, sinkAction.getId(), serializeStates(writerStateSerializer.get(), states)); } if (containAggCommitter) { From 174df7c24b92851df0e3ed34a682053f253a61da Mon Sep 17 00:00:00 2001 From: Hisoka Date: Thu, 27 Oct 2022 17:31:15 +0800 Subject: [PATCH 05/10] [Engine] [Improve] engine performance improve --- .../engine/server/TaskExecutionService.java | 9 +++++---- .../server/task/SinkAggregatedCommitterTask.java | 3 +++ .../server/task/SourceSplitEnumeratorTask.java | 3 +++ .../format/text/TextSerializationSchema.java | 12 ++++++------ 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index ba1c0e0c16a..cf086716072 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -273,7 +273,8 @@ private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) { @Override public void run() { - ClassLoader classLoader = executionContexts.get(tracker.taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader(); + TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker = tracker.taskGroupExecutionTracker; + ClassLoader classLoader = executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader(); ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(classLoader); final Task t = tracker.task; @@ -284,12 +285,12 @@ public void run() { do { result = t.call(); } while (!result.isDone() && isRunning && - !tracker.taskGroupExecutionTracker.executionCompletedExceptionally()); + !taskGroupExecutionTracker.executionCompletedExceptionally()); } catch (Throwable e) { logger.warning("Exception in " + t, e); - tracker.taskGroupExecutionTracker.exception(e); + taskGroupExecutionTracker.exception(e); } finally { - tracker.taskGroupExecutionTracker.taskDone(); + taskGroupExecutionTracker.taskDone(); } Thread.currentThread().setContextClassLoader(oldClassLoader); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java index 750a7969089..6fa251b9977 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java @@ -117,6 +117,7 @@ public ProgressState call() throws Exception { return progress.toState(); } + @SuppressWarnings("checkstyle:MagicNumber") protected void stateProcess() throws Exception { switch (currState) { case INIT: @@ -142,6 +143,8 @@ protected void stateProcess() throws Exception { case RUNNING: if (prepareCloseStatus) { currState = PREPARE_CLOSE; + } else { + Thread.sleep(100); } break; case PREPARE_CLOSE: diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index 76ea8738f74..2b2d2befb54 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -207,6 +207,7 @@ public void readerFinished(long taskID) { } } + @SuppressWarnings("checkstyle:MagicNumber") private void stateProcess() throws Exception { switch (currState) { case INIT: @@ -236,6 +237,8 @@ private void stateProcess() throws Exception { // TODO we should trigger this after CheckpointCoordinator done triggerBarrier(Barrier.completedBarrier()); currState = PREPARE_CLOSE; + } else { + Thread.sleep(100); } break; case PREPARE_CLOSE: diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java index d23d5c2cc5b..b55f5e6af5b 100644 --- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java +++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java @@ -64,17 +64,14 @@ private String convert(Object field, SeaTunnelDataType fieldType) { return ""; } switch (fieldType.getSqlType()) { - case ARRAY: - case MAP: - return JsonUtils.toJsonString(field); + case DOUBLE: + case FLOAT: + case INT: case STRING: case BOOLEAN: case TINYINT: case SMALLINT: - case INT: case BIGINT: - case FLOAT: - case DOUBLE: case DECIMAL: return field.toString(); case DATE: @@ -87,6 +84,9 @@ private String convert(Object field, SeaTunnelDataType fieldType) { return ""; case BYTES: return new String((byte[]) field); + case ARRAY: + case MAP: + return JsonUtils.toJsonString(field); case ROW: Object[] fields = ((SeaTunnelRow) field).getFields(); String[] strings = new String[fields.length]; From 8f87d20ce7e0f0bb270e3f89b41f12688966a016 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Mon, 31 Oct 2022 18:03:50 +0800 Subject: [PATCH 06/10] [Core] [Improve] Add test timeout --- .../apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java index c7f8d5eafad..5cdb22770d1 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java @@ -144,7 +144,6 @@ public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedExc * @param jobMode jobMode * @param rowNumber row.num per FakeSource parallelism * @param parallelism FakeSource parallelism - * @return */ private ImmutablePair createTestResources(@NonNull String testCaseName, @NonNull JobMode jobMode, long rowNumber, int parallelism) { @@ -216,7 +215,7 @@ public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedEx return clientJobProxy.waitForJobComplete(); }); - Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS) + Awaitility.await().atMost(3, TimeUnit.MINUTES) .untilAsserted(() -> { Thread.sleep(2000); System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft())); From b16bba0cbd6767c97cd558b7d7a407bdb3d6dff8 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 1 Nov 2022 14:45:28 +0800 Subject: [PATCH 07/10] [Bug] [FakeSource] Add log for FakeSourceSplitEnumerator --- .../seatunnel/fake/source/FakeSourceSplitEnumerator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java index 38b4f6b8ed9..1d5ed1aec9b 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java @@ -129,6 +129,7 @@ private void addSplitChangeToPendingAssignments(Collection newS } private void assignPendingSplits() { + LOG.info("Assigning pending splits. pendingSplits={}", pendingSplits); // Check if there's any pending splits for given readers for (int pendingReader : enumeratorContext.registeredReaders()) { // Remove pending assignment for the reader @@ -139,9 +140,11 @@ private void assignPendingSplits() { // Mark pending splits as already assigned assignedSplits.addAll(pendingAssignmentForReader); // Assign pending splits to reader - LOG.info("Assigning splits to readers {}", pendingAssignmentForReader); + LOG.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader); enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader)); enumeratorContext.signalNoMoreSplits(pendingReader); + } else { + LOG.info("No pending splits for reader {}", pendingReader); } } } From 99e47d73958c2d9b2c63e083e2e6894619e4f602 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 1 Nov 2022 16:25:33 +0800 Subject: [PATCH 08/10] [Bug] [FakeSource] Add log for FakeSourceSplitEnumerator --- .../seatunnel/fake/source/FakeSourceSplitEnumerator.java | 3 --- .../engine/server/task/SourceSplitEnumeratorTask.java | 6 +++++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java index 1d5ed1aec9b..04144834b26 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java @@ -129,7 +129,6 @@ private void addSplitChangeToPendingAssignments(Collection newS } private void assignPendingSplits() { - LOG.info("Assigning pending splits. pendingSplits={}", pendingSplits); // Check if there's any pending splits for given readers for (int pendingReader : enumeratorContext.registeredReaders()) { // Remove pending assignment for the reader @@ -143,8 +142,6 @@ private void assignPendingSplits() { LOG.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader); enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader)); enumeratorContext.signalNoMoreSplits(pendingReader); - } else { - LOG.info("No pending splits for reader {}", pendingReader); } } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index 2b2d2befb54..f65b80f4718 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -260,7 +260,11 @@ private void stateProcess() throws Exception { } public Set getRegisteredReaders() { - return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toSet()); + LOGGER.info("taskMemberMapping = " + taskMemberMapping); + LOGGER.info("taskMemberMapping keyset = " + taskMemberMapping.keySet()); + Set result = taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toSet()); + LOGGER.info("registered readers = " + result); + return result; } private void sendToAllReader(Function function) { From 46e6c72a229f42e7b494f57a5283b32e407ae1f8 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 1 Nov 2022 22:30:01 +0800 Subject: [PATCH 09/10] [Bug] [FakeSource] Add log for FakeSourceSplitEnumerator --- .../apache/seatunnel/engine/server/execution/TaskLocation.java | 1 + .../seatunnel/engine/server/task/SourceSplitEnumeratorTask.java | 1 + 2 files changed, 2 insertions(+) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java index 64737254ca7..b8f8684dbd7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java @@ -108,6 +108,7 @@ public String toString() { return "TaskLocation{" + "taskGroupLocation=" + taskGroupLocation + ", taskID=" + taskID + + ", index=" + index + '}'; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index f65b80f4718..384f5c3e325 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -262,6 +262,7 @@ private void stateProcess() throws Exception { public Set getRegisteredReaders() { LOGGER.info("taskMemberMapping = " + taskMemberMapping); LOGGER.info("taskMemberMapping keyset = " + taskMemberMapping.keySet()); + LOGGER.info(taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toList()).toString()); Set result = taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toSet()); LOGGER.info("registered readers = " + result); return result; From 901fe0018874774d8a19800d57aa7a951bb4d4e6 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 1 Nov 2022 22:38:57 +0800 Subject: [PATCH 10/10] [Bug] [Engine] Fix TaskLocation Serializable problem --- .../seatunnel/engine/server/execution/TaskLocation.java | 2 ++ .../engine/server/task/SourceSplitEnumeratorTask.java | 7 +------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java index b8f8684dbd7..03d68f955e7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java @@ -95,12 +95,14 @@ public int getClassId() { public void writeData(ObjectDataOutput out) throws IOException { out.writeObject(taskGroupLocation); out.writeLong(taskID); + out.writeInt(index); } @Override public void readData(ObjectDataInput in) throws IOException { taskGroupLocation = in.readObject(); taskID = in.readLong(); + index = in.readInt(); } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index 384f5c3e325..2b2d2befb54 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -260,12 +260,7 @@ private void stateProcess() throws Exception { } public Set getRegisteredReaders() { - LOGGER.info("taskMemberMapping = " + taskMemberMapping); - LOGGER.info("taskMemberMapping keyset = " + taskMemberMapping.keySet()); - LOGGER.info(taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toList()).toString()); - Set result = taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toSet()); - LOGGER.info("registered readers = " + result); - return result; + return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toSet()); } private void sendToAllReader(Function function) {