From 167d4e5d755296d0b9da9c8b9f6df913b642b3dd Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 28 Mar 2023 01:15:48 +0000 Subject: [PATCH 1/3] Replaces ExtensionActionRequest class name with the JobParameter/RunnerRequest fully qualified class names, modifes JobParameter/Runner/Request/Response classes to extend from ActionRequest/Response Signed-off-by: Joshua Palis --- .../transport/request/ExtensionJobActionRequest.java | 2 +- .../transport/request/JobParameterRequest.java | 11 +++++++++-- .../transport/request/JobRunnerRequest.java | 11 +++++++++-- .../transport/response/JobParameterResponse.java | 4 ++-- .../transport/response/JobRunnerResponse.java | 4 ++-- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java index 3e8b3c27..dbabedb3 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java @@ -53,7 +53,7 @@ private static byte[] convertParamsToBytes(T actionParams) byte[] requestBytes = BytesReference.toBytes(out.bytes()); // Convert fully qualifed class name to byte array - byte[] requestClassBytes = ExtensionActionRequest.class.getName().getBytes(StandardCharsets.UTF_8); + byte[] requestClassBytes = actionParams.getClass().getName().getBytes(StandardCharsets.UTF_8); // Generate ExtensionActionRequest responseByte array byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + requestBytes.length) diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java index 245423a5..326cccc1 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java @@ -9,10 +9,12 @@ package org.opensearch.jobscheduler.transport.request; import java.io.IOException; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.xcontent.XContentParser; @@ -21,7 +23,7 @@ /** * Request to extensions to parse a ScheduledJobParameter */ -public class JobParameterRequest implements Writeable { +public class JobParameterRequest extends ActionRequest { /** * accessToken is the placeholder for the user Identity/access token to be used to perform validation prior to invoking the extension action @@ -94,6 +96,11 @@ public void writeTo(StreamOutput out) throws IOException { this.jobDocVersion.writeTo(out); } + @Override + public ActionRequestValidationException validate() { + return null; + } + public String getAccessToken() { return this.accessToken; } diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java index 29eef927..99bd5be9 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java @@ -10,16 +10,17 @@ import java.io.IOException; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.jobscheduler.spi.JobExecutionContext; /** * Request to extensions to invoke their ScheduledJobRunner implementation * */ -public class JobRunnerRequest implements Writeable { +public class JobRunnerRequest extends ActionRequest { /** * accessToken is the placeholder for the user Identity/access token to be used to perform validation prior to invoking the extension action @@ -78,6 +79,11 @@ public void writeTo(StreamOutput out) throws IOException { this.jobExecutionContext.writeTo(out); } + @Override + public ActionRequestValidationException validate() { + return null; + } + public String getAccessToken() { return this.accessToken; } @@ -89,4 +95,5 @@ public String getJobParameterDocumentId() { public JobExecutionContext getJobExecutionContext() { return this.jobExecutionContext; } + } diff --git a/src/main/java/org/opensearch/jobscheduler/transport/response/JobParameterResponse.java b/src/main/java/org/opensearch/jobscheduler/transport/response/JobParameterResponse.java index 18290c13..38a94439 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/response/JobParameterResponse.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/response/JobParameterResponse.java @@ -10,15 +10,15 @@ import java.io.IOException; +import org.opensearch.action.ActionResponse; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.jobscheduler.model.ExtensionJobParameter; /** * Response from extensions to parse a ScheduledJobParameter */ -public class JobParameterResponse implements Writeable { +public class JobParameterResponse extends ActionResponse { /** * jobParameter is job index entry intended to be used to validate prior to job execution diff --git a/src/main/java/org/opensearch/jobscheduler/transport/response/JobRunnerResponse.java b/src/main/java/org/opensearch/jobscheduler/transport/response/JobRunnerResponse.java index a5a267f3..7af48973 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/response/JobRunnerResponse.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/response/JobRunnerResponse.java @@ -10,15 +10,15 @@ import java.io.IOException; +import org.opensearch.action.ActionResponse; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; /** * Response from extensions indicating the status of the ScheduledJobRunner invocation * */ -public class JobRunnerResponse implements Writeable { +public class JobRunnerResponse extends ActionResponse { /** * jobRunnerStatus indicates if the extension job runner has been executed From f78f445985035168daa2687e172c4cce62deb2c8 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 28 Mar 2023 01:43:36 +0000 Subject: [PATCH 2/3] Removes ExtensionJobActionResponse and fixes affected test classes Signed-off-by: Joshua Palis --- .../response/ExtensionJobActionResponse.java | 52 ------------------- .../utils/JobDetailsServiceIT.java | 28 ++++++++-- 2 files changed, 23 insertions(+), 57 deletions(-) delete mode 100644 src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java diff --git a/src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java b/src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java deleted file mode 100644 index c7f89046..00000000 --- a/src/main/java/org/opensearch/jobscheduler/transport/response/ExtensionJobActionResponse.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.jobscheduler.transport.response; - -import java.io.IOException; - -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.common.io.stream.Writeable; -import org.opensearch.extensions.action.ExtensionActionResponse; - -/** - * Response from extension job action, converts response params to a byte array - * - */ -public class ExtensionJobActionResponse extends ExtensionActionResponse { - - /** - * Instantiates a new ExtensionJobActionResponse - * - * @param actionResponse the response object holding the action response parameters - * @throws IOException if serialization fails - */ - public ExtensionJobActionResponse(T actionResponse) throws IOException { - super(convertParamsToBytes(actionResponse)); - } - - /** - * Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array - * - * @param a class that extends writeable - * @param actionParams the action parameters to be serialized - * @throws IOException if serialization fails - * @return the byte array of the parameters - */ - private static byte[] convertParamsToBytes(T actionParams) throws IOException { - // Write all to output stream - BytesStreamOutput out = new BytesStreamOutput(); - actionParams.writeTo(out); - out.flush(); - - // convert bytes stream to byte array - return BytesReference.toBytes(out.bytes()); - } - -} diff --git a/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java index 2b78858b..d59fccb0 100644 --- a/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java +++ b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java @@ -27,6 +27,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; @@ -40,7 +41,6 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.jobscheduler.transport.request.ExtensionJobActionRequest; -import org.opensearch.jobscheduler.transport.response.ExtensionJobActionResponse; import org.opensearch.jobscheduler.transport.request.JobParameterRequest; import org.opensearch.jobscheduler.transport.response.JobParameterResponse; import org.opensearch.jobscheduler.transport.request.JobRunnerRequest; @@ -122,6 +122,24 @@ private byte[] trimRequestBytes(byte[] requestBytes) { return Arrays.copyOfRange(requestBytes, pos + 1, requestBytes.length); } + /** + * Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array + * + * @param a class that extends writeable + * @param actionParams the action parameters to be serialized + * @throws IOException if serialization fails + * @return the byte array of the parameters + */ + private static byte[] convertParamsToBytes(T actionParams) throws IOException { + // Write all to output stream + BytesStreamOutput out = new BytesStreamOutput(); + actionParams.writeTo(out); + out.flush(); + + // convert bytes stream to byte array + return BytesReference.toBytes(out.bytes()); + } + public void testGetJobDetailsSanity() throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture inProgressFuture = new CompletableFuture<>(); JobDetailsService jobDetailsService = new JobDetailsService( @@ -388,11 +406,11 @@ public void testJobParameterExtensionJobActionRequest() throws IOException { } } - public void testJobRunnerExtensionJobActionResponse() throws IOException { + public void testJobRunnerExtensionActionResponse() throws IOException { // Create JobRunnerResponse JobRunnerResponse jobRunnerResponse = new JobRunnerResponse(true); - ExtensionActionResponse actionResponse = new ExtensionJobActionResponse(jobRunnerResponse); + ExtensionActionResponse actionResponse = new ExtensionActionResponse(convertParamsToBytes(jobRunnerResponse)); // Test ExtensionActionResponse deserialization try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -410,11 +428,11 @@ public void testJobRunnerExtensionJobActionResponse() throws IOException { } - public void testJobParameterExtensionJobActionResponse() throws IOException { + public void testJobParameterExtensionActionResponse() throws IOException { // Create JobParameterResponse JobParameterResponse jobParameterResponse = new JobParameterResponse(this.extensionJobParameter); - ExtensionActionResponse actionResponse = new ExtensionJobActionResponse(jobParameterResponse); + ExtensionActionResponse actionResponse = new ExtensionActionResponse(convertParamsToBytes(jobParameterResponse)); // Test ExtensionActionReseponse deserialization try (BytesStreamOutput out = new BytesStreamOutput()) { From 3835e1cd26fd497d1d9d062f1d4076da9f9a8249 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 28 Mar 2023 01:44:45 +0000 Subject: [PATCH 3/3] Fixing javadocs Signed-off-by: Joshua Palis --- .../transport/request/ExtensionJobActionRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java index dbabedb3..c4ca24d7 100644 --- a/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java @@ -37,7 +37,7 @@ public ExtensionJobActionRequest(String extensionActionName, T actionParams) thr } /** - * Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified class name bytes + * Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified request class name bytes * * @param a class that extends writeable * @param actionParams the action parameters to be serialized