Skip to content

Commit

Permalink
Change the OnlineRadixSorter yet again, this time to use a sorted map…
Browse files Browse the repository at this point in the history
… to allow for chaining.

In addition to chaining sequential stages together (which now, again, need to create all intermediates before the current work item), the class now emits which sequential stages are missing (outstanding) in the DCF's diagnostic supplier.
Sample output may look like this
[504336483] Caller-task completion for idx=3[…]<-[335107734] OnlineRadixSorterForIntegratedKeys.addFutureForWork[…]<-[215078753] Kickoff for slot #3[…]<-[1384454980] Work to finish for slot #2 is awaiting [slotsOutstanding:2][…]

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Apr 25, 2024
1 parent 030f91c commit e8305a7
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,8 @@ public DiagnosticTrackableCompletableFuture<D,T> getInnerComposedPendingCompleta
}

public <U> DiagnosticTrackableCompletableFuture<D, U>
handle(@NonNull BiFunction<? super T, Throwable, ? extends U> fn,
@NonNull Supplier<D> diagnosticSupplier) {
CompletableFuture<U> newCf = this.future.handle(fn::apply);
handle(@NonNull BiFunction<? super T, Throwable, ? extends U> fn, @NonNull Supplier<D> diagnosticSupplier) {
CompletableFuture<U> newCf = this.future.handle(fn);
return new DiagnosticTrackableCompletableFuture<>(newCf, diagnosticSupplier, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.lucene.document.IntRange;
import org.opensearch.migrations.replay.datatypes.FutureTransformer;

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This provides a simple implementation to sort incoming elements that are ordered by a sequence
Expand All @@ -28,16 +37,26 @@
public class OnlineRadixSorter {
@AllArgsConstructor
private static class IndexedWork {
public final int index;
public final DiagnosticTrackableCompletableFuture<String,Void> signalingFuture;
public final DiagnosticTrackableCompletableFuture<String,Void> workCompletedFuture;
public final DiagnosticTrackableCompletableFuture<String,Void> signalingToStartFuture;
public DiagnosticTrackableCompletableFuture<String,? extends Object> workCompletedFuture;
public final DiagnosticTrackableCompletableFuture<String,Void> signalWorkCompletedFuture;

public <T> DiagnosticTrackableCompletableFuture<String,T>
addWorkFuture(FutureTransformer<T> processor, int index) {
var rval = processor.apply(signalingToStartFuture)
.whenComplete((v,t)->
signalWorkCompletedFuture.future.complete(null),
()->"Caller-task completion for idx=" + index);
workCompletedFuture = rval;
return rval;
}
}

private final PriorityQueue<IndexedWork> items;
private final SortedMap<Integer,IndexedWork> items;
int currentOffset;

public OnlineRadixSorter(int startingOffset) {
items = new PriorityQueue<>(Comparator.comparingInt(iw->iw.index));
items = new TreeMap<>();
currentOffset = startingOffset;
}

Expand All @@ -54,42 +73,48 @@ public OnlineRadixSorter(int startingOffset) {
* @return
*/
public <T> DiagnosticTrackableCompletableFuture<String,T>
addFutureForWork(int index, FutureTransformer<T> processor) {
var signalFuture = new StringTrackableCompletableFuture<Void>("OnlineRadixSorter signal future #" + index);
var continueFuture = processor.apply(signalFuture);

// purposefully use getDeferredFutureThroughHandle to do type erasure on T to get it back to Void
// since the caller is creating a DCF<T> for their needs. However, type T will only come up again
// as per the work that was set within the processor. There's no benefit to making the underlying
// datastore aware of that T, hence the erasure.
var workBundle = new IndexedWork(index, signalFuture,
continueFuture.thenApply(v->{
log.atDebug().setMessage(()->"Increasing currentOffset to " + currentOffset +
" for " + System.identityHashCode(this)).log();
items.remove();
addFutureForWork(final int index, FutureTransformer<T> processor) {
var oldWorkItem = items.get(index);
if (oldWorkItem == null) {
if (index < currentOffset) {
throw new IllegalArgumentException("index (" + index + ")" +
" must be > last processed item (" + currentOffset + ")");
}
for (int nextKey = Math.max(currentOffset, items.isEmpty() ? 0 : items.lastKey()+1);
nextKey<=index;
++nextKey) {
int finalNextKey = nextKey;
var signalFuture = items.isEmpty() ?
new StringTrackableCompletableFuture<Void>(
CompletableFuture.completedFuture(null), "unlinked signaling future") :
items.get(items.lastKey()).signalWorkCompletedFuture
.thenAccept(v-> {},
()->"Kickoff for slot #" + finalNextKey);
oldWorkItem = new IndexedWork(signalFuture, null,
new StringTrackableCompletableFuture<Void>(()->"Work to finish for slot #" + finalNextKey +
" is awaiting [" + getAwaitingTextUpTo(index) + "]"));
oldWorkItem.signalWorkCompletedFuture.whenComplete((v,t)->{
++currentOffset;
pullNextWorkItemOrDoNothing();
return null;
}, () -> "Bumping currentOffset and checking if the next items should be signaled"));
items.add(workBundle);
if (index == this.currentOffset) {
pullNextWorkItemOrDoNothing();
items.remove(finalNextKey);
}, ()->"cleaning up spent work for idx #" + finalNextKey);
items.put(nextKey, oldWorkItem);
}
}
return continueFuture;
return oldWorkItem.addWorkFuture(processor, index);
}

private void pullNextWorkItemOrDoNothing() {
Optional.ofNullable(items.isEmpty() ? null : items.peek())
.filter(indexedWork -> indexedWork.index == currentOffset)
.ifPresent(indexedWork -> {
var firstSignal = indexedWork.signalingFuture.future.complete(null);
assert firstSignal : "expected only this function to signal completion of the signaling future " +
"and for it to only be called once";
});
public String getAwaitingTextUpTo(int upTo) {
return "slotsOutstanding:" +
IntStream.range(0, upTo-currentOffset)
.map(i->upTo-i-1)
.mapToObj(i -> Optional.ofNullable(items.get(i))
.flatMap(wi->Optional.ofNullable(wi.workCompletedFuture))
.map(ignored->"")
.orElse(i+""))
.filter(s->!s.isEmpty())
.collect(Collectors.joining(","));
}

public boolean hasPending() { return !items.isEmpty(); }

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("OnlineRadixSorter{");
Expand All @@ -100,6 +125,8 @@ public String toString() {
return sb.toString();
}

public boolean hasPending() { return !items.isEmpty(); }

public long numPending() {
return items.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -118,6 +119,8 @@ public void testFutureGraphBuildout() throws Exception {
lastEndTime.plus(Duration.ofMillis(100)));

Assertions.assertEquals(NUM_REQUESTS_TO_SCHEDULE, scheduledRequests.size());
var reversedScheduledRequests = new ArrayList<>(scheduledRequests);
Collections.reverse(reversedScheduledRequests);
for (int i = 0; i < scheduledRequests.size(); ++i) {
for (int j = 0; j<NUM_PACKETS+1; ++j) {
var pktConsumer = connectionToConsumerMap.get((long)i);
Expand All @@ -126,8 +129,8 @@ public void testFutureGraphBuildout() throws Exception {
int finalI = i;
int finalJ = j;
log.atInfo().setMessage(() -> "cf @ " + finalI + "," + finalJ + " =\n" +
scheduledRequests.stream().map(sr-> getParentsDiagnosticString(sr, ""))
.collect(Collectors.joining("\n")))
reversedScheduledRequests.stream().map(sr-> getParentsDiagnosticString(sr, ""))
.collect(Collectors.joining("\n---\n")))
.log();
pktConsumer.consumeIsReady.release();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,55 @@
package org.opensearch.migrations.replay.util;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
class OnlineRadixSorterTest {

private static String stringify(Stream<Integer> stream) {
return stream.map(i->i.toString()).collect(Collectors.joining(","));
return stream.map(Object::toString).collect(Collectors.joining(","));
}

private static String add(OnlineRadixSorterForIntegratedKeys<Integer> sorter, ArrayList<Integer> receivedItems, int v) {
sorter.add(v, ()-> receivedItems.add(v));
private static String add(OnlineRadixSorterForIntegratedKeys<Integer> sorter,
Map<Integer, DiagnosticTrackableCompletableFuture<String,Void>> m,
ArrayList<Integer> receivedItems, int v) {
var dcf = sorter.add(v, () -> receivedItems.add(v));
if (m != null) { m.put(v, dcf); }
log.atInfo().setMessage(()->"after adding work... "+dcf).log();
return stringify(receivedItems.stream());
}

@Test
void testOnlineRadixSorter_inOrder() {
var radixSorter = new OnlineRadixSorterForIntegratedKeys(1, i -> (int) i);
Assertions.assertEquals("1", add(radixSorter, new ArrayList<Integer>(), 1));
Assertions.assertEquals("2", add(radixSorter, new ArrayList<Integer>(), 2));
Assertions.assertEquals("3", add(radixSorter, new ArrayList<Integer>(), 3));
Assertions.assertEquals("1", add(radixSorter, null, new ArrayList<Integer>(), 1));
Assertions.assertEquals("2", add(radixSorter, null, new ArrayList<Integer>(), 2));
Assertions.assertEquals("3", add(radixSorter, null, new ArrayList<Integer>(), 3));
}

@Test
void testOnlineRadixSorter_outOfOrder() {
var radixSorter = new OnlineRadixSorterForIntegratedKeys(1, i->(int) i);
var receiverList = new ArrayList<Integer>();
Assertions.assertEquals("", add(radixSorter, receiverList, 3));
Assertions.assertEquals("", add(radixSorter, receiverList, 4));
Assertions.assertEquals("1", add(radixSorter, receiverList, 1));
var dcfMap = new HashMap<Integer,DiagnosticTrackableCompletableFuture<String,Void>>();
Assertions.assertEquals("", add(radixSorter, dcfMap, receiverList, 3));
Assertions.assertEquals("", add(radixSorter, dcfMap, receiverList, 4));
Assertions.assertEquals("1", add(radixSorter, dcfMap, receiverList, 1));
log.atInfo().setMessage(()->"after adding work for '1'... dcf[3]=" + dcfMap.get(3)).log();
log.atInfo().setMessage(()->"after adding work for '1'... dcf[4]=" + dcfMap.get(4)).log();
receiverList.clear();
Assertions.assertEquals("2,3,4", add(radixSorter, receiverList, 2));
Assertions.assertEquals("2,3,4", add(radixSorter, dcfMap, receiverList, 2));
receiverList.clear();
Assertions.assertEquals("5", add(radixSorter, receiverList, 5));
Assertions.assertEquals("5", add(radixSorter, dcfMap, receiverList, 5));
receiverList.clear();
Assertions.assertEquals("", add(radixSorter, receiverList, 7));
Assertions.assertEquals("", add(radixSorter, dcfMap, receiverList, 7));
receiverList.clear();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.migrations.replay.util;

import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

import java.util.function.ToIntFunction;

/**
Expand All @@ -10,6 +11,7 @@
*
* @param <T>
*/
@Slf4j
public class OnlineRadixSorterForIntegratedKeys<T> extends OnlineRadixSorter {

ToIntFunction<T> radixResolver;
Expand All @@ -19,9 +21,9 @@ public OnlineRadixSorterForIntegratedKeys(int startingOffset, ToIntFunction<T> r
this.radixResolver = radixResolver;
}

public void add(T item, Runnable sortedItemVisitor) {
super.addFutureForWork(radixResolver.applyAsInt(item), signalFuture->signalFuture.map(
public DiagnosticTrackableCompletableFuture<String, Void> add(T item, Runnable sortedItemVisitor) {
return super.addFutureForWork(radixResolver.applyAsInt(item), signalFuture->signalFuture.map(
f->f.whenComplete((v,t)->sortedItemVisitor.run()),
()->"OnlineRadixSorterForIntegratedKeys.add"));
()->"OnlineRadixSorterForIntegratedKeys.addFutureForWork"));
}
}

0 comments on commit e8305a7

Please sign in to comment.