Adjust format by spotlessApply task
Signed-off-by: Tianli Feng <ftianli@amazon.com>
Tianli Feng committed Apr 26, 2022
1 parent be0acc3 commit 0c110ca
Showing 20 changed files with 95 additions and 46 deletions.
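
The change itself is mechanical: Spotless rewraps comment and code lines that exceed the formatter's column limit. As an illustration (these are the Spotless Gradle plugin's standard task names, not commands quoted from this commit), the rewrite and a follow-up verification would typically be run as:

    ./gradlew spotlessApply   # rewrite sources to match the configured formatter
    ./gradlew spotlessCheck   # fail the build if any file still deviates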
@@ -221,8 +221,8 @@ private void buildResponse(
  // Unlikely edge case:
  // Data node has finished snapshotting the shard but the cluster state has not yet been updated
  // to reflect this. We adjust the status to show up as snapshot metadata being written because
- // technically if the data node failed before successfully reporting DONE state to cluster-manager, then
- // this shards state would jump to a failed state.
+ // technically if the data node failed before successfully reporting DONE state to cluster-manager,
+ // then this shards state would jump to a failed state.
  shardStatus = new SnapshotIndexShardStatus(
  shardEntry.key,
  SnapshotIndexShardStage.FINALIZE,
@@ -263,7 +263,8 @@ public void onFailure(Exception replicaException) {
  ),
  replicaException
  );
- // Only report "critical" exceptions - TODO: Reach out to the cluster-manager node to get the latest shard state then report.
+ // Only report "critical" exceptions
+ // TODO: Reach out to the cluster-manager node to get the latest shard state then report.
  if (TransportActions.isShardNotAvailableException(replicaException) == false) {
  RestStatus restStatus = ExceptionsHelper.status(replicaException);
  shardReplicaFailures.add(
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
@@ -226,8 +226,9 @@ public long getVersion() {
  }

  public long getVersionOrMetadataVersion() {
- // When following a Zen1 cluster-manager, the cluster state version is not guaranteed to increase, so instead it is preferable to use the
- // metadata version to determine the freshest node. However when following a Zen2 cluster-manager the cluster state version should be used.
+ // When following a Zen1 cluster-manager, the cluster state version is not guaranteed to increase,
+ // so instead it is preferable to use the metadata version to determine the freshest node.
+ // However when following a Zen2 cluster-manager the cluster state version should be used.
  return term() == ZEN1_BWC_TERM ? metadata().version() : version();
  }

@@ -310,7 +310,8 @@ private static class StoredState {
  * returns true if stored state is older then given state or they are from a different cluster-manager, meaning they can't be compared
  * */
  public boolean isOlderOrDifferentClusterManager(ClusterState clusterState) {
- return version < clusterState.version() || Objects.equals(clusterManagerNodeId, clusterState.nodes().getMasterNodeId()) == false;
+ return version < clusterState.version()
+ || Objects.equals(clusterManagerNodeId, clusterState.nodes().getMasterNodeId()) == false;
  }
  }

@@ -297,9 +297,9 @@ private Entry(StreamInput in) throws IOException {
  if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
  version = Version.readVersion(in);
  } else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
- // If an older cluster-manager informs us that shard generations are supported we use the minimum shard generation compatible
- // version. If shard generations are not supported yet we use a placeholder for a version that does not use shard
- // generations.
+ // If an older cluster-manager informs us that shard generations are supported
+ // we use the minimum shard generation compatible version.
+ // If shard generations are not supported yet we use a placeholder for a version that does not use shard generations.
  version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT;
  } else {
  version = SnapshotsService.OLD_SNAPSHOT_FORMAT;
@@ -222,7 +222,7 @@ public void handleException(TransportException exp) {
  ConnectTransportException.class,
  FailedToCommitClusterStateException.class };

- private static boolean isClusterManagerChannelException(TransportException exp) {
+ private static boolean isClusterManagerChannelException(TransportException exp) {
  return ExceptionsHelper.unwrap(exp, CLUSTER_MANAGER_CHANNEL_EXCEPTIONS) != null;
  }

@@ -714,7 +714,8 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
  if (matched == null) {
  // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
  // events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
- // requests might still be in flight even after the shard has already been started or failed on the cluster-manager. We just
+ // requests might still be in flight even after the shard has already been started or failed on the cluster-manager. We
+ // just
  // ignore these requests for now.
  logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
  builder.success(task);
@@ -610,7 +610,8 @@ default void markLastAcceptedStateAsCommitted() {
  metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
  metadataBuilder.coordinationMetadata(coordinationMetadata);
  }
- // if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet, the cluster uuid might not been known yet.
+ // if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
+ // the cluster uuid might not been known yet.
  assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
  || lastAcceptedState.term() == ZEN1_BWC_TERM : "received cluster state with empty cluster uuid but not Zen1 BWC term: "
  + lastAcceptedState;
@@ -622,7 +623,8 @@ default void markLastAcceptedStateAsCommitted() {
  metadataBuilder.clusterUUIDCommitted(true);

  if (lastAcceptedState.term() != ZEN1_BWC_TERM) {
- // Zen1 cluster-managers never publish a committed cluster UUID so if we logged this it'd happen on on every update. Let's just
+ // Zen1 cluster-managers never publish a committed cluster UUID so if we logged this it'd happen on on every update.
+ // Let's just
  // not log it at all in a 6.8/7.x rolling upgrade.
  logger.info("cluster UUID set to [{}]", lastAcceptedState.metadata().clusterUUID());
  }
@@ -906,7 +906,8 @@ assert getLocalNode().equals(applierState.nodes().getMasterNode())

  final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
  if (becomingClusterManager && activePublication == false) {
- // cluster state update task to become cluster-manager is submitted to MasterService, but publication has not started yet
+ // cluster state update task to become cluster-manager is submitted to MasterService,
+ // but publication has not started yet
  assert followersChecker.getKnownFollowers().isEmpty() : followersChecker.getKnownFollowers();
  } else {
  final ClusterState lastPublishedState;
@@ -1151,7 +1152,8 @@ private void handleJoin(Join join) {
  // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
  final boolean isNewJoinFromClusterManagerEligibleNode = handleJoinIgnoringExceptions(join);

- // If we haven't completely finished becoming cluster-manager then there's already a publication scheduled which will, in turn,
+ // If we haven't completely finished becoming cluster-manager then there's already a publication scheduled which will, in
+ // turn,
  // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
  // race against the election-winning publication and log a big error message, which we can prevent by checking this here:
  final boolean establishedAsClusterManager = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
@@ -1242,7 +1244,9 @@ public void publish(
  );
  publishListener.onFailure(
  new FailedToCommitClusterStateException(
- "node is no longer cluster-manager for term " + clusterChangedEvent.state().term() + " while handling publication"
+ "node is no longer cluster-manager for term "
+ + clusterChangedEvent.state().term()
+ + " while handling publication"
  )
  );
  return;
@@ -138,13 +138,18 @@ public class JoinHelper {
  @Override
  public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
  throws Exception {
- // The current state that MasterService uses might have been updated by a (different) cluster-manager in a higher term already
+ // The current state that MasterService uses might have been updated by a (different) cluster-manager in a higher term
+ // already
  // Stop processing the current cluster state update, as there's no point in continuing to compute it as
  // it will later be rejected by Coordinator.publish(...) anyhow
  if (currentState.term() > term) {
  logger.trace("encountered higher term {} than current {}, there is a newer cluster-manager", currentState.term(), term);
  throw new NotMasterException(
- "Higher term encountered (current: " + currentState.term() + " > used: " + term + "), there is a newer cluster-manager"
+ "Higher term encountered (current: "
+ + currentState.term()
+ + " > used: "
+ + term
+ + "), there is a newer cluster-manager"
  );
  } else if (currentState.nodes().getMasterNodeId() == null && joiningTasks.stream().anyMatch(Task::isBecomeMasterTask)) {
  assert currentState.term() < term : "there should be at most one become cluster-manager task per election (= by term)";
@@ -137,7 +137,10 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
  newState = becomeClusterManagerAndTrimConflictingNodes(currentState, joiningNodes);
  nodesChanged = true;
  } else if (currentNodes.isLocalNodeElectedMaster() == false) {
- logger.trace("processing node joins, but we are not the cluster-manager. current cluster-manager: {}", currentNodes.getMasterNode());
+ logger.trace(
+ "processing node joins, but we are not the cluster-manager. current cluster-manager: {}",
+ currentNodes.getMasterNode()
+ );
  throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not cluster-manager for join request");
  } else {
  newState = ClusterState.builder(currentState);
@@ -105,7 +105,9 @@ private static ClusterBlock parseNoClusterManagerBlock(String value) {
  case "metadata_write":
  return NO_MASTER_BLOCK_METADATA_WRITES;
  default:
- throw new IllegalArgumentException("invalid no-cluster-manager block [" + value + "], must be one of [all, write, metadata_write]");
+ throw new IllegalArgumentException(
+ "invalid no-cluster-manager block [" + value + "], must be one of [all, write, metadata_write]"
+ );
  }
  }

@@ -337,8 +337,9 @@ public void sendPublishRequest(
  if (destination.equals(discoveryNodes.getLocalNode())) {
  // if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
  final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
- // we might override an in-flight publication to self in case where we failed as cluster-manager and became cluster-manager again,
- // and the new publication started before the previous one completed (which fails anyhow because of higher current term)
+ // we might override an in-flight publication to self in case where we failed as cluster-manager and
+ // became cluster-manager again, and the new publication started before the previous one completed
+ // (which fails anyhow because of higher current term)
  assert previousRequest == null || previousRequest.getAcceptedState().term() < publishRequest.getAcceptedState().term();
  responseActionListener = new ActionListener<PublishWithJoinResponse>() {
  @Override
@@ -119,7 +119,7 @@ public VotingConfiguration reconfigure(
  currentConfig,
  liveNodes,
  retiredNodeIds,
- currentClusterManager
+ currentClusterManager
  );

  final Set<String> liveNodeIds = liveNodes.stream()
@@ -134,7 +134,12 @@
  .filter(n -> retiredNodeIds.contains(n.getId()) == false)
  .forEach(
  n -> orderedCandidateNodes.add(
- new VotingConfigNode(n.getId(), true, n.getId().equals(currentClusterManager.getId()), currentConfigNodeIds.contains(n.getId()))
+ new VotingConfigNode(
+ n.getId(),
+ true,
+ n.getId().equals(currentClusterManager.getId()),
+ currentConfigNodeIds.contains(n.getId())
+ )
  )
  );
  currentConfigNodeIds.stream()
@@ -616,7 +616,10 @@ public void onNoLongerMaster(String source) {
  listener.onNoLongerMaster(source);
  } catch (Exception e) {
  logger.error(
- () -> new ParameterizedMessage("exception thrown by listener while notifying no longer cluster-manager from [{}]", source),
+ () -> new ParameterizedMessage(
+ "exception thrown by listener while notifying no longer cluster-manager from [{}]",
+ source
+ ),
  e
  );
  }
@@ -502,8 +502,8 @@ static class LucenePersistedState implements PersistedState {
  // (2) the index is currently empty since it was opened with IndexWriterConfig.OpenMode.CREATE

  // In the common case it's actually sufficient to commit() the existing state and not do any indexing. For instance,
- // this is true if there's only one data path on this cluster-manager node, and the commit we just loaded was already written out
- // by this version of OpenSearch. TODO TBD should we avoid indexing when possible?
+ // this is true if there's only one data path on this cluster-manager node, and the commit we just loaded was already written
+ // out by this version of OpenSearch. TODO TBD should we avoid indexing when possible?
  final PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter();
  try {
  writer.writeFullStateAndCommit(currentTerm, lastAcceptedState);
@@ -233,7 +233,7 @@ public void clusterChanged(final ClusterChangedEvent event) {
  logger.debug(
  "not recovering from gateway, nodes_size (master) [{}] < recover_after_master_nodes [{}]",
  nodes.getMasterNodes().size(),
- recoverAfterClusterManagerNodes
+ recoverAfterClusterManagerNodes
  );
  } else {
  boolean enforceRecoverAfterTime;
@@ -255,7 +255,11 @@
  } else if (expectedClusterManagerNodes != -1 && (nodes.getMasterNodes().size() < expectedClusterManagerNodes)) {
  // does not meet the expected...
  enforceRecoverAfterTime = true;
- reason = "expecting [" + expectedClusterManagerNodes + "] cluster-manager nodes, but only have [" + nodes.getMasterNodes().size() + "]";
+ reason = "expecting ["
+ + expectedClusterManagerNodes
+ + "] cluster-manager nodes, but only have ["
+ + nodes.getMasterNodes().size()
+ + "]";
  }
  }
  performStateRecovery(enforceRecoverAfterTime, reason);
@@ -269,7 +269,8 @@ private void updateFailedShardsCache(final ClusterState state) {

  DiscoveryNode clusterManagerNode = state.nodes().getMasterNode();

- // remove items from cache which are not in our routing table anymore and resend failures that have not executed on cluster-manager yet
+ // remove items from cache which are not in our routing table anymore and
+ // resend failures that have not executed on cluster-manager yet
  for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShardsCache.entrySet().iterator(); iterator.hasNext();) {
  ShardRouting failedShardRouting = iterator.next().getValue();
  ShardRouting matchedRouting = localRoutingNode.getByShardId(failedShardRouting.shardId());
@@ -278,7 +279,9 @@
  } else {
  // TODO: can we remove this? Is resending shard failures the responsibility of shardStateAction?
  if (clusterManagerNode != null) {
- String message = "cluster-manager " + clusterManagerNode + " has not removed previously failed shard. resending shard failure";
+ String message = "cluster-manager "
+ + clusterManagerNode
+ + " has not removed previously failed shard. resending shard failure";
  logger.trace("[{}] re-sending failed shard [{}], reason [{}]", matchedRouting.shardId(), matchedRouting, message);
  shardStateAction.localShardFailed(matchedRouting, message, null, SHARD_STATE_ACTION_LISTENER, state);
  }
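
Every hunk above is the same transformation: a line over the configured column limit is split across continuation lines, with no behavioral change. For reference, a Spotless block that drives this kind of rewrite generally looks like the following sketch in Gradle's Groovy DSL; this is a hypothetical configuration, not the project's actual build script, and the file pattern, formatter step, config-file name, and assumed 140-column wrap are all illustrative assumptions:

    spotless {
        java {
            // Hypothetical scope; the real build may target specific source sets.
            target '**/*.java'
            // Eclipse formatter step; the XML config (assumed name) would set the wrap column.
            eclipse().configFile rootProject.file('formatterConfig.xml')
            removeUnusedImports()
            trimTrailingWhitespace()
            endWithNewline()
        }
    }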