From fedc9ecb6c55dc9a70a90a7cfd723090645a0dc1 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 24 Mar 2017 13:07:17 -0700
Subject: [PATCH] SHS-NG M4.6: Add "max" and "last" to kvstore iterators.
This makes it easier for callers to control where iteration ends,
which in turn simplifies writing Scala code that automatically closes
the underlying iterator resources. Before, code had to use Scala's
"takeWhile", convert the result to a list, and manually close the
iterators; with these two parameters, that can be avoided in a
bunch of cases, with iterators auto-closing when the last element
is reached.
---
.../org/apache/spark/kvstore/KVStoreView.java | 29 +++-
.../apache/spark/kvstore/LevelDBIterator.java | 27 +++-
.../apache/spark/kvstore/DBIteratorSuite.java | 134 ++++++++++++++----
.../apache/spark/status/AppStateStore.scala | 90 ++++--------
.../org/apache/spark/ui/jobs/StageTable.scala | 2 +-
5 files changed, 176 insertions(+), 106 deletions(-)
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
index a68c37942dee4..ab86dc35f14a2 100644
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
@@ -31,9 +31,12 @@
*
*
*
- * The iterators returns by this view are of type {@link KVStoreIterator}; they auto-close
+ * The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close
* when used in a for loop that exhausts their contents, but when used manually, they need
- * to be closed explicitly unless all elements are read.
+ * to be closed explicitly unless all elements are read. For this reason, it's recommended
+ * that {@link #last(Object)} and {@link #max(long)} be used to make it easier to release
+ * resources associated with the iterator by better controlling how many elements will be
+ * retrieved.
*
*/
public abstract class KVStoreView implements Iterable {
@@ -43,7 +46,9 @@ public abstract class KVStoreView implements Iterable {
boolean ascending = true;
String index = KVIndex.NATURAL_INDEX_NAME;
Object first = null;
+ Object last = null;
long skip = 0L;
+ long max = Long.MAX_VALUE;
public KVStoreView(Class type) {
this.type = type;
@@ -74,7 +79,25 @@ public KVStoreView first(Object value) {
}
/**
- * Skips a number of elements in the resulting iterator.
+ * Stops iteration at the given value of the chosen index (inclusive).
+ */
+ public KVStoreView last(Object value) {
+ this.last = value;
+ return this;
+ }
+
+ /**
+ * Stops iteration after a number of elements has been retrieved.
+ */
+ public KVStoreView max(long max) {
+ Preconditions.checkArgument(max > 0L, "max must be positive.");
+ this.max = max;
+ return this;
+ }
+
+ /**
+ * Skips a number of elements at the start of iteration. Skipped elements do not count
+ * towards the limit set by {@link #max(long)}.
*/
public KVStoreView skip(long n) {
this.skip = n;
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
index 0a4b29d2453fb..10cb580d85e0c 100644
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
@@ -38,10 +38,12 @@ class LevelDBIterator implements KVStoreIterator {
private final LevelDBTypeInfo.Index index;
private final byte[] indexKeyPrefix;
private final byte[] end;
+ private final long max;
private boolean checkedNext;
private T next;
private boolean closed;
+ private long count;
/**
* Creates a simple iterator over db keys.
@@ -55,6 +57,7 @@ class LevelDBIterator implements KVStoreIterator {
this.it = db.db().iterator();
this.indexKeyPrefix = keyPrefix;
this.end = null;
+ this.max = Long.MAX_VALUE; // no limit; loadNext() stops once count >= max
it.seek(keyPrefix);
}
@@ -69,6 +72,7 @@ class LevelDBIterator implements KVStoreIterator {
this.ti = db.getTypeInfo(type);
this.index = ti.index(params.index);
this.indexKeyPrefix = index.keyPrefix();
+ this.max = params.max;
byte[] firstKey;
if (params.first != null) {
@@ -84,10 +88,13 @@ class LevelDBIterator implements KVStoreIterator {
}
it.seek(firstKey);
+ byte[] end = null;
if (ascending) {
- this.end = index.end();
+ end = params.last != null ? index.end(params.last) : index.end();
} else {
- this.end = null;
+ if (params.last != null) {
+ end = index.start(params.last);
+ }
if (it.hasNext()) {
// When descending, the caller may have set up the start of iteration at a non-existant
// entry that is guaranteed to be after the desired entry. For example, if you have a
@@ -101,6 +108,7 @@ class LevelDBIterator implements KVStoreIterator {
}
}
}
+ this.end = end;
if (params.skip > 0) {
skip(params.skip);
@@ -186,6 +194,10 @@ protected void finalize() throws Throwable {
}
private T loadNext() {
+ if (count >= max) {
+ return null;
+ }
+
try {
while (true) {
boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
@@ -212,11 +224,16 @@ private T loadNext() {
return null;
}
- // If there's a known end key and it's found, stop.
- if (end != null && Arrays.equals(nextKey, end)) {
- return null;
+ // If there's a known end key and iteration has gone past it, stop.
+ if (end != null) {
+ int comp = compare(nextKey, end) * (ascending ? 1 : -1);
+ if (comp > 0) {
+ return null;
+ }
}
+ count++;
+
// Next element is part of the iteration, return it.
if (index == null || index.isCopy()) {
return db.serializer.deserialize(nextEntry.getValue(), type);
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
index 88c7cc08984bb..0119d58d8ae26 100644
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -152,109 +153,170 @@ public static void cleanup() throws Exception {
@Test
public void naturalIndex() throws Exception {
- testIteration(NATURAL_ORDER, view(), null);
+ testIteration(NATURAL_ORDER, view(), null, null);
}
@Test
public void refIndex() throws Exception {
- testIteration(REF_INDEX_ORDER, view().index("id"), null);
+ testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
}
@Test
public void copyIndex() throws Exception {
- testIteration(COPY_INDEX_ORDER, view().index("name"), null);
+ testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
}
@Test
public void numericIndex() throws Exception {
- testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null);
+ testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
}
@Test
public void naturalIndexDescending() throws Exception {
- testIteration(NATURAL_ORDER, view().reverse(), null);
+ testIteration(NATURAL_ORDER, view().reverse(), null, null);
}
@Test
public void refIndexDescending() throws Exception {
- testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null);
+ testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
}
@Test
public void copyIndexDescending() throws Exception {
- testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null);
+ testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, null);
}
@Test
public void numericIndexDescending() throws Exception {
- testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null);
+ testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, null);
}
@Test
public void naturalIndexWithStart() throws Exception {
- CustomType1 first = pickFirst();
- testIteration(NATURAL_ORDER, view().first(first.key), first);
+ CustomType1 first = pickLimit();
+ testIteration(NATURAL_ORDER, view().first(first.key), first, null);
}
@Test
public void refIndexWithStart() throws Exception {
- CustomType1 first = pickFirst();
- testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first);
+ CustomType1 first = pickLimit();
+ testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first, null);
}
@Test
public void copyIndexWithStart() throws Exception {
- CustomType1 first = pickFirst();
- testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first);
+ CustomType1 first = pickLimit();
+ testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first, null);
}
@Test
public void numericIndexWithStart() throws Exception {
- CustomType1 first = pickFirst();
- testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first);
+ CustomType1 first = pickLimit();
+ testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first, null);
}
@Test
public void naturalIndexDescendingWithStart() throws Exception {
- CustomType1 first = pickFirst();
- testIteration(NATURAL_ORDER, view().reverse().first(first.key), first);
+ CustomType1 first = pickLimit();
+ testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null);
}
@Test
public void refIndexDescendingWithStart() throws Exception {
- CustomType1 first = pickFirst();
- testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first);
+ CustomType1 first = pickLimit();
+ testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first, null);
}
@Test
public void copyIndexDescendingWithStart() throws Exception {
- CustomType1 first = pickFirst();
+ CustomType1 first = pickLimit();
testIteration(COPY_INDEX_ORDER, view().reverse().index("name").first(first.name),
- first);
+ first, null);
}
@Test
public void numericIndexDescendingWithStart() throws Exception {
- CustomType1 first = pickFirst();
+ CustomType1 first = pickLimit();
testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").first(first.num),
- first);
+ first, null);
}
@Test
public void naturalIndexWithSkip() throws Exception {
- testIteration(NATURAL_ORDER, view().skip(RND.nextInt(allEntries.size() / 2)), null);
+ testIteration(NATURAL_ORDER, view().skip(RND.nextInt(allEntries.size() / 2)), null, null);
}
@Test
public void refIndexWithSkip() throws Exception {
testIteration(REF_INDEX_ORDER, view().index("id").skip(RND.nextInt(allEntries.size() / 2)),
- null);
+ null, null);
}
@Test
public void copyIndexWithSkip() throws Exception {
testIteration(COPY_INDEX_ORDER, view().index("name").skip(RND.nextInt(allEntries.size() / 2)),
- null);
+ null, null);
+ }
+
+ @Test
+ public void naturalIndexWithMax() throws Exception {
+ testIteration(NATURAL_ORDER, view().max(1 + RND.nextInt(allEntries.size() / 2)), null, null);
+ }
+
+ @Test
+ public void copyIndexWithMax() throws Exception {
+ testIteration(COPY_INDEX_ORDER,
+ view().index("name").max(1 + RND.nextInt(allEntries.size() / 2)), null, null);
+ }
+
+ @Test
+ public void naturalIndexWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(NATURAL_ORDER, view().last(last.key), null, last);
+ }
+
+ @Test
+ public void refIndexWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(REF_INDEX_ORDER, view().index("id").last(last.id), null, last);
+ }
+
+ @Test
+ public void copyIndexWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(COPY_INDEX_ORDER, view().index("name").last(last.name), null, last);
+ }
+
+ @Test
+ public void numericIndexWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(NUMERIC_INDEX_ORDER, view().index("int").last(last.num), null, last);
+ }
+
+ @Test
+ public void naturalIndexDescendingWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last);
+ }
+
+ @Test
+ public void refIndexDescendingWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(REF_INDEX_ORDER, view().reverse().index("id").last(last.id), null, last);
+ }
+
+ @Test
+ public void copyIndexDescendingWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(COPY_INDEX_ORDER, view().reverse().index("name").last(last.name),
+ null, last);
+ }
+
+ @Test
+ public void numericIndexDescendingWithLast() throws Exception {
+ CustomType1 last = pickLimit();
+ testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").last(last.num),
+ null, last);
}
@Test
@@ -272,8 +334,8 @@ public void testRefWithIntNaturalKey() throws Exception {
}
}
- private CustomType1 pickFirst() {
- // Picks a first element that has clashes with other elements in the given index.
+ private CustomType1 pickLimit() {
+ // Picks an element that has clashes with other elements in the given index.
return clashingEntries.get(RND.nextInt(clashingEntries.size()));
}
@@ -297,22 +359,32 @@ private > int compareWithFallback(
private void testIteration(
final BaseComparator order,
final KVStoreView params,
- final CustomType1 first) throws Exception {
+ final CustomType1 first,
+ final CustomType1 last) throws Exception {
List indexOrder = sortBy(order.fallback());
if (!params.ascending) {
indexOrder = Lists.reverse(indexOrder);
}
Iterable expected = indexOrder;
+ BaseComparator expectedOrder = params.ascending ? order : order.reverse();
+
if (first != null) {
- final BaseComparator expectedOrder = params.ascending ? order : order.reverse();
expected = Iterables.filter(expected, v -> expectedOrder.compare(first, v) <= 0);
}
+ if (last != null) {
+ expected = Iterables.filter(expected, v -> expectedOrder.compare(v, last) <= 0);
+ }
+
if (params.skip > 0) {
expected = Iterables.skip(expected, (int) params.skip);
}
+ if (params.max != Long.MAX_VALUE) {
+ expected = Iterables.limit(expected, (int) params.max);
+ }
+
List actual = collect(params);
compareLists(expected, actual);
}
diff --git a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
index 36dc2979aa02a..87a4d84c22b12 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
@@ -63,19 +63,8 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
}
def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
- val it = store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse()
- .closeableIterator()
- val view = it.asScala.map(_.info)
- try {
- val execs = if (activeOnly) {
- view.takeWhile(_.isActive)
- } else {
- view
- }
- execs.toList
- } finally {
- it.close()
- }
+ val view = store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse()
+ (if (activeOnly) view.first(true).last(true) else view).asScala.map(_.info).toSeq
}
def executorSummary(executorId: String): Option[v1.ExecutorSummary] = {
@@ -97,13 +86,8 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
}
def stageData(stageId: Int): Seq[v1.StageData] = {
- val it = store.view(classOf[StageDataWrapper]).index("stageId").first(stageId)
- .closeableIterator()
- try {
- it.asScala.map(_.info).takeWhile(_.stageId == stageId).map(stageWithDetails).toList
- } finally {
- it.close()
- }
+ store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId)
+ .asScala.map { s => stageWithDetails(s.info) }.toSeq
}
def lastStageAttempt(stageId: Int): v1.StageData = {
@@ -129,20 +113,14 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
val stage = Array(stageId, stageAttemptId)
- val it = store.view(classOf[TaskDataWrapper])
+ val rawMetrics = store.view(classOf[TaskDataWrapper])
.index("stage")
.first(stage)
- .closeableIterator()
-
- val rawMetrics = try {
- it.asScala
- .takeWhile { t => Arrays.equals(t.stage, stage) }
- .flatMap(_.info.taskMetrics)
- .toList
- .view
- } finally {
- it.close()
- }
+ .last(stage)
+ .asScala
+ .flatMap(_.info.taskMetrics)
+ .toList
+ .view
def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
@@ -226,8 +204,8 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = {
val stageKey = Array(stageId, stageAttemptId)
- val view = store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).reverse()
- toTaskList(view, stageKey, 0, maxTasks).reverse
+ store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).last(stageKey).reverse()
+ .max(maxTasks).asScala.map(_.info).toSeq.reverse
}
def taskList(
@@ -240,31 +218,14 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
val base = store.view(classOf[TaskDataWrapper])
val indexed = sortBy match {
case v1.TaskSorting.ID =>
- base.index("stage").first(stageKey)
+ base.index("stage").first(stageKey).last(stageKey)
case v1.TaskSorting.INCREASING_RUNTIME =>
- base.index("runtime").first(stageKey ++ Array(-1L))
+ base.index("runtime").first(stageKey ++ Array(-1L)).last(stageKey ++ Array(Long.MaxValue))
case v1.TaskSorting.DECREASING_RUNTIME =>
- base.index("runtime").first(stageKey ++ Array(Long.MaxValue)).reverse()
- }
- toTaskList(indexed, stageKey, offset, length)
- }
-
- private def toTaskList(
- view: KVStoreView[TaskDataWrapper],
- stageKey: Array[Int],
- offset: Int,
- length: Int): Seq[v1.TaskData] = {
- val it = view.skip(offset).closeableIterator()
- try {
- var taken = 0
- it.asScala.takeWhile { t =>
- val take = taken < length && Arrays.equals(t.stage, stageKey)
- taken += 1
- take
- }.map(_.info).toList
- } finally {
- it.close()
+ base.index("runtime").first(stageKey ++ Array(Long.MaxValue)).last(stageKey ++ Array(-1L))
+ .reverse()
}
+ indexed.skip(offset).max(length).asScala.map(_.info).toSeq
}
private def stageWithDetails(stage: v1.StageData): v1.StageData = {
@@ -275,16 +236,13 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
.toMap
val stageKey = Array(stage.stageId, stage.attemptId)
- val it = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stageKey)
- .closeableIterator()
- val execs = try {
- it.asScala
- .takeWhile { exec => Arrays.equals(exec.stage, stageKey) }
- .map { exec => (exec.executorId -> exec.info) }
- .toMap
- } finally {
- it.close()
- }
+ val execs = store.view(classOf[ExecutorStageSummaryWrapper])
+ .index("stage")
+ .first(stageKey)
+ .last(stageKey)
+ .asScala
+ .map { exec => (exec.executorId -> exec.info) }
+ .toMap
new v1.StageData(
stage.status,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ad623612d3dee..1bbe15755969a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -438,7 +438,7 @@ private[ui] class StageDataSource(
// The submission time for a stage is misleading because it counts the time
// the stage waits to be launched. (SPARK-10930)
val taskLaunchTimes = store
- .taskList(stageData.stageId, stageData.attemptId, 0, -1, v1.TaskSorting.ID)
+ .taskList(stageData.stageId, stageData.attemptId, 0, Int.MaxValue, v1.TaskSorting.ID)
.map(_.launchTime.getTime()).filter(_ > 0)
val duration: Option[Long] =
if (taskLaunchTimes.nonEmpty) {