diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index af5401baa..91d194844 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -76,6 +76,7 @@ public class BulkIngester implements AutoCloseable { private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation); private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest); private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed); + private AtomicInteger listenerInProgressCount = new AtomicInteger(); private static class RequestExecution { public final long id; @@ -235,7 +236,7 @@ private boolean canAddOperation() { } private boolean closedAndFlushed() { - return isClosed && operations.isEmpty() && requestsInFlightCount == 0; + return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0; } //----- Ingester logic @@ -314,14 +315,32 @@ public void flush() { if (resp != null) { // Success if (listener != null) { - scheduler.submit(() -> listener.afterBulk(exec.id, exec.request, - exec.contexts, resp)); + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + } + finally { + if(listenerInProgressCount.decrementAndGet() == 0){ + closeCondition.signalIfReady(); + } + } + }); } } else { // Failure if (listener != null) { - scheduler.submit(() -> listener.afterBulk(exec.id, exec.request, - exec.contexts, thr)); + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, thr); + } + finally { + if(listenerInProgressCount.decrementAndGet() == 0){ + closeCondition.signalIfReady(); + } + } + }); } } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 4f93429a6..a76f3f75f 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -105,6 +105,12 @@ public void basicTestNoFlush() throws Exception { multiThreadTest(10, 3, 5, 100, true); } + @Test + public void basicTestNoFlushWithInternalScheduler() throws Exception { + // Will have nothing to flush on close. + multiThreadTest(10, 3, 5, 100, false); + } + private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, boolean externalScheduler) throws Exception {