From 6663e26ff2bfe671cf36280fd1f7799c0cae763c Mon Sep 17 00:00:00 2001
From: Junfan Zhang
Date: Mon, 8 Jan 2024 11:39:53 +0800
Subject: [PATCH] [#1416] feat(spark): support custom hadoop config in client side (#1417)

### What changes were proposed in this pull request?

Support custom Hadoop configuration on the client side.

### Why are the changes needed?

Users need a way to specify custom Hadoop configuration on the client side for remote storage. For #1416.

### Does this PR introduce _any_ user-facing change?

Yes. Hadoop configuration can now be passed with the prefix `spark.rss.hadoop.*`. For Spark, a setting such as `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1` is applied as `fs.defaultFS=hdfs://rbf-x1` to the Hadoop storage configuration.

### How was this patch tested?

UTs.
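As an illustration (not part of the patch itself; the cluster name is a placeholder), such a key could be supplied at submit time:

```shell
spark-submit \
  --conf spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1 \
  ...
```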
---
 .../manager/RssShuffleManagerBase.java         | 15 ++++++-
 .../manager/RssShuffleManagerBaseTest.java     | 42 +++++++++++++++++++
 .../spark/shuffle/RssShuffleManager.java       |  2 +-
 .../spark/shuffle/RssShuffleManager.java       |  2 +-
 .../uniffle/common/config/RssClientConf.java   |  9 ++++
 docs/client_guide/spark_client_guide.md        | 17 ++++----
 6 files changed, 75 insertions(+), 12 deletions(-)
 create mode 100644 client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java

diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index b0836f9d14..b70ab6933a 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -41,6 +41,7 @@
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
+import static org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
 import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
 
 public abstract class RssShuffleManagerBase implements RssShuffleManagerInterface, ShuffleManager {
@@ -161,7 +162,7 @@ private static MapOutputTrackerMaster getMapOutputTrackerMaster() {
     return tracker instanceof MapOutputTrackerMaster ? (MapOutputTrackerMaster) tracker : null;
   }
 
-  private Map<String, String> parseRemoteStorageConf(Configuration conf) {
+  private static Map<String, String> parseRemoteStorageConf(Configuration conf) {
     Map<String, String> confItems = Maps.newHashMap();
     for (Map.Entry<String, String> entry : conf) {
       confItems.put(entry.getKey(), entry.getValue());
@@ -169,13 +170,23 @@ private Map<String, String> parseRemoteStorageConf(Configuration conf) {
     return confItems;
   }
 
-  protected RemoteStorageInfo getRemoteStorageInfo(SparkConf sparkConf) {
+  protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf sparkConf) {
     Map<String, String> confItems = Maps.newHashMap();
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     if (rssConf.getBoolean(RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED)) {
       confItems = parseRemoteStorageConf(new Configuration(true));
     }
 
+    for (String key : rssConf.getKeySet()) {
+      if (key.startsWith(HADOOP_CONFIG_KEY_PREFIX)) {
+        String val = rssConf.getString(key, null);
+        if (val != null) {
+          String extractedKey = key.replaceFirst(HADOOP_CONFIG_KEY_PREFIX, "");
+          confItems.put(extractedKey, val);
+        }
+      }
+    }
+
     return new RemoteStorageInfo(
         sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), confItems);
   }
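For review context, a minimal standalone sketch (not part of the patch; the class name is made up) of the key extraction the hunk above performs. It strips the prefix with `substring`, which behaves the same as the patch's `replaceFirst` here because every matched key begins with the literal prefix:

```java
import java.util.HashMap;
import java.util.Map;

public class HadoopConfPrefixSketch {
  static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";

  // Copies "rss.hadoop.*" entries into a new map with the prefix removed,
  // e.g. "rss.hadoop.fs.defaultFS" -> "fs.defaultFS".
  static Map<String, String> extractHadoopConf(Map<String, String> rssConf) {
    Map<String, String> confItems = new HashMap<>();
    for (Map.Entry<String, String> e : rssConf.entrySet()) {
      if (e.getKey().startsWith(HADOOP_CONFIG_KEY_PREFIX)) {
        confItems.put(e.getKey().substring(HADOOP_CONFIG_KEY_PREFIX.length()), e.getValue());
      }
    }
    return confItems;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("rss.hadoop.fs.defaultFS", "hdfs://rbf-x1");
    conf.put("rss.client.io.compression.codec", "LZ4");
    // Prints {fs.defaultFS=hdfs://rbf-x1}; non-prefixed keys are ignored.
    System.out.println(extractHadoopConf(conf));
  }
}
```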
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
new file mode 100644
index 0000000000..15cc7fac03
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import org.apache.spark.SparkConf;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssShuffleManagerBaseTest {
+
+  @Test
+  public void testGetDefaultRemoteStorageInfo() {
+    SparkConf sparkConf = new SparkConf();
+    RemoteStorageInfo remoteStorageInfo =
+        RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
+    assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+
+    sparkConf.set("spark.rss.hadoop.fs.defaultFs", "hdfs://rbf-xxx/foo");
+    remoteStorageInfo = RssShuffleManagerBase.getDefaultRemoteStorageInfo(sparkConf);
+    assertEquals(1, remoteStorageInfo.getConfItems().size());
+    assertEquals("hdfs://rbf-xxx/foo", remoteStorageInfo.getConfItems().get("fs.defaultFs"));
+  }
+}
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 26cbd64126..fdcc167b53 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -306,7 +306,7 @@ public ShuffleHandle registerShuffle(
     }
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
+    RemoteStorageInfo defaultRemoteStorage = getDefaultRemoteStorageInfo(sparkConf);
     RemoteStorageInfo remoteStorage =
         ClientUtils.fetchRemoteStorage(
             appId, defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 013a4acf19..27c8bb8ba5 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -410,7 +410,7 @@ public ShuffleHandle registerShuffle(
     }
 
     String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
-    RemoteStorageInfo defaultRemoteStorage = getRemoteStorageInfo(sparkConf);
+    RemoteStorageInfo defaultRemoteStorage = getDefaultRemoteStorageInfo(sparkConf);
     RemoteStorageInfo remoteStorage =
         ClientUtils.fetchRemoteStorage(
             id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
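If useful while reviewing, the new test can be run on its own. The exact Maven flags may vary with the project's build setup, but an invocation along these lines should work from the repository root:

```shell
mvn -pl client-spark/common -am test -Dtest=RssShuffleManagerBaseTest
```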
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index f732687903..b8adb591b8 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -25,6 +25,15 @@
 import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
 
 public class RssClientConf {
+  /**
+   * The prefix key for Hadoop conf. For Spark it looks like this:
+   *
+   * <p>key: spark.rss.hadoop.fs.defaultFS val: hdfs://rbf-x1
+   *
+   * <p>The prefix is stripped and the remaining key ("fs.defaultFS") is injected into the Hadoop
+   * storage configuration.
+   */
+  public static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";
 
   public static final ConfigOption<Codec.Type> COMPRESSION_TYPE =
       ConfigOptions.key("rss.client.io.compression.codec")
diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md
index 75e2086a37..528da39be5 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -78,14 +78,15 @@ Local shuffle reader as its name indicates is suitable and optimized for spark's
 
 The important configuration is listed as following.
 
-| Property Name                                          | Default | Description |
-|--------------------------------------------------------|---------|-------------|
-| spark.rss.writer.buffer.spill.size                     | 128m    | Buffer size for total partition data |
-| spark.rss.client.send.size.limit                       | 16m     | The max data size sent to shuffle server |
-| spark.rss.client.unregister.thread.pool.size           | 10      | The max size of thread pool of unregistering |
-| spark.rss.client.unregister.request.timeout.sec        | 10      | The max timeout sec when doing unregister to remote shuffle-servers |
-| spark.rss.client.off.heap.memory.enable                | false   | The client use off heap memory to process data |
-| spark.rss.client.remote.storage.useLocalConfAsDefault  | false   | This option is only valid when the remote storage path is specified. If ture, the remote storage conf will use the client side hadoop configuration loaded from the classpath |
+| Property Name                                          | Default | Description |
+|--------------------------------------------------------|---------|-------------|
+| spark.rss.writer.buffer.spill.size                     | 128m    | Buffer size for total partition data |
+| spark.rss.client.send.size.limit                       | 16m     | The max data size sent to the shuffle server |
+| spark.rss.client.unregister.thread.pool.size           | 10      | The max size of the thread pool used for unregistering |
+| spark.rss.client.unregister.request.timeout.sec        | 10      | The max timeout in seconds when unregistering from remote shuffle servers |
+| spark.rss.client.off.heap.memory.enable                | false   | Whether the client uses off-heap memory to process data |
+| spark.rss.client.remote.storage.useLocalConfAsDefault  | false   | This option is only valid when the remote storage path is specified. If true, the remote storage conf will use the client-side Hadoop configuration loaded from the classpath |
+| spark.rss.hadoop.*                                     | -       | Prefix for client-side Hadoop conf. For Spark, `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1` is applied as `fs.defaultFS=hdfs://rbf-x1` to the Hadoop storage configuration |
 
 ### Adaptive Remote Shuffle Enabling
 Currently, this feature only supports Spark.