Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-10617: oak-search-elastic fix deadlock with includePathRestrictions=false and multiple filtered results #1276

Merged
merged 10 commits into from
Jan 23, 2024
Merged
5 changes: 2 additions & 3 deletions oak-doc/src/site/markdown/query/elastic.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ however there are differences:
* Indexes are NOT automatically built when needed:
They can be built by setting the `reindex` property to `true` or by using the `oak-run` tool.
We recommend building them with the `oak-run` tool.
* `evaluatePathRestrictions` is only checked at query time (to keep the compatibility with Lucene). The parent paths are
always indexed. Changing this flag won't require a reindex then. It's strongly suggested to enable it. This control
might be removed in the future.
* `evaluatePathRestrictions` cannot be disabled. The parent paths are always indexed. Queries with path restrictions are
evaluated at the index level when possible; otherwise, they are evaluated at the repository level.
* `codec` is ignored.
* `compatVersion` is ignored.
* `useIfExists` is ignored.
Expand Down
5 changes: 5 additions & 0 deletions oak-search-elastic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@
<artifactId>org.apache.felix.scr.annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.annotation.versioning</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.service.component.annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class ElasticIndexDefinition extends IndexDefinition {
public static final String QUERY_FETCH_SIZES = "queryFetchSizes";
public static final Long[] QUERY_FETCH_SIZES_DEFAULT = new Long[]{10L, 100L, 1000L};

public static final String QUERY_TIMEOUT_MS = "queryTimeoutMs";
public static final long QUERY_TIMEOUT_MS_DEFAULT = 60000;

public static final String TRACK_TOTAL_HITS = "trackTotalHits";
public static final Integer TRACK_TOTAL_HITS_DEFAULT = 10000;

Expand Down Expand Up @@ -138,6 +141,7 @@ public class ElasticIndexDefinition extends IndexDefinition {
public final int numberOfShards;
public final int numberOfReplicas;
public final int[] queryFetchSizes;
public final long queryTimeoutMs;
public final Integer trackTotalHits;
public final String dynamicMapping;
public final boolean failOnError;
Expand All @@ -162,6 +166,7 @@ public ElasticIndexDefinition(NodeState root, NodeState defn, String indexPath,
this.similarityTagsBoost = getOptionalValue(defn, SIMILARITY_TAGS_BOOST, SIMILARITY_TAGS_BOOST_DEFAULT);
this.queryFetchSizes = Arrays.stream(getOptionalValues(defn, QUERY_FETCH_SIZES, Type.LONGS, Long.class, QUERY_FETCH_SIZES_DEFAULT))
.mapToInt(Long::intValue).toArray();
this.queryTimeoutMs = getOptionalValue(defn, QUERY_TIMEOUT_MS, QUERY_TIMEOUT_MS_DEFAULT);
this.trackTotalHits = getOptionalValue(defn, TRACK_TOTAL_HITS, TRACK_TOTAL_HITS_DEFAULT);
this.dynamicMapping = getOptionalValue(defn, DYNAMIC_MAPPING, DYNAMIC_MAPPING_DEFAULT);
this.failOnError = getOptionalValue(defn, FAIL_ON_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,49 +596,47 @@ private List<Query> nonFullTextConstraints(IndexPlan plan, PlanResult planResult
nodeTypeConstraints.ifPresent(queries::add);
}

if (elasticIndexDefinition.evaluatePathRestrictions()) {
String path = FulltextIndex.getPathRestriction(plan);
switch (filter.getPathRestriction()) {
case ALL_CHILDREN:
if (!"/".equals(path)) {
queries.add(newAncestorQuery(path));
String path = FulltextIndex.getPathRestriction(plan);
switch (filter.getPathRestriction()) {
case ALL_CHILDREN:
if (!"/".equals(path)) {
queries.add(newAncestorQuery(path));
}
break;
case DIRECT_CHILDREN:
queries.add(Query.of(q -> q.bool(b -> b.must(newAncestorQuery(path)).must(newDepthQuery(path, planResult)))));
break;
case EXACT:
// For transformed paths, we can only add path restriction if absolute path to property can be deduced
if (planResult.isPathTransformed()) {
String parentPathSegment = planResult.getParentPathSegment();
if (!any.test(PathUtils.elements(parentPathSegment), "*")) {
queries.add(newPathQuery(path + parentPathSegment));
}
break;
case DIRECT_CHILDREN:
queries.add(Query.of(q -> q.bool(b -> b.must(newAncestorQuery(path)).must(newDepthQuery(path, planResult)))));
break;
case EXACT:
} else {
queries.add(newPathQuery(path));
}
break;
case PARENT:
if (PathUtils.denotesRoot(path)) {
// there's no parent of the root node
// we add a path that can not possibly occur because there
// is no way to say "match no documents" in Lucene
queries.add(newPathQuery("///"));
} else {
// For transformed paths, we can only add path restriction if absolute path to property can be deduced
if (planResult.isPathTransformed()) {
String parentPathSegment = planResult.getParentPathSegment();
if (!any.test(PathUtils.elements(parentPathSegment), "*")) {
queries.add(newPathQuery(path + parentPathSegment));
queries.add(newPathQuery(PathUtils.getParentPath(path) + parentPathSegment));
}
} else {
queries.add(newPathQuery(path));
queries.add(newPathQuery(PathUtils.getParentPath(path)));
}
break;
case PARENT:
if (PathUtils.denotesRoot(path)) {
// there's no parent of the root node
// we add a path that can not possibly occur because there
// is no way to say "match no documents" in Lucene
queries.add(newPathQuery("///"));
} else {
// For transformed paths, we can only add path restriction if absolute path to property can be deduced
if (planResult.isPathTransformed()) {
String parentPathSegment = planResult.getParentPathSegment();
if (!any.test(PathUtils.elements(parentPathSegment), "*")) {
queries.add(newPathQuery(PathUtils.getParentPath(path) + parentPathSegment));
}
} else {
queries.add(newPathQuery(PathUtils.getParentPath(path)));
}
}
break;
case NO_RESTRICTION:
break;
}
}
break;
case NO_RESTRICTION:
break;
}

for (Filter.PropertyRestriction pr : filter.getPropertyRestrictions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.osgi.annotation.versioning.ProviderType;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* Generic listener of Elastic response
*/
@ProviderType
public interface ElasticResponseListener {

Set<String> DEFAULT_SOURCE_FIELDS = Collections.singleton(FieldNames.PATH);
Set<String> DEFAULT_SOURCE_FIELDS = Set.of(FieldNames.PATH);

/**
* Returns the source fields this listener is interested in
Expand Down Expand Up @@ -68,8 +69,9 @@ default void startData(long totalHits) { /*empty*/ }
/**
* This method is called for each {@link Hit} retrieved
* @param searchHit a search result
* @return true if the search hit was successfully processed, false otherwise
*/
void on(Hit<ObjectNode> searchHit);
boolean on(Hit<ObjectNode> searchHit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -69,7 +71,7 @@ public class ElasticResultRowAsyncIterator implements ElasticQueryIterator, Elas
private static final FulltextResultRow POISON_PILL =
new FulltextResultRow("___OAK_POISON_PILL___", 0d, Collections.emptyMap(), null, null);

private final BlockingQueue<FulltextResultRow> queue = new LinkedBlockingQueue<>();
private final BlockingQueue<FulltextResultRow> queue;

private final ElasticIndexNode indexNode;
private final IndexPlan indexPlan;
Expand All @@ -96,6 +98,9 @@ public ElasticResultRowAsyncIterator(@NotNull ElasticIndexNode indexNode,
this.rowInclusionPredicate = rowInclusionPredicate;
this.metricHandler = metricHandler;
this.elasticFacetProvider = elasticRequestHandler.getAsyncFacetProvider(indexNode.getConnection(), elasticResponseHandler);
// Bound the queue size to the read limit of the query. This avoids loading too many results into memory when the
// consumer is slow to process them.
this.queue = new LinkedBlockingQueue<>((int) indexPlan.getFilter().getQueryLimits().getLimitReads());
this.elasticQueryScanner = initScanner();
}

Expand All @@ -108,7 +113,7 @@ public boolean hasNext() {
elasticQueryScanner.scan();
}
try {
nextRow = queue.take();
nextRow = queue.poll(indexNode.getDefinition().queryTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt status
throw new IllegalStateException("Error reading next result from Elastic", e);
Expand Down Expand Up @@ -144,12 +149,12 @@ public FulltextResultRow next() {
}

@Override
public void on(Hit<ObjectNode> searchHit) {
public boolean on(Hit<ObjectNode> searchHit) {
final String path = elasticResponseHandler.getPath(searchHit);
if (path != null) {
if (rowInclusionPredicate != null && !rowInclusionPredicate.test(path)) {
LOG.trace("Path {} not included because of hierarchy inclusion rules", path);
return;
return false;
}
LOG.trace("Path {} satisfies hierarchy inclusion rules", path);
try {
Expand All @@ -159,7 +164,9 @@ public void on(Hit<ObjectNode> searchHit) {
Thread.currentThread().interrupt(); // restore interrupt status
throw new IllegalStateException("Error producing results into the iterator queue", e);
}
return true;
}
return false;
}

@Override
Expand Down Expand Up @@ -333,16 +340,25 @@ public void onSuccess(SearchResponse<ObjectNode> searchResponse) {
}

LOG.trace("Emitting {} search hits, for a total of {} scanned results", searchHits.size(), scannedRows);

BitSet listenersWithHits = new BitSet(searchHitListeners.size());

for (Hit<ObjectNode> hit : searchHits) {
for (SearchHitListener l : searchHitListeners) {
l.on(hit);
for (int index = 0; index < searchHitListeners.size(); index++) {
SearchHitListener l = searchHitListeners.get(index);
if (l.on(hit)) {
listenersWithHits.set(index);
}
}
}
// if any listener has not processed any hit, it means we need to load more data since there could be
// listeners waiting for some results before triggering a new scan
boolean areAllListenersProcessed = listenersWithHits.cardinality() == searchHitListeners.size();

if (!anyDataLeft.get()) {
LOG.trace("No data left: closing scanner, notifying listeners");
close();
} else if (fullScan) {
} else if (fullScan || !areAllListenersProcessed) {
scan();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public boolean isFullScan() {
}

@Override
public void on(Hit<ObjectNode> searchHit) {
public boolean on(Hit<ObjectNode> searchHit) {
final String path = elasticResponseHandler.getPath(searchHit);
if (path != null && isAccessible.test(path)) {
for (String field: facetFields) {
Expand All @@ -90,6 +90,7 @@ public void on(Hit<ObjectNode> searchHit) {
}
}
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.plugins.index.IndexDescendantSpellcheckCommonTest;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import javax.jcr.Repository;

Expand All @@ -38,4 +40,10 @@ protected Repository createJcrRepository() {
return jcr.createRepository();
}

/**
 * Disabled for Elastic: per OAK-10617 path restrictions are always applied, so the
 * inherited scenario is not applicable here.
 *
 * <p>The override only delegates to the parent implementation; it exists so the
 * {@code @Ignore} marker can be attached to the inherited test.
 *
 * @throws Exception if the delegated parent test fails
 */
@Test
@Ignore("OAK-10617: path restrictions are always applied. This test is not applicable for Elastic")
@Override
public void descendantSuggestionRequirePathRestrictionIndex() throws Exception {
    super.descendantSuggestionRequirePathRestrictionIndex();
}
}
Loading