Skip to content

Commit

Permalink
[Extensions] Makes JobRunner/Parameter/Request/Response classes exten…
Browse files Browse the repository at this point in the history
…d from ActionRequest/Response (#352)

* Replaces ExtensionActionRequest class name with the JobParameter/RunnerRequest fully qualified class names, modifes JobParameter/Runner/Request/Response classes to extend from ActionRequest/Response

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Removes ExtensionJobActionResponse and fixes affected test classes

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Fixing javadocs

Signed-off-by: Joshua Palis <jpalis@amazon.com>

---------

Signed-off-by: Joshua Palis <jpalis@amazon.com>
  • Loading branch information
joshpalis authored Mar 28, 2023
1 parent cf6c78e commit 77b873e
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ExtensionJobActionRequest(String extensionActionName, T actionParams) thr
}

/**
* Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified class name bytes
* Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified request class name bytes
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
Expand All @@ -53,7 +53,7 @@ private static <T extends Writeable> byte[] convertParamsToBytes(T actionParams)
byte[] requestBytes = BytesReference.toBytes(out.bytes());

// Convert fully qualifed class name to byte array
byte[] requestClassBytes = ExtensionActionRequest.class.getName().getBytes(StandardCharsets.UTF_8);
byte[] requestClassBytes = actionParams.getClass().getName().getBytes(StandardCharsets.UTF_8);

// Generate ExtensionActionRequest responseByte array
byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + requestBytes.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
package org.opensearch.jobscheduler.transport.request;

import java.io.IOException;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentParser;
Expand All @@ -21,7 +23,7 @@
/**
* Request to extensions to parse a ScheduledJobParameter
*/
public class JobParameterRequest implements Writeable {
public class JobParameterRequest extends ActionRequest {

/**
* accessToken is the placeholder for the user Identity/access token to be used to perform validation prior to invoking the extension action
Expand Down Expand Up @@ -94,6 +96,11 @@ public void writeTo(StreamOutput out) throws IOException {
this.jobDocVersion.writeTo(out);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public String getAccessToken() {
return this.accessToken;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@

import java.io.IOException;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.jobscheduler.spi.JobExecutionContext;

/**
* Request to extensions to invoke their ScheduledJobRunner implementation
*
*/
public class JobRunnerRequest implements Writeable {
public class JobRunnerRequest extends ActionRequest {

/**
* accessToken is the placeholder for the user Identity/access token to be used to perform validation prior to invoking the extension action
Expand Down Expand Up @@ -78,6 +79,11 @@ public void writeTo(StreamOutput out) throws IOException {
this.jobExecutionContext.writeTo(out);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public String getAccessToken() {
return this.accessToken;
}
Expand All @@ -89,4 +95,5 @@ public String getJobParameterDocumentId() {
public JobExecutionContext getJobExecutionContext() {
return this.jobExecutionContext;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

import java.io.IOException;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.jobscheduler.model.ExtensionJobParameter;

/**
* Response from extensions to parse a ScheduledJobParameter
*/
public class JobParameterResponse implements Writeable {
public class JobParameterResponse extends ActionResponse {

/**
* jobParameter is job index entry intended to be used to validate prior to job execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

import java.io.IOException;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;

/**
* Response from extensions indicating the status of the ScheduledJobRunner invocation
*
*/
public class JobRunnerResponse implements Writeable {
public class JobRunnerResponse extends ActionResponse {

/**
* jobRunnerStatus indicates if the extension job runner has been executed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
Expand All @@ -40,7 +41,6 @@
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.jobscheduler.transport.request.ExtensionJobActionRequest;
import org.opensearch.jobscheduler.transport.response.ExtensionJobActionResponse;
import org.opensearch.jobscheduler.transport.request.JobParameterRequest;
import org.opensearch.jobscheduler.transport.response.JobParameterResponse;
import org.opensearch.jobscheduler.transport.request.JobRunnerRequest;
Expand Down Expand Up @@ -122,6 +122,24 @@ private byte[] trimRequestBytes(byte[] requestBytes) {
return Arrays.copyOfRange(requestBytes, pos + 1, requestBytes.length);
}

/**
* Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array
*
* @param <T> a class that extends writeable
* @param actionParams the action parameters to be serialized
* @throws IOException if serialization fails
* @return the byte array of the parameters
*/
private static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {
// Write all to output stream
BytesStreamOutput out = new BytesStreamOutput();
actionParams.writeTo(out);
out.flush();

// convert bytes stream to byte array
return BytesReference.toBytes(out.bytes());
}

public void testGetJobDetailsSanity() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Boolean> inProgressFuture = new CompletableFuture<>();
JobDetailsService jobDetailsService = new JobDetailsService(
Expand Down Expand Up @@ -388,11 +406,11 @@ public void testJobParameterExtensionJobActionRequest() throws IOException {
}
}

public void testJobRunnerExtensionJobActionResponse() throws IOException {
public void testJobRunnerExtensionActionResponse() throws IOException {

// Create JobRunnerResponse
JobRunnerResponse jobRunnerResponse = new JobRunnerResponse(true);
ExtensionActionResponse actionResponse = new ExtensionJobActionResponse<JobRunnerResponse>(jobRunnerResponse);
ExtensionActionResponse actionResponse = new ExtensionActionResponse(convertParamsToBytes(jobRunnerResponse));

// Test ExtensionActionResponse deserialization
try (BytesStreamOutput out = new BytesStreamOutput()) {
Expand All @@ -410,11 +428,11 @@ public void testJobRunnerExtensionJobActionResponse() throws IOException {

}

public void testJobParameterExtensionJobActionResponse() throws IOException {
public void testJobParameterExtensionActionResponse() throws IOException {

// Create JobParameterResponse
JobParameterResponse jobParameterResponse = new JobParameterResponse(this.extensionJobParameter);
ExtensionActionResponse actionResponse = new ExtensionJobActionResponse<JobParameterResponse>(jobParameterResponse);
ExtensionActionResponse actionResponse = new ExtensionActionResponse(convertParamsToBytes(jobParameterResponse));

// Test ExtensionActionReseponse deserialization
try (BytesStreamOutput out = new BytesStreamOutput()) {
Expand Down

0 comments on commit 77b873e

Please sign in to comment.