diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java
index 0a1179e4224aa..c6cc0b2e83bd8 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java
@@ -178,36 +178,41 @@ private void testSourceHasChanged(
         TimeValue delay,
         Tuple expectedRangeQueryBounds
     ) throws InterruptedException {
-        doAnswer(withResponse(newSearchResponse(totalHits))).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
-        String transformId = getTestName();
-        TransformConfig transformConfig = newTransformConfigWithDateHistogram(
-            transformId,
-            transformVersion,
-            dateHistogramField,
-            dateHistogramInterval,
-            delay
-        );
-        TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
+        var searchResponse = newSearchResponse(totalHits);
+        try {
+            doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
+            String transformId = getTestName();
+            TransformConfig transformConfig = newTransformConfigWithDateHistogram(
+                transformId,
+                transformVersion,
+                dateHistogramField,
+                dateHistogramInterval,
+                delay
+            );
+            TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);

-        SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
-        SetOnce<Exception> exceptionHolder = new SetOnce<>();
-        CountDownLatch latch = new CountDownLatch(1);
-        provider.sourceHasChanged(
-            lastCheckpoint,
-            new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
-        );
-        assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
+            SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
+            SetOnce<Exception> exceptionHolder = new SetOnce<>();
+            CountDownLatch latch = new CountDownLatch(1);
+            provider.sourceHasChanged(
+                lastCheckpoint,
+                new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
+            );
+            assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));

-        ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
-        verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
-        SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
-        BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
-        RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
-        assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
-        assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));
+            ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
+            verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
+            SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
+            BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
+            RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
+            assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
+            assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));

