SHS-NG M4.3: Port StorageTab to the new backend.
This required adding StreamBlockId information to the store, which is not yet
available via the API, so an internal type was added until there's a need to
expose that information in the API.

The UI lists only RDDs that have cached partitions, and that information wasn't
being captured correctly in the listener, so that is also fixed, along with some
minor (internal) API adjustments that let the UI get the correct data.
Marcelo Vanzin committed May 30, 2017
1 parent d66024c commit c5a17fd
Showing 19 changed files with 246 additions and 941 deletions.
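For context on the approach: in the SHS-NG backend the listener writes UI state into a KVStore, and the web UI / REST layer reads it back instead of querying per-tab listeners. Below is a minimal sketch of that round trip for the new stream-block data, assuming an in-memory KVStore implementation; the InMemoryStore wiring is illustrative, not part of this diff, and the kvstore package path may differ at this point in the branch's history.

import scala.collection.JavaConverters._
import org.apache.spark.kvstore.InMemoryStore
import org.apache.spark.status.StreamBlockData

val kvstore = new InMemoryStore()

// Writer side: the listener persists one StreamBlockData row per (block name, executor).
kvstore.write(new StreamBlockData(
  "input-0-1496182845000", "1", "10.0.0.1:43121",
  "Memory Deserialized 1x Replicated",
  useMemory = true, useDisk = false, deserialized = true,
  memSize = 1024L, diskSize = 0L))

// Reader side: the UI lists all stream blocks, mirroring AppStateStore.streamBlocksList().
val blocks = kvstore.view(classOf[StreamBlockData]).asScala.toSeq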
32 changes: 30 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStateListener.scala
@@ -449,7 +449,8 @@ private[spark] class AppStateListener(
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
     event.blockUpdatedInfo.blockId match {
       case block: RDDBlockId => updateRDDBlock(event, block)
-      case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too.
+      case stream: StreamBlockId => updateStreamBlock(event, stream)
+      case _ =>
     }
   }

@@ -537,7 +538,14 @@ private[spark] class AppStateListener(
       }
       rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta)
       rdd.diskUsed = newValue(rdd.diskUsed, diskDelta)
-      update(rdd)
+
+      // Need to always flush the RDD in live applications in case there's a single cached block,
+      // which means there won't be a subsequent event that could trigger the flush.
+      if (live) {
+        update(rdd)
+      } else {
+        liveUpdate(rdd)
+      }
     }

     maybeExec.foreach { exec =>
@@ -557,6 +565,26 @@ private[spark] class AppStateListener(
     }
   }

+  private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
+    val storageLevel = event.blockUpdatedInfo.storageLevel
+    if (storageLevel.isValid) {
+      val data = new StreamBlockData(
+        stream.name,
+        event.blockUpdatedInfo.blockManagerId.executorId,
+        event.blockUpdatedInfo.blockManagerId.hostPort,
+        storageLevel.description,
+        storageLevel.useMemory,
+        storageLevel.useDisk,
+        storageLevel.deserialized,
+        event.blockUpdatedInfo.memSize,
+        event.blockUpdatedInfo.diskSize)
+      kvstore.write(data)
+    } else {
+      kvstore.delete(classOf[StreamBlockData],
+        Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId))
+    }
+  }
+
   private def getOrCreateExecutor(executorId: String): LiveExecutor = {
     liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId))
   }
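A quick way to exercise the new path is to feed the listener a synthetic block update followed by an invalidation. A sketch, assuming a `listener` (an AppStateListener backed by a KVStore) is already constructed; the block and executor IDs are made up:

import org.apache.spark.scheduler.SparkListenerBlockUpdated
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo, StorageLevel, StreamBlockId}

// Assumes `listener` is an AppStateListener wired to a KVStore (construction omitted).
val bmId = BlockManagerId("1", "10.0.0.1", 43121)
val blockId = StreamBlockId(0, 1L)

// A valid storage level: the listener writes a StreamBlockData entry to the store.
listener.onBlockUpdated(SparkListenerBlockUpdated(
  BlockUpdatedInfo(bmId, blockId, StorageLevel.MEMORY_ONLY, 100L, 0L)))

// StorageLevel.NONE is not valid, so the same entry is deleted from the store.
listener.onBlockUpdated(SparkListenerBlockUpdated(
  BlockUpdatedInfo(bmId, blockId, StorageLevel.NONE, 0L, 0L)))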
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStateStore.scala
@@ -199,8 +199,10 @@ private[spark] class AppStateStore(store: KVStore) {
     indexed.skip(offset).max(length).asScala.map(_.info).toSeq
   }

-  def rddList(): Seq[v1.RDDStorageInfo] = {
-    store.view(classOf[RDDStorageInfoWrapper]).sorted().asScala.map(_.info).toSeq
+  def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
+    store.view(classOf[RDDStorageInfoWrapper]).sorted().asScala.map(_.info).filter { rdd =>
+      !cachedOnly || rdd.numCachedPartitions > 0
+    }.toSeq
   }

   def rdd(rddId: Int): v1.RDDStorageInfo = {
@@ -211,6 +213,10 @@ private[spark] class AppStateStore(store: KVStore) {
     store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq
   }

+  def streamBlocksList(): Seq[StreamBlockData] = {
+    store.view(classOf[StreamBlockData]).asScala.toSeq
+  }
+
   def close(): Unit = {
     store.close()
   }
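The `cachedOnly` default keeps the existing REST semantics (the storage page shows only RDDs with cached partitions) while letting internal callers opt into the full list. For example, assuming `store` is an AppStateStore:

// Only RDDs with numCachedPartitions > 0 (what the UI and REST API show).
val cached = store.rddList()
// Every RDD known to the store, cached or not.
val all = store.rddList(cachedOnly = false)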
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -422,16 +422,17 @@ private class LiveRDDPartition(val blockName: String) {

 }

-private class LiveRDDDistribution(val exec: LiveExecutor) {
+private class LiveRDDDistribution(exec: LiveExecutor) {

+  val executorId = exec.executorId
   var memoryRemaining = exec.maxMemory
   var memoryUsed = 0L
   var diskUsed = 0L

   var onHeapUsed = 0L
   var offHeapUsed = 0L
-  var onHeapRemaining = 0L
-  var offHeapRemaining = 0L
+  var onHeapRemaining = exec.totalOnHeap
+  var offHeapRemaining = exec.totalOffHeap

   def toApi(): v1.RDDDataDistribution = {
     new v1.RDDDataDistribution(
@@ -478,7 +479,7 @@ private class LiveRDD(info: RDDInfo) extends LiveEntity {
     }

     val dists = if (distributions.nonEmpty) {
-      Some(distributions.values.toList.sortBy(_.exec.executorId).map(_.toApi()))
+      Some(distributions.values.toList.sortBy(_.executorId).map(_.toApi()))
     } else {
       None
     }
core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -21,90 +21,11 @@ import javax.ws.rs.core.MediaType

-import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.storage.StorageListener

 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class AllRDDResource(ui: SparkUI) {

   @GET
-  def rddList(): Seq[RDDStorageInfo] = {
-    val storageStatusList = ui.storageListener.activeStorageStatusList
-    val rddInfos = ui.storageListener.rddInfoList
-    rddInfos.map{rddInfo =>
-      AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
-        includeDetails = false)
-    }
-  }
+  def rddList(): Seq[RDDStorageInfo] = ui.store.rddList()

 }
-
-private[spark] object AllRDDResource {
-
-  def getRDDStorageInfo(
-      rddId: Int,
-      listener: StorageListener,
-      includeDetails: Boolean): Option[RDDStorageInfo] = {
-    val storageStatusList = listener.activeStorageStatusList
-    listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
-      getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
-    }
-  }
-
-  def getRDDStorageInfo(
-      rddId: Int,
-      rddInfo: RDDInfo,
-      storageStatusList: Seq[StorageStatus],
-      includeDetails: Boolean): RDDStorageInfo = {
-    val workers = storageStatusList.map { (rddId, _) }
-    val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
-    val blocks = storageStatusList
-      .flatMap { _.rddBlocksById(rddId) }
-      .sortWith { _._1.name < _._1.name }
-      .map { case (blockId, status) =>
-        (blockId, status, blockLocations.getOrElse(blockId, Seq[String]("Unknown")))
-      }
-
-    val dataDistribution = if (includeDetails) {
-      Some(storageStatusList.map { status =>
-        new RDDDataDistribution(
-          address = status.blockManagerId.hostPort,
-          memoryUsed = status.memUsedByRdd(rddId),
-          memoryRemaining = status.memRemaining,
-          diskUsed = status.diskUsedByRdd(rddId),
-          onHeapMemoryUsed = Some(
-            if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
-          offHeapMemoryUsed = Some(
-            if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
-          onHeapMemoryRemaining = status.onHeapMemRemaining,
-          offHeapMemoryRemaining = status.offHeapMemRemaining
-        ) } )
-    } else {
-      None
-    }
-    val partitions = if (includeDetails) {
-      Some(blocks.map { case (id, block, locations) =>
-        new RDDPartitionInfo(
-          blockName = id.name,
-          storageLevel = block.storageLevel.description,
-          memoryUsed = block.memSize,
-          diskUsed = block.diskSize,
-          executors = locations
-        )
-      } )
-    } else {
-      None
-    }
-
-    new RDDStorageInfo(
-      id = rddId,
-      name = rddInfo.name,
-      numPartitions = rddInfo.numPartitions,
-      numCachedPartitions = rddInfo.numCachedPartitions,
-      storageLevel = rddInfo.storageLevel.description,
-      memoryUsed = rddInfo.memSize,
-      diskUsed = rddInfo.diskSize,
-      dataDistribution = dataDistribution,
-      partitions = partitions
-    )
-  }
-}
core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.status.api.v1

+import java.util.NoSuchElementException
 import javax.ws.rs.{GET, PathParam, Produces}
 import javax.ws.rs.core.MediaType

@@ -26,9 +27,12 @@ private[v1] class OneRDDResource(ui: SparkUI) {

   @GET
   def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
-    AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
-      throw new NotFoundException(s"no rdd found w/ id $rddId")
-    )
+    try {
+      ui.store.rdd(rddId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new NotFoundException(s"no rdd found w/ id $rddId")
+    }
   }

 }
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -141,3 +141,19 @@ private[spark] class ExecutorStageSummaryWrapper(
 private[spark] class ExecutorEventData(
     @KVIndexParam val id: Long,
     val event: SparkListenerEvent)
+
+private[spark] class StreamBlockData(
+    val name: String,
+    val executorId: String,
+    val hostPort: String,
+    val storageLevel: String,
+    val useMemory: Boolean,
+    val useDisk: Boolean,
+    val deserialized: Boolean,
+    val memSize: Long,
+    val diskSize: Long) {
+
+  @JsonIgnore @KVIndex
+  def key: Array[String] = Array(name, executorId)
+
+}
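`key` is the entry's natural key in the KVStore, so a stream block is addressed by the (name, executorId) pair, the same Array the listener passes to kvstore.delete() above. A minimal sketch of a keyed read and delete, assuming a `kvstore` handle like the listener's; the key values are illustrative:

// Composite natural key: (block name, executor ID).
val key = Array("input-0-1496182845000", "1")

// Point lookup by natural key...
val block = kvstore.read(classOf[StreamBlockData], key)

// ...and targeted removal when the executor drops the block.
kvstore.delete(classOf[StreamBlockData], key)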
100 changes: 0 additions & 100 deletions core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala

This file was deleted.

