From f14a1eecdb17eb89f98267230f65f4ef77b32031 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Fri, 4 Aug 2023 18:50:46 +0800 Subject: [PATCH] [Improve][API & Zeta] Using connector custom serializer encode/decode states * API: Using DefaultSerializer as connector sink default serializer * Zeta: Using connector custom serializer encode/decode states --- .../api/serialization/DefaultSerializer.java | 3 ++ .../seatunnel/api/sink/SeaTunnelSink.java | 7 +-- .../dag/physical/PhysicalPlanGenerator.java | 5 ++- .../task/SinkAggregatedCommitterTask.java | 6 +++ .../server/task/SourceSeaTunnelTask.java | 15 ++++++- .../task/SourceSplitEnumeratorTask.java | 3 ++ .../SeaTunnelSplitEnumeratorContext.java | 16 ++++--- .../server/task/flow/SinkFlowLifeCycle.java | 38 +++++++--------- .../server/task/flow/SourceFlowLifeCycle.java | 10 +---- .../sink/SinkPrepareCommitOperation.java | 21 ++++++--- .../source/AssignSplitOperation.java | 36 ++++++++++----- .../source/RestoredSplitOperation.java | 44 +++++++++++-------- 12 files changed, 129 insertions(+), 75 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java index 2100b9529cd..5fabe2a284a 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java @@ -35,6 +35,9 @@ public byte[] serialize(T obj) throws IOException { @Override public T deserialize(byte[] serialized) throws IOException { + if (serialized == null) { + return null; + } return SerializationUtils.deserialize(serialized); } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java index 0ed1b1bf7fb..35d2d5863c3 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.common.PluginIdentifierInterface; import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle; +import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.SeaTunnelJobAware; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -84,7 +85,7 @@ default SinkWriter restoreWriter( * @return Serializer of {@link StateT} */ default Optional> getWriterStateSerializer() { - return Optional.empty(); + return Optional.of(new DefaultSerializer()); } /** @@ -104,7 +105,7 @@ default Optional> createCommitter() throws IOExceptio * @return Serializer of {@link CommitInfoT} */ default Optional> getCommitInfoSerializer() { - return Optional.empty(); + return Optional.of(new DefaultSerializer()); } /** @@ -125,6 +126,6 @@ default Optional> getCommitInfoSerializer() { * @return Serializer of {@link AggregatedCommitInfoT} */ default Optional> getAggregatedCommitInfoSerializer() { - return Optional.empty(); + return Optional.of(new DefaultSerializer()); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index 69d72d7130a..a238ae134c9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -552,7 +552,10 @@ private List getSourceTask( .getJobId(), taskLocation, finalParallelismIndex, - f); + (PhysicalExecutionFlow< + SourceAction, + SourceConfig>) + f); } else { return new TransformSeaTunnelTask( jobImmutableInformation diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java index 1a8ecf29c80..a904146a1d8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java @@ -34,6 +34,7 @@ import org.apache.commons.collections4.CollectionUtils; import com.hazelcast.cluster.Address; +import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -45,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -75,6 +77,8 @@ public class SinkAggregatedCommitterTask private final SinkAggregatedCommitter aggregatedCommitter; private transient Serializer aggregatedCommitInfoSerializer; + @Getter private transient Serializer commitInfoSerializer; + private Map writerAddressMap; private ConcurrentMap> commitInfoCache; @@ -107,6 +111,7 @@ public void init() throws Exception { this.writerAddressMap = new ConcurrentHashMap<>(); this.checkpointCommitInfoMap = new ConcurrentHashMap<>(); this.completableFuture = new CompletableFuture<>(); + this.commitInfoSerializer = sink.getSink().getCommitInfoSerializer().get(); this.aggregatedCommitInfoSerializer = sink.getSink().getAggregatedCommitInfoSerializer().get(); log.debug( @@ -250,6 +255,7 @@ public void restoreState(List actionStateList) throws Except actionStateList.stream() .map(ActionSubtaskState::getState) .flatMap(Collection::stream) + .filter(Objects::nonNull) .map( bytes -> sneaky( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index 842cf8a6022..8650dc7f2a6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -18,10 +18,11 @@ package org.apache.seatunnel.engine.server.task; import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig; -import org.apache.seatunnel.engine.server.dag.physical.flow.Flow; +import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow; import org.apache.seatunnel.engine.server.execution.ProgressState; import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle; @@ -29,6 +30,7 @@ import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; +import lombok.Getter; import lombok.NonNull; import java.util.List; @@ -41,15 +43,24 @@ public class SourceSeaTunnelTask extends SeaTunne private transient SeaTunnelSourceCollector collector; private transient Object checkpointLock; + @Getter private transient Serializer splitSerializer; + private final PhysicalExecutionFlow sourceFlow; - public SourceSeaTunnelTask(long jobID, TaskLocation taskID, int indexID, Flow executionFlow) { + public SourceSeaTunnelTask( + long jobID, + TaskLocation taskID, + int indexID, + PhysicalExecutionFlow executionFlow) { super(jobID, taskID, indexID, executionFlow); + this.sourceFlow = executionFlow; } @Override public void init() throws Exception { super.init(); this.checkpointLock = new Object(); + this.splitSerializer = sourceFlow.getAction().getSource().getSplitSerializer(); + LOGGER.info("starting seatunnel source task, index " + indexID); if (!(startFlowLifeCycle instanceof SourceFlowLifeCycle)) { throw new TaskRuntimeException( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index da5fa8aeb3a..25fdbc9638c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -37,6 +37,7 @@ import com.hazelcast.cluster.Address; import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; +import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -77,6 +78,7 @@ public class SourceSplitEnumeratorTask extends Coord private SeaTunnelSplitEnumeratorContext enumeratorContext; private Serializer enumeratorStateSerializer; + @Getter private Serializer splitSerializer; private int maxReaderSize; private Set unfinishedReaders; @@ -102,6 +104,7 @@ public void init() throws Exception { new SeaTunnelSplitEnumeratorContext<>( this.source.getParallelism(), this, getMetricsContext()); enumeratorStateSerializer = this.source.getSource().getEnumeratorStateSerializer(); + splitSerializer = this.source.getSource().getSplitSerializer(); taskMemberMapping = new ConcurrentHashMap<>(); taskIDToTaskLocationMapping = new ConcurrentHashMap<>(); taskIndexToTaskLocationMapping = new ConcurrentHashMap<>(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java index c3cce03d3bd..110562e4944 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask; import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation; @@ -31,6 +30,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; @Slf4j public class SeaTunnelSplitEnumeratorContext @@ -67,22 +69,26 @@ public void assignSplit(int subtaskIndex, List splits) { log.warn("No reader is obtained, skip this assign!"); return; } + + List splitBytes = + splits.stream() + .map(split -> sneaky(() -> task.getSplitSerializer().serialize(split))) + .collect(Collectors.toList()); task.getExecutionContext() .sendToMember( new AssignSplitOperation<>( - task.getTaskMemberLocationByIndex(subtaskIndex), - SerializationUtils.serialize(splits.toArray())), + task.getTaskMemberLocationByIndex(subtaskIndex), splitBytes), task.getTaskMemberAddressByIndex(subtaskIndex)) .join(); } @Override public void signalNoMoreSplits(int subtaskIndex) { + List emptySplits = Collections.emptyList(); task.getExecutionContext() .sendToMember( new AssignSplitOperation<>( - task.getTaskMemberLocationByIndex(subtaskIndex), - SerializationUtils.serialize(Collections.emptyList().toArray())), + task.getTaskMemberLocationByIndex(subtaskIndex), emptySplits), task.getTaskMemberAddressByIndex(subtaskIndex)) .join(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 7e6d73c5498..5f0ce213f2e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; @@ -44,10 +43,10 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -66,6 +65,7 @@ public class SinkFlowLifeCycle sinkAction; private SinkWriter writer; + private transient Optional> commitInfoSerializer; private transient Optional> writerStateSerializer; private final int indexID; @@ -110,6 +110,7 @@ public SinkFlowLifeCycle( @Override public void init() throws Exception { + this.commitInfoSerializer = sinkAction.getSink().getCommitInfoSerializer(); this.writerStateSerializer = sinkAction.getSink().getWriterStateSerializer(); this.committer = sinkAction.getSink().createCommitter(); this.lastCommitInfo = Optional.empty(); @@ -167,7 +168,7 @@ public void received(Record record) { throw e; } List states = writer.snapshotState(barrier.getId()); - if (!writerStateSerializer.isPresent()) { + if (states == null || states.isEmpty()) { runningTask.addState( barrier, ActionStateKey.of(sinkAction), Collections.emptyList()); } else { @@ -184,10 +185,14 @@ public void received(Record record) { runningTask .getExecutionContext() .sendToMember( - new SinkPrepareCommitOperation( + new SinkPrepareCommitOperation( barrier, committerTaskLocation, - SerializationUtils.serialize(commitInfoT)), + commitInfoT == null + ? null + : commitInfoSerializer + .get() + .serialize(commitInfoT)), committerTaskAddress) .join(); } @@ -243,22 +248,13 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { @Override public void restoreState(List actionStateList) throws Exception { - List states = new ArrayList<>(); - if (writerStateSerializer.isPresent()) { - states = - actionStateList.stream() - .filter(state -> writerStateSerializer.isPresent()) - .map(ActionSubtaskState::getState) - .flatMap(Collection::stream) - .map( - bytes -> - sneaky( - () -> - writerStateSerializer - .get() - .deserialize(bytes))) - .collect(Collectors.toList()); - } + List states = + actionStateList.stream() + .map(ActionSubtaskState::getState) + .flatMap(Collection::stream) + .filter(Objects::nonNull) + .map(bytes -> sneaky(() -> writerStateSerializer.get().deserialize(bytes))) + .collect(Collectors.toList()); if (states.isEmpty()) { this.writer = sinkAction diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index b883bd8ffdd..572836fe517 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -23,7 +23,6 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.type.Record; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; @@ -59,7 +58,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; import static org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates; @Slf4j @@ -338,21 +336,17 @@ public void restoreState(List actionStateList) throws Except if (actionStateList.isEmpty()) { return; } - List splits = + List splits = actionStateList.stream() .map(ActionSubtaskState::getState) .flatMap(Collection::stream) .filter(Objects::nonNull) - .map(bytes -> sneaky(() -> splitSerializer.deserialize(bytes))) .collect(Collectors.toList()); try { runningTask .getExecutionContext() .sendToMember( - new RestoredSplitOperation( - enumeratorTaskLocation, - SerializationUtils.serialize(splits.toArray()), - indexID), + new RestoredSplitOperation(enumeratorTaskLocation, splits, indexID), enumeratorTaskAddress) .get(); } catch (InterruptedException | ExecutionException e) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java index 06945a61b25..ea6ee9681ed 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.engine.server.task.operation.sink; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.TaskExecutionService; import org.apache.seatunnel.engine.server.execution.TaskLocation; @@ -33,7 +32,7 @@ import java.io.IOException; @NoArgsConstructor -public class SinkPrepareCommitOperation extends BarrierFlowOperation { +public class SinkPrepareCommitOperation extends BarrierFlowOperation { private byte[] commitInfos; public SinkPrepareCommitOperation( @@ -73,16 +72,26 @@ public int getClassId() { public void run() throws Exception { TaskExecutionService taskExecutionService = ((SeaTunnelServer) getService()).getTaskExecutionService(); - SinkAggregatedCommitterTask committerTask = + SinkAggregatedCommitterTask committerTask = taskExecutionService.getTask(taskLocation); - ClassLoader classLoader = + ClassLoader taskClassLoader = taskExecutionService .getExecutionContext(taskLocation.getTaskGroupLocation()) .getClassLoader(); + ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader(); + if (commitInfos != null) { - committerTask.receivedWriterCommitInfo( - barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader)); + CommitInfoT deserializeCommitInfo = null; + try { + Thread.currentThread().setContextClassLoader(taskClassLoader); + deserializeCommitInfo = + committerTask.getCommitInfoSerializer().deserialize(commitInfos); + } finally { + Thread.currentThread().setContextClassLoader(mainClassLoader); + } + committerTask.receivedWriterCommitInfo(barrier.getId(), deserializeCommitInfo); } + committerTask.triggerBarrier(barrier); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java index 637a48e8ab4..52350d1483b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.common.utils.RetryUtils; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException; @@ -33,18 +32,18 @@ import com.hazelcast.spi.impl.operationservice.Operation; import java.io.IOException; -import java.util.Arrays; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.List; public class AssignSplitOperation extends Operation implements IdentifiedDataSerializable { - private byte[] splits; + private List splits; private TaskLocation taskID; public AssignSplitOperation() {} - public AssignSplitOperation(TaskLocation taskID, byte[] splits) { + public AssignSplitOperation(TaskLocation taskID, List splits) { this.taskID = taskID; this.splits = splits; } @@ -56,13 +55,22 @@ public void run() throws Exception { () -> { SourceSeaTunnelTask task = server.getTaskExecutionService().getTask(taskID); - ClassLoader classLoader = + ClassLoader taskClassLoader = server.getTaskExecutionService() .getExecutionContext(taskID.getTaskGroupLocation()) .getClassLoader(); - Object[] o = SerializationUtils.deserialize(splits, classLoader); - task.receivedSourceSplit( - Arrays.stream(o).map(i -> (SplitT) i).collect(Collectors.toList())); + ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader(); + List deserializeSplits = new ArrayList<>(); + try { + Thread.currentThread().setContextClassLoader(taskClassLoader); + for (byte[] split : this.splits) { + deserializeSplits.add(task.getSplitSerializer().deserialize(split)); + } + } finally { + Thread.currentThread().setContextClassLoader(mainClassLoader); + } + + task.receivedSourceSplit(deserializeSplits); return null; }, new RetryUtils.RetryMaterial( @@ -76,13 +84,19 @@ public void run() throws Exception { @Override protected void writeInternal(ObjectDataOutput out) throws IOException { - out.writeByteArray(splits); + out.writeInt(splits.size()); + for (byte[] split : splits) { + out.writeByteArray(split); + } out.writeObject(taskID); } @Override protected void readInternal(ObjectDataInput in) throws IOException { - splits = in.readByteArray(); + splits = new ArrayList<>(in.readInt()); + for (int i = 0; i < splits.size(); i++) { + splits.add(in.readByteArray()); + } taskID = in.readObject(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java index 0c9c3d95c90..c34a3353c93 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.common.utils.RetryUtils; -import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.TaskExecutionService; @@ -34,19 +33,18 @@ import com.hazelcast.nio.ObjectDataOutput; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; public class RestoredSplitOperation extends TaskOperation { - private byte[] splits; + private List splits; private Integer subtaskIndex; public RestoredSplitOperation() {} public RestoredSplitOperation( - TaskLocation enumeratorLocation, byte[] splits, int subtaskIndex) { + TaskLocation enumeratorLocation, List splits, int subtaskIndex) { super(enumeratorLocation); this.splits = splits; this.subtaskIndex = subtaskIndex; @@ -55,14 +53,20 @@ public RestoredSplitOperation( @Override protected void writeInternal(ObjectDataOutput out) throws IOException { super.writeInternal(out); - out.writeByteArray(splits); + out.writeInt(splits.size()); + for (byte[] split : splits) { + out.writeByteArray(split); + } out.writeInt(subtaskIndex); } @Override protected void readInternal(ObjectDataInput in) throws IOException { super.readInternal(in); - splits = in.readByteArray(); + splits = new ArrayList<>(in.readInt()); + for (int i = 0; i < splits.size(); i++) { + splits.add(in.readByteArray()); + } subtaskIndex = in.readInt(); } @@ -82,27 +86,31 @@ public void run() throws Exception { TaskExecutionService taskExecutionService = server.getTaskExecutionService(); RetryUtils.retryWithException( () -> { - ClassLoader classLoader = + SourceSplitEnumeratorTask task = + taskExecutionService.getTask(taskLocation); + ClassLoader taskClassLoader = taskExecutionService .getExecutionContext(taskLocation.getTaskGroupLocation()) .getClassLoader(); + ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader(); + + List deserializeSplits = new ArrayList<>(); + try { + Thread.currentThread().setContextClassLoader(taskClassLoader); + for (byte[] split : splits) { + deserializeSplits.add(task.getSplitSerializer().deserialize(split)); + } + } finally { + Thread.currentThread().setContextClassLoader(mainClassLoader); + } - List deserialize = - Arrays.stream( - (Object[]) - SerializationUtils.deserialize( - splits, classLoader)) - .map(o -> (SourceSplit) o) - .collect(Collectors.toList()); - SourceSplitEnumeratorTask task = - taskExecutionService.getTask(taskLocation); task.getExecutionContext() .getTaskExecutionService() .asyncExecuteFunction( taskLocation.getTaskGroupLocation(), () -> { try { - task.addSplitsBack(deserialize, subtaskIndex); + task.addSplitsBack(deserializeSplits, subtaskIndex); } catch (Exception e) { task.getExecutionContext() .sendToMaster(