Use Map in snapshot/restore tracking (#87666)
Snapshot/restore keeps track of progress in ImmutableOpenMaps. This
commit changes these tracking structures to use HashMap.

relates #86239
rjernst authored Jun 17, 2022
1 parent 6e522f8 commit ba4f745
Showing 5 changed files with 51 additions and 76 deletions.
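The change described in the commit message is largely mechanical: tracking fields and builders that previously used ImmutableOpenMap now go through the standard java.util.Map interface, accumulating entries in a HashMap and freezing the result with Collections.unmodifiableMap, Map.copyOf, or Map.of for the empty case. The following is a minimal, self-contained sketch of that before/after pattern in plain Java; ShardKey and ShardStatus are hypothetical stand-ins, not the Elasticsearch classes.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class TrackingMapSketch {

    // Hypothetical stand-ins for ShardId / ShardRestoreStatus, only to keep the sketch self-contained.
    record ShardKey(String index, int shard) {}
    record ShardStatus(String state) {}

    // New pattern: accumulate in a mutable HashMap, then expose an unmodifiable view.
    static Map<ShardKey, ShardStatus> buildTracking(Map<ShardKey, ShardStatus> updates) {
        Map<ShardKey, ShardStatus> builder = new HashMap<>();   // was ImmutableOpenMap.builder()
        builder.putAll(updates);                                // was putAllFromMap(...)
        return builder.isEmpty()
            ? Map.of()                                          // was ImmutableOpenMap.of()
            : Collections.unmodifiableMap(builder);             // was builder.build()
    }

    public static void main(String[] args) {
        Map<ShardKey, ShardStatus> shards = buildTracking(Map.of(new ShardKey("index-1", 0), new ShardStatus("SUCCESS")));
        System.out.println(shards);
        try {
            shards.put(new ShardKey("index-1", 1), new ShardStatus("INIT"));
        } catch (UnsupportedOperationException e) {
            // The wrapped map rejects mutation, preserving the guarantee ImmutableOpenMap used to provide.
            System.out.println("immutable, as expected");
        }
    }
}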
@@ -15,12 +15,12 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;

import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
@@ -59,7 +59,7 @@ public void clusterChanged(ClusterChangedEvent changedEvent) {
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
@@ -10,17 +10,18 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -33,16 +34,16 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements

public static final String TYPE = "restore";

public static final RestoreInProgress EMPTY = new RestoreInProgress(ImmutableOpenMap.of());
public static final RestoreInProgress EMPTY = new RestoreInProgress(Map.of());

private final ImmutableOpenMap<String, Entry> entries;
private final Map<String, Entry> entries;

/**
* Constructs new restore metadata
*
* @param entries map of currently running restore processes keyed by their restore uuid
*/
private RestoreInProgress(ImmutableOpenMap<String, Entry> entries) {
private RestoreInProgress(Map<String, Entry> entries) {
this.entries = entries;
}

@@ -82,12 +83,12 @@ public Iterator<Entry> iterator() {

public static final class Builder {

private final ImmutableOpenMap.Builder<String, Entry> entries = ImmutableOpenMap.builder();
private final Map<String, Entry> entries = new HashMap<>();

public Builder() {}

public Builder(RestoreInProgress restoreInProgress) {
entries.putAllFromMap(restoreInProgress.entries);
entries.putAll(restoreInProgress.entries);
}

public Builder add(Entry entry) {
@@ -96,20 +97,14 @@ public Builder add(Entry entry) {
}

public RestoreInProgress build() {
return entries.isEmpty() ? EMPTY : new RestoreInProgress(entries.build());
return entries.isEmpty() ? EMPTY : new RestoreInProgress(Collections.unmodifiableMap(entries));
}
}

/**
* Restore metadata
*/
public record Entry(
String uuid,
Snapshot snapshot,
State state,
List<String> indices,
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards
) {
public record Entry(String uuid, Snapshot snapshot, State state, List<String> indices, Map<ShardId, ShardRestoreStatus> shards) {
/**
* Creates new restore metadata
*
@@ -119,18 +114,12 @@ public record Entry(
* @param indices list of indices being restored
* @param shards map of shards being restored to their current restore status
*/
public Entry(
String uuid,
Snapshot snapshot,
State state,
List<String> indices,
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards
) {
public Entry(String uuid, Snapshot snapshot, State state, List<String> indices, Map<ShardId, ShardRestoreStatus> shards) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.indices = Objects.requireNonNull(indices);
if (shards == null) {
this.shards = ImmutableOpenMap.of();
this.shards = Map.of();
} else {
this.shards = shards;
}
@@ -347,25 +336,19 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException

public RestoreInProgress(StreamInput in) throws IOException {
int count = in.readVInt();
final ImmutableOpenMap.Builder<String, Entry> entriesBuilder = ImmutableOpenMap.builder(count);
final Map<String, Entry> entriesBuilder = Maps.newHashMapWithExpectedSize(count);
for (int i = 0; i < count; i++) {
final String uuid;
uuid = in.readString();
Snapshot snapshot = new Snapshot(in);
State state = State.fromValue(in.readByte());
List<String> indexBuilder = in.readStringList();
List<String> indices = in.readImmutableList(StreamInput::readString);
entriesBuilder.put(
uuid,
new Entry(
uuid,
snapshot,
state,
Collections.unmodifiableList(indexBuilder),
in.readImmutableOpenMap(ShardId::new, ShardRestoreStatus::readShardRestoreStatus)
)
new Entry(uuid, snapshot, state, indices, in.readImmutableMap(ShardId::new, ShardRestoreStatus::readShardRestoreStatus))
);
}
this.entries = entriesBuilder.build();
this.entries = Collections.unmodifiableMap(entriesBuilder);
}

@Override
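In the RestoreInProgress(StreamInput) constructor above, the count-sized ImmutableOpenMap.Builder is replaced with Maps.newHashMapWithExpectedSize(count) plus a Collections.unmodifiableMap wrapper, and the per-entry shard map is read via readImmutableMap. A rough illustration of that read-loop shape using plain java.io streams and a hypothetical two-field entry, not Elasticsearch's StreamInput wire format:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReadLoopSketch {

    // Hypothetical stand-in for RestoreInProgress.Entry.
    record Entry(String uuid, String snapshotName) {}

    // Mirrors the constructor's shape: read a count, pre-size a HashMap,
    // fill it keyed by uuid, then freeze it before assigning to the field.
    static Map<String, Entry> readEntries(DataInputStream in) throws IOException {
        int count = in.readInt();                               // stands in for in.readVInt()
        Map<String, Entry> entries = new HashMap<>(count * 2);  // rough stand-in for Maps.newHashMapWithExpectedSize(count)
        for (int i = 0; i < count; i++) {
            String uuid = in.readUTF();
            String snapshotName = in.readUTF();
            entries.put(uuid, new Entry(uuid, snapshotName));
        }
        return Collections.unmodifiableMap(entries);            // was entriesBuilder.build()
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(1);
            out.writeUTF("restore-uuid-1");
            out.writeUTF("snapshot-1");
        }
        System.out.println(readEntries(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))));
    }
}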
@@ -11,7 +11,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -251,7 +250,7 @@ public static Entry startedEntry(
List<String> dataStreams,
long startTime,
long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version,
List<SnapshotFeatureInfo> featureStates
@@ -302,12 +301,12 @@ public static Entry startClone(
Collections.emptyList(),
startTime,
repositoryStateId,
ImmutableOpenMap.of(),
Map.of(),
null,
Collections.emptyMap(),
version,
source,
ImmutableOpenMap.of()
Map.of()
);
}

@@ -326,7 +325,7 @@ public static boolean completed(Collection<ShardSnapshotStatus> shards) {
return true;
}

private static boolean hasFailures(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
private static boolean hasFailures(Map<RepositoryShardId, ShardSnapshotStatus> clones) {
for (ShardSnapshotStatus value : clones.values()) {
if (value.state().failed()) {
return true;
@@ -685,7 +684,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
/**
* Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard operation in this entry.
*/
private final ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> shardStatusByRepoShardId;
private final Map<RepositoryShardId, ShardSnapshotStatus> shardStatusByRepoShardId;

@Nullable
private final Map<String, Object> userMetadata;
@@ -723,7 +722,7 @@ public Entry(
userMetadata,
version,
null,
ImmutableOpenMap.of()
Map.of()
);
}

@@ -742,7 +741,7 @@ private Entry(
Map<String, Object> userMetadata,
Version version,
@Nullable SnapshotId source,
ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> shardStatusByRepoShardId
Map<RepositoryShardId, ShardSnapshotStatus> shardStatusByRepoShardId
) {
this.state = state;
this.snapshot = snapshot;
@@ -762,9 +761,7 @@ private Entry(
assert shardStatusByRepoShardId == null || shardStatusByRepoShardId.isEmpty()
: "Provided explict repo shard id statuses [" + shardStatusByRepoShardId + "] but no source";
final Map<String, Index> res = Maps.newMapWithExpectedSize(indices.size());
final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> byRepoShardIdBuilder = ImmutableOpenMap.builder(
shards.size()
);
final Map<RepositoryShardId, ShardSnapshotStatus> byRepoShardIdBuilder = Maps.newHashMapWithExpectedSize(shards.size());
for (Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
final ShardId shardId = entry.getKey();
final IndexId indexId = indices.get(shardId.getIndexName());
@@ -773,12 +770,12 @@ private Entry(
assert existing == null || existing.equals(index) : "Conflicting indices [" + existing + "] and [" + index + "]";
byRepoShardIdBuilder.put(new RepositoryShardId(indexId, shardId.id()), entry.getValue());
}
this.shardStatusByRepoShardId = byRepoShardIdBuilder.build();
snapshotIndices = Map.copyOf(res);
this.shardStatusByRepoShardId = Map.copyOf(byRepoShardIdBuilder);
this.snapshotIndices = Map.copyOf(res);
} else {
assert shards.isEmpty();
this.shardStatusByRepoShardId = shardStatusByRepoShardId;
snapshotIndices = Map.of();
this.snapshotIndices = Map.of();
}
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.shardStatusByRepoShardId);
}
@@ -801,17 +798,14 @@ private static Entry readFrom(StreamInput in) throws IOException {
indices = Collections.unmodifiableMap(idx);
}
final long startTime = in.readLong();
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = in.readImmutableOpenMap(
ShardId::new,
ShardSnapshotStatus::readFrom
);
final Map<ShardId, ShardSnapshotStatus> shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom);
final long repositoryStateId = in.readLong();
final String failure = in.readOptionalString();
final Map<String, Object> userMetadata = in.readMap();
final Version version = Version.readVersion(in);
final List<String> dataStreams = in.readStringList();
final SnapshotId source = in.readOptionalWriteable(SnapshotId::new);
final ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones = in.readImmutableOpenMap(
final Map<RepositoryShardId, ShardSnapshotStatus> clones = in.readImmutableMap(
RepositoryShardId::new,
ShardSnapshotStatus::readFrom
);
@@ -840,7 +834,7 @@ private static boolean assertShardsConsistent(
State state,
Map<String, IndexId> indices,
Map<ShardId, ShardSnapshotStatus> shards,
ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> statusByRepoShardId
Map<RepositoryShardId, ShardSnapshotStatus> statusByRepoShardId
) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
return true;
@@ -895,7 +889,7 @@ public Entry withRepoGen(long newRepoGen) {
userMetadata,
version,
source,
source == null ? ImmutableOpenMap.of() : shardStatusByRepoShardId
source == null ? Map.of() : shardStatusByRepoShardId
);
}

@@ -943,13 +937,13 @@ public Entry withUpdatedIndexIds(Map<IndexId, IndexId> updates) {
userMetadata,
version,
source,
ImmutableOpenMap.of()
Map.of()
);
}
return this;
}

public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
public Entry withClones(Map<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
if (updatedClones.equals(shardStatusByRepoShardId)) {
return this;
}
@@ -964,7 +958,7 @@ public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus>
featureStates,
startTime,
repositoryStateId,
ImmutableOpenMap.of(),
Map.of(),
failure,
userMetadata,
version,
@@ -985,7 +979,7 @@ public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus>
*/
@Nullable
public Entry abort() {
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
final Map<ShardId, ShardSnapshotStatus> shardsBuilder = new HashMap<>();
boolean completed = true;
boolean allQueued = true;
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : shards.entrySet()) {
@@ -1016,12 +1010,12 @@ public Entry abort() {
featureStates,
startTime,
repositoryStateId,
shardsBuilder.build(),
Map.copyOf(shardsBuilder),
ABORTED_FAILURE_TEXT,
userMetadata,
version,
source,
ImmutableOpenMap.of()
Map.of()
);
}

@@ -1033,7 +1027,7 @@ public Entry abort() {
* @param shards new shard snapshot states
* @return new snapshot entry
*/
public Entry withShardStates(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
public Entry withShardStates(Map<ShardId, ShardSnapshotStatus> shards) {
if (completed(shards.values())) {
return new Entry(
snapshot,
@@ -1058,7 +1052,7 @@ public Entry withShardStates(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shar
* Same as {@link #withShardStates} but does not check if the snapshot completed and thus is only to be used when starting new
* shard snapshots on data nodes for a running snapshot.
*/
public Entry withStartedShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
public Entry withStartedShards(Map<ShardId, ShardSnapshotStatus> shards) {
final SnapshotsInProgress.Entry updated = new Entry(
snapshot,
includeGlobalState,
@@ -1305,7 +1299,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(dataStreams);
out.writeOptionalWriteable(source);
if (source == null) {
out.writeMap(ImmutableOpenMap.of());
out.writeMap(Map.of());
} else {
out.writeMap(shardStatusByRepoShardId);
}
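The abort() rewrite above follows the same mutate-then-freeze recipe: updated per-shard statuses are collected in a HashMap and published with Map.copyOf instead of ImmutableOpenMap.Builder#build. A generic sketch of that step, with an illustrative status type standing in for ShardSnapshotStatus and with the queued/completed bookkeeping of the real method omitted:

import java.util.HashMap;
import java.util.Map;

public class AbortSketch {

    // Illustrative status type; the real code uses SnapshotsInProgress.ShardSnapshotStatus.
    record Status(String state) {
        boolean completed() {
            return state.equals("SUCCESS") || state.equals("FAILED");
        }
    }

    // Shape of Entry.abort(): walk the current shard statuses, mark anything
    // still running as ABORTED, keep completed shards as they are, then freeze the result.
    static Map<String, Status> abortShards(Map<String, Status> shards) {
        Map<String, Status> shardsBuilder = new HashMap<>();     // was ImmutableOpenMap.builder()
        for (Map.Entry<String, Status> shardEntry : shards.entrySet()) {
            Status status = shardEntry.getValue();
            shardsBuilder.put(shardEntry.getKey(), status.completed() ? status : new Status("ABORTED"));
        }
        return Map.copyOf(shardsBuilder);                        // was shardsBuilder.build()
    }

    public static void main(String[] args) {
        Map<String, Status> before = Map.of("index-1[0]", new Status("INIT"), "index-1[1]", new Status("SUCCESS"));
        System.out.println(abortShards(before));
    }
}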