From c49e6a4599311f69fc35abcb04fc68347d332172 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 11 Dec 2024 11:13:24 +0100 Subject: [PATCH] Fix issues with ReinitializingSourceProvider (#118370) The previous fix to ensure that each thread uses its own SearchProvider wasn't good enough. The read from `perThreadProvider` field could be stale and therefore returning a previous source provider. Instead the source provider should be returned from `provider` local variable. This change also addresses another issue, sometimes current docid goes backwards compared to last seen docid and this causes issue when synthetic source provider is used, as doc values can't advance backwards. This change addresses that by returning a new source provider if backwards docid is detected. Closes #118238 --- docs/changelog/118370.yaml | 6 +++++ .../xpack/esql/action/EsqlActionIT.java | 26 +++++++++++++++---- .../plugin/ReinitializingSourceProvider.java | 17 +++++++++--- 3 files changed, 41 insertions(+), 8 deletions(-) create mode 100644 docs/changelog/118370.yaml diff --git a/docs/changelog/118370.yaml b/docs/changelog/118370.yaml new file mode 100644 index 0000000000000..e6a429448e493 --- /dev/null +++ b/docs/changelog/118370.yaml @@ -0,0 +1,6 @@ +pr: 118370 +summary: Fix concurrency issue with `ReinitializingSourceProvider` +area: Mapping +type: bug +issues: + - 118238 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 00f53d31165b1..de9eb166688f9 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.parser.ParsingException; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.junit.Before; import java.io.IOException; @@ -1673,17 +1674,32 @@ public void testScriptField() throws Exception { String sourceMode = randomBoolean() ? "stored" : "synthetic"; Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode); client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get(); - for (int i = 0; i < 10; i++) { + int numDocs = 256; + for (int i = 0; i < numDocs; i++) { index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i)); } refresh("test-script"); - try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) { + + var pragmas = randomPragmas(); + if (canUseQueryPragmas()) { + Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings()); + pragmaSettings.put("task_concurrency", 10); + pragmaSettings.put("data_partitioning", "doc"); + pragmas = new QueryPragmas(pragmaSettings.build()); + } + try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) { List k1Column = Iterators.toList(resp.column(0)); - assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)); + assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList())); List k2Column = Iterators.toList(resp.column(1)); - assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null)); + assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null))); List meterColumn = Iterators.toList(resp.column(2)); - assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0)); + var expectedMeterColumn = new ArrayList<>(numDocs); + double val = 0.0; + for (int i = 0; i < numDocs; i++) { + expectedMeterColumn.add(val); + val += 10000.0; + } + assertThat(meterColumn, equalTo(expectedMeterColumn)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java index b6b2c6dfec755..8dee3478b3b64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -15,13 +15,23 @@ import java.util.function.Supplier; /** - * This is a workaround for when compute engine executes concurrently with data partitioning by docid. + * This class exists as a workaround for using SourceProvider in the compute engine. + *

+ * The main issue is when compute engine executes concurrently with data partitioning by docid (inter segment parallelization). + * A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses + * its own {@link SourceProvider}. + *

+ * Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle. */ final class ReinitializingSourceProvider implements SourceProvider { private PerThreadSourceProvider perThreadProvider; private final Supplier sourceProviderFactory; + // Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized: + // (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards) + private int lastSeenDocId; + ReinitializingSourceProvider(Supplier sourceProviderFactory) { this.sourceProviderFactory = sourceProviderFactory; } @@ -30,11 +40,12 @@ final class ReinitializingSourceProvider implements SourceProvider { public Source getSource(LeafReaderContext ctx, int doc) throws IOException { var currentThread = Thread.currentThread(); PerThreadSourceProvider provider = perThreadProvider; - if (provider == null || provider.creatingThread != currentThread) { + if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) { provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); this.perThreadProvider = provider; } - return perThreadProvider.source.getSource(ctx, doc); + lastSeenDocId = doc; + return provider.source.getSource(ctx, doc); } private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {