[Segment Replication] Add logs & force enable segrep
Signed-off-by: Suraj Singh <surajrider@gmail.com>
dreamer-89 committed Mar 20, 2023
1 parent 3c9eeb4 commit 937ff0b
Showing 12 changed files with 58 additions and 56 deletions.
@@ -288,7 +288,7 @@ public Iterator<Setting<?>> settings() {
public static final String SETTING_REPLICATION_TYPE = "index.replication.type";
public static final Setting<ReplicationType> INDEX_REPLICATION_TYPE_SETTING = new Setting<>(
SETTING_REPLICATION_TYPE,
- ReplicationType.DOCUMENT.toString(),
+ ReplicationType.SEGMENT.toString(),
ReplicationType::parseString,
Property.IndexScope,
Property.Final
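Note: with SEGMENT as the new default for index.replication.type, indices use segment replication unless they opt out explicitly. A minimal illustrative sketch of how the setting resolves (not part of this commit; assumes the OpenSearch Setting/Settings API and that the constants live on IndexMetadata, as the hunk above suggests):

    // Illustrative only: package paths are the usual org.opensearch ones (assumed).
    import org.opensearch.cluster.metadata.IndexMetadata;
    import org.opensearch.common.settings.Settings;
    import org.opensearch.indices.replication.common.ReplicationType;

    class ReplicationTypeDefaultSketch {
        static void demo() {
            // No explicit index.replication.type -> falls back to the new SEGMENT default.
            ReplicationType byDefault = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(Settings.EMPTY);

            // An index can still opt back into document replication explicitly.
            Settings explicit = Settings.builder()
                .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT.toString())
                .build();
            ReplicationType overridden = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(explicit);

            assert byDefault == ReplicationType.SEGMENT && overridden == ReplicationType.DOCUMENT;
        }
    }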
@@ -82,7 +82,7 @@ public static boolean isEnabled(String featureFlagName) {
return settings != null && settings.getAsBoolean(featureFlagName, false);
}

- public static final Setting<Boolean> REPLICATION_TYPE_SETTING = Setting.boolSetting(REPLICATION_TYPE, false, Property.NodeScope);
+ public static final Setting<Boolean> REPLICATION_TYPE_SETTING = Setting.boolSetting(REPLICATION_TYPE, true, Property.NodeScope);

public static final Setting<Boolean> REMOTE_STORE_SETTING = Setting.boolSetting(REMOTE_STORE, false, Property.NodeScope);
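With the REPLICATION_TYPE feature-flag setting now defaulting to true, reading it from empty node settings yields true unless a node explicitly disables it. A small illustrative sketch (not part of this commit; assumes FeatureFlags sits at org.opensearch.common.util as in OpenSearch core):

    // Illustrative only: resolving the node-scoped feature flag (assumed package paths).
    import org.opensearch.common.settings.Settings;
    import org.opensearch.common.util.FeatureFlags;

    class SegRepFeatureFlagSketch {
        static void demo() {
            boolean enabledByDefault = FeatureFlags.REPLICATION_TYPE_SETTING.get(Settings.EMPTY); // true after this change

            Settings optOut = Settings.builder()
                .put(FeatureFlags.REPLICATION_TYPE_SETTING.getKey(), false)
                .build();
            boolean disabled = FeatureFlags.REPLICATION_TYPE_SETTING.get(optOut); // false

            assert enabledByDefault && !disabled;
        }
    }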

@@ -3226,7 +3226,7 @@ private void doCheckIndex() throws IOException {
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
}

- Engine getEngine() {
+ public Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
@@ -168,7 +168,7 @@ private void recover(StartRecoveryRequest request, ActionListener<RecoveryRespon

if (request.isPrimaryRelocation()
&& (routingEntry.relocating() == false || routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {
- logger.debug(
+ logger.info(
"delaying recovery of {} as source shard is not marked yet as relocating to {}",
request.shardId(),
request.targetNode()
@@ -177,7 +177,7 @@
}

RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
- logger.trace(
+ logger.info(
"[{}][{}] starting recovery to {}",
request.shardId().getIndex().getName(),
request.shardId().id(),
@@ -190,7 +190,7 @@ private void reestablish(ReestablishRecoveryRequest request, ActionListener<Reco
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());

- logger.trace(
+ logger.info(
"[{}][{}] reestablishing recovery {}",
request.shardId().getIndex().getName(),
request.shardId().id(),
@@ -197,12 +197,12 @@ public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourc
}

protected void retryRecovery(final long recoveryId, final Throwable reason, TimeValue retryAfter, TimeValue activityTimeout) {
- logger.trace(() -> new ParameterizedMessage("will retry recovery with id [{}] in [{}]", recoveryId, retryAfter), reason);
+ logger.info(() -> new ParameterizedMessage("will retry recovery with id [{}] in [{}]", recoveryId, retryAfter), reason);
retryRecovery(recoveryId, retryAfter, activityTimeout);
}

protected void retryRecovery(final long recoveryId, final String reason, TimeValue retryAfter, TimeValue activityTimeout) {
- logger.trace("will retry recovery with id [{}] in [{}] (reason [{}])", recoveryId, retryAfter, reason);
+ logger.info("will retry recovery with id [{}] in [{}] (reason [{}])", recoveryId, retryAfter, reason);
retryRecovery(recoveryId, retryAfter, activityTimeout);
}
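These retry paths use Log4j 2's supplier-based overloads, so the ParameterizedMessage is only built when the target level is enabled; raising them from trace to info means they now appear with default log settings. A standalone sketch of the pattern (illustrative class and method names, not from this commit):

    // Illustrative only: lazy logging with the Log4j 2 API, as used in the lines above.
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;

    class RetryLoggingSketch {
        private static final Logger logger = LogManager.getLogger(RetryLoggingSketch.class);

        static void logRetry(long recoveryId, String retryAfter, Throwable reason) {
            // The lambda runs only if INFO is enabled, so the message is never built for disabled levels.
            logger.info(() -> new ParameterizedMessage("will retry recovery with id [{}] in [{}]", recoveryId, retryAfter), reason);
        }
    }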

@@ -215,7 +215,7 @@ private void retryRecovery(final long recoveryId, final TimeValue retryAfter, fi

protected void reestablishRecovery(final StartRecoveryRequest request, final String reason, TimeValue retryAfter) {
final long recoveryId = request.recoveryId();
- logger.trace("will try to reestablish recovery with id [{}] in [{}] (reason [{}])", recoveryId, retryAfter, reason);
+ logger.info("will try to reestablish recovery with id [{}] in [{}] (reason [{}])", recoveryId, retryAfter, reason);
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request));
}

@@ -232,7 +232,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final ReplicationTimer timer;
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
if (recoveryRef == null) {
- logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
+ logger.info("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
}
final RecoveryTarget recoveryTarget = recoveryRef.get();
@@ -242,7 +242,7 @@
final IndexShard indexShard = recoveryTarget.indexShard();
indexShard.preRecovery();
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
- logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
+ logger.info("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
@@ -261,20 +261,20 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
} catch (final Exception e) {
// this will be logged as warning later on...
- logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
+ logger.info("unexpected error while preparing shard for peer recovery, failing recovery", e);
onGoingRecoveries.fail(
recoveryId,
new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e),
true
);
return;
}
- logger.trace("{} starting recovery from {}", startRequest.shardId(), startRequest.sourceNode());
+ logger.info("{} starting recovery from {}", startRequest.shardId(), startRequest.sourceNode());
} else {
startRequest = preExistingRequest;
requestToSend = new ReestablishRecoveryRequest(recoveryId, startRequest.shardId(), startRequest.targetAllocationId());
actionName = PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY;
- logger.trace("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
+ logger.info("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
}
}
transportService.sendRequest(
@@ -313,7 +313,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(
boolean verifyTranslog
) {
final StartRecoveryRequest request;
- logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
+ logger.info("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

Store.MetadataSnapshot metadataSnapshot;
try {
@@ -341,7 +341,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;
- logger.trace("{} shard folder empty, recovering all files", recoveryTarget);
+ logger.info("{} shard folder empty, recovering all files", recoveryTarget);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} catch (final IOException e) {
if (startingSeqNo != UNASSIGNED_SEQ_NO) {
@@ -359,7 +359,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(
}
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
}
- logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
+ logger.info("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
request = new StartRecoveryRequest(
recoveryTarget.shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
@@ -450,7 +450,7 @@ private void performTranslogOps(
final Consumer<Exception> retryOnMappingException = exception -> {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
- logger.debug("delaying recovery due to missing mapping changes", exception);
+ logger.info("delaying recovery due to missing mapping changes", exception);
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@@ -588,7 +588,7 @@ public void onFailure(Exception e) {
true // be safe
);
} else {
- logger.debug(
+ logger.info(
() -> new ParameterizedMessage("unexpected error during recovery, but recovery id [{}] is finished", recoveryId),
e
);
@@ -654,9 +654,9 @@ public void handleResponse(RecoveryResponse recoveryResponse) {
.append(timeValueMillis(recoveryResponse.phase2Time))
.append("]")
.append("\n");
- logger.trace("{}", sb);
+ logger.info("{}", sb);
} else {
- logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime);
+ logger.info("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime);
}
}

@@ -667,7 +667,7 @@ public void handleException(TransportException e) {

private void onException(Exception e) {
if (logger.isTraceEnabled()) {
- logger.trace(
+ logger.info(
() -> new ParameterizedMessage(
"[{}][{}] Got exception on recovery",
request.shardId().getIndex().getName(),
@@ -397,7 +397,7 @@ void phase1(
phase1ExistingFileSizes.add(md.length());
existingTotalSizeInBytes += md.length();
if (logger.isTraceEnabled()) {
- logger.trace(
+ logger.info(
"recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," + " size [{}]",
md.name(),
md.checksum(),
@@ -411,21 +411,21 @@ void phase1(
phase1Files.addAll(diff.missing);
for (StoreFileMetadata md : phase1Files) {
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
- logger.trace(
+ logger.info(
"recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
md.name(),
request.metadataSnapshot().asMap().get(md.name()),
md
);
} else {
- logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
+ logger.info("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
}
phase1FileNames.add(md.name());
phase1FileSizes.add(md.length());
totalSizeInBytes += md.length();
}

- logger.trace(
+ logger.info(
"recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
phase1FileNames.size(),
new ByteSizeValue(totalSizeInBytes),
@@ -473,7 +473,7 @@ void phase1(
final long existingTotalSize = existingTotalSizeInBytes;
cleanFilesStep.whenComplete(r -> {
final TimeValue took = stopWatch.totalTime();
- logger.trace("recovery [phase1]: took [{}]", took);
+ logger.info("recovery [phase1]: took [{}]", took);
listener.onResponse(
new SendFileResult(
phase1FileNames,
@@ -487,14 +487,14 @@ void phase1(
);
}, listener::onFailure);
} else {
- logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());
+ logger.info("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());

// but we must still create a retention lease
final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
createRetentionLease(startingSeqNo, createRetentionLeaseStep);
createRetentionLeaseStep.whenComplete(retentionLease -> {
final TimeValue took = stopWatch.totalTime();
- logger.trace("recovery [phase1]: took [{}]", took);
+ logger.info("recovery [phase1]: took [{}]", took);
listener.onResponse(
new SendFileResult(
Collections.emptyList(),
@@ -534,14 +534,14 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
// not enough, and fall back to a file-based recovery.
//
// (approximately) because we do not guarantee to be able to satisfy every lease on every peer.
- logger.trace("cloning primary's retention lease");
+ logger.info("cloning primary's retention lease");
try {
final StepListener<ReplicationResponse> cloneRetentionLeaseStep = new StepListener<>();
final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease(
request.targetNode().getId(),
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false)
);
- logger.trace("cloned primary's retention lease as [{}]", clonedLease);
+ logger.info("cloned primary's retention lease as [{}]", clonedLease);
cloneRetentionLeaseStep.whenComplete(rr -> listener.onResponse(clonedLease), listener::onFailure);
} catch (RetentionLeaseNotFoundException e) {
// it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before
@@ -557,7 +557,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)
);
addRetentionLeaseStep.whenComplete(rr -> listener.onResponse(newLease), listener::onFailure);
- logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
+ logger.info("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
}
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger);
}
@@ -604,12 +604,12 @@ void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> li
final ActionListener<Void> wrappedListener = ActionListener.wrap(nullVal -> {
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
- logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
+ logger.info("recovery [phase1]: remote engine start took [{}]", tookTime);
listener.onResponse(tookTime);
}, e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
- logger.trace("recovery [phase1]: prepare remote engine for translog");
+ logger.info("recovery [phase1]: prepare remote engine for translog");
cancellableThreads.checkForCancel();
recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
}
@@ -642,7 +642,7 @@ void phase2(
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
- logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
+ logger.info("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
final StopWatch stopWatch = new StopWatch().start();
final StepListener<Void> sendListener = new StepListener<>();
final OperationBatchSender sender = new OperationBatchSender(
@@ -669,7 +669,7 @@ void phase2(
);
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
- logger.trace("recovery [phase2]: took [{}]", tookTime);
+ logger.info("recovery [phase2]: took [{}]", tookTime);
listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
}, listener::onFailure);
sender.start();
@@ -793,7 +793,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
}
cancellableThreads.checkForCancel();
StopWatch stopWatch = new StopWatch().start();
- logger.trace("finalizing recovery");
+ logger.info("finalizing recovery");
/*
* Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
* shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done
@@ -821,7 +821,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
);

if (request.isPrimaryRelocation()) {
- logger.trace("performing relocation hand-off");
+ logger.info("performing relocation hand-off");
final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled()
? recoveryTarget::forceSegmentFileSync
: () -> {};
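In the relocation hand-off above, the runnable is a no-op unless segment replication is enabled for the shard, so the hand-off path itself stays branch-free. A generic sketch of that pattern (illustrative names only, not OpenSearch API):

    // Illustrative only: pick a real action or a no-op up front, then run it unconditionally later.
    class ConditionalRunnableSketch {
        static Runnable forceSyncAction(boolean segRepEnabled, Runnable forceSegmentFileSync) {
            return segRepEnabled ? forceSegmentFileSync : () -> {};
        }

        static void demo() {
            Runnable action = forceSyncAction(true, () -> System.out.println("forcing segment file sync"));
            action.run(); // no branching needed at the call site
        }
    }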