From 743c09a36b319832d40aeb88e64a586f7d29afde Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Wed, 5 Jul 2023 18:58:53 +0530 Subject: [PATCH] Adding JavaDocs and fixing divide by zero errors on UTs Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../remote/RemoteRefreshSegmentTracker.java | 40 ++++++++++++++++++- .../opensearch/index/shard/IndexShard.java | 12 +++--- .../TransportRemoteStoreStatsActionTests.java | 10 ++--- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index c87cf3961fc02..e6a7bd3d70afb 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -52,6 +52,9 @@ public class RemoteRefreshSegmentTracker { */ private volatile long localRefreshClockTimeMs; + /** + * Time in milliseconds for the last successful segment download + */ private volatile long lastDownloadTimestampMs; /** @@ -101,8 +104,19 @@ public class RemoteRefreshSegmentTracker { */ private volatile long uploadBytesSucceeded; + /** + * Cumulative sum of size in bytes of segment files for which download has started. + */ private volatile long downloadBytesStarted; + + /** + * Cumulative sum of size in bytes of segment files for which download has failed. + */ private volatile long downloadBytesFailed; + + /** + * Cumulative sum of size in bytes of segment files for which download has succeeded. + */ private volatile long downloadBytesSucceeded; /** @@ -120,8 +134,19 @@ public class RemoteRefreshSegmentTracker { */ private volatile long totalUploadsSucceeded; + /** + * Cumulative sum of count of segment file downloads that have started. + */ private volatile long totalDownloadsStarted; + + /** + * Cumulative sum of count of segment file downloads that have succeeded. + */ private volatile long totalDownloadsSucceeded; + + /** + * Cumulative sum of count of segment file downloads that have failed. + */ private volatile long totalDownloadsFailed; /** @@ -165,6 +190,10 @@ public class RemoteRefreshSegmentTracker { */ private final Object uploadBytesMutex = new Object(); + /** + * Provides moving average over the last N total size in bytes of segment files downloaded from the remote store. + * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ private final AtomicReference downloadBytesMovingAverageReference; private final Object downloadBytesMutex = new Object(); @@ -177,18 +206,26 @@ public class RemoteRefreshSegmentTracker { private final Object uploadBytesPerSecMutex = new Object(); + /** + * Provides moving average over the last N upload speed (in bytes/s) of segment files downloaded from the remote store. + * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ private final AtomicReference downloadBytesPerSecMovingAverageReference; private final Object downloadBytesPerSecMutex = new Object(); /** - * Provides moving average over the last N overall upload time (in nanos) as part of remote refresh.N is window size. + * Provides moving average over the last N overall upload time (in millis) as part of remote refresh.N is window size. * Wrapped with {@code AtomicReference} for dynamic changes in window size. */ private final AtomicReference uploadTimeMsMovingAverageReference; private final Object uploadTimeMsMutex = new Object(); + /** + * Provides moving average over the last N overall download time (in millis) of segments downloaded from the remote store. + * Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ private final AtomicReference downloadTimeMovingAverageReference; private final Object downloadTimeMutex = new Object(); @@ -851,5 +888,4 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(bytesLag); } } - } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 74d1ca19a1bf2..23fdfe45a4e44 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -44,7 +44,6 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; -import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.ReferenceManager; @@ -4771,14 +4770,14 @@ private String copySegmentFiles( RemoteSegmentStoreDirectory.UploadedSegmentMetadata segmentMetadata = uploadedSegments.get(file); long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { - long startTimeInMs = System.currentTimeMillis(); + long startTimeInNs = System.nanoTime(); long segmentSizeInBytes = segmentMetadata.getLength(); beforeSegmentDownload(downloadStatsTracker, segmentSizeInBytes); try { storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT); storeDirectory.sync(Collections.singleton(file)); downloadedSegments.add(file); - afterSegmentDownloadCompleted(downloadStatsTracker, segmentSizeInBytes, startTimeInMs); + afterSegmentDownloadCompleted(downloadStatsTracker, segmentSizeInBytes, startTimeInNs); } catch (IOException e) { afterSegmentDownloadFailed(downloadStatsTracker, segmentSizeInBytes); throw e; @@ -4826,15 +4825,14 @@ private void afterSegmentDownloadCompleted( long downloadedFileSize, long startTimeInNs ) { - long currentTime = System.currentTimeMillis(); - downloadStatsTracker.updateLastDownloadTimestampMs(currentTime); + long currentTimeInNs = System.nanoTime(); + downloadStatsTracker.updateLastDownloadTimestampMs(System.currentTimeMillis()); downloadStatsTracker.incrementTotalDownloadsSucceeded(); downloadStatsTracker.addDownloadBytes(downloadedFileSize); downloadStatsTracker.addDownloadBytesSucceeded(downloadedFileSize); - long timeTakenInMS = currentTime - startTimeInNs; + long timeTakenInMS = Math.max(1, TimeValue.nsecToMSec(currentTimeInNs - startTimeInNs)); downloadStatsTracker.addDownloadTime(timeTakenInMS); downloadStatsTracker.addDownloadBytesPerSec((downloadedFileSize * 1_000L) / timeTakenInMS); - } private void afterSegmentDownloadFailed(RemoteRefreshSegmentTracker downloadStatsTracker, long failedFileSize) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java index 086c5331fbc80..939a9efa6790e 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java @@ -108,7 +108,7 @@ public void tearDown() throws Exception { clusterService.close(); } - public void testOnlyPrimaryShards() throws Exception { + public void testAllShardCopies() throws Exception { FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); RoutingTable routingTable = RoutingTable.builder().addAsNew(remoteStoreIndexMetadata).build(); Metadata metadata = Metadata.builder().put(remoteStoreIndexMetadata, false).build(); @@ -125,7 +125,7 @@ public void testOnlyPrimaryShards() throws Exception { new String[] { INDEX.getName() } ); - assertEquals(shardsIterator.size(), 2); + assertEquals(shardsIterator.size(), 4); } public void testOnlyLocalShards() throws Exception { @@ -153,10 +153,10 @@ public void testOnlyLocalShards() throws Exception { remoteStoreStatsRequest.local(true); ShardsIterator shardsIterator = statsAction.shards(clusterService.state(), remoteStoreStatsRequest, concreteIndices); - assertEquals(shardsIterator.size(), 1); + assertEquals(shardsIterator.size(), 2); } - public void testOnlyRemoteStoreEnabledShards() throws Exception { + public void testOnlyRemoteStoreEnabledShardCopies() throws Exception { FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); Index NEW_INDEX = new Index("newIndex", "newUUID"); IndexMetadata indexMetadataWithoutRemoteStore = IndexMetadata.builder(NEW_INDEX.getName()) @@ -189,6 +189,6 @@ public void testOnlyRemoteStoreEnabledShards() throws Exception { new String[] { INDEX.getName() } ); - assertEquals(shardsIterator.size(), 2); + assertEquals(shardsIterator.size(), 4); } }