From bf75a92ce9bcdc60e55c9e6ce391b8fa5c51d107 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 10 Dec 2024 18:18:17 +0100 Subject: [PATCH] Fix concurrency issue with ReinitializingSourceProvider The previous fix to ensure that each thread uses its own SourceProvider wasn't good enough. When multiple threads access `ReinitializingSourceProvider` the simple thread accounting could still result in a returned `SourceProvider` being used by multiple threads concurrently. The ReinitializingSourceProvider was introduced via #117792 Closes #118238 --- .../xpack/esql/action/EsqlActionIT.java | 26 +++++++++++++++---- .../plugin/ReinitializingSourceProvider.java | 20 ++++++-------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 00f53d31165b1..de9eb166688f9 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.parser.ParsingException; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.junit.Before; import java.io.IOException; @@ -1673,17 +1674,32 @@ public void testScriptField() throws Exception { String sourceMode = randomBoolean() ? 
"stored" : "synthetic"; Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode); client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get(); - for (int i = 0; i < 10; i++) { + int numDocs = 256; + for (int i = 0; i < numDocs; i++) { index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i)); } refresh("test-script"); - try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) { + + var pragmas = randomPragmas(); + if (canUseQueryPragmas()) { + Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings()); + pragmaSettings.put("task_concurrency", 10); + pragmaSettings.put("data_partitioning", "doc"); + pragmas = new QueryPragmas(pragmaSettings.build()); + } + try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) { List k1Column = Iterators.toList(resp.column(0)); - assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)); + assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList())); List k2Column = Iterators.toList(resp.column(1)); - assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null)); + assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null))); List meterColumn = Iterators.toList(resp.column(2)); - assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0)); + var expectedMeterColumn = new ArrayList<>(numDocs); + double val = 0.0; + for (int i = 0; i < numDocs; i++) { + expectedMeterColumn.add(val); + val += 10000.0; + } + assertThat(meterColumn, equalTo(expectedMeterColumn)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java index 
b6b2c6dfec755..0672ce087bd4f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -8,19 +8,23 @@ package org.elasticsearch.xpack.esql.plugin; import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.lookup.Source; import org.elasticsearch.search.lookup.SourceProvider; import java.io.IOException; +import java.util.Map; import java.util.function.Supplier; /** - * This is a workaround for when compute engine executes concurrently with data partitioning by docid. + * This is a workaround for when compute engine executes concurrently with data partitioning by docid (inter segment parallelization). + * A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses + * its own {@link SourceProvider}. 
*/ final class ReinitializingSourceProvider implements SourceProvider { - private PerThreadSourceProvider perThreadProvider; private final Supplier sourceProviderFactory; + private final Map map = ConcurrentCollections.newConcurrentMap(); ReinitializingSourceProvider(Supplier sourceProviderFactory) { this.sourceProviderFactory = sourceProviderFactory; @@ -29,15 +33,7 @@ final class ReinitializingSourceProvider implements SourceProvider { @Override public Source getSource(LeafReaderContext ctx, int doc) throws IOException { var currentThread = Thread.currentThread(); - PerThreadSourceProvider provider = perThreadProvider; - if (provider == null || provider.creatingThread != currentThread) { - provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); - this.perThreadProvider = provider; - } - return perThreadProvider.source.getSource(ctx, doc); - } - - private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) { - + var sourceProvider = map.computeIfAbsent(currentThread.threadId(), (key) -> sourceProviderFactory.get()); + return sourceProvider.getSource(ctx, doc); } }