Skip to content

Commit

Permalink
always waiting for listener to be done before closing
Browse files Browse the repository at this point in the history
  • Loading branch information
l-trotta committed Aug 26, 2024
1 parent ac086ff commit 324af0a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class BulkIngester<Context> implements AutoCloseable {
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
private AtomicInteger listenerInProgressCount = new AtomicInteger();

private static class RequestExecution<Context> {
public final long id;
Expand Down Expand Up @@ -235,7 +236,7 @@ private boolean canAddOperation() {
}

private boolean closedAndFlushed() {
return isClosed && operations.isEmpty() && requestsInFlightCount == 0;
return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
}

//----- Ingester logic
Expand Down Expand Up @@ -314,14 +315,32 @@ public void flush() {
if (resp != null) {
// Success
if (listener != null) {
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, resp));
listenerInProgressCount.incrementAndGet();
scheduler.submit(() -> {
try {
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
}
finally {
if(listenerInProgressCount.decrementAndGet() == 0){
closeCondition.signalIfReady();
}
}
});
}
} else {
// Failure
if (listener != null) {
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, thr));
listenerInProgressCount.incrementAndGet();
scheduler.submit(() -> {
try {
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
}
finally {
if(listenerInProgressCount.decrementAndGet() == 0){
closeCondition.signalIfReady();
}
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ public void basicTestNoFlush() throws Exception {
multiThreadTest(10, 3, 5, 100, true);
}

@Test
public void basicTestNoFlushWithInternalScheduler() throws Exception {
// Will have nothing to flush on close.
multiThreadTest(10, 3, 5, 100, false);
}

private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations,
boolean externalScheduler) throws Exception {

Expand Down

0 comments on commit 324af0a

Please sign in to comment.