diff --git a/pom.xml b/pom.xml
index 963c9ad4e4a0c..ee156556373ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2585,6 +2585,7 @@
resource-managers/kubernetes/docker-minimal-bundle
resource-managers/kubernetes/integration-tests
resource-managers/kubernetes/integration-tests-spark-jobs
+ resource-managers/kubernetes/integration-tests-spark-jobs-helpers
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 0715c84495a2c..230598d63bed1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -35,7 +35,7 @@ import scala.concurrent.duration.DurationInt
import scala.util.Success
import org.apache.spark.{SPARK_VERSION, SparkConf}
-import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
+import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -284,8 +284,8 @@ private[spark] class Client(
case other => RemoteAppResource(other)
}
- val uploadDriverExtraClasspathBase64Contents = getFileContents(uploadedDriverExtraClasspath)
- val uploadJarsBase64Contents = getFileContents(uploadedJars)
+ val uploadDriverExtraClasspathBase64Contents = compressJars(uploadedDriverExtraClasspath)
+ val uploadJarsBase64Contents = compressJars(uploadedJars)
KubernetesCreateSubmissionRequest(
appResource = resolvedAppResource,
mainClass = mainClass,
@@ -296,19 +296,10 @@ private[spark] class Client(
uploadedJarsBase64Contents = uploadJarsBase64Contents)
}
- def getFileContents(maybeFilePaths: Option[String]): Array[(String, String)] = {
+ def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
maybeFilePaths
- .map(_.split(",").map(filePath => {
- val fileToUpload = new File(filePath)
- if (!fileToUpload.isFile) {
- throw new IllegalStateException("Provided file to upload for driver extra classpath" +
- s" does not exist or is not a file: $filePath")
- } else {
- val fileBytes = Files.toByteArray(fileToUpload)
- val fileBase64 = Base64.encodeBase64String(fileBytes)
- (fileToUpload.getName, fileBase64)
- }
- })).getOrElse(Array.empty[(String, String)])
+ .map(_.split(","))
+ .map(CompressionUtils.createTarGzip(_))
}
private def getDriverLauncherService(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
index 4b7bb66083f29..6da1a848b25e7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
@@ -27,14 +27,19 @@ case class KubernetesCreateSubmissionRequest(
val appArgs: Array[String],
val sparkProperties: Map[String, String],
val secret: String,
- val uploadedDriverExtraClasspathBase64Contents: Array[(String, String)]
- = Array.empty[(String, String)],
- val uploadedJarsBase64Contents: Array[(String, String)]
- = Array.empty[(String, String)]) extends SubmitRestProtocolRequest {
+ val uploadedDriverExtraClasspathBase64Contents: Option[TarGzippedData],
+ val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
}
+case class TarGzippedData(
+ val dataBase64: String,
+ val blockSize: Int = 10240,
+ val recordSize: Int = 512,
+ val encoding: String
+)
+
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala
new file mode 100644
index 0000000000000..805a52bada219
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.kubernetes
+
+import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import com.google.common.io.Files
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream, TarArchiveOutputStream}
+import org.apache.commons.compress.utils.CharsetNames
+import org.apache.commons.io.IOUtils
+import scala.collection.mutable
+
+import org.apache.spark.deploy.rest.TarGzippedData
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ByteBufferOutputStream, Utils}
+
+private[spark] object CompressionUtils extends Logging {
+ // Defaults from TarArchiveOutputStream
+ private val BLOCK_SIZE = 10240
+ private val RECORD_SIZE = 512
+ private val ENCODING = CharsetNames.UTF_8
+
+ /**
+ * Compresses all of the given paths into a gzipped-tar archive, returning the compressed data in
+ * memory as an instance of {@link TarGzippedData}. The files are taken without consideration to their
+ * original folder structure, and are added to the tar archive in a flat hierarchy. Directories are
+ * not allowed, and duplicate file names are de-duplicated by appending a numeric suffix to the file name,
+ * before the file extension. For example, if paths a/b.txt and b/b.txt were provided, then the files added
+ * to the tar archive would be b.txt and b-1.txt.
+ * @param paths A list of file paths to be archived
+ * @return An in-memory representation of the compressed data.
+ */
+ def createTarGzip(paths: Iterable[String]): TarGzippedData = {
+ val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
+ Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping =>
+ Utils.tryWithResource(new TarArchiveOutputStream(
+ gzipping,
+ BLOCK_SIZE,
+ RECORD_SIZE,
+ ENCODING)) { tarStream =>
+ val usedFileNames = mutable.HashSet.empty[String]
+ for (path <- paths) {
+ val file = new File(path)
+ if (!file.isFile) {
+ throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
+ s" not exist or is a directory.")
+ }
+ var resolvedFileName = file.getName
+ val extension = Files.getFileExtension(file.getName)
+ val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
+ var deduplicationCounter = 1
+ while (usedFileNames.contains(resolvedFileName)) {
+ val oldResolvedFileName = resolvedFileName
+ resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
+ logWarning(s"File with name $oldResolvedFileName already exists. Trying to add with" +
+ s" file name $resolvedFileName instead.")
+ deduplicationCounter += 1
+ }
+ usedFileNames += resolvedFileName
+ val tarEntry = new TarArchiveEntry(file, resolvedFileName)
+ tarStream.putArchiveEntry(tarEntry)
+ Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
+ IOUtils.copy(fileInput, tarStream)
+ }
+ tarStream.closeArchiveEntry()
+ }
+ }
+ }
+ raw
+ }
+ val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
+ TarGzippedData(
+ dataBase64 = compressedAsBase64,
+ blockSize = BLOCK_SIZE,
+ recordSize = RECORD_SIZE,
+ encoding = ENCODING
+ )
+ }
+
+ /**
+ * Decompresses the provided tar archive to a directory.
+ * @param compressedData In-memory representation of the compressed data, ideally created via
+ * {@link createTarGzip}.
+ * @param rootOutputDir Directory to write the output files to. All files from the tarball
+ * are written here in a flat hierarchy.
+ * @return List of file paths for each file that was unpacked from the archive.
+ */
+ def unpackAndWriteCompressedFiles(
+ compressedData: TarGzippedData,
+ rootOutputDir: File): Seq[String] = {
+ val paths = mutable.Buffer.empty[String]
+ val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
+ if (!rootOutputDir.exists) {
+ if (!rootOutputDir.mkdirs) {
+ throw new IllegalStateException(s"Failed to create output directory for unpacking" +
+ s" files at ${rootOutputDir.getAbsolutePath}")
+ }
+ } else if (rootOutputDir.isFile) {
+ throw new IllegalArgumentException(s"Root dir for writing decompressed files: " +
+ s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
+ }
+ Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
+ Utils.tryWithResource(new GZIPInputStream(compressedBytesStream)) { gzipped =>
+ Utils.tryWithResource(new TarArchiveInputStream(
+ gzipped,
+ compressedData.blockSize,
+ compressedData.recordSize,
+ compressedData.encoding)) { tarInputStream =>
+ var nextTarEntry = tarInputStream.getNextTarEntry
+ while (nextTarEntry != null) {
+ val outputFile = new File(rootOutputDir, nextTarEntry.getName)
+ Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
+ IOUtils.copy(tarInputStream, fileOutputStream)
+ }
+ paths += outputFile.getAbsolutePath
+ nextTarEntry = tarInputStream.getNextTarEntry
+ }
+ }
+ }
+ }
+ paths.toSeq
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index 0a2e8176394ab..2ca3d4a8c0656 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -217,30 +217,11 @@ private[spark] class KubernetesSparkRestServer(
}
private def writeBase64ContentsToFiles(
- filesBase64Contents: Array[(String, String)],
+ maybeCompressedFiles: Option[TarGzippedData],
rootDir: File): Seq[String] = {
- val resolvedFileNames = new scala.collection.mutable.HashSet[String]
- val resolvedFilePaths = new ArrayBuffer[String]
- for (file <- filesBase64Contents) {
- var currentFileName = file._1
- var deduplicationCounter = 1
- while (resolvedFileNames.contains(currentFileName)) {
- // Prepend the deduplication counter so as to not mess with the extension
- currentFileName = s"$deduplicationCounter-$currentFileName"
- deduplicationCounter += 1
- }
- val resolvedFile = new File(rootDir, currentFileName)
- val resolvedFilePath = resolvedFile.getAbsolutePath
- if (resolvedFile.createNewFile()) {
- val fileContents = Base64.decodeBase64(file._2)
- Files.write(fileContents, resolvedFile)
- } else {
- throw new IllegalStateException(s"Could not write jar file to $resolvedFilePath")
- }
- resolvedFileNames += currentFileName
- resolvedFilePaths += resolvedFilePath
- }
- resolvedFilePaths.toSeq
+ maybeCompressedFiles.map { compressedFiles =>
+ CompressionUtils.unpackAndWriteCompressedFiles(compressedFiles, rootDir)
+ }.getOrElse(Seq.empty[String])
}
}
diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs-helpers/pom.xml b/resource-managers/kubernetes/integration-tests-spark-jobs-helpers/pom.xml
new file mode 100644
index 0000000000000..f99838636b349
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests-spark-jobs-helpers/pom.xml
@@ -0,0 +1,33 @@
+
+
+
+ 4.0.0
+
+ org.apache.spark
+ spark-parent_2.11
+ 2.2.0-SNAPSHOT
+ ../../../pom.xml
+
+
+ spark-kubernetes-integration-tests-spark-jobs-helpers_2.11
+ jar
+ Spark Project Kubernetes Integration Tests Spark Jobs Helpers
+
+
+
+
diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs-helpers/src/main/java/org/apache/spark/deploy/kubernetes/integrationtest/PiHelper.java b/resource-managers/kubernetes/integration-tests-spark-jobs-helpers/src/main/java/org/apache/spark/deploy/kubernetes/integrationtest/PiHelper.java
new file mode 100644
index 0000000000000..99d982397bb6e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests-spark-jobs-helpers/src/main/java/org/apache/spark/deploy/kubernetes/integrationtest/PiHelper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest;
+
+/**
+ * Primarily extracted so that a separate jar can be added as a dependency for the
+ * test Spark job.
+ */
+public class PiHelper {
+ public static int helpPi() {
+ double x = Math.random() * 2 - 1;
+ double y = Math.random() * 2 - 1;
+ if (x*x + y*y < 1) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/pom.xml b/resource-managers/kubernetes/integration-tests-spark-jobs/pom.xml
index 12b0234ae71bd..59e59aca5109b 100644
--- a/resource-managers/kubernetes/integration-tests-spark-jobs/pom.xml
+++ b/resource-managers/kubernetes/integration-tests-spark-jobs/pom.xml
@@ -29,6 +29,12 @@
Spark Project Kubernetes Integration Tests Spark Jobs
+
+ org.apache.spark
+ spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}
+ ${project.version}
+ provided
+
org.apache.spark
spark-core_${scala.binary.version}
diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala
index 6e4660b771305..d3372749f999e 100644
--- a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala
+++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/SparkPiWithInfiniteWait.scala
@@ -16,8 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest.jobs
-import scala.math.random
-
+import org.apache.spark.deploy.kubernetes.integrationtest.PiHelper
import org.apache.spark.sql.SparkSession
// Equivalent to SparkPi except does not stop the Spark Context
@@ -32,10 +31,8 @@ private[spark] object SparkPiWithInfiniteWait {
.getOrCreate()
val slices = if (args.length > 0) args(0).toInt else 10
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
- val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
- val x = random * 2 - 1
- val y = random * 2 - 1
- if (x*x + y*y < 1) 1 else 0
+ val count = spark.sparkContext.parallelize(1 until n, slices).map { _ =>
+ PiHelper.helpPi()
}.reduce(_ + _)
// scalastyle:off println
println("Pi is roughly " + 4.0 * count / (n - 1))
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 1e7eb0e12e6df..569527de8e300 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -48,6 +48,12 @@
${project.version}
test
+
+ org.apache.spark
+ spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}
+ ${project.version}
+ test
+
org.apache.spark
spark-docker-minimal-bundle_${scala.binary.version}
@@ -123,6 +129,13 @@
jar
${project.build.directory}/integration-tests-spark-jobs
+
+ org.apache.spark
+ spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}
+ ${project.version}
+ jar
+ ${project.build.directory}/integration-tests-spark-jobs-helpers
+
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 183f666994d38..6247a1674f8d6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -41,6 +41,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.listFiles()(0)
.getAbsolutePath
+ private val HELPER_JAR = Paths.get("target", "integration-tests-spark-jobs-helpers")
+ .toFile
+ .listFiles()(0)
+ .getAbsolutePath
+
private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
private val MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
@@ -117,6 +122,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.set("spark.kubernetes.namespace", NAMESPACE)
.set("spark.kubernetes.driver.docker.image", "spark-driver:latest")
.set("spark.kubernetes.executor.docker.image", "spark-executor:latest")
+ .set("spark.kubernetes.driver.uploads.jars", HELPER_JAR)
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
.set("spark.executors.instances", "1")
@@ -142,6 +148,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
"--executor-memory", "512m",
"--executor-cores", "1",
"--num-executors", "1",
+ "--upload-jars", HELPER_JAR,
"--class", MAIN_CLASS,
"--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
"--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",