Commit 1bf60bd

Merge branch 'main' into remove-awaitsfix-101932

cbuescher committed Oct 2, 2024
2 parents 6386dd9 + fca267e
Showing 200 changed files with 1,664 additions and 680 deletions.
@@ -17,6 +17,7 @@
import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
import org.elasticsearch.compute.aggregation.CountDistinctDoubleAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.CountDistinctLongAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.FilteredAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.MaxDoubleAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.MaxLongAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.MinDoubleAggregatorFunctionSupplier;
@@ -27,6 +28,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
@@ -35,6 +37,7 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.AggregationOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.HashAggregationOperator;
import org.elasticsearch.compute.operator.Operator;
import org.openjdk.jmh.annotations.Benchmark;
@@ -94,13 +97,20 @@ public class AggregatorBenchmark {

private static final String NONE = "none";

private static final String CONSTANT_TRUE = "constant_true";
private static final String ALL_TRUE = "all_true";
private static final String HALF_TRUE = "half_true";
private static final String CONSTANT_FALSE = "constant_false";

static {
// Smoke test all the expected values and force loading subclasses more like prod
try {
for (String grouping : AggregatorBenchmark.class.getField("grouping").getAnnotationsByType(Param.class)[0].value()) {
for (String op : AggregatorBenchmark.class.getField("op").getAnnotationsByType(Param.class)[0].value()) {
for (String blockType : AggregatorBenchmark.class.getField("blockType").getAnnotationsByType(Param.class)[0].value()) {
run(grouping, op, blockType, 50);
for (String filter : AggregatorBenchmark.class.getField("filter").getAnnotationsByType(Param.class)[0].value()) {
run(grouping, op, blockType, filter, 10);
}
}
}
}
@@ -118,10 +128,14 @@ public class AggregatorBenchmark {
@Param({ VECTOR_LONGS, HALF_NULL_LONGS, VECTOR_DOUBLES, HALF_NULL_DOUBLES })
public String blockType;

private static Operator operator(DriverContext driverContext, String grouping, String op, String dataType) {
@Param({ NONE, CONSTANT_TRUE, ALL_TRUE, HALF_TRUE, CONSTANT_FALSE })
public String filter;

private static Operator operator(DriverContext driverContext, String grouping, String op, String dataType, String filter) {

if (grouping.equals("none")) {
return new AggregationOperator(
List.of(supplier(op, dataType, 0).aggregatorFactory(AggregatorMode.SINGLE).apply(driverContext)),
List.of(supplier(op, dataType, filter, 0).aggregatorFactory(AggregatorMode.SINGLE).apply(driverContext)),
driverContext
);
}
@@ -144,14 +158,14 @@ private static Operator operator(DriverContext driverContext, String grouping, S
default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
};
return new HashAggregationOperator(
List.of(supplier(op, dataType, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
List.of(supplier(op, dataType, filter, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
() -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false),
driverContext
);
}

private static AggregatorFunctionSupplier supplier(String op, String dataType, int dataChannel) {
return switch (op) {
private static AggregatorFunctionSupplier supplier(String op, String dataType, String filter, int dataChannel) {
return filtered(switch (op) {
case COUNT -> CountAggregatorFunction.supplier(List.of(dataChannel));
case COUNT_DISTINCT -> switch (dataType) {
case LONGS -> new CountDistinctLongAggregatorFunctionSupplier(List.of(dataChannel), 3000);
@@ -174,10 +188,22 @@ private static AggregatorFunctionSupplier supplier(String op, String dataType, i
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
};
default -> throw new IllegalArgumentException("unsupported op [" + op + "]");
};
}, filter);
}

private static void checkExpected(String grouping, String op, String blockType, String dataType, Page page, int opCount) {
private static void checkExpected(
String grouping,
String op,
String blockType,
String filter,
String dataType,
Page page,
int opCount
) {
if (filter.equals(CONSTANT_FALSE) || filter.equals(HALF_TRUE)) {
// We don't verify these because it's hard to get the right answer.
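// (with a mask applied, the totals no longer match the unfiltered expectations asserted below)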
return;
}
String prefix = String.format("[%s][%s][%s] ", grouping, op, blockType);
if (grouping.equals("none")) {
checkUngrouped(prefix, op, dataType, page, opCount);
@@ -559,27 +585,73 @@ private static BytesRef bytesGroup(int group) {
});
}

private static AggregatorFunctionSupplier filtered(AggregatorFunctionSupplier agg, String filter) {
if (filter.equals("none")) {
return agg;
}
BooleanBlock mask = mask(filter).asBlock();
return new FilteredAggregatorFunctionSupplier(agg, context -> new EvalOperator.ExpressionEvaluator() {
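// The same pre-built mask block backs every page: eval() hands it out with an
// incremented ref count, and close() releases the reference created in mask().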
@Override
public Block eval(Page page) {
mask.incRef();
return mask;
}

@Override
public void close() {
mask.close();
}
});
}

private static BooleanVector mask(String filter) {
// Usually BLOCK_LENGTH is the count of positions, but sometimes the blocks are longer
int positionCount = BLOCK_LENGTH * 10;
return switch (filter) {
case CONSTANT_TRUE -> blockFactory.newConstantBooleanVector(true, positionCount);
case ALL_TRUE -> {
try (BooleanVector.Builder builder = blockFactory.newBooleanVectorFixedBuilder(positionCount)) {
for (int i = 0; i < positionCount; i++) {
builder.appendBoolean(true);
}
yield builder.build();
}
}
case HALF_TRUE -> {
try (BooleanVector.Builder builder = blockFactory.newBooleanVectorFixedBuilder(positionCount)) {
for (int i = 0; i < positionCount; i++) {
builder.appendBoolean(i % 2 == 0);
}
yield builder.build();
}
}
case CONSTANT_FALSE -> blockFactory.newConstantBooleanVector(false, positionCount);
default -> throw new IllegalArgumentException("unsupported filter [" + filter + "]");
};
}

@Benchmark
@OperationsPerInvocation(OP_COUNT * BLOCK_LENGTH)
public void run() {
run(grouping, op, blockType, OP_COUNT);
run(grouping, op, blockType, filter, OP_COUNT);
}

private static void run(String grouping, String op, String blockType, int opCount) {
private static void run(String grouping, String op, String blockType, String filter, int opCount) {
// System.err.printf("[%s][%s][%s][%s][%s]\n", grouping, op, blockType, filter, opCount);
String dataType = switch (blockType) {
case VECTOR_LONGS, HALF_NULL_LONGS -> LONGS;
case VECTOR_DOUBLES, HALF_NULL_DOUBLES -> DOUBLES;
default -> throw new IllegalArgumentException();
};

DriverContext driverContext = driverContext();
try (Operator operator = operator(driverContext, grouping, op, dataType)) {
try (Operator operator = operator(driverContext, grouping, op, dataType, filter)) {
Page page = page(driverContext.blockFactory(), grouping, blockType);
for (int i = 0; i < opCount; i++) {
operator.addInput(page.shallowCopy());
}
operator.finish();
checkExpected(grouping, op, blockType, dataType, operator.getOutput(), opCount);
checkExpected(grouping, op, blockType, filter, dataType, operator.getOutput(), opCount);
}
}

@@ -24,6 +24,7 @@
import org.apache.lucene.util.automaton.Automata;
import org.apache.lucene.util.automaton.Automaton;
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.apache.lucene.util.automaton.MinimizationOperations;
import org.apache.lucene.util.automaton.Operations;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.Build;
@@ -439,7 +440,7 @@ private static CharacterRunAutomaton buildAutomaton(List<String> includePatterns
? includeAutomaton
: Operations.minus(includeAutomaton, excludeAutomaton, Operations.DEFAULT_DETERMINIZE_WORK_LIMIT);

return new CharacterRunAutomaton(finalAutomaton);
return new CharacterRunAutomaton(MinimizationOperations.minimize(finalAutomaton, Operations.DEFAULT_DETERMINIZE_WORK_LIMIT));
}
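
// For reference, a minimal sketch (not part of this commit) of the pattern the
// change above applies: minimize the determinized automaton before building the
// runtime matcher, so CharacterRunAutomaton precomputes a smaller transition
// table that accepts the same language. Uses the Lucene automaton API matching
// the imports added above; the "foo.*" pattern is a made-up example.
//
//   Automaton a = new RegExp("foo.*").toAutomaton();
//   a = Operations.determinize(a, Operations.DEFAULT_DETERMINIZE_WORK_LIMIT);
//   Automaton minimal = MinimizationOperations.minimize(a, Operations.DEFAULT_DETERMINIZE_WORK_LIMIT);
//   CharacterRunAutomaton run = new CharacterRunAutomaton(minimal);
//   boolean matches = run.run("foobar"); // -> true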

private static Automaton patternsToAutomaton(List<String> patterns) {
@@ -15,6 +15,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.ClassRule;

@@ -357,6 +358,66 @@ public void testIgnoreMalformedSetting() throws IOException {
}
}

public void testIgnoreAboveSetting() throws IOException {
// with default template
{
assertOK(createDataStream(client, "logs-test-1"));
String logsIndex1 = getDataStreamBackingIndex(client, "logs-test-1", 0);
assertThat(getSetting(client, logsIndex1, "index.mapping.ignore_above"), equalTo("8191"));
for (String newValue : List.of("512", "2048", "12000", String.valueOf(Integer.MAX_VALUE))) {
closeIndex(logsIndex1);
updateIndexSettings(logsIndex1, Settings.builder().put("index.mapping.ignore_above", newValue));
assertThat(getSetting(client, logsIndex1, "index.mapping.ignore_above"), equalTo(newValue));
}
for (String newValue : List.of(String.valueOf((long) Integer.MAX_VALUE + 1), String.valueOf(Long.MAX_VALUE))) {
closeIndex(logsIndex1);
ResponseException ex = assertThrows(
ResponseException.class,
() -> updateIndexSettings(logsIndex1, Settings.builder().put("index.mapping.ignore_above", newValue))
);
assertThat(
ex.getMessage(),
Matchers.containsString("Failed to parse value [" + newValue + "] for setting [index.mapping.ignore_above]")
);
}
}
// with override template
{
var template = """
{
"template": {
"settings": {
"index": {
"mapping": {
"ignore_above": "128"
}
}
}
}
}""";
assertOK(putComponentTemplate(client, "logs@custom", template));
assertOK(createDataStream(client, "logs-custom-dev"));
String index = getDataStreamBackingIndex(client, "logs-custom-dev", 0);
assertThat(getSetting(client, index, "index.mapping.ignore_above"), equalTo("128"));
for (String newValue : List.of("64", "256", "12000", String.valueOf(Integer.MAX_VALUE))) {
closeIndex(index);
updateIndexSettings(index, Settings.builder().put("index.mapping.ignore_above", newValue));
assertThat(getSetting(client, index, "index.mapping.ignore_above"), equalTo(newValue));
}
}
// standard index
{
String index = "test-index";
createIndex(index);
assertThat(getSetting(client, index, "index.mapping.ignore_above"), equalTo(Integer.toString(Integer.MAX_VALUE)));
for (String newValue : List.of("256", "512", "12000", String.valueOf(Integer.MAX_VALUE))) {
closeIndex(index);
updateIndexSettings(index, Settings.builder().put("index.mapping.ignore_above", newValue));
assertThat(getSetting(client, index, "index.mapping.ignore_above"), equalTo(newValue));
}
}
}

private static Map<String, Object> getMapping(final RestClient client, final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_mapping");

@@ -57,7 +57,8 @@ public class KibanaThreadPoolIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
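// the former single MAX_INDEXING_BYTES limit is now set separately for the primary and coordinating stages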
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
.put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), "1KB")
.put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), "1KB")
.put("thread_pool.search.size", 1)
.put("thread_pool.search.queue_size", 1)
.put("thread_pool.write.size", 1)
6 changes: 6 additions & 0 deletions muted-tests.yml
@@ -341,6 +341,12 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/113874
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
issue: https://github.com/elastic/elasticsearch/issues/113916
- class: org.elasticsearch.kibana.KibanaThreadPoolIT
method: testBlockedThreadPoolsRejectUserRequests
issue: https://github.com/elastic/elasticsearch/issues/113939
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=indices.create/20_synthetic_source/object array in object with dynamic override}
issue: https://github.com/elastic/elasticsearch/issues/113966

# Examples:
#
@@ -42,7 +42,8 @@ public class IndexingPressureRestIT extends HttpSmokeTestCase {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
.put(IndexingPressure.MAX_COORDINATING_BYTES.getKey(), "1KB")
.put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), "1KB")
.put(unboundedWriteQueue)
.build();
}
1 change: 0 additions & 1 deletion rest-api-spec/build.gradle
@@ -57,6 +57,5 @@ tasks.named("precommit").configure {
tasks.named("yamlRestCompatTestTransform").configure({task ->
task.skipTest("indices.sort/10_basic/Index Sort", "warning does not exist for compatibility")
task.skipTest("search/330_fetch_fields/Test search rewrite", "warning does not exist for compatibility")
task.skipTest("range/20_synthetic_source/Date range", "date range breaking change causes tests to produce incorrect values for compatibility")
task.skipTestsByFilePattern("indices.create/synthetic_source*.yml", "@UpdateForV9 -> tests do not pass after bumping API version to 9 [ES-9597]")
})
@@ -535,6 +535,8 @@ object array in object with dynamic override:
_source:
mode: synthetic
properties:
id:
type: integer
path_no:
dynamic: false
properties:
Expand All @@ -552,19 +554,25 @@ object array in object with dynamic override:
refresh: true
body:
- '{ "create": { } }'
- '{ "path_no": [ { "some_int": 10 }, {"name": "foo"} ], "path_runtime": [ { "some_int": 20 }, {"name": "bar"} ], "name": "baz" }'
- '{ "id": 1, "path_no": [ { "some_int": 30 }, {"name": "baz"}, { "some_int": 20 }, {"name": "bar"} ], "name": "A" }'
- '{ "create": { } }'
- '{ "id": 2, "path_runtime": [ { "some_int": 30 }, {"name": "baz"}, { "some_int": 20 }, {"name": "bar"} ], "name": "B" }'
- match: { errors: false }

- do:
search:
index: test
sort: id

- match: { hits.total.value: 1 }
- match: { hits.hits.0._source.name: baz }
- match: { hits.hits.0._source.path_no.0.some_int: 10 }
- match: { hits.hits.0._source.path_no.1.name: foo }
- match: { hits.hits.0._source.path_runtime.0.some_int: 20 }
- match: { hits.hits.0._source.path_runtime.1.name: bar }
- match: { hits.hits.0._source.id: 1 }
- match: { hits.hits.0._source.name: A }
- match: { hits.hits.0._source.path_no.some_int: [ 30, 20 ] }
- match: { hits.hits.0._source.path_no.name: [ bar, baz ] }

- match: { hits.hits.1._source.id: 2 }
- match: { hits.hits.1._source.name: B }
- match: { hits.hits.1._source.path_runtime.some_int: [ 30, 20 ] }
- match: { hits.hits.1._source.path_runtime.name: [ bar, baz ] }


---
@@ -25,7 +25,6 @@ create logs index:
settings:
index:
mode: logsdb
number_of_replicas: 0
number_of_shards: 2
mappings:
properties: