Skip to content

Commit

Permalink
rollup
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Jan 2, 2024
1 parent 34d7b08 commit fa1210e
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,36 +178,41 @@ private void testSourceHasChanged(
TimeValue delay,
Tuple<Long, Long> expectedRangeQueryBounds
) throws InterruptedException {
doAnswer(withResponse(newSearchResponse(totalHits))).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
String transformId = getTestName();
TransformConfig transformConfig = newTransformConfigWithDateHistogram(
transformId,
transformVersion,
dateHistogramField,
dateHistogramInterval,
delay
);
TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
var searchResponse = newSearchResponse(totalHits);
try {
doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
String transformId = getTestName();
TransformConfig transformConfig = newTransformConfigWithDateHistogram(
transformId,
transformVersion,
dateHistogramField,
dateHistogramInterval,
delay
);
TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);

SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();
CountDownLatch latch = new CountDownLatch(1);
provider.sourceHasChanged(
lastCheckpoint,
new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();
CountDownLatch latch = new CountDownLatch(1);
provider.sourceHasChanged(
lastCheckpoint,
new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));

ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));
ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));

assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
assertThat(exceptionHolder.get(), is(nullValue()));
assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
assertThat(exceptionHolder.get(), is(nullValue()));
} finally {
searchResponse.decRef();
}
}

public void testCreateNextCheckpoint_NoDelay() throws InterruptedException {
Expand Down Expand Up @@ -338,7 +343,7 @@ public SingleGroupSource get() {

private static SearchResponse newSearchResponse(long totalHits) {
return new SearchResponse(
new SearchHits(SearchHits.EMPTY, new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 0),
SearchHits.unpooled(SearchHits.EMPTY, new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 0),
null,
null,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void testEquals() {
}

public void testFromSearchHit() {
SearchHit searchHit = new SearchHit(1);
SearchHit searchHit = SearchHit.unpooled(1);
long seqNo = randomLongBetween(-2, 10_000);
long primaryTerm = randomLongBetween(-2, 10_000);
String index = randomAlphaOfLength(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,26 +541,32 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
&& "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) {
listener.onFailure(new SearchContextMissingException(new ShardSearchContextId("sc_missing", 42)));
} else {
SearchResponse response = new SearchResponse(
new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
// Simulate completely null aggs
null,
new Suggest(Collections.emptyList()),
false,
false,
new SearchProfileResults(Collections.emptyMap()),
1,
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
// copy the pit from the request
searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
ActionListener.respondAndRelease(
listener,
(Response) new SearchResponse(
SearchHits.unpooled(
new SearchHit[] { SearchHit.unpooled(1) },
new TotalHits(1L, TotalHits.Relation.EQUAL_TO),
1.0f
),
// Simulate completely null aggs
null,
new Suggest(Collections.emptyList()),
false,
false,
new SearchProfileResults(Collections.emptyMap()),
1,
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
// copy the pit from the request
searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
)
);
listener.onResponse((Response) response);

}
return;
Expand Down
Loading

0 comments on commit fa1210e

Please sign in to comment.