From 0f9c48560bd452ed7f664b0d72f01b289e7a87d2 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Fri, 7 Jul 2023 21:21:01 +0800 Subject: [PATCH] [Improve][Connector][File] Optimize files commit order Previously, file paths were stored in a `HashMap`, so the files of each checkpoint were committed in no particular order. Now switch to `LinkedHashMap` to ensure that files are committed in the order in which they were generated. --- .../file/sink/BaseFileSinkWriter.java | 6 +- .../sink/commit/FileAggregatedCommitInfo.java | 6 +- .../file/sink/commit/FileCommitInfo.java | 6 +- .../commit/FileSinkAggregatedCommitter.java | 15 ++-- .../file/sink/commit/FileSinkCommitter.java | 75 ------------------- .../file/sink/state/FileSinkState.java | 6 +- .../sink/writer/AbstractWriteStrategy.java | 44 +++++++---- .../file/sink/writer/ExcelWriteStrategy.java | 7 +- .../file/sink/writer/JsonWriteStrategy.java | 5 +- .../file/sink/writer/OrcWriteStrategy.java | 6 +- .../sink/writer/ParquetWriteStrategy.java | 7 +- .../file/sink/writer/TextWriteStrategy.java | 5 +- .../file/sink/writer/WriteStrategy.java | 4 +- .../S3RedshiftSinkAggregatedCommitter.java | 5 +- 14 files changed, 68 insertions(+), 129 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java index 7102e954a46..22200249f63 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java @@ 
-34,14 +34,14 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; public class BaseFileSinkWriter implements SinkWriter { - private final WriteStrategy writeStrategy; + protected final WriteStrategy writeStrategy; private final FileSystemUtils fileSystemUtils; @SuppressWarnings("checkstyle:MagicNumber") @@ -67,7 +67,7 @@ public BaseFileSinkWriter( List transactions = findTransactionList(jobId, uuidPrefix); FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(fileSystemUtils); - HashMap fileStatesMap = new HashMap<>(); + LinkedHashMap fileStatesMap = new LinkedHashMap<>(); fileSinkStates.forEach( fileSinkState -> fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState)); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java index 16d94a1f63a..5ca3b30fade 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java @@ -21,8 +21,8 @@ import lombok.Data; import java.io.Serializable; +import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; @Data @AllArgsConstructor @@ -34,7 +34,7 @@ public class FileAggregatedCommitInfo implements Serializable { * *

V is the target file path of the data file. */ - private final Map> transactionMap; + private final LinkedHashMap> transactionMap; /** * Storage the partition information in map. @@ -43,5 +43,5 @@ public class FileAggregatedCommitInfo implements Serializable { * *

V is the list of partition column's values. */ - private final Map> partitionDirAndValuesMap; + private final LinkedHashMap> partitionDirAndValuesMap; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java index 86c433b8f55..27e74ff0a87 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java @@ -21,8 +21,8 @@ import lombok.Data; import java.io.Serializable; +import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; @Data @AllArgsConstructor @@ -34,7 +34,7 @@ public class FileCommitInfo implements Serializable { * *

V is the target file path of the data file. */ - private final Map needMoveFiles; + private final LinkedHashMap needMoveFiles; /** * Storage the partition information in map. @@ -43,7 +43,7 @@ public class FileCommitInfo implements Serializable { * *

V is the list of partition column's values. */ - private final Map> partitionDirAndValuesMap; + private final LinkedHashMap> partitionDirAndValuesMap; /** Storage the transaction directory */ private final String transactionDir; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java index b12ef1165a2..a076188e2a2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -44,7 +44,7 @@ public List commit( aggregatedCommitInfos.forEach( aggregatedCommitInfo -> { try { - for (Map.Entry> entry : + for (Map.Entry> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) { for (Map.Entry mvFileEntry : entry.getValue().entrySet()) { @@ -77,13 +77,14 @@ public FileAggregatedCommitInfo combine(List commitInfos) { if (commitInfos == null || commitInfos.size() == 0) { return null; } - Map> aggregateCommitInfo = new HashMap<>(); - Map> partitionDirAndValuesMap = new HashMap<>(); + LinkedHashMap> aggregateCommitInfo = + new LinkedHashMap<>(); + LinkedHashMap> partitionDirAndValuesMap = new LinkedHashMap<>(); commitInfos.forEach( commitInfo -> { - Map needMoveFileMap = + LinkedHashMap needMoveFileMap = aggregateCommitInfo.computeIfAbsent( - commitInfo.getTransactionDir(), k -> new HashMap<>()); + 
commitInfo.getTransactionDir(), k -> new LinkedHashMap<>()); needMoveFileMap.putAll(commitInfo.getNeedMoveFiles()); if (commitInfo.getPartitionDirAndValuesMap() != null && !commitInfo.getPartitionDirAndValuesMap().isEmpty()) { @@ -109,7 +110,7 @@ public void abort(List aggregatedCommitInfos) throws E aggregatedCommitInfos.forEach( aggregatedCommitInfo -> { try { - for (Map.Entry> entry : + for (Map.Entry> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) { // rollback the file for (Map.Entry mvFileEntry : diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java deleted file mode 100644 index 6525b5e7d4b..00000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.file.sink.commit; - -import org.apache.seatunnel.api.sink.SinkCommitter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** Deprecated interface since 2.3.0-beta, now used {@link FileSinkAggregatedCommitter} */ -@Deprecated -public class FileSinkCommitter implements SinkCommitter { - private final FileSystemUtils fileSystemUtils; - - public FileSinkCommitter(FileSystemUtils fileSystemUtils) { - this.fileSystemUtils = fileSystemUtils; - } - - @Override - public List commit(List commitInfos) throws IOException { - ArrayList failedCommitInfos = new ArrayList<>(); - for (FileCommitInfo commitInfo : commitInfos) { - Map needMoveFiles = commitInfo.getNeedMoveFiles(); - needMoveFiles.forEach( - (k, v) -> { - try { - fileSystemUtils.renameFile(k, v, true); - } catch (IOException e) { - failedCommitInfos.add(commitInfo); - } - }); - fileSystemUtils.deleteFile(commitInfo.getTransactionDir()); - } - return failedCommitInfos; - } - - /** - * Abort the transaction, this method will be called (**Only** on Spark engine) when the commit - * is failed. - * - * @param commitInfos The list of commit message, used to abort the commit. - * @throws IOException throw IOException when close failed. 
- */ - @Override - public void abort(List commitInfos) throws IOException { - for (FileCommitInfo commitInfo : commitInfos) { - Map needMoveFiles = commitInfo.getNeedMoveFiles(); - for (Map.Entry entry : needMoveFiles.entrySet()) { - if (fileSystemUtils.fileExist(entry.getValue()) - && !fileSystemUtils.fileExist(entry.getKey())) { - fileSystemUtils.renameFile(entry.getValue(), entry.getKey(), true); - } - } - fileSystemUtils.deleteFile(commitInfo.getTransactionDir()); - } - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java index 7d28df23051..34ca13625f6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java @@ -21,8 +21,8 @@ import lombok.Data; import java.io.Serializable; +import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; @Data @AllArgsConstructor @@ -30,7 +30,7 @@ public class FileSinkState implements Serializable { private final String transactionId; private final String uuidPrefix; private final Long checkpointId; - private final Map needMoveFiles; - private final Map> partitionDirAndValuesMap; + private final LinkedHashMap needMoveFiles; + private final LinkedHashMap> partitionDirAndValuesMap; private final String transactionDir; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index 6820d28d855..f3160eec7e6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -76,9 +77,9 @@ public abstract class AbstractWriteStrategy implements WriteStrategy { protected String uuidPrefix; protected String transactionDirectory; - protected Map needMoveFiles; - protected Map beingWrittenFile = new HashMap<>(); - private Map> partitionDirAndValuesMap; + protected LinkedHashMap needMoveFiles; + protected LinkedHashMap beingWrittenFile = new LinkedHashMap<>(); + private LinkedHashMap> partitionDirAndValuesMap; protected SeaTunnelRowType seaTunnelRowType; // Checkpoint id from engine is start with 1 @@ -111,13 +112,18 @@ public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIn @Override public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException { if (currentBatchSize >= batchSize) { - this.partId++; + newFilePart(); currentBatchSize = 0; - beingWrittenFile.clear(); } currentBatchSize++; } + public synchronized void newFilePart() { + this.partId++; + beingWrittenFile.clear(); + log.debug("new file part: {}", partId); + } + protected SeaTunnelRowType buildSchemaWithRowType( SeaTunnelRowType seaTunnelRowType, List sinkColumnsIndex) { SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); @@ -177,9 +183,9 @@ public void 
setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { * @return the map of partition directory */ @Override - public Map> generatorPartitionDir(SeaTunnelRow seaTunnelRow) { + public LinkedHashMap> generatorPartitionDir(SeaTunnelRow seaTunnelRow) { List partitionFieldsIndexInRow = fileSinkConfig.getPartitionFieldsIndexInRow(); - Map> partitionDirAndValuesMap = new HashMap<>(1); + LinkedHashMap> partitionDirAndValuesMap = new LinkedHashMap<>(1); if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) { partitionDirAndValuesMap.put(BaseSinkConfig.NON_PARTITION, null); return partitionDirAndValuesMap; @@ -258,12 +264,15 @@ public String generateFileName(String transactionId) { @Override public Optional prepareCommit() { this.finishAndCloseFile(); - Map commitMap = new HashMap<>(this.needMoveFiles); - Map> copyMap = + LinkedHashMap commitMap = new LinkedHashMap<>(this.needMoveFiles); + LinkedHashMap> copyMap = this.partitionDirAndValuesMap.entrySet().stream() .collect( Collectors.toMap( - Map.Entry::getKey, e -> new ArrayList<>(e.getValue()))); + Map.Entry::getKey, + e -> new ArrayList<>(e.getValue()), + (e1, e2) -> e1, + LinkedHashMap::new)); return Optional.of(new FileCommitInfo(commitMap, copyMap, transactionDirectory)); } @@ -301,8 +310,8 @@ public void beginTransaction(Long checkpointId) { this.checkpointId = checkpointId; this.transactionId = getTransactionId(checkpointId); this.transactionDirectory = getTransactionDir(this.transactionId); - this.needMoveFiles = new HashMap<>(); - this.partitionDirAndValuesMap = new HashMap<>(); + this.needMoveFiles = new LinkedHashMap<>(); + this.partitionDirAndValuesMap = new LinkedHashMap<>(); } private String getTransactionId(Long checkpointId) { @@ -325,18 +334,21 @@ private String getTransactionId(Long checkpointId) { */ @Override public List snapshotState(long checkpointId) { - Map> commitMap = + LinkedHashMap> commitMap = this.partitionDirAndValuesMap.entrySet().stream() .collect( Collectors.toMap( - 
Map.Entry::getKey, e -> new ArrayList<>(e.getValue()))); + Map.Entry::getKey, + e -> new ArrayList<>(e.getValue()), + (e1, e2) -> e1, + LinkedHashMap::new)); ArrayList fileState = Lists.newArrayList( new FileSinkState( this.transactionId, this.uuidPrefix, this.checkpointId, - new HashMap<>(this.needMoveFiles), + new LinkedHashMap<>(this.needMoveFiles), commitMap, this.getTransactionDir(transactionId))); this.beingWrittenFile.clear(); @@ -363,7 +375,7 @@ public static String getTransactionDirPrefix(String tmpPath, String jobId, Strin } public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow) { - Map> dataPartitionDirAndValuesMap = + LinkedHashMap> dataPartitionDirAndValuesMap = generatorPartitionDir(seaTunnelRow); String beingWrittenFileKey = dataPartitionDirAndValuesMap.keySet().toArray()[0].toString(); // get filePath from beingWrittenFile diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java index bb8d09d30fd..d5786ea2f8c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java @@ -28,15 +28,14 @@ import lombok.NonNull; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.util.LinkedHashMap; public class ExcelWriteStrategy extends AbstractWriteStrategy { - private final Map beingWrittenWriter; + private final LinkedHashMap beingWrittenWriter; public ExcelWriteStrategy(FileSinkConfig fileSinkConfig) { super(fileSinkConfig); - this.beingWrittenWriter = new 
HashMap<>(); + this.beingWrittenWriter = new LinkedHashMap<>(); } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java index c16f6135770..c72a38068df 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java @@ -33,17 +33,18 @@ import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; public class JsonWriteStrategy extends AbstractWriteStrategy { private final byte[] rowDelimiter; private SerializationSchema serializationSchema; - private final Map beingWrittenOutputStream; + private final LinkedHashMap beingWrittenOutputStream; private final Map isFirstWrite; public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) { super(textFileSinkConfig); - this.beingWrittenOutputStream = new HashMap<>(); + this.beingWrittenOutputStream = new LinkedHashMap<>(); this.isFirstWrite = new HashMap<>(); this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java index 551d02f5b97..0e55b46e26f 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java @@ -55,16 +55,16 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.temporal.ChronoField; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; public class OrcWriteStrategy extends AbstractWriteStrategy { - private final Map beingWrittenWriter; + private final LinkedHashMap beingWrittenWriter; public OrcWriteStrategy(FileSinkConfig fileSinkConfig) { super(fileSinkConfig); - this.beingWrittenWriter = new HashMap<>(); + this.beingWrittenWriter = new LinkedHashMap<>(); } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java index ce104da8008..8c2c9382008 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java @@ -57,15 +57,14 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @SuppressWarnings("checkstyle:MagicNumber") public class ParquetWriteStrategy extends AbstractWriteStrategy { - private final Map> 
beingWrittenWriter; + private final LinkedHashMap> beingWrittenWriter; private AvroSchemaConverter schemaConverter; private Schema schema; public static final int[] PRECISION_TO_BYTE_COUNT = new int[38]; @@ -80,7 +79,7 @@ public class ParquetWriteStrategy extends AbstractWriteStrategy { public ParquetWriteStrategy(FileSinkConfig fileSinkConfig) { super(fileSinkConfig); - this.beingWrittenWriter = new HashMap<>(); + this.beingWrittenWriter = new LinkedHashMap<>(); } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 7e94e13c961..f309edb70f2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -36,10 +36,11 @@ import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; public class TextWriteStrategy extends AbstractWriteStrategy { - private final Map beingWrittenOutputStream; + private final LinkedHashMap beingWrittenOutputStream; private final Map isFirstWrite; private final String fieldDelimiter; private final String rowDelimiter; @@ -50,7 +51,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy { public TextWriteStrategy(FileSinkConfig fileSinkConfig) { super(fileSinkConfig); - this.beingWrittenOutputStream = new HashMap<>(); + this.beingWrittenOutputStream = new LinkedHashMap<>(); this.isFirstWrite = new HashMap<>(); this.fieldDelimiter = fileSinkConfig.getFieldDelimiter(); this.rowDelimiter = 
fileSinkConfig.getRowDelimiter(); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java index 6d75de29c6c..a64af87d061 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java @@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration; import java.io.Serializable; +import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; public interface WriteStrategy extends Transaction, Serializable { /** @@ -67,7 +67,7 @@ public interface WriteStrategy extends Transaction, Serializable { * @param seaTunnelRow seaTunnelRow * @return the map of partition directory */ - Map> generatorPartitionDir(SeaTunnelRow seaTunnelRow); + LinkedHashMap> generatorPartitionDir(SeaTunnelRow seaTunnelRow); /** * use transaction id generate file name diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java index 97476fafc56..620bea134b7 100644 --- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java @@ -35,6 +35,7 @@ import 
java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -58,7 +59,7 @@ public List commit( aggregatedCommitInfos.forEach( aggregatedCommitInfo -> { try { - for (Map.Entry> entry : + for (Map.Entry> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) { for (Map.Entry mvFileEntry : entry.getValue().entrySet()) { @@ -92,7 +93,7 @@ public void abort(List aggregatedCommitInfos) { aggregatedCommitInfos.forEach( aggregatedCommitInfo -> { try { - for (Map.Entry> entry : + for (Map.Entry> entry : aggregatedCommitInfo.getTransactionMap().entrySet()) { // delete the transaction dir fileSystemUtils.deleteFile(entry.getKey());