Use tar and gzip to compress+archive shipped jars (#2)
* Use tar and gzip to archive shipped jars.

* Address comments

* Move files to resolve merge
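
Before this change, each uploaded jar was read and base64-encoded individually; with this change the client packs the whole comma-separated jar list into a single in-memory tar.gz payload. A minimal sketch of the new client-side call, assuming the private[spark] helpers introduced below are on the compile path (the jar paths are hypothetical values):

import org.apache.spark.deploy.rest.TarGzippedData
import org.apache.spark.deploy.rest.kubernetes.CompressionUtils

// Comma-separated jar paths as supplied via the upload options (hypothetical values).
val uploadedJars: Option[String] = Some("/tmp/deps/a.jar,/tmp/deps/b.jar")

// Archive and gzip all listed jars into one base64-encoded payload,
// mirroring Client.compressJars in this commit.
val compressed: Option[TarGzippedData] =
  uploadedJars
    .map(_.split(","))
    .map(CompressionUtils.createTarGzip(_))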
mccheah authored and foxish committed Jul 24, 2017
1 parent f20397b commit 728be0e
Showing 11 changed files with 254 additions and 48 deletions.
1 change: 1 addition & 0 deletions pom.xml
@@ -2585,6 +2585,7 @@
<module>resource-managers/kubernetes/docker-minimal-bundle</module>
<module>resource-managers/kubernetes/integration-tests</module>
<module>resource-managers/kubernetes/integration-tests-spark-jobs</module>
<module>resource-managers/kubernetes/integration-tests-spark-jobs-helpers</module>
</modules>
</profile>

@@ -35,7 +35,7 @@ import scala.concurrent.duration.DurationInt
import scala.util.Success

import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -284,8 +284,8 @@ private[spark] class Client(
case other => RemoteAppResource(other)
}

val uploadDriverExtraClasspathBase64Contents = getFileContents(uploadedDriverExtraClasspath)
val uploadJarsBase64Contents = getFileContents(uploadedJars)
val uploadDriverExtraClasspathBase64Contents = compressJars(uploadedDriverExtraClasspath)
val uploadJarsBase64Contents = compressJars(uploadedJars)
KubernetesCreateSubmissionRequest(
appResource = resolvedAppResource,
mainClass = mainClass,
@@ -296,19 +296,10 @@
uploadedJarsBase64Contents = uploadJarsBase64Contents)
}

def getFileContents(maybeFilePaths: Option[String]): Array[(String, String)] = {
def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
maybeFilePaths
.map(_.split(",").map(filePath => {
val fileToUpload = new File(filePath)
if (!fileToUpload.isFile) {
throw new IllegalStateException("Provided file to upload for driver extra classpath" +
s" does not exist or is not a file: $filePath")
} else {
val fileBytes = Files.toByteArray(fileToUpload)
val fileBase64 = Base64.encodeBase64String(fileBytes)
(fileToUpload.getName, fileBase64)
}
})).getOrElse(Array.empty[(String, String)])
.map(_.split(","))
.map(CompressionUtils.createTarGzip(_))
}

private def getDriverLauncherService(
@@ -27,14 +27,19 @@ case class KubernetesCreateSubmissionRequest(
val appArgs: Array[String],
val sparkProperties: Map[String, String],
val secret: String,
val uploadedDriverExtraClasspathBase64Contents: Array[(String, String)]
= Array.empty[(String, String)],
val uploadedJarsBase64Contents: Array[(String, String)]
= Array.empty[(String, String)]) extends SubmitRestProtocolRequest {
val uploadedDriverExtraClasspathBase64Contents: Option[TarGzippedData],
val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
}

case class TarGzippedData(
val dataBase64: String,
val blockSize: Int = 10240,
val recordSize: Int = 512,
val encoding: String
)

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.rest.kubernetes

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream, TarArchiveOutputStream}
import org.apache.commons.compress.utils.CharsetNames
import org.apache.commons.io.IOUtils
import scala.collection.mutable

import org.apache.spark.deploy.rest.TarGzippedData
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ByteBufferOutputStream, Utils}

private[spark] object CompressionUtils extends Logging {
// Defaults from TarArchiveOutputStream
private val BLOCK_SIZE = 10240
private val RECORD_SIZE = 512
private val ENCODING = CharsetNames.UTF_8

/**
* Compresses all of the given paths into a gzipped tar archive, returning the compressed data
* in memory as an instance of {@link TarGzippedData}. The files are added to the archive in a
* flat hierarchy, without regard to their original folder structure. Directories are not
* allowed. Duplicate file names are de-duplicated by appending a numeric suffix to the file
* name, before the file extension: for example, if the paths a/b.txt and b/b.txt were provided,
* the files added to the tar archive would be b.txt and b-1.txt.
* @param paths A list of file paths to be archived
* @return An in-memory representation of the compressed data.
*/
def createTarGzip(paths: Iterable[String]): TarGzippedData = {
val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping =>
Utils.tryWithResource(new TarArchiveOutputStream(
gzipping,
BLOCK_SIZE,
RECORD_SIZE,
ENCODING)) { tarStream =>
val usedFileNames = mutable.HashSet.empty[String]
for (path <- paths) {
val file = new File(path)
if (!file.isFile) {
throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
s" not exist or is a directory.")
}
var resolvedFileName = file.getName
val extension = Files.getFileExtension(file.getName)
val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
var deduplicationCounter = 1
while (usedFileNames.contains(resolvedFileName)) {
val oldResolvedFileName = resolvedFileName
resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add with" +
s" file name $resolvedFileName instead.")
deduplicationCounter += 1
}
usedFileNames += resolvedFileName
val tarEntry = new TarArchiveEntry(file, resolvedFileName)
tarStream.putArchiveEntry(tarEntry)
Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
IOUtils.copy(fileInput, tarStream)
}
tarStream.closeArchiveEntry()
}
}
}
raw
}
val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
TarGzippedData(
dataBase64 = compressedAsBase64,
blockSize = BLOCK_SIZE,
recordSize = RECORD_SIZE,
encoding = ENCODING
)
}

/**
* Decompresses the provided tar archive to a directory.
* @param compressedData In-memory representation of the compressed data, ideally created via
* {@link createTarGzip}.
* @param rootOutputDir Directory to write the output files to. All files from the tarball
* are written here in a flat hierarchy.
* @return List of file paths for each file that was unpacked from the archive.
*/
def unpackAndWriteCompressedFiles(
compressedData: TarGzippedData,
rootOutputDir: File): Seq[String] = {
val paths = mutable.Buffer.empty[String]
val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
if (!rootOutputDir.exists) {
if (!rootOutputDir.mkdirs) {
throw new IllegalStateException(s"Failed to create output directory for unpacking" +
s" files at ${rootOutputDir.getAbsolutePath}")
}
} else if (rootOutputDir.isFile) {
throw new IllegalArgumentException(s"Root dir for writing decompressed files: " +
s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
}
Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
Utils.tryWithResource(new GZIPInputStream(compressedBytesStream)) { gzipped =>
Utils.tryWithResource(new TarArchiveInputStream(
gzipped,
compressedData.blockSize,
compressedData.recordSize,
compressedData.encoding)) { tarInputStream =>
var nextTarEntry = tarInputStream.getNextTarEntry
while (nextTarEntry != null) {
val outputFile = new File(rootOutputDir, nextTarEntry.getName)
Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
IOUtils.copy(tarInputStream, fileOutputStream)
}
paths += outputFile.getAbsolutePath
nextTarEntry = tarInputStream.getNextTarEntry
}
}
}
}
paths.toSeq
}
}
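
As a rough usage sketch of the two helpers above (they are private[spark], so this only compiles inside Spark's own packages; the input paths and temp-directory prefix are hypothetical), an archive produced by createTarGzip can be unpacked back into a flat directory with unpackAndWriteCompressedFiles. Duplicate entry names such as the two app.jar files below come back as app.jar and app-1.jar:

import java.io.File
import java.nio.file.Files

import org.apache.spark.deploy.rest.TarGzippedData
import org.apache.spark.deploy.rest.kubernetes.CompressionUtils

// Pack two same-named jars from different directories into one in-memory tar.gz.
val archived: TarGzippedData =
  CompressionUtils.createTarGzip(Seq("/tmp/build-a/app.jar", "/tmp/build-b/app.jar"))

// Unpack on the receiving side into a flat output directory; the second app.jar
// is written as app-1.jar because duplicate entry names get a numeric suffix.
val outputDir: File = Files.createTempDirectory("unpacked-jars").toFile
val unpacked: Seq[String] =
  CompressionUtils.unpackAndWriteCompressedFiles(archived, outputDir)
unpacked.foreach(println)  // absolute paths of app.jar and app-1.jar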
@@ -217,30 +217,11 @@ private[spark] class KubernetesSparkRestServer(
}

private def writeBase64ContentsToFiles(
filesBase64Contents: Array[(String, String)],
maybeCompressedFiles: Option[TarGzippedData],
rootDir: File): Seq[String] = {
val resolvedFileNames = new scala.collection.mutable.HashSet[String]
val resolvedFilePaths = new ArrayBuffer[String]
for (file <- filesBase64Contents) {
var currentFileName = file._1
var deduplicationCounter = 1
while (resolvedFileNames.contains(currentFileName)) {
// Prepend the deduplication counter so as to not mess with the extension
currentFileName = s"$deduplicationCounter-$currentFileName"
deduplicationCounter += 1
}
val resolvedFile = new File(rootDir, currentFileName)
val resolvedFilePath = resolvedFile.getAbsolutePath
if (resolvedFile.createNewFile()) {
val fileContents = Base64.decodeBase64(file._2)
Files.write(fileContents, resolvedFile)
} else {
throw new IllegalStateException(s"Could not write jar file to $resolvedFilePath")
}
resolvedFileNames += currentFileName
resolvedFilePaths += resolvedFilePath
}
resolvedFilePaths.toSeq
maybeCompressedFiles.map { compressedFiles =>
CompressionUtils.unpackAndWriteCompressedFiles(compressedFiles, rootDir)
}.getOrElse(Seq.empty[String])
}
}

@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kubernetes Integration Tests Spark Jobs Helpers</name>

<dependencies>
</dependencies>
</project>
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest;

/**
* Primarily extracted so that a separate jar can be added as a dependency for the
* test Spark job.
*/
public class PiHelper {
public static int helpPi() {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
if (x*x + y*y < 1) {
return 1;
} else {
return 0;
}
}
}
@@ -29,6 +29,12 @@
<name>Spark Project Kubernetes Integration Tests Spark Jobs</name>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -16,8 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest.jobs

import scala.math.random

import org.apache.spark.deploy.kubernetes.integrationtest.PiHelper
import org.apache.spark.sql.SparkSession

// Equivalent to SparkPi except does not stop the Spark Context
@@ -32,10 +31,8 @@ private[spark] object SparkPiWithInfiniteWait {
.getOrCreate()
val slices = if (args.length > 0) args(0).toInt else 10
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
val count = spark.sparkContext.parallelize(1 until n, slices).map { _ =>
PiHelper.helpPi()
}.reduce(_ + _)
// scalastyle:off println
println("Pi is roughly " + 4.0 * count / (n - 1))
13 changes: 13 additions & 0 deletions resource-managers/kubernetes/integration-tests/pom.xml
@@ -48,6 +48,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-docker-minimal-bundle_${scala.binary.version}</artifactId>
@@ -123,6 +129,13 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/integration-tests-spark-jobs</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes-integration-tests-spark-jobs-helpers_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>jar</type>
<outputDirectory>${project.build.directory}/integration-tests-spark-jobs-helpers</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>