Skip to content

Commit

Permalink
Fix unmuted tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cbuescher committed Oct 4, 2024
1 parent 1bf60bd commit a64f0cc
Showing 1 changed file with 126 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,54 +339,59 @@ private void testMixedVersionsShardsSearch(VersionInformation oldVersion, Versio
SearchTransportService searchTransportService = new SearchTransportService(null, null, null);
SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
searchRequest,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
controller,
task::isCancelled,
task.getProgressListener(),
shardsIter.size(),
exc -> {}
);
final List<Object> responses = new ArrayList<>();
SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(
logger,
null,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", AliasFilter.EMPTY),
Collections.emptyMap(),
EsExecutors.DIRECT_EXECUTOR_SERVICE,
resultConsumer,
searchRequest,
new ActionListener<>() {
@Override
public void onFailure(Exception e) {
responses.add(e);
}
try (
QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
searchRequest,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
controller,
task::isCancelled,
task.getProgressListener(),
shardsIter.size(),
exc -> {}
)
) {
final List<Object> responses = new ArrayList<>();
SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(
logger,
null,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", AliasFilter.EMPTY),
Collections.emptyMap(),
EsExecutors.DIRECT_EXECUTOR_SERVICE,
resultConsumer,
searchRequest,
new ActionListener<>() {
@Override
public void onFailure(Exception e) {
responses.add(e);
}

public void onResponse(SearchResponse response) {
responses.add(response);
};
},
shardsIter,
timeProvider,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY,
null
);
public void onResponse(SearchResponse response) {
responses.add(response);
}

newSearchAsyncAction.start();
assertThat(responses, hasSize(1));
assertThat(responses.get(0), instanceOf(SearchPhaseExecutionException.class));
SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0);
assertThat(e.getCause(), instanceOf(VersionMismatchException.class));
assertThat(
e.getCause().getMessage(),
equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")
);
;
},
shardsIter,
timeProvider,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY,
null
);

newSearchAsyncAction.start();
assertThat(responses, hasSize(1));
assertThat(responses.get(0), instanceOf(SearchPhaseExecutionException.class));
SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0);
assertThat(e.getCause(), instanceOf(VersionMismatchException.class));
assertThat(
e.getCause().getMessage(),
equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")
);
}
}

public void testMinimumVersionSameAsOldVersion() throws Exception {
Expand Down Expand Up @@ -639,77 +644,84 @@ public void sendExecuteQuery(
};
SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
searchRequest,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
controller,
task::isCancelled,
task.getProgressListener(),
shardsIter.size(),
exc -> {}
);

CountDownLatch latch = new CountDownLatch(1);
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
logger,
null,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", AliasFilter.EMPTY),
Collections.emptyMap(),
EsExecutors.DIRECT_EXECUTOR_SERVICE,
resultConsumer,
searchRequest,
null,
shardsIter,
timeProvider,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY,
null
try (
QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
searchRequest,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
controller,
task::isCancelled,
task.getProgressListener(),
shardsIter.size(),
exc -> {}
)
) {
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase("test") {
@Override
public void run() {
latch.countDown();
}
};
}
};
ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(
new ShardId(new Index("idx", "_na_"), 2),
true,
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"),
ShardRouting.Role.DEFAULT
);
SearchShardIterator shardIt = new SearchShardIterator(
null,
new ShardId(new Index("idx", "_na_"), 2),
singletonList(routingOldVersionShard),
idx
);
routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0);
routingOldVersionShard.started();
action.start();
latch.await();
assertThat(successfulOps.get(), equalTo(2));
SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
assertThat(phase.totalHits().value, equalTo(2L));
assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO));
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
logger,
null,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", AliasFilter.EMPTY),
Collections.emptyMap(),
EsExecutors.DIRECT_EXECUTOR_SERVICE,
resultConsumer,
searchRequest,
null,
shardsIter,
timeProvider,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY,
null
) {
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase("test") {
@Override
public void run() {
latch.countDown();
}
};
}
};
ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(
new ShardId(new Index("idx", "_na_"), 2),
true,
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"),
ShardRouting.Role.DEFAULT
);
SearchShardIterator shardIt = new SearchShardIterator(
null,
new ShardId(new Index("idx", "_na_"), 2),
singletonList(routingOldVersionShard),
idx
);
routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0);
routingOldVersionShard.started();
action.start();
latch.await();
assertThat(successfulOps.get(), equalTo(2));
SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
assertThat(phase.totalHits().value, equalTo(2L));
assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO));

SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null);
SearchActionListener<SearchPhaseResult> listener = new SearchActionListener<SearchPhaseResult>(searchShardTarget, 0) {
@Override
public void onFailure(Exception e) {}
SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null);
SearchActionListener<SearchPhaseResult> listener = new SearchActionListener<SearchPhaseResult>(searchShardTarget, 0) {
@Override
public void onFailure(Exception e) {}

@Override
protected void innerOnResponse(SearchPhaseResult response) {}
};
Exception e = expectThrows(VersionMismatchException.class, () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener));
assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]"));
@Override
protected void innerOnResponse(SearchPhaseResult response) {}
};
Exception e = expectThrows(
VersionMismatchException.class,
() -> action.executePhaseOnShard(shardIt, searchShardTarget, listener)
);
assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]"));
}
}
}

0 comments on commit a64f0cc

Please sign in to comment.