-        assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
-        assertThat(exceptionHolder.get(), is(nullValue()));
+            assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
+            assertThat(exceptionHolder.get(), is(nullValue()));
+        } finally {
+            searchResponse.decRef();
+        }
     }

     public void testCreateNextCheckpoint_NoDelay() throws InterruptedException {
@@ -338,7 +343,7 @@ public SingleGroupSource get() {

     private static SearchResponse newSearchResponse(long totalHits) {
         return new SearchResponse(
-            new SearchHits(SearchHits.EMPTY, new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 0),
+            SearchHits.unpooled(SearchHits.EMPTY, new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 0),
             null,
             null,
             false,
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/SeqNoPrimaryTermAndIndexTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/SeqNoPrimaryTermAndIndexTests.java
index d76b6b67368f9..69139bc3f7561 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/SeqNoPrimaryTermAndIndexTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/SeqNoPrimaryTermAndIndexTests.java
@@ -29,7 +29,7 @@ public void testEquals() {
     }

     public void testFromSearchHit() {
-        SearchHit searchHit = new SearchHit(1);
+        SearchHit searchHit = SearchHit.unpooled(1);
         long seqNo = randomLongBetween(-2, 10_000);
         long primaryTerm = randomLongBetween(-2, 10_000);
         String index = randomAlphaOfLength(10);
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
index b1c9edc0fab0a..fa8e867d77a49 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
@@ -541,26 +541,32 @@ protected void
                 && "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) {
                 listener.onFailure(new SearchContextMissingException(new ShardSearchContextId("sc_missing", 42)));
             } else {
-                SearchResponse response = new SearchResponse(
-                    new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
-                    // Simulate completely null aggs
-                    null,
-                    new Suggest(Collections.emptyList()),
-                    false,
-                    false,
-                    new SearchProfileResults(Collections.emptyMap()),
-                    1,
-                    null,
-                    1,
-                    1,
-                    0,
-                    0,
-                    ShardSearchFailure.EMPTY_ARRAY,
-                    SearchResponse.Clusters.EMPTY,
-                    // copy the pit from the request
-                    searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
+                ActionListener.respondAndRelease(
+                    listener,
+                    (Response) new SearchResponse(
+                        SearchHits.unpooled(
+                            new SearchHit[] { SearchHit.unpooled(1) },
+                            new TotalHits(1L, TotalHits.Relation.EQUAL_TO),
+                            1.0f
+                        ),
+                        // Simulate completely null aggs
+                        null,
+                        new Suggest(Collections.emptyList()),
+                        false,
+                        false,
+                        new SearchProfileResults(Collections.emptyMap()),
+                        1,
+                        null,
+                        1,
+                        1,
+                        0,
+                        0,
+                        ShardSearchFailure.EMPTY_ARRAY,
+                        SearchResponse.Clusters.EMPTY,
+                        // copy the pit from the request
+                        searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
+                    )
                 );
-                listener.onResponse((Response) response);
             }
             return;
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java
index 5c6539d0a5045..6d2ca3c03c6c6 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java
@@ -221,9 +221,10 @@ protected void onAbort() {

         @Override
         void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
-            responseListener.onResponse(
+            ActionListener.respondAndRelease(
+                responseListener,
                 new SearchResponse(
-                    new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
+                    SearchHits.EMPTY_WITH_TOTAL_HITS,
                     // Simulate completely null aggs
                     null,
                     new Suggest(Collections.emptyList()),
@@ -372,7 +373,7 @@ public void testDoProcessAggNullCheck() {
             null
         );
         SearchResponse searchResponse = new SearchResponse(
-            new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
+            SearchHits.EMPTY_WITH_TOTAL_HITS,
             // Simulate completely null aggs
             null,
             new Suggest(Collections.emptyList()),
@@ -388,29 +389,33 @@ public void testDoProcessAggNullCheck() {
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
-        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
-
-        TransformAuditor auditor = mock(TransformAuditor.class);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
-
-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            null,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
+        try {
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
+            Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
+
+            TransformAuditor auditor = mock(TransformAuditor.class);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
+            );

-        IterationResult<TransformIndexerPosition> newPosition = indexer.doProcess(searchResponse);
-        assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty()));
-        assertThat(newPosition.getPosition(), is(nullValue()));
-        assertThat(newPosition.isDone(), is(true));
+            IterationResult<TransformIndexerPosition> newPosition = indexer.doProcess(searchResponse);
+            assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty()));
+            assertThat(newPosition.getPosition(), is(nullValue()));
+            assertThat(newPosition.isDone(), is(true));
+        } finally {
+            searchResponse.decRef();
+        }
     }

     public void testScriptError() throws Exception {
@@ -508,7 +513,7 @@ public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Excepti
         );

         final SearchResponse searchResponse = new SearchResponse(
-            new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
+            SearchHits.unpooled(new SearchHit[] { SearchHit.unpooled(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
             // Simulate completely null aggs
             null,
             new Suggest(Collections.emptyList()),
@@ -524,58 +529,62 @@ public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Excepti
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
-
-        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
-
-        Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
-            throw new SearchPhaseExecutionException(
-                "query",
-                "Partial shards failure",
-                new ShardSearchFailure[] {
-                    new ShardSearchFailure(
-                        new ElasticsearchParseException("failed to parse date field", new IllegalArgumentException("illegal format"))
-                    ) }
+        try {
+
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
+
+            Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
+
+            Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
+                throw new SearchPhaseExecutionException(
+                    "query",
+                    "Partial shards failure",
+                    new ShardSearchFailure[] {
+                        new ShardSearchFailure(
+                            new ElasticsearchParseException("failed to parse date field", new IllegalArgumentException("illegal format"))
+                        ) }
+                );
+            };
+
+            final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
+            final AtomicReference<String> failureMessage = new AtomicReference<>();
+
+            MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+            TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                deleteByQueryFunction,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
             );
-        };

-        final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
-        final AtomicReference<String> failureMessage = new AtomicReference<>();
+            final CountDownLatch latch = indexer.newLatch(1);

-        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
-        TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));

-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            deleteByQueryFunction,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
-
-        final CountDownLatch latch = indexer.newLatch(1);
-
-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
-
-        latch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertTrue(failIndexerCalled.get());
-        assertThat(
-            failureMessage.get(),
-            matchesRegex(
-                "task encountered irrecoverable failure: org.elasticsearch.ElasticsearchParseException: failed to parse date field;.*"
-            )
-        );
+            latch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertTrue(failIndexerCalled.get());
+            assertThat(
+                failureMessage.get(),
+                matchesRegex(
+                    "task encountered irrecoverable failure: org.elasticsearch.ElasticsearchParseException: failed to parse date field;.*"
+                )
+            );
+        } finally {
+            searchResponse.decRef();
+        }
     }

     public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exception {
@@ -598,7 +607,7 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
         );

         final SearchResponse searchResponse = new SearchResponse(
-            new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
+            SearchHits.unpooled(new SearchHit[] { SearchHit.unpooled(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
             // Simulate completely null aggs
             null,
             new Suggest(Collections.emptyList()),
@@ -614,61 +623,65 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
-
-        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
-
-        Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
-            throw new SearchPhaseExecutionException(
-                "query",
-                "Partial shards failure",
-                new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) }
+        try {
+
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
+
+            Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
+
+            Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
+                throw new SearchPhaseExecutionException(
+                    "query",
+                    "Partial shards failure",
+                    new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) }
+                );
+            };
+
+            final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
+            final AtomicReference<String> failureMessage = new AtomicReference<>();
+
+            MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+            auditor.addExpectation(
+                new MockTransformAuditor.SeenAuditExpectation(
+                    "timed out during dbq",
+                    Level.WARNING,
+                    transformId,
+                    "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];"
+                        + " Will automatically retry [1/10]"
+                )
+            );
+            TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                deleteByQueryFunction,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
             );
-        };
-
-        final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
-        final AtomicReference<String> failureMessage = new AtomicReference<>();
-
-        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
-        auditor.addExpectation(
-            new MockTransformAuditor.SeenAuditExpectation(
-                "timed out during dbq",
-                Level.WARNING,
-                transformId,
-                "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];"
-                    + " Will automatically retry [1/10]"
-            )
-        );
-        TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
-
-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            deleteByQueryFunction,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
-        final CountDownLatch latch = indexer.newLatch(1);
+            final CountDownLatch latch = indexer.newLatch(1);

-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));

-        latch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertFalse(failIndexerCalled.get());
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        auditor.assertAllExpectationsMatched();
-        assertEquals(1, context.getFailureCount());
+            latch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertFalse(failIndexerCalled.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            auditor.assertAllExpectationsMatched();
+            assertEquals(1, context.getFailureCount());
+        } finally {
+            searchResponse.decRef();
+        }
     }

     public void testFailureCounterIsResetOnSuccess() throws Exception {
@@ -691,7 +704,7 @@ public void testFailureCounterIsResetOnSuccess() throws Exception {
         );

         final SearchResponse searchResponse = new SearchResponse(
-            new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
+            SearchHits.unpooled(new SearchHit[] { SearchHit.unpooled(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
             // Simulate completely null aggs
             null,
             new Suggest(Collections.emptyList()),
@@ -707,72 +720,76 @@ public void testFailureCounterIsResetOnSuccess() throws Exception {
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = new Function<>() {
-            final AtomicInteger calls = new AtomicInteger(0);
-
-            @Override
-            public SearchResponse apply(SearchRequest searchRequest) {
-                int call = calls.getAndIncrement();
-                if (call == 0) {
-                    throw new SearchPhaseExecutionException(
-                        "query",
-                        "Partial shards failure",
-                        new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
-                    );
+        try {
+
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = new Function<>() {
+                final AtomicInteger calls = new AtomicInteger(0);
+
+                @Override
+                public SearchResponse apply(SearchRequest searchRequest) {
+                    int call = calls.getAndIncrement();
+                    if (call == 0) {
+                        throw new SearchPhaseExecutionException(
+                            "query",
+                            "Partial shards failure",
+                            new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
+                        );
+                    }
+                    return searchResponse;
                 }
-                return searchResponse;
-            }
-        };
+            };

-        Function<BulkRequest, BulkResponse> bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1);
+            Function<BulkRequest, BulkResponse> bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1);

-        final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
-        final AtomicReference<String> failureMessage = new AtomicReference<>();
+            final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
+            final AtomicReference<String> failureMessage = new AtomicReference<>();

-        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
-        TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+            MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+            TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);

-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            null,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
+            );

-        final CountDownLatch latch = indexer.newLatch(1);
+            final CountDownLatch latch = indexer.newLatch(1);

-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));

-        latch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertFalse(failIndexerCalled.get());
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertEquals(1, context.getFailureCount());
+            latch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertFalse(failIndexerCalled.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertEquals(1, context.getFailureCount());

-        final CountDownLatch secondLatch = indexer.newLatch(1);
+            final CountDownLatch secondLatch = indexer.newLatch(1);

-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));

-        secondLatch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertFalse(failIndexerCalled.get());
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        auditor.assertAllExpectationsMatched();
-        assertEquals(0, context.getFailureCount());
+            secondLatch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertFalse(failIndexerCalled.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            auditor.assertAllExpectationsMatched();
+            assertEquals(0, context.getFailureCount());
+        } finally {
+            searchResponse.decRef();
+        }
     }

     // tests throttling of audits on logs based on repeated exception types
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java
index 708cb3d93cbed..512fd7a2383a1 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermsQueryBuilder;
+import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
@@ -112,7 +113,7 @@ public void testTermsFieldCollector() throws IOException {

         Aggregations aggs = new Aggregations(Collections.singletonList(composite));
         SearchResponse response = new SearchResponse(
-            null,
+            SearchHits.EMPTY_WITH_TOTAL_HITS,
             aggs,
             null,
             false,
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/DateHistogramFieldCollectorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/DateHistogramFieldCollectorTests.java
index dab6d8518d28f..fd4e60e485200 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/DateHistogramFieldCollectorTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/DateHistogramFieldCollectorTests.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
@@ -171,7 +172,7 @@ private static QueryBuilder buildFilterQuery(ChangeCollector collector) {

     private static SearchResponse buildSearchResponse(SingleValue minTimestamp, SingleValue maxTimestamp) {
         return new SearchResponse(
-            null,
+            SearchHits.EMPTY_WITH_TOTAL_HITS,
             new Aggregations(Arrays.asList(minTimestamp, maxTimestamp)),
             null,
             false,
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
index 66e7efe764732..be0bb177267bc 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
@@ -7,11 +7,11 @@

 package org.elasticsearch.xpack.transform.transforms.pivot;

-import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
@@ -20,9 +20,9 @@
 import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Strings;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.Aggregations;
@@ -41,6 +41,8 @@
 import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
 import org.elasticsearch.xpack.core.transform.transforms.SettingsConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
+import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
@@ -241,7 +243,30 @@ public void testProcessSearchResponse() {
             SettingsConfigTests.randomSettingsConfig(),
             TransformConfigVersion.CURRENT,
             Collections.emptySet()
-        );
+        ) {
+            @Override
+            public Tuple<Stream<IndexRequest>, Map<String, String>> processSearchResponse(
+                SearchResponse searchResponse,
+                String destinationIndex,
+                String destinationPipeline,
+                Map<String, String> fieldTypeMap,
+                TransformIndexerStats stats,
+                TransformProgress progress
+            ) {
+                try {
+                    return super.processSearchResponse(
+                        searchResponse,
+                        destinationIndex,
+                        destinationPipeline,
+                        fieldTypeMap,
+                        stats,
+                        progress
+                    );
+                } finally {
+                    searchResponse.decRef();
+                }
+            }
+        };

         Aggregations aggs = null;
         assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));
@@ -326,7 +351,22 @@ public void testPreviewForCompositeAggregation() throws Exception {
     }

     private static SearchResponse searchResponseFromAggs(Aggregations aggs) {
-        return new SearchResponse(null, aggs, null, false, null, null, 1, null, 10, 5, 0, 0, new ShardSearchFailure[0], null);
+        return new SearchResponse(
+            SearchHits.EMPTY_WITH_TOTAL_HITS,
+            aggs,
+            null,
+            false,
+            null,
+            null,
+            1,
+            null,
+            10,
+            5,
+            0,
+            0,
+            ShardSearchFailure.EMPTY_ARRAY,
+            null
+        );
     }

     private class MyMockClient extends NoOpClient {
@@ -355,25 +395,25 @@ protected void
                     searchFailures.add(new ShardSearchFailure(new RuntimeException("shard failed")));
                 }
             }
-
-            final SearchResponse response = new SearchResponse(
-                new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0),
-                null,
-                null,
-                false,
-                null,
-                null,
-                1,
-                null,
-                10,
-                searchFailures.size() > 0 ? 0 : 5,
-                0,
-                0,
-                searchFailures.toArray(new ShardSearchFailure[searchFailures.size()]),
-                null
+            ActionListener.respondAndRelease(
+                listener,
+                (Response) new SearchResponse(
+                    SearchHits.EMPTY_WITH_TOTAL_HITS,
+                    null,
+                    null,
+                    false,
+                    null,
+                    null,
+                    1,
+                    null,
+                    10,
+                    searchFailures.size() > 0 ? 0 : 5,
+                    0,
+                    0,
+                    searchFailures.toArray(new ShardSearchFailure[searchFailures.size()]),
+                    null
+                )
             );
-
-            listener.onResponse((Response) response);
             return;
         }