Skip to content

Commit

Permalink
Fix concurrency issue with ReinitializingSourceProvider
Browse files Browse the repository at this point in the history
The previous fix to ensure that each thread uses its own SearchProvider wasn't good enough.
When multiple threads access `ReinitializingSourceProvider` the simple thread accounting could still result in returned `SourceProvider` being used by multiple threads concurrently.

The ReinitializingSourceProvider was introduced via elastic#117792

Closes elastic#118238
  • Loading branch information
martijnvg committed Dec 10, 2024
1 parent 2381559 commit bf75a92
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -1673,17 +1674,32 @@ public void testScriptField() throws Exception {
String sourceMode = randomBoolean() ? "stored" : "synthetic";
Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
for (int i = 0; i < 10; i++) {
int numDocs = 256;
for (int i = 0; i < numDocs; i++) {
index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
}
refresh("test-script");
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {

var pragmas = randomPragmas();
if (canUseQueryPragmas()) {
Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings());
pragmaSettings.put("task_concurrency", 10);
pragmaSettings.put("data_partitioning", "doc");
pragmas = new QueryPragmas(pragmaSettings.build());
}
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) {
List<Object> k1Column = Iterators.toList(resp.column(0));
assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList()));
List<Object> k2Column = Iterators.toList(resp.column(1));
assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null)));
List<Object> meterColumn = Iterators.toList(resp.column(2));
assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
var expectedMeterColumn = new ArrayList<>(numDocs);
double val = 0.0;
for (int i = 0; i < numDocs; i++) {
expectedMeterColumn.add(val);
val += 10000.0;
}
assertThat(meterColumn, equalTo(expectedMeterColumn));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@
package org.elasticsearch.xpack.esql.plugin;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceProvider;

import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

/**
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
* This is a workaround for when compute engine executes concurrently with data partitioning by docid (inter segment parallelization).
* A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses
* its own {@link SourceProvider}.
*/
final class ReinitializingSourceProvider implements SourceProvider {

private PerThreadSourceProvider perThreadProvider;
private final Supplier<SourceProvider> sourceProviderFactory;
private final Map<Long, SourceProvider> map = ConcurrentCollections.newConcurrentMap();

ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
Expand All @@ -29,15 +33,7 @@ final class ReinitializingSourceProvider implements SourceProvider {
@Override
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
var currentThread = Thread.currentThread();
PerThreadSourceProvider provider = perThreadProvider;
if (provider == null || provider.creatingThread != currentThread) {
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
this.perThreadProvider = provider;
}
return perThreadProvider.source.getSource(ctx, doc);
}

private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {

var sourceProvider = map.computeIfAbsent(currentThread.threadId(), (key) -> sourceProviderFactory.get());
return sourceProvider.getSource(ctx, doc);
}
}

0 comments on commit bf75a92

Please sign in to comment.