Merge remote-tracking branch 'upstream/master' into SPARK-30724
# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
wangyum committed Feb 12, 2020
2 parents 88ca4c2 + c198620 commit 1c3d98c
Showing 222 changed files with 4,275 additions and 2,970 deletions.
1 change: 1 addition & 0 deletions R/pkg/DESCRIPTION
@@ -62,3 +62,4 @@ Collate:
RoxygenNote: 5.0.1
VignetteBuilder: knitr
NeedsCompilation: no
+Encoding: UTF-8
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_context.R
@@ -25,7 +25,8 @@ test_that("Check masked functions", {
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop", "window", "as.data.frame", "union", "not")
-if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
+version <- packageVersion("base")
+if (as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3) {
namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
}
masked <- conflicts(detail = TRUE)$`package:SparkR`
8 changes: 4 additions & 4 deletions R/pkg/tests/fulltests/test_includePackage.R
@@ -27,8 +27,8 @@ rdd <- parallelize(sc, nums, 2L)

test_that("include inside function", {
# Only run the test if plyr is installed.
if ("plyr" %in% rownames(installed.packages())) {
suppressPackageStartupMessages(library(plyr))
if ("plyr" %in% rownames(installed.packages()) &&
suppressPackageStartupMessages(suppressWarnings(library(plyr, logical.return = TRUE)))) {
generateData <- function(x) {
suppressPackageStartupMessages(library(plyr))
attach(airquality)
@@ -44,8 +44,8 @@ test_that("include inside function", {

test_that("use include package", {
# Only run the test if plyr is installed.
if ("plyr" %in% rownames(installed.packages())) {
suppressPackageStartupMessages(library(plyr))
if ("plyr" %in% rownames(installed.packages()) &&
suppressPackageStartupMessages(suppressWarnings(library(plyr, logical.return = TRUE)))) {
generateData <- function(x) {
attach(airquality)
result <- transform(Ozone, logOzone = log(Ozone))
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1810,7 +1810,8 @@ test_that("string operators", {
expect_true(first(select(df, endsWith(df$name, "el")))[[1]])
expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
expect_equal(first(select(df, substr(df$name, 4, 6)))[[1]], "hae")
-if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
+version <- packageVersion("base")
+if (as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3) {
expect_true(startsWith("Hello World", "Hello"))
expect_false(endsWith("Hello World", "a"))
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -684,7 +684,8 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0")),
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"),
AlternateConfig("spark.maxRemoteBlockSizeFetchToMem", "3.0")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
DRIVER_MEMORY_OVERHEAD.key -> Seq(
core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -117,7 +117,7 @@ trait Logging {
}

// For testing
-def initializeForcefully(isInterpreter: Boolean, silent: Boolean): Unit = {
+private[spark] def initializeForcefully(isInterpreter: Boolean, silent: Boolean): Unit = {
initializeLogging(isInterpreter, silent)
}

core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -895,7 +895,7 @@ package object config {
.createWithDefault(Int.MaxValue)

private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
ConfigBuilder("spark.network.maxRemoteBlockSizeFetchToMem")
.doc("Remote block will be fetched to disk when size of the block is above this threshold " +
"in bytes. This is to avoid a giant request takes too much memory. Note this " +
"configuration will affect both shuffle fetch and block manager remote block fetch. " +
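
For context, here is a minimal standalone sketch (not part of this commit) of how the renamed key interacts with the alternate registered in SparkConf above: a value supplied under the deprecated `spark.maxRemoteBlockSizeFetchToMem` spelling should still be visible when the configuration is read through the new `spark.network.maxRemoteBlockSizeFetchToMem` key, assuming the usual SparkConf alternate-key fallback.

```scala
// Sketch only: exercises the alternate-config fallback registered above.
import org.apache.spark.SparkConf

object RemoteBlockFetchConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.maxRemoteBlockSizeFetchToMem", "200m") // deprecated spelling as of 3.0

    // Reading through the renamed key is expected to pick up the value that was
    // set under the old key via the AlternateConfig mapping.
    println(conf.get("spark.network.maxRemoteBlockSizeFetchToMem", "<unset>"))
  }
}
```
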
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -430,7 +430,7 @@ private[spark] class TaskSchedulerImpl(
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableResources = shuffledOffers.map(_.resources).toArray
val availableCpus = shuffledOffers.map(o => o.cores).toArray
-val sortedTaskSets = rootPool.getSortedTaskSetQueue
+val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
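
A small standalone sketch (stand-in types, not Spark code) of the idea behind the `filterNot(_.isZombie)` change above: task sets that are already marked zombie are dropped before the scheduler builds resource offers for them.

```scala
// Sketch with stand-in types: zombie task sets are skipped up front instead of
// being offered resources they can no longer use.
final case class FakeTaskSet(name: String, isZombie: Boolean)

object ZombieFilterSketch {
  def main(args: Array[String]): Unit = {
    val sortedTaskSets = Seq(
      FakeTaskSet("stage 1.0", isZombie = true), // already finished or cancelled
      FakeTaskSet("stage 2.0", isZombie = false))

    // Mirrors `rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)`.
    val schedulable = sortedTaskSets.filterNot(_.isZombie)
    println(schedulable.map(_.name).mkString(", ")) // prints: stage 2.0
  }
}
```
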
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -229,6 +229,8 @@ private[spark] class TaskSetManager(
index: Int,
resolveRacks: Boolean = true,
speculatable: Boolean = false): Unit = {
+// A zombie TaskSetManager may reach here while handling failed task.
+if (isZombie) return
val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
for (loc <- tasks(index).preferredLocations) {
loc match {
@@ -1082,6 +1084,8 @@
}

def recomputeLocality(): Unit = {
+// A zombie TaskSetManager may reach here while executorLost happens
+if (isZombie) return
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",

"org.apache.spark.ml.feature.Instance",
"org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -71,7 +71,7 @@ private[spark] class AppStatusListener(
// causing too many writes to the underlying store, and other expensive operations).
private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
-private val liveExecutors = new HashMap[String, LiveExecutor]()
+private[spark] val liveExecutors = new HashMap[String, LiveExecutor]()
private val deadExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
@@ -772,6 +772,11 @@ private[spark] class AppStatusListener(
event.maxOnHeapMem.foreach { _ =>
exec.totalOnHeap = event.maxOnHeapMem.get
exec.totalOffHeap = event.maxOffHeapMem.get
+// SPARK-30594: whenever(first time or re-register) a BlockManager added, all blocks
+// from this BlockManager will be reported to driver later. So, we should clean up
+// used memory to avoid overlapped count.
+exec.usedOnHeap = 0
+exec.usedOffHeap = 0
}
exec.isActive = true
exec.maxMemory = event.maxMem
@@ -1042,7 +1047,7 @@ private[spark] class AppStatusListener(
}
}

-private def updateExecutorMemoryDiskInfo(
+private[spark] def updateExecutorMemoryDiskInfo(
exec: LiveExecutor,
storageLevel: StorageLevel,
memoryDelta: Long,
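
A minimal standalone sketch (hypothetical class, not the Spark listener itself) of the SPARK-30594 bookkeeping above: when a block manager registers again, its tracked used storage memory is reset so the blocks it re-reports are not counted on top of the stale totals.

```scala
// Sketch with a stand-in executor record: (re-)registration zeroes the used-memory
// counters before block updates are replayed, avoiding double counting.
final class FakeExecutor(val id: String) {
  var usedOnHeap: Long = 0L
  var usedOffHeap: Long = 0L
}

object BlockManagerReRegisterSketch {
  // Same intent as the AppStatusListener change: clear used memory when the
  // block manager is added, whether for the first time or after re-registration.
  def onBlockManagerAdded(exec: FakeExecutor): Unit = {
    exec.usedOnHeap = 0L
    exec.usedOffHeap = 0L
  }

  def main(args: Array[String]): Unit = {
    val exec = new FakeExecutor("driver")
    exec.usedOnHeap = 10L         // accumulated before the block manager was lost
    onBlockManagerAdded(exec)     // block manager comes back
    assert(exec.usedOnHeap == 0L) // subsequent block reports start from zero
    println(s"usedOnHeap=${exec.usedOnHeap}, usedOffHeap=${exec.usedOffHeap}")
  }
}
```
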
core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -245,7 +245,7 @@ private class LiveTask(

}

-private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
+private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {

var hostPort: String = null
var host: String = null
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -98,8 +98,13 @@ class BlockManagerMasterEndpoint(

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
-listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+context.reply(isSuccess)
+// SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
+// returns false since the block info would be updated again later.
+if (isSuccess) {
+listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
+}

case GetLocations(blockId) =>
context.reply(getLocations(blockId))
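
A standalone sketch (hypothetical names, not the real endpoint) of the guard introduced above: the endpoint replies with the result of `updateBlockInfo` and only publishes a block-updated event when the update was actually accepted, since a rejected update will be reported again later.

```scala
// Sketch with stand-in types: reply with the update result and only notify
// listeners on success, mirroring the BlockManagerMasterEndpoint change.
import scala.collection.mutable.ArrayBuffer

object BlockUpdateGuardSketch {
  final case class BlockUpdatedEvent(blockManagerId: String, blockId: String)

  def handleUpdateBlockInfo(
      knownBlockManagers: Set[String],
      blockManagerId: String,
      blockId: String,
      post: BlockUpdatedEvent => Unit): Boolean = {
    // Stand-in for `updateBlockInfo`: fails for an unknown block manager.
    val isSuccess = knownBlockManagers.contains(blockManagerId)
    if (isSuccess) {
      post(BlockUpdatedEvent(blockManagerId, blockId))
    }
    isSuccess
  }

  def main(args: Array[String]): Unit = {
    val posted = ArrayBuffer.empty[BlockUpdatedEvent]
    val ok = handleUpdateBlockInfo(Set("bm-1"), "bm-2", "test_1", posted += _)
    assert(!ok && posted.isEmpty) // rejected update, no listener event posted
  }
}
```
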
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1657,6 +1657,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}

test("clean up used memory when BlockManager added") {
val listener = new AppStatusListener(store, conf, true)
// Add block manager at the first time
val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(
time, driver, 42L, Some(43L), Some(44L)))
// Update the memory metrics
listener.updateExecutorMemoryDiskInfo(
listener.liveExecutors(SparkContext.DRIVER_IDENTIFIER),
StorageLevel.MEMORY_AND_DISK,
10L,
10L
)
// Re-add the same block manager again
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(
time, driver, 42L, Some(43L), Some(44L)))

check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d =>
val memoryMetrics = d.info.memoryMetrics.get
assert(memoryMetrics.usedOffHeapStorageMemory == 0)
assert(memoryMetrics.usedOnHeapStorageMemory == 0)
}
}


private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -50,7 +50,7 @@ import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, Transpo
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ExecutorDiskUtils, ExternalBlockStoreClient}
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerBlockUpdated}
import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -71,6 +71,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val allStores = ArrayBuffer[BlockManager]()
var rpcEnv: RpcEnv = null
var master: BlockManagerMaster = null
+var liveListenerBus: LiveListenerBus = null
val securityMgr = new SecurityManager(new SparkConf(false))
val bcastManager = new BroadcastManager(true, new SparkConf(false), securityMgr)
val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false), bcastManager, true)
@@ -145,9 +146,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
when(sc.conf).thenReturn(conf)

val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
+liveListenerBus = spy(new LiveListenerBus(conf))
master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-new LiveListenerBus(conf), None, blockManagerInfo)),
+liveListenerBus, None, blockManagerInfo)),
rpcEnv.setupEndpoint("blockmanagerHeartbeat",
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true))

@@ -164,6 +166,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
rpcEnv.awaitTermination()
rpcEnv = null
master = null
+liveListenerBus = null
} finally {
super.afterEach()
}
@@ -1693,6 +1696,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(locs(blockIds(0)) == expectedLocs)
}

test("SPARK-30594: Do not post SparkListenerBlockUpdated when updateBlockInfo returns false") {
// update block info for non-existent block manager
val updateInfo = UpdateBlockInfo(BlockManagerId("1", "host1", 100),
BlockId("test_1"), StorageLevel.MEMORY_ONLY, 1, 1)
val result = master.driverEndpoint.askSync[Boolean](updateInfo)

assert(!result)
verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo)))
}

class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0
var tempFileManager: DownloadFileManager = null
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -137,9 +137,9 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.16//jul-to-slf4j-1.7.16.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar
-kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar
-kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar
+kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar
+kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar
+kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
7 changes: 3 additions & 4 deletions dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -87,7 +87,6 @@ hive-jdbc/2.3.6//hive-jdbc-2.3.6.jar
hive-llap-common/2.3.6//hive-llap-common-2.3.6.jar
hive-metastore/2.3.6//hive-metastore-2.3.6.jar
hive-serde/2.3.6//hive-serde-2.3.6.jar
-hive-service-rpc/2.3.6//hive-service-rpc-2.3.6.jar
hive-shims-0.23/2.3.6//hive-shims-0.23-2.3.6.jar
hive-shims-common/2.3.6//hive-shims-common-2.3.6.jar
hive-shims-scheduler/2.3.6//hive-shims-scheduler-2.3.6.jar
@@ -153,9 +152,9 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.16//jul-to-slf4j-1.7.16.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar
-kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar
-kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar
+kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar
+kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar
+kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
7 changes: 3 additions & 4 deletions dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -86,7 +86,6 @@ hive-jdbc/2.3.6//hive-jdbc-2.3.6.jar
hive-llap-common/2.3.6//hive-llap-common-2.3.6.jar
hive-metastore/2.3.6//hive-metastore-2.3.6.jar
hive-serde/2.3.6//hive-serde-2.3.6.jar
-hive-service-rpc/2.3.6//hive-service-rpc-2.3.6.jar
hive-shims-0.23/2.3.6//hive-shims-0.23-2.3.6.jar
hive-shims-common/2.3.6//hive-shims-common-2.3.6.jar
hive-shims-scheduler/2.3.6//hive-shims-scheduler-2.3.6.jar
@@ -165,9 +164,9 @@ kerby-pkix/1.0.1//kerby-pkix-1.0.1.jar
kerby-util/1.0.1//kerby-util-1.0.1.jar
kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar
-kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar
-kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar
+kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar
+kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar
+kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
12 changes: 7 additions & 5 deletions dev/lint-python
@@ -27,6 +27,8 @@ MINIMUM_PYCODESTYLE="2.4.0"

SPHINX_BUILD="sphinx-build"

PYTHON_EXECUTABLE="python3"

function compile_python_test {
local COMPILE_STATUS=
local COMPILE_REPORT=
@@ -38,7 +40,7 @@ function compile_python_test {

# compileall: https://docs.python.org/3/library/compileall.html
echo "starting python compilation test..."
-COMPILE_REPORT=$( (python3 -B -mcompileall -q -l -x "[/\\\\][.]git" $1) 2>&1)
+COMPILE_REPORT=$( ("$PYTHON_EXECUTABLE" -B -mcompileall -q -l -x "[/\\\\][.]git" $1) 2>&1)
COMPILE_STATUS=$?

if [ $COMPILE_STATUS -ne 0 ]; then
@@ -70,7 +72,7 @@ function pycodestyle_test {
RUN_LOCAL_PYCODESTYLE="False"
if hash "$PYCODESTYLE_BUILD" 2> /dev/null; then
VERSION=$( $PYCODESTYLE_BUILD --version 2> /dev/null)
-EXPECTED_PYCODESTYLE=$( (python3 -c 'from distutils.version import LooseVersion;
+EXPECTED_PYCODESTYLE=$( ("$PYTHON_EXECUTABLE" -c 'from distutils.version import LooseVersion;
print(LooseVersion("""'${VERSION[0]}'""") >= LooseVersion("""'$MINIMUM_PYCODESTYLE'"""))')\
2> /dev/null)

@@ -96,7 +98,7 @@
fi

echo "starting pycodestyle test..."
-PYCODESTYLE_REPORT=$( (python3 "$PYCODESTYLE_SCRIPT_PATH" --config=dev/tox.ini $1) 2>&1)
+PYCODESTYLE_REPORT=$( ("$PYTHON_EXECUTABLE" "$PYCODESTYLE_SCRIPT_PATH" --config=dev/tox.ini $1) 2>&1)
PYCODESTYLE_STATUS=$?
else
# we have the right version installed, so run locally
@@ -130,7 +132,7 @@ function flake8_test {

FLAKE8_VERSION="$($FLAKE8_BUILD --version 2> /dev/null)"
VERSION=($FLAKE8_VERSION)
-EXPECTED_FLAKE8=$( (python3 -c 'from distutils.version import LooseVersion;
+EXPECTED_FLAKE8=$( ("$PYTHON_EXECUTABLE" -c 'from distutils.version import LooseVersion;
print(LooseVersion("""'${VERSION[0]}'""") >= LooseVersion("""'$MINIMUM_FLAKE8'"""))') \
2> /dev/null)

@@ -175,7 +177,7 @@ function pydocstyle_test {
fi

PYDOCSTYLE_VERSION="$($PYDOCSTYLEBUILD --version 2> /dev/null)"
-EXPECTED_PYDOCSTYLE=$(python3 -c 'from distutils.version import LooseVersion; \
+EXPECTED_PYDOCSTYLE=$("$PYTHON_EXECUTABLE" -c 'from distutils.version import LooseVersion; \
print(LooseVersion("""'$PYDOCSTYLE_VERSION'""") >= LooseVersion("""'$MINIMUM_PYDOCSTYLE'"""))' \
2> /dev/null)

1 change: 0 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -364,7 +364,6 @@ def __hash__(self):
"pyspark.sql.avro.functions",
"pyspark.sql.pandas.conversion",
"pyspark.sql.pandas.map_ops",
"pyspark.sql.pandas.functions",
"pyspark.sql.pandas.group_ops",
"pyspark.sql.pandas.types",
"pyspark.sql.pandas.serializers",
1 change: 1 addition & 0 deletions docs/.gitignore
@@ -0,0 +1 @@
+sql-configs.html
2 changes: 2 additions & 0 deletions docs/_data/menu-sql.yaml
@@ -24,6 +24,8 @@
subitems:
- text: "Generic Load/Save Functions"
url: sql-data-sources-load-save-functions.html
- text: "Generic File Source Options"
url: sql-data-sources-generic-options.html
- text: Parquet Files
url: sql-data-sources-parquet.html
- text: ORC Files