-
Notifications
You must be signed in to change notification settings - Fork 140
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
offline batch ingestion API actions and data ingesters (#2844)
* batch ingest API rest and transport actions Signed-off-by: Xun Zhang <xunzh@amazon.com> * add openAI ingester Signed-off-by: Xun Zhang <xunzh@amazon.com> * update batch ingestion field mapping interphase and address comments Signed-off-by: Xun Zhang <xunzh@amazon.com> * support multiple data sources as ingestion inputs Signed-off-by: Xun Zhang <xunzh@amazon.com> * use dedicated thread pool for ingestion Signed-off-by: Xun Zhang <xunzh@amazon.com> --------- Signed-off-by: Xun Zhang <xunzh@amazon.com> (cherry picked from commit 33a7c96)
- Loading branch information
1 parent
c6a3cea
commit ce403ec
Showing
26 changed files
with
2,314 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,5 +15,6 @@ public enum MLTaskType { | |
@Deprecated | ||
LOAD_MODEL, | ||
REGISTER_MODEL, | ||
DEPLOY_MODEL | ||
DEPLOY_MODEL, | ||
BATCH_INGEST | ||
} |
18 changes: 18 additions & 0 deletions
18
common/src/main/java/org/opensearch/ml/common/transport/batch/MLBatchIngestionAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.batch; | ||
|
||
import org.opensearch.action.ActionType; | ||
|
||
public class MLBatchIngestionAction extends ActionType<MLBatchIngestionResponse> { | ||
public static MLBatchIngestionAction INSTANCE = new MLBatchIngestionAction(); | ||
public static final String NAME = "cluster:admin/opensearch/ml/batch_ingestion"; | ||
|
||
private MLBatchIngestionAction() { | ||
super(NAME, MLBatchIngestionResponse::new); | ||
} | ||
|
||
} |
152 changes: 152 additions & 0 deletions
152
common/src/main/java/org/opensearch/ml/common/transport/batch/MLBatchIngestionInput.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.batch; | ||
|
||
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; | ||
|
||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.common.io.stream.Writeable; | ||
import org.opensearch.core.xcontent.ToXContentObject; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
|
||
import lombok.Builder; | ||
import lombok.Getter; | ||
|
||
/** | ||
* ML batch ingestion data: index, field mapping and input and out files. | ||
*/ | ||
public class MLBatchIngestionInput implements ToXContentObject, Writeable { | ||
|
||
public static final String INDEX_NAME_FIELD = "index_name"; | ||
public static final String FIELD_MAP_FIELD = "field_map"; | ||
public static final String DATA_SOURCE_FIELD = "data_source"; | ||
public static final String CONNECTOR_CREDENTIAL_FIELD = "credential"; | ||
@Getter | ||
private String indexName; | ||
@Getter | ||
private Map<String, Object> fieldMapping; | ||
@Getter | ||
private Map<String, Object> dataSources; | ||
@Getter | ||
private Map<String, String> credential; | ||
|
||
@Builder(toBuilder = true) | ||
public MLBatchIngestionInput( | ||
String indexName, | ||
Map<String, Object> fieldMapping, | ||
Map<String, Object> dataSources, | ||
Map<String, String> credential | ||
) { | ||
if (indexName == null) { | ||
throw new IllegalArgumentException( | ||
"The index name for data ingestion is missing. Please provide a valid index name to proceed." | ||
); | ||
} | ||
if (dataSources == null) { | ||
throw new IllegalArgumentException( | ||
"No data sources were provided for ingestion. Please specify at least one valid data source to proceed." | ||
); | ||
} | ||
this.indexName = indexName; | ||
this.fieldMapping = fieldMapping; | ||
this.dataSources = dataSources; | ||
this.credential = credential; | ||
} | ||
|
||
public static MLBatchIngestionInput parse(XContentParser parser) throws IOException { | ||
String indexName = null; | ||
Map<String, Object> fieldMapping = null; | ||
Map<String, Object> dataSources = null; | ||
Map<String, String> credential = new HashMap<>(); | ||
|
||
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); | ||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) { | ||
String fieldName = parser.currentName(); | ||
parser.nextToken(); | ||
|
||
switch (fieldName) { | ||
case INDEX_NAME_FIELD: | ||
indexName = parser.text(); | ||
break; | ||
case FIELD_MAP_FIELD: | ||
fieldMapping = parser.map(); | ||
break; | ||
case CONNECTOR_CREDENTIAL_FIELD: | ||
credential = parser.mapStrings(); | ||
break; | ||
case DATA_SOURCE_FIELD: | ||
dataSources = parser.map(); | ||
break; | ||
default: | ||
parser.skipChildren(); | ||
break; | ||
} | ||
} | ||
return new MLBatchIngestionInput(indexName, fieldMapping, dataSources, credential); | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder.startObject(); | ||
if (indexName != null) { | ||
builder.field(INDEX_NAME_FIELD, indexName); | ||
} | ||
if (fieldMapping != null) { | ||
builder.field(FIELD_MAP_FIELD, fieldMapping); | ||
} | ||
if (credential != null) { | ||
builder.field(CONNECTOR_CREDENTIAL_FIELD, credential); | ||
} | ||
if (dataSources != null) { | ||
builder.field(DATA_SOURCE_FIELD, dataSources); | ||
} | ||
builder.endObject(); | ||
return builder; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput output) throws IOException { | ||
output.writeOptionalString(indexName); | ||
if (fieldMapping != null) { | ||
output.writeBoolean(true); | ||
output.writeMap(fieldMapping, StreamOutput::writeString, StreamOutput::writeGenericValue); | ||
} else { | ||
output.writeBoolean(false); | ||
} | ||
if (credential != null) { | ||
output.writeBoolean(true); | ||
output.writeMap(credential, StreamOutput::writeString, StreamOutput::writeString); | ||
} else { | ||
output.writeBoolean(false); | ||
} | ||
if (dataSources != null) { | ||
output.writeBoolean(true); | ||
output.writeMap(dataSources, StreamOutput::writeString, StreamOutput::writeGenericValue); | ||
} else { | ||
output.writeBoolean(false); | ||
} | ||
} | ||
|
||
public MLBatchIngestionInput(StreamInput input) throws IOException { | ||
indexName = input.readOptionalString(); | ||
if (input.readBoolean()) { | ||
fieldMapping = input.readMap(s -> s.readString(), s -> s.readGenericValue()); | ||
} | ||
if (input.readBoolean()) { | ||
credential = input.readMap(s -> s.readString(), s -> s.readString()); | ||
} | ||
if (input.readBoolean()) { | ||
dataSources = input.readMap(s -> s.readString(), s -> s.readGenericValue()); | ||
} | ||
} | ||
|
||
} |
82 changes: 82 additions & 0 deletions
82
common/src/main/java/org/opensearch/ml/common/transport/batch/MLBatchIngestionRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.batch; | ||
|
||
import static org.opensearch.action.ValidateActions.addValidationError; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
|
||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.action.ActionRequestValidationException; | ||
import org.opensearch.core.common.io.stream.InputStreamStreamInput; | ||
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
|
||
import lombok.AccessLevel; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.ToString; | ||
import lombok.experimental.FieldDefaults; | ||
|
||
@Getter | ||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) | ||
@ToString | ||
public class MLBatchIngestionRequest extends ActionRequest { | ||
|
||
private MLBatchIngestionInput mlBatchIngestionInput; | ||
|
||
@Builder | ||
public MLBatchIngestionRequest(MLBatchIngestionInput mlBatchIngestionInput) { | ||
this.mlBatchIngestionInput = mlBatchIngestionInput; | ||
} | ||
|
||
public MLBatchIngestionRequest(StreamInput in) throws IOException { | ||
super(in); | ||
this.mlBatchIngestionInput = new MLBatchIngestionInput(in); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
this.mlBatchIngestionInput.writeTo(out); | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException exception = null; | ||
if (mlBatchIngestionInput == null) { | ||
exception = addValidationError("The input for ML batch ingestion cannot be null.", exception); | ||
} | ||
if (mlBatchIngestionInput != null && mlBatchIngestionInput.getCredential() == null) { | ||
exception = addValidationError("The credential for ML batch ingestion cannot be null", exception); | ||
} | ||
if (mlBatchIngestionInput != null && mlBatchIngestionInput.getDataSources() == null) { | ||
exception = addValidationError("The data sources for ML batch ingestion cannot be null", exception); | ||
} | ||
|
||
return exception; | ||
} | ||
|
||
public static MLBatchIngestionRequest fromActionRequest(ActionRequest actionRequest) { | ||
if (actionRequest instanceof MLBatchIngestionRequest) { | ||
return (MLBatchIngestionRequest) actionRequest; | ||
} | ||
|
||
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) { | ||
actionRequest.writeTo(osso); | ||
try (StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) { | ||
return new MLBatchIngestionRequest(input); | ||
} | ||
} catch (IOException e) { | ||
throw new UncheckedIOException("failed to parse ActionRequest into MLBatchIngestionRequest", e); | ||
} | ||
|
||
} | ||
} |
81 changes: 81 additions & 0 deletions
81
common/src/main/java/org/opensearch/ml/common/transport/batch/MLBatchIngestionResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.ml.common.transport.batch; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
|
||
import org.opensearch.core.action.ActionResponse; | ||
import org.opensearch.core.common.io.stream.InputStreamStreamInput; | ||
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.core.xcontent.ToXContentObject; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.ml.common.MLTaskType; | ||
|
||
import lombok.Getter; | ||
|
||
@Getter | ||
public class MLBatchIngestionResponse extends ActionResponse implements ToXContentObject { | ||
public static final String TASK_ID_FIELD = "task_id"; | ||
public static final String TASK_TYPE_FIELD = "task_type"; | ||
public static final String STATUS_FIELD = "status"; | ||
|
||
private String taskId; | ||
private MLTaskType taskType; | ||
private String status; | ||
|
||
public MLBatchIngestionResponse(StreamInput in) throws IOException { | ||
super(in); | ||
this.taskId = in.readString(); | ||
this.taskType = in.readEnum(MLTaskType.class); | ||
this.status = in.readString(); | ||
} | ||
|
||
public MLBatchIngestionResponse(String taskId, MLTaskType mlTaskType, String status) { | ||
this.taskId = taskId; | ||
this.taskType = mlTaskType; | ||
this.status = status; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeString(taskId); | ||
out.writeEnum(taskType); | ||
out.writeString(status); | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { | ||
builder.startObject(); | ||
builder.field(TASK_ID_FIELD, taskId); | ||
if (taskType != null) { | ||
builder.field(TASK_TYPE_FIELD, taskType); | ||
} | ||
builder.field(STATUS_FIELD, status); | ||
builder.endObject(); | ||
return builder; | ||
} | ||
|
||
public static MLBatchIngestionResponse fromActionResponse(ActionResponse actionResponse) { | ||
if (actionResponse instanceof MLBatchIngestionResponse) { | ||
return (MLBatchIngestionResponse) actionResponse; | ||
} | ||
|
||
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) { | ||
actionResponse.writeTo(osso); | ||
try (StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) { | ||
return new MLBatchIngestionResponse(input); | ||
} | ||
} catch (IOException e) { | ||
throw new UncheckedIOException("failed to parse ActionResponse into MLBatchIngestionResponse", e); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.