Skip to content

Commit

Permalink
return from provider local variable as the perThreadProvider fiel…
Browse files Browse the repository at this point in the history
…d can be stale
  • Loading branch information
martijnvg committed Dec 11, 2024
1 parent 5e046ce commit 66e09b6
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,19 @@
package org.elasticsearch.xpack.esql.plugin;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceProvider;

import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

/**
* This is a workaround for when compute engine executes concurrently with data partitioning by docid (inter segment parallelization).
* A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses
* its own {@link SourceProvider}.
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
*/
final class ReinitializingSourceProvider implements SourceProvider {

private PerThreadSourceProvider perThreadProvider;
private final Supplier<SourceProvider> sourceProviderFactory;
private final Map<Long, SourceProvider> map = ConcurrentCollections.newConcurrentMap();

ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
Expand All @@ -33,7 +29,15 @@ final class ReinitializingSourceProvider implements SourceProvider {
@Override
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
var currentThread = Thread.currentThread();
var sourceProvider = map.computeIfAbsent(currentThread.threadId(), (key) -> sourceProviderFactory.get());
return sourceProvider.getSource(ctx, doc);
PerThreadSourceProvider provider = perThreadProvider;
if (provider == null || provider.creatingThread != currentThread) {
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
this.perThreadProvider = provider;
}
return provider.source.getSource(ctx, doc);
}

private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {

}
}

0 comments on commit 66e09b6

Please sign in to comment.