Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metadata Immutability] Change different indices lookup objects from array type to lists #14723

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Breaking change: Do not request "search_pipelines" metrics by default in NodesInfoRequest ([#12497](https://github.com/opensearch-project/OpenSearch/pull/12497))
- Refactor `:libs` module `bootstrap` package to eliminate top level split packages for JPMS support [#17117](https://github.com/opensearch-project/OpenSearch/pull/17117))
- Refactor the codebase to eliminate top level split packages for JPMS support. [#17153](https://github.com/opensearch-project/OpenSearch/pull/17153))
- Breaking change: Modify the utility APIs in the Metadata to get different indices ([#14723](https://github.com/opensearch-project/OpenSearch/pull/14723))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;
Expand Down Expand Up @@ -215,13 +216,13 @@ public ClusterHealthResponse(StreamInput in) throws IOException {
}

/** needed for plugins BWC */
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
public ClusterHealthResponse(String clusterName, Set<String> concreteIndices, ClusterState clusterState) {
this(clusterName, concreteIndices, clusterState, -1, -1, -1, TimeValue.timeValueHours(0));
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
Set<String> concreteIndices,
akolarkunnu marked this conversation as resolved.
Show resolved Hide resolved
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
Expand All @@ -239,7 +240,7 @@ public ClusterHealthResponse(

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
Set<String> concreteIndices,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
int numberOfPendingTasks,
Expand All @@ -262,7 +263,7 @@ public ClusterHealthResponse(
String clusterName,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
Set<String> concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
Expand All @@ -286,7 +287,7 @@ public ClusterHealthResponse(
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
Set<String> concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
Expand All @@ -70,6 +69,8 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -494,7 +495,7 @@
);
}

String[] concreteIndices;
Set<String> concreteIndices;
if (request.level().equals(ClusterHealthRequest.Level.AWARENESS_ATTRIBUTES)) {
String awarenessAttribute = request.getAwarenessAttribute();
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
Expand All @@ -512,12 +513,12 @@
}

try {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
concreteIndices = Set.of(indexNameExpressionResolver.concreteIndexNames(clusterState, request));
} catch (IndexNotFoundException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
Collections.emptySet(),

Check warning on line 521 in server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java#L521

Added line #L521 was not covered by tests
request,
clusterState,
numberOfPendingTasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@
}
ClusterStateHealth streamHealth = new ClusterStateHealth(
state,
dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new)
dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toSet())

Check warning on line 339 in server/src/main/java/org/opensearch/action/admin/indices/datastream/GetDataStreamAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/datastream/GetDataStreamAction.java#L339

Added line #L339 was not covered by tests
);
dataStreamInfos.add(new Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Cluster state health information
Expand Down Expand Up @@ -86,9 +87,9 @@ public ClusterStateHealth(final ClusterState clusterState, final ClusterHealthRe
* Creates a new <code>ClusterStateHealth</code> instance considering the current cluster state and the provided index names.
*
* @param clusterState The current cluster state. Must not be null.
* @param concreteIndices An array of index names to consider. Must not be null but may be empty.
* @param concreteIndices A set of index names to consider. Must not be null but may be empty.
*/
public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices) {
public ClusterStateHealth(final ClusterState clusterState, final Set<String> concreteIndices) {
numberOfNodes = clusterState.nodes().getSize();
numberOfDataNodes = clusterState.nodes().getDataNodes().size();
hasDiscoveredClusterManager = clusterState.nodes().getClusterManagerNodeId() != null;
Expand Down Expand Up @@ -152,7 +153,7 @@ public ClusterStateHealth(final ClusterState clusterState, final String[] concre

public ClusterStateHealth(
final ClusterState clusterState,
final String[] concreteIndices,
final Set<String> concreteIndices,
final ClusterHealthRequest.Level healthLevel
) {
numberOfNodes = clusterState.nodes().getSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@
if (routing != null) {
Set<String> r = Sets.newHashSet(Strings.splitStringByCommaToArray(routing));
Map<String, Set<String>> routings = new HashMap<>();
String[] concreteIndices = metadata.getConcreteAllIndices();
Set<String> concreteIndices = metadata.getConcreteAllIndices();

Check warning on line 709 in server/src/main/java/org/opensearch/cluster/metadata/IndexNameExpressionResolver.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/IndexNameExpressionResolver.java#L709

Added line #L709 was not covered by tests
for (String index : concreteIndices) {
routings.put(index, r);
}
Expand Down Expand Up @@ -746,7 +746,7 @@
*/
boolean isPatternMatchingAllIndices(Metadata metadata, String[] indicesOrAliases, String[] concreteIndices) {
// if we end up matching on all indices, check, if its a wildcard parameter, or a "-something" structure
if (concreteIndices.length == metadata.getConcreteAllIndices().length && indicesOrAliases.length > 0) {
if (concreteIndices.length == metadata.getConcreteAllIndices().size() && indicesOrAliases.length > 0) {

// we might have something like /-test1,+test1 that would identify all indices
// or something like /-test1 with test1 index missing and IndicesOptions.lenient()
Expand Down Expand Up @@ -1182,17 +1182,17 @@

private static List<String> resolveEmptyOrTrivialWildcard(IndicesOptions options, Metadata metadata) {
if (options.expandWildcardsOpen() && options.expandWildcardsClosed() && options.expandWildcardsHidden()) {
return Arrays.asList(metadata.getConcreteAllIndices());
return List.copyOf(metadata.getConcreteAllIndices());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be expensive operation to copy the set to a list every time for index resolution.
Have run micro benchmark on your changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here ConcreteAllIndices(allIndices) are unmodifiableSet, so List.copyOf() will not really create a copy .
There is an implementation Note in javadoc of this API - "If the given Collection is an unmodifiable List, calling copyOf will generally not create a copy". Also if we look in to the internal implementation of Lisy.copyOf() it's clear that, it's not creating a copy if collection is an AbstractImmutableList.

Copy link
Member

@shwetathareja shwetathareja Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are doing
Collections.unmodifiableSet(allIndices); which creates new object of UnmodifiableSet. UnmodifiableSet extends UnmodifiableCollection and it is not extending AbstractImmutableList. It will end up copying the elements. Can you cross check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are correct. So what do you suggest here?
concrete indices treated as Set in the existing implementation -

final Set<String> allIndices = new HashSet<>(indices.size());

That's why I kept only concrete indices as Set and all other indices as List. If we keep concrete indices also as List, then this conversion won't be required.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes right, lets check the usage in the code if it has to be Set, else i agree keeping it as list would be better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akolarkunnu Can you please take these changes to closure by profiling health API as in #15492

That should bring confidence on whether or not to get these changes in code or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will resume this work soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran performance test by crating indices using a script, which ran for 2 hours and 3 minutes. On comparing with changes and without changes, with changes I am consistently able to create ~30 more indices. So it is clearly a performance improvement.
Also I captured FlameGraph of 5 mins after 2 hours of index creation by using async profiler, and I don't see anything unusual there. Attaching both cpu and alloc with and without changes.
flamegraphs.zip

Copy link
Member

@shwetathareja shwetathareja Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akolarkunnu : Thanks for benchmarking the creation times. Can you please benchmark resolveEmptyOrTrivialWildcard method with and without your change. This resolution would happen in the context of every indexing/ search request which could turn out to be very expensive with this copy operation

Copy link
Contributor

@sandeshkr419 sandeshkr419 Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shwetathareja A quick clarification - so if resolveEmptyOrTrivialWildcard is called in every search/indexing request - will the indexing/search benchmarks suffice to establish the impact?

If yes, please checkout the comparison results - #14723 (comment) - I do see a good improvement in reduction of search and indexing latencies, increase in majority of indexing/search throughputs.

One catch I see is that the indices+aliases size is relatively low in the OSB benchmark, but I guess the slight increase in performance we see can be accounted for the reduction in conversions back and forth to arrays/lists. What do you think?

} else if (options.expandWildcardsOpen() && options.expandWildcardsClosed()) {
return Arrays.asList(metadata.getConcreteVisibleIndices());
return metadata.getConcreteVisibleIndices();
} else if (options.expandWildcardsOpen() && options.expandWildcardsHidden()) {
return Arrays.asList(metadata.getConcreteAllOpenIndices());
return metadata.getConcreteAllOpenIndices();
} else if (options.expandWildcardsOpen()) {
return Arrays.asList(metadata.getConcreteVisibleOpenIndices());
return metadata.getConcreteVisibleOpenIndices();
} else if (options.expandWildcardsClosed() && options.expandWildcardsHidden()) {
return Arrays.asList(metadata.getConcreteAllClosedIndices());
return metadata.getConcreteAllClosedIndices();
} else if (options.expandWildcardsClosed()) {
return Arrays.asList(metadata.getConcreteVisibleClosedIndices());
return metadata.getConcreteVisibleClosedIndices();
} else {
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
private final int totalOpenIndexShards;

private final String[] allIndices;
private final String[] visibleIndices;
private final String[] allOpenIndices;
private final String[] visibleOpenIndices;
private final String[] allClosedIndices;
private final String[] visibleClosedIndices;
private final Set<String> allIndices;
private final List<String> visibleIndices;
private final List<String> allOpenIndices;
private final List<String> visibleOpenIndices;
private final List<String> allClosedIndices;
private final List<String> visibleClosedIndices;

private final SortedMap<String, IndexAbstraction> indicesLookup;

Expand All @@ -294,12 +294,12 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
final Map<String, IndexMetadata> indices,
final Map<String, IndexTemplateMetadata> templates,
final Map<String, Custom> customs,
String[] allIndices,
String[] visibleIndices,
String[] allOpenIndices,
String[] visibleOpenIndices,
String[] allClosedIndices,
String[] visibleClosedIndices,
Set<String> allIndices,
List<String> visibleIndices,
List<String> allOpenIndices,
List<String> visibleOpenIndices,
List<String> allClosedIndices,
List<String> visibleClosedIndices,
SortedMap<String, IndexAbstraction> indicesLookup,
Map<String, SortedMap<Long, String>> systemTemplatesLookup
) {
Expand All @@ -325,12 +325,12 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
this.totalNumberOfShards = totalNumberOfShards;
this.totalOpenIndexShards = totalOpenIndexShards;

this.allIndices = allIndices;
this.visibleIndices = visibleIndices;
this.allOpenIndices = allOpenIndices;
this.visibleOpenIndices = visibleOpenIndices;
this.allClosedIndices = allClosedIndices;
this.visibleClosedIndices = visibleClosedIndices;
this.allIndices = Collections.unmodifiableSet(allIndices);
this.visibleIndices = Collections.unmodifiableList(visibleIndices);
this.allOpenIndices = Collections.unmodifiableList(allOpenIndices);
this.visibleOpenIndices = Collections.unmodifiableList(visibleOpenIndices);
this.allClosedIndices = Collections.unmodifiableList(allClosedIndices);
this.visibleClosedIndices = Collections.unmodifiableList(visibleClosedIndices);
this.indicesLookup = indicesLookup;
this.systemTemplatesLookup = systemTemplatesLookup;
}
Expand Down Expand Up @@ -608,42 +608,42 @@ private static String mergePaths(String path, String field) {
/**
* Returns all the concrete indices.
*/
public String[] getConcreteAllIndices() {
public Set<String> getConcreteAllIndices() {
return allIndices;
}

/**
* Returns all the concrete indices that are not hidden.
*/
public String[] getConcreteVisibleIndices() {
public List<String> getConcreteVisibleIndices() {
return visibleIndices;
}

/**
* Returns all of the concrete indices that are open.
*/
public String[] getConcreteAllOpenIndices() {
public List<String> getConcreteAllOpenIndices() {
return allOpenIndices;
}

/**
* Returns all of the concrete indices that are open and not hidden.
*/
public String[] getConcreteVisibleOpenIndices() {
public List<String> getConcreteVisibleOpenIndices() {
return visibleOpenIndices;
}

/**
* Returns all of the concrete indices that are closed.
*/
public String[] getConcreteAllClosedIndices() {
public List<String> getConcreteAllClosedIndices() {
return allClosedIndices;
}

/**
* Returns all of the concrete indices that are closed and not hidden.
*/
public String[] getConcreteVisibleClosedIndices() {
public List<String> getConcreteVisibleClosedIndices() {
return visibleClosedIndices;
}

Expand Down Expand Up @@ -1647,12 +1647,12 @@ protected Metadata buildMetadataWithPreviousIndicesLookups() {
indices,
templates,
customs,
Arrays.copyOf(previousMetadata.allIndices, previousMetadata.allIndices.length),
Arrays.copyOf(previousMetadata.visibleIndices, previousMetadata.visibleIndices.length),
Arrays.copyOf(previousMetadata.allOpenIndices, previousMetadata.allOpenIndices.length),
Arrays.copyOf(previousMetadata.visibleOpenIndices, previousMetadata.visibleOpenIndices.length),
Arrays.copyOf(previousMetadata.allClosedIndices, previousMetadata.allClosedIndices.length),
Arrays.copyOf(previousMetadata.visibleClosedIndices, previousMetadata.visibleClosedIndices.length),
previousMetadata.allIndices,
akolarkunnu marked this conversation as resolved.
Show resolved Hide resolved
previousMetadata.visibleIndices,
previousMetadata.allOpenIndices,
previousMetadata.visibleOpenIndices,
previousMetadata.allClosedIndices,
previousMetadata.visibleClosedIndices,
Collections.unmodifiableSortedMap(previousMetadata.indicesLookup),
systemTemplatesLookup
);
Expand Down Expand Up @@ -1749,17 +1749,6 @@ protected Metadata buildMetadataWithRecomputedIndicesLookups() {

validateDataStreams(indicesLookup, (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE));

// build all concrete indices arrays:
// TODO: I think we can remove these arrays. it isn't worth the effort, for operations on all indices.
// When doing an operation across all indices, most of the time is spent on actually going to all shards and
// do the required operations, the bottleneck isn't resolving expressions into concrete indices.
String[] allIndicesArray = allIndices.toArray(Strings.EMPTY_ARRAY);
String[] visibleIndicesArray = visibleIndices.toArray(Strings.EMPTY_ARRAY);
String[] allOpenIndicesArray = allOpenIndices.toArray(Strings.EMPTY_ARRAY);
String[] visibleOpenIndicesArray = visibleOpenIndices.toArray(Strings.EMPTY_ARRAY);
String[] allClosedIndicesArray = allClosedIndices.toArray(Strings.EMPTY_ARRAY);
String[] visibleClosedIndicesArray = visibleClosedIndices.toArray(Strings.EMPTY_ARRAY);

return new Metadata(
clusterUUID,
clusterUUIDCommitted,
Expand All @@ -1771,12 +1760,12 @@ protected Metadata buildMetadataWithRecomputedIndicesLookups() {
indices,
templates,
customs,
allIndicesArray,
visibleIndicesArray,
allOpenIndicesArray,
visibleOpenIndicesArray,
allClosedIndicesArray,
visibleClosedIndicesArray,
allIndices,
visibleIndices,
allOpenIndices,
visibleOpenIndices,
allClosedIndices,
visibleClosedIndices,
indicesLookup,
systemTemplatesLookup
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
}
} else {
List<String> filteredIndices = filterIndices(
List.of(currentState.metadata().getConcreteAllIndices()),
List.copyOf(currentState.metadata().getConcreteAllIndices()),

Check warning on line 169 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L169

Added line #L169 was not covered by tests
akolarkunnu marked this conversation as resolved.
Show resolved Hide resolved
indexNames,
IndicesOptions.fromOptions(true, true, true, true)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -93,7 +94,7 @@ public void testClusterHealth() throws IOException {
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
ClusterHealthResponse clusterHealth = new ClusterHealthResponse(
"bla",
new String[] { Metadata.ALL },
Set.of(Metadata.ALL),
clusterState,
pendingTasks,
inFlight,
Expand Down Expand Up @@ -121,7 +122,7 @@ public void testClusterHealthVerifyClusterManagerNodeDiscovery() throws IOExcept
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
ClusterHealthResponse clusterHealth = new ClusterHealthResponse(
"bla",
new String[] { Metadata.ALL },
Set.of(Metadata.ALL),
clusterState,
pendingTasks,
inFlight,
Expand Down
Loading
Loading