Merge remote-tracking branch 'apache-github/master' into SPARK-22187-1
tdas committed Jul 11, 2018
2 parents c9f600b + 006e798 commit dbd08ca
Showing 148 changed files with 4,190 additions and 943 deletions.
24 changes: 15 additions & 9 deletions R/pkg/R/client.R
@@ -71,15 +71,20 @@ checkJavaVersion <- function() {

# If java is missing from PATH, we get an error in Unix and a warning in Windows
javaVersionOut <- tryCatch(
launchScript(javaBin, "-version", wait = TRUE, stdout = TRUE, stderr = TRUE),
error = function(e) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", e)
},
warning = function(w) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", w)
})
if (is_windows()) {
# See SPARK-24535
system2(javaBin, "-version", wait = TRUE, stdout = TRUE, stderr = TRUE)
} else {
launchScript(javaBin, "-version", wait = TRUE, stdout = TRUE, stderr = TRUE)
},
error = function(e) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", e)
},
warning = function(w) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", w)
})
javaVersionFilter <- Filter(
function(x) {
grepl(" version", x)
@@ -93,6 +98,7 @@ checkJavaVersion <- function() {
stop(paste("Java version", sparkJavaVersion, "is required for this package; found version:",
javaVersionStr))
}
return(javaVersionNum)
}

launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
2 changes: 1 addition & 1 deletion R/pkg/R/sparkR.R
@@ -167,7 +167,7 @@ sparkR.sparkContext <- function(
submitOps <- getClientModeSparkSubmitOpts(
Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
sparkEnvirMap)
checkJavaVersion()
invisible(checkJavaVersion())
launchBackend(
args = path,
sparkHome = sparkHome,
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -18,6 +18,10 @@
context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
tryCatch( checkJavaVersion(),
error = function(e) { skip("error on Java check") },
warning = function(e) { skip("warning on Java check") } )

sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)

@@ -50,6 +54,10 @@ test_that("create DataFrame from list or data.frame", {
})

test_that("spark.glm and predict", {
tryCatch( checkJavaVersion(),
error = function(e) { skip("error on Java check") },
warning = function(e) { skip("warning on Java check") } )

sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)

5 changes: 5 additions & 0 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -590,6 +590,7 @@ summary(model)
Predict values on training data
```{r}
prediction <- predict(model, training)
head(select(prediction, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
```

#### Logistic Regression
@@ -613,6 +614,7 @@ summary(model)
Predict values on training data
```{r}
fitted <- predict(model, training)
head(select(fitted, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
```

Multinomial logistic regression against three classes
@@ -807,6 +809,7 @@ df <- createDataFrame(t)
dtModel <- spark.decisionTree(df, Survived ~ ., type = "classification", maxDepth = 2)
summary(dtModel)
predictions <- predict(dtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
```

#### Gradient-Boosted Trees
@@ -822,6 +825,7 @@ df <- createDataFrame(t)
gbtModel <- spark.gbt(df, Survived ~ ., type = "classification", maxDepth = 2, maxIter = 2)
summary(gbtModel)
predictions <- predict(gbtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
```

#### Random Forest
@@ -837,6 +841,7 @@ df <- createDataFrame(t)
rfModel <- spark.randomForest(df, Survived ~ ., type = "classification", maxDepth = 2, numTrees = 2)
summary(rfModel)
predictions <- predict(rfModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
```

#### Bisecting k-Means
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -56,7 +56,7 @@
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
<artifactId>xbean-asm6-shaded</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -385,7 +385,7 @@ private[spark] class SparkSubmit extends Logging {
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)

def shouldDownload(scheme: String): Boolean = {
forceDownloadSchemes.contains(scheme) ||
forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
}
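The widened `shouldDownload` predicate above treats `"*"` as a match-everything wildcard. A minimal standalone sketch of just that wildcard logic (illustrative names, and omitting the Hadoop `FileSystem` fallback that the real method also checks):

```scala
object ForceDownloadCheck {
  // A scheme is force-downloaded when it is listed explicitly or when "*" is configured.
  def shouldForceDownload(scheme: String, forceDownloadSchemes: Seq[String]): Boolean =
    forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme)

  def main(args: Array[String]): Unit = {
    assert(shouldForceDownload("http", Seq("*")))            // wildcard matches any scheme
    assert(shouldForceDownload("http", Seq("http", "ftp")))  // explicit match
    assert(!shouldForceDownload("s3a", Seq("http")))         // unrelated scheme stays remote
  }
}
```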

@@ -578,7 +578,8 @@ private[spark] class SparkSubmit extends Logging {
}
// Add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
// This assumes both primaryResource and user jars are local jars, otherwise it will not be
// This assumes both primaryResource and user jars are local jars, or already downloaded
// to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
// added to the classpath of YARN client.
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
@@ -702,6 +703,8 @@ private[spark] class SparkSubmit extends Logging {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
} else {
childArgs ++= Array("--main-class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
@@ -72,6 +72,9 @@ package object config {
private[spark] val EVENT_LOG_OVERWRITE =
ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)

private[spark] val EVENT_LOG_CALLSITE_FORM =
ConfigBuilder("spark.eventLog.callsite").stringConf.createWithDefault("short")

private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional

@@ -483,10 +486,11 @@ package object config {

private[spark] val FORCE_DOWNLOAD_SCHEMES =
ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
.doc("Comma-separated list of schemes for which files will be downloaded to the " +
.doc("Comma-separated list of schemes for which resources will be downloaded to the " +
"local disk prior to being added to YARN's distributed cache. For use in cases " +
"where the YARN service does not support schemes that are supported by Spark, like http, " +
"https and ftp.")
"https and ftp, or jars required to be in the local YARN client's classpath. Wildcard " +
"'*' is denoted to download resources for all the schemes.")
.stringConf
.toSequence
.createWithDefault(Nil)
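A hedged usage sketch of the widened config (not part of this commit; the app name and jar URL below are made up for illustration):

```scala
import org.apache.spark.SparkConf

// Download every remote resource to local disk before it is added to YARN's
// distributed cache, using the new "*" wildcard; a scheme list such as
// "http,https,ftp" would restrict this to specific schemes.
val conf = new SparkConf()
  .setAppName("force-download-demo")                                 // illustrative name
  .set("spark.yarn.dist.forceDownloadSchemes", "*")
  .set("spark.yarn.dist.jars", "http://repo.example.com/extra.jar")  // hypothetical URL
```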
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -30,7 +30,7 @@ private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {

@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@transient lazy val _locations = BlockManager.blockIdsToLocations(blockIds, SparkEnv.get)
@volatile private var _isValid = true

override def getPartitions: Array[Partition] = {
@@ -471,7 +471,7 @@ private[spark] class AppStatusStore(

def operationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
val job = store.read(classOf[JobDataWrapper], jobId)
val stages = job.info.stageIds
val stages = job.info.stageIds.sorted

stages.map { id =>
val g = store.read(classOf[RDDOperationGraphWrapper], id).toRDDOperationGraph()
@@ -45,6 +45,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
@@ -1554,7 +1555,7 @@ private[spark] class BlockManager(
private[spark] object BlockManager {
private val ID_GENERATOR = new IdGenerator

def blockIdsToHosts(
def blockIdsToLocations(
blockIds: Array[BlockId],
env: SparkEnv,
blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {
@@ -1569,7 +1570,9 @@

val blockManagers = new HashMap[BlockId, Seq[String]]
for (i <- 0 until blockIds.length) {
blockManagers(blockIds(i)) = blockLocations(i).map(_.host)
blockManagers(blockIds(i)) = blockLocations(i).map { loc =>
ExecutorCacheTaskLocation(loc.host, loc.executorId).toString
}
}
blockManagers.toMap
}
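The rename from `blockIdsToHosts` to `blockIdsToLocations` reflects that the returned strings now encode the executor ID as well as the host, using the `ExecutorCacheTaskLocation` string form. An illustrative sketch of that format (the class is `private[spark]`, so this only compiles from a package under `org.apache.spark`; the package and object names are hypothetical):

```scala
package org.apache.spark.example  // hypothetical package, needed to access private[spark] members

import org.apache.spark.scheduler.ExecutorCacheTaskLocation

object LocationFormatExample {
  def main(args: Array[String]): Unit = {
    val loc = ExecutorCacheTaskLocation("host1", "1")
    // Prints "executor_host1_1", matching the value asserted in the
    // BlockManagerSuite test later in this diff.
    println(loc.toString)
  }
}
```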
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -17,7 +17,9 @@

package org.apache.spark.storage

import org.apache.spark.SparkEnv
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.util.Utils

@@ -53,10 +55,16 @@ class RDDInfo(
}

private[spark] object RDDInfo {
private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)

def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
val parentIds = rdd.dependencies.map(_.rdd.id)
val callSite = callsiteForm match {
case "short" => rdd.creationSite.shortForm
case "long" => rdd.creationSite.longForm
}
new RDDInfo(rdd.id, rddName, rdd.partitions.length,
rdd.getStorageLevel, parentIds, rdd.creationSite.shortForm, rdd.scope)
rdd.getStorageLevel, parentIds, callSite, rdd.scope)
}
}
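The call-site form stored in `RDDInfo` (and therefore written to the event log) now follows the new `spark.eventLog.callsite` setting introduced earlier in this diff. A sketch of opting into the long form (master, app name, and log directory are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Record long-form call sites for RDDs in the event log; the default is "short".
val conf = new SparkConf()
  .setAppName("callsite-demo")                     // illustrative app name
  .setMaster("local[*]")                           // illustrative master
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")  // hypothetical log directory
  .set("spark.eventLog.callsite", "long")
val sc = new SparkContext(conf)
```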
@@ -22,8 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable.{Map, Set, Stack}
import scala.language.existentials

import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.apache.xbean.asm5.Opcodes._
import org.apache.xbean.asm6.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.apache.xbean.asm6.Opcodes._

import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.internal.Logging
29 changes: 19 additions & 10 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -995,20 +995,24 @@ class SparkSubmitSuite
}

test("download remote resource if it is not supported by yarn service") {
testRemoteResources(enableHttpFs = false, blacklistHttpFs = false)
testRemoteResources(enableHttpFs = false)
}

test("avoid downloading remote resource if it is supported by yarn service") {
testRemoteResources(enableHttpFs = true, blacklistHttpFs = false)
testRemoteResources(enableHttpFs = true)
}

test("force download from blacklisted schemes") {
testRemoteResources(enableHttpFs = true, blacklistHttpFs = true)
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("http"))
}

test("force download for all the schemes") {
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
}

private def testRemoteResources(
enableHttpFs: Boolean,
blacklistHttpFs: Boolean): Unit = {
blacklistSchemes: Seq[String] = Nil): Unit = {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
if (enableHttpFs) {
@@ -1025,8 +1029,8 @@
val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"

val forceDownloadArgs = if (blacklistHttpFs) {
Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http")
val forceDownloadArgs = if (blacklistSchemes.nonEmpty) {
Seq("--conf", s"spark.yarn.dist.forceDownloadSchemes=${blacklistSchemes.mkString(",")}")
} else {
Nil
}
@@ -1044,14 +1048,19 @@

val jars = conf.get("spark.yarn.dist.jars").split(",").toSet

// The URI of remote S3 resource should still be remote.
assert(jars.contains(tmpS3JarPath))
def isSchemeBlacklisted(scheme: String) = {
blacklistSchemes.contains("*") || blacklistSchemes.contains(scheme)
}

if (!isSchemeBlacklisted("s3")) {
assert(jars.contains(tmpS3JarPath))
}

if (enableHttpFs && !blacklistHttpFs) {
if (enableHttpFs && blacklistSchemes.isEmpty) {
// If Http FS is supported by yarn service, the URI of remote http resource should
// still be remote.
assert(jars.contains(tmpHttpJarPath))
} else {
} else if (!enableHttpFs || isSchemeBlacklisted("http")) {
// If Http FS is not supported by yarn service, or http scheme is configured to be force
// downloading, the URI of remote http resource should be changed to a local one.
val jarName = new File(tmpHttpJar.toURI).getName
@@ -1422,6 +1422,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(mockBlockTransferService.tempFileManager === store.remoteBlockTempFileManager)
}

test("query locations of blockIds") {
val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
val blockLocations = Seq(BlockManagerId("1", "host1", 100), BlockManagerId("2", "host2", 200))
when(mockBlockManagerMaster.getLocations(mc.any[Array[BlockId]]))
.thenReturn(Array(blockLocations))
val env = mock(classOf[SparkEnv])

val blockIds: Array[BlockId] = Array(StreamBlockId(1, 2))
val locs = BlockManager.blockIdsToLocations(blockIds, env, mockBlockManagerMaster)
val expectedLocs = Seq("executor_host1_1", "executor_host2_2")
assert(locs(blockIds(0)) == expectedLocs)
}

class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0
var tempFileManager: TempFileManager = null
5 changes: 4 additions & 1 deletion dev/create-release/releaseutils.py
@@ -49,13 +49,16 @@
print("Install using 'sudo pip install unidecode'")
sys.exit(-1)

if sys.version < '3':
input = raw_input

# Contributors list file name
contributors_file_name = "contributors.txt"


# Prompt the user to answer yes or no until they do so
def yesOrNoPrompt(msg):
response = raw_input("%s [y/n]: " % msg)
response = input("%s [y/n]: " % msg)
while response != "y" and response != "n":
return yesOrNoPrompt(msg)
return response == "y"
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -192,7 +192,7 @@ stringtemplate-3.2.1.jar
super-csv-2.2.0.jar
univocity-parsers-2.6.3.jar
validation-api-1.1.0.Final.jar
xbean-asm5-shaded-4.4.jar
xbean-asm6-shaded-4.8.jar
xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.0.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -193,7 +193,7 @@ stringtemplate-3.2.1.jar
super-csv-2.2.0.jar
univocity-parsers-2.6.3.jar
validation-api-1.1.0.Final.jar
xbean-asm5-shaded-4.4.jar
xbean-asm6-shaded-4.8.jar
xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.0.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3.1
@@ -214,7 +214,7 @@ token-provider-1.0.1.jar
univocity-parsers-2.6.3.jar
validation-api-1.1.0.Final.jar
woodstox-core-5.0.3.jar
xbean-asm5-shaded-4.4.jar
xbean-asm6-shaded-4.8.jar
xz-1.0.jar
zjsonpatch-0.3.0.jar
zookeeper-3.4.9.jar
Some files were not shown because too many files changed in this commit.
