Skip to content

Commit

Permalink
keep track of docid to avoid going backwards to protect against mis…
Browse files Browse the repository at this point in the history
…using doc values APIs when source mode is synthetic:

```
java.lang.AssertionError
        at __randomizedtesting.SeedInfo.seed([BA10D1FF912D808]:0)
        at org.apache.lucene.tests.index.AssertingLeafReader$AssertingNumericDocValues.advanceExact(AssertingLeafReader.java:757)
        at org.apache.lucene.index.SingletonSortedNumericDocValues.advanceExact(SingletonSortedNumericDocValues.java:62)
        at org.elasticsearch.index.mapper.SortedNumericDocValuesSyntheticFieldLoader$ImmediateDocValuesLoader.advanceToDoc(SortedNumericDocValuesSyntheticFieldLoader.java:142)
        at org.elasticsearch.index.mapper.ObjectMapper$SyntheticSourceFieldLoader$ObjectDocValuesLoader.advanceToDoc(ObjectMapper.java:965)
        at org.elasticsearch.index.mapper.SourceLoader$Synthetic$SyntheticLeaf.write(SourceLoader.java:210)
        at org.elasticsearch.index.mapper.SourceLoader$Synthetic$SyntheticLeaf.source(SourceLoader.java:181)
        at org.elasticsearch.index.mapper.SourceLoader$Synthetic$LeafWithMetrics.source(SourceLoader.java:146)
        at org.elasticsearch.search.lookup.SyntheticSourceProvider$SyntheticSourceLeafLoader.getSource(SyntheticSourceProvider.java:58)
        at org.elasticsearch.search.lookup.SyntheticSourceProvider.getSource(SyntheticSourceProvider.java:42)
        at org.elasticsearch.xpack.esql.plugin.ReinitializingSourceProvider.getSource(ReinitializingSourceProvider.java:41)
        at org.elasticsearch.search.lookup.LeafSearchLookup.lambda$new$0(LeafSearchLookup.java:40)
        at org.elasticsearch.script.AbstractFieldScript.extractFromSource(AbstractFieldScript.java:107)
        at org.elasticsearch.script.AbstractFieldScript.emitFromSource(AbstractFieldScript.java:127)
        at org.elasticsearch.script.LongFieldScript$1$1.execute(LongFieldScript.java:29)
        at org.elasticsearch.script.AbstractFieldScript.runForDoc(AbstractFieldScript.java:159)
        at org.elasticsearch.index.fielddata.LongScriptDocValues.advanceExact(LongScriptDocValues.java:26)
        at org.elasticsearch.search.MultiValueMode$6.advanceExact(MultiValueMode.java:534)
        at org.elasticsearch.index.fielddata.FieldData$17.advanceExact(FieldData.java:620)
        at org.apache.lucene.search.comparators.LongComparator$LongLeafComparator.getValueForDoc(LongComparator.java:80)
        at org.apache.lucene.search.comparators.LongComparator$LongLeafComparator.copy(LongComparator.java:105)
        at org.apache.lucene.search.TopFieldCollector$TopFieldLeafCollector.collectAnyHit(TopFieldCollector.java:124)
        at org.apache.lucene.search.TopFieldCollector$SimpleFieldCollector$1.collect(TopFieldCollector.java:209)
        at org.apache.lucene.search.Weight$DefaultBulkScorer.scoreRange(Weight.java:305)
        at org.apache.lucene.search.Weight$DefaultBulkScorer.score(Weight.java:264)
        at org.elasticsearch.compute.lucene.LuceneOperator$LuceneScorer.scoreNextRange(LuceneOperator.java:193)
        at org.elasticsearch.compute.lucene.LuceneTopNSourceOperator.collect(LuceneTopNSourceOperator.java:168)
        at org.elasticsearch.compute.lucene.LuceneTopNSourceOperator.getCheckedOutput(LuceneTopNSourceOperator.java:148)
        at org.elasticsearch.compute.lucene.LuceneOperator.getOutput(LuceneOperator.java:118)
        at org.elasticsearch.compute.operator.Driver.runSingleLoopIteration(Driver.java:258)
        at org.elasticsearch.compute.operator.Driver.run(Driver.java:189)
        at org.elasticsearch.compute.operator.Driver$1.doRun(Driver.java:378)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
        at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:34)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1023)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1575)
```
  • Loading branch information
martijnvg committed Dec 11, 2024
1 parent ead8717 commit 653ed40
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@
import java.util.function.Supplier;

/**
* This is a workaround for when compute engine executes concurrently with data partitioning by docid (inter segment parallelization).
* This class exists as a workaround for using SourceProvider in the compute engine.
* <p>
* The main issue arises when the compute engine executes concurrently with data partitioning by docid (inter segment parallelization).
* A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses
* its own {@link SourceProvider}.
* <p>
* Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle.
*/
final class ReinitializingSourceProvider implements SourceProvider {

private PerThreadSourceProvider perThreadProvider;
private final Supplier<SourceProvider> sourceProviderFactory;

// Tracks the last seen doc id; if the current doc is before the last seen doc, the source provider is re-initialized
// (when source mode is synthetic, _source is read from doc values, and doc values don't support going backwards).
private int lastSeenDocId;

/**
 * Stores the factory used to obtain wrapped {@link SourceProvider} instances.
 * The factory is not invoked here; it is only called from {@code getSource}.
 *
 * @param sourceProviderFactory supplies a new {@link SourceProvider} each time one has to be (re)created
 */
ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
}
Expand All @@ -32,10 +40,11 @@ final class ReinitializingSourceProvider implements SourceProvider {
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
// Returns the source for `doc` in `ctx` by delegating to a wrapped provider; a fresh wrapped provider
// is obtained from the factory first whenever the cached one cannot be reused.
var currentThread = Thread.currentThread();
// Read the field once into a local so the checks below and the delegation use the same instance.
PerThreadSourceProvider provider = perThreadProvider;
// NOTE(review): the next two `if` lines look like the removed and the added side of the same diff hunk.
// The second one (with the extra `doc < lastSeenDocId` check) appears to be the committed condition;
// confirm against the actual file.
if (provider == null || provider.creatingThread != currentThread) {
// Re-create when there is no cached provider, when it was created by a different thread,
// or when the requested doc id is lower than the previously requested one.
if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) {
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
this.perThreadProvider = provider;
}
// Remember this doc id so the next call can detect a backwards move.
// NOTE(review): lastSeenDocId is one plain field that ignores `ctx` and the calling thread; presumably a call for
// another leaf or from another thread with a lower doc id just causes an extra re-initialization. TODO confirm.
lastSeenDocId = doc;
return provider.source.getSource(ctx, doc);
}

Expand Down

0 comments on commit 653ed40

Please sign in to comment.