Skip to content

Commit

Permalink
[Remote Store] Sync segments in refresh listener on refresh after commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Oct 22, 2023
1 parent 14d4a63 commit b916e0f
Showing 1 changed file with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,10 @@ public void beforeRefresh() throws IOException {}

@Override
protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
if (shouldSync(didRefresh)) {
if (shouldSync(didRefresh) && isReadyForUpload()) {
segmentTracker.updateLocalRefreshTimeAndSeqNo();
try {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
}
initializeRemoteDirectoryOnTermUpdate();
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Expand Down Expand Up @@ -164,16 +160,12 @@ private boolean shouldSync(boolean didRefresh) {
// is important to upload the zero state segments so that the restore does not break.
return this.primaryTerm != indexShard.getOperationPrimaryTerm()
|| didRefresh
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()
|| isRefreshAfterCommitSafe();
}

private boolean syncSegments() {
if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
logger.debug(
"Skipped syncing segments with primaryMode={} indexShardState={}",
indexShard.getReplicationTracker().isPrimaryMode(),
indexShard.state()
);
if (isReadyForUpload() == false) {
// Following check is required to enable retry and make sure that we do not lose this refresh event
// When primary shard is restored from remote store, the recovery happens first followed by changing
// primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through
Expand Down Expand Up @@ -323,6 +315,15 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

private boolean isRefreshAfterCommitSafe() {
try {
return isRefreshAfterCommit();
} catch (Exception e) {
logger.info("Exception occurred in isRefreshAfterCommitSafe", e);
}
return false;
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
throws IOException {
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
Expand Down Expand Up @@ -439,6 +440,39 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB
}
}

private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init();
if (uploadedMetadata != null) {
segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet());
}
}
}

private boolean isReadyForUpload() {
boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED;
if (isReady == false) {
logOnSyncSkip();
}
return isReady;
}

private void logOnSyncSkip() {
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
if (indexShard.getReplicationTracker() != null) {
sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode());
}
if (indexShard.state() != null) {
sb.append(" indexShardState=").append(indexShard.state());
}
if (indexShard.getEngineOrNull() != null) {
sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName());
}
logger.trace(sb.toString());
}

/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*/
Expand Down

0 comments on commit b916e0f

Please sign in to comment.