diff --git a/.github/workflows/github-merit-badger.yml b/.github/workflows/github-merit-badger.yml deleted file mode 100644 index ee00e62da2f08..0000000000000 --- a/.github/workflows/github-merit-badger.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: github-merit-badger -on: - pull_request_target: - types: - - opened - -jobs: - call-action: - runs-on: ubuntu-latest - permissions: - pull-requests: write - steps: - - uses: aws-github-ops/github-merit-badger@v0.0.98 - id: merit-badger - with: - github-token: ${{ secrets.GITHUB_TOKEN }} - badges: '[first-time-contributor,repeat-contributor,valued-contributor,seasoned-contributor,all-star-contributor,distinguished-contributor]' - thresholds: '[0,3,6,13,25,50]' - badge-type: 'achievement' - ignore-usernames: '[opensearch-ci-bot, dependabot, opensearch-trigger-bot]' diff --git a/CHANGELOG.md b/CHANGELOG.md index 67feeb77c5213..e17b696c1f762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,9 +39,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.maxmind.geoip2:geoip2` from 4.0.0 to 4.0.1 - Bump `com.networknt:json-schema-validator` from 1.0.76 to 1.0.78 - Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.0.1 -- Bump `org.apache.zookeeper:zookeeper` from 3.8.0 to 3.8.1 -- Bump `net.minidev:json-smart` from 2.4.8 to 2.4.10 -- Bump `org.apache.maven:maven-model` from 3.8.6 to 3.9.1 ### Changed - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) @@ -67,6 +64,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove LegacyESVersion.V_7_6_ and V_7_7_ Constants ([#4837](https://github.com/opensearch-project/OpenSearch/pull/4837)) - Remove LegacyESVersion.V_7_10_ Constants ([#5018](https://github.com/opensearch-project/OpenSearch/pull/5018)) - Remove Version.V_1_ Constants ([#5021](https://github.com/opensearch-project/OpenSearch/pull/5021)) +- Remove custom Map, List and Set collection classes ([#6871](https://github.com/opensearch-project/OpenSearch/pull/6871)) ### Fixed - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) @@ -90,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791)) - Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464) - Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331)) +- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) @@ -101,6 +100,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.google.protobuf:protobuf-java` from 3.22.0 to 3.22.2 - Bump Netty to 4.1.90.Final ([#6677](https://github.com/opensearch-project/OpenSearch/pull/6677) - Bump `com.diffplug.spotless` from 6.15.0 to 6.17.0 +- Bump `org.apache.zookeeper:zookeeper` from 3.8.0 to 3.8.1 +- Bump `net.minidev:json-smart` from 2.4.7 to 2.4.10 +- Bump `org.apache.maven:maven-model` from 3.6.2 to 3.9.1 +- Bump `org.codehaus.jettison:jettison` from 1.5.3 to 1.5.4 ([#6878](https://github.com/opensearch-project/OpenSearch/pull/6878)) ### Changed - Require MediaType in Strings.toString API ([#6009](https://github.com/opensearch-project/OpenSearch/pull/6009)) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java new file mode 100644 index 0000000000000..03d541dbb7de5 --- /dev/null +++ b/benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.benchmark.store.remote.filecache; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.store.IndexInput; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.opensearch.index.store.remote.filecache.CachedIndexInput; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheFactory; + +/** + * Simple benchmark test of {@link FileCache}. It uses a uniform random distribution + * of keys, which is very simple but unlikely to be representative of any real life + * workload. + */ +@Warmup(iterations = 1) +@Measurement(iterations = 1) +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +@Threads(8) +@SuppressWarnings("unused") // invoked by benchmarking framework +public class FileCacheBenchmark { + private static final CachedIndexInput INDEX_INPUT = new FixedSizeStubIndexInput(); + + @Benchmark + public void get(CacheParameters parameters, Blackhole blackhole) { + blackhole.consume(parameters.fileCache.get(randomKeyInCache(parameters))); + } + + @Benchmark + public void replace(CacheParameters parameters, Blackhole blackhole) { + blackhole.consume(parameters.fileCache.put(randomKeyInCache(parameters), INDEX_INPUT)); + } + + @Benchmark + public void put(CacheParameters parameters, Blackhole blackhole) { + blackhole.consume(parameters.fileCache.put(randomKeyNotInCache(parameters), INDEX_INPUT)); + } + + @Benchmark + public void remove(CacheParameters parameters) { + parameters.fileCache.remove(randomKeyInCache(parameters)); + } + + private static Path randomKeyInCache(CacheParameters parameters) { + int i = ThreadLocalRandom.current().nextInt(parameters.maximumNumberOfEntries); + return Paths.get(Integer.toString(i)); + } + + private static Path randomKeyNotInCache(CacheParameters parameters) { + int i = ThreadLocalRandom.current().nextInt(parameters.maximumNumberOfEntries, parameters.maximumNumberOfEntries * 2); + return Paths.get(Integer.toString(i)); + } + + @State(Scope.Benchmark) + public static class CacheParameters { + @Param({ "65536", "1048576" }) + int maximumNumberOfEntries; + + @Param({ "1", "8" }) + int concurrencyLevel; + + FileCache fileCache; + + @Setup + public void setup() { + fileCache = FileCacheFactory.createConcurrentLRUFileCache( + (long) maximumNumberOfEntries * INDEX_INPUT.length(), + concurrencyLevel + ); + for (long i = 0; i < maximumNumberOfEntries; i++) { + final Path key = Paths.get(Long.toString(i)); + fileCache.put(key, INDEX_INPUT); + fileCache.decRef(key); + } + } + } + + /** + * Stubbed out IndexInput that does nothing but report a fixed size + */ + private static class FixedSizeStubIndexInput extends CachedIndexInput { + private FixedSizeStubIndexInput() { + super(FixedSizeStubIndexInput.class.getSimpleName()); + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() {} + + @Override + public long getFilePointer() { + throw new UnsupportedOperationException(); + } + + @Override + public void seek(long pos) { + throw new UnsupportedOperationException(); + } + + @Override + public long length() { + return 1024 * 1024 * 8; // 8MiB + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() { + throw new UnsupportedOperationException(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 21070972d2a23..3a1013c01dd53 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -114,7 +114,7 @@ dependencies { api 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2' api 'org.jdom:jdom2:2.0.6.1' api "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${props.getProperty('kotlin')}" - api 'de.thetaphi:forbiddenapis:3.4' + api 'de.thetaphi:forbiddenapis:3.5.1' api 'com.avast.gradle:gradle-docker-compose-plugin:0.16.11' api "org.yaml:snakeyaml:${props.getProperty('snakeyaml')}" api 'org.apache.maven:maven-model:3.9.1' diff --git a/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditPrecommitPlugin.java b/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditPrecommitPlugin.java index b2d12c4fc2f79..d83f1b01ee043 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditPrecommitPlugin.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditPrecommitPlugin.java @@ -51,7 +51,7 @@ public class ThirdPartyAuditPrecommitPlugin extends PrecommitPlugin { public TaskProvider createTask(Project project) { project.getPlugins().apply(CompileOnlyResolvePlugin.class); project.getConfigurations().create("forbiddenApisCliJar"); - project.getDependencies().add("forbiddenApisCliJar", "de.thetaphi:forbiddenapis:3.4"); + project.getDependencies().add("forbiddenApisCliJar", "de.thetaphi:forbiddenapis:3.5.1"); Configuration jdkJarHellConfig = project.getConfigurations().create(JDK_JAR_HELL_CONFIG_NAME); if (BuildParams.isInternal() && project.getPath().equals(":libs:opensearch-core") == false) { diff --git a/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditTask.java b/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditTask.java index 88af1ef8c94e3..6139291b9be1b 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditTask.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/precommit/ThirdPartyAuditTask.java @@ -79,9 +79,7 @@ @CacheableTask public class ThirdPartyAuditTask extends DefaultTask { - private static final Pattern MISSING_CLASS_PATTERN = Pattern.compile( - "WARNING: Class '(.*)' cannot be loaded \\(.*\\)\\. Please fix the classpath!" - ); + private static final Pattern MISSING_CLASS_PATTERN = Pattern.compile("DEBUG: Class '(.*)' cannot be loaded \\(.*\\)\\."); private static final Pattern VIOLATION_PATTERN = Pattern.compile("\\s\\sin ([a-zA-Z0-9$.]+) \\(.*\\)"); private static final int SIG_KILL_EXIT_VALUE = 137; @@ -367,7 +365,7 @@ private String runForbiddenAPIsCli() throws IOException { spec.jvmArgs("-Xmx1g"); spec.jvmArgs(LoggedExec.shortLivedArgs()); spec.getMainClass().set("de.thetaphi.forbiddenapis.cli.CliMain"); - spec.args("-f", getSignatureFile().getAbsolutePath(), "-d", getJarExpandDir(), "--allowmissingclasses"); + spec.args("-f", getSignatureFile().getAbsolutePath(), "-d", getJarExpandDir(), "--debug", "--allowmissingclasses"); spec.setErrorOutput(errorOut); if (getLogger().isInfoEnabled() == false) { spec.setStandardOutput(new NullOutputStream()); diff --git a/buildSrc/src/testKit/thirdPartyAudit/build.gradle b/buildSrc/src/testKit/thirdPartyAudit/build.gradle index 537bf3c1fad71..553ff5d8e6ed2 100644 --- a/buildSrc/src/testKit/thirdPartyAudit/build.gradle +++ b/buildSrc/src/testKit/thirdPartyAudit/build.gradle @@ -40,7 +40,6 @@ repositories { } dependencies { - forbiddenApisCliJar 'de.thetaphi:forbiddenapis:3.4' jdkJarHell 'org.opensearch:opensearch-core:current' compileOnly "org.${project.properties.compileOnlyGroup}:${project.properties.compileOnlyVersion}" implementation "org.${project.properties.compileGroup}:${project.properties.compileVersion}" diff --git a/buildSrc/version.properties b/buildSrc/version.properties index f2a91c6740f5b..93bb82c7b5a01 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -17,18 +17,18 @@ supercsv = 2.4.0 log4j = 2.17.1 slf4j = 1.7.36 asm = 9.4 -jettison = 1.5.3 +jettison = 1.5.4 woodstox = 6.4.0 kotlin = 1.7.10 antlr4 = 4.11.1 guava = 31.1-jre - + # when updating the JNA version, also update the version in buildSrc/build.gradle jna = 5.5.0 netty = 4.1.90.Final joda = 2.12.2 - + # client dependencies httpclient5 = 5.1.4 httpcore5 = 5.1.5 diff --git a/libs/common/src/main/java/org/opensearch/common/collect/List.java b/libs/common/src/main/java/org/opensearch/common/collect/List.java deleted file mode 100644 index 07cfc2c019856..0000000000000 --- a/libs/common/src/main/java/org/opensearch/common/collect/List.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.common.collect; - -import java.util.Collection; - -/** - * Java 9 List - * - * - * @opensearch.internal - */ -@Deprecated(forRemoval = true) -public class List { - - /** - * Delegates to the Java9 {@code List.of()} method. - * - * @param the {@code List}'s element type - * @return an empty {@code List} - */ - public static java.util.List of() { - return java.util.List.of(); - } - - /** - * Delegates to the Java9 {@code List.of()} method. - * - * @param the {@code List}'s element type - * @param e1 the single element - * @return a {@code List} containing the specified element - */ - public static java.util.List of(T e1) { - return java.util.List.of(e1); - } - - /** - * Delegates to the Java9 {@code List.of()} method. - * - * @param the {@code List}'s element type - * @param e1 the single element - * @return a {@code List} containing the specified element - */ - public static java.util.List of(T e1, T e2) { - return java.util.List.of(e1, e2); - } - - /** - * Delegates to the Java9 {@code List.of()} method. - * - * @param entries the elements to be contained in the list - * @param the {@code List}'s element type - * @return an unmodifiable list containing the specified elements. - */ - @SafeVarargs - @SuppressWarnings("varargs") - public static java.util.List of(T... entries) { - return java.util.List.of(entries); - } - - /** - * Delegates to the Java9 {@code List.copyOf()} method. - * - * @param the {@code List}'s element type - * @param coll a {@code Collection} from which elements are drawn, must be non-null - * @return a {@code List} containing the elements of the given {@code Collection} - */ - public static java.util.List copyOf(Collection coll) { - return java.util.List.copyOf(coll); - } -} diff --git a/libs/common/src/main/java/org/opensearch/common/collect/Map.java b/libs/common/src/main/java/org/opensearch/common/collect/Map.java deleted file mode 100644 index 3913c0fd942a4..0000000000000 --- a/libs/common/src/main/java/org/opensearch/common/collect/Map.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.common.collect; - -/** - * Java 9 Map - * - * - * @opensearch.internal - */ -@Deprecated(forRemoval = true) -public class Map { - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of() { - return java.util.Map.of(); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of(K k1, V v1) { - return java.util.Map.of(k1, v1); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of(K k1, V v1, K k2, V v2) { - return java.util.Map.of(k1, v1, k2, v2); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of(K k1, V v1, K k2, V v2, K k3, V v3) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of(K k1, V v1, K k2, V v2, K k3, V v3, K k4, V v4) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3, k4, v4); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of(K k1, V v1, K k2, V v2, K k3, V v3, K k4, V v4, K k5, V v5) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of(K k1, V v1, K k2, V v2, K k3, V v3, K k4, V v4, K k5, V v5, K k6, V v6) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5, k6, v6); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of(K k1, V v1, K k2, V v2, K k3, V v3, K k4, V v4, K k5, V v5, K k6, V v6, K k7, V v7) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5, k6, v6, k7, v7); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of( - K k1, - V v1, - K k2, - V v2, - K k3, - V v3, - K k4, - V v4, - K k5, - V v5, - K k6, - V v6, - K k7, - V v7, - K k8, - V v8 - ) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5, k6, v6, k7, v7, k8, v8); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of( - K k1, - V v1, - K k2, - V v2, - K k3, - V v3, - K k4, - V v4, - K k5, - V v5, - K k6, - V v6, - K k7, - V v7, - K k8, - V v8, - K k9, - V v9 - ) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5, k6, v6, k7, v7, k8, v8, k9, v9); - } - - /** - * Delegates to the Java9 {@code Map.of()} method. - */ - public static java.util.Map of( - K k1, - V v1, - K k2, - V v2, - K k3, - V v3, - K k4, - V v4, - K k5, - V v5, - K k6, - V v6, - K k7, - V v7, - K k8, - V v8, - K k9, - V v9, - K k10, - V v10 - ) { - return java.util.Map.of(k1, v1, k2, v2, k3, v3, k4, v4, k5, v5, k6, v6, k7, v7, k8, v8, k9, v9, k10, v10); - } - - /** - * Delegates to the Java9 {@code Map.ofEntries()} method. - */ - @SafeVarargs - @SuppressWarnings("varargs") - public static java.util.Map ofEntries(java.util.Map.Entry... entries) { - return java.util.Map.ofEntries(entries); - } - - /** - * Delegates to the Java9 {@code Map.entry()} method. - */ - public static java.util.Map.Entry entry(K k, V v) { - return java.util.Map.entry(k, v); - } - - /** - * Delegates to the Java10 {@code Map.copyOf()} method. - */ - public static java.util.Map copyOf(java.util.Map map) { - return java.util.Map.copyOf(map); - } - -} diff --git a/libs/common/src/main/java/org/opensearch/common/collect/Set.java b/libs/common/src/main/java/org/opensearch/common/collect/Set.java deleted file mode 100644 index 0c3899b2aaacd..0000000000000 --- a/libs/common/src/main/java/org/opensearch/common/collect/Set.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.common.collect; - -import java.util.Collection; - -/** - * Java 9 Set - * - * - * @opensearch.internal - */ -@Deprecated(forRemoval = true) -public class Set { - - /** - * Delegates to the Java9 {@code Set.of()} method. - * - * @param the {@code Set}'s element type - * @return an empty {@code Set} - */ - public static java.util.Set of() { - return java.util.Set.of(); - } - - /** - * Delegates to the Java9 {@code Set.of()} method. - * - * @param the {@code Set}'s element type - * @param e1 the single element - * @return a {@code Set} containing the specified element - */ - public static java.util.Set of(T e1) { - return java.util.Set.of(e1); - } - - /** - * Delegates to the Java9 {@code Set.of()} method. - * - * @param the {@code Set}'s element type - * @param e1 the first element - * @param e2 the second element - * @return a {@code Set} containing the specified element - */ - public static java.util.Set of(T e1, T e2) { - return java.util.Set.of(e1, e2); - } - - /** - * Delegates to the Java9 {@code Set.of()} method. - * - * @param entries the elements to be contained in the set - * @param the {@code Set}'s element type - * @return an unmodifiable set containing the specified elements. - */ - @SafeVarargs - @SuppressWarnings("varargs") - public static java.util.Set of(T... entries) { - return java.util.Set.of(entries); - } - - /** - * Delegates to the Java10 {@code Set.copyOf} method. - * - * @param the {@code Set}'s element type - * @param coll a {@code Collection} from which elements are drawn, must be non-null - * @return a {@code Set} containing the elements of the given {@code Collection} - */ - public static java.util.Set copyOf(Collection coll) { - return java.util.Set.copyOf(coll); - } -} diff --git a/plugins/discovery-azure-classic/licenses/jettison-1.5.3.jar.sha1 b/plugins/discovery-azure-classic/licenses/jettison-1.5.3.jar.sha1 deleted file mode 100644 index afd13439e739c..0000000000000 --- a/plugins/discovery-azure-classic/licenses/jettison-1.5.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -964d35bbdecbbc33cf2f2044e8a1648d9f6f1474 \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/jettison-1.5.4.jar.sha1 b/plugins/discovery-azure-classic/licenses/jettison-1.5.4.jar.sha1 new file mode 100644 index 0000000000000..a87b7691bfce8 --- /dev/null +++ b/plugins/discovery-azure-classic/licenses/jettison-1.5.4.jar.sha1 @@ -0,0 +1 @@ +174ca56c411b05aec323d8f53e66709c0d28b337 \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index ee5150c97fb4f..ad6c396df69a1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -17,6 +17,7 @@ import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.SegmentReplicationBaseIT; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.rest.RestStatus; import org.opensearch.test.OpenSearchIntegTestCase; @@ -52,12 +53,24 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + @Override + public Settings indexSettings() { + // we want to control refreshes + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.refresh_interval", -1) + .build(); + } + @Override protected Collection> nodePlugins() { return asList(MockTransportService.TestPlugin.class); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671") public void testWritesRejected() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -76,6 +89,10 @@ public void testWritesRejected() throws Exception { indexingThread.start(); indexingThread.join(); latch.await(); + + indexDoc(); + totalDocs.incrementAndGet(); + refresh(INDEX_NAME); // index again while we are stale. assertBusy(() -> { expectThrows(OpenSearchRejectedExecutionException.class, () -> { @@ -90,6 +107,7 @@ public void testWritesRejected() throws Exception { // index another doc showing there is no pressure enforced. indexDoc(); + refresh(INDEX_NAME); waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); verifyStoreContent(); } @@ -98,7 +116,6 @@ public void testWritesRejected() throws Exception { * This test ensures that a replica can be added while the index is under write block. * Ensuring that only write requests are blocked. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671") public void testAddReplicaWhileWritesBlocked() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -118,6 +135,9 @@ public void testAddReplicaWhileWritesBlocked() throws Exception { indexingThread.start(); indexingThread.join(); latch.await(); + indexDoc(); + totalDocs.incrementAndGet(); + refresh(INDEX_NAME); // index again while we are stale. assertBusy(() -> { expectThrows(OpenSearchRejectedExecutionException.class, () -> { @@ -142,6 +162,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception { // index another doc showing there is no pressure enforced. indexDoc(); + refresh(INDEX_NAME); waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); verifyStoreContent(); } @@ -258,7 +279,7 @@ private void assertFailedRequests(BulkResponse response) { } private void indexDoc() { - client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").get(); + client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").execute().actionGet(); } private void assertEqualSegmentInfosVersion(List replicaNames, IndexShard primaryShard) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 1482e73efdace..197a364ddeaa0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -49,6 +49,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -513,6 +514,292 @@ public void testShardRoutingWithNetworkDisruption_FailOpenEnabled() throws Excep assertNoSearchInAZ("a"); } + public void testStrictWeightedRoutingWithCustomString_FailOpenEnabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + String customPreference = randomAlphaOfLength(10); + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(customPreference) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + logger.info("--> shards should fail due to network disruption"); + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + try { + assertSearchInAZ("b"); + } catch (AssertionError ae) { + assertSearchInAZ("c"); + } + assertNoSearchInAZ("a"); + } + + public void testStrictWeightedRoutingWithCustomString_FailOpenDisabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + String customPreference = randomAlphaOfLength(10); + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(customPreference) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + logger.info("--> shards should fail due to network disruption"); + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertNotEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + Set expectedHotNodes = new HashSet<>(); + for (DiscoveryNode node : dataNodes) { + if (node.getAttributes().getOrDefault("zone", "").equals("b")) { + expectedHotNodes.add(node.getId()); + } + } + + assertEquals(expectedHotNodes, hitNodes); + + assertSearchInAZ("b"); + assertNoSearchInAZ("c"); + assertNoSearchInAZ("a"); + } + + /** + * Should failopen shards even if failopen enabled with custom search preference. + * @throws Exception + */ + public void testStrictWeightedRoutingWithShardPrefNetworkDisruption_FailOpenEnabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("c").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + ShardId shardId = internalCluster().clusterService() + .state() + .getRoutingTable() + .index("test") + .randomAllActiveShardsIt() + .getShardRoutings() + .stream() + .filter(shard -> { + return dataNodes.get(shard.currentNodeId()).getAttributes().getOrDefault("zone", "").equals("c"); + }) + .findFirst() + .get() + .shardId(); + + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("c").get(0)) + .prepareSearch("test") + .setPreference(String.format(Locale.ROOT, "_shards:%s", shardId.getId())) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertNoSearchInAZ("a"); + try { + assertSearchInAZ("c"); + } catch (AssertionError ae) { + assertSearchInAZ("b"); + } + } + + public void testStrictWeightedRoutingWithShardPref() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + ShardId shardId = internalCluster().clusterService() + .state() + .getRoutingTable() + .index("test") + .randomAllActiveShardsIt() + .getShardRoutings() + .stream() + .filter(shard -> { + return dataNodes.get(shard.currentNodeId()).getAttributes().getOrDefault("zone", "").equals("c"); + }) + .findFirst() + .get() + .shardId(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(String.format(Locale.ROOT, "_shards:%s", shardId.getId())) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + assertNotEquals(searchResponse.getHits().getTotalHits().value, 0); + } catch (Exception t) { + fail("search should not fail"); + } + } + assertNoSearchInAZ("c"); + } + private void assertNoSearchInAZ(String az) { ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); String dataNodeId = null; @@ -806,7 +1093,6 @@ public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exceptio * Assert that preference search with custom string doesn't hit a node in weighed away az */ public void testStrictWeightedRoutingWithCustomString() { - Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -844,13 +1130,18 @@ public void testStrictWeightedRoutingWithCustomString() { .get(); // make search requests with custom string - searchResponse = internalCluster().client(nodeMap.get("a").get(0)) + internalCluster().client(nodeMap.get("a").get(0)) .prepareSearch() .setSize(20) .setPreference(customPreference) + .setQuery(QueryBuilders.matchAllQuery()) .get(); // assert search on data nodes on az c (weighed away az) - assertSearchInAZ("c"); + try { + assertSearchInAZ("c"); + } catch (AssertionError ae) { + assertSearchInAZ("a"); + } } @@ -889,24 +1180,61 @@ public void testPreferenceSearchWithWeightedRouting() { SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) .prepareSearch() - .setSize(0) - .setPreference("_local") + .setPreference(randomFrom("_local", "_prefer_nodes:" + "zone:a", customPreference)) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); - searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + searchResponse = internalCluster().client(nodeMap.get("a").get(0)) .prepareSearch() - .setSize(0) .setPreference( "_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC) ) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + } - searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + public void testPreferenceSearchWithIgnoreWeightedRouting() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", false) + .put("cluster.routing.ignore_weighted_routing", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + String customPreference = randomAlphaOfLength(10); + String nodeInZoneA = nodeMap.get("a").get(0); + String nodeInZoneB = nodeMap.get("b").get(0); + String nodeInZoneC = nodeMap.get("c").get(0); + + Map nodeIDMap = new HashMap<>(); + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } + + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) .prepareSearch() - .setSize(0) - .setPreference("_prefer_nodes:zone:a") + .setPreference(randomFrom("_local", "_prefer_nodes:" + "zone:a", customPreference)) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("a").get(0)) + .prepareSearch() + .setPreference( + "_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC) + ) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); } @@ -915,7 +1243,6 @@ public void testPreferenceSearchWithWeightedRouting() { * Assert that preference based search with preference type is not allowed with strict weighted shard routing */ public void testStrictWeightedRouting() { - Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -935,11 +1262,6 @@ public void testStrictWeightedRouting() { setShardRoutingWeights(weights); String nodeInZoneA = nodeMap.get("a").get(0); - assertThrows( - PreferenceBasedSearchNotAllowedException.class, - () -> internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference("_local").get() - ); - assertThrows( PreferenceBasedSearchNotAllowedException.class, () -> internalCluster().client(nodeMap.get("b").get(0)) @@ -957,7 +1279,48 @@ public void testStrictWeightedRouting() { .setPreference("_prefer_nodes:" + nodeInZoneA) .get() ); + } + public void testStrictWeightedRoutingAllowedForSomeSearchPrefs() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + String nodeInZoneA = nodeMap.get("a").get(0); + String customPreference = randomAlphaOfLength(10); + + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_only_local:" + nodeInZoneA) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_local:" + nodeInZoneA) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference("_shards:1").get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference(customPreference).get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); } /** diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 7857c6430a5e3..30c2123b30aa7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -537,6 +537,7 @@ public void testPruneFileCacheOnIndexDeletion() throws Exception { assertAllNodesFileCacheEmpty(); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6888") public void testCacheFilesAreClosedAfterUse() throws Exception { final int numReplicasIndex = randomIntBetween(1, 4); final String indexName = "test-idx"; diff --git a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java index dbef876c9a258..72c189f20eaf6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java @@ -20,6 +20,8 @@ import java.util.List; +import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING; + /** * This class contains logic to find next shard to retry search request in case of failure from other shard copy. * This decides if retryable shard search requests can be tried on shard copies present in data @@ -72,9 +74,13 @@ public SearchShardTarget findNext( Runnable onShardSkipped ) { SearchShardTarget next = shardIt.nextOrNull(); + if (ignoreWeightedRouting(clusterState)) { + return next; + } + while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) { SearchShardTarget nextShard = next; - if (canFailOpen(nextShard.getShardId(), exception, clusterState)) { + if (canFailOpen(nextShard.getShardId(), shardIt.size(), exception, clusterState)) { logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception); getWeightedRoutingStats().updateFailOpenCount(); break; @@ -98,10 +104,13 @@ public SearchShardTarget findNext( */ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception, Runnable onShardSkipped) { ShardRouting next = shardsIt.nextOrNull(); + if (ignoreWeightedRouting(clusterState)) { + return next; + } while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) { ShardRouting nextShard = next; - if (canFailOpen(nextShard.shardId(), exception, clusterState)) { + if (canFailOpen(nextShard.shardId(), shardsIt.size(), exception, clusterState)) { logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception); getWeightedRoutingStats().updateFailOpenCount(); break; @@ -117,8 +126,8 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster * @return true if can fail open ie request shard copies present in nodes with weighted shard * routing weight set to zero */ - private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) { - return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); + private boolean canFailOpen(ShardId shardId, int shardItSize, Exception exception, ClusterState clusterState) { + return shardItSize == 1 || isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); } private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) { @@ -131,6 +140,10 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI return false; } + private boolean ignoreWeightedRouting(ClusterState clusterState) { + return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings()); + } + public WeightedRoutingStats getWeightedRoutingStats() { return WeightedRoutingStats.getInstance(); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 6a877838ece95..711a750ade712 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -324,9 +324,12 @@ public ShardIterator activeInitializingShardsWeightedIt( WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight, - boolean isFailOpenEnabled + boolean isFailOpenEnabled, + @Nullable Integer seed ) { - final int seed = shufflerForWeightedRouting.nextSeed(); + if (seed == null) { + seed = shufflerForWeightedRouting.nextSeed(); + } List ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed); // append shards for attribute value with weight zero, so that shard search requests can be tried on @@ -350,6 +353,7 @@ public ShardIterator activeInitializingShardsWeightedIt( logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); } } + return new PlainShardIterator(shardId, ordered); } @@ -371,21 +375,6 @@ private List activeInitializingShardsWithWeights( return orderedListWithDistinctShards; } - /** - * Returns an iterator over active and initializing shards, shards are ordered by weighted - * round-robin scheduling policy. Uses the passed seed to shuffle the shards. - * - */ - public ShardIterator activeInitializingShardsSimpleWeightedIt( - WeightedRouting weightedRouting, - DiscoveryNodes nodes, - double defaultWeight, - int seed - ) { - List ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed); - return new PlainShardIterator(shardId, ordered); - } - /** * Returns a list containing shard routings ordered using weighted round-robin scheduling. */ diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index b247936245151..566eefe2c4f1a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -93,16 +93,30 @@ public class OperationRouting { public static final Setting STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting( "cluster.routing.weighted.strict", + true, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting IGNORE_WEIGHTED_SHARD_ROUTING = Setting.boolSetting( + "cluster.routing.ignore_weighted_routing", false, Setting.Property.Dynamic, Setting.Property.NodeScope ); + + private static final List WEIGHTED_ROUTING_RESTRICTED_PREFERENCES = Arrays.asList( + Preference.ONLY_NODES, + Preference.PREFER_NODES + ); + private volatile List awarenessAttributes; private volatile boolean useAdaptiveReplicaSelection; private volatile boolean ignoreAwarenessAttr; private volatile double weightedRoutingDefaultWeight; private volatile boolean isFailOpenEnabled; private volatile boolean isStrictWeightedShardRouting; + private volatile boolean ignoreWeightedRouting; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -116,11 +130,13 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings); this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings); this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings); + this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting); + clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -143,6 +159,10 @@ void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) { this.isStrictWeightedShardRouting = strictWeightedShardRouting; } + void setIgnoreWeightedRouting(boolean isWeightedRoundRobinEnabled) { + this.ignoreWeightedRouting = isWeightedRoundRobinEnabled; + } + public boolean isIgnoreAwarenessAttr() { return ignoreAwarenessAttr; } @@ -314,11 +334,7 @@ private ShardIterator preferenceActiveShardIterator( } } preferenceType = Preference.parse(preference); - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) { - throw new PreferenceBasedSearchNotAllowedException( - "Preference type based routing not allowed with strict weighted shard routing enabled" - ); - } + checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata); switch (preferenceType) { case PREFER_NODES: final Set nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")) @@ -344,11 +360,16 @@ private ShardIterator preferenceActiveShardIterator( // for a different element in the list by also incorporating the // shard ID into the hash of the user-supplied preference key. routingHash = 31 * routingHash + indexShard.shardId.hashCode(); - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) { - return indexShard.activeInitializingShardsSimpleWeightedIt( + if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting( + isStrictWeightedShardRouting, + ignoreWeightedRouting, + weightedRoutingMetadata + )) { + return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, getWeightedRoutingDefaultWeight(), + isFailOpenEnabled, routingHash ); } else if (ignoreAwarenessAttributes()) { @@ -365,12 +386,13 @@ private ShardIterator shardRoutings( @Nullable Map nodeCounts, @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) { + if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) { return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, getWeightedRoutingDefaultWeight(), - isFailOpenEnabled + isFailOpenEnabled, + null ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { @@ -438,4 +460,15 @@ private static int calculateScaledShardId(IndexMetadata indexMetadata, String ef return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor(); } + private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) { + if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting( + isStrictWeightedShardRouting, + ignoreWeightedRouting, + weightedRoutingMetadata + ) && WEIGHTED_ROUTING_RESTRICTED_PREFERENCES.contains(preference)) { + throw new PreferenceBasedSearchNotAllowedException( + "Preference type based routing not allowed with strict weighted shard routing enabled" + ); + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java index c7d40cbbbea61..72387aad0fa45 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java @@ -53,4 +53,16 @@ public static boolean isWeighedAway(String nodeId, ClusterState clusterState) { } return false; } + + public static boolean shouldPerformWeightedRouting(boolean ignoreWeightedRouting, WeightedRoutingMetadata weightedRoutingMetadata) { + return !ignoreWeightedRouting && weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet(); + } + + public static boolean shouldPerformStrictWeightedRouting( + boolean isStrictWeightedShardRouting, + boolean ignoreWeightedRouting, + WeightedRoutingMetadata weightedRoutingMetadata + ) { + return isStrictWeightedShardRouting && shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index edc5389798fac..b7979407f97a0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -546,6 +546,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED, OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, + OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/CollectionUtils.java b/server/src/main/java/org/opensearch/common/util/CollectionUtils.java index d8a86f4878f58..6452d7061fdfa 100644 --- a/server/src/main/java/org/opensearch/common/util/CollectionUtils.java +++ b/server/src/main/java/org/opensearch/common/util/CollectionUtils.java @@ -32,12 +32,10 @@ package org.opensearch.common.util; -import com.carrotsearch.hppc.ObjectArrayList; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefArray; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.InPlaceMergeSorter; -import org.apache.lucene.util.IntroSorter; import org.opensearch.common.Strings; import org.opensearch.common.collect.Iterators; @@ -50,7 +48,9 @@ import java.util.Comparator; import java.util.HashMap; import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -95,59 +95,28 @@ public static List rotate(final List list, int distance) { return new RotatedList<>(list, d); } - public static void sortAndDedup(final ObjectArrayList array) { - int len = array.size(); - if (len > 1) { - sort(array); - int uniqueCount = 1; - for (int i = 1; i < len; ++i) { - if (!Arrays.equals(array.get(i), array.get(i - 1))) { - array.set(uniqueCount++, array.get(i)); - } - } - array.elementsCount = uniqueCount; + /** + * in place de-duplicates items in a list + */ + public static void sortAndDedup(final List array, Comparator comparator) { + // base case: one item + if (array.size() <= 1) { + return; } - } - - public static void sort(final ObjectArrayList array) { - new IntroSorter() { - - byte[] pivot; - - @Override - protected void swap(int i, int j) { - final byte[] tmp = array.get(i); - array.set(i, array.get(j)); - array.set(j, tmp); - } - - @Override - protected int compare(int i, int j) { - return compare(array.get(i), array.get(j)); - } - - @Override - protected void setPivot(int i) { - pivot = array.get(i); - } - - @Override - protected int comparePivot(int j) { - return compare(pivot, array.get(j)); + array.sort(comparator); + ListIterator deduped = array.listIterator(); + T cmp = deduped.next(); // return the first item and advance + Iterator oldArray = array.iterator(); + oldArray.next(); // advance to the old to the second item (advanced to third below) + + do { + T old = oldArray.next(); // get the next item and advance iter + if (comparator.compare(cmp, old) != 0 && (cmp = deduped.next()) != old) { + deduped.set(old); } - - private int compare(byte[] left, byte[] right) { - for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) { - int a = left[i] & 0xFF; - int b = right[j] & 0xFF; - if (a != b) { - return a - b; - } - } - return left.length - right.length; - } - - }.sort(0, array.size()); + } while (oldArray.hasNext()); + // in place update + array.subList(deduped.nextIndex(), array.size()).clear(); } public static int[] toArray(Collection ints) { diff --git a/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestResponse.java b/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestResponse.java index 9455df1bd0d6b..5cbc877f39eda 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestResponse.java +++ b/server/src/main/java/org/opensearch/extensions/rest/ExtensionRestResponse.java @@ -11,6 +11,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; import java.util.List; @@ -32,7 +33,7 @@ public class ExtensionRestResponse extends BytesRestResponse { * @param status The REST status. * @param builder The builder for the response. */ - public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, XContentBuilder builder) { + public ExtensionRestResponse(RestRequest request, RestStatus status, XContentBuilder builder) { super(status, builder); this.consumedParams = request.consumedParams(); this.contentConsumed = request.isContentConsumed(); @@ -45,7 +46,7 @@ public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, XC * @param status The REST status. * @param content A plain text response string. */ - public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String content) { + public ExtensionRestResponse(RestRequest request, RestStatus status, String content) { super(status, content); this.consumedParams = request.consumedParams(); this.contentConsumed = request.isContentConsumed(); @@ -59,7 +60,7 @@ public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, St * @param contentType The content type of the response string. * @param content A response string. */ - public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String contentType, String content) { + public ExtensionRestResponse(RestRequest request, RestStatus status, String contentType, String content) { super(status, contentType, content); this.consumedParams = request.consumedParams(); this.contentConsumed = request.isContentConsumed(); @@ -73,7 +74,7 @@ public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, St * @param contentType The content type of the response bytes. * @param content Response bytes. */ - public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String contentType, byte[] content) { + public ExtensionRestResponse(RestRequest request, RestStatus status, String contentType, byte[] content) { super(status, contentType, content); this.consumedParams = request.consumedParams(); this.contentConsumed = request.isContentConsumed(); @@ -87,7 +88,7 @@ public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, St * @param contentType The content type of the response bytes. * @param content Response bytes. */ - public ExtensionRestResponse(ExtensionRestRequest request, RestStatus status, String contentType, BytesReference content) { + public ExtensionRestResponse(RestRequest request, RestStatus status, String contentType, BytesReference content) { super(status, contentType, content); this.consumedParams = request.consumedParams(); this.contentConsumed = request.isContentConsumed(); diff --git a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java index 6965f3bf664ed..3977c7c1512fa 100644 --- a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java @@ -32,7 +32,6 @@ package org.opensearch.index.mapper; -import com.carrotsearch.hppc.ObjectArrayList; import org.apache.lucene.document.StoredField; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; @@ -52,6 +51,7 @@ import java.io.IOException; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Collections; @@ -240,35 +240,34 @@ protected String contentType() { */ public static class CustomBinaryDocValuesField extends CustomDocValuesField { - private final ObjectArrayList bytesList; - - private int totalSize = 0; + private final ArrayList bytesList; public CustomBinaryDocValuesField(String name, byte[] bytes) { super(name); - bytesList = new ObjectArrayList<>(); + bytesList = new ArrayList<>(); add(bytes); } public void add(byte[] bytes) { bytesList.add(bytes); - totalSize += bytes.length; } @Override public BytesRef binaryValue() { try { - CollectionUtils.sortAndDedup(bytesList); - int size = bytesList.size(); - BytesStreamOutput out = new BytesStreamOutput(totalSize + (size + 1) * 5); - out.writeVInt(size); // write total number of values - for (int i = 0; i < size; i++) { - final byte[] value = bytesList.get(i); - int valueLength = value.length; - out.writeVInt(valueLength); - out.writeBytes(value, 0, valueLength); + // sort and dedup in place + CollectionUtils.sortAndDedup(bytesList, Arrays::compareUnsigned); + int size = bytesList.stream().map(b -> b.length).reduce(0, Integer::sum); + int length = bytesList.size(); + try (BytesStreamOutput out = new BytesStreamOutput(size + (length + 1) * 5)) { + out.writeVInt(length); // write total number of values + for (byte[] value : bytesList) { + int valueLength = value.length; + out.writeVInt(valueLength); + out.writeBytes(value, 0, valueLength); + } + return out.bytes().toBytesRef(); } - return out.bytes().toBytesRef(); } catch (IOException e) { throw new OpenSearchException("Failed to get binary value", e); } diff --git a/server/src/main/java/org/opensearch/rest/RestRequest.java b/server/src/main/java/org/opensearch/rest/RestRequest.java index eccaacc73c068..b4f70e359d618 100644 --- a/server/src/main/java/org/opensearch/rest/RestRequest.java +++ b/server/src/main/java/org/opensearch/rest/RestRequest.java @@ -382,7 +382,7 @@ public Map params() { * * @return the list of currently consumed parameters. */ - List consumedParams() { + public List consumedParams() { return new ArrayList<>(consumedParams); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java index 5c3a2454c4074..c0164f1afd924 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java @@ -42,6 +42,10 @@ public class FailAwareWeightedRoutingTests extends OpenSearchTestCase { private ClusterState setUpCluster() { + return setUpCluster(Settings.EMPTY); + } + + private ClusterState setUpCluster(Settings transientSettings) { ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); // set up nodes @@ -78,7 +82,7 @@ private ClusterState setUpCluster() { Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); - Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()).transientSettings(transientSettings); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -143,6 +147,124 @@ public void testFindNextWithoutFailOpen() throws IOException { assertEquals(1, shardSkipped.get()); } + public void testFindNextWithJustOneShardInStandbyZone() throws IOException { + ClusterState clusterState = setUpCluster(); + + AtomicInteger shardSkipped = new AtomicInteger(); + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + // fail open is not executed since fail open conditions don't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNull(next); + assertEquals(0, shardSkipped.get()); + } + + public void testFindNextWithIgnoreWeightedRoutingTrue() throws IOException { + ClusterState clusterState = setUpCluster(Settings.builder().put("cluster.routing.ignore_weighted_routing", true).build()); + + AtomicInteger shardSkipped = new AtomicInteger(); + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + // fail open is not executed since fail open conditions don't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNull(next); + assertEquals(0, shardSkipped.get()); + } + public void testFindNextWithFailOpenDueTo5xx() throws IOException { ClusterState clusterState = setUpCluster(); diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 866939e6ac3d7..59ea0dfca559a 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -552,7 +552,7 @@ public void testWeightedRoutingWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; @@ -565,7 +565,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(3, shardIterator.size()); weights = Map.of("zone1", -1.0, "zone2", 0.0, "zone3", 1.0); @@ -573,7 +573,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(1, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -584,7 +584,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); assertEquals(3, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -646,7 +646,7 @@ public void testWeightedRoutingInMemoryStore() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; shardRouting = shardIterator.nextOrNull(); @@ -660,7 +660,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -675,7 +675,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -690,7 +690,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -755,7 +755,7 @@ public void testWeightedRoutingShardState() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); assertEquals(3, shardIterator.size()); ShardRouting shardRouting; @@ -834,21 +834,21 @@ public void testWeightedRoutingShardStateWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting1 = shardIterator.nextOrNull(); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting2 = shardIterator.nextOrNull(); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting3 = shardIterator.nextOrNull(); @@ -864,8 +864,8 @@ public void testWeightedRoutingShardStateWithDifferentWeights() { /** * Test to validate that simple weighted shard routing with seed return same shard routing on each call */ - public void testActiveInitializingShardsSimpleWeightedIt() { - TestThreadPool threadPool = new TestThreadPool("testActiveInitializingShardsSimpleWeightedIt"); + public void testActiveInitializingShardsWeightedItWithCustomSeed() { + TestThreadPool threadPool = new TestThreadPool("testActiveInitializingShardsWeightedItWithCustomSeed"); try { Settings.Builder settings = Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -912,16 +912,15 @@ public void testActiveInitializingShardsSimpleWeightedIt() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, 1); ShardRouting shardRouting1 = shardIterator.nextOrNull(); for (int i = 0; i < 50; i++) { - shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, 1); ShardRouting shardRouting2 = shardIterator.nextOrNull(); diff --git a/server/src/test/java/org/opensearch/common/util/CollectionUtilsTests.java b/server/src/test/java/org/opensearch/common/util/CollectionUtilsTests.java index 40b2706d314ce..c237bdeb5c5cf 100644 --- a/server/src/test/java/org/opensearch/common/util/CollectionUtilsTests.java +++ b/server/src/test/java/org/opensearch/common/util/CollectionUtilsTests.java @@ -41,9 +41,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.SortedSet; @@ -85,6 +87,32 @@ public void testRotate() { } } + private void assertDeduped(List array, Comparator cmp, int expectedLength) { + // test the dedup w/ ArrayLists and LinkedLists + List> types = List.of(new ArrayList(array), new LinkedList<>(array)); + for (List clone : types) { + // dedup the list + CollectionUtils.sortAndDedup(clone, cmp); + // verify unique elements + for (int i = 0; i < clone.size() - 1; ++i) { + assertNotEquals(cmp.compare(clone.get(i), clone.get(i + 1)), 0); + } + assertEquals(expectedLength, clone.size()); + } + } + + public void testSortAndDedup() { + // test no elements in a string array + assertDeduped(List.of(), Comparator.naturalOrder(), 0); + // test no elements in an integer array + assertDeduped(List.of(), Comparator.naturalOrder(), 0); + // test unsorted array + assertDeduped(List.of(-1, 0, 2, 1, -1, 19, -1), Comparator.naturalOrder(), 5); + // test sorted array + assertDeduped(List.of(-1, 0, 1, 2, 19, 19), Comparator.naturalOrder(), 5); + // test sorted + } + public void testSortAndDedupByteRefArray() { SortedSet set = new TreeSet<>(); final int numValues = scaledRandomIntBetween(0, 10000); diff --git a/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestResponseTests.java b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestResponseTests.java index 5d5d3769a9427..cdc79e9778e2d 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestResponseTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/ExtensionRestResponseTests.java @@ -11,11 +11,18 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; +import java.util.Map; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.http.HttpRequest; +import org.opensearch.http.HttpResponse; +import org.opensearch.http.HttpRequest.HttpVersion; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; import org.opensearch.rest.RestRequest.Method; import org.opensearch.test.OpenSearchTestCase; @@ -38,15 +45,63 @@ public void setUp() throws Exception { testBytes = new byte[] { 1, 2 }; } - private ExtensionRestRequest generateTestRequest() { - ExtensionRestRequest request = new ExtensionRestRequest( - Method.GET, - "/foo", - Collections.emptyMap(), - null, - new BytesArray("Text Content"), - null - ); + private RestRequest generateTestRequest() { + RestRequest request = RestRequest.request(null, new HttpRequest() { + + @Override + public Method method() { + return Method.GET; + } + + @Override + public String uri() { + return "/foo"; + } + + @Override + public BytesReference content() { + return new BytesArray("Text Content"); + } + + @Override + public Map> getHeaders() { + return Collections.emptyMap(); + } + + @Override + public List strictCookies() { + return Collections.emptyList(); + } + + @Override + public HttpVersion protocolVersion() { + return null; + } + + @Override + public HttpRequest removeHeader(String header) { + // we don't use + return null; + } + + @Override + public HttpResponse createResponse(RestStatus status, BytesReference content) { + return null; + } + + @Override + public Exception getInboundException() { + return null; + } + + @Override + public void release() {} + + @Override + public HttpRequest releaseAndCopy() { + return null; + } + }, null); // consume params "foo" and "bar" request.param("foo"); request.param("bar"); @@ -60,7 +115,7 @@ public void testConstructorWithBuilder() throws IOException { builder.startObject(); builder.field("status", ACCEPTED); builder.endObject(); - ExtensionRestRequest request = generateTestRequest(); + RestRequest request = generateTestRequest(); ExtensionRestResponse response = new ExtensionRestResponse(request, OK, builder); assertEquals(OK, response.status()); @@ -73,7 +128,7 @@ public void testConstructorWithBuilder() throws IOException { } public void testConstructorWithPlainText() { - ExtensionRestRequest request = generateTestRequest(); + RestRequest request = generateTestRequest(); ExtensionRestResponse response = new ExtensionRestResponse(request, OK, testText); assertEquals(OK, response.status()); @@ -86,7 +141,7 @@ public void testConstructorWithPlainText() { } public void testConstructorWithText() { - ExtensionRestRequest request = generateTestRequest(); + RestRequest request = generateTestRequest(); ExtensionRestResponse response = new ExtensionRestResponse(request, OK, TEXT_CONTENT_TYPE, testText); assertEquals(OK, response.status()); @@ -100,7 +155,7 @@ public void testConstructorWithText() { } public void testConstructorWithByteArray() { - ExtensionRestRequest request = generateTestRequest(); + RestRequest request = generateTestRequest(); ExtensionRestResponse response = new ExtensionRestResponse(request, OK, OCTET_CONTENT_TYPE, testBytes); assertEquals(OK, response.status()); @@ -113,7 +168,7 @@ public void testConstructorWithByteArray() { } public void testConstructorWithBytesReference() { - ExtensionRestRequest request = generateTestRequest(); + RestRequest request = generateTestRequest(); ExtensionRestResponse response = new ExtensionRestResponse( request, OK,