[Backport 2.x] Add BWC for batch ingestion #824

Merged 1 commit on Jul 11, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Bug Fixes
 - Fix for missing HybridQuery results when concurrent segment search is enabled ([#800](https://github.com/opensearch-project/neural-search/pull/800))
 ### Infrastructure
+- Add BWC for batch ingestion ([#769](https://github.com/opensearch-project/neural-search/pull/769))
 ### Documentation
 ### Maintenance
 ### Refactoring
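Context for reviewers: batch ingestion, introduced in 2.14 (hence the version-gated test exclusions in the build files below), lets a single _bulk request feed documents to ingest processors in batches via the batch_size query parameter, so a processor such as sparse_encoding can invoke its ML model once per batch rather than once per document. A representative request, following the endpoint shape used by the bulkAddDocuments helper later in this diff (index name and documents are illustrative):

    POST /_bulk?refresh=true&pipeline=pipeline-BatchIngestionIT&batch_size=3
    { "index": { "_index": "batch-test", "_id": "0" } }
    { "passage_text": "hello" }
    { "index": { "_index": "batch-test", "_id": "1" } }
    { "passage_text": "world" }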
6 changes: 4 additions & 2 deletions qa/restart-upgrade/build.gradle
@@ -90,10 +90,11 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
         }
     }
 
-    // Excluding the k-NN radial search tests because we introduce this feature in 2.14
+    // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14
     if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
         filter {
             excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
+            excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
         }
     }

@@ -146,10 +147,11 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) {
         }
     }
 
-    // Excluding the k-NN radial search tests because we introduce this feature in 2.14
+    // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14
     if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
         filter {
             excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
+            excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
         }
     }

New file (qa/restart-upgrade): BatchIngestionIT.java
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.bwc;

import org.opensearch.neuralsearch.util.TestUtils;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion;
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR;

public class BatchIngestionIT extends AbstractRestartUpgradeRestTestCase {
private static final String PIPELINE_NAME = "pipeline-BatchIngestionIT";
private static final String TEXT_FIELD_NAME = "passage_text";
private static final String EMBEDDING_FIELD_NAME = "passage_embedding";
private static final int batchSize = 3;

public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
String indexName = getIndexNameForTest();
if (isRunningAgainstOldCluster()) {
String modelId = uploadSparseEncodingModel();
loadModel(modelId);
createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME);
createIndexWithConfiguration(
indexName,
Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())),
PIPELINE_NAME
);
List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize);
validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
} else {
String modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR);
loadModel(modelId);
try {
List<Map<String, String>> docs = prepareDataForBulkIngestion(5, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize);
validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
} finally {
wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null);
}
}
}

}
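Neither createPipelineForSparseEncodingProcessor nor processor/SparseIndexMappings.json appears in this diff. As a rough sketch only, the ingest pipeline it creates presumably looks like the following, with the uploaded model id substituted at runtime; the field names come from the test constants above, everything else is an assumption:

    {
      "description": "Sparse encoding pipeline for BWC tests (assumed shape)",
      "processors": [
        {
          "sparse_encoding": {
            "model_id": "<uploaded sparse encoding model id>",
            "field_map": {
              "passage_text": "passage_embedding"
            }
          }
        }
      ]
    }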
Modified: TextChunkingProcessorIT.java (qa/restart-upgrade)
@@ -56,20 +56,21 @@ private void createChunkingIndex(String indexName) throws Exception {
         createIndexWithConfiguration(indexName, indexSetting, PIPELINE_NAME);
     }
 
-    private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) {
-        int docCount = getDocCount(indexName);
-        assertEquals(documentCount, docCount);
+    private Map<String, Object> getFirstDocumentInQuery(String indexName, int resultSize) {
         MatchAllQueryBuilder query = new MatchAllQueryBuilder();
-        Map<String, Object> searchResults = search(indexName, query, 10);
+        Map<String, Object> searchResults = search(indexName, query, resultSize);
         assertNotNull(searchResults);
-        Map<String, Object> document = getFirstInnerHit(searchResults);
-        assertNotNull(document);
-        Object documentSource = document.get("_source");
-        assert (documentSource instanceof Map);
-        @SuppressWarnings("unchecked")
-        Map<String, Object> documentSourceMap = (Map<String, Object>) documentSource;
-        assert (documentSourceMap).containsKey(fieldName);
-        Object ingestOutputs = documentSourceMap.get(fieldName);
-        assertEquals(expected, ingestOutputs);
+        return getFirstInnerHit(searchResults);
+    }
+
+    private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) {
+        Object outputs = validateDocCountAndInfo(
+            indexName,
+            documentCount,
+            () -> getFirstDocumentInQuery(indexName, 10),
+            fieldName,
+            List.class
+        );
+        assertEquals(expected, outputs);
     }
 }
12 changes: 8 additions & 4 deletions qa/rolling-upgrade/build.gradle
@@ -90,10 +90,11 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) {
         }
     }
 
-    // Excluding the k-NN radial search tests because we introduce this feature in 2.14
+    // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
     if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
         filter {
             excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
+            excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
         }
     }
 
@@ -147,10 +148,11 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) {
         }
     }
 
-    // Excluding the k-NN radial search tests because we introduce this feature in 2.14
+    // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
     if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
         filter {
             excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
+            excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
         }
     }
 
@@ -203,10 +205,11 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) {
         }
     }
 
-    // Excluding the k-NN radial search tests because we introduce this feature in 2.14
+    // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
     if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
         filter {
             excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
+            excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
         }
     }
 
@@ -259,10 +262,11 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) {
         }
     }
 
-    // Excluding the k-NN radial search tests because we introduce this feature in 2.14
+    // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14
     if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){
         filter {
             excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*"
+            excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*"
         }
     }
New file (qa/rolling-upgrade): BatchIngestionIT.java
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.bwc;

import org.opensearch.neuralsearch.util.TestUtils;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion;
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR;

public class BatchIngestionIT extends AbstractRollingUpgradeTestCase {
private static final String SPARSE_PIPELINE = "BatchIngestionIT_sparse_pipeline_rolling";
private static final String TEXT_FIELD_NAME = "passage_text";
private static final String EMBEDDING_FIELD_NAME = "passage_embedding";

public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
String indexName = getIndexNameForTest();
String sparseModelId = null;
switch (getClusterType()) {
case OLD:
sparseModelId = uploadSparseEncodingModel();
loadModel(sparseModelId);
createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_PIPELINE);
createIndexWithConfiguration(
indexName,
Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())),
SPARSE_PIPELINE
);
List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs, 2);
validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class);
break;
case MIXED:
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR);
loadModel(sparseModelId);
List<Map<String, String>> docsForMixed = prepareDataForBulkIngestion(5, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed, 3);
validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class);
break;
case UPGRADED:
try {
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR);
loadModel(sparseModelId);
List<Map<String, String>> docsForUpgraded = prepareDataForBulkIngestion(10, 5);
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded, 2);
validateDocCountAndInfo(indexName, 15, () -> getDocById(indexName, "14"), EMBEDDING_FIELD_NAME, Map.class);
} finally {
wipeOfTestResources(indexName, SPARSE_PIPELINE, sparseModelId, null);
}
break;
default:
throw new IllegalStateException("Unexpected value: " + getClusterType());
}
}
}
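Note the phase progression: the OLD cluster ingests ids 0 through 4 (batch_size 2), MIXED ingests ids 5 through 9 (batch_size 3), and UPGRADED ingests ids 10 through 14 (batch_size 2), asserting cumulative doc counts of 5, 10, and 15. Because the MIXED branch runs for both the one-third and two-thirds upgraded clusters (see the rolling-upgrade build.gradle tasks above), the second MIXED pass re-indexes ids 5 through 9 over themselves, so the count assertion of 10 still holds.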
Modified: BaseNeuralSearchIT.java (test fixtures)
@@ -22,6 +22,7 @@
 import java.util.Set;
 import java.util.ArrayList;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
@@ -722,6 +723,36 @@ protected void addSparseEncodingDoc(
         assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
     }
 
+    protected void bulkAddDocuments(
+        final String index,
+        final String textField,
+        final String pipeline,
+        final List<Map<String, String>> docs,
+        final int batchSize
+    ) throws IOException, ParseException {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < docs.size(); ++i) {
+            String doc = String.format(
+                Locale.ROOT,
+                "{ \"index\": { \"_index\": \"%s\", \"_id\": \"%s\" } },\n" + "{ \"%s\": \"%s\"}",
+                index,
+                docs.get(i).get("id"),
+                textField,
+                docs.get(i).get("text")
+            );
+            builder.append(doc);
+            builder.append("\n");
+        }
+        Request request = new Request(
+            "POST",
+            String.format(Locale.ROOT, "/_bulk?refresh=true&pipeline=%s&batch_size=%d", pipeline, batchSize)
+        );
+        request.setJsonEntity(builder.toString());
+
+        Response response = client().performRequest(request);
+        assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode()));
+    }
+
     /**
      * Parse the first returned hit from a search response as a map
      *
@@ -1286,6 +1317,27 @@ protected void wipeOfTestResources(
         }
     }
 
+    protected Object validateDocCountAndInfo(
+        String indexName,
+        int expectedDocCount,
+        Supplier<Map<String, Object>> documentSupplier,
+        final String field,
+        final Class<?> valueType
+    ) {
+        int count = getDocCount(indexName);
+        assertEquals(expectedDocCount, count);
+        Map<String, Object> document = documentSupplier.get();
+        assertNotNull(document);
+        Object documentSource = document.get("_source");
+        assertTrue(documentSource instanceof Map);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> documentSourceMap = (Map<String, Object>) documentSource;
+        assertTrue(documentSourceMap.containsKey(field));
+        Object outputs = documentSourceMap.get(field);
+        assertTrue(valueType.isAssignableFrom(outputs.getClass()));
+        return outputs;
+    }
+
     /**
      * Enumeration for types of pipeline processors, used to lookup resources like create
      * processor request as those are type specific
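Taken together, the two new helpers drive one batched ingestion round and verify its outcome. A minimal usage sketch (index, pipeline, and field names are hypothetical; prepareDataForBulkIngestion and getDocById come from the existing test fixtures):

    // Ingest two documents through "my-pipeline", processed in a single batch of 2.
    List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 2);
    bulkAddDocuments("my-index", "passage_text", "my-pipeline", docs, 2);

    // Assert the index now holds 2 docs and that doc "1" carries a Map-valued
    // "passage_embedding" field produced by the ingest pipeline.
    validateDocCountAndInfo("my-index", 2, () -> getDocById("my-index", "1"), "passage_embedding", Map.class);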
New file (test fixtures): BatchIngestionUtils.java
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A helper class to build docs for bulk request which is used by batch ingestion tests.
*/
public class BatchIngestionUtils {
private static final List<String> TEXTS = Arrays.asList(
"hello",
"world",
"an apple",
"find me",
"birdy",
"flying piggy",
"newspaper",
"dynamic programming",
"random text",
"finally"
);

public static List<Map<String, String>> prepareDataForBulkIngestion(int startId, int count) {
List<Map<String, String>> docs = new ArrayList<>();
for (int i = startId; i < startId + count; ++i) {
Map<String, String> params = new HashMap<>();
params.put("id", Integer.toString(i));
params.put("text", TEXTS.get(i % TEXTS.size()));
docs.add(params);
}
return docs;
}
}
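For illustration, ids map cyclically onto TEXTS, so every generated doc has text to ingest regardless of the id range:

    // A small sketch: prepareDataForBulkIngestion(9, 3) yields ids 9, 10, 11 with
    // texts "finally", "hello", "world" (id 10 wraps around to TEXTS.get(0)).
    List<Map<String, String>> docs = BatchIngestionUtils.prepareDataForBulkIngestion(9, 3);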