Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SnapshotV2] Add snapshot size to status response for shallow V2 snapshots #15573

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolea
if (hasIndexFilter) {
assertEquals(0, snapshotStatus.getStats().getTotalSize());
} else {
// TODO: after adding primary store size at the snapshot level, total size here should be > 0
assertTrue(snapshotStatus.getStats().getTotalSize() > 0L);
}
// assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels
assertEquals(0, snapshotStatus.getStats().getTotalFileCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testStatusApiConsistency() {
assertEquals(snapshotStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
}

public void testStatusAPICallForShallowCopySnapshot() {
public void testStatusAPICallForShallowCopySnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
Expand All @@ -135,15 +135,24 @@ public void testStatusAPICallForShallowCopySnapshot() {
final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);

final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot);
assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS));
assertBusy(() -> {
final SnapshotStatus snapshotStatus = client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots(snapshot)
.execute()
.actionGet()
.getSnapshots()
.get(0);
assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS));

final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName);
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L));
final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName);
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L));
}, 1, TimeUnit.MINUTES);
}

public void testStatusAPICallInProgressSnapshot() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.snapshots.status;

import org.opensearch.Version;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -86,6 +87,8 @@

private SnapshotStats stats;

private final long initialTotalSizeInBytes;

@Nullable
private final Boolean includeGlobalState;

Expand All @@ -96,7 +99,12 @@
includeGlobalState = in.readOptionalBoolean();
final long startTime = in.readLong();
final long time = in.readLong();
updateShardStats(startTime, time);
if (in.getVersion().onOrAfter(Version.CURRENT)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While checking this in to 'main', could you please add a TODO comment noting the specific version that 'Version.CURRENT' needs to be replaced with in the follow-up PR? This applies to every place where we have added 'Version.CURRENT'.

initialTotalSizeInBytes = in.readOptionalLong();

Check warning on line 103 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L103

Added line #L103 was not covered by tests
} else {
initialTotalSizeInBytes = 0L;

Check warning on line 105 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L105

Added line #L105 was not covered by tests
}
updateShardStats(startTime, time, initialTotalSizeInBytes);

Check warning on line 107 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L107

Added line #L107 was not covered by tests
}

SnapshotStatus(
Expand All @@ -105,15 +113,18 @@
List<SnapshotIndexShardStatus> shards,
Boolean includeGlobalState,
long startTime,
long time
long time,
long initialTotalSizeInBytes
) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.shards = Objects.requireNonNull(shards);
this.includeGlobalState = includeGlobalState;
shardsStats = new SnapshotShardsStats(shards);
assert time >= 0 : "time must be >= 0 but received [" + time + "]";
updateShardStats(startTime, time);
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
assert initialTotalSizeInBytes >= 0 : "initialTotalSizeInBytes must be >= 0 but received [" + initialTotalSizeInBytes + "]";
updateShardStats(startTime, time, initialTotalSizeInBytes);
}

private SnapshotStatus(
Expand All @@ -123,7 +134,8 @@
Map<String, SnapshotIndexStatus> indicesStatus,
SnapshotShardsStats shardsStats,
SnapshotStats stats,
Boolean includeGlobalState
Boolean includeGlobalState,
long initialTotalSizeInBytes
) {
this.snapshot = snapshot;
this.state = state;
Expand All @@ -132,6 +144,7 @@
this.shardsStats = shardsStats;
this.stats = stats;
this.includeGlobalState = includeGlobalState;
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
}

