Skip to content

Commit

Permalink
SHS-NG M4.6: Add "max" and "last" to kvstore iterators.
Browse files Browse the repository at this point in the history
This makes it easier for callers to control the end of iteration,
making it easier to write Scala code that automatically closes
underlying iterator resources. Before, code had to use Scala's
"takeWhile", convert the result to a list, and manually close the
iterators; with these two parameters, that can be avoided in a
bunch of cases, with iterators auto-closing when the last element
is reached.
  • Loading branch information
Marcelo Vanzin committed May 1, 2017
1 parent 03112a2 commit fedc9ec
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
* </p>
*
* <p>
* The iterators returns by this view are of type {@link KVStoreIterator}; they auto-close
* The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close
* when used in a for loop that exhausts their contents, but when used manually, they need
* to be closed explicitly unless all elements are read.
* to be closed explicitly unless all elements are read. For this reason, it's recommended
* that {@link #last(Object)} and {@link #max(long)} be used to make it easier to release
* resources associated with the iterator by better controlling how many elements will be
* retrieved.
* </p>
*/
public abstract class KVStoreView<T> implements Iterable<T> {
Expand All @@ -43,7 +46,9 @@ public abstract class KVStoreView<T> implements Iterable<T> {
boolean ascending = true;
String index = KVIndex.NATURAL_INDEX_NAME;
Object first = null;
Object last = null;
long skip = 0L;
long max = Long.MAX_VALUE;

public KVStoreView(Class<T> type) {
this.type = type;
Expand Down Expand Up @@ -74,7 +79,25 @@ public KVStoreView<T> first(Object value) {
}

/**
* Skips a number of elements in the resulting iterator.
* Stops iteration at the given value of the chosen index.
*/
public KVStoreView<T> last(Object value) {
this.last = value;
return this;
}

/**
* Stops iteration after a number of elements has been retrieved.
*/
public KVStoreView<T> max(long max) {
Preconditions.checkArgument(max > 0L, "max must be positive.");
this.max = max;
return this;
}

/**
* Skips a number of elements at the start of iteration. Skipped elements are not accounted
* when using {@link #max(long)}.
*/
public KVStoreView<T> skip(long n) {
this.skip = n;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
private final LevelDBTypeInfo.Index index;
private final byte[] indexKeyPrefix;
private final byte[] end;
private final long max;

private boolean checkedNext;
private T next;
private boolean closed;
private long count;

/**
* Creates a simple iterator over db keys.
Expand All @@ -55,6 +57,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
this.it = db.db().iterator();
this.indexKeyPrefix = keyPrefix;
this.end = null;
this.max = -1L;
it.seek(keyPrefix);
}

Expand All @@ -69,6 +72,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
this.ti = db.getTypeInfo(type);
this.index = ti.index(params.index);
this.indexKeyPrefix = index.keyPrefix();
this.max = params.max;

byte[] firstKey;
if (params.first != null) {
Expand All @@ -84,10 +88,13 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
}
it.seek(firstKey);

byte[] end = null;
if (ascending) {
this.end = index.end();
end = params.last != null ? index.end(params.last) : index.end();
} else {
this.end = null;
if (params.last != null) {
end = index.start(params.last);
}
if (it.hasNext()) {
// When descending, the caller may have set up the start of iteration at a non-existant
// entry that is guaranteed to be after the desired entry. For example, if you have a
Expand All @@ -101,6 +108,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
}
}
}
this.end = end;

if (params.skip > 0) {
skip(params.skip);
Expand Down Expand Up @@ -186,6 +194,10 @@ protected void finalize() throws Throwable {
}

private T loadNext() {
if (count >= max) {
return null;
}

try {
while (true) {
boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
Expand All @@ -212,11 +224,16 @@ private T loadNext() {
return null;
}

// If there's a known end key and it's found, stop.
if (end != null && Arrays.equals(nextKey, end)) {
return null;
// If there's a known end key and iteration has gone past it, stop.
if (end != null) {
int comp = compare(nextKey, end) * (ascending ? 1 : -1);
if (comp > 0) {
return null;
}
}

count++;

// Next element is part of the iteration, return it.
if (index == null || index.isCopy()) {
return db.serializer.deserialize(nextEntry.getValue(), type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -152,109 +153,170 @@ public static void cleanup() throws Exception {

@Test
public void naturalIndex() throws Exception {
testIteration(NATURAL_ORDER, view(), null);
testIteration(NATURAL_ORDER, view(), null, null);
}

@Test
public void refIndex() throws Exception {
testIteration(REF_INDEX_ORDER, view().index("id"), null);
testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
}

@Test
public void copyIndex() throws Exception {
testIteration(COPY_INDEX_ORDER, view().index("name"), null);
testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
}

@Test
public void numericIndex() throws Exception {
testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null);
testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
}

@Test
public void naturalIndexDescending() throws Exception {
testIteration(NATURAL_ORDER, view().reverse(), null);
testIteration(NATURAL_ORDER, view().reverse(), null, null);
}

@Test
public void refIndexDescending() throws Exception {
testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null);
testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
}

@Test
public void copyIndexDescending() throws Exception {
testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null);
testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, null);
}

@Test
public void numericIndexDescending() throws Exception {
testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null);
testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, null);
}

@Test
public void naturalIndexWithStart() throws Exception {
CustomType1 first = pickFirst();
testIteration(NATURAL_ORDER, view().first(first.key), first);
CustomType1 first = pickLimit();
testIteration(NATURAL_ORDER, view().first(first.key), first, null);
}

@Test
public void refIndexWithStart() throws Exception {
CustomType1 first = pickFirst();
testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first);
CustomType1 first = pickLimit();
testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first, null);
}

@Test
public void copyIndexWithStart() throws Exception {
CustomType1 first = pickFirst();
testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first);
CustomType1 first = pickLimit();
testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first, null);
}

@Test
public void numericIndexWithStart() throws Exception {
CustomType1 first = pickFirst();
testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first);
CustomType1 first = pickLimit();
testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first, null);
}

@Test
public void naturalIndexDescendingWithStart() throws Exception {
CustomType1 first = pickFirst();
testIteration(NATURAL_ORDER, view().reverse().first(first.key), first);
CustomType1 first = pickLimit();
testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null);
}

@Test
public void refIndexDescendingWithStart() throws Exception {
CustomType1 first = pickFirst();
testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first);
CustomType1 first = pickLimit();
testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first, null);
}

@Test
public void copyIndexDescendingWithStart() throws Exception {
CustomType1 first = pickFirst();
CustomType1 first = pickLimit();
testIteration(COPY_INDEX_ORDER, view().reverse().index("name").first(first.name),
first);
first, null);
}

@Test
public void numericIndexDescendingWithStart() throws Exception {
CustomType1 first = pickFirst();
CustomType1 first = pickLimit();
testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").first(first.num),
first);
first, null);
}

@Test
public void naturalIndexWithSkip() throws Exception {
testIteration(NATURAL_ORDER, view().skip(RND.nextInt(allEntries.size() / 2)), null);
testIteration(NATURAL_ORDER, view().skip(RND.nextInt(allEntries.size() / 2)), null, null);
}

@Test
public void refIndexWithSkip() throws Exception {
testIteration(REF_INDEX_ORDER, view().index("id").skip(RND.nextInt(allEntries.size() / 2)),
null);
null, null);
}

@Test
public void copyIndexWithSkip() throws Exception {
testIteration(COPY_INDEX_ORDER, view().index("name").skip(RND.nextInt(allEntries.size() / 2)),
null);
null, null);
}

@Test
public void naturalIndexWithMax() throws Exception {
testIteration(NATURAL_ORDER, view().max(RND.nextInt(allEntries.size() / 2)), null, null);
}

@Test
public void copyIndexWithMax() throws Exception {
testIteration(COPY_INDEX_ORDER, view().index("name").max(RND.nextInt(allEntries.size() / 2)),
null, null);
}

@Test
public void naturalIndexWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(NATURAL_ORDER, view().last(last.key), null, last);
}

@Test
public void refIndexWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(REF_INDEX_ORDER, view().index("id").last(last.id), null, last);
}

@Test
public void copyIndexWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(COPY_INDEX_ORDER, view().index("name").last(last.name), null, last);
}

@Test
public void numericIndexWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(NUMERIC_INDEX_ORDER, view().index("int").last(last.num), null, last);
}

@Test
public void naturalIndexDescendingWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last);
}

@Test
public void refIndexDescendingWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(REF_INDEX_ORDER, view().reverse().index("id").last(last.id), null, last);
}

@Test
public void copyIndexDescendingWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(COPY_INDEX_ORDER, view().reverse().index("name").last(last.name),
null, last);
}

@Test
public void numericIndexDescendingWithLast() throws Exception {
CustomType1 last = pickLimit();
testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").last(last.num),
null, last);
}

@Test
Expand All @@ -272,8 +334,8 @@ public void testRefWithIntNaturalKey() throws Exception {
}
}

private CustomType1 pickFirst() {
// Picks a first element that has clashes with other elements in the given index.
private CustomType1 pickLimit() {
// Picks an element that has clashes with other elements in the given index.
return clashingEntries.get(RND.nextInt(clashingEntries.size()));
}

Expand All @@ -297,22 +359,32 @@ private <T extends Comparable<T>> int compareWithFallback(
private void testIteration(
final BaseComparator order,
final KVStoreView<CustomType1> params,
final CustomType1 first) throws Exception {
final CustomType1 first,
final CustomType1 last) throws Exception {
List<CustomType1> indexOrder = sortBy(order.fallback());
if (!params.ascending) {
indexOrder = Lists.reverse(indexOrder);
}

Iterable<CustomType1> expected = indexOrder;
BaseComparator expectedOrder = params.ascending ? order : order.reverse();

if (first != null) {
final BaseComparator expectedOrder = params.ascending ? order : order.reverse();
expected = Iterables.filter(expected, v -> expectedOrder.compare(first, v) <= 0);
}

if (last != null) {
expected = Iterables.filter(expected, v -> expectedOrder.compare(v, last) <= 0);
}

if (params.skip > 0) {
expected = Iterables.skip(expected, (int) params.skip);
}

if (params.max != Long.MAX_VALUE) {
expected = Iterables.limit(expected, (int) params.max);
}

List<CustomType1> actual = collect(params);
compareLists(expected, actual);
}
Expand Down
Loading

0 comments on commit fedc9ec

Please sign in to comment.