Skip to content

Commit

Permalink
Clean up segment stats tests
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <sabhumik@amazon.com>
  • Loading branch information
Bhumika Saini committed Aug 11, 2023
1 parent 3664166 commit 50e8908
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ private RemoteSegmentTransferTracker.Stats stats() {
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
return matches.get(0).getStats();
return matches.get(0).getSegmentStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public void testStatsResponseFromAllNodes() {
assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length != 0);
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteSegmentTransferTracker.Stats stats = matches.get(0).getStats();
validateUploadStats(stats);
RemoteSegmentTransferTracker.Stats stats = matches.get(0).getSegmentStats();
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
}

Expand All @@ -86,15 +86,15 @@ public void testStatsResponseFromAllNodes() {
assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length != 0);
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(2, matches.size());
for (RemoteStoreStats stat : matches) {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
RemoteSegmentTransferTracker.Stats stats = stat.getStats();
RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats();
if (routing.primary()) {
validateUploadStats(stats);
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
} else {
validateDownloadStats(stats);
Expand Down Expand Up @@ -124,9 +124,9 @@ public void testStatsResponseAllShards() {
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertEquals(3, response.getSuccessfulShards());
assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 3);
RemoteSegmentTransferTracker.Stats stats = response.getRemoteStoreStats()[0].getStats();
validateUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

// Step 3 - Enable replicas on the existing indices and ensure that download
// stats are being populated as well
Expand All @@ -137,13 +137,13 @@ public void testStatsResponseAllShards() {
for (RemoteStoreStats stat : response.getRemoteStoreStats()) {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
stats = stat.getStats();
segmentStats = stat.getSegmentStats();
if (routing.primary()) {
validateUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);
} else {
validateDownloadStats(stats);
assertEquals(0, stats.totalUploadsStarted);
validateDownloadStats(segmentStats);
assertEquals(0, segmentStats.totalUploadsStarted);
}
}

Expand Down Expand Up @@ -171,9 +171,9 @@ public void testStatsResponseFromLocalNode() {
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertEquals(1, response.getSuccessfulShards());
assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 1);
RemoteSegmentTransferTracker.Stats stats = response.getRemoteStoreStats()[0].getStats();
validateUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);
}
changeReplicaCountAndEnsureGreen(1);
for (String node : nodes) {
Expand All @@ -187,9 +187,9 @@ public void testStatsResponseFromLocalNode() {
for (RemoteStoreStats stat : response.getRemoteStoreStats()) {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
RemoteSegmentTransferTracker.Stats stats = stat.getStats();
RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats();
if (routing.primary()) {
validateUploadStats(stats);
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
} else {
validateDownloadStats(stats);
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
.filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList())
.get(0)
.getStats();
.getSegmentStats();
assertTrue(
zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded
&& zeroStatePrimaryStats.totalUploadsSucceeded == 1
Expand All @@ -241,7 +241,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
.filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList())
.get(0)
.getStats();
.getSegmentStats();
assertTrue(
zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
Expand All @@ -266,8 +266,8 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
.filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList());
assertEquals(1, replicaStatsList.size());
RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getStats();
RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.get(0).getStats();
RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getSegmentStats();
RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.get(0).getSegmentStats();
// Assert Upload syncs - zero state uploads == download syncs
assertTrue(primaryStats.totalUploadsStarted > 0);
assertTrue(primaryStats.totalUploadsSucceeded > 0);
Expand Down Expand Up @@ -318,7 +318,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
.filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary())
.collect(Collectors.toList())
.get(0)
.getStats();
.getSegmentStats();
assertTrue(
zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded
&& zeroStatePrimaryStats.totalUploadsSucceeded == 1
Expand All @@ -335,8 +335,8 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
.collect(Collectors.toList());
zeroStateReplicaStats.forEach(stats -> {
assertTrue(
stats.getStats().directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& stats.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
);
});

Expand All @@ -356,7 +356,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr

// Assert that stats for primary shard and replica shard set are equal
for (RemoteStoreStats eachStatsObject : response.getRemoteStoreStats()) {
RemoteSegmentTransferTracker.Stats stats = eachStatsObject.getStats();
RemoteSegmentTransferTracker.Stats stats = eachStatsObject.getSegmentStats();
if (eachStatsObject.getShardRouting().primary()) {
uploadBytesStarted = stats.uploadBytesStarted;
uploadBytesSucceeded = stats.uploadBytesSucceeded;
Expand Down Expand Up @@ -491,7 +491,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException {

RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
Arrays.stream(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getStats();
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats();
// Assert that we have both upload and download stats for the index
assertTrue(
segmentTracker.totalUploadsStarted > 0 && segmentTracker.totalUploadsSucceeded > 0 && segmentTracker.totalUploadsFailed == 0
Expand Down Expand Up @@ -520,7 +520,7 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
.get()
.getRemoteStoreStats();
Arrays.stream(remoteStoreStats).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getStats();
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats();
if (statObject.getShardRouting().primary()) {
assertTrue(
segmentTracker.totalUploadsSucceeded == 1
Expand Down Expand Up @@ -567,7 +567,7 @@ private void relocateShard(int shardId, String sourceNode, String destNode) {
ensureGreen(INDEX_NAME);
}

private void validateUploadStats(RemoteSegmentTransferTracker.Stats stats) {
private void validateSegmentUploadStats(RemoteSegmentTransferTracker.Stats stats) {
assertEquals(0, stats.refreshTimeLagMs);
assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber);
assertTrue(stats.uploadBytesStarted > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public RemoteStoreStats(StreamInput in) throws IOException {
this.shardRouting = new ShardRouting(in);
}

public RemoteSegmentTransferTracker.Stats getStats() {
public RemoteSegmentTransferTracker.Stats getSegmentStats() {
return remoteSegmentShardStats;
}

Expand All @@ -55,7 +55,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(SubFields.DOWNLOAD);
// Ensuring that we are not showing 0 metrics to the user
if (remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted != 0) {
buildDownloadStats(builder);
buildSegmentDownloadStats(builder);
}
builder.endObject();
builder.startObject(SubFields.UPLOAD);
Expand Down Expand Up @@ -104,7 +104,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.endObject();
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
private void buildSegmentDownloadStats(XContentBuilder builder) throws IOException {
builder.field(
DownloadStatsFields.LAST_SYNC_TIMESTAMP,
remoteSegmentShardStats.directoryFileTransferTrackerStats.lastTransferTimestampMs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -675,5 +676,59 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(bytesLag);
out.writeOptionalWriteable(directoryFileTransferTrackerStats);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Stats other = (Stats) obj;

return this.shardId.toString().equals(other.shardId.toString())
&& this.localRefreshClockTimeMs == other.localRefreshClockTimeMs
&& this.remoteRefreshClockTimeMs == other.remoteRefreshClockTimeMs
&& this.refreshTimeLagMs == other.refreshTimeLagMs
&& this.localRefreshNumber == other.localRefreshNumber
&& this.remoteRefreshNumber == other.remoteRefreshNumber
&& this.uploadBytesStarted == other.uploadBytesStarted
&& this.uploadBytesFailed == other.uploadBytesFailed
&& this.uploadBytesSucceeded == other.uploadBytesSucceeded
&& this.totalUploadsStarted == other.totalUploadsStarted
&& this.totalUploadsFailed == other.totalUploadsFailed
&& this.totalUploadsSucceeded == other.totalUploadsSucceeded
&& this.rejectionCount == other.rejectionCount
&& this.consecutiveFailuresCount == other.consecutiveFailuresCount
&& this.lastSuccessfulRemoteRefreshBytes == other.lastSuccessfulRemoteRefreshBytes
&& Double.compare(this.uploadBytesMovingAverage, other.uploadBytesMovingAverage) == 0
&& Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0
&& Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0
&& this.bytesLag == other.bytesLag
&& this.directoryFileTransferTrackerStats.equals(other.directoryFileTransferTrackerStats);
}

@Override
public int hashCode() {
return Objects.hash(

Check warning on line 710 in server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java#L710

Added line #L710 was not covered by tests
shardId,
localRefreshClockTimeMs,
remoteRefreshClockTimeMs,
refreshTimeLagMs,
localRefreshNumber,
remoteRefreshNumber,
uploadBytesStarted,
uploadBytesFailed,
uploadBytesSucceeded,
totalUploadsStarted,
totalUploadsFailed,
totalUploadsSucceeded,
rejectionCount,
consecutiveFailuresCount,
lastSuccessfulRemoteRefreshBytes,
uploadBytesMovingAverage,
uploadBytesPerSecMovingAverage,
uploadTimeMovingAverage,
bytesLag,

Check warning on line 729 in server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java#L712-L729

Added lines #L712 - L729 were not covered by tests
directoryFileTransferTrackerStats
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Objects;

/**
* Tracks the amount of bytes transferred between two {@link org.apache.lucene.store.Directory} instances
Expand Down Expand Up @@ -191,5 +192,33 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(lastSuccessfulTransferInBytes);
out.writeDouble(transferredBytesPerSecMovingAverage);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Stats stats = (Stats) obj;

return transferredBytesStarted == stats.transferredBytesStarted
&& transferredBytesFailed == stats.transferredBytesFailed
&& transferredBytesSucceeded == stats.transferredBytesSucceeded
&& lastTransferTimestampMs == stats.lastTransferTimestampMs
&& Double.compare(stats.transferredBytesMovingAverage, transferredBytesMovingAverage) == 0
&& lastSuccessfulTransferInBytes == stats.lastSuccessfulTransferInBytes
&& Double.compare(stats.transferredBytesPerSecMovingAverage, transferredBytesPerSecMovingAverage) == 0;
}

@Override
public int hashCode() {
return Objects.hash(
transferredBytesStarted,
transferredBytesFailed,
transferredBytesSucceeded,
lastTransferTimestampMs,
transferredBytesMovingAverage,
lastSuccessfulTransferInBytes,
transferredBytesPerSecMovingAverage

Check warning on line 220 in server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java#L213-L220

Added lines #L213 - L220 were not covered by tests
);
}
}
}
Loading

0 comments on commit 50e8908

Please sign in to comment.