From 3fec56756851f23c169f545282c3a44da4f43423 Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Tue, 28 Mar 2023 00:52:05 +0800 Subject: [PATCH] Add wait_for_completion parameter to resize, open, and forcemerge APIs (#6434) * Add wait_for_completion parameter to resize&open&forcemerge APIs (#6228) Signed-off-by: Gao Binlong * Modify changelog Signed-off-by: Gao Binlong * fix test failure Signed-off-by: Gao Binlong * Fix test failure Signed-off-by: Gao Binlong * change header of new file Signed-off-by: Gao Binlong * modify changelog Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../AbstractBaseReindexRestHandler.java | 15 ---- .../rest-api-spec/api/indices.clone.json | 8 ++ .../rest-api-spec/api/indices.forcemerge.json | 4 + .../rest-api-spec/api/indices.open.json | 18 +++- .../rest-api-spec/api/indices.shrink.json | 8 ++ .../rest-api-spec/api/indices.split.json | 8 ++ .../indices.clone/40_wait_for_completion.yml | 88 +++++++++++++++++++ .../20_wait_for_completion.yml | 39 ++++++++ .../indices.open/30_wait_for_completion.yml | 50 +++++++++++ .../indices.shrink/50_wait_for_completion.yml | 88 +++++++++++++++++++ .../indices.split/40_wait_for_completion.yml | 88 +++++++++++++++++++ .../admin/indices/create/ShrinkIndexIT.java | 2 +- .../indices/forcemerge/ForceMergeRequest.java | 14 +++ .../forcemerge/ForceMergeResponse.java | 13 +++ .../admin/indices/open/OpenIndexRequest.java | 24 +++++ .../admin/indices/open/OpenIndexResponse.java | 9 ++ .../admin/indices/shrink/ResizeRequest.java | 60 ++++++++++++- .../admin/indices/shrink/ResizeResponse.java | 10 +++ .../indices/shrink/TransportResizeAction.java | 14 +-- .../support/master/AcknowledgedRequest.java | 2 + .../org/opensearch/rest/BaseRestHandler.java | 16 ++++ .../admin/indices/RestForceMergeAction.java | 13 ++- .../admin/indices/RestOpenIndexAction.java | 29 +++++- .../admin/indices/RestResizeHandler.java | 30 ++++++- .../forcemerge/ForceMergeResponseTests.java | 19 ++++ .../indices/open/OpenIndexRequestTests.java | 69 +++++++++++++++ .../indices/open/OpenIndexResponseTests.java | 6 ++ .../indices/shrink/ResizeRequestTests.java | 64 +++++++++++++- .../indices/shrink/ResizeResponseTests.java | 6 ++ .../shrink/TransportResizeActionTests.java | 43 +-------- 31 files changed, 780 insertions(+), 78 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.clone/40_wait_for_completion.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/30_wait_for_completion.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/50_wait_for_completion.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.split/40_wait_for_completion.yml create mode 100644 server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexRequestTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 343a791a33f54..6cccdc7e6b5f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,6 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558)) - Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544)) - Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517)) +- Add wait_for_completion parameter to resize, open, and forcemerge APIs ([#6434](https://github.com/opensearch-project/OpenSearch/pull/6434)) - [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563)) - [Remote Store] Integrate remote segment store in peer recovery flow ([#6664](https://github.com/opensearch-project/OpenSearch/pull/6664)) - [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791)) diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/AbstractBaseReindexRestHandler.java index 301160ccd581e..d029ae0be9836 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -37,13 +37,9 @@ import org.opensearch.action.support.ActiveShardCount; import org.opensearch.client.node.NodeClient; import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.rest.BaseRestHandler; -import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; -import org.opensearch.rest.RestStatus; import org.opensearch.tasks.LoggingTaskListener; -import org.opensearch.tasks.Task; import java.io.IOException; import java.util.HashMap; @@ -124,17 +120,6 @@ protected Request setCommonOptions(RestRequest restRequest, Request request) { return request; } - private RestChannelConsumer sendTask(String localNodeId, Task task) { - return channel -> { - try (XContentBuilder builder = channel.newBuilder()) { - builder.startObject(); - builder.field("task", localNodeId + ":" + task.getId()); - builder.endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); - } - }; - } - private static Integer parseSlices(RestRequest request) { String slicesString = request.param("slices"); if (slicesString == null) { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clone.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clone.json index b55d43371005f..2d874f4933768 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clone.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.clone.json @@ -46,6 +46,14 @@ "wait_for_active_shards": { "type" : "string", "description" : "Set the number of active shards to wait for on the cloned index before the operation returns." + }, + "wait_for_completion": { + "type" : "boolean", + "description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true." + }, + "task_execution_timeout": { + "type" : "time", + "description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h." } }, "body": { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json index 6036b75bb83e4..02fbcc36dfe64 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json @@ -59,6 +59,10 @@ "only_expunge_deletes":{ "type":"boolean", "description":"Specify whether the operation should only expunge deleted documents" + }, + "wait_for_completion": { + "type" : "boolean", + "description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true." } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.open.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.open.json index 1dab468ce4ff4..f44fb04102a7f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.open.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.open.json @@ -28,7 +28,15 @@ }, "master_timeout":{ "type":"time", - "description":"Specify timeout for connection to master" + "description":"Specify timeout for connection to master", + "deprecated":{ + "version":"2.0.0", + "description":"To promote inclusive language, use 'cluster_manager_timeout' instead." + } + }, + "cluster_manager_timeout":{ + "type":"time", + "description":"Specify timeout for connection to cluster-manager node" }, "ignore_unavailable":{ "type":"boolean", @@ -53,6 +61,14 @@ "wait_for_active_shards":{ "type":"string", "description":"Sets the number of active shards to wait for before the operation returns." + }, + "wait_for_completion": { + "type" : "boolean", + "description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true." + }, + "task_execution_timeout": { + "type" : "time", + "description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h." } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json index 6bb09ee0019e1..a20014a1444ec 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json @@ -50,6 +50,14 @@ "wait_for_active_shards": { "type" : "string", "description" : "Set the number of active shards to wait for on the shrunken index before the operation returns." + }, + "wait_for_completion": { + "type" : "boolean", + "description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true." + }, + "task_execution_timeout": { + "type" : "time", + "description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h." } }, "body":{ diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.split.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.split.json index d1b5a28c9ff0f..d399bf9dbdb8a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.split.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.split.json @@ -50,6 +50,14 @@ "wait_for_active_shards": { "type" : "string", "description" : "Set the number of active shards to wait for on the shrunken index before the operation returns." + }, + "wait_for_completion": { + "type" : "boolean", + "description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true." + }, + "task_execution_timeout": { + "type" : "time", + "description" : "Explicit task execution timeout, only useful when wait_for_completion is false, defaults to 1h." } }, "body":{ diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.clone/40_wait_for_completion.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.clone/40_wait_for_completion.yml new file mode 100644 index 0000000000000..88bdb9722c541 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.clone/40_wait_for_completion.yml @@ -0,0 +1,88 @@ +--- +"Clone index with wait_for_completion": + # clone index with wait_for_completion parameter, when the parameter is set to false, the API + # will return a task immediately and the clone operation will run in background. + + - skip: + version: " - 2.9.99" + reason: "only available in 3.0+" + features: allowed_warnings + + - do: + nodes.info: + node_id: data:true + - set: + nodes._arbitrary_key_: node_id + + - do: + indices.create: + index: source + wait_for_active_shards: 1 + body: + settings: + # ensure everything is allocated on the same data node + index.routing.allocation.include._id: $node_id + index.number_of_shards: 1 + index.number_of_replicas: 0 + - do: + index: + index: source + id: "1" + body: { "foo": "hello world" } + + - do: + get: + index: source + id: "1" + + - match: { _index: source } + - match: { _id: "1" } + - match: { _source: { foo: "hello world" } } + + # make it read-only + - do: + indices.put_settings: + index: source + body: + index.blocks.write: true + index.number_of_replicas: 0 + + - do: + cluster.health: + wait_for_status: green + index: source + + # clone with wait_for_completion + - do: + indices.clone: + index: "source" + target: "new_cloned_index" + wait_for_active_shards: 1 + cluster_manager_timeout: 10s + wait_for_completion: false + task_execution_timeout: 30s + body: + settings: + index.number_of_shards: 1 + "index.number_of_replicas": 0 + + - match: { task: /^.+$/ } + - set: { task: taskId } + + - do: + tasks.get: + wait_for_completion: true + task_id: $taskId + - match: { task.action: "indices:admin/resize" } + - match: { task.description: "clone from [source] to [new_cloned_index]" } + + # .tasks index is created when the clone index operation completes, so we should delete .tasks index finally, + # if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail. + # Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one + # specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future + - do: + allowed_warnings: + - "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default" + indices.delete: + index: .tasks + ignore_unavailable: true diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml new file mode 100644 index 0000000000000..27a2b759dda09 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml @@ -0,0 +1,39 @@ +--- +"Force merge index with wait_for_completion": + # force merge index with wait_for_completion parameter, when the parameter is set to false, the API + # will return a task immediately and the merge process will run in background. + + - skip: + version: " - 2.9.99" + reason: "only available in 3.0+" + features: allowed_warnings + + - do: + indices.create: + index: test_index + + - do: + indices.forcemerge: + index: test_index + wait_for_completion: false + max_num_segments: 1 + - match: { task: /^.+$/ } + - set: { task: taskId } + + - do: + tasks.get: + wait_for_completion: true + task_id: $taskId + - match: { task.action: "indices:admin/forcemerge" } + - match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" } + + # .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally, + # if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail. + # Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one + # specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future + - do: + allowed_warnings: + - "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default" + indices.delete: + index: .tasks + ignore_unavailable: true diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/30_wait_for_completion.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/30_wait_for_completion.yml new file mode 100644 index 0000000000000..93d557ee0aa3f --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/30_wait_for_completion.yml @@ -0,0 +1,50 @@ +--- +"Open index with wait_for_completion": + # open index with wait_for_completion parameter, when the parameter is set to false, the API + # will return a task immediately and the open operation will run in background. + + - skip: + version: " - 2.9.99" + reason: "only available in 3.0+" + features: allowed_warnings + + - do: + indices.create: + index: test_index + body: + settings: + number_of_replicas: 0 + number_of_shards: 1 + + - do: + indices.close: + index: test_index + - is_true: acknowledged + + - do: + indices.open: + index: test_index + wait_for_active_shards: all + cluster_manager_timeout: 10s + wait_for_completion: false + task_execution_timeout: 30s + - match: { task: /^.+$/ } + - set: { task: taskId } + + - do: + tasks.get: + wait_for_completion: true + task_id: $taskId + - match: { task.action: "indices:admin/open" } + - match: { task.description: "open indices [test_index]" } + + # .tasks index is created when the open index operation completes, so we should delete .tasks index finally, + # if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail. + # Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one + # specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future + - do: + allowed_warnings: + - "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default" + indices.delete: + index: .tasks + ignore_unavailable: true diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/50_wait_for_completion.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/50_wait_for_completion.yml new file mode 100644 index 0000000000000..7ed90cd242223 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/50_wait_for_completion.yml @@ -0,0 +1,88 @@ +--- +"Shrink index with wait_for_completion": + # shrink index with wait_for_completion parameter, when the parameter is set to false, the API + # will return a task immediately and the shrink operation will run in background. + + - skip: + version: " - 2.9.99" + reason: "only available in 3.0+" + features: allowed_warnings + + - do: + nodes.info: + node_id: data:true + - set: + nodes._arbitrary_key_: node_id + + - do: + indices.create: + index: source + wait_for_active_shards: 1 + body: + settings: + # ensure everything is allocated on the same data node + index.routing.allocation.include._id: $node_id + index.number_of_shards: 3 + index.number_of_replicas: 0 + - do: + index: + index: source + id: "1" + body: { "foo": "hello world" } + + - do: + get: + index: source + id: "1" + + - match: { _index: source } + - match: { _id: "1" } + - match: { _source: { foo: "hello world" } } + + # make it read-only + - do: + indices.put_settings: + index: source + body: + index.blocks.write: true + index.number_of_replicas: 0 + + - do: + cluster.health: + wait_for_status: green + index: source + + # shrink with wait_for_completion + - do: + indices.shrink: + index: "source" + target: "new_shrunken_index" + wait_for_active_shards: 1 + cluster_manager_timeout: 10s + wait_for_completion: false + task_execution_timeout: 30s + body: + settings: + index.number_of_shards: 1 + "index.number_of_replicas": 0 + + - match: { task: /^.+$/ } + - set: { task: taskId } + + - do: + tasks.get: + wait_for_completion: true + task_id: $taskId + - match: { task.action: "indices:admin/resize" } + - match: { task.description: "shrink from [source] to [new_shrunken_index]" } + + # .tasks index is created when the shrink index operation completes, so we should delete .tasks index finally, + # if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail. + # Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one + # specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future + - do: + allowed_warnings: + - "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default" + indices.delete: + index: .tasks + ignore_unavailable: true diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.split/40_wait_for_completion.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.split/40_wait_for_completion.yml new file mode 100644 index 0000000000000..9fe1b453501eb --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.split/40_wait_for_completion.yml @@ -0,0 +1,88 @@ +--- +"Split index with wait_for_completion": + # split index with wait_for_completion parameter, when the parameter is set to false, the API + # will return a task immediately and the split operation will run in background. + + - skip: + version: " - 2.9.99" + reason: "only available in 3.0+" + features: allowed_warnings + + - do: + nodes.info: + node_id: data:true + - set: + nodes._arbitrary_key_: node_id + + - do: + indices.create: + index: source + wait_for_active_shards: 1 + body: + settings: + # ensure everything is allocated on the same data node + index.routing.allocation.include._id: $node_id + index.number_of_shards: 1 + index.number_of_replicas: 0 + - do: + index: + index: source + id: "1" + body: { "foo": "hello world" } + + - do: + get: + index: source + id: "1" + + - match: { _index: source } + - match: { _id: "1" } + - match: { _source: { foo: "hello world" } } + + # make it read-only + - do: + indices.put_settings: + index: source + body: + index.blocks.write: true + index.number_of_replicas: 0 + + - do: + cluster.health: + wait_for_status: green + index: source + + # split with wait_for_completion + - do: + indices.split: + index: "source" + target: "new_split_index" + wait_for_active_shards: 1 + cluster_manager_timeout: 10s + wait_for_completion: false + task_execution_timeout: 30s + body: + settings: + index.number_of_shards: 2 + "index.number_of_replicas": 0 + + - match: { task: /^.+$/ } + - set: { task: taskId } + + - do: + tasks.get: + wait_for_completion: true + task_id: $taskId + - match: { task.action: "indices:admin/resize" } + - match: { task.description: "split from [source] to [new_split_index]" } + + # .tasks index is created when the split index operation completes, so we should delete .tasks index finally, + # if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail. + # Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one + # specified task or clear all completed tasks, so we have to do so. Expect we can introduce more tasks related APIs in future + - do: + allowed_warnings: + - "this request accesses system indices: [.tasks], but in a future major version, direct access to system indices will be prevented by default" + indices.delete: + index: .tasks + ignore_unavailable: true diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java index 3420074a0f60b..ef74e7a28fec3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java @@ -806,7 +806,7 @@ public void testCreateShrinkIndexWithMaxShardSize() { .setResizeType(ResizeType.SHRINK) .get() ); - assertEquals(exc.getMessage(), "Cannot set max_shard_size and index.number_of_shards at the same time!"); + assertTrue(exc.getMessage().contains("Cannot set max_shard_size and index.number_of_shards at the same time!")); // use max_shard_size to calculate the target index's shards number // set max_shard_size to 1 then the target index's shards number will be same with the source index's diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index 77007fba539ec..20b636dfd1e21 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -81,6 +81,8 @@ public static final class Defaults { */ private final String forceMergeUUID; + private boolean shouldStoreResult; + /** * Constructs a merge request over one or more indices. * @@ -162,6 +164,18 @@ public ForceMergeRequest flush(boolean flush) { return this; } + /** + * Should this task store its result after it has finished? + */ + public void setShouldStoreResult(boolean shouldStoreResult) { + this.shouldStoreResult = shouldStoreResult; + } + + @Override + public boolean getShouldStoreResult() { + return shouldStoreResult; + } + @Override public String getDescription() { return "Force-merge indices " diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponse.java index dece7db3caec4..c57c7cf58f95f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponse.java @@ -42,6 +42,8 @@ import java.util.Arrays; import java.util.List; +import static java.lang.Math.min; + /** * A response for force merge action. * @@ -78,4 +80,15 @@ public class ForceMergeResponse extends BroadcastResponse { public static ForceMergeResponse fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(getClass().getSimpleName()).append("["); + builder.append("total_shards=").append(getTotalShards()).append(','); + builder.append("successful_shards=").append(getSuccessfulShards()).append(','); + builder.append("failed_shards=").append(getFailedShards()).append(','); + builder.append("failures=").append(Arrays.asList(getShardFailures()).subList(0, min(3, getShardFailures().length))); + return builder.append(']').toString(); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexRequest.java index c6c1c2dc8f0cb..9bc6395c0ea94 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexRequest.java @@ -42,6 +42,7 @@ import org.opensearch.common.util.CollectionUtils; import java.io.IOException; +import java.util.Arrays; import static org.opensearch.action.ValidateActions.addValidationError; @@ -55,6 +56,7 @@ public class OpenIndexRequest extends AcknowledgedRequest impl private String[] indices; private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, false, true); private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + private boolean shouldStoreResult; public OpenIndexRequest(StreamInput in) throws IOException { super(in); @@ -161,6 +163,18 @@ public OpenIndexRequest waitForActiveShards(final int waitForActiveShards) { return waitForActiveShards(ActiveShardCount.from(waitForActiveShards)); } + /** + * Should this task store its result after it has finished? + */ + public void setShouldStoreResult(boolean shouldStoreResult) { + this.shouldStoreResult = shouldStoreResult; + } + + @Override + public boolean getShouldStoreResult() { + return shouldStoreResult; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -168,4 +182,14 @@ public void writeTo(StreamOutput out) throws IOException { indicesOptions.writeIndicesOptions(out); waitForActiveShards.writeTo(out); } + + @Override + public String toString() { + return "open indices " + Arrays.toString(indices()); + } + + @Override + public String getDescription() { + return this.toString(); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexResponse.java index c3849809d9ccf..7d5ac66edaafe 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/open/OpenIndexResponse.java @@ -74,4 +74,13 @@ public void writeTo(StreamOutput out) throws IOException { public static OpenIndexResponse fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(getClass().getSimpleName()).append("["); + builder.append("acknowledged=").append(isAcknowledged()).append(','); + builder.append("shards_acknowledged=").append(isShardsAcknowledged()); + return builder.append(']').toString(); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java index 70746787e0ef2..1e524e720da9b 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeRequest.java @@ -40,14 +40,14 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.master.AcknowledgedRequest; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.core.ParseField; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.ObjectParser; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.common.unit.ByteSizeValue; import java.io.IOException; import java.util.Objects; @@ -88,6 +88,7 @@ public class ResizeRequest extends AcknowledgedRequest implements private ResizeType type = ResizeType.SHRINK; private Boolean copySettings = true; private ByteSizeValue maxShardSize; + private boolean shouldStoreResult; public ResizeRequest(StreamInput in) throws IOException { super(in); @@ -119,9 +120,29 @@ public ActionRequestValidationException validate() { if (targetIndexRequest.settings().getByPrefix("index.sort.").isEmpty() == false) { validationException = addValidationError("can't override index sort when resizing an index", validationException); } + if (IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexRequest.settings())) { + validationException = addValidationError( + "cannot provide a routing partition size value when resizing an index", + validationException + ); + } if (type == ResizeType.SPLIT && IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexRequest.settings()) == false) { validationException = addValidationError("index.number_of_shards is required for split operations", validationException); } + + // max_shard_size is only supported for shrink + if (type != ResizeType.SHRINK && maxShardSize != null) { + validationException = addValidationError("Unsupported parameter [max_shard_size]", validationException); + } + // max_shard_size conflicts with the index.number_of_shards setting + if (type == ResizeType.SHRINK + && IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexRequest.settings()) + && maxShardSize != null) { + validationException = addValidationError( + "Cannot set max_shard_size and index.number_of_shards at the same time!", + validationException + ); + } if (maxShardSize != null && maxShardSize.getBytes() <= 0) { validationException = addValidationError("max_shard_size must be greater than 0", validationException); } @@ -243,6 +264,18 @@ public ByteSizeValue getMaxShardSize() { return maxShardSize; } + /** + * Should this task store its result after it has finished? + */ + public void setShouldStoreResult(boolean shouldStoreResult) { + this.shouldStoreResult = shouldStoreResult; + } + + @Override + public boolean getShouldStoreResult() { + return shouldStoreResult; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -270,4 +303,27 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void fromXContent(XContentParser parser) throws IOException { PARSER.parse(parser, this, null); } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + switch (getResizeType()) { + case SPLIT: + b.append("split from"); + break; + case CLONE: + b.append("clone from"); + break; + default: + b.append("shrink from"); + } + b.append(" [").append(sourceIndex).append("]"); + b.append(" to [").append(getTargetIndexRequest().index()).append(']'); + return b.toString(); + } + + @Override + public String getDescription() { + return this.toString(); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeResponse.java index e24b549925528..69e973d39dc09 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/ResizeResponse.java @@ -67,4 +67,14 @@ public ResizeResponse(boolean acknowledged, boolean shardsAcknowledged, String i public static ResizeResponse fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(getClass().getSimpleName()).append("["); + builder.append("acknowledged=").append(isAcknowledged()).append(','); + builder.append("shards_acknowledged=").append(isShardsAcknowledged()).append(','); + builder.append("index=").append(index()); + return builder.append(']').toString(); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java index 7f55e5efe801b..5bcf3631bf398 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java @@ -181,21 +181,12 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( final Settings targetIndexSettings = targetIndexSettingsBuilder.build(); final int numShards; - // max_shard_size is only supported for shrink - ByteSizeValue maxShardSize = resizeRequest.getMaxShardSize(); - if (resizeRequest.getResizeType() != ResizeType.SHRINK && maxShardSize != null) { - throw new IllegalArgumentException("Unsupported parameter [max_shard_size]"); - } - if (IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) { numShards = IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings); - if (resizeRequest.getResizeType() == ResizeType.SHRINK && maxShardSize != null) { - throw new IllegalArgumentException("Cannot set max_shard_size and index.number_of_shards at the same time!"); - } } else { assert resizeRequest.getResizeType() != ResizeType.SPLIT : "split must specify the number of shards explicitly"; if (resizeRequest.getResizeType() == ResizeType.SHRINK) { - numShards = calculateTargetIndexShardsNum(maxShardSize, primaryShardsStoreStats, metadata); + numShards = calculateTargetIndexShardsNum(resizeRequest.getMaxShardSize(), primaryShardsStoreStats, metadata); } else { assert resizeRequest.getResizeType() == ResizeType.CLONE; numShards = metadata.getNumberOfShards(); @@ -229,9 +220,6 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( } } - if (IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) { - throw new IllegalArgumentException("cannot provide a routing partition size value when resizing an index"); - } if (IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) { // if we have a source index with 1 shards it's legal to set this final boolean splitFromSingleShards = resizeRequest.getResizeType() == ResizeType.SPLIT && metadata.getNumberOfShards() == 1; diff --git a/server/src/main/java/org/opensearch/action/support/master/AcknowledgedRequest.java b/server/src/main/java/org/opensearch/action/support/master/AcknowledgedRequest.java index 7f665b4e658a1..11f3eec5dfe37 100644 --- a/server/src/main/java/org/opensearch/action/support/master/AcknowledgedRequest.java +++ b/server/src/main/java/org/opensearch/action/support/master/AcknowledgedRequest.java @@ -38,6 +38,7 @@ import java.io.IOException; +import static org.opensearch.common.unit.TimeValue.timeValueHours; import static org.opensearch.common.unit.TimeValue.timeValueSeconds; /** @@ -51,6 +52,7 @@ public abstract class AcknowledgedRequest { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field("task", nodeId + ":" + task.getId()); + builder.endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + } + }; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java index e684cd27570b4..eefe284d572ba 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java @@ -32,6 +32,7 @@ package org.opensearch.rest.action.admin.indices; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeAction; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.node.NodeClient; @@ -40,6 +41,7 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.tasks.LoggingTaskListener; import java.io.IOException; import java.util.List; @@ -80,7 +82,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC "setting only_expunge_deletes and max_num_segments at the same time is deprecated and will be rejected in a future version" ); } - return channel -> client.admin().indices().forceMerge(mergeRequest, new RestToXContentListener<>(channel)); + if (request.paramAsBoolean("wait_for_completion", true)) { + return channel -> client.admin().indices().forceMerge(mergeRequest, new RestToXContentListener<>(channel)); + } else { + // running force merge asynchronously, return a task immediately and store the task's result when it completes + mergeRequest.setShouldStoreResult(true); + return sendTask( + client.getLocalNodeId(), + client.executeLocally(ForceMergeAction.INSTANCE, mergeRequest, LoggingTaskListener.instance()) + ); + } } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestOpenIndexAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestOpenIndexAction.java index e95d6764c5b98..05c51ab02d552 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestOpenIndexAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestOpenIndexAction.java @@ -32,6 +32,8 @@ package org.opensearch.rest.action.admin.indices; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.indices.open.OpenIndexAction; import org.opensearch.action.admin.indices.open.OpenIndexRequest; import org.opensearch.action.support.ActiveShardCount; import org.opensearch.action.support.IndicesOptions; @@ -41,12 +43,14 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.tasks.LoggingTaskListener; import java.io.IOException; import java.util.List; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; +import static org.opensearch.action.support.master.AcknowledgedRequest.DEFAULT_TASK_EXECUTION_TIMEOUT; import static org.opensearch.rest.RestRequest.Method.POST; /** @@ -81,6 +85,29 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC if (waitForActiveShards != null) { openIndexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } - return channel -> client.admin().indices().open(openIndexRequest, new RestToXContentListener<>(channel)); + if (request.paramAsBoolean("wait_for_completion", true)) { + return channel -> client.admin().indices().open(openIndexRequest, new RestToXContentListener<>(channel)); + } else { + // Running opening index asynchronously, return a task immediately and store the task's result when it completes + openIndexRequest.setShouldStoreResult(true); + /* + * Replace the ack timeout by task_execution_timeout so that the task can take a longer time to execute but not finish in 30s + * by default, task_execution_timeout defaults to 1h. + */ + openIndexRequest.timeout(request.paramAsTime("task_execution_timeout", DEFAULT_TASK_EXECUTION_TIMEOUT)); + /* + * Add some validation before so the user can get some error immediately. The + * task can't totally validate until it starts but this is better than nothing. + */ + ActionRequestValidationException validationException = openIndexRequest.validate(); + if (validationException != null) { + throw validationException; + } + return sendTask( + client.getLocalNodeId(), + client.executeLocally(OpenIndexAction.INSTANCE, openIndexRequest, LoggingTaskListener.instance()) + ); + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestResizeHandler.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestResizeHandler.java index 1024891d9d44a..c84e0ba08ea32 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestResizeHandler.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestResizeHandler.java @@ -35,6 +35,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.indices.shrink.ResizeAction; import org.opensearch.action.admin.indices.shrink.ResizeRequest; import org.opensearch.action.admin.indices.shrink.ResizeType; import org.opensearch.action.support.ActiveShardCount; @@ -44,12 +46,14 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.tasks.LoggingTaskListener; import java.io.IOException; import java.util.List; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; +import static org.opensearch.action.support.master.AcknowledgedRequest.DEFAULT_TASK_EXECUTION_TIMEOUT; import static org.opensearch.rest.RestRequest.Method.POST; import static org.opensearch.rest.RestRequest.Method.PUT; @@ -96,10 +100,34 @@ public final RestChannelConsumer prepareRequest(final RestRequest request, final resizeRequest.setCopySettings(copySettings); request.applyContentParser(resizeRequest::fromXContent); resizeRequest.timeout(request.paramAsTime("timeout", resizeRequest.timeout())); + resizeRequest.getTargetIndexRequest().timeout(resizeRequest.timeout()); resizeRequest.clusterManagerNodeTimeout(request.paramAsTime("cluster_manager_timeout", resizeRequest.clusterManagerNodeTimeout())); + resizeRequest.getTargetIndexRequest().clusterManagerNodeTimeout(resizeRequest.clusterManagerNodeTimeout()); parseDeprecatedMasterTimeoutParameter(resizeRequest, request, deprecationLogger, getName()); resizeRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); - return channel -> client.admin().indices().resizeIndex(resizeRequest, new RestToXContentListener<>(channel)); + if (request.paramAsBoolean("wait_for_completion", true)) { + return channel -> client.admin().indices().resizeIndex(resizeRequest, new RestToXContentListener<>(channel)); + } else { + // running resizing index asynchronously, return a task immediately and store the task's result when it completes + resizeRequest.setShouldStoreResult(true); + /* + * Replace the ack timeout by task_execution_timeout so that the task can take a longer time to execute but not finish in 30s + * by default, task_execution_timeout defaults to 1h. + */ + resizeRequest.getTargetIndexRequest().timeout(request.paramAsTime("task_execution_timeout", DEFAULT_TASK_EXECUTION_TIMEOUT)); + /* + * Add some validation before so the user can get some error immediately. The + * task can't totally validate until it starts but this is better than nothing. + */ + ActionRequestValidationException validationException = resizeRequest.validate(); + if (validationException != null) { + throw validationException; + } + return sendTask( + client.getLocalNodeId(), + client.executeLocally(ResizeAction.INSTANCE, resizeRequest, LoggingTaskListener.instance()) + ); + } } /** diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponseTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponseTests.java index 15eaa471538fa..3064847b54332 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeResponseTests.java @@ -32,13 +32,32 @@ package org.opensearch.action.admin.indices.forcemerge; +import org.opensearch.OpenSearchException; import org.opensearch.action.support.DefaultShardOperationFailedException; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.AbstractBroadcastResponseTestCase; +import java.util.ArrayList; import java.util.List; public class ForceMergeResponseTests extends AbstractBroadcastResponseTestCase { + + public void testToString() { + String indexName = randomAlphaOfLengthBetween(3, 10); + int shardId = randomIntBetween(0, 10); + List exceptions = new ArrayList<>(); + exceptions.add(new DefaultShardOperationFailedException(indexName, shardId, new OpenSearchException("boom"))); + ForceMergeResponse response = createTestInstance(10, 3, 1, exceptions); + assertEquals( + "ForceMergeResponse[total_shards=10,successful_shards=3,failed_shards=1,failures=[[" + + indexName + + "][" + + shardId + + "] failed, reason [OpenSearchException[boom]]]]", + response.toString() + ); + } + @Override protected ForceMergeResponse createTestInstance( int totalShards, diff --git a/server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexRequestTests.java new file mode 100644 index 0000000000000..8686101f9d845 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexRequestTests.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.open; + +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class OpenIndexRequestTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + OpenIndexRequest request = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + + final OpenIndexRequest deserializedRequest; + try (StreamInput in = out.bytes().streamInput()) { + deserializedRequest = new OpenIndexRequest(in); + } + assertEquals(request.timeout(), deserializedRequest.timeout()); + assertEquals(request.clusterManagerNodeTimeout(), deserializedRequest.clusterManagerNodeTimeout()); + assertEquals(request.indicesOptions(), deserializedRequest.indicesOptions()); + assertEquals(request.getParentTask(), deserializedRequest.getParentTask()); + assertEquals(request.waitForActiveShards(), deserializedRequest.waitForActiveShards()); + assertArrayEquals(request.indices(), deserializedRequest.indices()); + } + } + + public void testGetDescription() { + String indexName = randomAlphaOfLengthBetween(3, 10); + OpenIndexRequest request = new OpenIndexRequest(indexName); + assertEquals("open indices [" + indexName + "]", request.getDescription()); + } + + private OpenIndexRequest randomRequest() { + OpenIndexRequest request = new OpenIndexRequest(); + request.indices(generateRandomStringArray(10, 5, false, false)); + if (randomBoolean()) { + request.indicesOptions( + IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()) + ); + } + if (randomBoolean()) { + request.timeout(randomPositiveTimeValue()); + } + if (randomBoolean()) { + request.clusterManagerNodeTimeout(randomPositiveTimeValue()); + } + if (randomBoolean()) { + request.setParentTask(randomAlphaOfLength(5), randomNonNegativeLong()); + } + if (randomBoolean()) { + request.waitForActiveShards( + randomFrom(ActiveShardCount.DEFAULT, ActiveShardCount.NONE, ActiveShardCount.ONE, ActiveShardCount.ALL) + ); + } + return request; + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexResponseTests.java b/server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexResponseTests.java index 5186f82248029..4bd7adc7b48b9 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/open/OpenIndexResponseTests.java @@ -38,6 +38,12 @@ public class OpenIndexResponseTests extends AbstractSerializingTestCase { + public void testToString() { + OpenIndexResponse response = new OpenIndexResponse(true, false); + String output = response.toString(); + assertEquals("OpenIndexResponse[acknowledged=true,shards_acknowledged=false]", output); + } + @Override protected OpenIndexResponse doParseInstance(XContentParser parser) { return OpenIndexResponse.fromXContent(parser); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeRequestTests.java index d2d8f717e64f4..d591c395d5402 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeRequestTests.java @@ -32,14 +32,16 @@ package org.opensearch.action.admin.indices.shrink; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexRequestTests; import org.opensearch.common.Strings; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.Settings; -import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.RandomCreateIndexGenerator; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; @@ -125,6 +127,66 @@ public void testToAndFromXContent() throws IOException { OpenSearchAssertions.assertToXContentEquivalent(originalBytes, finalBytes, xContentType); } + public void testTargetIndexSettingsValidation() { + ResizeRequest resizeRequest = new ResizeRequest(randomAlphaOfLengthBetween(3, 10), randomAlphaOfLengthBetween(3, 10)); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(randomAlphaOfLengthBetween(3, 10)); + createIndexRequest.settings( + Settings.builder() + .put("index.sort.field", randomAlphaOfLengthBetween(3, 10)) + .put("index.routing_partition_size", randomIntBetween(1, 10)) + ); + resizeRequest.setTargetIndex(createIndexRequest); + ActionRequestValidationException e = resizeRequest.validate(); + assertEquals( + "Validation Failed: 1: can't override index sort when resizing an index;" + + "2: cannot provide a routing partition size value when resizing an index;", + e.getMessage() + ); + + createIndexRequest.settings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 10))); + resizeRequest.setMaxShardSize(new ByteSizeValue(randomIntBetween(1, 100))); + resizeRequest.setTargetIndex(createIndexRequest); + e = resizeRequest.validate(); + assertEquals("Validation Failed: 1: Cannot set max_shard_size and index.number_of_shards at the same time!;", e.getMessage()); + + resizeRequest.setResizeType(ResizeType.SPLIT); + createIndexRequest.settings(Settings.builder().build()); + resizeRequest.setMaxShardSize(new ByteSizeValue(randomIntBetween(1, 100))); + resizeRequest.setTargetIndex(createIndexRequest); + e = resizeRequest.validate(); + assertEquals( + "Validation Failed: 1: index.number_of_shards is required for split operations;" + "2: Unsupported parameter [max_shard_size];", + e.getMessage() + ); + + resizeRequest.setResizeType(ResizeType.CLONE); + createIndexRequest.settings(Settings.builder().build()); + resizeRequest.setMaxShardSize(new ByteSizeValue(randomIntBetween(1, 100))); + resizeRequest.setTargetIndex(createIndexRequest); + e = resizeRequest.validate(); + assertEquals("Validation Failed: 1: Unsupported parameter [max_shard_size];", e.getMessage()); + } + + public void testGetDescription() { + String sourceIndexName = randomAlphaOfLengthBetween(3, 10); + String targetIndexName = randomAlphaOfLengthBetween(3, 10); + ResizeRequest request = new ResizeRequest(targetIndexName, sourceIndexName); + String resizeType; + switch (randomIntBetween(0, 2)) { + case 1: + request.setResizeType(ResizeType.SPLIT); + resizeType = "split"; + break; + case 2: + request.setResizeType(ResizeType.CLONE); + resizeType = "clone"; + break; + default: + resizeType = "shrink"; + } + assertEquals(resizeType + " from [" + sourceIndexName + "] to [" + targetIndexName + "]", request.getDescription()); + } + private static ResizeRequest createTestItem() { ResizeRequest resizeRequest = new ResizeRequest(randomAlphaOfLengthBetween(3, 10), randomAlphaOfLengthBetween(3, 10)); if (randomBoolean()) { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeResponseTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeResponseTests.java index c98d4536968e3..d8a87d9bffe80 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shrink/ResizeResponseTests.java @@ -46,6 +46,12 @@ public void testToXContent() { assertEquals("{\"acknowledged\":true,\"shards_acknowledged\":false,\"index\":\"index_name\"}", output); } + public void testToString() { + ResizeResponse response = new ResizeResponse(true, false, "index_name"); + String output = response.toString(); + assertEquals("ResizeResponse[acknowledged=true,shards_acknowledged=false,index=index_name]", output); + } + @Override protected ResizeResponse doParseInstance(XContentParser parser) { return ResizeResponse.fromXContent(parser); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java index 5705362cc73f4..b830c6b01792c 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java @@ -318,48 +318,6 @@ public void testShrinkWithMaxShardSize() { createClusterState(indexName, 10, 0, Settings.builder().put("index.blocks.write", true).build()) ).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); - // Cannot set max_shard_size when split or clone - ResizeRequest resizeRequestForFailure = new ResizeRequest("target", indexName); - ResizeType resizeType = ResizeType.SPLIT; - if (randomBoolean()) { - resizeType = ResizeType.CLONE; - } - resizeRequestForFailure.setResizeType(resizeType); - resizeRequestForFailure.setMaxShardSize(new ByteSizeValue(100)); - resizeRequestForFailure.getTargetIndexRequest() - .settings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 100)).build()); - ClusterState finalState = clusterState; - IllegalArgumentException iae = expectThrows( - IllegalArgumentException.class, - () -> TransportResizeAction.prepareCreateIndexRequest( - resizeRequestForFailure, - finalState, - null, - new StoreStats(between(1, 10000), between(1, 10000)), - indexName, - "target" - ) - ); - assertEquals("Unsupported parameter [max_shard_size]", iae.getMessage()); - - // Cannot set max_shard_size and index.number_of_shards at the same time - ResizeRequest resizeRequest = new ResizeRequest("target", indexName); - resizeRequest.setResizeType(ResizeType.SHRINK); - resizeRequest.setMaxShardSize(new ByteSizeValue(100)); - resizeRequest.getTargetIndexRequest().settings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 100)).build()); - iae = expectThrows( - IllegalArgumentException.class, - () -> TransportResizeAction.prepareCreateIndexRequest( - resizeRequest, - finalState, - null, - new StoreStats(between(1, 10000), between(1, 10000)), - indexName, - "target" - ) - ); - assertEquals("Cannot set max_shard_size and index.number_of_shards at the same time!", iae.getMessage()); - AllocationService service = new AllocationService( new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), new TestGatewayAllocator(), @@ -377,6 +335,7 @@ public void testShrinkWithMaxShardSize() { // target index's shards number must be the lowest factor of the source index's shards number int expectedShardsNum = 5; + ResizeRequest resizeRequest = new ResizeRequest("target", indexName); resizeRequest.setMaxShardSize(new ByteSizeValue(25)); // clear index settings resizeRequest.getTargetIndexRequest().settings(Settings.builder().build());