From 2e3e1ed7bb4e2fe6239a14fed8450931e90af45d Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 2 Feb 2023 14:00:38 -0800 Subject: [PATCH] [Segment Replication] Introduce primary weight factor for primary shards distribution (#6017) (#6161) * Add integration test to show shard allocation * Update WeightFunction to consider primary shards for uniform primary distribution * Include primary shard weight for all shard types * Update integration test to show docrep & segrep indices * Add settings updater and update TriConsumer functional interface declaration * Add balance configuration test * Fix failing unit tests and merge conflicts * Rename primary balance factor * Update integration tests to avoid green state timeouts * PR feedback --------- (cherry picked from commit aa21b5f35c1befa34e8ddfceff9c14b078d2e415) Signed-off-by: Suraj Singh Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../opensearch/common/collect/Triplet.java | 63 ++++ .../SegmentReplicationAllocationIT.java | 307 ++++++++++++++++++ .../cluster/routing/RoutingNode.java | 17 + .../allocator/BalancedShardsAllocator.java | 67 +++- .../allocator/LocalShardsBalancer.java | 8 + .../allocation/allocator/ShardsBalancer.java | 11 +- .../org/opensearch/common/TriConsumer.java | 13 +- .../settings/AbstractScopedSettings.java | 23 ++ .../common/settings/ClusterSettings.java | 1 + .../opensearch/common/settings/Setting.java | 56 ++++ .../support/MultiTermsValuesSourceConfig.java | 2 +- .../support/MultiValuesSourceFieldConfig.java | 2 +- .../allocation/BalanceConfigurationTests.java | 177 +++++++++- .../common/settings/SettingTests.java | 127 ++++++++ .../metrics/SumAggregatorTests.java | 2 +- 15 files changed, 852 insertions(+), 24 deletions(-) create mode 100644 libs/common/src/main/java/org/opensearch/common/collect/Triplet.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java diff --git a/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java b/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java new file mode 100644 index 0000000000000..9b7e8f6a419a6 --- /dev/null +++ b/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.common.collect; + +import java.util.Objects; + +/** + * A container for 3 elements, similar to {@link org.opensearch.common.collect.Tuple} + * + * @opensearch.internal + */ +public class Triplet { + + public static Triplet tuple(V1 v1, V2 v2, V3 v3) { + return new Triplet<>(v1, v2, v3); + } + + private final V1 v1; + private final V2 v2; + + private final V3 v3; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Triplet triplet = (Triplet) o; + return Objects.equals(v1, triplet.v1) && Objects.equals(v2, triplet.v2) && Objects.equals(v3, triplet.v3); + } + + @Override + public int hashCode() { + return Objects.hash(v1, v2, v3); + } + + public Triplet(V1 v1, V2 v2, V3 v3) { + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + } + + public V1 v1() { + return v1; + } + + public V2 v2() { + return v2; + } + + public V3 v3() { + return v3; + } + + @Override + public String toString() { + return "Tuple [v1=" + v1 + ", v2=" + v2 + ", v3=" + v3 + "]"; + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java new file mode 100644 index 0000000000000..7864bc4668cd5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java @@ -0,0 +1,307 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.Formatter; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationAllocationIT extends SegmentReplicationBaseIT { + + private void createIndex(String idxName, int shardCount, int replicaCount, boolean isSegRep) { + Settings.Builder builder = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT); + if (isSegRep) { + builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } else { + builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT); + } + prepareCreate(idxName, builder).get(); + } + + /** + * This test verifies primary shard allocation is balanced. + */ + public void testShardAllocation() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 2; + final int maxShardCount = 5; + final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); + final int numberOfIndices = randomIntBetween(5, 10); + + final List nodeNames = new ArrayList<>(); + logger.info("--> Creating {} nodes", nodeCount); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f") + ) + ); + + int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; + ShardAllocations shardAllocations = new ShardAllocations(); + ClusterState state; + for (int i = 0; i < numberOfIndices; i++) { + shardCount = randomIntBetween(1, maxShardCount); + totalShardCount += shardCount; + replicaCount = randomIntBetween(0, maxReplicaCount); + totalReplicaCount += replicaCount; + createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); + logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount); + assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + shardAllocations.printShardDistribution(state); + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + RoutingNodes nodes = state.getRoutingNodes(); + final float avgNumShards = (float) (totalShardCount) / (float) (nodes.size()); + final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); + final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); + + for (RoutingNode node : nodes) { + assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); + assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); + } + } + + /** + * This test verifies shard allocation with changes to cluster config i.e. node add, removal keeps the primary shard + * allocation balanced. + */ + public void testAllocationWithDisruption() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 2; + final int maxShardCount = 5; + final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); + final int numberOfIndices = randomIntBetween(1, 10); + + logger.info("--> Creating {} nodes", nodeCount); + final List nodeNames = new ArrayList<>(); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder() + .put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f") + .put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0f") + .put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0f") + .build() + ) + ); + + int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; + ShardAllocations shardAllocations = new ShardAllocations(); + ClusterState state; + for (int i = 0; i < numberOfIndices; i++) { + shardCount = randomIntBetween(1, maxShardCount); + totalShardCount += shardCount; + replicaCount = randomIntBetween(1, maxReplicaCount); + totalReplicaCount += replicaCount; + logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount); + createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); + assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); + if (logger.isTraceEnabled()) { + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + shardAllocations.printShardDistribution(state); + } + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + float avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); + int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); + int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); + + for (RoutingNode node : state.getRoutingNodes()) { + assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); + assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); + } + + final int additionalNodeCount = randomIntBetween(1, 5); + logger.info("--> Adding {} nodes", additionalNodeCount); + + internalCluster().startNodes(additionalNodeCount); + assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); + minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); + maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); + shardAllocations.printShardDistribution(state); + for (RoutingNode node : state.getRoutingNodes()) { + assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); + assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); + } + + logger.info("--> Stop one third nodes"); + for (int i = 1; i < nodeCount; i += 3) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(i))); + // give replica a chance to promote as primary before terminating node containing the replica + assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); + minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); + maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); + shardAllocations.printShardDistribution(state); + + for (RoutingNode node : state.getRoutingNodes()) { + assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); + assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); + } + } + + /** + * This class is created for debugging purpose to show shard allocation across nodes. It keeps cluster state which + * is used to build the node's shard allocation + */ + private class ShardAllocations { + ClusterState state; + + public static final String separator = "==================================================="; + public static final String ONE_LINE_RETURN = "\n"; + public static final String TWO_LINE_RETURN = "\n\n"; + + /** + Store shard primary/replica shard count against a node for segrep indices. + String: NodeId + int[]: tuple storing primary shard count in 0th index and replica's in 1 + */ + TreeMap nodeToSegRepCountMap = new TreeMap<>(); + /** + Store shard primary/replica shard count against a node for docrep indices. + String: NodeId + int[]: tuple storing primary shard count in 0th index and replica's in 1 + */ + TreeMap nodeToDocRepCountMap = new TreeMap<>(); + + /** + * Helper map containing NodeName to NodeId + */ + TreeMap nameToNodeId = new TreeMap<>(); + + /* + Unassigned array containing primary at 0, replica at 1 + */ + int[] unassigned = new int[2]; + + int[] totalShards = new int[2]; + + public final String printShardAllocationWithHeader(int[] docrep, int[] segrep) { + StringBuffer sb = new StringBuffer(); + Formatter formatter = new Formatter(sb, Locale.getDefault()); + formatter.format("%-20s %-20s %-20s %-20s\n", "P", docrep[0] + segrep[0], docrep[0], segrep[0]); + formatter.format("%-20s %-20s %-20s %-20s\n", "R", docrep[1] + segrep[1], docrep[1], segrep[1]); + return sb.toString(); + } + + public void reset() { + nodeToSegRepCountMap.clear(); + nodeToDocRepCountMap.clear(); + nameToNodeId.clear(); + totalShards[0] = totalShards[1] = 0; + unassigned[0] = unassigned[1] = 0; + } + + public void setState(ClusterState state) { + this.reset(); + this.state = state; + buildMap(); + } + + private void buildMap() { + for (RoutingNode node : state.getRoutingNodes()) { + nameToNodeId.putIfAbsent(node.node().getName(), node.nodeId()); + nodeToSegRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 }); + nodeToDocRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 }); + } + for (ShardRouting shardRouting : state.routingTable().allShards()) { + // Fetch shard to update. Initialize local array + if (isIndexSegRep(shardRouting.getIndexName())) { + updateMap(nodeToSegRepCountMap, shardRouting); + } else { + updateMap(nodeToDocRepCountMap, shardRouting); + } + } + } + + void updateMap(TreeMap mapToUpdate, ShardRouting shardRouting) { + int[] shard; + shard = shardRouting.assignedToNode() ? mapToUpdate.get(shardRouting.currentNodeId()) : unassigned; + // Update shard type count + if (shardRouting.primary()) { + shard[0]++; + totalShards[0]++; + } else { + shard[1]++; + totalShards[1]++; + } + // For assigned shards, put back counter + if (shardRouting.assignedToNode()) mapToUpdate.put(shardRouting.currentNodeId(), shard); + } + + boolean isIndexSegRep(String indexName) { + return state.metadata() + .index(indexName) + .getSettings() + .get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey()) + .equals(ReplicationType.SEGMENT.toString()); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append(TWO_LINE_RETURN + separator + ONE_LINE_RETURN); + Formatter formatter = new Formatter(sb, Locale.getDefault()); + for (Map.Entry entry : nameToNodeId.entrySet()) { + String nodeId = nameToNodeId.get(entry.getKey()); + formatter.format("%-20s %-20s %-20s %-20s\n", entry.getKey().toUpperCase(Locale.getDefault()), "TOTAL", "DOCREP", "SEGREP"); + sb.append(printShardAllocationWithHeader(nodeToDocRepCountMap.get(nodeId), nodeToSegRepCountMap.get(nodeId))); + } + sb.append(ONE_LINE_RETURN); + formatter.format("%-20s %-20s %-20s\n\n", "Unassigned ", unassigned[0], unassigned[1]); + formatter.format("%-20s %-20s %-20s\n\n", "Total Shards", totalShards[0], totalShards[1]); + return sb.toString(); + } + + public void printShardDistribution(ClusterState state) { + this.setState(state); + logger.info("--> Shard distribution {}", this); + } + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java index 413ddff72f7a5..d23b4856eced0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java @@ -329,6 +329,23 @@ public List shardsWithState(ShardRoutingState... states) { return shards; } + /** + * Determine the primary shards of an index with a specific state + * @param states set of states which should be listed + * @return a list of shards + */ + public List primaryShardsWithState(ShardRoutingState... states) { + List shards = new ArrayList<>(); + for (ShardRouting shardEntry : this) { + for (ShardRoutingState state : states) { + if (shardEntry.state() == state && shardEntry.primary() == true) { + shards.add(shardEntry); + } + } + } + return shards; + } + /** * Determine the shards of an index with a specific state * @param index id of the index diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index d8761e9b1a78e..8893aaa54799a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -107,6 +107,14 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting PRIMARY_SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting( + "cluster.routing.allocation.balance.primary", + 0.0f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + private volatile boolean movePrimaryFirst; private volatile WeightFunction weightFunction; private volatile float threshold; @@ -117,10 +125,19 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { - setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); + setWeightFunction( + INDEX_BALANCE_FACTOR_SETTING.get(settings), + SHARD_BALANCE_FACTOR_SETTING.get(settings), + PRIMARY_SHARD_BALANCE_FACTOR_SETTING.get(settings) + ); setThreshold(THRESHOLD_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); - clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); + clusterSettings.addSettingsUpdateConsumer( + INDEX_BALANCE_FACTOR_SETTING, + SHARD_BALANCE_FACTOR_SETTING, + PRIMARY_SHARD_BALANCE_FACTOR_SETTING, + this::setWeightFunction + ); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); } @@ -128,8 +145,8 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) { this.movePrimaryFirst = movePrimaryFirst; } - private void setWeightFunction(float indexBalance, float shardBalanceFactor) { - weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); + private void setWeightFunction(float indexBalance, float shardBalanceFactor, float primaryShardBalance) { + weightFunction = new WeightFunction(indexBalance, shardBalanceFactor, primaryShardBalance); } private void setThreshold(float threshold) { @@ -252,17 +269,22 @@ static class WeightFunction { private final float shardBalance; private final float theta0; private final float theta1; + private final float theta2; + private final float primaryShardBalance; private AllocationConstraints constraints; - WeightFunction(float indexBalance, float shardBalance) { - float sum = indexBalance + shardBalance; + WeightFunction(float indexBalance, float shardBalance, float primaryShardBalance) { + float sum = indexBalance + shardBalance + primaryShardBalance; if (sum <= 0.0f) { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } theta0 = shardBalance / sum; theta1 = indexBalance / sum; + theta2 = primaryShardBalance / sum; + this.indexBalance = indexBalance; this.shardBalance = shardBalance; + this.primaryShardBalance = primaryShardBalance; this.constraints = new AllocationConstraints(); } @@ -274,7 +296,9 @@ public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode float weight(ShardsBalancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); - return theta0 * weightShard + theta1 * weightIndex; + final float primaryWeightShard = node.numPrimaryShards() - balancer.avgPrimaryShardsPerNode(); + + return theta0 * weightShard + theta1 * weightIndex + theta2 * primaryWeightShard; } } @@ -288,6 +312,8 @@ public static class ModelNode implements Iterable { private int numShards = 0; private final RoutingNode routingNode; + private int primaryNumShards = 0; + ModelNode(RoutingNode routingNode) { this.routingNode = routingNode; } @@ -313,6 +339,10 @@ public int numShards(String idx) { return index == null ? 0 : index.numShards(); } + public int numPrimaryShards() { + return primaryNumShards; + } + public int highestPrimary(String index) { ModelIndex idx = indices.get(index); if (idx != null) { @@ -329,6 +359,9 @@ public void addShard(ShardRouting shard) { } index.addShard(shard); numShards++; + if (shard.primary()) { + primaryNumShards++; + } } public void removeShard(ShardRouting shard) { @@ -339,6 +372,9 @@ public void removeShard(ShardRouting shard) { indices.remove(shard.getIndexName()); } } + if (shard.primary()) { + primaryNumShards--; + } numShards--; } @@ -381,13 +417,14 @@ public Balancer( } /** - * A model index. + * A model index that stores info about specific index * * @opensearch.internal */ static final class ModelIndex implements Iterable { private final String id; private final Set shards = new HashSet<>(4); // expect few shards of same index to be allocated on same node + private final Set primaryShards = new HashSet<>(); private int highestPrimary = -1; ModelIndex(String id) { @@ -415,6 +452,10 @@ public int numShards() { return shards.size(); } + public int numPrimaryShards() { + return primaryShards.size(); + } + @Override public Iterator iterator() { return shards.iterator(); @@ -423,12 +464,20 @@ public Iterator iterator() { public void removeShard(ShardRouting shard) { highestPrimary = -1; assert shards.contains(shard) : "Shard not allocated on current node: " + shard; + if (shard.primary()) { + assert primaryShards.contains(shard) : "Primary shard not allocated on current node: " + shard; + primaryShards.remove(shard); + } shards.remove(shard); } public void addShard(ShardRouting shard) { highestPrimary = -1; - assert !shards.contains(shard) : "Shard already allocated on current node: " + shard; + assert shards.contains(shard) == false : "Shard already allocated on current node: " + shard; + if (shard.primary()) { + assert primaryShards.contains(shard) == false : "Primary shard already allocated on current node: " + shard; + primaryShards.add(shard); + } shards.add(shard); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 3c5e4013748af..738c6588357f5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -63,6 +63,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float threshold; private final Metadata metadata; private final float avgShardsPerNode; + private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; @@ -81,6 +82,8 @@ public LocalShardsBalancer( this.routingNodes = allocation.routingNodes(); this.metadata = allocation.metadata(); avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); + final int shardCount = StreamSupport.stream(metadata.spliterator(), false).mapToInt(IndexMetadata::getNumberOfShards).sum(); + avgPrimaryShardsPerNode = (float) shardCount / routingNodes.size(); nodes = Collections.unmodifiableMap(buildModelFromAssigned()); sorter = newNodeSorter(); inEligibleTargetNode = new HashSet<>(); @@ -101,6 +104,11 @@ public float avgShardsPerNode(String index) { return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size(); } + @Override + public float avgPrimaryShardsPerNode() { + return avgPrimaryShardsPerNode; + } + /** * Returns the global average of shards per node */ diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java index 593e6998141fb..9774f84a4cd91 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -60,16 +60,23 @@ public abstract class ShardsBalancer { abstract MoveDecision decideRebalance(ShardRouting shardRouting); /** - * Returns the average of shards per node for the given index + * Returns the average of shards per node */ public float avgShardsPerNode() { return Float.MAX_VALUE; } /** - * Returns the global average of shards per node + * Returns the global average of shards per node for the given index */ public float avgShardsPerNode(String index) { return Float.MAX_VALUE; } + + /** + * Returns the average of primary shards per node + */ + public float avgPrimaryShardsPerNode() { + return Float.MAX_VALUE; + } } diff --git a/server/src/main/java/org/opensearch/common/TriConsumer.java b/server/src/main/java/org/opensearch/common/TriConsumer.java index f98276b6d007d..a174499d58628 100644 --- a/server/src/main/java/org/opensearch/common/TriConsumer.java +++ b/server/src/main/java/org/opensearch/common/TriConsumer.java @@ -32,6 +32,8 @@ package org.opensearch.common; +import java.util.Objects; + /** * Represents an operation that accepts three arguments and returns no result. * @@ -50,5 +52,14 @@ public interface TriConsumer { * @param t the second function argument * @param u the third function argument */ - void apply(S s, T t, U u); + void accept(S s, T t, U u); + + default TriConsumer andThen(TriConsumer after) { + Objects.requireNonNull(after); + + return (l, r, s) -> { + accept(l, r, s); + after.accept(l, r, s); + }; + } } diff --git a/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java index 8a19d309975df..50f6714ec41b9 100644 --- a/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java @@ -38,6 +38,7 @@ import org.apache.lucene.search.spell.LevenshteinDistance; import org.apache.lucene.util.CollectionUtil; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.TriConsumer; import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; @@ -444,6 +445,28 @@ public synchronized void addSettingsUpdateConsumer(Setting a, Setting< addSettingsUpdateConsumer(a, b, consumer, (i, j) -> {}); } + /** + * Adds a settings consumer that accepts the values for three settings. The consumer is only notified if any one of + * the settings changed and if the provided validator succeeded. + *

+ * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. + *

+ * This method registers a compound updater that is useful if three settings depends on each other. + * The consumer is always provided with both values even if only one of the two changes. + */ + public synchronized void addSettingsUpdateConsumer(Setting
a, Setting b, Setting c, TriConsumer consumer) { + if (a != get(a.getKey())) { + throw new IllegalArgumentException("Setting is not registered for key [" + a.getKey() + "]"); + } + if (b != get(b.getKey())) { + throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]"); + } + if (c != get(c.getKey())) { + throw new IllegalArgumentException("Setting is not registered for key [" + c.getKey() + "]"); + } + addSettingsUpdater(Setting.compoundUpdater(consumer, (i, j, k) -> {}, a, b, c, logger)); + } + /** * Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change * and if the provided validator succeeded. diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ab1f8fb8b783b..426395c48d331 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -233,6 +233,7 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index 176f24e3cc1ff..aaea633caaf60 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -39,6 +39,8 @@ import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.Strings; +import org.opensearch.common.TriConsumer; +import org.opensearch.common.collect.Triplet; import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; import org.opensearch.common.unit.ByteSizeValue; @@ -722,6 +724,60 @@ public String toString() { }; } + /** + * Updates settings that depend on each other. + * + * See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, Setting, TriConsumer)} and its usage for details. + */ + static AbstractScopedSettings.SettingUpdater> compoundUpdater( + final TriConsumer consumer, + final TriConsumer validator, + final Setting aSetting, + final Setting bSetting, + final Setting cSetting, + Logger logger + ) { + final AbstractScopedSettings.SettingUpdater aSettingUpdater = aSetting.newUpdater(null, logger); + final AbstractScopedSettings.SettingUpdater bSettingUpdater = bSetting.newUpdater(null, logger); + final AbstractScopedSettings.SettingUpdater cSettingUpdater = cSetting.newUpdater(null, logger); + return new AbstractScopedSettings.SettingUpdater>() { + @Override + public boolean hasChanged(Settings current, Settings previous) { + return aSettingUpdater.hasChanged(current, previous) + || bSettingUpdater.hasChanged(current, previous) + || cSettingUpdater.hasChanged(current, previous); + } + + @Override + public Triplet getValue(Settings current, Settings previous) { + A valueA = aSettingUpdater.getValue(current, previous); + B valueB = bSettingUpdater.getValue(current, previous); + C valueC = cSettingUpdater.getValue(current, previous); + validator.accept(valueA, valueB, valueC); + return new Triplet<>(valueA, valueB, valueC); + } + + @Override + public void apply(Triplet value, Settings current, Settings previous) { + if (aSettingUpdater.hasChanged(current, previous)) { + logSettingUpdate(aSetting, current, previous, logger); + } + if (bSettingUpdater.hasChanged(current, previous)) { + logSettingUpdate(bSetting, current, previous, logger); + } + if (cSettingUpdater.hasChanged(current, previous)) { + logSettingUpdate(cSetting, current, previous, logger); + } + consumer.accept(value.v1(), value.v2(), value.v3()); + } + + @Override + public String toString() { + return "CompoundUpdater for: " + aSettingUpdater + " and " + bSettingUpdater + " and " + cSettingUpdater; + } + }; + } + static AbstractScopedSettings.SettingUpdater groupedSettingsUpdater( Consumer consumer, final List> configuredSettings diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java b/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java index b0c828ca6b902..e1c6ac4150a5d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java @@ -57,7 +57,7 @@ ObjectParser apply( MultiTermsValuesSourceConfig.Builder::new ); - BaseMultiValuesSourceFieldConfig.PARSER.apply(parser, scriptable, timezoneAware); + BaseMultiValuesSourceFieldConfig.PARSER.accept(parser, scriptable, timezoneAware); if (valueTypeHinted) { parser.declareField( diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java b/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java index 63fce83369c18..3e2976a25f749 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java @@ -69,7 +69,7 @@ public class MultiValuesSourceFieldConfig extends BaseMultiValuesSourceFieldConf MultiValuesSourceFieldConfig.Builder::new ); - BaseMultiValuesSourceFieldConfig.PARSER.apply(parser, scriptable, timezoneAware); + BaseMultiValuesSourceFieldConfig.PARSER.accept(parser, scriptable, timezoneAware); if (filtered) { parser.declareField( diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index 1ba69694eaec1..3762e137ac8da 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -75,7 +75,7 @@ public class BalanceConfigurationTests extends OpenSearchAllocationTestCase { public void testIndexBalance() { /* Tests balance over indices only */ final float indexBalance = 1.0f; - final float replicaBalance = 0.0f; + final float shardBalance = 0.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -84,7 +84,7 @@ public void testIndexBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); @@ -123,10 +123,148 @@ public void testIndexBalance() { ); } - public void testReplicaBalance() { + /** + * This test verifies that with only primary shard balance, the primary shard distribution is balanced within thresholds. + */ + public void testPrimaryBalance() { + /* Tests balance over primary shards only */ + final float indexBalance = 0.0f; + final float shardBalance = 0.0f; + final float primaryBalance = 1.0f; + final float balanceThreshold = 1.0f; + + Settings.Builder settings = Settings.builder(); + settings.put( + ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), + ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() + ); + settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); + settings.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), primaryBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); + + AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); + + ClusterState clusterState = initCluster(strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = addNode(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes + 1, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = removeNodes(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + (numberOfNodes + 1) - (numberOfNodes + 1) / 2, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + } + + /** + * This test verifies + */ + public void testBalanceDefaults() { + final float indexBalance = 0.55f; + final float shardBalance = 0.45f; + final float primaryBalance = 0.40f; + final float balanceThreshold = 1.0f; + + Settings.Builder settings = Settings.builder(); + settings.put( + ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), + ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() + ); + settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); + settings.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), primaryBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); + + AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); + + ClusterState clusterState = initCluster(strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + assertIndexBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = addNode(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes + 1, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + assertIndexBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes + 1, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = removeNodes(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + (numberOfNodes + 1) - (numberOfNodes + 1) / 2, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + assertIndexBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + (numberOfNodes + 1) - (numberOfNodes + 1) / 2, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + } + + public void testShardBalance() { /* Tests balance over replicas only */ final float indexBalance = 0.0f; - final float replicaBalance = 1.0f; + final float shardBalance = 1.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -135,13 +273,13 @@ public void testReplicaBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); ClusterState clusterState = initCluster(strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, @@ -151,7 +289,7 @@ public void testReplicaBalance() { ); clusterState = addNode(clusterState, strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes + 1, numberOfIndices, @@ -161,7 +299,7 @@ public void testReplicaBalance() { ); clusterState = removeNodes(clusterState, strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes + 1 - (numberOfNodes + 1) / 2, numberOfIndices, @@ -254,7 +392,7 @@ private ClusterState removeNodes(ClusterState clusterState, AllocationService st return applyStartedShardsUntilNoChange(clusterState, strategy); } - private void assertReplicaBalance( + private void assertShardBalance( RoutingNodes nodes, int numberOfNodes, int numberOfIndices, @@ -309,6 +447,27 @@ private void assertIndexBalance( } } + private void assertPrimaryBalance( + RoutingTable routingTable, + RoutingNodes nodes, + int numberOfNodes, + int numberOfIndices, + int numberOfReplicas, + int numberOfShards, + float threshold + ) { + + final int numShards = numberOfShards * numberOfIndices; + final float avgNumShards = (float) (numShards) / (float) (numberOfNodes); + final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - threshold))); + final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + threshold))); + + for (RoutingNode node : nodes) { + assertThat(node.primaryShardsWithState(STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards)); + assertThat(node.primaryShardsWithState(STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards)); + } + } + public void testPersistedSettings() { Settings.Builder settings = Settings.builder(); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), 0.2); diff --git a/server/src/test/java/org/opensearch/common/settings/SettingTests.java b/server/src/test/java/org/opensearch/common/settings/SettingTests.java index 7703cb394397e..45810b421c027 100644 --- a/server/src/test/java/org/opensearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/opensearch/common/settings/SettingTests.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LogEvent; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.collect.Triplet; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.AbstractScopedSettings.SettingUpdater; import org.opensearch.common.settings.Setting.Property; @@ -625,6 +626,28 @@ public void validate(Integer a, Integer b) { } } + // This test class is used to verify behavior of BalancedShardAllocator.WeightFunction and ensure set function is called + // whenever there is a change in any of the settings. + public static class TriSettingConsumer { + + private Integer b; + private Integer a; + + private Integer c; + + public void set(Integer a, Integer b, Integer c) { + this.a = a; + this.b = b; + this.c = c; + } + + public void validate(Integer a, Integer b, Integer c) { + if (Integer.signum(a) != Integer.signum(b) || Integer.signum(a) != Integer.signum(c)) { + throw new IllegalArgumentException("boom"); + } + } + } + public void testComposite() { Composite c = new Composite(); Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); @@ -689,6 +712,110 @@ public void testCompositeValidator() { } + public void testTriSettingConsumer() { + TriSettingConsumer consumer = new TriSettingConsumer(); + Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); + Setting b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); + Setting c = Setting.intSetting("foo.int.bar.c", 1, Property.Dynamic, Property.NodeScope); + ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater( + consumer::set, + consumer::validate, + a, + b, + c, + logger + ); + assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); + assertNull(consumer.a); + assertNull(consumer.b); + assertNull(consumer.c); + + Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); + assertTrue(settingUpdater.apply(build, Settings.EMPTY)); + assertEquals(2, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + + Integer aValue = consumer.a; + assertFalse(settingUpdater.apply(build, build)); + assertSame(aValue, consumer.a); + Settings previous = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); + assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + + Integer bValue = consumer.b; + assertFalse(settingUpdater.apply(build, build)); + assertSame(bValue, consumer.b); + previous = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).put("foo.int.bar.c", 10).build(); + assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + assertEquals(10, consumer.c.intValue()); + + // reset to default + assertTrue(settingUpdater.apply(Settings.EMPTY, build)); + assertEquals(1, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + } + + public void testTriSettingConsumerValidator() { + TriSettingConsumer consumer = new TriSettingConsumer(); + Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); + Setting b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); + Setting c = Setting.intSetting("foo.int.bar.c", 1, Property.Dynamic, Property.NodeScope); + ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater( + consumer::set, + consumer::validate, + a, + b, + c, + logger + ); + assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); + assertNull(consumer.a); + assertNull(consumer.b); + assertNull(consumer.c); + + Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); + assertTrue(settingUpdater.apply(build, Settings.EMPTY)); + assertEquals(2, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + + Integer aValue = consumer.a; + assertFalse(settingUpdater.apply(build, build)); + assertSame(aValue, consumer.a); + final Settings previous = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); + assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + + Integer bValue = consumer.b; + assertFalse(settingUpdater.apply(build, build)); + assertSame(bValue, consumer.b); + final Settings previous2 = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).put("foo.int.bar.c", 10).build(); + assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + assertEquals(10, consumer.c.intValue()); + + Settings invalid = Settings.builder().put("foo.int.bar.a", -2).put("foo.int.bar.b", 5).build(); + IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(invalid, previous2)); + assertThat(exc.getMessage(), equalTo("boom")); + + // reset to default + assertTrue(settingUpdater.apply(Settings.EMPTY, build)); + assertEquals(1, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + } + public void testListSettingsDeprecated() { final Setting> deprecatedListSetting = Setting.listSetting( "foo.deprecated", diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java index 8c0087ca0b87d..b38d4eee850ef 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java @@ -385,7 +385,7 @@ private void sumRandomDocsTestCase( builder, new MatchAllDocsQuery(), writer -> writer.addDocuments(docs), - internalSum -> verify.apply(finalSum, docs, internalSum), + internalSum -> verify.accept(finalSum, docs, internalSum), fieldType ); }