From e22ef17b2ac4564e54375f1a37b0c342c49b2e18 Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Wed, 1 Nov 2023 10:58:25 +0800
Subject: [PATCH 1/2] [CELEBORN-1094] Optimize mechanism of ChunkManager
 expired shuffle key cleanup to avoid memory leak

---
 .../apache/celeborn/common/CelebornConf.scala | 18 +++++
 docs/configuration/worker.md                  |  2 +
 .../worker/storage/ChunkStreamManager.java    | 29 +++----
 .../service/deploy/worker/Worker.scala        | 37 ++++++----
 .../worker/storage/StorageManager.scala       | 69 +++++++++----------
 .../deploy/worker/storage/WorkerSuite.scala   |  9 ++-
 6 files changed, 90 insertions(+), 74 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3c717fa5a9b..0c576319b63 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -658,6 +658,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
   def workerCommitThreads: Int =
     if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS)
+  def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
   def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
   def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
   def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT)
@@ -972,6 +973,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def workerDiskTimeSlidingWindowMinFetchCount: Int =
     get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)
   def workerDiskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
+  def workerDiskCleanThreads: Int = get(WORKER_DISK_CLEAN_THREADS)
   def workerDiskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
   def workerDiskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
   def workerDiskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
@@ -2131,6 +2133,14 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("5G")
 
+  val WORKER_DISK_CLEAN_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.worker.disk.clean.threads")
+      .categories("worker")
+      .version("0.3.2")
+      .doc("Thread number of worker to clean up directories of expired shuffle keys on disk.")
+      .intConf
+      .createWithDefault(4)
+
   val WORKER_CHECK_FILE_CLEAN_MAX_RETRIES: ConfigEntry[Int] =
     buildConf("celeborn.worker.storage.checkDirsEmpty.maxRetries")
       .withAlternative("celeborn.worker.disk.checkFileClean.maxRetries")
@@ -2283,6 +2293,14 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(32)
 
+  val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.worker.clean.threads")
+      .categories("worker")
+      .version("0.3.2")
+      .doc("Thread number of worker to clean up expired shuffle keys.")
+      .intConf
+      .createWithDefault(64)
+
   val WORKER_SHUFFLE_COMMIT_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.commitFiles.timeout")
       .withAlternative("celeborn.worker.shuffle.commit.timeout")
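
The two config entries above are the sizing knobs for the new cleanup pools. A minimal sketch of overriding them programmatically; the values are illustrative, not recommendations:

```scala
import org.apache.celeborn.common.CelebornConf

// Override the two new knobs added by this patch (values are made up).
val conf = new CelebornConf()
  .set("celeborn.worker.clean.threads", "128") // pool that drains expired shuffle keys
  .set("celeborn.worker.disk.clean.threads", "8") // per-disk directory deletion pool

// Accessors introduced in the hunks above; they fall back to the
// declared defaults (64 and 4) when the keys are unset.
assert(conf.workerCleanThreads == 128)
assert(conf.workerDiskCleanThreads == 8)
```

The same keys can also be set in `celeborn-defaults.conf` on each worker.
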
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8154101b0da..ed58e2125b1 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -26,6 +26,7 @@ license: |
 | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
 | celeborn.worker.activeConnection.max | <undefined> | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 |
 | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 |
+| celeborn.worker.clean.threads | 64 | Thread number of worker to clean up expired shuffle keys. | 0.3.2 |
 | celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
 | celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
 | celeborn.worker.commitFiles.timeout | 120s | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
@@ -42,6 +43,7 @@ license: |
 | celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients. | 0.2.0 |
 | celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 |
 | celeborn.worker.directMemoryRatioToResume | 0.7 | If direct memory usage is less than this limit, worker will resume. | 0.2.0 |
+| celeborn.worker.disk.clean.threads | 4 | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 |
 | celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from worker to client when fetching data | 0.3.0 |
 | celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 |
 | celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 |
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
index 04081c29e62..bd1e7e47685 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
@@ -26,8 +26,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,9 +89,7 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex, int offset, int len
     }
 
     FileManagedBuffers buffers = state.buffers;
-    ManagedBuffer nextChunk = buffers.chunk(chunkIndex, offset, len);
-
-    return nextChunk;
+    return buffers.chunk(chunkIndex, offset, len);
   }
 
   public TimeWindow getFetchTimeMetric(long streamId) {
@@ -105,20 +101,6 @@ public TimeWindow getFetchTimeMetric(long streamId) {
     }
   }
 
-  public static String genStreamChunkId(long streamId, int chunkId) {
-    return String.format("%d_%d", streamId, chunkId);
-  }
-
-  // Parse streamChunkId to be stream id and chunk id. This is used when fetch remote chunk as a
-  // stream.
-  public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
-    String[] array = streamChunkId.split("_");
-    assert array.length == 2 : "Stream id and chunk index should be specified.";
-    long streamId = Long.parseLong(array[0]);
-    int chunkIndex = Integer.parseInt(array[1]);
-    return ImmutablePair.of(streamId, chunkIndex);
-  }
-
   public void chunkBeingSent(long streamId) {
     StreamState streamState = streams.get(streamId);
     if (streamState != null) {
@@ -184,14 +166,21 @@ public long nextStreamId() {
   }
 
   public void cleanupExpiredShuffleKey(Set<String> expiredShuffleKeys) {
+    logger.info(
+        "Clean up expired shuffle keys {}",
+        String.join(",", expiredShuffleKeys.toArray(new String[0])));
     for (String expiredShuffleKey : expiredShuffleKeys) {
       Set<Long> expiredStreamIds = shuffleStreamIds.remove(expiredShuffleKey);
 
       // normally expiredStreamIds set will be empty as streamId will be removed when be fully read
       if (expiredStreamIds != null && !expiredStreamIds.isEmpty()) {
-        streams.keySet().removeAll(expiredStreamIds);
+        expiredStreamIds.parallelStream().forEach(streams::remove);
       }
     }
+    logger.info(
+        "Cleaned up expired shuffle keys. The count of shuffle keys and streams: {}, {}",
+        shuffleStreamIds.size(),
+        streams.size());
   }
 
   public Tuple2<String, String> getShuffleKeyAndFileName(long streamId) {
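
Context for the `cleanupExpiredShuffleKey` rewrite above: `ConcurrentHashMap`'s key-set view implements `removeAll` by iterating every entry of the map and probing the argument set, so the old bulk call paid a full scan of `streams` on every cleanup round. Removing the expired ids individually touches only the expired entries. A simplified Scala model of the bookkeeping, with the stream state reduced to a `String` placeholder:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// streams: streamId -> stream state (a String stands in for StreamState).
val streams = new ConcurrentHashMap[java.lang.Long, String]()
(1L to 5L).foreach(id => streams.put(id, s"state-$id"))

// Stream ids that were registered under an expired shuffle key.
val expiredStreamIds = new java.util.HashSet[java.lang.Long]()
expiredStreamIds.add(2L)
expiredStreamIds.add(4L)

// Old: streams.keySet().removeAll(expiredStreamIds) walks all of streams.
// New: the work is proportional to the number of expired ids only.
expiredStreamIds.asScala.foreach(id => streams.remove(id))
assert(streams.size() == 3)
```
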
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 6c0f981ce1e..df6301bbff9 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -251,7 +251,11 @@ private[celeborn] class Worker(
   val replicateThreadPool: ThreadPoolExecutor =
     ThreadUtils.newDaemonCachedThreadPool("worker-replicate-data", conf.workerReplicateThreads)
   val commitThreadPool: ThreadPoolExecutor =
-    ThreadUtils.newDaemonCachedThreadPool("Worker-CommitFiles", conf.workerCommitThreads)
+    ThreadUtils.newDaemonCachedThreadPool("worker-commit-files", conf.workerCommitThreads)
+  val cleanThreadPool: ThreadPoolExecutor =
+    ThreadUtils.newDaemonCachedThreadPool(
+      "worker-clean-expired-shuffle-keys",
+      conf.workerCleanThreads)
   val asyncReplyPool: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply")
   val timer = new HashedWheelTimer()
@@ -400,7 +404,7 @@ private[celeborn] class Worker(
       while (true) {
         val expiredShuffleKeys = cleanTaskQueue.take()
         try {
-          cleanup(expiredShuffleKeys)
+          cleanup(expiredShuffleKeys, cleanThreadPool)
         } catch {
           case e: Throwable =>
             logError("Cleanup failed", e)
@@ -512,20 +516,23 @@ private[celeborn] class Worker(
     throw new CelebornException("Register worker failed.", exception)
   }
   @VisibleForTesting
-  def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = synchronized {
-    expiredShuffleKeys.asScala.foreach { shuffleKey =>
-      partitionLocationInfo.removeShuffle(shuffleKey)
-      shufflePartitionType.remove(shuffleKey)
-      shufflePushDataTimeout.remove(shuffleKey)
-      shuffleMapperAttempts.remove(shuffleKey)
-      shuffleCommitInfos.remove(shuffleKey)
-      workerInfo.releaseSlots(shuffleKey)
-      logInfo(s"Cleaned up expired shuffle $shuffleKey")
+  def cleanup(expiredShuffleKeys: JHashSet[String], threadPool: ThreadPoolExecutor): Unit =
+    synchronized {
+      expiredShuffleKeys.asScala.foreach { shuffleKey =>
+        partitionLocationInfo.removeShuffle(shuffleKey)
+        shufflePartitionType.remove(shuffleKey)
+        shufflePushDataTimeout.remove(shuffleKey)
+        shuffleMapperAttempts.remove(shuffleKey)
+        shuffleCommitInfos.remove(shuffleKey)
+        workerInfo.releaseSlots(shuffleKey)
+        logInfo(s"Cleaned up expired shuffle $shuffleKey")
+      }
+      partitionsSorter.cleanup(expiredShuffleKeys)
+      fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
+      threadPool.execute(new Runnable {
+        override def run(): Unit = storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
+      })
     }
-    partitionsSorter.cleanup(expiredShuffleKeys)
-    storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
-    fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
-  }
 
   override def getWorkerInfo: String = {
     val sb = new StringBuilder
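
The reshaped `Worker#cleanup` above keeps cheap, in-memory bookkeeping on the calling thread and hands the disk-backed `StorageManager` cleanup to the new pool, so a slow disk can no longer stall the clean loop. The pattern in isolation, with hypothetical `releaseInMemoryState` and `deleteOnDisk` stand-ins for the worker's internals:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object AsyncCleanupSketch {
  private val cleanPool = Executors.newFixedThreadPool(4)

  def cleanup(expiredShuffleKeys: Set[String]): Unit = synchronized {
    // Fast, in-memory bookkeeping stays on the calling thread.
    expiredShuffleKeys.foreach(releaseInMemoryState)
    // Slow, I/O-bound deletion is queued on the dedicated pool.
    cleanPool.execute(new Runnable {
      override def run(): Unit = expiredShuffleKeys.foreach(deleteOnDisk)
    })
  }

  // Hypothetical stand-ins for partitionLocationInfo, shuffleMapperAttempts, etc.
  private def releaseInMemoryState(shuffleKey: String): Unit =
    println(s"released in-memory state for $shuffleKey")
  private def deleteOnDisk(shuffleKey: String): Unit =
    println(s"deleted files for $shuffleKey")

  def shutdown(): Unit = {
    cleanPool.shutdown()
    cleanPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

One trade-off of the handoff: `cleanup` can return before the files are actually gone, which is why the test change further down adds a short sleep before asserting.
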
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 28ee0407aee..e059ae4aff4 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import io.netty.buffer.PooledByteBufAllocator
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 
@@ -88,7 +89,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       diskInfo =>
         cleaners.put(
           diskInfo.mountPoint,
-          ThreadUtils.newDaemonCachedThreadPool(s"Disk-cleaner-${diskInfo.mountPoint}", 1))
+          ThreadUtils.newDaemonCachedThreadPool(
+            s"disk-cleaner-${diskInfo.mountPoint}",
+            conf.workerDiskCleanThreads))
     }
     cleaners
   }
@@ -156,7 +159,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit =
     this.synchronized {
       if (diskStatus == DiskStatus.CRITICAL_ERROR) {
-        logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk operator.")
+        logInfo(s"Disk $mountPoint faces critical error, will remove its disk operator.")
         val operator = diskOperators.remove(mountPoint)
         if (operator != null) {
           operator.shutdown()
@@ -168,7 +171,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       if (!diskOperators.containsKey(mountPoint)) {
         diskOperators.put(
           mountPoint,
-          ThreadUtils.newDaemonCachedThreadPool(s"Disk-cleaner-${mountPoint}", 1))
+          ThreadUtils.newDaemonCachedThreadPool(
+            s"disk-cleaner-$mountPoint",
+            conf.workerDiskCleanThreads))
       }
     }
 
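
The two hunks above resize the per-mount-point cleaners from a single thread to `celeborn.worker.disk.clean.threads`. Keeping one executor per disk means a slow or failing disk only backs up its own deletion queue. A self-contained sketch of that layout, using plain `Executors` in place of Celeborn's `ThreadUtils`:

```scala
import java.io.File
import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService}

object DiskCleanerSketch {
  // One pool per mount point, created lazily, all sized by the same config value.
  private val cleaners = new ConcurrentHashMap[String, ExecutorService]()

  def cleanerFor(mountPoint: String, diskCleanThreads: Int): ExecutorService =
    cleaners.computeIfAbsent(
      mountPoint,
      new java.util.function.Function[String, ExecutorService] {
        override def apply(mp: String): ExecutorService =
          Executors.newFixedThreadPool(diskCleanThreads)
      })

  def submitDeletion(mountPoint: String, dir: File, diskCleanThreads: Int): Unit =
    cleanerFor(mountPoint, diskCleanThreads).execute(new Runnable {
      // Placeholder body; the real worker calls deleteDirectoryWithRetry here.
      override def run(): Unit = println(s"would delete $dir on $mountPoint")
    })
}
```
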
@@ -176,7 +181,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   private val counterOperator = new IntUnaryOperator() {
     override def applyAsInt(operand: Int): Int = {
       val dirs = healthyWorkingDirs()
-      if (dirs.length > 0) {
+      if (dirs.nonEmpty) {
         (operand + 1) % dirs.length
       } else 0
     }
@@ -254,12 +259,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
         val shuffleKey = parseDbShuffleKey(key)
         try {
           val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache)
-          logDebug(s"Reload DB: ${shuffleKey} -> ${files}")
+          logDebug(s"Reload DB: $shuffleKey -> $files")
           fileInfos.put(shuffleKey, files)
           db.delete(entry.getKey)
         } catch {
           case exception: Exception =>
-            logError(s"Reload DB: ${shuffleKey} failed.", exception)
+            logError(s"Reload DB: $shuffleKey failed.", exception)
         }
       } else {
         return
@@ -523,7 +528,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
             val hdfsFileWriter = hdfsWriters.get(fileInfo.getFilePath)
             if (hdfsFileWriter != null) {
               hdfsFileWriter.destroy(new IOException(
-                s"Destroy FileWriter ${hdfsFileWriter} caused by shuffle ${shuffleKey} expired."))
+                s"Destroy FileWriter $hdfsFileWriter caused by shuffle $shuffleKey expired."))
               hdfsWriters.remove(fileInfo.getFilePath)
             }
           } else {
@@ -534,7 +539,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
               val fileWriter = writers.get(fileInfo.getFilePath)
               if (fileWriter != null) {
                 fileWriter.destroy(new IOException(
-                  s"Destroy FileWriter ${fileWriter} caused by shuffle ${shuffleKey} expired."))
+                  s"Destroy FileWriter $fileWriter caused by shuffle $shuffleKey expired."))
                 writers.remove(fileInfo.getFilePath)
               }
             }
@@ -611,9 +616,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       .filter(diskInfo =>
         diskInfo.status == DiskStatus.HEALTHY ||
          diskInfo.status == DiskStatus.HIGH_DISK_USAGE)
-      .map { case diskInfo =>
-        (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles()))
-      }
+      .map(diskInfo =>
+        (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles())))
 
     val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
     diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
@@ -629,34 +633,25 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
 
   private def deleteDirectory(dir: File, threadPool: ThreadPoolExecutor): Unit = {
-    val allContents = dir.listFiles
-    if (allContents != null) {
-      for (file <- allContents) {
-        deleteDirectory(file, threadPool)
-      }
+    if (dir.exists()) {
+      threadPool.submit(new Runnable {
+        override def run(): Unit = {
+          deleteDirectoryWithRetry(dir)
+        }
+      })
     }
-    threadPool.submit(new Runnable {
-      override def run(): Unit = {
-        deleteFileWithRetry(dir)
-      }
-    })
   }
 
-  private def deleteFileWithRetry(file: File): Unit = {
-    if (file.exists()) {
-      var retryCount = 0
-      var deleteSuccess = false
-      while (!deleteSuccess && retryCount <= 3) {
-        deleteSuccess = file.delete()
-        retryCount = retryCount + 1
-        if (!deleteSuccess) {
-          Thread.sleep(200 * retryCount)
-        }
-      }
-      if (deleteSuccess) {
-        logDebug(s"Deleted expired shuffle file $file.")
-      } else {
-        logWarning(s"Failed to delete expired shuffle file $file.")
+  private def deleteDirectoryWithRetry(dir: File): Unit = {
+    var retryCount = 0
+    var deleteSuccess = false
+    while (!deleteSuccess && retryCount <= 3) {
+      try {
+        FileUtils.deleteDirectory(dir)
+        deleteSuccess = true
+      } catch {
+        case _: IOException =>
+          retryCount = retryCount + 1
       }
     }
   }
@@ -696,7 +691,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
         retryTimes += 1
         if (retryTimes < conf.workerCheckFileCleanMaxRetries) {
           logInfo(s"Working directory's files have not been cleaned up completely, " +
-            s"will start ${retryTimes + 1}th attempt after ${workerCheckFileCleanTimeout} milliseconds.")
+            s"will start ${retryTimes + 1}th attempt after $workerCheckFileCleanTimeout milliseconds.")
         }
         Thread.sleep(workerCheckFileCleanTimeout)
       }
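
The `deleteDirectory`/`deleteDirectoryWithRetry` rewrite above replaces the file-by-file recursion with a single `FileUtils.deleteDirectory` call per directory, tried up to four times. A standalone sketch of that loop; the short backoff between attempts is an assumption added here for illustration (the old `deleteFileWithRetry` slept between attempts, while the patched version retries immediately):

```scala
import java.io.{File, IOException}
import org.apache.commons.io.FileUtils

// Retry a whole-tree delete; returns true on success. The bound of 3 retries
// mirrors the `retryCount <= 3` loop in the patch.
def deleteDirectoryWithRetry(dir: File, maxRetries: Int = 3): Boolean = {
  var attempt = 0
  while (attempt <= maxRetries) {
    try {
      FileUtils.deleteDirectory(dir) // no-op if the directory is already gone
      return true
    } catch {
      case _: IOException =>
        attempt += 1
        if (attempt <= maxRetries) Thread.sleep(200L * attempt) // assumed backoff
    }
  }
  false
}
```
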
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 1fde28f0b31..d5f75380bbd 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType}
-import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils}
+import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils, ThreadUtils}
 import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
 
 class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
@@ -83,7 +83,12 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
     val shuffleKey2 = "2-2"
     expiredShuffleKeys.add(shuffleKey1)
     expiredShuffleKeys.add(shuffleKey2)
-    worker.cleanup(expiredShuffleKeys)
+    worker.cleanup(
+      expiredShuffleKeys,
+      ThreadUtils.newDaemonCachedThreadPool(
+        "worker-clean-expired-shuffle-keys",
+        conf.workerCleanThreads))
+    Thread.sleep(3000)
     worker.storageManager.workingDirWriters.values().asScala.map(t => assert(t.size() == 0))
   }
 
From 115e27d9d1392ad50834d533687f093b7751c416 Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Thu, 2 Nov 2023 15:23:28 +0800
Subject: [PATCH 2/2] [CELEBORN-1094] Optimize mechanism of ChunkManager
 expired shuffle key cleanup to avoid memory leak

---
 .../service/deploy/worker/storage/ChunkStreamManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
index bd1e7e47685..f488399d5f1 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
@@ -174,7 +174,7 @@ public void cleanupExpiredShuffleKey(Set<String> expiredShuffleKeys) {
       Set<Long> expiredStreamIds = shuffleStreamIds.remove(expiredShuffleKey);
 
       // normally expiredStreamIds set will be empty as streamId will be removed when be fully read
       if (expiredStreamIds != null && !expiredStreamIds.isEmpty()) {
-        expiredStreamIds.parallelStream().forEach(streams::remove);
+        expiredStreamIds.forEach(streams::remove);
       }
     }
     logger.info(