/**
Expand Down Expand Up @@ -204,6 +217,9 @@
out.writeOptionalBoolean(includeGlobalState);
out.writeLong(stats.getStartTime());
out.writeLong(stats.getTime());
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalLong(initialTotalSizeInBytes);

Check warning on line 221 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L221

Added line #L221 was not covered by tests
}
}

@Override
Expand Down Expand Up @@ -276,7 +292,7 @@
shards.addAll(index.getShards().values());
}
}
return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState);
return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState, 0L);
}
);
static {
Expand All @@ -299,8 +315,8 @@
return PARSER.parse(parser, null);
}

private void updateShardStats(long startTime, long time) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
private void updateShardStats(long startTime, long time, long initialTotalSizeInBytes) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSizeInBytes, 0);
shardsStats = new SnapshotShardsStats(shards);
for (SnapshotIndexShardStatus shard : shards) {
// BWC: only update timestamps when we did not get a start time from an old node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@
Collections.unmodifiableList(shardStatusBuilder),
entry.includeGlobalState(),
entry.startTime(),
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L),

Check warning on line 300 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java#L300

Added line #L300 was not covered by tests
0L
)
);
}
Expand Down Expand Up @@ -344,7 +345,7 @@
boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0;
long initialSnapshotTotalSize = 0;
if (isShallowV2Snapshot && request.indices().length == 0) {
// TODO: add primary store size in bytes at the snapshot level
initialSnapshotTotalSize = snapshotInfo.getSnapshotSizeInBytes();

Check warning on line 348 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java#L348

Added line #L348 was not covered by tests
}

for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
Expand Down Expand Up @@ -377,7 +378,8 @@
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime,
initialSnapshotTotalSize
)
);
}
Expand Down
25 changes: 23 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
final long primaryStoreSize;

private long avgTotalBytes;
private long avgFreeByte;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L);
}

/**
Expand All @@ -84,6 +86,7 @@
* @param shardSizes a shardkey to size in bytes mapping per shard.
* @param routingToDataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param primaryStoreSize total size in bytes for all the primary shards
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
Expand All @@ -92,14 +95,16 @@
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
final Map<String, FileCacheStats> nodeFileCacheStats,
final long primaryStoreSize
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
this.primaryStoreSize = primaryStoreSize;
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

Expand All @@ -121,6 +126,11 @@
} else {
this.nodeFileCacheStats = Map.of();
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.primaryStoreSize = in.readOptionalLong();
} else {
this.primaryStoreSize = 0L;

Check warning on line 132 in server/src/main/java/org/opensearch/cluster/ClusterInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/ClusterInfo.java#L132

Added line #L132 was not covered by tests
}

calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}
Expand Down Expand Up @@ -166,6 +176,9 @@
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalLong(this.primaryStoreSize);
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -220,6 +233,7 @@
}
}
builder.endArray(); // end "reserved_sizes"
builder.field("primary_store_size", this.primaryStoreSize);

Check warning on line 236 in server/src/main/java/org/opensearch/cluster/ClusterInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/ClusterInfo.java#L236

Added line #L236 was not covered by tests
return builder;
}

Expand All @@ -246,6 +260,13 @@
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
 * Gets the primary store size held by this cluster info instance, i.e. the
 * total size in bytes for all the primary shards.
 *
 * @return the total primary shard store size, in bytes
 */
public long getPrimaryStoreSize() {
    return this.primaryStoreSize;
}

/**
* Returns the shard size for the given shard routing or <code>null</code> if that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager

private volatile long primaryStoreSize;
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
private volatile boolean enabled;
private volatile TimeValue fetchTimeout;
Expand All @@ -127,6 +129,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.primaryStoreSize = 0L;
this.threadPool = threadPool;
this.client = client;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -213,7 +216,8 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
nodeFileCacheStats,
primaryStoreSize
);
}

Expand Down Expand Up @@ -305,8 +309,13 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>();
final Map<ShardRouting, String> dataPathByShardRoutingBuilder = new HashMap<>();
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders = new HashMap<>();
buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders);

primaryStoreSize = buildShardLevelInfo(
logger,
stats,
shardSizeByIdentifierBuilder,
dataPathByShardRoutingBuilder,
reservedSpaceBuilders
);
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> rsrvdSpace = new HashMap<>();
reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build()));

Expand Down Expand Up @@ -366,13 +375,14 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
listeners.add(clusterInfoConsumer);
}

static void buildShardLevelInfo(
static long buildShardLevelInfo(
Logger logger,
ShardStats[] stats,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> newShardRoutingToDataPath,
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceByShard
) {
long currentPrimaryStoreSize = 0L;
for (ShardStats s : stats) {
final ShardRouting shardRouting = s.getShardRouting();
newShardRoutingToDataPath.put(shardRouting, s.getDataPath());
Expand All @@ -382,6 +392,9 @@ static void buildShardLevelInfo(
continue;
}
final long size = storeStats.sizeInBytes();
if (shardRouting.primary()) {
currentPrimaryStoreSize += size;
}
final long reserved = storeStats.getReservedSize().getBytes();

final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
Expand All @@ -396,6 +409,7 @@ static void buildShardLevelInfo(
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
}
}
return currentPrimaryStoreSize;
}

static void fillDiskUsagePerNode(
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,7 @@ protected Node(
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
clusterInfoService,
actionModule.getActionFilters(),
remoteStorePinnedTimestampService
);
Expand Down
Loading
Loading