Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offline batch ingestion API actions and data ingesters #2844

Merged

Conversation

Zhangxunmt
Copy link
Collaborator

@Zhangxunmt Zhangxunmt commented Aug 22, 2024

Description

Add a new API to offline ingest data in batch mode from different sources (starting with sageMaker). This is to collaborate with the offline batch inference released in 2.16.

Example of batch ingestion request from SageMaker:

  1. Create the knn index
PUT /my-nlp-index
{
  "settings": {
    "index.knn": true
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "text"
      },
      "chapter_embedding": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "engine": "nmslib",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "chapter": {
        "type": "text"
      },
      "title_embedding": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "engine": "nmslib",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "title": {
        "type": "text"
      }
    }
  }
}

  1. run the new batch ingestion API
POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index",
  "field_map": {
    "input": "$.content",   // input is a reserved name to indicate llm input 
    "output": "$.SageMakerOutput", // output is a reserved name to indicate llm output
    "input_names": ["chapter", "title"], 
    "output_names": ["chapter_embedding", "title_embedding"],
    "ingest_fields": ["$.id"]
  },
  "credential": {
    "region": "us-east-1",
    "access_key": "xxxx",
    "secret_key": "xxxx",
    "session_token": "xxxx"
  },
  "data_source": {
    "type": "s3",
    "source": ["s3://offlinebatch/output/sagemaker_djl_batch_input.json.out"]
  }
}

Example of batch ingestion request from OpenAI:

  1. Create the knn index
PUT /my-nlp-index-openai
{
  "settings": {
    "index.knn": true
  },
  "mappings": {
    "properties": {
      "custom_id": {
        "type": "text"
      },
      "question_embedding": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "engine": "nmslib",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "answer_embedding": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "engine": "nmslib",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "question": {
        "type": "text"
      },
      "answer": {
        "type": "text"
      }
    }
  }
}
  1. run the new batch ingestion API using OpenAI file
POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index-openai",
  "field_map": {
    "output": "source[1].$.response.body.data[*].embedding", 
    "output_names": ["question_embedding", "answer_embedding"],
    "input": "source[2].$.body.input",
    "input_names": ["question", "answer"],
    "id_field": ["source[1].$.custom_id", "source[2].$.custom_id"],
    "ingest_fields": ["source[1].$.custom_id"]
  },
  "credential": {
    "openAI_key": "<your key>"
  },
  "data_source": {
    "type": "openAI",
    "source": ["file-wbu2zvKKAaqpSzRNWiN3Ia2y", "file-5gXEtbKjHnYrKrdtv69IeRN2"]. // [output file id, input file id]
  }
}

Related Issues

#2840

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@Zhangxunmt Zhangxunmt force-pushed the feature/offline-batch branch from 1907a48 to 1163387 Compare August 27, 2024 22:43
@Zhangxunmt Zhangxunmt force-pushed the feature/offline-batch branch from 1163387 to a6b7b4a Compare August 27, 2024 22:48
@Zhangxunmt Zhangxunmt force-pushed the feature/offline-batch branch from e6d4047 to b3337d8 Compare August 28, 2024 23:02
jngz-es
jngz-es previously approved these changes Sep 4, 2024
Signed-off-by: Xun Zhang <xunzh@amazon.com>
exception = addValidationError("The input for ML batch ingestion cannot be null.", exception);
}
if (mlBatchIngestionInput != null && mlBatchIngestionInput.getCredential() == null) {
exception = addValidationError("The credential for ML batch ingestion cannot be null", exception);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Credentials for ML batch ingestion are missing. Please provide the necessary credentials to continue with the ingestion process.

I had this comment which is resolved but haven't applied. Same goes for other validation Errors.

@ylwu-amzn
Copy link
Collaborator

ylwu-amzn commented Sep 4, 2024

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index-openai",
  "field_map": {
    "output": "source[1].$.response.body.data[*].embedding",  
    "output_names": ["question_embedding", "answer_embedding"],
    "input": "source[2].$.body.input",
    "input_names": ["question", "answer"],
    "id_field": ["source[1].$.custom_id", "source[2].$.custom_id"],
    "ingest_fields": ["source[1].$.custom_id"]
  },
  "credential": {
    "openAI_key": "<your key>"
  },
  "data_source": {
    "type": "openAI",
    "source": ["file-wbu2zvKKAaqpSzRNWiN3Ia2y", "file-5gXEtbKjHnYrKrdtv69IeRN2"]. // [output file id, input file id]
  }
}

field_map is not intuitive. Suggest change to similar way of https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/ , map field from file to a field name in index. For example

{
 "field_map": {
    "product_name": "source[0,2].$.my_product_name" // map my_product_name in first and third source file to product_name in index
    "product_name_embedding": "source[1,3].$.embeddings[0]"
  },
  "ingest_fields": ["source[0,2].$.product_description"], // This will map the filed name of source file to same field name in index
  "data_source": {
    "type": "openAI",
    "source": ["products1", "products1_embedding", "products2", "products2_embedding"]. // [output file id, input file id]
  }
}

@Zhangxunmt
Copy link
Collaborator Author

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index-openai",
  "field_map": {
    "output": "source[1].$.response.body.data[*].embedding",  
    "output_names": ["question_embedding", "answer_embedding"],
    "input": "source[2].$.body.input",
    "input_names": ["question", "answer"],
    "id_field": ["source[1].$.custom_id", "source[2].$.custom_id"],
    "ingest_fields": ["source[1].$.custom_id"]
  },
  "credential": {
    "openAI_key": "<your key>"
  },
  "data_source": {
    "type": "openAI",
    "source": ["file-wbu2zvKKAaqpSzRNWiN3Ia2y", "file-5gXEtbKjHnYrKrdtv69IeRN2"]. // [output file id, input file id]
  }
}

