diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index c6fe93ac3..e7c426034 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -363,6 +363,9 @@ public void add(BulkOperation operation, Context context) { if (!canAddOperation()) { flush(); } + else { + addCondition.signalIfReady(); + } }); } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index a72473eda..d6aee8cc5 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -28,6 +28,7 @@ import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.elasticsearch.core.bulk.OperationType; import co.elastic.clients.elasticsearch.end_to_end.RequestTest; +import co.elastic.clients.elasticsearch.indices.IndicesStatsResponse; import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.JsonpUtils; import co.elastic.clients.json.SimpleJsonpMapper; @@ -156,6 +157,50 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, assertEquals(expectedRequests, transport.requestsStarted.get()); } + @Test + public void multiThreadStressTest() throws InterruptedException, IOException { + + String index = "bulk-ingester-stress-test"; + ElasticsearchClient client = ElasticsearchTestServer.global().client(); + + // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme + // situation where the number of adding threads greatly exceeds the number of concurrent requests + // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly. + BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .globalSettings(s -> s.index(index)) + .flushInterval(5, TimeUnit.SECONDS) + ); + + RequestTest.AppData appData = new RequestTest.AppData(); + appData.setIntValue(42); + appData.setMsg("Some message"); + + ExecutorService executor = Executors.newFixedThreadPool(50); + + for (int i = 0; i < 100000; i++) { + int ii = i; + Runnable thread = () -> { + int finalI = ii; + ingester.add(_1 -> _1 + .create(_2 -> _2 + .id(String.valueOf(finalI)) + .document(appData) + )); + }; + executor.submit(thread); + } + + executor.awaitTermination(10,TimeUnit.SECONDS); + ingester.close(); + + client.indices().refresh(); + + IndicesStatsResponse indexStats = client.indices().stats(g -> g.index(index)); + + assertTrue(indexStats.indices().get(index).primaries().docs().count()==100000); + } + @Test public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport();