-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor TransportSearchAction to allow run can_match exclusively #95064
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,8 +24,6 @@ | |
import org.elasticsearch.search.SearchShardTarget; | ||
import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
import org.elasticsearch.search.internal.AliasFilter; | ||
import org.elasticsearch.search.internal.InternalSearchResponse; | ||
import org.elasticsearch.search.internal.SearchContext; | ||
import org.elasticsearch.search.internal.ShardSearchRequest; | ||
import org.elasticsearch.search.sort.FieldSortBuilder; | ||
import org.elasticsearch.search.sort.MinAndMax; | ||
|
@@ -44,7 +42,6 @@ | |
import java.util.concurrent.Executor; | ||
import java.util.concurrent.atomic.AtomicReferenceArray; | ||
import java.util.function.BiFunction; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
import java.util.stream.Stream; | ||
|
@@ -68,16 +65,14 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase { | |
private final Logger logger; | ||
private final SearchRequest request; | ||
private final GroupShardsIterator<SearchShardIterator> shardsIts; | ||
private final ActionListener<SearchResponse> listener; | ||
private final SearchResponse.Clusters clusters; | ||
private final ActionListener<GroupShardsIterator<SearchShardIterator>> listener; | ||
private final TransportSearchAction.SearchTimeProvider timeProvider; | ||
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection; | ||
private final SearchTransportService searchTransportService; | ||
private final Map<SearchShardIterator, Integer> shardItIndexMap; | ||
private final Map<String, Float> concreteIndexBoosts; | ||
private final Map<String, AliasFilter> aliasFilter; | ||
private final SearchTask task; | ||
private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory; | ||
private final Executor executor; | ||
|
||
private final CanMatchSearchPhaseResults results; | ||
|
@@ -91,13 +86,11 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase { | |
Map<String, Float> concreteIndexBoosts, | ||
Executor executor, | ||
SearchRequest request, | ||
ActionListener<SearchResponse> listener, | ||
GroupShardsIterator<SearchShardIterator> shardsIts, | ||
TransportSearchAction.SearchTimeProvider timeProvider, | ||
SearchTask task, | ||
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory, | ||
SearchResponse.Clusters clusters, | ||
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider | ||
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider, | ||
ActionListener<GroupShardsIterator<SearchShardIterator>> listener | ||
) { | ||
super("can_match"); | ||
this.logger = logger; | ||
|
@@ -106,12 +99,10 @@ final class CanMatchPreFilterSearchPhase extends SearchPhase { | |
this.request = request; | ||
this.listener = listener; | ||
this.shardsIts = shardsIts; | ||
this.clusters = clusters; | ||
this.timeProvider = timeProvider; | ||
this.concreteIndexBoosts = concreteIndexBoosts; | ||
this.aliasFilter = aliasFilter; | ||
this.task = task; | ||
this.phaseFactory = phaseFactory; | ||
this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider; | ||
this.executor = executor; | ||
this.shardItIndexMap = new HashMap<>(); | ||
|
@@ -145,8 +136,11 @@ public void run() throws IOException { | |
); | ||
} | ||
} | ||
|
||
runCoordinatorRewritePhase(); | ||
if (getNumShards() == 0) { | ||
finishPhase(); | ||
} else { | ||
runCoordinatorRewritePhase(); | ||
} | ||
} | ||
|
||
// tries to pre-filter shards based on information that's available to the coordinator | ||
|
@@ -372,14 +366,7 @@ private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List< | |
} | ||
|
||
private void finishPhase() { | ||
try { | ||
phaseFactory.apply(getIterator(results, shardsIts)).start(); | ||
} catch (Exception e) { | ||
if (logger.isDebugEnabled()) { | ||
logger.debug(() -> format("Failed to execute [%s] while running [%s] phase", request, getName()), e); | ||
} | ||
onPhaseFailure("finish", e); | ||
} | ||
listener.onResponse(getIterator(results, shardsIts)); | ||
} | ||
|
||
private static final float DEFAULT_INDEX_BOOST = 1.0f; | ||
|
@@ -418,30 +405,6 @@ private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> sha | |
|
||
@Override | ||
public void start() { | ||
if (getNumShards() == 0) { | ||
// no search shards to search on, bail with empty response | ||
// (it happens with search across _all with no indices around and consistent with broadcast operations) | ||
int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO | ||
: request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO | ||
: request.source().trackTotalHitsUpTo(); | ||
// total hits is null in the response if the tracking of total hits is disabled | ||
boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED; | ||
listener.onResponse( | ||
new SearchResponse( | ||
withTotalHits ? InternalSearchResponse.EMPTY_WITH_TOTAL_HITS : InternalSearchResponse.EMPTY_WITHOUT_TOTAL_HITS, | ||
null, | ||
0, | ||
0, | ||
0, | ||
timeProvider.buildTookInMillis(), | ||
ShardSearchFailure.EMPTY_ARRAY, | ||
clusters, | ||
null | ||
) | ||
); | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 on removing this if not needed. The only reason I can think of that this was here is to return as early as possible and avoid calling If I had to pick between this check and the one you added above (provided we need one of them, which we maybe don't?), I would keep this one as it happens earlier. Maybe we could call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good suggestion. I added the early finish back in 18b7182. |
||
|
||
// Note that the search is failed when this task is rejected by the executor | ||
executor.execute(new AbstractRunnable() { | ||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you added this
if
because you removed the same check that happens before at the beginning of thestart
method. As far as I can see,runCoordinatorRewritePhase
loops over the iterators which does nothing if they are empty, and then callsfinishPhase
. That makes me wonder if we need this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I removed it in e412458.