diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java index b7599265aece3..1d0876d79a549 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java @@ -969,9 +969,13 @@ public void writeOptionalArray(@Nullable T[] array) throws } public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException { + writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable); + } + + public void writeOptionalWriteable(final Writer writer, @Nullable T writeable) throws IOException { if (writeable != null) { writeBoolean(true); - writeable.writeTo(this); + writer.write(this, writeable); } else { writeBoolean(false); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java index b55219e1cb37f..e21889848370e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java @@ -8,8 +8,6 @@ package org.opensearch.remotemigration; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; @@ -17,21 +15,14 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.settings.Settings; -import org.opensearch.core.util.FileSystemUtils; -import org.opensearch.index.remote.RemoteIndexPath; -import org.opensearch.index.remote.RemoteIndexPathUploader; -import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; -import java.nio.file.Path; -import java.util.Arrays; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -471,80 +462,80 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E * exclude docrep nodes, assert that remote index path file exists * when shards start relocating to the remote nodes. */ - public void testRemoteIndexPathFileExistsAfterMigration() throws Exception { - String docrepClusterManager = internalCluster().startClusterManagerOnlyNode(); - - logger.info("---> Starting 2 docrep nodes"); - addRemote = false; - internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build()); - internalCluster().validateClusterFormed(); - - logger.info("---> Creating index with 1 primary and 1 replica"); - String indexName = "migration-index"; - Settings oneReplica = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .build(); - createIndexAndAssertDocrepProperties(indexName, oneReplica); - - String indexUUID = internalCluster().client() - .admin() - .indices() - .prepareGetSettings(indexName) - .get() - .getSetting(indexName, IndexMetadata.SETTING_INDEX_UUID); - - logger.info("---> Starting indexing in parallel"); - AsyncIndexingService indexingService = new AsyncIndexingService(indexName); - indexingService.startIndexing(); - - logger.info("---> Adding 2 remote enabled nodes to the cluster & cluster manager"); - initDocRepToRemoteMigration(); - addRemote = true; - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build()); - internalCluster().validateClusterFormed(); - - assertTrue( - internalCluster().client() - .admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX) - ) - .get() - .isAcknowledged() - ); - - // elect cluster manager with remote-cluster state enabled - internalCluster().client() - .execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(docrepClusterManager)) - .get(); - - internalCluster().validateClusterFormed(); - - logger.info("---> Excluding docrep nodes from allocation"); - excludeNodeSet("type", "docrep"); - - waitForRelocation(); - waitNoPendingTasksOnAll(); - indexingService.stopIndexing(); - - // validate remote index path file exists - logger.info("---> Asserting remote index path file exists"); - String fileNamePrefix = String.join(RemoteIndexPathUploader.DELIMITER, indexUUID, "7", RemoteIndexPath.DEFAULT_VERSION); - - assertTrue(FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR))); - Path[] files = FileSystemUtils.files(translogRepoPath.resolve(RemoteIndexPath.DIR)); - assertEquals(1, files.length); - assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix))); - - assertTrue(FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR))); - files = FileSystemUtils.files(segmentRepoPath.resolve(RemoteIndexPath.DIR)); - assertEquals(1, files.length); - assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix))); - } + // public void testRemoteIndexPathFileExistsAfterMigration() throws Exception { + // String docrepClusterManager = internalCluster().startClusterManagerOnlyNode(); + // + // logger.info("---> Starting 2 docrep nodes"); + // addRemote = false; + // internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build()); + // internalCluster().validateClusterFormed(); + // + // logger.info("---> Creating index with 1 primary and 1 replica"); + // String indexName = "migration-index"; + // Settings oneReplica = Settings.builder() + // .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + // .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + // .build(); + // createIndexAndAssertDocrepProperties(indexName, oneReplica); + // + // String indexUUID = internalCluster().client() + // .admin() + // .indices() + // .prepareGetSettings(indexName) + // .get() + // .getSetting(indexName, IndexMetadata.SETTING_INDEX_UUID); + // + // logger.info("---> Starting indexing in parallel"); + // AsyncIndexingService indexingService = new AsyncIndexingService(indexName); + // indexingService.startIndexing(); + // + // logger.info("---> Adding 2 remote enabled nodes to the cluster & cluster manager"); + // initDocRepToRemoteMigration(); + // addRemote = true; + // internalCluster().startClusterManagerOnlyNode(); + // internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build()); + // internalCluster().validateClusterFormed(); + // + // assertTrue( + // internalCluster().client() + // .admin() + // .cluster() + // .prepareUpdateSettings() + // .setPersistentSettings( + // Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX) + // ) + // .get() + // .isAcknowledged() + // ); + // + // // elect cluster manager with remote-cluster state enabled + // internalCluster().client() + // .execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(docrepClusterManager)) + // .get(); + // + // internalCluster().validateClusterFormed(); + // + // logger.info("---> Excluding docrep nodes from allocation"); + // excludeNodeSet("type", "docrep"); + // + // waitForRelocation(); + // waitNoPendingTasksOnAll(); + // indexingService.stopIndexing(); + // + // // validate remote index path file exists + // logger.info("---> Asserting remote index path file exists"); + // String fileNamePrefix = String.join(RemoteIndexPathUploader.DELIMITER, indexUUID, "7", RemoteIndexPath.DEFAULT_VERSION); + // + // assertTrue(FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR))); + // Path[] files = FileSystemUtils.files(translogRepoPath.resolve(RemoteIndexPath.DIR)); + // assertEquals(1, files.length); + // assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix))); + // + // assertTrue(FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR))); + // files = FileSystemUtils.files(segmentRepoPath.resolve(RemoteIndexPath.DIR)); + // assertEquals(1, files.length); + // assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix))); + // } /** * Scenario: diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java b/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java index 3c8f07613561d..70a223d60069a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java @@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { shardRouting.writeTo(out); - out.writeOptionalWriteable(currentNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode); out.writeOptionalWriteable(relocationTargetNode); out.writeOptionalWriteable(clusterInfo); shardAllocationDecision.writeTo(out); diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index a4f6d8afeaf38..767acbc9002be 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -144,7 +144,12 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArrayNullable(nodesIds); - out.writeOptionalArray(concreteNodes); + if (shouldIncludeAllAttribute()) { + out.writeOptionalArray((output, value) -> value.writeToWithAttribute(output), concreteNodes); + } else { + out.writeOptionalArray(concreteNodes); + } + out.writeOptionalTimeValue(timeout); } } diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index 9e63f961d241d..fa3979882f5cb 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(stateUUID); metadata.writeTo(out); routingTable.writeTo(out); - nodes.writeTo(out); + nodes.writeToWithAttribute(out); blocks.writeTo(out); // filter out custom states not supported by the other node int numberOfCustoms = 0; @@ -859,13 +859,23 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(toUuid); out.writeLong(toVersion); routingTable.writeTo(out); - nodes.writeTo(out); + nodesWriteToWithAttributes(nodes, out); metadata.writeTo(out); blocks.writeTo(out); customs.writeTo(out); out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager); } + private void nodesWriteToWithAttributes(Diff nodes, StreamOutput out) throws IOException { + DiscoveryNodes part = nodes.apply(null); + if (part != null) { + out.writeBoolean(true); + part.writeToWithAttribute(out); + } else { + out.writeBoolean(false); + } + } + @Override public ClusterState apply(ClusterState state) { Builder builder = new Builder(clusterName); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 2ec0dabd91786..37abb27043977 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -503,7 +503,7 @@ public FollowerCheckRequest(final StreamInput in) throws IOException { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(term); - sender.writeTo(out); + sender.writeToWithAttribute(out); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Join.java b/server/src/main/java/org/opensearch/cluster/coordination/Join.java index 58fa85992ebc8..ce1a234998690 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Join.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Join.java @@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - sourceNode.writeTo(out); - targetNode.writeTo(out); + sourceNode.writeToWithAttribute(out); + targetNode.writeToWithAttribute(out); out.writeLong(term); out.writeLong(lastAcceptedTerm); out.writeLong(lastAcceptedVersion); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java index 04f87d16ee400..1447838a41502 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java @@ -84,7 +84,7 @@ public JoinRequest(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); out.writeLong(minimumTerm); out.writeOptionalWriteable(optionalJoin.orElse(null)); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 4fd2c0eb13073..e7a92d98406e1 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -416,7 +416,7 @@ static class LeaderCheckRequest extends TransportRequest { @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - sender.writeTo(out); + sender.writeToWithAttribute(out); } public DiscoveryNode getSender() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PreVoteRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/PreVoteRequest.java index 85632bdc06e75..f78a5369e5b21 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PreVoteRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PreVoteRequest.java @@ -64,7 +64,7 @@ public PreVoteRequest(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); out.writeLong(currentTerm); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java index de58eb721b28f..287418aaf378e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java @@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); out.writeLong(term); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/TermVersionRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/TermVersionRequest.java index 81923f4211b7e..d3ba0380b9d6a 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/TermVersionRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/TermVersionRequest.java @@ -68,7 +68,7 @@ abstract class TermVersionRequest extends TransportRequest implements Writeable @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); out.writeLong(term); out.writeLong(version); } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 653f81830ed17..adb43d258dc13 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -143,6 +143,7 @@ public static boolean isDedicatedSearchNode(Settings settings) { private final Map attributes; private final Version version; private final SortedSet roles; + private static final String ZONE = "zone"; /** * Creates a new {@link DiscoveryNode} @@ -358,17 +359,37 @@ public DiscoveryNode(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + writeToUtil(out, false); + } + + public void writeToWithAttribute(StreamOutput out) throws IOException { + writeToUtil(out, true); + } + + private void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException { out.writeString(nodeName); out.writeString(nodeId); out.writeString(ephemeralId); out.writeString(hostName); out.writeString(hostAddress); address.writeTo(out); - out.writeVInt(attributes.size()); - for (Map.Entry entry : attributes.entrySet()) { - out.writeString(entry.getKey()); - out.writeString(entry.getValue()); + if (includeAllAttributes) { + serializeAttributes(attributes, out); + } else { + // Serialize only remote store and zone (needed for SearchWeightedRouting) attributes if present. + final Map filteredAttributes = attributes.entrySet() + .stream() + .filter( + entry -> entry.getKey().startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX) + || entry.getKey().equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY) + || entry.getKey().equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) + || entry.getKey().equals(ZONE) + ) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + serializeAttributes(filteredAttributes, out); } + out.writeVInt(roles.size()); for (final DiscoveryNodeRole role : roles) { final DiscoveryNodeRole compatibleRole = role.getCompatibilityRole(out.getVersion()); @@ -379,6 +400,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVersion(version); } + private void serializeAttributes(final Map attributes, final StreamOutput out) throws IOException { + out.writeVInt(attributes.size()); + for (Map.Entry entry : attributes.entrySet()) { + out.writeString(entry.getKey()); + out.writeString(entry.getValue()); + } + } + /** * The address that the node can be communicated with. */ diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java index 2ebcd8096893d..18f92d989e959 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java @@ -688,6 +688,14 @@ public String shortSummary() { @Override public void writeTo(StreamOutput out) throws IOException { + writeToUtil((output, value) -> value.writeTo(output), out); + } + + public void writeToWithAttribute(StreamOutput out) throws IOException { + writeToUtil((output, value) -> value.writeToWithAttribute(output), out); + } + + private void writeToUtil(final Writer writer, StreamOutput out) throws IOException { if (clusterManagerNodeId == null) { out.writeBoolean(false); } else { @@ -696,7 +704,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeVInt(nodes.size()); for (DiscoveryNode node : this) { - node.writeTo(out); + writer.write(out, node); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java index 59a39b358cb70..614e9f49c8726 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java @@ -107,7 +107,7 @@ public List getNodeDecisions() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(targetNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode); if (nodeDecisions != null) { out.writeBoolean(true); out.writeList(nodeDecisions); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java index 4163a5fd4c16f..6b805ca91fa58 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java @@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - node.writeTo(out); + node.writeToWithAttribute(out); out.writeOptionalWriteable(shardStoreInfo); out.writeOptionalWriteable(canAllocateDecision); nodeDecision.writeTo(out); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index 101daaa143a66..07c272cd94d77 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -83,7 +83,8 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput(); + return CLUSTER_BLOCKS_FORMAT.serialize((out, blocks) -> blocks.writeTo(out), clusterBlocks, generateBlobFileName(), getCompressor()) + .streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index e5e44525520f4..37e416f93fe97 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -113,7 +113,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return clusterStateCustomsFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput(); + return clusterStateCustomsFormat.serialize( + (out, customClusterState) -> customClusterState.writeTo(out), + custom, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java index 8e850e903954a..1e08d5f5a3e05 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java @@ -107,7 +107,12 @@ public String generateBlobFileName() { @Override public InputStream serialize() throws IOException { - return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput(); + return customBlobStoreFormat.serialize( + (out, customClusterState) -> customClusterState.writeTo(out), + custom, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index 446207a767009..829036c6d122b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput(); + return DISCOVERY_NODES_FORMAT.serialize( + (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), + discoveryNodes, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java index dee48237e5c4c..b47b1f905e58b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -83,8 +83,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor()) - .streamInput(); + return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize( + (out, settingsHash) -> settingsHash.writeTo(out), + hashesOfConsistentSettings, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java index 030491cf8b7b9..5b43f9936c0cb 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -122,7 +122,12 @@ public String generateBlobFileName() { @Override public InputStream serialize() throws IOException { - return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput(); + return PINNED_TIMESTAMPS_FORMAT.serialize( + (out, timestamps) -> timestamps.writeTo(out), + pinnedTimestamps, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index 46c5074c48eb8..8303be4a526d7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -104,7 +104,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); + return INDEX_ROUTING_TABLE_FORMAT.serialize( + (out, routingTable) -> routingTable.writeTo(out), + indexRoutingTable, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java index 2370417dc14df..877d0bf0dbdde 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java @@ -139,8 +139,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { assert routingTableIncrementalDiff != null; - return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(routingTableIncrementalDiff, generateBlobFileName(), getCompressor()) - .streamInput(); + return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize( + (out, routingTableDiff) -> routingTableDiff.writeTo(out), + routingTableIncrementalDiff, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 6279a8ec3646c..d746aaa2a0783 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -294,6 +294,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode()); } } + transportService.sendRequest( startRequest.sourceNode(), actionName, diff --git a/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java index 1df0d3861f686..7309712314acd 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java +++ b/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java @@ -144,8 +144,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(recoveryId); shardId.writeTo(out); out.writeString(targetAllocationId); - sourceNode.writeTo(out); - targetNode.writeTo(out); + sourceNode.writeToWithAttribute(out); + targetNode.writeToWithAttribute(out); metadataSnapshot.writeTo(out); out.writeBoolean(primaryRelocation); out.writeLong(startingSeqNo); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java index 0add86ab88a16..c364f7300fa5b 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -28,6 +28,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.io.stream.Writeable.Writer; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.gateway.CorruptStateException; @@ -55,7 +56,7 @@ public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction writer, T obj, final String blobName, final Compressor compressor) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( @@ -76,7 +77,7 @@ public void close() throws IOException { }; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) { // TODO The stream version should be configurable stream.setVersion(Version.CURRENT); - obj.writeTo(stream); + writer.write(stream, obj); } CodecUtil.writeFooter(indexOutput); } diff --git a/server/src/main/java/org/opensearch/transport/TransportRequest.java b/server/src/main/java/org/opensearch/transport/TransportRequest.java index c62cf59d3be2f..cfd60361cc771 100644 --- a/server/src/main/java/org/opensearch/transport/TransportRequest.java +++ b/server/src/main/java/org/opensearch/transport/TransportRequest.java @@ -63,6 +63,8 @@ public Empty(StreamInput in) throws IOException { } } + private boolean includeAllAttributes = false; + /** * Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent". */ @@ -94,4 +96,12 @@ public TaskId getParentTask() { public void writeTo(StreamOutput out) throws IOException { parentTaskId.writeTo(out); } + + public void setIncludeAllAttributes(boolean includeAllAttributes) { + this.includeAllAttributes = includeAllAttributes; + } + + public boolean shouldIncludeAllAttribute() { + return includeAllAttributes; + } } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index d08b28730d417..fff6d82b23c7e 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -752,7 +752,7 @@ public HandshakeResponse(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(discoveryNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), discoveryNode); clusterName.writeTo(out); out.writeVersion(version); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 74254f1a1987f..02f22d713def5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -548,6 +548,7 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); when(blobContainer.readBlob(indexName)).thenReturn( INDEX_ROUTING_TABLE_FORMAT.serialize( + (out, routingTable) -> routingTable.writeTo(out), clusterState.getRoutingTable().getIndicesRouting().get(indexName), uploadedFileName, compressor @@ -588,7 +589,12 @@ public void testGetAsyncIndexRoutingTableDiffReadAction() throws Exception { String uploadedFileName = String.format(Locale.ROOT, "routing-table-diff/" + indexName); when(blobContainer.readBlob(indexName)).thenReturn( - REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(diff, uploadedFileName, compressor).streamInput() + REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize( + (out, routingTableDiff) -> routingTableDiff.writeTo(out), + diff, + uploadedFileName, + compressor + ).streamInput() ); TestCapturingListener listener = new TestCapturingListener<>(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 4ef459e6657a1..dc45293549e1e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -140,7 +140,12 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr DiscoveryNodes discoveryNodes = getDiscoveryNodes(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput() + DISCOVERY_NODES_FORMAT.serialize( + (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), + discoveryNodes, + fileName, + compressor + ).streamInput() ); RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); @@ -190,7 +195,7 @@ public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, Interru ClusterBlocks clusterBlocks = randomClusterBlocks(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, fileName, compressor).streamInput() + CLUSTER_BLOCKS_FORMAT.serialize((out, blocks) -> blocks.writeTo(out), clusterBlocks, fileName, compressor).streamInput() ); RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); @@ -261,7 +266,12 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc namedWriteableRegistry ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - remoteClusterStateCustoms.clusterStateCustomsFormat.serialize(custom, fileName, compressor).streamInput() + remoteClusterStateCustoms.clusterStateCustomsFormat.serialize( + (out, customState) -> customState.writeTo(out), + custom, + fileName, + compressor + ).streamInput() ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 1871f4d08ba43..a31a743200b34 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -1112,7 +1112,12 @@ public void testGetClusterStateUsingDiff() throws IOException { "custom", is -> readFrom(is, namedWriteableRegistry, addedCustom.getWriteableName()) ); - BytesReference bytes = customMetadataFormat.serialize(addedCustom, "custom-md2-file__1", compressor); + BytesReference bytes = customMetadataFormat.serialize( + (out, customClusterState) -> customClusterState.writeTo(out), + addedCustom, + "custom-md2-file__1", + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); } @@ -1131,6 +1136,7 @@ public void testGetClusterStateUsingDiff() throws IOException { ); when(blobContainer.readBlob(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)).thenAnswer(i -> { BytesReference bytes = HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize( + (out, settingsHash) -> settingsHash.writeTo(out), hashesOfConsistentSettings, HASHES_OF_CONSISTENT_SETTINGS_FILENAME, compressor @@ -1172,7 +1178,12 @@ public void testGetClusterStateUsingDiff() throws IOException { diffManifestBuilder.discoveryNodesUpdated(true); manifestBuilder.discoveryNodesMetadata(new UploadedMetadataAttribute(DISCOVERY_NODES, DISCOVERY_NODES_FILENAME)); when(blobContainer.readBlob(DISCOVERY_NODES_FILENAME)).thenAnswer(invocationOnMock -> { - BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(nodesBuilder.build(), DISCOVERY_NODES_FILENAME, compressor); + BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize( + (out, nodes) -> nodes.writeToWithAttribute(out), + nodesBuilder.build(), + DISCOVERY_NODES_FILENAME, + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); } @@ -1183,7 +1194,12 @@ public void testGetClusterStateUsingDiff() throws IOException { diffManifestBuilder.clusterBlocksUpdated(true); manifestBuilder.clusterBlocksMetadata(new UploadedMetadataAttribute(CLUSTER_BLOCKS, CLUSTER_BLOCKS_FILENAME)); when(blobContainer.readBlob(CLUSTER_BLOCKS_FILENAME)).thenAnswer(invocationOnMock -> { - BytesReference bytes = CLUSTER_BLOCKS_FORMAT.serialize(newClusterBlock, CLUSTER_BLOCKS_FILENAME, compressor); + BytesReference bytes = CLUSTER_BLOCKS_FORMAT.serialize( + (out, clusterBlock) -> clusterBlock.writeTo(out), + newClusterBlock, + CLUSTER_BLOCKS_FILENAME, + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); @@ -3254,6 +3270,7 @@ private void mockBlobContainerForGlobalMetadata( String fileName = entry.getValue(); when(blobContainer.readBlob(fileName)).thenAnswer((invocation) -> { BytesReference bytesReference = customMetadataFormat.serialize( + (out, customState) -> customState.writeTo(out), metadata.custom(custom), fileName, blobStoreRepository.getCompressor() diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index a2da1e8b0fdb2..8c337df48d9eb 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -359,7 +359,12 @@ public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Excepti compressor ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, fileName, compressor).streamInput() + HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize( + (out, hashesOfConsistentSetting) -> hashesOfConsistentSetting.writeTo(out), + hashesOfConsistentSettings, + fileName, + compressor + ).streamInput() ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); @@ -490,7 +495,12 @@ public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { namedWriteableRegistry ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - customMetadataForDownload.customBlobStoreFormat.serialize(customMetadata, fileName, compressor).streamInput() + customMetadataForDownload.customBlobStoreFormat.serialize( + (out, metadata) -> metadata.writeTo(out), + customMetadata, + fileName, + compressor + ).streamInput() ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index f1bced2bdf855..1b988ee1f37ec 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -143,7 +143,7 @@ public void testSerDe() throws IOException { public void testExceptionDuringSerialization() throws IOException { DiscoveryNodes nodes = mock(DiscoveryNodes.class); RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); - doThrow(new IOException("mock-exception")).when(nodes).writeTo(any()); + doThrow(new IOException("mock-exception")).when(nodes).writeToWithAttribute(any()); IOException iea = assertThrows(IOException.class, remoteObjectForUpload::serialize); } diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java index 536df880b2597..c4e53c1eea138 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java @@ -35,7 +35,12 @@ public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase { public void testSerDe() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); - BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none()); + BytesReference bytesReference = clusterBlocksFormat.serialize( + (out, metadata) -> metadata.writeTo(out), + indexMetadata, + TEST_BLOB_FILE_NAME, + CompressorRegistry.none() + ); IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference); assertThat(readIndexMetadata, is(indexMetadata)); } @@ -43,6 +48,7 @@ public void testSerDe() throws IOException { public void testSerDeForCompressed() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); BytesReference bytesReference = clusterBlocksFormat.serialize( + (out, metadata) -> metadata.writeTo(out), indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.getCompressor(DeflateCompressor.NAME)