From 8bf7e5259d471b426da6ab3483631626cb725faa Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Wed, 18 Oct 2023 16:08:42 +0800 Subject: [PATCH] [CELEBORN-1047] Remove conf `celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled` ### What changes were proposed in this pull request? As title ### Why are the changes needed? The config key `celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled` has become unnecessary as a result of #1932 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GA Closes #1999 from cfmcgrady/celeborn-1047. Authored-by: Fu Chen Signed-off-by: zky.zhoukeyong --- .../apache/celeborn/common/CelebornConf.scala | 19 ------------------- .../worker/storage/PartitionFilesSorter.java | 3 --- 2 files changed, 22 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 877a2559e16..c46579875c8 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -660,8 +660,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS) def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT) def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE) - def partitionSorterEagerlyRemoveOriginalFilesEnabled: Boolean = - get(PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED) def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT) def partitionSorterReservedMemoryPerPartition: Long = get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY) @@ -2292,23 +2290,6 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("120s") - val PARTITION_SORTER_EAGERLY_REMOVE_ORIGINAL_FILES_ENABLED: ConfigEntry[Boolean] = - buildConf("celeborn.worker.sortPartition.eagerlyRemoveOriginalFiles.enabled") - .categories("worker") - .doc("When set to true, the PartitionSorter immediately removes the original file once " + - "its partition has been successfully sorted. It is important to note that this behavior " + - "may result in a potential issue with the ReusedExchange operation when it triggers both " + - "non-range and range fetch requests simultaneously. When set to false, the " + - "PartitionSorter will retain the original unsorted file. However, it's essential to be " + - "aware that enabling this option may lead to an increase in storage space usage during " + - "the range fetch phase, as both the original and sorted files will be retained until the " + - "shuffle is finished. Note that the default value is configured as 'false' as a " + - "temporary workaround for CELEBORN-980. see CELEBORN-980 for more details.") - .version("0.3.1") - .internal - .booleanConf - .createWithDefault(false) - val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] = buildConf("celeborn.worker.sortPartition.timeout") .withAlternative("celeborn.worker.partitionSorter.sort.timeout") diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 4de72bdaeda..1741f4bcc23 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -82,7 +82,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { private final AtomicInteger sortedFileCount = new AtomicInteger(); private final AtomicLong sortedFilesSize = new AtomicLong(); - protected final boolean eagerlyRemoveOriginalFilesEnabled; protected final long sortTimeout; protected final long shuffleChunkSize; protected final long reservedMemoryPerPartition; @@ -97,8 +96,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { public PartitionFilesSorter( MemoryManager memoryManager, CelebornConf conf, AbstractSource source) { - this.eagerlyRemoveOriginalFilesEnabled = - conf.partitionSorterEagerlyRemoveOriginalFilesEnabled(); this.sortTimeout = conf.partitionSorterSortPartitionTimeout(); this.shuffleChunkSize = conf.shuffleChunkSize(); this.reservedMemoryPerPartition = conf.partitionSorterReservedMemoryPerPartition();