field_map is not intuitive. Suggest change to similar way of https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/ , map field from file to a field name in index. For example

{
 "field_map": {
    "product_name": "source[0,2].$.my_product_name" // map my_product_name in first and third source file to product_name in index
    "product_name_embedding": "source[1,3].$.embeddings[0]"
  },
  "ingest_fields": ["source[0,2].$.product_description"], // This will map the filed name of source file to same field name in index
  "data_source": {
    "type": "openAI",
    "source": ["products1", "products1_embedding", "products2", "products2_embedding"]. // [output file id, input file id]
  }
}

This is a CX interphase suggestion. I will explore the suggested way in field_map in a separate PR since this one is already long.

@Zhangxunmt Zhangxunmt merged commit 33a7c96 into opensearch-project:main Sep 4, 2024
6 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Sep 4, 2024
* batch ingest API rest and transport actions

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* add openAI ingester

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* update batch ingestion field mapping interphase and address comments

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* support multiple data sources as ingestion inputs

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* use dedicated thread pool for ingestion

Signed-off-by: Xun Zhang <xunzh@amazon.com>

---------

Signed-off-by: Xun Zhang <xunzh@amazon.com>
(cherry picked from commit 33a7c96)
opensearch-trigger-bot bot pushed a commit that referenced this pull request Sep 4, 2024
* batch ingest API rest and transport actions

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* add openAI ingester

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* update batch ingestion field mapping interphase and address comments

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* support multiple data sources as ingestion inputs

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* use dedicated thread pool for ingestion

Signed-off-by: Xun Zhang <xunzh@amazon.com>

---------

Signed-off-by: Xun Zhang <xunzh@amazon.com>
(cherry picked from commit 33a7c96)
Zhangxunmt added a commit that referenced this pull request Sep 4, 2024
* batch ingest API rest and transport actions

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* add openAI ingester

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* update batch ingestion field mapping interphase and address comments

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* support multiple data sources as ingestion inputs

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* use dedicated thread pool for ingestion

Signed-off-by: Xun Zhang <xunzh@amazon.com>

---------

Signed-off-by: Xun Zhang <xunzh@amazon.com>
(cherry picked from commit 33a7c96)

Co-authored-by: Xun Zhang <xunzh@amazon.com>
Zhangxunmt added a commit that referenced this pull request Sep 4, 2024
* batch ingest API rest and transport actions

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* add openAI ingester

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* update batch ingestion field mapping interphase and address comments

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* support multiple data sources as ingestion inputs

Signed-off-by: Xun Zhang <xunzh@amazon.com>

* use dedicated thread pool for ingestion

Signed-off-by: Xun Zhang <xunzh@amazon.com>

---------

Signed-off-by: Xun Zhang <xunzh@amazon.com>
(cherry picked from commit 33a7c96)

Co-authored-by: Xun Zhang <xunzh@amazon.com>
@Zhangxunmt
Copy link
Collaborator Author

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index-openai",
  "field_map": {
    "output": "source[1].$.response.body.data[*].embedding",  
    "output_names": ["question_embedding", "answer_embedding"],
    "input": "source[2].$.body.input",
    "input_names": ["question", "answer"],
    "id_field": ["source[1].$.custom_id", "source[2].$.custom_id"],
    "ingest_fields": ["source[1].$.custom_id"]
  },
  "credential": {
    "openAI_key": "<your key>"
  },
  "data_source": {
    "type": "openAI",
    "source": ["file-wbu2zvKKAaqpSzRNWiN3Ia2y", "file-5gXEtbKjHnYrKrdtv69IeRN2"]. // [output file id, input file id]
  }
}

field_map is not intuitive. Suggest change to similar way of https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/ , map field from file to a field name in index. For example

{
 "field_map": {
    "product_name": "source[0,2].$.my_product_name" // map my_product_name in first and third source file to product_name in index
    "product_name_embedding": "source[1,3].$.embeddings[0]"
  },
  "ingest_fields": ["source[0,2].$.product_description"], // This will map the filed name of source file to same field name in index
  "data_source": {
    "type": "openAI",
    "source": ["products1", "products1_embedding", "products2", "products2_embedding"]. // [output file id, input file id]
  }
}

After some thoughts, I think the index mapping can be updated to reflect the actual field mappings for the target index like what you suggested. However, I think we should not over complicate the problem to consider more than 1 embedding source file because that would cause ingestion confusion. For example, in the case of

"product_name": "source[0,2].$.my_product_name"
"product_name_embedding": "source[1,3].$.embeddings[0]"

In the case that people put 45% product_name data in file 0, and 55% data in file 2, and 55% product_name_embedding data in file 1 and 45% data in file 3, since the files has to be scanned one by one, we wouldn't know how to match them because essentially some of the data in file 1 needs to be bulk indexed but others to be bulk updated.

I think we should keep the concept simple and easy to understand here. In a single request, we accept multiple files, but each of the file contains all the data for a certain fields. Basically we only support vertically sharding but not horizontally sharding. With that said, in the field mapping, each of the field would only come from 1 single file to avoid confusions and disorder.
"product_name": "source[0].$.my_product_name"
"product_name_embedding": "source[1].$.embeddings[0]"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants