Skip to content

Commit

Permalink
Add RemotePersistedState unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Aug 15, 2024
1 parent 2a07657 commit 7fdd155
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,7 @@ public PublicationContext newPublicationContext(
return new RemotePublicationContext(clusterChangedEvent, persistedStateRegistry);
}
}
final PublicationContext publicationContext = new PublicationContext(
clusterChangedEvent,
persistedStateRegistry
);
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, persistedStateRegistry);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand Down Expand Up @@ -408,10 +405,7 @@ public class PublicationContext {
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
protected final PersistedStateRegistry persistedStateRegistry;

PublicationContext(
ClusterChangedEvent clusterChangedEvent,
PersistedStateRegistry persistedStateRegistry
) {
PublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
Expand Down Expand Up @@ -607,17 +601,12 @@ public String executor() {

public class RemotePublicationContext extends PublicationContext {

RemotePublicationContext(
ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry
) {
RemotePublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
super(clusterChangedEvent, persistedStateRegistry);
}

@Override
public void sendClusterState(
final DiscoveryNode destination,
final ActionListener<PublishWithJoinResponse> listener
) {
public void sendClusterState(final DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
try {
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.gateway.remote;

import java.util.Map.Entry;
import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.Strings;
Expand All @@ -29,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -47,7 +47,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
// required for state publication
public static final int CODEC_V3 = 3; // In Codec V3, we have introduced new diff field in diff-manifest's routing_table_diff

private static final Map<Version, Integer> CODEC_TO_VERSION_MAPPING = Map.of(Version.V_2_16_0,
private static final Map<Version, Integer> CODEC_TO_VERSION_MAPPING = Map.of(
Version.V_2_16_0,
ClusterMetadataManifest.CODEC_V3,
Version.V_2_15_0,
ClusterMetadataManifest.CODEC_V2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ public RemoteClusterStateService(
* @return A manifest object which contains the details of uploaded entity metadata.
*/
@Nullable
public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID, int codecVersion) throws IOException {
public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID, int codecVersion)
throws IOException {
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,8 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep

final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings);
coordinationState.handlePrePublish(clusterState);
Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService, Mockito.times(1))
.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState));

Mockito.when(remoteClusterStateService.markLastStateAsCommitted(any(), any()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.opensearch.cluster.coordination.PublicationTransportHandler.PublicationContext;
import org.opensearch.cluster.coordination.PublicationTransportHandler.RemotePublicationContext;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -54,14 +57,18 @@

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import org.mockito.Mockito;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -288,6 +295,43 @@ public void testHandleIncomingRemotePublishRequestWhenNoLastSeenState() throws I
Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any());
}

public void testNewPublicationContext() {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);

// Remote publication disabled
ClusterChangedEvent event1 = new ClusterChangedEvent(
"source1",
buildClusterState(TERM, VERSION + 1),
buildClusterState(TERM, VERSION)
);
PublicationContext publicationContext = handler.newPublicationContext(event1, false, new PersistedStateRegistry());
assertNotNull(publicationContext);
assertThat(publicationContext, not(instanceOf(RemotePublicationContext.class)));

// Remote publication enabled but some nodes are remote enabled and some remote disabled
ClusterChangedEvent event2 = new ClusterChangedEvent(
"source2",
buildClusterStateWithMixedNodes(TERM, VERSION + 1),
buildClusterState(TERM, VERSION)
);
PublicationContext publicationContext2 = handler.newPublicationContext(event2, true, new PersistedStateRegistry());
assertNotNull(publicationContext2);
assertThat(publicationContext2, not(instanceOf(RemotePublicationContext.class)));

// Remote publication enabled and all nodes are remote enabled
ClusterChangedEvent event3 = new ClusterChangedEvent(
"source3",
buildClusterStateWithRemoteNodes(TERM, VERSION + 1),
buildClusterState(TERM, VERSION)
);
PublicationContext publicationContext3 = handler.newPublicationContext(event3, true, new PersistedStateRegistry());
assertNotNull(publicationContext3);
assertThat(publicationContext3, instanceOf(RemotePublicationContext.class));
}

private PublicationTransportHandler getPublicationTransportHandler(
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
RemoteClusterStateService remoteClusterStateService
Expand All @@ -310,4 +354,54 @@ private ClusterState buildClusterState(long term, long version) {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(localNode).add(secondNode).localNodeId(LOCAL_NODE_ID).build();
return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build();
}

private ClusterState buildClusterStateWithMixedNodes(long term, long version) {
CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term);
Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build();
DiscoveryNode remoteNode = new DiscoveryNode(
"remoteNode",
buildNewFakeTransportAddress(),
Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"remote_state_repo",
REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"remote_routing_repo"
),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNodes nodes = DiscoveryNodes.builder().add(localNode).add(remoteNode).localNodeId(LOCAL_NODE_ID).build();
return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build();
}

private ClusterState buildClusterStateWithRemoteNodes(long term, long version) {
CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term);
Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build();
DiscoveryNode remoteNode1 = new DiscoveryNode(
"remoteNode1",
buildNewFakeTransportAddress(),
Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"remote_state_repo",
REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"remote_routing_repo"
),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNode remoteNode2 = new DiscoveryNode(
"remoteNode2",
buildNewFakeTransportAddress(),
Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"remote_state_repo",
REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"remote_routing_repo"
),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNodes nodes = DiscoveryNodes.builder().add(remoteNode1).add(remoteNode2).localNodeId(remoteNode1.getId()).build();
return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.mockito.ArgumentMatchers.eq;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
Expand All @@ -111,6 +111,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -215,6 +216,18 @@ private ClusterState createClusterState(long version, Metadata metadata) {
.build();
}

private ClusterState createClusterStateWithNodes(long version, Metadata metadata) {
DiscoveryNode oldNode = new DiscoveryNode(
"node2",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Sets.newHashSet(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
Version.V_2_13_0
);
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).add(oldNode).build();
return ClusterState.builder(clusterName).nodes(discoveryNodes).version(version).metadata(metadata).build();
}

private CoordinationMetadata createCoordinationMetadata(long term) {
CoordinationMetadata.Builder builder = CoordinationMetadata.builder();
builder.term(term);
Expand Down Expand Up @@ -755,7 +768,8 @@ public void testRemotePersistedState() throws IOException {
);

remotePersistedState.setLastAcceptedState(secondClusterState);
Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService, times(1))
.writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);

assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState));
assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm));
Expand All @@ -778,6 +792,46 @@ public void testRemotePersistedState() throws IOException {
assertThat(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted(), equalTo(true));
}

public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOException {
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder()
.clusterTerm(1L)
.stateVersion(5L)
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.build();
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)))
.thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest"));
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(CODEC_V1)))
.thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest2"));

Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest3"));
CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID);
ClusterState clusterState = createClusterState(
randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build()
);
remotePersistedState.setLastAcceptedState(clusterState);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);

ClusterState clusterState2 = createClusterState(
randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build()
);
remotePersistedState.setLastAcceptedState(clusterState2);
Mockito.verify(remoteClusterStateService).writeIncrementalMetadata(clusterState, clusterState2, manifest);

ClusterState clusterState3 = createClusterStateWithNodes(
randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build()
);
remotePersistedState.setLastAcceptedState(clusterState3);

Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState3, previousClusterUUID, CODEC_V1);

}

public void testRemotePersistedStateNotCommitted() throws IOException {
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
Expand Down Expand Up @@ -813,14 +867,17 @@ public void testRemotePersistedStateNotCommitted() throws IOException {
remotePersistedState.setLastAcceptedState(clusterState);
ArgumentCaptor<String> previousClusterUUIDCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<ClusterState> clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture(), eq(MANIFEST_CURRENT_CODEC_VERSION));
Mockito.verify(remoteClusterStateService)
.writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture(), eq(MANIFEST_CURRENT_CODEC_VERSION));
assertEquals(previousClusterUUID, previousClusterUUIDCaptor.getValue());
}

public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException {
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION));
Mockito.doThrow(IOException.class)
.when(remoteClusterStateService)
.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION));

CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID);

Expand All @@ -837,7 +894,9 @@ public void testRemotePersistedStateFailureStats() throws IOException {
RemotePersistenceStats remoteStateStats = new RemotePersistenceStats();
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION));
Mockito.doThrow(IOException.class)
.when(remoteClusterStateService)
.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION));
when(remoteClusterStateService.getStats()).thenReturn(remoteStateStats);
doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed();
CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID);
Expand Down
Loading

0 comments on commit 7fdd155

Please sign in to comment.