Skip to content

Commit

Permalink
[Remove] type support from Bulk API (#2215)
Browse files Browse the repository at this point in the history
Removes all support for type parameter from the Bulk API.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize authored Feb 23, 2022
1 parent 44441d8 commit 1b571ec
Show file tree
Hide file tree
Showing 65 changed files with 228 additions and 1,002 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultType,
defaultRouting,
null,
defaultPipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.search.SearchHit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -69,9 +66,7 @@
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.fieldFromSource;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasId;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasIndex;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasProperty;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
Expand All @@ -96,17 +91,6 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
);
}

private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(
request,
expectWarningsOnce(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
bulkListener
),
listener
);
}

public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
Expand Down Expand Up @@ -210,7 +194,6 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
// with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
// we do want to check that we don't get duplicate ids back
Expand Down Expand Up @@ -317,7 +300,6 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
Set<String> readOnlyIds = new HashSet<>();
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
if (bulkItemResponse.getIndex().equals("test")) {
assertThat(bulkItemResponse.isFailed(), equalTo(false));
// with concurrent requests > 1 we can't rely on the order of the bulk requests
Expand Down Expand Up @@ -346,7 +328,6 @@ public void testGlobalParametersAndSingleRequest() throws Exception {
// tag::bulk-processor-mix-parameters
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setGlobalIndex("tweets")
.setGlobalType("_doc")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
Expand All @@ -373,87 +354,13 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
createIndexWithMultipleShards("test");

createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
final String customType = "testType";
final String ignoredType = "ignoredType";

int numDocs = randomIntBetween(10, 10);
{
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
// Check that untyped document additions inherit the global type
String globalType = customType;
String localType = null;
try (
BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
// let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1))
.setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()
) {

indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, globalType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(globalType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}

}
{
// Check that typed document additions don't inherit the global type
String globalType = ignoredType;
String localType = customType;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (
BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
// let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1))
.setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()
) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, localType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
{
// Check that untyped document additions and untyped global inherit the established custom type
// (the custom document type introduced to the mapping by the earlier code in this test)
String globalType = null;
String localType = null;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (
BulkProcessor processor = initBulkProcessorBuilder(listener)
// let's make sure that the bulk action limit trips, one single execution will index all the documents
Expand All @@ -462,23 +369,22 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()
) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");

indexDocs(processor, numDocs, null, localType, "test", "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);
assertResponseItems(listener.bulkItems, numDocs);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
Expand All @@ -495,7 +401,6 @@ private MultiGetRequest indexDocs(
String localIndex,
String localType,
String globalIndex,
String globalType,
String globalPipeline
) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
Expand All @@ -510,7 +415,7 @@ private MultiGetRequest indexDocs(
);
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON);
processor.add(data, globalIndex, globalPipeline, XContentType.JSON);
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
Expand Down Expand Up @@ -538,19 +443,14 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null, null);
return indexDocs(processor, numDocs, "test", null, null, null);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
assertResponseItems(bulkItemResponses, numDocs, MapperService.SINGLE_MAPPING_NAME);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs, String expectedType) {
assertThat(bulkItemResponses.size(), is(numDocs));
int i = 1;
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo(expectedType));
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
assertThat(
"item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.search.SearchHit;

import java.io.IOException;
Expand All @@ -46,7 +45,6 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasId;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasIndex;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasProperty;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
Expand Down Expand Up @@ -117,7 +115,7 @@ public void testMixPipelineOnRequestAndGlobal() throws IOException {
}

public void testGlobalIndex() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest().id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2").source(XContentType.JSON, "field", "bulk2"));

Expand All @@ -129,7 +127,7 @@ public void testGlobalIndex() throws IOException {

@SuppressWarnings("unchecked")
public void testIndexGlobalAndPerRequest() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest("local_index").id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(
new IndexRequest().id("2") // will take global index
Expand All @@ -142,31 +140,6 @@ public void testIndexGlobalAndPerRequest() throws IOException {
assertThat(hits, containsInAnyOrder(both(hasId("1")).and(hasIndex("local_index")), both(hasId("2")).and(hasIndex("global_index"))));
}

public void testGlobalType() throws IOException {
BulkRequest request = new BulkRequest(null, "global_type");
request.add(new IndexRequest("index").id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest("index").id("2").source(XContentType.JSON, "field", "bulk2"));

bulkWithTypes(request);

Iterable<SearchHit> hits = searchAll("index");
assertThat(hits, everyItem(hasType("global_type")));
}

public void testTypeGlobalAndPerRequest() throws IOException {
BulkRequest request = new BulkRequest(null, "global_type");
request.add(new IndexRequest("index1", "local_type", "1").source(XContentType.JSON, "field", "bulk1"));
request.add(
new IndexRequest("index2").id("2") // will take global type
.source(XContentType.JSON, "field", "bulk2")
);

bulkWithTypes(request);

Iterable<SearchHit> hits = searchAll("index1", "index2");
assertThat(hits, containsInAnyOrder(both(hasId("1")).and(hasType("local_type")), both(hasId("2")).and(hasType("global_type"))));
}

public void testGlobalRouting() throws IOException {
createIndexWithMultipleShards("index");
BulkRequest request = new BulkRequest((String) null);
Expand Down Expand Up @@ -194,28 +167,6 @@ public void testMixLocalAndGlobalRouting() throws IOException {
assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
}

public void testGlobalIndexNoTypes() throws IOException {
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest().id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2").source(XContentType.JSON, "field", "bulk2"));

bulk(request);

Iterable<SearchHit> hits = searchAll("global_index");
assertThat(hits, everyItem(hasIndex("global_index")));
}

private BulkResponse bulkWithTypes(BulkRequest request) throws IOException {
BulkResponse bulkResponse = execute(
request,
highLevelClient()::bulk,
highLevelClient()::bulkAsync,
expectWarningsOnce(RestBulkAction.TYPES_DEPRECATION_MESSAGE)
);
assertFalse(bulkResponse.hasFailures());
return bulkResponse;
}

private BulkResponse bulk(BulkRequest request) throws IOException {
BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync, RequestOptions.DEFAULT);
assertFalse(bulkResponse.hasFailures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.opensearch.index.VersionType;
import org.opensearch.index.get.GetResult;
import org.opensearch.rest.RestStatus;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
Expand Down Expand Up @@ -441,10 +440,9 @@ public void testMultiGet() throws IOException {
public void testMultiGetWithIds() throws IOException {
BulkRequest bulk = new BulkRequest();
bulk.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest("index", "type", "id1").source("{\"field\":\"value1\"}", XContentType.JSON));
bulk.add(new IndexRequest("index", "type", "id2").source("{\"field\":\"value2\"}", XContentType.JSON));
bulk.add(new IndexRequest("index", "id1").source("{\"field\":\"value1\"}", XContentType.JSON));
bulk.add(new IndexRequest("index", "id2").source("{\"field\":\"value2\"}", XContentType.JSON));

highLevelClient().bulk(bulk, expectWarningsOnce(RestBulkAction.TYPES_DEPRECATION_MESSAGE));
MultiGetRequest multiGetRequest = new MultiGetRequest();
multiGetRequest.add("index", "id1");
multiGetRequest.add("index", "id2");
Expand Down Expand Up @@ -1016,7 +1014,6 @@ private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse b

assertEquals(i, bulkItemResponse.getItemId());
assertEquals("index", bulkItemResponse.getIndex());
assertEquals("_doc", bulkItemResponse.getType());
assertEquals(String.valueOf(i), bulkItemResponse.getId());

DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ setup:
bulk:
refresh: true
body:
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
- '{"index": {"_index": "test-0"}}'
- '{"ip": "10.0.0.1", "integer": 38, "float": 12.5713, "name": "Ruth", "bool": true}'
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
- '{"index": {"_index": "test-0"}}'
- '{"ip": "10.0.0.2", "integer": 42, "float": 15.3393, "name": "Jackie", "surname": "Bowling", "bool": false}'
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
- '{"index": {"_index": "test-1"}}'
- '{"ip": "10.0.0.3", "integer": 29, "float": 19.0517, "name": "Stephanie", "bool": true}'
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
- '{"index": {"_index": "test-1"}}'
- '{"ip": "10.0.0.4", "integer": 19, "float": 19.3717, "surname": "Hamilton", "bool": true}'
- '{"index": {"_index": "test-2", "_type": "_doc"}}'
- '{"index": {"_index": "test-2"}}'
- '{"ip": "10.0.0.5", "integer": 0, "float": 17.3349, "name": "Natalie", "bool": false}'

---
Expand Down
Loading

0 comments on commit 1b571ec

Please sign in to comment.