From d7b78cfc5224669b6bfbd226bdcd44b4fc142b04 Mon Sep 17 00:00:00 2001 From: mingji Date: Mon, 1 Jul 2024 17:47:50 +0800 Subject: [PATCH 1/7] [CELEBORN-1483] Add storage policy --- .../celeborn/common/protocol/StorageInfo.java | 6 + .../apache/celeborn/common/CelebornConf.scala | 39 +++++- .../celeborn/common/CelebornConfSuite.scala | 47 ++++++++ docs/configuration/worker.md | 2 + .../service/deploy/worker/Worker.scala | 2 + .../worker/storage/StorageManager.scala | 4 +- .../deploy/worker/storage/StoragePolicy.scala | 111 ++++++++++++++++++ 7 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java index d929091283b..621edb774f6 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java @@ -40,11 +40,13 @@ public int getValue() { public static final Map typesMap = new HashMap<>(); public static final Set typeNames = new HashSet<>(); + public static final Map types = new HashMap<>(); static { for (Type type : Type.values()) { typesMap.put(type.value, type); typeNames.add(type.name()); + types.put(type.name(), type); } } @@ -234,4 +236,8 @@ public static int getAvailableTypes(List types) { } return ava; } + + public static Type fromStrToType(String typeStr) { + return types.get(typeStr); + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 742c164449f..e021f2528cc 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -32,7 +32,7 @@ import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.internal.config._ import org.apache.celeborn.common.network.util.ByteUnit import org.apache.celeborn.common.protocol._ -import org.apache.celeborn.common.protocol.StorageInfo.Type +import org.apache.celeborn.common.protocol.StorageInfo.{typesMap, validate, Type} import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD} import org.apache.celeborn.common.rpc.RpcTimeout import org.apache.celeborn.common.util.{JavaUtils, Utils} @@ -1130,6 +1130,19 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def readBufferTargetUpdateInterval: Long = get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL) def readBufferTargetNotifyThreshold: Long = get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD) def readBuffersToTriggerReadMin: Int = get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN) + def workerStoragePolicyCreateFilePolicy: Option[List[String]] = + get(WORKER_STORAGE_CREATE_FILE_POLICY).map { + policy => policy.split(",").map(_.trim).toList + }.orElse(Some(List("MEMORY", "HDD", "SSD", "HDFS", "OSS"))) + + def workerStoragePolicyEvictFilePolicy: Option[Map[String, List[String]]] = + get(WORKER_STORAGE_EVICT_POLICY).map { + policy => + policy.split("\\|").map(group => { + val groupArr = group.split(",") + Map(groupArr.head -> groupArr.slice(1, groupArr.length).toList) + }).reduce(_ ++ _) + }.orElse(Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS")))) // ////////////////////////////////////////////////////// // Decommission // @@ -2814,6 +2827,30 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1000ms") + val WORKER_STORAGE_CREATE_FILE_POLICY: OptionalConfigEntry[String] = + buildConf("celeborn.worker.storage.storagePolicy.createFilePolicy") + .categories("worker") + .doc("This defined the order for create files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS") + .version("0.5.1") + .stringConf + .checkValue( + _.split(",").map(str => StorageInfo.typeNames.contains(str.trim.toUpperCase)).forall(p => + p), + "Will use default create file order. Default order: MEMORY,SSD,HDD,HDFS") + .createOptional + + val WORKER_STORAGE_EVICT_POLICY: OptionalConfigEntry[String] = + buildConf("celeborn.worker.storage.storagePolicy.evictPolicy") + .categories("worker") + .doc("This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes|StorageTypes|StorageTypes.") + .version("0.5.1") + .stringConf + .checkValue( + _.replace("|", ",").split(",").map(str => + StorageInfo.typeNames.contains(str.trim.toUpperCase)).forall(p => p), + "Will use default evict order. Default order: MEMORY,SSD,HDD,HDFS,OSS") + .createOptional + val WORKER_HTTP_HOST: ConfigEntry[String] = buildConf("celeborn.worker.http.host") .withAlternative("celeborn.metrics.worker.prometheus.host") diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index 08c673599ad..6da252d9d93 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -305,6 +305,7 @@ class CelebornConfSuite extends CelebornFunSuite { def moduleKey(config: ConfigEntry[_]): String = { config.key.replace("", module) } + conf.set(moduleKey(NETWORK_IO_MODE), transportTestNetworkIoMode) conf.set( moduleKey(NETWORK_IO_PREFER_DIRECT_BUFS), @@ -406,4 +407,50 @@ class CelebornConfSuite extends CelebornFunSuite { assert(conf.networkIoConnectTimeoutMs("test_child_module") == fallbackValue) } + test("Test storage policy case 1") { + val conf = new CelebornConf() + conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD") + val createFilePolicy1 = conf.workerStoragePolicyCreateFilePolicy + assert(List("MEMORY", "SSD") == createFilePolicy1.get) + + conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,HDFS") + val createFilePolicy2 = conf.workerStoragePolicyCreateFilePolicy + assert(List("MEMORY", "HDFS") == createFilePolicy2.get) + + conf.unset("celeborn.worker.storage.storagePolicy.createFilePolicy") + val createFilePolicy3 = conf.workerStoragePolicyCreateFilePolicy + assert(List("MEMORY", "HDD", "SSD", "HDFS", "OSS") == createFilePolicy3.get) + + try { + conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "ABC") + val createFilePolicy4 = conf.workerStoragePolicyCreateFilePolicy + } catch { + case e: Exception => + assert(e.isInstanceOf[IllegalArgumentException]) + } + } + + test("Test storage policy case 2") { + val conf = new CelebornConf() + conf.set("celeborn.worker.storage.storagePolicy.evictPolicy", "MEMORY,SSD") + val evictPolicy1 = conf.workerStoragePolicyEvictFilePolicy + assert(Map("MEMORY" -> List("SSD")) == evictPolicy1.get) + + conf.set("celeborn.worker.storage.storagePolicy.evictPolicy", "MEMORY,SSD,HDFS|HDD,HDFS") + val evictPolicy2 = conf.workerStoragePolicyEvictFilePolicy + assert(Map("MEMORY" -> List("SSD", "HDFS"), "HDD" -> List("HDFS")) == evictPolicy2.get) + + conf.unset("celeborn.worker.storage.storagePolicy.evictPolicy") + val evictPolicy3 = conf.workerStoragePolicyEvictFilePolicy + assert(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS")) == evictPolicy3.get) + + try { + conf.set("celeborn.worker.storage.storagePolicy.evictPolicy", "ABC") + conf.workerStoragePolicyEvictFilePolicy + } catch { + case e: Exception => + assert(e.isInstanceOf[IllegalArgumentException]) + } + } + } diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index ef15cd7d0cd..0f911b0ad6f 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -154,6 +154,8 @@ license: | | celeborn.worker.storage.disk.reserve.ratio | <undefined> | false | Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the max space between the reserved space and the space calculate via reserved ratio. | 0.3.2 | | | celeborn.worker.storage.disk.reserve.size | 5G | false | Celeborn worker reserved space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size | | celeborn.worker.storage.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on disk. | 0.3.2 | | +| celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> | false | This defined the order for create files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS | 0.5.1 | | +| celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes|StorageTypes|StorageTypes. | 0.5.1 | | | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir | | celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | | | celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a file writer to create if its creation was failed. | 0.2.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 8ee278b4585..3555443f0ec 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -60,6 +60,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingSta import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager} +import org.apache.celeborn.service.deploy.worker.storage.StoragePolicy private[celeborn] class Worker( override val conf: CelebornConf, @@ -169,6 +170,7 @@ private[celeborn] class Worker( } val storageManager = new StorageManager(conf, workerSource) + StoragePolicy.initlize(conf, storageManager, workerSource) val memoryManager: MemoryManager = MemoryManager.initialize(conf, storageManager) memoryManager.registerMemoryListener(storageManager) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index e767060c7a4..e2d75ef7e6a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -873,7 +873,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs && MemoryManager.instance().memoryFileStorageAvailable()) { logDebug(s"Create memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") ( - createMemoryFile( + createMemoryFileInfo( partitionDataWriterContext.getAppId, partitionDataWriterContext.getShuffleId, location.getFileName, @@ -899,7 +899,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } } - def createMemoryFile( + def createMemoryFileInfo( appId: String, shuffleId: Int, fileName: String, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala new file mode 100644 index 00000000000..746da6a5357 --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.storage + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.exception.CelebornIOException +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.metrics.source.AbstractSource +import org.apache.celeborn.common.protocol.StorageInfo + +object StoragePolicy extends Logging { + var storageManager: StorageManager = _ + var conf: CelebornConf = _ + var createFileOrder: Option[List[String]] = _ + var evictFileOrder: Option[Map[String, List[String]]] = _ + var source: AbstractSource = _ + + def initlize( + conf: CelebornConf, + storageManager: StorageManager, + abstractSource: AbstractSource): Unit = { + this.storageManager = storageManager + this.conf = conf + this.createFileOrder = conf.workerStoragePolicyCreateFilePolicy + this.evictFileOrder = conf.workerStoragePolicyEvictFilePolicy + this.source = abstractSource + } + + def getEvictedFile( + celebornFile: CelebornFile, + partitionDataWriterContext: PartitionDataWriterContext): CelebornFile = { + evictFileOrder.foreach(orders => { + orders.foreach(order => { + val current = order._1 + val overrideOrder = order._2 + if (StorageInfo.fromStrToType(current) == celebornFile.storageType) { + return createFile(partitionDataWriterContext, overrideOrder) + } + }) + }) + null + } + + def createFile( + partitionDataWriterContext: PartitionDataWriterContext, + overrideOrder: List[String] = null): CelebornFile = { + logDebug( + s"create file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + val location = partitionDataWriterContext.getPartitionLocation + + def tryCreateFileByType(storageInfoType: StorageInfo.Type): CelebornFile = { + storageInfoType match { + case StorageInfo.Type.MEMORY => + logDebug(s"Create memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + val memoryFileInfo = storageManager.createMemoryFileInfo( + partitionDataWriterContext.getAppId, + partitionDataWriterContext.getShuffleId, + location.getFileName, + partitionDataWriterContext.getUserIdentifier, + partitionDataWriterContext.getPartitionType, + partitionDataWriterContext.isPartitionSplitEnabled) + partitionDataWriterContext.setStorageType(storageInfoType) + new CelebornMemoryFile(conf, source, memoryFileInfo, storageInfoType) + case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS => + logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile( + location, + partitionDataWriterContext.getAppId, + partitionDataWriterContext.getShuffleId, + location.getFileName, + partitionDataWriterContext.getUserIdentifier, + partitionDataWriterContext.getPartitionType, + partitionDataWriterContext.isPartitionSplitEnabled) + if (storageInfoType == StorageInfo.Type.HDD || storageInfoType == StorageInfo.Type.SSD) { + new CelebornDiskFile(flusher, diskFileInfo, workingDir, storageInfoType) + } else { + new CelebornDFSFile(flusher, diskFileInfo, storageInfoType) + } + } + } + + val tmpOrder = + if (overrideOrder != null) { + Some(overrideOrder) + } else { + createFileOrder + } + tmpOrder.foreach(lst => { + for (storageStr <- lst) { + val storageInfoType = StorageInfo.fromStrToType(storageStr) + return tryCreateFileByType(storageInfoType) + } + }) + throw new CelebornIOException(s"Create file failed for ${partitionDataWriterContext}") + } +} From 23a350db0f7363b24f168914d087492472e600d8 Mon Sep 17 00:00:00 2001 From: mingji Date: Mon, 1 Jul 2024 17:50:23 +0800 Subject: [PATCH 2/7] add missing class --- .../deploy/worker/storage/CelebornFile.scala | 92 +++++++++++++++++++ .../worker/storage/CelebornFileProxy.scala | 57 ++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala create mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala new file mode 100644 index 00000000000..9d0a97083c3 --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.storage + +import io.netty.buffer.{ByteBuf, CompositeByteBuf} +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MemoryFileInfo} +import org.apache.celeborn.common.metrics.source.AbstractSource +import org.apache.celeborn.common.protocol.StorageInfo + +import java.io.File +import java.nio.channels.FileChannel + +abstract class CelebornFile { + var fileInfo: FileInfo = _ + var flushBuffer: CompositeByteBuf = _ + val flushLock = new AnyRef + var flusher: Flusher = _ + var flushWorkerIndex: Int = _ + var writerCloseTimeoutMs: Long = _ + var flusherBufferSize = 0L + var source: AbstractSource = _ // metrics + var chunkSize: Long = _ + var metricsCollectCriticalEnabled = false + var storageType: StorageInfo.Type = _ + + def write(buf: ByteBuf): Unit + + def needEvict: Boolean + + def evict(file: CelebornFile): Unit + + def close(): Unit +} + +class CelebornMemoryFile( + conf: CelebornConf, + source: AbstractSource, + fileInfo: MemoryFileInfo, + storageType: StorageInfo.Type) extends CelebornFile { + + override def write(buf: ByteBuf): Unit = {} + + override def needEvict: Boolean = ??? + + override def evict(file: CelebornFile): Unit = ??? + + override def close(): Unit = ??? +} + +class CelebornDiskFile( + flusher: Flusher, + diskFileInfo: DiskFileInfo, + workingDir: File, + storageType: StorageInfo.Type) extends CelebornFile { + private var channel: FileChannel = null + + override def write(buf: ByteBuf): Unit = ??? + + override def needEvict: Boolean = ??? + + override def evict(file: CelebornFile): Unit = ??? + + override def close(): Unit = ??? +} + +class CelebornDFSFile(flusher: Flusher, hdfsFileInfo: DiskFileInfo, storageType: StorageInfo.Type) + extends CelebornFile { + + override def write(buf: ByteBuf): Unit = ??? + + override def needEvict: Boolean = ??? + + override def evict(file: CelebornFile): Unit = ??? + + override def close(): Unit = ??? +} diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala new file mode 100644 index 00000000000..bcb0ca49f3e --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.storage + +import io.netty.buffer.ByteBuf +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.metrics.source.AbstractSource + +class CelebornFileProxy( + partitionDataWriterContext: PartitionDataWriterContext, + storageManager: StorageManager, + conf: CelebornConf, + source: AbstractSource) { + var currentFile: CelebornFile = _ + var flusher: Flusher = null + var flushWorkerIndex = 0 + + currentFile = StoragePolicy.createFile(partitionDataWriterContext) + + def write(buf: ByteBuf) = { + this.synchronized { + currentFile.write(buf) + } + } + + def evict(force: Boolean) = { + if (currentFile.needEvict || force) { + this.synchronized { + val nFile = StoragePolicy.getEvictedFile(currentFile, partitionDataWriterContext) + currentFile.evict(nFile) + currentFile = nFile + } + } + } + + def close(): Unit = {} + + def isMemoryShuffleFile: Boolean = { + currentFile.isInstanceOf[CelebornMemoryFile] + } + +} From 4d89f862cdf9f6c68fd1ac2ba6cd5307ce7654cd Mon Sep 17 00:00:00 2001 From: mingji Date: Mon, 1 Jul 2024 17:55:11 +0800 Subject: [PATCH 3/7] fix style --- .../service/deploy/worker/storage/CelebornFile.scala | 7 ++++--- .../service/deploy/worker/storage/CelebornFileProxy.scala | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala index 9d0a97083c3..5d67f75d80e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFile.scala @@ -17,15 +17,16 @@ package org.apache.celeborn.service.deploy.worker.storage +import java.io.File +import java.nio.channels.FileChannel + import io.netty.buffer.{ByteBuf, CompositeByteBuf} + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MemoryFileInfo} import org.apache.celeborn.common.metrics.source.AbstractSource import org.apache.celeborn.common.protocol.StorageInfo -import java.io.File -import java.nio.channels.FileChannel - abstract class CelebornFile { var fileInfo: FileInfo = _ var flushBuffer: CompositeByteBuf = _ diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala index bcb0ca49f3e..dd8a382c6dc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala @@ -18,6 +18,7 @@ package org.apache.celeborn.service.deploy.worker.storage import io.netty.buffer.ByteBuf + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.metrics.source.AbstractSource From 1a67e91c8a1c270dda0122f303d0b731b2dfec8b Mon Sep 17 00:00:00 2001 From: mingji Date: Mon, 1 Jul 2024 20:13:08 +0800 Subject: [PATCH 4/7] refine --- .../storage/PartitionDataWriterContext.java | 10 ++++++++++ .../deploy/worker/storage/StoragePolicy.scala | 20 +++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java index b73b901adf3..86d53f9a9ea 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java @@ -21,6 +21,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.common.protocol.PartitionSplitMode; import org.apache.celeborn.common.protocol.PartitionType; +import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.common.util.Utils; public class PartitionDataWriterContext { @@ -34,6 +35,7 @@ public class PartitionDataWriterContext { private final boolean partitionSplitEnabled; private final String shuffleKey; private final PartitionType partitionType; + private StorageInfo.Type storageType = null; public PartitionDataWriterContext( long splitThreshold, @@ -96,4 +98,12 @@ public String getShuffleKey() { public PartitionType getPartitionType() { return partitionType; } + + public StorageInfo.Type getStorageType() { + return storageType; + } + + public void setStorageType(StorageInfo.Type storageType) { + this.storageType = storageType; + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index 746da6a5357..152c18ae059 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -44,21 +44,18 @@ object StoragePolicy extends Logging { def getEvictedFile( celebornFile: CelebornFile, partitionDataWriterContext: PartitionDataWriterContext): CelebornFile = { - evictFileOrder.foreach(orders => { - orders.foreach(order => { - val current = order._1 - val overrideOrder = order._2 - if (StorageInfo.fromStrToType(current) == celebornFile.storageType) { - return createFile(partitionDataWriterContext, overrideOrder) - } - }) - }) + if (evictFileOrder.isDefined) { + if (evictFileOrder.get.contains(celebornFile.storageType.name())) { + val order = evictFileOrder.get.get(celebornFile.storageType.name()) + return createFile(partitionDataWriterContext, order) + } + } null } def createFile( partitionDataWriterContext: PartitionDataWriterContext, - overrideOrder: List[String] = null): CelebornFile = { + overrideOrder: Option[List[String]] = null): CelebornFile = { logDebug( s"create file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") val location = partitionDataWriterContext.getPartitionLocation @@ -96,10 +93,11 @@ object StoragePolicy extends Logging { val tmpOrder = if (overrideOrder != null) { - Some(overrideOrder) + overrideOrder } else { createFileOrder } + tmpOrder.foreach(lst => { for (storageStr <- lst) { val storageInfoType = StorageInfo.fromStrToType(storageStr) From 5b32503de3222960040f9ef4fa55ca8c7087c93d Mon Sep 17 00:00:00 2001 From: mingji Date: Wed, 3 Jul 2024 15:41:45 +0800 Subject: [PATCH 5/7] Add ut. Address comments. --- .../apache/celeborn/common/CelebornConf.scala | 12 ++- docs/configuration/worker.md | 4 +- .../service/deploy/worker/Worker.scala | 1 - .../worker/storage/CelebornFileProxy.scala | 9 +- .../worker/storage/StorageManager.scala | 1 + .../deploy/worker/storage/StoragePolicy.scala | 94 +++++++++---------- .../worker/storage/StoragePolicySuite.scala | 66 +++++++++++++ 7 files changed, 128 insertions(+), 59 deletions(-) create mode 100644 worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index e021f2528cc..a2b2e284f83 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -2830,19 +2830,25 @@ object CelebornConf extends Logging { val WORKER_STORAGE_CREATE_FILE_POLICY: OptionalConfigEntry[String] = buildConf("celeborn.worker.storage.storagePolicy.createFilePolicy") .categories("worker") - .doc("This defined the order for create files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS") + .doc("This defined the order for creating files across available storages." + + " Available storages options are: MEMORY,SSD,HDD,HDFS,OSS") .version("0.5.1") .stringConf .checkValue( _.split(",").map(str => StorageInfo.typeNames.contains(str.trim.toUpperCase)).forall(p => p), - "Will use default create file order. Default order: MEMORY,SSD,HDD,HDFS") + "Will use default create file order. Default order: MEMORY,SSD,HDD,HDFS,OSS") .createOptional val WORKER_STORAGE_EVICT_POLICY: OptionalConfigEntry[String] = buildConf("celeborn.worker.storage.storagePolicy.evictPolicy") .categories("worker") - .doc("This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes|StorageTypes|StorageTypes.") + .doc("This define the order of evict files if the storages are available." + + " Available storages: MEMORY,SSD,HDD,HDFS. " + + "Definition: StorageTypes|StorageTypes|StorageTypes. " + + "Example: MEMORY,SSD|SSD,HDFS." + + " The example means that a MEMORY shuffle file can be evicted to SSD " + + "and a SSD shuffle file can be evicted to HDFS.") .version("0.5.1") .stringConf .checkValue( diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 0f911b0ad6f..53938ccc0c0 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -154,8 +154,8 @@ license: | | celeborn.worker.storage.disk.reserve.ratio | <undefined> | false | Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the max space between the reserved space and the space calculate via reserved ratio. | 0.3.2 | | | celeborn.worker.storage.disk.reserve.size | 5G | false | Celeborn worker reserved space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size | | celeborn.worker.storage.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on disk. | 0.3.2 | | -| celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> | false | This defined the order for create files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS | 0.5.1 | | -| celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes|StorageTypes|StorageTypes. | 0.5.1 | | +| celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> | false | This defined the order for creating files across available storages. Available storages options are: MEMORY,SSD,HDD,HDFS,OSS | 0.5.1 | | +| celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes|StorageTypes|StorageTypes. Example: MEMORY,SSD|SSD,HDFS. The example means that a MEMORY shuffle file can be evicted to SSD and a SSD shuffle file can be evicted to HDFS. | 0.5.1 | | | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir | | celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | | | celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a file writer to create if its creation was failed. | 0.2.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 3555443f0ec..2f87673838c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -170,7 +170,6 @@ private[celeborn] class Worker( } val storageManager = new StorageManager(conf, workerSource) - StoragePolicy.initlize(conf, storageManager, workerSource) val memoryManager: MemoryManager = MemoryManager.initialize(conf, storageManager) memoryManager.registerMemoryListener(storageManager) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala index dd8a382c6dc..4c4a4b5b474 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/CelebornFileProxy.scala @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.metrics.source.AbstractSource +import org.apache.celeborn.common.protocol.StorageInfo class CelebornFileProxy( partitionDataWriterContext: PartitionDataWriterContext, @@ -31,7 +32,7 @@ class CelebornFileProxy( var flusher: Flusher = null var flushWorkerIndex = 0 - currentFile = StoragePolicy.createFile(partitionDataWriterContext) + currentFile = storageManager.storagePolicy.createFile(partitionDataWriterContext) def write(buf: ByteBuf) = { this.synchronized { @@ -42,7 +43,8 @@ class CelebornFileProxy( def evict(force: Boolean) = { if (currentFile.needEvict || force) { this.synchronized { - val nFile = StoragePolicy.getEvictedFile(currentFile, partitionDataWriterContext) + val nFile = + storageManager.storagePolicy.getEvictedFile(currentFile, partitionDataWriterContext) currentFile.evict(nFile) currentFile = nFile } @@ -52,7 +54,6 @@ class CelebornFileProxy( def close(): Unit = {} def isMemoryShuffleFile: Boolean = { - currentFile.isInstanceOf[CelebornMemoryFile] + currentFile.storageType == StorageInfo.Type.MEMORY } - } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index e2d75ef7e6a..34579327afb 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -68,6 +68,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val hasHDFSStorage = conf.hasHDFSStorage val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout + val storagePolicy = new StoragePolicy(conf, this, workerSource) // (deviceName -> deviceInfo) and (mount point -> diskInfo) val (deviceInfos, diskInfos) = { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index 152c18ae059..c56126e738a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -23,31 +23,18 @@ import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.metrics.source.AbstractSource import org.apache.celeborn.common.protocol.StorageInfo -object StoragePolicy extends Logging { - var storageManager: StorageManager = _ - var conf: CelebornConf = _ - var createFileOrder: Option[List[String]] = _ - var evictFileOrder: Option[Map[String, List[String]]] = _ - var source: AbstractSource = _ - - def initlize( - conf: CelebornConf, - storageManager: StorageManager, - abstractSource: AbstractSource): Unit = { - this.storageManager = storageManager - this.conf = conf - this.createFileOrder = conf.workerStoragePolicyCreateFilePolicy - this.evictFileOrder = conf.workerStoragePolicyEvictFilePolicy - this.source = abstractSource - } +class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: AbstractSource) + extends Logging { + var createFileOrder: Option[List[String]] = conf.workerStoragePolicyCreateFilePolicy + var evictFileOrder: Option[Map[String, List[String]]] = conf.workerStoragePolicyEvictFilePolicy def getEvictedFile( celebornFile: CelebornFile, partitionDataWriterContext: PartitionDataWriterContext): CelebornFile = { - if (evictFileOrder.isDefined) { - if (evictFileOrder.get.contains(celebornFile.storageType.name())) { - val order = evictFileOrder.get.get(celebornFile.storageType.name()) - return createFile(partitionDataWriterContext, order) + evictFileOrder.foreach { order => + val orderList = order.get(celebornFile.storageType.name()) + if (orderList != null) { + return createFile(partitionDataWriterContext, orderList) } } null @@ -61,33 +48,39 @@ object StoragePolicy extends Logging { val location = partitionDataWriterContext.getPartitionLocation def tryCreateFileByType(storageInfoType: StorageInfo.Type): CelebornFile = { - storageInfoType match { - case StorageInfo.Type.MEMORY => - logDebug(s"Create memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") - val memoryFileInfo = storageManager.createMemoryFileInfo( - partitionDataWriterContext.getAppId, - partitionDataWriterContext.getShuffleId, - location.getFileName, - partitionDataWriterContext.getUserIdentifier, - partitionDataWriterContext.getPartitionType, - partitionDataWriterContext.isPartitionSplitEnabled) - partitionDataWriterContext.setStorageType(storageInfoType) - new CelebornMemoryFile(conf, source, memoryFileInfo, storageInfoType) - case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS => - logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") - val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile( - location, - partitionDataWriterContext.getAppId, - partitionDataWriterContext.getShuffleId, - location.getFileName, - partitionDataWriterContext.getUserIdentifier, - partitionDataWriterContext.getPartitionType, - partitionDataWriterContext.isPartitionSplitEnabled) - if (storageInfoType == StorageInfo.Type.HDD || storageInfoType == StorageInfo.Type.SSD) { - new CelebornDiskFile(flusher, diskFileInfo, workingDir, storageInfoType) - } else { - new CelebornDFSFile(flusher, diskFileInfo, storageInfoType) - } + try { + storageInfoType match { + case StorageInfo.Type.MEMORY => + logDebug(s"Create memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + val memoryFileInfo = storageManager.createMemoryFileInfo( + partitionDataWriterContext.getAppId, + partitionDataWriterContext.getShuffleId, + location.getFileName, + partitionDataWriterContext.getUserIdentifier, + partitionDataWriterContext.getPartitionType, + partitionDataWriterContext.isPartitionSplitEnabled) + partitionDataWriterContext.setStorageType(storageInfoType) + new CelebornMemoryFile(conf, source, memoryFileInfo, storageInfoType) + case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS => + logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile( + location, + partitionDataWriterContext.getAppId, + partitionDataWriterContext.getShuffleId, + location.getFileName, + partitionDataWriterContext.getUserIdentifier, + partitionDataWriterContext.getPartitionType, + partitionDataWriterContext.isPartitionSplitEnabled) + if (storageInfoType == StorageInfo.Type.HDD || storageInfoType == StorageInfo.Type.SSD) { + new CelebornDiskFile(flusher, diskFileInfo, workingDir, storageInfoType) + } else { + new CelebornDFSFile(flusher, diskFileInfo, storageInfoType) + } + } + } catch { + case e: Exception => + logError(s"create celeborn file for storage ${storageInfoType} failed", e) + null } } @@ -101,7 +94,10 @@ object StoragePolicy extends Logging { tmpOrder.foreach(lst => { for (storageStr <- lst) { val storageInfoType = StorageInfo.fromStrToType(storageStr) - return tryCreateFileByType(storageInfoType) + val file = tryCreateFileByType(storageInfoType) + if (file != null) { + return file + } } }) throw new CelebornIOException(s"Create file failed for ${partitionDataWriterContext}") diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala new file mode 100644 index 00000000000..273866048bc --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala @@ -0,0 +1,66 @@ +package org.apache.celeborn.service.deploy.worker.storage + +import java.io.File + +import org.mockito.ArgumentMatchers.any +import org.mockito.MockitoSugar.mock +import org.mockito.MockitoSugar.when + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.meta.{DiskFileInfo, MemoryFileInfo} +import org.apache.celeborn.common.metrics.source.AbstractSource +import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo} + +class StoragePolicySuite extends CelebornFunSuite { + val mockedStorageManager: StorageManager = mock[StorageManager] + val mockedSource: AbstractSource = mock[AbstractSource] + val mockedPartitionWriterContext: PartitionDataWriterContext = mock[PartitionDataWriterContext] + + val mockedCelebornMemoryFile = mock[MemoryFileInfo] + when( + mockedStorageManager.createMemoryFileInfo(any(), any(), any(), any(), any(), any())).thenAnswer( + mockedCelebornMemoryFile) + + val mockedDiskFile = mock[DiskFileInfo] + val mockedFlusher = mock[Flusher] + val mockedFile = mock[File] + when( + mockedStorageManager.createDiskFile( + any(), + any(), + any(), + any(), + any(), + any(), + any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile)) + + val mockedPartitionLocation = + new PartitionLocation(1, 1, "h1", 1, 2, 3, 4, PartitionLocation.Mode.PRIMARY) + when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(mockedPartitionLocation) + + test("test create file order case1") { + val conf = new CelebornConf() + conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS") + val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) + val file = storagePolicy.createFile(mockedPartitionWriterContext) + assert(file.isInstanceOf[CelebornMemoryFile]) + } + + test("test create file order case2") { + val conf = new CelebornConf() + conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS") + val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) + val file = storagePolicy.createFile(mockedPartitionWriterContext) + assert(file.isInstanceOf[CelebornDiskFile]) + } + + test("test getEvicted file case1") { + val mockedMemoryFile = mock[CelebornMemoryFile] + val conf = new CelebornConf() + val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) + when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY) + val nFile = storagePolicy.getEvictedFile(mockedMemoryFile, mockedPartitionWriterContext) + assert(nFile.isInstanceOf[CelebornDiskFile]) + } +} From b49e3ed175703b0a3ba4ce524f05468aa8ff9a14 Mon Sep 17 00:00:00 2001 From: mingji Date: Fri, 5 Jul 2024 10:52:29 +0800 Subject: [PATCH 6/7] refine --- .../deploy/worker/storage/StoragePolicy.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index c56126e738a..228063d81e1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -42,7 +42,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: def createFile( partitionDataWriterContext: PartitionDataWriterContext, - overrideOrder: Option[List[String]] = null): CelebornFile = { + order: Option[List[String]] = createFileOrder): CelebornFile = { logDebug( s"create file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") val location = partitionDataWriterContext.getPartitionLocation @@ -84,14 +84,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } } - val tmpOrder = - if (overrideOrder != null) { - overrideOrder - } else { - createFileOrder - } - - tmpOrder.foreach(lst => { + order.foreach(lst => { for (storageStr <- lst) { val storageInfoType = StorageInfo.fromStrToType(storageStr) val file = tryCreateFileByType(storageInfoType) @@ -100,6 +93,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } } }) + throw new CelebornIOException(s"Create file failed for ${partitionDataWriterContext}") } } From 4e1260d9e0954ed5edb9d9c338094f989131c85b Mon Sep 17 00:00:00 2001 From: mingji Date: Fri, 5 Jul 2024 10:54:57 +0800 Subject: [PATCH 7/7] fix license header --- .../worker/storage/StoragePolicySuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala index 273866048bc..c2cfd596be1 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicySuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.celeborn.service.deploy.worker.storage import java.io.File