Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1483] Add storage policy #2595

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public int getValue() {

/** Lookup from a type's integer value to the type itself. */
public static final Map<Integer, Type> typesMap = new HashMap<>();

/** Names of every declared storage type, as returned by {@code Type.name()}. */
public static final Set<String> typeNames = new HashSet<>();

/** Lookup from a type's name to the type itself. */
public static final Map<String, Type> types = new HashMap<>();

// Fill all three lookup tables in a single pass over the declared types.
static {
  for (Type storageType : Type.values()) {
    final String storageTypeName = storageType.name();
    typesMap.put(storageType.value, storageType);
    typeNames.add(storageTypeName);
    types.put(storageTypeName, storageType);
  }
}

Expand Down Expand Up @@ -234,4 +236,8 @@ public static int getAvailableTypes(List<Type> types) {
}
return ava;
}

/**
 * Resolves a storage type from its name.
 *
 * <p>An exact match against {@code Type.name()} is tried first. When that fails, the input is
 * trimmed and upper-cased and looked up again, so values such as {@code " ssd "} resolve the same
 * way the configuration validators treat them.
 *
 * @param typeStr storage type name; may be null
 * @return the matching type, or null when {@code typeStr} is null or names no known type
 */
public static Type fromStrToType(String typeStr) {
  if (typeStr == null) {
    return null;
  }
  Type exactMatch = types.get(typeStr);
  if (exactMatch != null) {
    return exactMatch;
  }
  // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
  return types.get(typeStr.trim().toUpperCase(java.util.Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
import org.apache.celeborn.common.network.util.ByteUnit
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.StorageInfo.Type
import org.apache.celeborn.common.protocol.StorageInfo.{typesMap, validate, Type}
import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD}
import org.apache.celeborn.common.rpc.RpcTimeout
import org.apache.celeborn.common.util.{JavaUtils, Utils}
Expand Down Expand Up @@ -1130,6 +1130,19 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Worker read-buffer accessors: each one returns the current value of the config entry it names.
def readBufferTargetUpdateInterval: Long = get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
def readBufferTargetNotifyThreshold: Long = get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
def readBuffersToTriggerReadMin: Int = get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
/**
 * Storage type names, in the order in which they should be tried when creating a file.
 *
 * The configured value is split on commas; each name is trimmed and upper-cased, which is the
 * same normalisation the config entry's validator applies before checking the name. When the
 * entry is not set, the built-in order MEMORY, HDD, SSD, HDFS, OSS is returned.
 *
 * @return always a defined Option; the Option wrapper is kept for interface compatibility
 */
def workerStoragePolicyCreateFilePolicy: Option[List[String]] = {
  val configuredOrder = get(WORKER_STORAGE_CREATE_FILE_POLICY).map { policy =>
    policy.split(",").map(_.trim.toUpperCase).toList
  }
  configuredOrder.orElse(Some(List("MEMORY", "HDD", "SSD", "HDFS", "OSS")))
}

/**
 * Eviction targets per storage type.
 *
 * The configured value is a list of groups separated by `|`; each group is a comma-separated
 * list whose first name is the source storage type and whose remaining names are the types it
 * may be evicted to, in order. Names are trimmed and upper-cased, which is the same
 * normalisation the config entry's validator applies. If the same source type appears in more
 * than one group, the last group wins. When the entry is not set, MEMORY may be evicted to
 * SSD, HDD, HDFS, OSS.
 *
 * @return always a defined Option; the Option wrapper is kept for interface compatibility
 */
def workerStoragePolicyEvictFilePolicy: Option[Map[String, List[String]]] = {
  val configuredPolicy = get(WORKER_STORAGE_EVICT_POLICY).map { policy =>
    policy.split("\\|").toList.flatMap { group =>
      val storageTypes = group.split(",").map(_.trim.toUpperCase)
      // String.split drops trailing empty strings, so a group such as "," yields no names;
      // skip it instead of failing on `head`.
      storageTypes.headOption.map(sourceType => sourceType -> storageTypes.tail.toList)
    }.toMap
  }
  configuredPolicy.orElse(Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS"))))
}

// //////////////////////////////////////////////////////
// Decommission //
Expand Down Expand Up @@ -2814,6 +2827,36 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1000ms")

// Optional, comma-separated list of storage type names. A value is accepted only if it contains
// at least one name and every name, after trimming and upper-casing, is in
// StorageInfo.typeNames.
val WORKER_STORAGE_CREATE_FILE_POLICY: OptionalConfigEntry[String] =
  buildConf("celeborn.worker.storage.storagePolicy.createFilePolicy")
    .categories("worker")
    .doc("This defined the order for creating files across available storages." +
      " Available storages options are: MEMORY,SSD,HDD,HDFS,OSS")
    .version("0.5.1")
    .stringConf
    .checkValue(
      policy => {
        val names = policy.split(",")
        // String.split can return an empty array (e.g. for ","); forall alone would accept it.
        names.nonEmpty &&
        names.forall(name => StorageInfo.typeNames.contains(name.trim.toUpperCase))
      },
      // This text accompanies a rejected value, so it states what is accepted rather than
      // promising a fallback to the default order.
      "Each storage type must be one of: MEMORY,SSD,HDD,HDFS,OSS")
    .createOptional

// Optional eviction policy: groups separated by `|`, each group a comma-separated list of
// storage type names. A value is accepted only if it contains at least one name and every name,
// after trimming and upper-casing, is in StorageInfo.typeNames.
val WORKER_STORAGE_EVICT_POLICY: OptionalConfigEntry[String] =
  buildConf("celeborn.worker.storage.storagePolicy.evictPolicy")
    .categories("worker")
    .doc("This define the order of evict files if the storages are available." +
      " Available storages: MEMORY,SSD,HDD,HDFS. " +
      "Definition: StorageTypes|StorageTypes|StorageTypes. " +
      "Example: MEMORY,SSD|SSD,HDFS." +
      " The example means that a MEMORY shuffle file can be evicted to SSD " +
      "and a SSD shuffle file can be evicted to HDFS.")
    .version("0.5.1")
    .stringConf
    .checkValue(
      policy => {
        // Group boundaries do not matter for validation, so flatten them into one name list.
        val names = policy.replace("|", ",").split(",")
        // String.split can return an empty array (e.g. for "|"); forall alone would accept it.
        names.nonEmpty &&
        names.forall(name => StorageInfo.typeNames.contains(name.trim.toUpperCase))
      },
      // This text accompanies a rejected value, so it states what is accepted rather than
      // promising a fallback to the default order.
      "Each storage type must be one of: MEMORY,SSD,HDD,HDFS,OSS")
    .createOptional

val WORKER_HTTP_HOST: ConfigEntry[String] =
buildConf("celeborn.worker.http.host")
.withAlternative("celeborn.metrics.worker.prometheus.host")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ class CelebornConfSuite extends CelebornFunSuite {
// Substitutes the enclosing test's `module` for the "<module>" placeholder in the entry's key.
def moduleKey(config: ConfigEntry[_]): String =
  config.key.replace("<module>", module)

conf.set(moduleKey(NETWORK_IO_MODE), transportTestNetworkIoMode)
conf.set(
moduleKey(NETWORK_IO_PREFER_DIRECT_BUFS),
Expand Down Expand Up @@ -406,4 +407,50 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(conf.networkIoConnectTimeoutMs("test_child_module") == fallbackValue)
}

test("Test storage policy case 1") {
  val key = "celeborn.worker.storage.storagePolicy.createFilePolicy"
  val conf = new CelebornConf()

  conf.set(key, "MEMORY,SSD")
  assert(conf.workerStoragePolicyCreateFilePolicy == Some(List("MEMORY", "SSD")))

  conf.set(key, "MEMORY,HDFS")
  assert(conf.workerStoragePolicyCreateFilePolicy == Some(List("MEMORY", "HDFS")))

  // Without an explicit value the built-in order is returned.
  conf.unset(key)
  assert(
    conf.workerStoragePolicyCreateFilePolicy ==
      Some(List("MEMORY", "HDD", "SSD", "HDFS", "OSS")))

  // intercept fails the test when nothing is thrown; a bare try/catch would pass silently.
  intercept[IllegalArgumentException] {
    conf.set(key, "ABC")
    conf.workerStoragePolicyCreateFilePolicy
  }
}

test("Test storage policy case 2") {
  val key = "celeborn.worker.storage.storagePolicy.evictPolicy"
  val conf = new CelebornConf()

  conf.set(key, "MEMORY,SSD")
  assert(conf.workerStoragePolicyEvictFilePolicy == Some(Map("MEMORY" -> List("SSD"))))

  conf.set(key, "MEMORY,SSD,HDFS|HDD,HDFS")
  assert(
    conf.workerStoragePolicyEvictFilePolicy ==
      Some(Map("MEMORY" -> List("SSD", "HDFS"), "HDD" -> List("HDFS"))))

  // Without an explicit value the built-in policy is returned.
  conf.unset(key)
  assert(
    conf.workerStoragePolicyEvictFilePolicy ==
      Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS"))))

  // intercept fails the test when nothing is thrown; a bare try/catch would pass silently.
  intercept[IllegalArgumentException] {
    conf.set(key, "ABC")
    conf.workerStoragePolicyEvictFilePolicy
  }
}

}
2 changes: 2 additions & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ license: |
| celeborn.worker.storage.disk.reserve.ratio | &lt;undefined&gt; | false | Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the max space between the reserved space and the space calculate via reserved ratio. | 0.3.2 | |
| celeborn.worker.storage.disk.reserve.size | 5G | false | Celeborn worker reserved space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size |
| celeborn.worker.storage.expireDirs.timeout | 1h | false | The timeout for a expire dirs to be deleted on disk. | 0.3.2 | |
| celeborn.worker.storage.storagePolicy.createFilePolicy | &lt;undefined&gt; | false | This defined the order for creating files across available storages. Available storages options are: MEMORY,SSD,HDD,HDFS,OSS | 0.5.1 | |
| celeborn.worker.storage.storagePolicy.evictPolicy | &lt;undefined&gt; | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes\|StorageTypes\|StorageTypes. Example: MEMORY,SSD\|SSD,HDFS. The example means that a MEMORY shuffle file can be evicted to SSD and a SSD shuffle file can be evicted to HDFS. | 0.5.1 | |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
| celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a file writer to create if its creation was failed. | 0.2.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.PartitionSplitMode;
import org.apache.celeborn.common.protocol.PartitionType;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.util.Utils;

public class PartitionDataWriterContext {
Expand All @@ -34,6 +35,7 @@ public class PartitionDataWriterContext {
private final boolean partitionSplitEnabled;
private final String shuffleKey;
private final PartitionType partitionType;
private StorageInfo.Type storageType = null;

public PartitionDataWriterContext(
long splitThreshold,
Expand Down Expand Up @@ -96,4 +98,12 @@ public String getShuffleKey() {
public PartitionType getPartitionType() {
return partitionType;
}

/**
 * @return the storage type currently recorded on this context; the field is declared with a
 *     null initial value
 */
public StorageInfo.Type getStorageType() {
  return this.storageType;
}

/**
 * Records the storage type on this context, replacing any previous value.
 *
 * @param storageType the storage type to record
 */
public void setStorageType(final StorageInfo.Type storageType) {
  this.storageType = storageType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingSta
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}
import org.apache.celeborn.service.deploy.worker.storage.StoragePolicy

private[celeborn] class Worker(
override val conf: CelebornConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.worker.storage

import java.io.File
import java.nio.channels.FileChannel

import io.netty.buffer.{ByteBuf, CompositeByteBuf}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MemoryFileInfo}
import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.protocol.StorageInfo

/**
 * Base type for a shuffle file held by a worker on some storage type.
 *
 * It declares mutable state shared by all implementations plus four abstract operations.
 * Nothing in this class assigns the `var` fields: each one starts at its type's default value
 * (`_`) or at the literal shown.
 */
abstract class CelebornFile {
var fileInfo: FileInfo = _
var flushBuffer: CompositeByteBuf = _
// Dedicated monitor object; presumably guards flushBuffer — confirm against implementations.
val flushLock = new AnyRef
var flusher: Flusher = _
var flushWorkerIndex: Int = _
var writerCloseTimeoutMs: Long = _
var flusherBufferSize = 0L
var source: AbstractSource = _ // metrics
var chunkSize: Long = _
var metricsCollectCriticalEnabled = false
// Compared against StorageInfo.Type.MEMORY by CelebornFileProxy.isMemoryShuffleFile.
var storageType: StorageInfo.Type = _

/** Writes the content of `buf` to this file. */
def write(buf: ByteBuf): Unit

/** Whether this file should be evicted; CelebornFileProxy.evict consults it first. */
def needEvict: Boolean

/**
 * Evicts this file into `file`. CelebornFileProxy passes the replacement file here and then
 * uses that replacement as its current file.
 */
def evict(file: CelebornFile): Unit

/** Closes this file. */
def close(): Unit
}

/**
 * CelebornFile implementation constructed from a MemoryFileInfo. Currently a skeleton: `write`
 * is a no-op and the remaining operations are `???`, which throws NotImplementedError when
 * called.
 *
 * NOTE(review): the constructor parameters `source`, `fileInfo` and `storageType` have the same
 * names as vars inherited from CelebornFile, and nothing in this class body assigns them to the
 * inherited vars. Confirm the inherited fields (notably `storageType`, which
 * CelebornFileProxy.isMemoryShuffleFile reads) are populated elsewhere.
 */
class CelebornMemoryFile(
conf: CelebornConf,
source: AbstractSource,
fileInfo: MemoryFileInfo,
storageType: StorageInfo.Type) extends CelebornFile {

// Intentionally empty for now: the buffer is ignored.
override def write(buf: ByteBuf): Unit = {}

override def needEvict: Boolean = ???

override def evict(file: CelebornFile): Unit = ???

override def close(): Unit = ???
}

/**
 * CelebornFile implementation constructed from a Flusher, a DiskFileInfo and a working
 * directory. Currently a skeleton: every operation is `???`, which throws NotImplementedError
 * when called.
 *
 * NOTE(review): the constructor parameters `flusher` and `storageType` have the same names as
 * vars inherited from CelebornFile, and nothing in this class body assigns them to the
 * inherited vars. Confirm the inherited fields are populated elsewhere.
 */
class CelebornDiskFile(
flusher: Flusher,
diskFileInfo: DiskFileInfo,
workingDir: File,
storageType: StorageInfo.Type) extends CelebornFile {
// Not opened or used anywhere in the visible code.
private var channel: FileChannel = null

override def write(buf: ByteBuf): Unit = ???

override def needEvict: Boolean = ???

override def evict(file: CelebornFile): Unit = ???

override def close(): Unit = ???
}

/**
 * CelebornFile implementation constructed from a Flusher and a DiskFileInfo named
 * `hdfsFileInfo`. Currently a skeleton: every operation is `???`, which throws
 * NotImplementedError when called.
 *
 * NOTE(review): the constructor parameters `flusher` and `storageType` have the same names as
 * vars inherited from CelebornFile, and nothing in this class body assigns them to the
 * inherited vars. Confirm the inherited fields are populated elsewhere.
 */
class CelebornDFSFile(flusher: Flusher, hdfsFileInfo: DiskFileInfo, storageType: StorageInfo.Type)
extends CelebornFile {

override def write(buf: ByteBuf): Unit = ???

override def needEvict: Boolean = ???

override def evict(file: CelebornFile): Unit = ???

override def close(): Unit = ???
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.worker.storage

import io.netty.buffer.ByteBuf

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.protocol.StorageInfo

/**
 * Front for the CelebornFile currently backing one partition writer.
 *
 * The initial file is obtained from the storage manager's storage policy at construction time.
 * `write` forwards to the current file, and `evict` swaps the current file for the one the
 * storage policy returns. Both run under this object's monitor, so a write never interleaves
 * with a swap.
 *
 * @param partitionDataWriterContext context handed to the storage policy when it creates or
 *                                   replaces the file
 * @param storageManager provides the storage policy
 * @param conf not used by the visible code
 * @param source not used by the visible code
 */
class CelebornFileProxy(
    partitionDataWriterContext: PartitionDataWriterContext,
    storageManager: StorageManager,
    conf: CelebornConf,
    source: AbstractSource) {
  var currentFile: CelebornFile =
    storageManager.storagePolicy.createFile(partitionDataWriterContext)
  var flusher: Flusher = null
  var flushWorkerIndex = 0

  /** Writes `buf` to the current file while holding this object's monitor. */
  def write(buf: ByteBuf): Unit = this.synchronized {
    currentFile.write(buf)
  }

  /**
   * Replaces the current file with the one chosen by the storage policy when the current file
   * reports `needEvict`, or unconditionally when `force` is true.
   *
   * The check runs inside the lock so that the file being tested is the same file that gets
   * evicted, even if another thread is evicting at the same time.
   *
   * @param force evict even when the current file does not ask for it
   */
  def evict(force: Boolean): Unit = this.synchronized {
    if (currentFile.needEvict || force) {
      val replacement =
        storageManager.storagePolicy.getEvictedFile(currentFile, partitionDataWriterContext)
      currentFile.evict(replacement)
      currentFile = replacement
    }
  }

  // Currently a no-op; the current file is not closed here.
  // TODO(review): confirm whether this should delegate to currentFile.close().
  def close(): Unit = {}

  /** True when the current file's storage type is MEMORY. */
  def isMemoryShuffleFile: Boolean = {
    currentFile.storageType == StorageInfo.Type.MEMORY
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val hasHDFSStorage = conf.hasHDFSStorage

val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
val storagePolicy = new StoragePolicy(conf, this, workerSource)

// (deviceName -> deviceInfo) and (mount point -> diskInfo)
val (deviceInfos, diskInfos) = {
Expand Down Expand Up @@ -873,7 +874,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
&& MemoryManager.instance().memoryFileStorageAvailable()) {
logDebug(s"Create memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
(
createMemoryFile(
createMemoryFileInfo(
partitionDataWriterContext.getAppId,
partitionDataWriterContext.getShuffleId,
location.getFileName,
Expand All @@ -899,7 +900,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
}

def createMemoryFile(
def createMemoryFileInfo(
appId: String,
shuffleId: Int,
fileName: String,
Expand Down
Loading
Loading