Skip to content

Commit

Permalink
OAK-10617: oak-search-elastic fix deadlock with includePathRestrictio…
Browse files Browse the repository at this point in the history
…ns=false and multiple filtered results (apache#1276)

* OAK-10617: oak-search-elastic fix deadlock with includePathRestrictions=false and multiple filtered results

* OAK-10617: always apply path restrictions in ES

* OAK-10617: remove use of guava from modified classes

* OAK-10617: updated docs

* OAK-10617: simplify logic to check if all listeners have process at least one element

* OAK-10617: async iterator queue is now bounded to the read limit

* OAK-10617: small improvement in logic to check if all listeners have process at least one element

* OAK-10617: minor improvement

* OAK-10617: minor improvement: use a BitSet to keep track of search hit listeners

* OAK-10617: annotate ElasticResponseListener with @ProviderType
  • Loading branch information
fabriziofortino authored Jan 23, 2024
1 parent 08bf64a commit 8703590
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 102 deletions.
5 changes: 2 additions & 3 deletions oak-doc/src/site/markdown/query/elastic.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ however there are differences:
* Indexes are NOT automatically built when needed:
They can be built by setting the `reindex` property to `true` or by using the `oak-run` tool.
We recommend to build them using the `oak-run` tool.
* `evaluatePathRestrictions` is only checked at query time (to keep the compatibility with Lucene). The parent paths are
always indexed. Changing this flag won't require a reindex then. It's strongly suggested to enable it. This control
might be removed in the future.
* `evaluatePathRestrictions` cannot be disabled. The parent paths are always indexed. Queries with path restrictions are
evaluated at index level when possible, otherwise they are evaluated at repository level.
* `codec` is ignored.
* `compatVersion` is ignored.
* `useIfExists` is ignored.
Expand Down
5 changes: 5 additions & 0 deletions oak-search-elastic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@
<artifactId>org.apache.felix.scr.annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.annotation.versioning</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.service.component.annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class ElasticIndexDefinition extends IndexDefinition {
public static final String QUERY_FETCH_SIZES = "queryFetchSizes";
public static final Long[] QUERY_FETCH_SIZES_DEFAULT = new Long[]{10L, 100L, 1000L};

public static final String QUERY_TIMEOUT_MS = "queryTimeoutMs";
public static final long QUERY_TIMEOUT_MS_DEFAULT = 60000;

public static final String TRACK_TOTAL_HITS = "trackTotalHits";
public static final Integer TRACK_TOTAL_HITS_DEFAULT = 10000;

Expand Down Expand Up @@ -138,6 +141,7 @@ public class ElasticIndexDefinition extends IndexDefinition {
public final int numberOfShards;
public final int numberOfReplicas;
public final int[] queryFetchSizes;
public final long queryTimeoutMs;
public final Integer trackTotalHits;
public final String dynamicMapping;
public final boolean failOnError;
Expand All @@ -162,6 +166,7 @@ public ElasticIndexDefinition(NodeState root, NodeState defn, String indexPath,
this.similarityTagsBoost = getOptionalValue(defn, SIMILARITY_TAGS_BOOST, SIMILARITY_TAGS_BOOST_DEFAULT);
this.queryFetchSizes = Arrays.stream(getOptionalValues(defn, QUERY_FETCH_SIZES, Type.LONGS, Long.class, QUERY_FETCH_SIZES_DEFAULT))
.mapToInt(Long::intValue).toArray();
this.queryTimeoutMs = getOptionalValue(defn, QUERY_TIMEOUT_MS, QUERY_TIMEOUT_MS_DEFAULT);
this.trackTotalHits = getOptionalValue(defn, TRACK_TOTAL_HITS, TRACK_TOTAL_HITS_DEFAULT);
this.dynamicMapping = getOptionalValue(defn, DYNAMIC_MAPPING, DYNAMIC_MAPPING_DEFAULT);
this.failOnError = getOptionalValue(defn, FAIL_ON_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,49 +596,47 @@ private List<Query> nonFullTextConstraints(IndexPlan plan, PlanResult planResult
nodeTypeConstraints.ifPresent(queries::add);
}

if (elasticIndexDefinition.evaluatePathRestrictions()) {
String path = FulltextIndex.getPathRestriction(plan);
switch (filter.getPathRestriction()) {
case ALL_CHILDREN:
if (!"/".equals(path)) {
queries.add(newAncestorQuery(path));
String path = FulltextIndex.getPathRestriction(plan);
switch (filter.getPathRestriction()) {
case ALL_CHILDREN:
if (!"/".equals(path)) {
queries.add(newAncestorQuery(path));
}
break;
case DIRECT_CHILDREN:
queries.add(Query.of(q -> q.bool(b -> b.must(newAncestorQuery(path)).must(newDepthQuery(path, planResult)))));
break;
case EXACT:
// For transformed paths, we can only add path restriction if absolute path to property can be deduced
if (planResult.isPathTransformed()) {
String parentPathSegment = planResult.getParentPathSegment();
if (!any.test(PathUtils.elements(parentPathSegment), "*")) {
queries.add(newPathQuery(path + parentPathSegment));
}
break;
case DIRECT_CHILDREN:
queries.add(Query.of(q -> q.bool(b -> b.must(newAncestorQuery(path)).must(newDepthQuery(path, planResult)))));
break;
case EXACT:
} else {
queries.add(newPathQuery(path));
}
break;
case PARENT:
if (PathUtils.denotesRoot(path)) {
// there's no parent of the root node
// we add a path that can not possibly occur because there
// is no way to say "match no documents" in Lucene
queries.add(newPathQuery("///"));
} else {
// For transformed paths, we can only add path restriction if absolute path to property can be deduced
if (planResult.isPathTransformed()) {
String parentPathSegment = planResult.getParentPathSegment();
if (!any.test(PathUtils.elements(parentPathSegment), "*")) {
queries.add(newPathQuery(path + parentPathSegment));
queries.add(newPathQuery(PathUtils.getParentPath(path) + parentPathSegment));
}
} else {
queries.add(newPathQuery(path));
queries.add(newPathQuery(PathUtils.getParentPath(path)));
}
break;
case PARENT:
if (PathUtils.denotesRoot(path)) {
// there's no parent of the root node
// we add a path that can not possibly occur because there
// is no way to say "match no documents" in Lucene
queries.add(newPathQuery("///"));
} else {
// For transformed paths, we can only add path restriction if absolute path to property can be deduced
if (planResult.isPathTransformed()) {
String parentPathSegment = planResult.getParentPathSegment();
if (!any.test(PathUtils.elements(parentPathSegment), "*")) {
queries.add(newPathQuery(PathUtils.getParentPath(path) + parentPathSegment));
}
} else {
queries.add(newPathQuery(PathUtils.getParentPath(path)));
}
}
break;
case NO_RESTRICTION:
break;
}
}
break;
case NO_RESTRICTION:
break;
}

for (Filter.PropertyRestriction pr : filter.getPropertyRestrictions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.osgi.annotation.versioning.ProviderType;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* Generic listener of Elastic response
*/
@ProviderType
public interface ElasticResponseListener {

Set<String> DEFAULT_SOURCE_FIELDS = Collections.singleton(FieldNames.PATH);
Set<String> DEFAULT_SOURCE_FIELDS = Set.of(FieldNames.PATH);

/**
* Returns the source fields this listener is interested on
Expand Down Expand Up @@ -68,8 +69,9 @@ default void startData(long totalHits) { /*empty*/ }
/**
* This method is called for each {@link Hit} retrieved
* @param searchHit a search result
* @return true if the search hit was successfully processed, false otherwise
*/
void on(Hit<ObjectNode> searchHit);
boolean on(Hit<ObjectNode> searchHit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -69,7 +71,7 @@ public class ElasticResultRowAsyncIterator implements ElasticQueryIterator, Elas
private static final FulltextResultRow POISON_PILL =
new FulltextResultRow("___OAK_POISON_PILL___", 0d, Collections.emptyMap(), null, null);

private final BlockingQueue<FulltextResultRow> queue = new LinkedBlockingQueue<>();
private final BlockingQueue<FulltextResultRow> queue;

private final ElasticIndexNode indexNode;
private final IndexPlan indexPlan;
Expand All @@ -96,6 +98,9 @@ public ElasticResultRowAsyncIterator(@NotNull ElasticIndexNode indexNode,
this.rowInclusionPredicate = rowInclusionPredicate;
this.metricHandler = metricHandler;
this.elasticFacetProvider = elasticRequestHandler.getAsyncFacetProvider(indexNode.getConnection(), elasticResponseHandler);
// set the queue size to the limit of the query. This is to avoid to load too many results in memory in case the
// consumer is slow to process them
this.queue = new LinkedBlockingQueue<>((int) indexPlan.getFilter().getQueryLimits().getLimitReads());
this.elasticQueryScanner = initScanner();
}

Expand All @@ -108,7 +113,7 @@ public boolean hasNext() {
elasticQueryScanner.scan();
}
try {
nextRow = queue.take();
nextRow = queue.poll(indexNode.getDefinition().queryTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt status
throw new IllegalStateException("Error reading next result from Elastic", e);
Expand Down Expand Up @@ -144,12 +149,12 @@ public FulltextResultRow next() {
}

@Override
public void on(Hit<ObjectNode> searchHit) {
public boolean on(Hit<ObjectNode> searchHit) {
final String path = elasticResponseHandler.getPath(searchHit);
if (path != null) {
if (rowInclusionPredicate != null && !rowInclusionPredicate.test(path)) {
LOG.trace("Path {} not included because of hierarchy inclusion rules", path);
return;
return false;
}
LOG.trace("Path {} satisfies hierarchy inclusion rules", path);
try {
Expand All @@ -159,7 +164,9 @@ public void on(Hit<ObjectNode> searchHit) {
Thread.currentThread().interrupt(); // restore interrupt status
throw new IllegalStateException("Error producing results into the iterator queue", e);
}
return true;
}
return false;
}

@Override
Expand Down Expand Up @@ -333,16 +340,25 @@ public void onSuccess(SearchResponse<ObjectNode> searchResponse) {
}

LOG.trace("Emitting {} search hits, for a total of {} scanned results", searchHits.size(), scannedRows);

BitSet listenersWithHits = new BitSet(searchHitListeners.size());

for (Hit<ObjectNode> hit : searchHits) {
for (SearchHitListener l : searchHitListeners) {
l.on(hit);
for (int index = 0; index < searchHitListeners.size(); index++) {
SearchHitListener l = searchHitListeners.get(index);
if (l.on(hit)) {
listenersWithHits.set(index);
}
}
}
// if any listener has not processed any hit, it means we need to load more data since there could be
// listeners waiting for some results before triggering a new scan
boolean areAllListenersProcessed = listenersWithHits.cardinality() == searchHitListeners.size();

if (!anyDataLeft.get()) {
LOG.trace("No data left: closing scanner, notifying listeners");
close();
} else if (fullScan) {
} else if (fullScan || !areAllListenersProcessed) {
scan();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public boolean isFullScan() {
}

@Override
public void on(Hit<ObjectNode> searchHit) {
public boolean on(Hit<ObjectNode> searchHit) {
final String path = elasticResponseHandler.getPath(searchHit);
if (path != null && isAccessible.test(path)) {
for (String field: facetFields) {
Expand All @@ -90,6 +90,7 @@ public void on(Hit<ObjectNode> searchHit) {
}
}
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.plugins.index.IndexDescendantSpellcheckCommonTest;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import javax.jcr.Repository;

Expand All @@ -38,4 +40,10 @@ protected Repository createJcrRepository() {
return jcr.createRepository();
}

@Override
@Test
@Ignore("OAK-10617: path restrictions are always applied. This test is not applicable for Elastic")
public void descendantSuggestionRequirePathRestrictionIndex() throws Exception {
super.descendantSuggestionRequirePathRestrictionIndex();
}
}
Loading

0 comments on commit 8703590

Please sign in to comment.