Make heavy query rewrites concurrent
shubhamvishu committed Mar 3, 2023
1 parent 569533b commit 55bafc1
Showing 3 changed files with 178 additions and 92 deletions.
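All three rewrites in this commit fall back to a same-thread executor when the searcher has none configured. Below is a minimal sketch of that pattern outside Lucene; the class name and the null stand-in for indexSearcher.getExecutor() are illustrative only.

```java
// A minimal sketch, not Lucene code: when no executor is configured,
// Runnable::run satisfies the Executor interface and runs tasks inline on
// the calling thread, so concurrent and sequential rewrites share one path.
import java.util.Objects;
import java.util.concurrent.Executor;

public class FallbackExecutorSketch {
  public static void main(String[] args) {
    Executor configured = null; // stand-in for indexSearcher.getExecutor()
    Executor executor = Objects.requireNonNullElse(configured, Runnable::run);
    executor.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
  }
}
```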
73 changes: 52 additions & 21 deletions lucene/core/src/java/org/apache/lucene/search/BlendedTermQuery.java
@@ -19,6 +19,10 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
@@ -269,28 +273,55 @@ public String toString(String field) {
@Override
public final Query rewrite(IndexSearcher indexSearcher) throws IOException {
final TermStates[] contexts = ArrayUtil.copyOfSubArray(this.contexts, 0, this.contexts.length);
for (int i = 0; i < contexts.length; ++i) {
if (contexts[i] == null
|| contexts[i].wasBuiltFor(indexSearcher.getTopReaderContext()) == false) {
contexts[i] = TermStates.build(indexSearcher.getTopReaderContext(), terms[i], true);
}
}

// Compute aggregated doc freq and total term freq
// df will be the max of all doc freqs
// ttf will be the sum of all total term freqs
int df = 0;
long ttf = 0;
for (TermStates ctx : contexts) {
df = Math.max(df, ctx.docFreq());
ttf += ctx.totalTermFreq();
}

for (int i = 0; i < contexts.length; ++i) {
contexts[i] = adjustFrequencies(indexSearcher.getTopReaderContext(), contexts[i], df, ttf);
}

Query[] termQueries = new Query[terms.length];
AtomicInteger index = new AtomicInteger(0);
AtomicInteger df = new AtomicInteger(0);
AtomicInteger ttf = new AtomicInteger(0);
Executor executor = Objects.requireNonNullElse(indexSearcher.getExecutor(), Runnable::run);
SliceExecutor sliceExecutor = new SliceExecutor(executor);
List<FutureTask<Object>> tasks =
Arrays.stream(contexts)
.map(
task ->
new FutureTask<>(
() -> {
int i = index.getAndIncrement();
if (contexts[i] == null
|| contexts[i].wasBuiltFor(indexSearcher.getTopReaderContext())
== false) {
contexts[i] =
TermStates.build(
indexSearcher.getTopReaderContext(), terms[i], true);
}
// Compute aggregated doc freq and total term freq
// df will be the max of all doc freqs
// ttf will be the sum of all total term freqs
df.set(Math.max(df.get(), contexts[i].docFreq()));
ttf.set((int) (ttf.get() + contexts[i].totalTermFreq()));
return null;
}))
.toList();
sliceExecutor.invokeAll(tasks);

index.set(0);
tasks =
Arrays.stream(contexts)
.map(
ctx ->
new FutureTask<>(
() -> {
int i = index.getAndIncrement();
contexts[i] =
adjustFrequencies(
indexSearcher.getTopReaderContext(),
contexts[i],
df.get(),
ttf.get());
return null;
}))
.toList();
sliceExecutor.invokeAll(tasks);

for (int i = 0; i < terms.length; ++i) {
termQueries[i] = new TermQuery(terms[i], contexts[i]);
if (boosts[i] != 1f) {
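The committed aggregation above updates df with a read-then-set (df.set(Math.max(df.get(), ...))), which can drop updates under contention, and narrows totalTermFreq() (a long) to int. A minimal sketch of a race-free variant, under stated assumptions: a plain ExecutorService stands in for Lucene's internal SliceExecutor, and the arrays stand in for per-term statistics.

```java
// A minimal sketch, not Lucene code: race-free aggregation of df (max) and
// ttf (sum) across parallel tasks. AtomicLong is used for ttf because
// totalTermFreq() returns a long; accumulateAndGet retries on contention,
// unlike the read-then-set pattern df.set(Math.max(df.get(), v)).
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ParallelStatsSketch {
  public static void main(String[] args) throws Exception {
    int[] docFreqs = {3, 7, 5}; // stand-ins for contexts[i].docFreq()
    long[] totalTermFreqs = {9, 21, 15}; // stand-ins for contexts[i].totalTermFreq()

    AtomicInteger df = new AtomicInteger(0);
    AtomicLong ttf = new AtomicLong(0);
    ExecutorService executor = Executors.newFixedThreadPool(3);
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < docFreqs.length; i++) {
      final int idx = i;
      futures.add(
          executor.submit(
              () -> {
                df.accumulateAndGet(docFreqs[idx], Math::max); // atomic max
                ttf.addAndGet(totalTermFreqs[idx]); // atomic sum, no int cast
              }));
    }
    for (Future<?> f : futures) {
      f.get(); // join; also rethrows any task failure before df/ttf are read
    }
    executor.shutdown();
    System.out.println("df=" + df.get() + " ttf=" + ttf.get());
  }
}
```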
130 changes: 84 additions & 46 deletions lucene/core/src/java/org/apache/lucene/search/FieldExistsQuery.java
@@ -17,7 +17,12 @@
package org.apache.lucene.search;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.document.KnnFloatVectorField;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesType;
@@ -29,6 +34,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.ThreadInterruptedException;

/**
* A {@link Query} that matches documents that contain either a {@link KnnFloatVectorField}, {@link
@@ -110,59 +116,91 @@ public int hashCode() {
@Override
public Query rewrite(IndexSearcher indexSearcher) throws IOException {
IndexReader reader = indexSearcher.getIndexReader();
boolean allReadersRewritable = true;
final List<FutureTask<Boolean>> tasks =
reader.leaves().stream()
.map(
ctx ->
new FutureTask<>(
() -> {
AtomicBoolean allReadersRewritable = new AtomicBoolean(true);
LeafReader leaf = ctx.reader();
FieldInfos fieldInfos = leaf.getFieldInfos();
FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
if (fieldInfo == null) {
allReadersRewritable.set(false);
return allReadersRewritable.get();
}

for (LeafReaderContext context : reader.leaves()) {
LeafReader leaf = context.reader();
FieldInfos fieldInfos = leaf.getFieldInfos();
FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
if (fieldInfo.hasNorms()) { // the field indexes norms
if (reader.getDocCount(field) != reader.maxDoc()) {
allReadersRewritable.set(false);
return allReadersRewritable.get();
}
} else if (fieldInfo.getVectorDimension()
!= 0) { // the field indexes vectors
int numVectors =
switch (fieldInfo.getVectorEncoding()) {
case FLOAT32 -> leaf.getFloatVectorValues(field).size();
case BYTE -> leaf.getByteVectorValues(field).size();
};
if (numVectors != leaf.maxDoc()) {
allReadersRewritable.set(false);
return allReadersRewritable.get();
}
} else if (fieldInfo.getDocValuesType()
!= DocValuesType.NONE) { // the field indexes doc values or points

if (fieldInfo == null) {
allReadersRewritable = false;
break;
}
// This optimization is possible because LUCENE-9334 enforces that a
// field always uses the same data structures (all or nothing). Since
// there's no index statistic to detect when all documents have doc
// values for a specific field, FieldExistsQuery can only be rewritten
// to MatchAllDocsQuery for a doc values field when that same field
// also indexes terms or point values, which do have index statistics,
// and those statistics confirm that all documents in this segment
// have terms or point values.

if (fieldInfo.hasNorms()) { // the field indexes norms
if (reader.getDocCount(field) != reader.maxDoc()) {
allReadersRewritable = false;
break;
}
} else if (fieldInfo.getVectorDimension() != 0) { // the field indexes vectors
int numVectors =
switch (fieldInfo.getVectorEncoding()) {
case FLOAT32 -> leaf.getFloatVectorValues(field).size();
case BYTE -> leaf.getByteVectorValues(field).size();
};
if (numVectors != leaf.maxDoc()) {
allReadersRewritable = false;
break;
}
} else if (fieldInfo.getDocValuesType()
!= DocValuesType.NONE) { // the field indexes doc values or points
Terms terms = leaf.terms(field);
PointValues pointValues = leaf.getPointValues(field);

// This optimization is possible because LUCENE-9334 enforces that a field always uses the
// same data structures (all or nothing). Since there's no index statistic to detect when
// all documents have doc values for a specific field, FieldExistsQuery can only be
// rewritten to MatchAllDocsQuery for a doc values field when that same field also indexes
// terms or point values, which do have index statistics, and those statistics confirm that
// all documents in this segment have terms or point values.
if ((terms == null || terms.getDocCount() != leaf.maxDoc())
&& (pointValues == null
|| pointValues.getDocCount() != leaf.maxDoc())) {
allReadersRewritable.set(false);
return allReadersRewritable.get();
}
} else {
throw new IllegalStateException(buildErrorMsg(fieldInfo));
}
allReadersRewritable.set(true);
return allReadersRewritable.get();
}))
.toList();
Executor executor = Objects.requireNonNullElse(indexSearcher.getExecutor(), Runnable::run);
SliceExecutor sliceExecutor = new SliceExecutor(executor);
sliceExecutor.invokeAll(tasks);

Terms terms = leaf.terms(field);
PointValues pointValues = leaf.getPointValues(field);

if ((terms == null || terms.getDocCount() != leaf.maxDoc())
&& (pointValues == null || pointValues.getDocCount() != leaf.maxDoc())) {
allReadersRewritable = false;
break;
}
} else {
throw new IllegalStateException(buildErrorMsg(fieldInfo));
List<Boolean> res =
tasks.stream()
.map(
task -> {
try {
return task.get();
} catch (ExecutionException e) {
throw new IllegalStateException(e.getCause());
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
})
.toList();
for (boolean i : res) {
if (i == false) {
return super.rewrite(indexSearcher);
}
}
if (allReadersRewritable) {
return new MatchAllDocsQuery();
}
return super.rewrite(indexSearcher);
return new MatchAllDocsQuery();
}

@Override
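The FieldExistsQuery change fans one Boolean task out per leaf and only decides on the rewrite after joining all of them; the per-task AtomicBoolean is local to each lambda, so a plain boolean return carries the same information. A minimal sketch of that fan-out/fan-in shape follows; checkLeaf() and the pool size are hypothetical stand-ins, not Lucene API.

```java
// A minimal sketch, not Lucene code: one Boolean verdict per leaf, joined
// before deciding between MatchAllDocsQuery and the default rewrite.
// checkLeaf() is a hypothetical stand-in for the per-leaf FieldInfo checks.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerLeafRewriteSketch {
  static boolean checkLeaf(int leafOrd) {
    return leafOrd >= 0; // placeholder: "does this leaf allow the rewrite?"
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    List<Callable<Boolean>> tasks = new ArrayList<>();
    for (int ord = 0; ord < 8; ord++) {
      final int leafOrd = ord;
      tasks.add(() -> checkLeaf(leafOrd)); // no shared mutable state needed
    }
    boolean allReadersRewritable = true;
    for (Future<Boolean> f : executor.invokeAll(tasks)) {
      allReadersRewritable &= f.get(); // get() rethrows task failures
    }
    executor.shutdown();
    System.out.println(allReadersRewritable ? "MatchAllDocsQuery" : "default rewrite");
  }
}
```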
67 changes: 42 additions & 25 deletions lucene/suggest/src/java/org/apache/lucene/search/suggest/document/CompletionQuery.java
@@ -20,6 +20,11 @@
import static org.apache.lucene.search.suggest.document.CompletionAnalyzer.HOLE_CHARACTER;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
@@ -84,42 +89,54 @@ public Term getTerm() {

@Override
public Query rewrite(IndexSearcher indexSearcher) throws IOException {
byte type = 0;
AtomicReference<Byte> type = new AtomicReference<>((byte) 0);
boolean first = true;
Terms terms;
AtomicBoolean ab = new AtomicBoolean(first);
AtomicReference<Terms> aterms = new AtomicReference<>(null);
AtomicInteger ai = new AtomicInteger(0);
Executor executor = Objects.requireNonNullElse(indexSearcher.getExecutor(), Runnable::run);
LeafReader[] leafReaders = new LeafReader[indexSearcher.getLeafContexts().size()];
int i = 0;
for (LeafReaderContext context : indexSearcher.getLeafContexts()) {
LeafReader leafReader = context.reader();
try {
if ((terms = leafReader.terms(getField())) == null) {
continue;
}
} catch (
@SuppressWarnings("unused")
IOException e) {
continue;
}
if (terms instanceof CompletionTerms) {
CompletionTerms completionTerms = (CompletionTerms) terms;
byte t = completionTerms.getType();
if (first) {
type = t;
first = false;
} else if (type != t) {
throw new IllegalStateException(getField() + " has values of multiple types");
}
}
leafReaders[i++] = context.reader();
}
if (indexSearcher.getLeafContexts().size() > 0) {
executor.execute(
() -> {
int index = ai.getAndIncrement();
try {
aterms.set(leafReaders[index].terms(getField()));
if (aterms.get() == null) {
return;
}
} catch (
@SuppressWarnings("unused")
IOException e) {
return;
}
if (aterms.get() instanceof CompletionTerms) {
CompletionTerms completionTerms = (CompletionTerms) aterms.get();
byte t = completionTerms.getType();
if (ab.get()) {
type.set(t);
ab.set(false);
} else if (type.get() != t) {
throw new IllegalStateException(getField() + " has values of multiple types");
}
}
});
}

if (first == false) {
if (ab.get() == false) {
if (this instanceof ContextQuery) {
if (type == SuggestField.TYPE) {
if (type.get() == SuggestField.TYPE) {
throw new IllegalStateException(
this.getClass().getSimpleName()
+ " can not be executed against a non context-enabled SuggestField: "
+ getField());
}
} else {
if (type == ContextSuggestField.TYPE) {
if (type.get() == ContextSuggestField.TYPE) {
return new ContextQuery(this);
}
}
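As committed, the CompletionQuery path calls executor.execute once without awaiting completion, so only the first leaf's terms are examined before the rewrite continues. A minimal sketch of a per-leaf type-consistency check with an explicit join follows, using stand-in byte values rather than Lucene's CompletionTerms.

```java
// A minimal sketch, not Lucene code: submit one task per leaf and join all
// of them before reading the result, so every leaf's suggester type is
// checked. The byte values stand in for CompletionTerms.getType().
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TypeConsistencySketch {
  public static void main(String[] args) throws Exception {
    byte[] leafTypes = {1, 1, 1}; // one entry per leaf reader
    ExecutorService executor = Executors.newFixedThreadPool(2);
    List<Future<Byte>> futures = new ArrayList<>();
    for (byte t : leafTypes) {
      final byte leafType = t;
      futures.add(executor.submit(() -> leafType)); // real per-leaf work goes here
    }
    Byte expected = null;
    for (Future<Byte> f : futures) { // join every task, not just the first
      byte t = f.get();
      if (expected == null) {
        expected = t;
      } else if (expected != t) {
        throw new IllegalStateException("field has values of multiple types");
      }
    }
    executor.shutdown();
    System.out.println("suggest field type = " + expected);
  }
}
```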
