Skip to content

Commit

Permalink
Adding JavaDocs and fixing divide by zero errors on UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 committed Jul 5, 2023
1 parent e667270 commit 743c09a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class RemoteRefreshSegmentTracker {
*/
private volatile long localRefreshClockTimeMs;

/**
* Time in milliseconds for the last successful segment download
*/
private volatile long lastDownloadTimestampMs;

/**
Expand Down Expand Up @@ -101,8 +104,19 @@ public class RemoteRefreshSegmentTracker {
*/
private volatile long uploadBytesSucceeded;

/**
* Cumulative sum of size in bytes of segment files for which download has started.
*/
private volatile long downloadBytesStarted;

/**
* Cumulative sum of size in bytes of segment files for which download has failed.
*/
private volatile long downloadBytesFailed;

/**
* Cumulative sum of size in bytes of segment files for which download has succeeded.
*/
private volatile long downloadBytesSucceeded;

/**
Expand All @@ -120,8 +134,19 @@ public class RemoteRefreshSegmentTracker {
*/
private volatile long totalUploadsSucceeded;

/**
* Cumulative sum of count of segment file downloads that have started.
*/
private volatile long totalDownloadsStarted;

/**
* Cumulative sum of count of segment file downloads that have succeeded.
*/
private volatile long totalDownloadsSucceeded;

/**
* Cumulative sum of count of segment file downloads that have failed.
*/
private volatile long totalDownloadsFailed;

/**
Expand Down Expand Up @@ -165,6 +190,10 @@ public class RemoteRefreshSegmentTracker {
*/
private final Object uploadBytesMutex = new Object();

/**
* Provides moving average over the last N total size in bytes of segment files downloaded from the remote store.
* N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size.
*/
private final AtomicReference<MovingAverage> downloadBytesMovingAverageReference;

private final Object downloadBytesMutex = new Object();
Expand All @@ -177,18 +206,26 @@ public class RemoteRefreshSegmentTracker {

private final Object uploadBytesPerSecMutex = new Object();

/**
* Provides moving average over the last N upload speed (in bytes/s) of segment files downloaded from the remote store.
* N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size.
*/
private final AtomicReference<MovingAverage> downloadBytesPerSecMovingAverageReference;

private final Object downloadBytesPerSecMutex = new Object();

/**
* Provides moving average over the last N overall upload time (in nanos) as part of remote refresh.N is window size.
* Provides moving average over the last N overall upload time (in millis) as part of remote refresh.N is window size.
* Wrapped with {@code AtomicReference} for dynamic changes in window size.
*/
private final AtomicReference<MovingAverage> uploadTimeMsMovingAverageReference;

private final Object uploadTimeMsMutex = new Object();

/**
* Provides moving average over the last N overall download time (in millis) of segments downloaded from the remote store.
* Wrapped with {@code AtomicReference} for dynamic changes in window size.
*/
private final AtomicReference<MovingAverage> downloadTimeMovingAverageReference;

private final Object downloadTimeMutex = new Object();
Expand Down Expand Up @@ -851,5 +888,4 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(bytesLag);
}
}

}
12 changes: 5 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -4771,14 +4770,14 @@ private String copySegmentFiles(
RemoteSegmentStoreDirectory.UploadedSegmentMetadata segmentMetadata = uploadedSegments.get(file);
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
long startTimeInMs = System.currentTimeMillis();
long startTimeInNs = System.nanoTime();
long segmentSizeInBytes = segmentMetadata.getLength();
beforeSegmentDownload(downloadStatsTracker, segmentSizeInBytes);
try {
storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT);
storeDirectory.sync(Collections.singleton(file));
downloadedSegments.add(file);
afterSegmentDownloadCompleted(downloadStatsTracker, segmentSizeInBytes, startTimeInMs);
afterSegmentDownloadCompleted(downloadStatsTracker, segmentSizeInBytes, startTimeInNs);
} catch (IOException e) {
afterSegmentDownloadFailed(downloadStatsTracker, segmentSizeInBytes);
throw e;
Expand Down Expand Up @@ -4826,15 +4825,14 @@ private void afterSegmentDownloadCompleted(
long downloadedFileSize,
long startTimeInNs
) {
long currentTime = System.currentTimeMillis();
downloadStatsTracker.updateLastDownloadTimestampMs(currentTime);
long currentTimeInNs = System.nanoTime();
downloadStatsTracker.updateLastDownloadTimestampMs(System.currentTimeMillis());
downloadStatsTracker.incrementTotalDownloadsSucceeded();
downloadStatsTracker.addDownloadBytes(downloadedFileSize);
downloadStatsTracker.addDownloadBytesSucceeded(downloadedFileSize);
long timeTakenInMS = currentTime - startTimeInNs;
long timeTakenInMS = Math.max(1, TimeValue.nsecToMSec(currentTimeInNs - startTimeInNs));
downloadStatsTracker.addDownloadTime(timeTakenInMS);
downloadStatsTracker.addDownloadBytesPerSec((downloadedFileSize * 1_000L) / timeTakenInMS);

}

private void afterSegmentDownloadFailed(RemoteRefreshSegmentTracker downloadStatsTracker, long failedFileSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void tearDown() throws Exception {
clusterService.close();
}

public void testOnlyPrimaryShards() throws Exception {
public void testAllShardCopies() throws Exception {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
RoutingTable routingTable = RoutingTable.builder().addAsNew(remoteStoreIndexMetadata).build();
Metadata metadata = Metadata.builder().put(remoteStoreIndexMetadata, false).build();
Expand All @@ -125,7 +125,7 @@ public void testOnlyPrimaryShards() throws Exception {
new String[] { INDEX.getName() }
);

assertEquals(shardsIterator.size(), 2);
assertEquals(shardsIterator.size(), 4);
}

public void testOnlyLocalShards() throws Exception {
Expand Down Expand Up @@ -153,10 +153,10 @@ public void testOnlyLocalShards() throws Exception {
remoteStoreStatsRequest.local(true);
ShardsIterator shardsIterator = statsAction.shards(clusterService.state(), remoteStoreStatsRequest, concreteIndices);

assertEquals(shardsIterator.size(), 1);
assertEquals(shardsIterator.size(), 2);
}

public void testOnlyRemoteStoreEnabledShards() throws Exception {
public void testOnlyRemoteStoreEnabledShardCopies() throws Exception {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
Index NEW_INDEX = new Index("newIndex", "newUUID");
IndexMetadata indexMetadataWithoutRemoteStore = IndexMetadata.builder(NEW_INDEX.getName())
Expand Down Expand Up @@ -189,6 +189,6 @@ public void testOnlyRemoteStoreEnabledShards() throws Exception {
new String[] { INDEX.getName() }
);

assertEquals(shardsIterator.size(), 2);
assertEquals(shardsIterator.size(), 4);
}
}

0 comments on commit 743c09a

Please sign in to comment.