Skip to content

Commit

Permalink
Parallelize knn query rewrite across slices rather than segments (apa…
Browse files Browse the repository at this point in the history
…che#12325)

The concurrent query rewrite for knn vectory query introduced with apache#12160
requests one thread per segment to the executor. To align this with the
IndexSearcher parallel behaviour, we should rather parallelize across
slices. Also, we can reuse the same slice executor instance that the
index searcher already holds, in that way we are using a
QueueSizeBasedExecutor when a thread pool executor is provided.
  • Loading branch information
javanna authored May 26, 2023
1 parent c188d47 commit 10bebde
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.index.FieldInfo;
Expand Down Expand Up @@ -81,11 +81,12 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
filterWeight = null;
}

Executor executor = indexSearcher.getExecutor();
SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
// in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
TopDocs[] perLeafResults =
(executor == null)
(sliceExecutor == null)
? sequentialSearch(reader.leaves(), filterWeight)
: parallelSearch(reader.leaves(), filterWeight, executor);
: parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor);

// Merge sort the results
TopDocs topK = TopDocs.merge(k, perLeafResults);
Expand All @@ -109,27 +110,40 @@ private TopDocs[] sequentialSearch(
}

private TopDocs[] parallelSearch(
List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
List<FutureTask<TopDocs>> tasks =
leafReaderContexts.stream()
.map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
.toList();
IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {

List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
int segmentsCount = 0;
for (IndexSearcher.LeafSlice slice : slices) {
segmentsCount += slice.leaves.length;
tasks.add(
new FutureTask<>(
() -> {
TopDocs[] results = new TopDocs[slice.leaves.length];
int i = 0;
for (LeafReaderContext context : slice.leaves) {
results[i++] = searchLeaf(context, filterWeight);
}
return results;
}));
}

SliceExecutor sliceExecutor = new SliceExecutor(executor);
sliceExecutor.invokeAll(tasks);

return tasks.stream()
.map(
task -> {
try {
return task.get();
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
})
.toArray(TopDocs[]::new);
TopDocs[] topDocs = new TopDocs[segmentsCount];
int i = 0;
for (FutureTask<TopDocs[]> task : tasks) {
try {
for (TopDocs docs : task.get()) {
topDocs[i++] = docs;
}
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
return topDocs;
}

private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,10 @@ public Executor getExecutor() {
return executor;
}

SliceExecutor getSliceExecutor() {
return sliceExecutor;
}

/**
* Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
* typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to
Expand Down

0 comments on commit 10bebde

Please sign in to comment.