Skip to content

Commit

Permalink
Upgrade Elasticsearch 8.x to version 8.15.0. [DOX-412]
Browse files Browse the repository at this point in the history
Incompatible change in elastic/elasticsearch-java#830.

(Totally meaningless release notes entry: "Fixed bug in BulkIngester")

https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
  • Loading branch information
blackwinter committed Aug 16, 2024
1 parent fbe4280 commit a345a15
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ gradle.rootProject {
'commons-cli' : '1.3.1',
'commons-io' : '2.7',
'elasticsearch2' : '2.2.1',
'elasticsearch8' : '8.14.1',
'elasticsearch8' : '8.15.0',
'htsjdk' : '4.0.1',
'jackson' : '2.13.4.2',
'jdk' : '17',
Expand Down
59 changes: 58 additions & 1 deletion src/main/java/hbz/limetrans/ElasticsearchClientV8.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -63,7 +68,9 @@ public class ElasticsearchClientV8 extends ElasticsearchClient { // checkstyle-d
private final long mBulkSizeValue;

private BulkIngester<Void> mBulkIngester;
private ElasticsearchBulkListener mBulkListener;
private RestClientTransport mTransport;
private ScheduledExecutorService mBulkScheduler;
private co.elastic.clients.elasticsearch.ElasticsearchClient mClient;

public ElasticsearchClientV8(final Settings aSettings) {
Expand All @@ -75,6 +82,8 @@ public ElasticsearchClientV8(final Settings aSettings) {
@Override
public void reset() {
mBulkIngester = null;
mBulkListener = null;
mBulkScheduler = null;

super.reset();
}
Expand Down Expand Up @@ -269,9 +278,19 @@ private void addBulk(final Function<BulkOperation.Builder, ObjectBuilder<BulkOpe

@Override
protected void createBulk(final int aBulkActions, final int aBulkRequests) {
mBulkScheduler = Executors.newScheduledThreadPool(aBulkRequests + 1, r -> {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("bulk-ingester-executor#" + t.getId());
t.setDaemon(true);
return t;
});

mBulkListener = new ElasticsearchBulkListener(this);

mBulkIngester = BulkIngester.of(b -> b
.client(mClient)
.listener(new ElasticsearchBulkListener(this))
.listener(mBulkListener)
.scheduler(mBulkScheduler)
.maxOperations(aBulkActions)
.maxSize(mBulkSizeValue)
.maxConcurrentRequests(aBulkRequests)
Expand All @@ -287,10 +306,23 @@ protected boolean isBulkClosed() {
protected boolean closeBulk() throws InterruptedException {
try {
mBulkIngester.close();

if (!mBulkListener.awaitTermination(1, TimeUnit.MINUTES)) {
getLogger().warn("Some bulk listener tasks still pending");
}

mBulkScheduler.shutdown();

if (!mBulkScheduler.awaitTermination(1, TimeUnit.MINUTES)) {
getLogger().warn("Some bulk scheduler tasks still pending");
}

return mBulkIngester.pendingRequests() == 0;
}
finally {
mBulkIngester = null;
mBulkListener = null;
mBulkScheduler = null;
}
}

Expand Down Expand Up @@ -349,19 +381,24 @@ private interface IOFunction<T, R> {

private static class ElasticsearchBulkListener implements BulkListener<Void> {

private static final int PENDING_TASKS_WAIT = 100;

private final ElasticsearchClient mClient;
private final LongAdder mBulkHandlers = new LongAdder();

private ElasticsearchBulkListener(final ElasticsearchClient aClient) {
mClient = aClient;
}

@Override
public void beforeBulk(final long aId, final BulkRequest aRequest, final List<Void> aContexts) {
mBulkHandlers.increment();
mClient.beforeBulk(aId, aRequest.operations().size(), -1);
}

@Override
public void afterBulk(final long aId, final BulkRequest aRequest, final List<Void> aContexts, final BulkResponse aResponse) {
mBulkHandlers.decrement();
mClient.afterBulk(aId, aResponse.took(), (d, s, f) -> aResponse.items().forEach(r -> {
if (r.operationType() == OperationType.Delete) {
d.run();
Expand All @@ -381,9 +418,29 @@ public void afterBulk(final long aId, final BulkRequest aRequest, final List<Voi

@Override
public void afterBulk(final long aId, final BulkRequest aRequest, final List<Void> aContexts, final Throwable aThrowable) {
mBulkHandlers.decrement();
mClient.afterBulk(aId, aThrowable);
}

public boolean awaitTermination(final long aTimeout, final TimeUnit aUnit) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

new Thread(() -> {
while (mBulkHandlers.sum() != 0) {
try {
Thread.sleep(PENDING_TASKS_WAIT);
}
catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}

latch.countDown();
}).start();

return latch.await(aTimeout, aUnit);
}

}

}

0 comments on commit a345a15

Please sign in to comment.