Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TransportSearchAction to allow run can_match exclusively #95064

Merged
merged 4 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.MinAndMax;
Expand All @@ -44,7 +42,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -68,16 +65,14 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
private final Logger logger;
private final SearchRequest request;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
private final ActionListener<SearchResponse> listener;
private final SearchResponse.Clusters clusters;
private final ActionListener<GroupShardsIterator<SearchShardIterator>> listener;
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTransportService searchTransportService;
private final Map<SearchShardIterator, Integer> shardItIndexMap;
private final Map<String, Float> concreteIndexBoosts;
private final Map<String, AliasFilter> aliasFilter;
private final SearchTask task;
private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final Executor executor;

private final CanMatchSearchPhaseResults results;
Expand All @@ -91,13 +86,11 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
Map<String, Float> concreteIndexBoosts,
Executor executor,
SearchRequest request,
ActionListener<SearchResponse> listener,
GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
ActionListener<GroupShardsIterator<SearchShardIterator>> listener
) {
super("can_match");
this.logger = logger;
Expand All @@ -106,12 +99,10 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase {
this.request = request;
this.listener = listener;
this.shardsIts = shardsIts;
this.clusters = clusters;
this.timeProvider = timeProvider;
this.concreteIndexBoosts = concreteIndexBoosts;
this.aliasFilter = aliasFilter;
this.task = task;
this.phaseFactory = phaseFactory;
this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
this.executor = executor;
this.shardItIndexMap = new HashMap<>();
Expand Down Expand Up @@ -145,8 +136,11 @@ public void run() throws IOException {
);
}
}

runCoordinatorRewritePhase();
if (getNumShards() == 0) {
finishPhase();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you added this if because you removed the same check that happens before at the beginning of the start method. As far as I can see, runCoordinatorRewritePhase loops over the iterators which does nothing if they are empty, and then calls finishPhase. That makes me wonder if we need this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I removed it in e412458.

} else {
runCoordinatorRewritePhase();
}
}

// tries to pre-filter shards based on information that's available to the coordinator
Expand Down Expand Up @@ -372,14 +366,7 @@ private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<
}

private void finishPhase() {
try {
phaseFactory.apply(getIterator(results, shardsIts)).start();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [%s] phase", request, getName()), e);
}
onPhaseFailure("finish", e);
}
listener.onResponse(getIterator(results, shardsIts));
}

private static final float DEFAULT_INDEX_BOOST = 1.0f;
Expand Down Expand Up @@ -418,30 +405,6 @@ private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> sha

@Override
public void start() {
if (getNumShards() == 0) {
// no search shards to search on, bail with empty response
// (it happens with search across _all with no indices around and consistent with broadcast operations)
int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO
: request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO
: request.source().trackTotalHitsUpTo();
// total hits is null in the response if the tracking of total hits is disabled
boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;
listener.onResponse(
new SearchResponse(
withTotalHits ? InternalSearchResponse.EMPTY_WITH_TOTAL_HITS : InternalSearchResponse.EMPTY_WITHOUT_TOTAL_HITS,
null,
0,
0,
0,
timeProvider.buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
)
);
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on removing this if not needed. The only reason I can think of that this was here is to return as early as possible and avoid calling executor.execute rather than returning a little later.

If I had to pick between this check and the one you added above (provided we need one of them, which we maybe don't?), I would keep this one as it happens earlier. Maybe we could call finishPhase directly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. I added the early finish back in 18b7182.


// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,39 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenPointInTimeRequest, OpenPointInTimeResponse> {
public static final String OPEN_SHARD_READER_CONTEXT_NAME = "indices:data/read/open_reader_context";

private final TransportSearchAction transportSearchAction;
private final SearchTransportService searchTransportService;
private final TransportService transportService;
private final SearchService searchService;

Expand All @@ -46,12 +57,14 @@ public TransportOpenPointInTimeAction(
TransportService transportService,
SearchService searchService,
ActionFilters actionFilters,
TransportSearchAction transportSearchAction
TransportSearchAction transportSearchAction,
SearchTransportService searchTransportService
) {
super(OpenPointInTimeAction.NAME, transportService, actionFilters, OpenPointInTimeRequest::new);
this.transportService = transportService;
this.transportSearchAction = transportSearchAction;
this.searchService = searchService;
this.searchTransportService = searchTransportService;
transportService.registerRequestHandler(
OPEN_SHARD_READER_CONTEXT_NAME,
ThreadPool.Names.SAME,
Expand All @@ -74,30 +87,92 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
.routing(request.routing())
.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(false);
transportSearchAction.executeRequest(
(SearchTask) task,
searchRequest,
"open_search_context",
true,
(searchTask, shardIt, connection, phaseListener) -> {
final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(
shardIt.shardId(),
shardIt.getOriginalIndices(),
request.keepAlive()
);
transportService.sendChildRequest(
connection,
OPEN_SHARD_READER_CONTEXT_NAME,
shardRequest,
searchTask,
new ActionListenerResponseHandler<SearchPhaseResult>(phaseListener, ShardOpenReaderResponse::new)
);
},
listener.map(r -> {
assert r.pointInTimeId() != null : r;
return new OpenPointInTimeResponse(r.pointInTimeId());
})
);
transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> {
assert r.pointInTimeId() != null : r;
return new OpenPointInTimeResponse(r.pointInTimeId());
}), searchListener -> new OpenPointInTimePhase(request, searchListener));
}

private final class OpenPointInTimePhase implements TransportSearchAction.SearchPhaseProvider {
private final OpenPointInTimeRequest pitRequest;
private final ActionListener<SearchResponse> listener;

OpenPointInTimePhase(OpenPointInTimeRequest pitRequest, ActionListener<SearchResponse> listener) {
this.pitRequest = pitRequest;
this.listener = listener;
}

@Override
public SearchPhase newSearchPhase(
SearchTask task,
SearchRequest searchRequest,
Executor executor,
GroupShardsIterator<SearchShardIterator> shardIterators,
TransportSearchAction.SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
ClusterState clusterState,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters
) {
return new AbstractSearchAsyncAction<>(
actionName,
logger,
searchTransportService,
connectionLookup,
aliasFilter,
concreteIndexBoosts,
executor,
searchRequest,
listener,
shardIterators,
timeProvider,
clusterState,
task,
new ArraySearchPhaseResults<>(shardIterators.size()),
1,
clusters
) {
@Override
protected void executePhaseOnShard(
SearchShardIterator shardIt,
SearchShardTarget shard,
SearchActionListener<SearchPhaseResult> phaseListener
) {
final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(
shardIt.shardId(),
shardIt.getOriginalIndices(),
pitRequest.keepAlive()
);
Transport.Connection connection = connectionLookup.apply(shardIt.getClusterAlias(), shard.getNodeId());
transportService.sendChildRequest(
connection,
OPEN_SHARD_READER_CONTEXT_NAME,
shardRequest,
task,
new ActionListenerResponseHandler<>(phaseListener, ShardOpenReaderResponse::new)
);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase(getName()) {
@Override
public void run() {
final AtomicArray<SearchPhaseResult> atomicArray = results.getAtomicArray();
sendSearchResponse(InternalSearchResponse.EMPTY_WITH_TOTAL_HITS, atomicArray);
}
};
}

@Override
boolean buildPointInTimeFromSearchResults() {
return true;
}
};
}
}

private static final class ShardOpenReaderRequest extends TransportRequest implements IndicesRequest {
Expand Down
Loading