From e8305a7b69e9fc439e4d8d0272bcfd06b4a325bf Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Thu, 25 Apr 2024 12:04:52 -0400 Subject: [PATCH] =?UTF-8?q?Change=20the=20OnlineRadixSorter=20yet=20again,?= =?UTF-8?q?=20this=20time=20to=20use=20a=20sorted=20map=20to=20allow=20for?= =?UTF-8?q?=20chaining.=20In=20addition=20to=20chaining=20sequential=20sta?= =?UTF-8?q?ges=20together=20(which=20now,=20again,=20need=20to=20create=20?= =?UTF-8?q?all=20intermediates=20before=20the=20current=20work=20item),=20?= =?UTF-8?q?the=20class=20now=20emits=20which=20sequential=20stages=20are?= =?UTF-8?q?=20missing=20(outstanding)=20in=20the=20DCF's=20diagnostic=20su?= =?UTF-8?q?pplier.=20Sample=20output=20may=20look=20like=20this=20[5043364?= =?UTF-8?q?83]=20Caller-task=20completion=20for=20idx=3D3[=E2=80=A6]<-[335?= =?UTF-8?q?107734]=20OnlineRadixSorterForIntegratedKeys.addFutureForWork[?= =?UTF-8?q?=E2=80=A6]<-[215078753]=20Kickoff=20for=20slot=20#3[=E2=80=A6] --- .../DiagnosticTrackableCompletableFuture.java | 5 +- .../replay/util/OnlineRadixSorter.java | 99 ++++++++++++------- .../replay/RequestSenderOrchestratorTest.java | 7 +- .../replay/util/OnlineRadixSorterTest.java | 35 ++++--- .../OnlineRadixSorterForIntegratedKeys.java | 10 +- 5 files changed, 99 insertions(+), 57 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java index b577b0589..a5bf73127 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/DiagnosticTrackableCompletableFuture.java @@ -163,9 +163,8 @@ public DiagnosticTrackableCompletableFuture getInnerComposedPendingCompleta } public DiagnosticTrackableCompletableFuture - handle(@NonNull BiFunction fn, - @NonNull Supplier diagnosticSupplier) { - CompletableFuture newCf = this.future.handle(fn::apply); + handle(@NonNull BiFunction fn, @NonNull Supplier diagnosticSupplier) { + CompletableFuture newCf = this.future.handle(fn); return new DiagnosticTrackableCompletableFuture<>(newCf, diagnosticSupplier, this); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java index 61e954555..c97f53495 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java @@ -2,11 +2,20 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.lucene.document.IntRange; import org.opensearch.migrations.replay.datatypes.FutureTransformer; +import java.util.ArrayDeque; import java.util.Comparator; +import java.util.Deque; import java.util.Optional; -import java.util.PriorityQueue; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * This provides a simple implementation to sort incoming elements that are ordered by a sequence @@ -28,16 +37,26 @@ public class OnlineRadixSorter { @AllArgsConstructor private static class IndexedWork { - public final int index; - public final DiagnosticTrackableCompletableFuture signalingFuture; - public final DiagnosticTrackableCompletableFuture workCompletedFuture; + public final DiagnosticTrackableCompletableFuture signalingToStartFuture; + public DiagnosticTrackableCompletableFuture workCompletedFuture; + public final DiagnosticTrackableCompletableFuture signalWorkCompletedFuture; + + public DiagnosticTrackableCompletableFuture + addWorkFuture(FutureTransformer processor, int index) { + var rval = processor.apply(signalingToStartFuture) + .whenComplete((v,t)-> + signalWorkCompletedFuture.future.complete(null), + ()->"Caller-task completion for idx=" + index); + workCompletedFuture = rval; + return rval; + } } - private final PriorityQueue items; + private final SortedMap items; int currentOffset; public OnlineRadixSorter(int startingOffset) { - items = new PriorityQueue<>(Comparator.comparingInt(iw->iw.index)); + items = new TreeMap<>(); currentOffset = startingOffset; } @@ -54,42 +73,48 @@ public OnlineRadixSorter(int startingOffset) { * @return */ public DiagnosticTrackableCompletableFuture - addFutureForWork(int index, FutureTransformer processor) { - var signalFuture = new StringTrackableCompletableFuture("OnlineRadixSorter signal future #" + index); - var continueFuture = processor.apply(signalFuture); - - // purposefully use getDeferredFutureThroughHandle to do type erasure on T to get it back to Void - // since the caller is creating a DCF for their needs. However, type T will only come up again - // as per the work that was set within the processor. There's no benefit to making the underlying - // datastore aware of that T, hence the erasure. - var workBundle = new IndexedWork(index, signalFuture, - continueFuture.thenApply(v->{ - log.atDebug().setMessage(()->"Increasing currentOffset to " + currentOffset + - " for " + System.identityHashCode(this)).log(); - items.remove(); + addFutureForWork(final int index, FutureTransformer processor) { + var oldWorkItem = items.get(index); + if (oldWorkItem == null) { + if (index < currentOffset) { + throw new IllegalArgumentException("index (" + index + ")" + + " must be > last processed item (" + currentOffset + ")"); + } + for (int nextKey = Math.max(currentOffset, items.isEmpty() ? 0 : items.lastKey()+1); + nextKey<=index; + ++nextKey) { + int finalNextKey = nextKey; + var signalFuture = items.isEmpty() ? + new StringTrackableCompletableFuture( + CompletableFuture.completedFuture(null), "unlinked signaling future") : + items.get(items.lastKey()).signalWorkCompletedFuture + .thenAccept(v-> {}, + ()->"Kickoff for slot #" + finalNextKey); + oldWorkItem = new IndexedWork(signalFuture, null, + new StringTrackableCompletableFuture(()->"Work to finish for slot #" + finalNextKey + + " is awaiting [" + getAwaitingTextUpTo(index) + "]")); + oldWorkItem.signalWorkCompletedFuture.whenComplete((v,t)->{ ++currentOffset; - pullNextWorkItemOrDoNothing(); - return null; - }, () -> "Bumping currentOffset and checking if the next items should be signaled")); - items.add(workBundle); - if (index == this.currentOffset) { - pullNextWorkItemOrDoNothing(); + items.remove(finalNextKey); + }, ()->"cleaning up spent work for idx #" + finalNextKey); + items.put(nextKey, oldWorkItem); + } } - return continueFuture; + return oldWorkItem.addWorkFuture(processor, index); } - private void pullNextWorkItemOrDoNothing() { - Optional.ofNullable(items.isEmpty() ? null : items.peek()) - .filter(indexedWork -> indexedWork.index == currentOffset) - .ifPresent(indexedWork -> { - var firstSignal = indexedWork.signalingFuture.future.complete(null); - assert firstSignal : "expected only this function to signal completion of the signaling future " + - "and for it to only be called once"; - }); + public String getAwaitingTextUpTo(int upTo) { + return "slotsOutstanding:" + + IntStream.range(0, upTo-currentOffset) + .map(i->upTo-i-1) + .mapToObj(i -> Optional.ofNullable(items.get(i)) + .flatMap(wi->Optional.ofNullable(wi.workCompletedFuture)) + .map(ignored->"") + .orElse(i+"")) + .filter(s->!s.isEmpty()) + .collect(Collectors.joining(",")); } - public boolean hasPending() { return !items.isEmpty(); } - @Override public String toString() { final StringBuilder sb = new StringBuilder("OnlineRadixSorter{"); @@ -100,6 +125,8 @@ public String toString() { return sb.toString(); } + public boolean hasPending() { return !items.isEmpty(); } + public long numPending() { return items.size(); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 9820e99e5..9f0c6886d 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Optional; @@ -118,6 +119,8 @@ public void testFutureGraphBuildout() throws Exception { lastEndTime.plus(Duration.ofMillis(100))); Assertions.assertEquals(NUM_REQUESTS_TO_SCHEDULE, scheduledRequests.size()); + var reversedScheduledRequests = new ArrayList<>(scheduledRequests); + Collections.reverse(reversedScheduledRequests); for (int i = 0; i < scheduledRequests.size(); ++i) { for (int j = 0; j "cf @ " + finalI + "," + finalJ + " =\n" + - scheduledRequests.stream().map(sr-> getParentsDiagnosticString(sr, "")) - .collect(Collectors.joining("\n"))) + reversedScheduledRequests.stream().map(sr-> getParentsDiagnosticString(sr, "")) + .collect(Collectors.joining("\n---\n"))) .log(); pktConsumer.consumeIsReady.release(); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/OnlineRadixSorterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/OnlineRadixSorterTest.java index b8ce37c6e..1058b1d7f 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/OnlineRadixSorterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/OnlineRadixSorterTest.java @@ -1,44 +1,55 @@ package org.opensearch.migrations.replay.util; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +@Slf4j class OnlineRadixSorterTest { private static String stringify(Stream stream) { - return stream.map(i->i.toString()).collect(Collectors.joining(",")); + return stream.map(Object::toString).collect(Collectors.joining(",")); } - private static String add(OnlineRadixSorterForIntegratedKeys sorter, ArrayList receivedItems, int v) { - sorter.add(v, ()-> receivedItems.add(v)); + private static String add(OnlineRadixSorterForIntegratedKeys sorter, + Map> m, + ArrayList receivedItems, int v) { + var dcf = sorter.add(v, () -> receivedItems.add(v)); + if (m != null) { m.put(v, dcf); } + log.atInfo().setMessage(()->"after adding work... "+dcf).log(); return stringify(receivedItems.stream()); } @Test void testOnlineRadixSorter_inOrder() { var radixSorter = new OnlineRadixSorterForIntegratedKeys(1, i -> (int) i); - Assertions.assertEquals("1", add(radixSorter, new ArrayList(), 1)); - Assertions.assertEquals("2", add(radixSorter, new ArrayList(), 2)); - Assertions.assertEquals("3", add(radixSorter, new ArrayList(), 3)); + Assertions.assertEquals("1", add(radixSorter, null, new ArrayList(), 1)); + Assertions.assertEquals("2", add(radixSorter, null, new ArrayList(), 2)); + Assertions.assertEquals("3", add(radixSorter, null, new ArrayList(), 3)); } @Test void testOnlineRadixSorter_outOfOrder() { var radixSorter = new OnlineRadixSorterForIntegratedKeys(1, i->(int) i); var receiverList = new ArrayList(); - Assertions.assertEquals("", add(radixSorter, receiverList, 3)); - Assertions.assertEquals("", add(radixSorter, receiverList, 4)); - Assertions.assertEquals("1", add(radixSorter, receiverList, 1)); + var dcfMap = new HashMap>(); + Assertions.assertEquals("", add(radixSorter, dcfMap, receiverList, 3)); + Assertions.assertEquals("", add(radixSorter, dcfMap, receiverList, 4)); + Assertions.assertEquals("1", add(radixSorter, dcfMap, receiverList, 1)); + log.atInfo().setMessage(()->"after adding work for '1'... dcf[3]=" + dcfMap.get(3)).log(); + log.atInfo().setMessage(()->"after adding work for '1'... dcf[4]=" + dcfMap.get(4)).log(); receiverList.clear(); - Assertions.assertEquals("2,3,4", add(radixSorter, receiverList, 2)); + Assertions.assertEquals("2,3,4", add(radixSorter, dcfMap, receiverList, 2)); receiverList.clear(); - Assertions.assertEquals("5", add(radixSorter, receiverList, 5)); + Assertions.assertEquals("5", add(radixSorter, dcfMap, receiverList, 5)); receiverList.clear(); - Assertions.assertEquals("", add(radixSorter, receiverList, 7)); + Assertions.assertEquals("", add(radixSorter, dcfMap, receiverList, 7)); receiverList.clear(); } } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/util/OnlineRadixSorterForIntegratedKeys.java b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/util/OnlineRadixSorterForIntegratedKeys.java index 2acf33308..4c04388fb 100644 --- a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/util/OnlineRadixSorterForIntegratedKeys.java +++ b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/util/OnlineRadixSorterForIntegratedKeys.java @@ -1,6 +1,7 @@ package org.opensearch.migrations.replay.util; -import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; + import java.util.function.ToIntFunction; /** @@ -10,6 +11,7 @@ * * @param */ +@Slf4j public class OnlineRadixSorterForIntegratedKeys extends OnlineRadixSorter { ToIntFunction radixResolver; @@ -19,9 +21,9 @@ public OnlineRadixSorterForIntegratedKeys(int startingOffset, ToIntFunction r this.radixResolver = radixResolver; } - public void add(T item, Runnable sortedItemVisitor) { - super.addFutureForWork(radixResolver.applyAsInt(item), signalFuture->signalFuture.map( + public DiagnosticTrackableCompletableFuture add(T item, Runnable sortedItemVisitor) { + return super.addFutureForWork(radixResolver.applyAsInt(item), signalFuture->signalFuture.map( f->f.whenComplete((v,t)->sortedItemVisitor.run()), - ()->"OnlineRadixSorterForIntegratedKeys.add")); + ()->"OnlineRadixSorterForIntegratedKeys.addFutureForWork")); } }