Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bulk ingester might skip listener requests #867

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class BulkIngester<Context> implements AutoCloseable {
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
private AtomicInteger listenerInProgressCount = new AtomicInteger();

private static class RequestExecution<Context> {
public final long id;
Expand Down Expand Up @@ -235,7 +236,7 @@ private boolean canAddOperation() {
}

private boolean closedAndFlushed() {
return isClosed && operations.isEmpty() && requestsInFlightCount == 0;
return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
}

//----- Ingester logic
Expand Down Expand Up @@ -311,25 +312,42 @@ public void flush() {
if (exec != null) {
// A request was actually sent
exec.futureResponse.handle((resp, thr) -> {

sendRequestCondition.signalIfReadyAfter(() -> {
requestsInFlightCount--;
closeCondition.signalAllIfReady();
});

if (resp != null) {
// Success
if (listener != null) {
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, resp));
listenerInProgressCount.incrementAndGet();
scheduler.submit(() -> {
try {
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
}
finally {
if(listenerInProgressCount.decrementAndGet() == 0){
closeCondition.signalIfReady();
}
}
});
}
} else {
// Failure
if (listener != null) {
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
exec.contexts, thr));
listenerInProgressCount.incrementAndGet();
scheduler.submit(() -> {
try {
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
}
finally {
if(listenerInProgressCount.decrementAndGet() == 0){
closeCondition.signalIfReady();
}
}
});
}
}

sendRequestCondition.signalIfReadyAfter(() -> {
requestsInFlightCount--;
closeCondition.signalAllIfReady();
});
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,44 @@ private void printStats(TestTransport transport) {
@Test
public void basicTestFlush() throws Exception {
// Prime numbers, so that we have leftovers to flush before shutting down
multiThreadTest(7, 3, 5, 101);
multiThreadTest(7, 3, 5, 101, true);
}

@Test
public void basicTestFlushWithInternalScheduler() throws Exception {
// Prime numbers, so that we have leftovers to flush before shutting down
multiThreadTest(7, 3, 5, 101, false);
}

@Test
public void basicTestNoFlush() throws Exception {
// Will have nothing to flush on close.
multiThreadTest(10, 3, 5, 100);
multiThreadTest(10, 3, 5, 100, true);
}

private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception {
@Test
public void basicTestNoFlushWithInternalScheduler() throws Exception {
// Will have nothing to flush on close.
multiThreadTest(10, 3, 5, 100, false);
}

private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations,
boolean externalScheduler) throws Exception {

CountingListener listener = new CountingListener();
TestTransport transport = new TestTransport();
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
ScheduledExecutorService scheduler;
if (externalScheduler) {
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("my-bulk-ingester-executor#" );
t.setName("my-bulk-ingester-executor#");
t.setDaemon(true);
return t;
});
});
} else {
scheduler = null;
}

BulkIngester<Void> ingester = BulkIngester.of(b -> b
.client(client)
Expand Down Expand Up @@ -139,7 +157,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,

ingester.close();
transport.close();
scheduler.shutdownNow();
if (scheduler != null) scheduler.shutdownNow();

printStats(ingester);
printStats(listener);
Expand Down
Loading