diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 8eb976d1216f..b0c784284c28 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -57,6 +57,12 @@
provided
+
+ org.apache.nifi
+ nifi-lookup-service-api
+ provided
+
+
com.fasterxml.jackson.corejackson-databind
@@ -123,6 +129,33 @@
+
+ org.apache.nifi
+ nifi-avro-record-utils
+ 1.8.0-SNAPSHOT
+ compile
+
+
+ org.apache.nifi
+ nifi-schema-registry-service-api
+ compile
+
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
+
+ org.mockito
+ mockito-all
+ test
+
+
+ org.apache.nifi
+ nifi-record-path
+ 1.8.0-SNAPSHOT
+ compile
+
@@ -140,6 +173,8 @@
94005.6.290
+ ERROR
+ ${project.basedir}/src/test/resources/setup.script
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
new file mode 100644
index 000000000000..c86477109011
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.jayway.jsonpath.JsonPath;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService {
+ public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+ .name("el-rest-client-service")
+ .displayName("Client Service")
+ .description("An ElasticSearch client service to use for running queries.")
+ .identifiesControllerService(ElasticSearchClientService.class)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+ .name("el-lookup-index")
+ .displayName("Index")
+ .description("The name of the index to read from")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+ .name("el-lookup-type")
+ .displayName("Type")
+ .description("The type of this document (used by Elasticsearch for indexing and searching)")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private ElasticSearchClientService clientService;
+
+ private String index;
+ private String type;
+ private ObjectMapper mapper;
+
+ private final List DESCRIPTORS;
+
+ public ElasticSearchLookupService() {
+ List _desc = new ArrayList<>();
+ _desc.addAll(super.getSupportedPropertyDescriptors());
+ _desc.add(CLIENT_SERVICE);
+ _desc.add(INDEX);
+ _desc.add(TYPE);
+ DESCRIPTORS = Collections.unmodifiableList(_desc);
+ }
+
+ private volatile ConcurrentHashMap mappings;
+
+ @Override
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+ index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
+ type = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
+ mapper = new ObjectMapper();
+
+ List dynamic = context.getProperties().entrySet().stream()
+ .filter( e -> e.getKey().isDynamic())
+ .map(e -> e.getKey())
+ .collect(Collectors.toList());
+
+ Map _temp = new HashMap<>();
+ for (PropertyDescriptor desc : dynamic) {
+ String value = context.getProperty(desc).getValue();
+ String name = desc.getName();
+ _temp.put(name, RecordPath.compile(value));
+ }
+
+ mappings = new ConcurrentHashMap<>(_temp);
+
+ super.onEnabled(context);
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ @Override
+ public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String name) {
+ return new PropertyDescriptor.Builder()
+ .name(name)
+ .addValidator((subject, input, context) -> {
+ ValidationResult.Builder builder = new ValidationResult.Builder();
+ try {
+ JsonPath.parse(input);
+ builder.valid(true);
+ } catch (Exception ex) {
+ builder.explanation(ex.getMessage())
+ .valid(false)
+ .subject(subject);
+ }
+
+ return builder.build();
+ })
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) throws LookupFailureException {
+ Map context = coordinates.entrySet().stream()
+ .collect(Collectors.toMap(
+ e -> e.getKey(),
+ e -> e.getValue().toString()
+ ));
+ return lookup(coordinates, context);
+ }
+
+ @Override
+ public Optional lookup(Map coordinates, Map context) throws LookupFailureException {
+ validateCoordinates(coordinates);
+
+ try {
+ Record record;
+ if (coordinates.containsKey("_id")) {
+ record = getById((String)coordinates.get("_id"), context);
+ } else {
+ record = getByQuery(coordinates, context);
+ }
+
+ return record == null ? Optional.empty() : Optional.of(record);
+ } catch (Exception ex) {
+ getLogger().error("Error during lookup.", ex);
+ throw new LookupFailureException(ex);
+ }
+ }
+
+ private void validateCoordinates(Map coordinates) throws LookupFailureException {
+ List reasons = new ArrayList<>();
+
+ if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
+ reasons.add("_id was supplied, but it was not a String.");
+ }
+
+ if (coordinates.containsKey("_id") && coordinates.size() > 1) {
+ reasons.add("When _id is used, it can be the only key used in the lookup.");
+ }
+
+ if (reasons.size() > 0) {
+ String error = String.join("\n", reasons);
+ throw new LookupFailureException(error);
+ }
+ }
+
+ private Record getById(final String _id, Map context) throws IOException, LookupFailureException, SchemaNotFoundException {
+ Map query = new HashMap(){{
+ put("query", new HashMap() {{
+ put("match", new HashMap(){{
+ put("_id", _id);
+ }});
+ }});
+ }};
+
+ String json = mapper.writeValueAsString(query);
+
+ SearchResponse response = clientService.search(json, index, type);
+
+ if (response.getNumberOfHits() > 1) {
+ throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
+ response.getNumberOfHits(), json));
+ } else if (response.getNumberOfHits() == 0) {
+ return null;
+ }
+
+ final Map source = (Map)response.getHits().get(0).get("_source");
+
+ RecordSchema toUse = getSchema(context, source, null);
+
+ Record record = new MapRecord(toUse, source);
+
+ if (mappings.size() > 0) {
+ record = applyMappings(record, source);
+ }
+
+ return record;
+ }
+
+ Map getNested(String key, Object value) {
+ String path = key.substring(0, key.lastIndexOf("."));
+
+ return new HashMap(){{
+ put("path", path);
+ put("query", new HashMap(){{
+ put("match", new HashMap(){{
+ put(key, value);
+ }});
+ }});
+ }};
+ }
+
+ private Map buildQuery(Map coordinates) {
+ Map query = new HashMap(){{
+ put("bool", new HashMap(){{
+ put("must", coordinates.entrySet().stream()
+ .map(e -> new HashMap(){{
+ if (e.getKey().contains(".")) {
+ put("nested", getNested(e.getKey(), e.getValue()));
+ } else {
+ put("match", new HashMap() {{
+ put(e.getKey(), e.getValue());
+ }});
+ }
+ }}).collect(Collectors.toList())
+ );
+ }});
+ }};
+
+ Map outter = new HashMap(){{
+ put("size", 1);
+ put("query", query);
+ }};
+
+ return outter;
+ }
+
+ private Record getByQuery(final Map query, Map context) throws LookupFailureException {
+ try {
+ final String json = mapper.writeValueAsString(buildQuery(query));
+
+ SearchResponse response = clientService.search(json, index, type);
+
+ if (response.getNumberOfHits() == 0) {
+ return null;
+ } else {
+ final Map source = (Map)response.getHits().get(0).get("_source");
+ RecordSchema toUse = getSchema(context, source, null);
+ Record record = new MapRecord(toUse, source);
+
+ if (mappings.size() > 0) {
+ record = applyMappings(record, source);
+ }
+
+ return record;
+ }
+
+ } catch (Exception e) {
+ throw new LookupFailureException(e);
+ }
+ }
+
+ private Record applyMappings(Record record, Map source) {
+ Record _rec = new MapRecord(record.getSchema(), new HashMap<>());
+
+ mappings.entrySet().forEach(entry -> {
+ try {
+ Object o = JsonPath.read(source, entry.getKey());
+ RecordPath path = entry.getValue();
+ Optional first = path.evaluate(_rec).getSelectedFields().findFirst();
+ if (first.isPresent()) {
+ first.get().updateValue(o);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+
+ return _rec;
+ }
+
+ @Override
+ public Class> getValueType() {
+ return Record.class;
+ }
+
+ @Override
+ public Set getRequiredKeys() {
+ return Collections.emptySet();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 161f6526901f..65745fb94638 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.nifi.elasticsearch.ElasticSearchLookupService
org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchLookupService/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchLookupService/additionalDetails.html
new file mode 100644
index 000000000000..3b95430e339c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchLookupService/additionalDetails.html
@@ -0,0 +1,53 @@
+
+
+
+
+
+ ElasticSearchLookupService
+
+
+
+
+
+
Description:
+
+ This lookup service uses ElasticSearch as its data source. Mappings in LookupRecord map record paths to paths within
+ an ElasticSearch document. Example:
+
+
+ /user/name => user.contact.name
+
+
+ That would map the record path /user/name to an embedded document named contact with a field named
+ name.
+
+
+ The query that is assembled from these is a boolean query where all of the criteria are under the must list.
+ In addition, wildcards are not supported right now and all criteria are translated into literal match queries.
+
+
Post-Processing
+
+ Because an ElasticSearch result might be structured differently than the record which will be enriched by this service,
+ users can specify an additional set of mappings on this lookup service that map JsonPath operations to record paths. Example:
+
+
+ $.user.contact.email => /user/email_address
+
+
+ Would copy the field email from the embedded document contact into the record at that path.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
new file mode 100644
index 000000000000..b5c446895d89
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.groovy
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration
+
+import org.apache.nifi.elasticsearch.DeleteOperationResponse
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.SearchResponse
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+import static groovy.json.JsonOutput.prettyPrint
+import static groovy.json.JsonOutput.toJson
+
+class ElasticSearch5ClientService_IT {
+
+ private TestRunner runner
+ private ElasticSearchClientServiceImpl service
+
+ static final String INDEX = "messages"
+ static final String TYPE = "message"
+
+ @Before
+ void before() throws Exception {
+ runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
+ service = new ElasticSearchClientServiceImpl()
+ runner.addControllerService("Client Service", service)
+ runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400")
+ runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000")
+ runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000")
+ runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000")
+ try {
+ runner.enableControllerService(service)
+ } catch (Exception ex) {
+ ex.printStackTrace()
+ throw ex
+ }
+ }
+
+ @After
+ void after() throws Exception {
+ service.onDisabled()
+ }
+
+ @Test
+ void testBasicSearch() throws Exception {
+ String query = prettyPrint(toJson([
+ size: 10,
+ query: [
+ match_all: [:]
+ ],
+ aggs: [
+ term_counts: [
+ terms: [
+ field: "msg",
+ size: 5
+ ]
+ ]
+ ]
+ ]))
+
+
+ SearchResponse response = service.search(query, "messages", "message")
+ Assert.assertNotNull("Response was null", response)
+
+ Assert.assertEquals("Wrong count", 15, response.numberOfHits)
+ Assert.assertFalse("Timed out", response.isTimedOut())
+ Assert.assertNotNull("Hits was null", response.getHits())
+ Assert.assertEquals("Wrong number of hits", 10, response.hits.size())
+ Assert.assertNotNull("Aggregations are missing", response.aggregations)
+ Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size())
+
+ Map termCounts = response.aggregations.get("term_counts")
+ Assert.assertNotNull("Term counts was missing", termCounts)
+ def buckets = termCounts.get("buckets")
+ Assert.assertNotNull("Buckets branch was empty", buckets)
+ def expected = [
+ "one": 1,
+ "two": 2,
+ "three": 3,
+ "four": 4,
+ "five": 5
+ ]
+
+ buckets.each { aggRes ->
+ def key = aggRes["key"]
+ def docCount = aggRes["doc_count"]
+ Assert.assertEquals("${key} did not match.", expected[key], docCount)
+ }
+ }
+
+ @Test
+ void testDeleteByQuery() throws Exception {
+ String query = prettyPrint(toJson([
+ query: [
+ match: [
+ msg: "five"
+ ]
+ ]
+ ]))
+ DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE)
+ Assert.assertNotNull(response)
+ Assert.assertTrue(response.getTook() > 0)
+ }
+
+ @Test
+ void testDeleteById() throws Exception {
+ final String ID = "1"
+ DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID)
+ Assert.assertNotNull(response)
+ Assert.assertTrue(response.getTook() > 0)
+ def doc = service.get(INDEX, TYPE, ID)
+ Assert.assertNull(doc)
+ doc = service.get(INDEX, TYPE, "2")
+ Assert.assertNotNull(doc)
+ }
+
+ @Test
+ void testGet() throws IOException {
+ Map old
+ 1.upto(15) { index ->
+ String id = String.valueOf(index)
+ def doc = service.get(INDEX, TYPE, id)
+ Assert.assertNotNull("Doc was null", doc)
+ Assert.assertNotNull("${doc.toString()}\t${doc.keySet().toString()}", doc.get("msg"))
+ old = doc
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy
new file mode 100644
index 000000000000..27452a34ac30
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration
+
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.ElasticSearchLookupService
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.record.MapRecord
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+class ElasticSearchLookupServiceTest {
+ ElasticSearchClientService mockClientService
+ ElasticSearchLookupService lookupService
+ TestRunner runner
+
+ @Before
+ void setup() throws Exception {
+ mockClientService = new TestElasticSearchClientService()
+ lookupService = new ElasticSearchLookupService()
+ def registry = new TestSchemaRegistry()
+ runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
+ runner.addControllerService("clientService", mockClientService)
+ runner.addControllerService("lookupService", lookupService)
+ runner.addControllerService("registry", registry)
+ runner.enableControllerService(mockClientService)
+ runner.enableControllerService(registry)
+ runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "clientService")
+ runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "users")
+ runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService")
+ runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService")
+ runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INFER_SCHEMA)
+ runner.enableControllerService(lookupService)
+ }
+
+ @Test
+ void simpleLookupTest() throws Exception {
+ def coordinates = ["_id": "12345" ]
+
+ Optional result = lookupService.lookup(coordinates)
+
+ Assert.assertNotNull(result)
+ Assert.assertTrue(result.isPresent())
+ MapRecord record = result.get()
+ Assert.assertEquals("john.smith", record.getAsString("username"))
+ Assert.assertEquals("testing1234", record.getAsString("password"))
+ Assert.assertEquals("john.smith@test.com", record.getAsString("email"))
+ Assert.assertEquals("Software Engineer", record.getAsString("position"))
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
new file mode 100644
index 000000000000..b2558364b4e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration
+
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.ElasticSearchLookupService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.record.path.RecordPath
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.schemaregistry.services.SchemaRegistry
+import org.apache.nifi.serialization.record.MapRecord
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.serialization.record.RecordSchema
+import org.apache.nifi.serialization.record.type.RecordDataType
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+
+class ElasticSearchLookupService_IT {
+ private TestRunner runner
+ private ElasticSearchClientService service
+ private ElasticSearchLookupService lookupService
+
+ @Before
+ void before() throws Exception {
+ runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
+ service = new ElasticSearchClientServiceImpl()
+ lookupService = new ElasticSearchLookupService()
+ runner.addControllerService("Client Service", service)
+ runner.addControllerService("Lookup Service", lookupService)
+ runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400")
+ runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000")
+ runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000")
+ runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000")
+ runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service")
+ runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service")
+ runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service")
+ runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "details")
+
+ try {
+ runner.enableControllerService(service)
+ runner.enableControllerService(lookupService)
+ } catch (Exception ex) {
+ ex.printStackTrace()
+ throw ex
+ }
+ }
+
+ @Test
+ void testValidity() throws Exception {
+ setDefaultSchema()
+ runner.assertValid()
+ }
+
+ private void setDefaultSchema() throws Exception {
+ runner.disableControllerService(lookupService)
+ SchemaRegistry registry = new TestSchemaRegistry()
+ runner.addControllerService("registry", registry)
+ runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
+ runner.enableControllerService(registry)
+ runner.enableControllerService(lookupService)
+ }
+
+ @Test
+ void lookupById() {
+ def coordinates = [ _id: "2" ]
+ Optional result = lookupService.lookup(coordinates)
+
+ Assert.assertNotNull(result)
+ Assert.assertTrue(result.isPresent())
+ def record = result.get()
+ Assert.assertEquals("jane.doe@company.com", record.getAsString("email"))
+ Assert.assertEquals("098-765-4321", record.getAsString("phone"))
+ Assert.assertEquals("GHIJK", record.getAsString("accessKey"))
+ }
+
+ @Test
+ void testInvalidIdScenarios() {
+ def coordinates = [
+ [
+ _id: 1
+ ],
+ [
+ _id: "1", "email": "john.smith@company.com"
+ ]
+ ]
+
+ coordinates.each { coordinate ->
+ def exception
+
+ try {
+ lookupService.lookup(coordinate)
+ } catch (Exception ex) {
+ exception = ex
+ }
+
+ Assert.assertNotNull(exception)
+ Assert.assertTrue(exception instanceof LookupFailureException)
+ }
+ }
+
+ @Test
+ void lookupByQuery() {
+ def coordinates = [ "phone": "098-765-4321", "email": "jane.doe@company.com" ]
+ Optional result = lookupService.lookup(coordinates)
+
+ Assert.assertNotNull(result)
+ Assert.assertTrue(result.isPresent())
+ def record = result.get()
+ Assert.assertEquals("jane.doe@company.com", record.getAsString("email"))
+ Assert.assertEquals("098-765-4321", record.getAsString("phone"))
+ Assert.assertEquals("GHIJK", record.getAsString("accessKey"))
+ }
+
+ @Test
+ void testNestedSchema() {
+ def coordinates = [
+ "subField.deeper.deepest.super_secret": "The sky is blue",
+ "subField.deeper.secretz": "Buongiorno, mondo!!",
+ "msg": "Hello, world"
+ ]
+
+ runner.disableControllerService(lookupService)
+ runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex")
+ runner.enableControllerService(lookupService)
+
+ Optional response = lookupService.lookup(coordinates)
+ Assert.assertNotNull(response)
+ Assert.assertTrue(response.isPresent())
+ def rec = response.get()
+ Assert.assertEquals("Hello, world", rec.getValue("msg"))
+ def subRec = getSubRecord(rec, "subField")
+ Assert.assertNotNull(subRec)
+ def deeper = getSubRecord(subRec, "deeper")
+ Assert.assertNotNull(deeper)
+ def deepest = getSubRecord(deeper, "deepest")
+ Assert.assertNotNull(deepest)
+ Assert.assertEquals("The sky is blue", deepest.getAsString("super_secret"))
+ }
+
+ @Test
+ void testDetectedSchema() throws LookupFailureException {
+ runner.disableControllerService(lookupService)
+ runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "complex")
+ runner.enableControllerService(lookupService)
+ def coordinates = ["_id": "1" ]
+
+ Optional response = lookupService.lookup(coordinates)
+ Assert.assertNotNull(response)
+ Record rec = response.get()
+ Record subRec = getSubRecord(rec, "subField")
+
+ def r2 = new MapRecord(rec.schema, [:])
+ def path = RecordPath.compile("/subField/longField")
+ def result = path.evaluate(r2)
+ result.selectedFields.findFirst().get().updateValue(1234567890L)
+
+ Assert.assertNotNull(rec)
+ Assert.assertNotNull(subRec)
+ Assert.assertEquals("Hello, world", rec.getValue("msg"))
+ Assert.assertNotNull(rec.getValue("subField"))
+ Assert.assertEquals(new Long(100000), subRec.getValue("longField"))
+ Assert.assertEquals("2018-04-10T12:18:05Z", subRec.getValue("dateField"))
+ }
+
+ Record getSubRecord(Record rec, String fieldName) {
+ RecordSchema schema = rec.schema
+ RecordSchema subSchema = ((RecordDataType)schema.getField(fieldName).get().dataType).childSchema
+ rec.getAsRecord(fieldName, subSchema)
+ }
+
+ @Test
+ void testMappings() {
+ runner.disableControllerService(lookupService)
+ runner.setProperty(lookupService, "\$.subField.longField", "/longField2")
+ runner.setProperty(lookupService, '$.subField.dateField', '/dateField2')
+ runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
+ runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex")
+ runner.enableControllerService(lookupService)
+
+ def coordinates = ["msg": "Hello, world"]
+ def result = lookupService.lookup(coordinates)
+ Assert.assertTrue(result.isPresent())
+ def rec = result.get()
+ ["dateField": "2018-08-14T10:08:00Z", "longField": 150000L].each { field ->
+ def value = rec.getValue(field.key)
+ Assert.assertEquals(field.value, value)
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.groovy
similarity index 51%
rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.java
rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.groovy
index 674cc147b446..75e9dbd1e06c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.groovy
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -15,35 +15,37 @@
* limitations under the License.
*/
-package org.apache.nifi.elasticsearch.integration;
+package org.apache.nifi.elasticsearch.integration
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
+import org.apache.nifi.elasticsearch.ElasticSearchLookupService
+import org.apache.nifi.processor.AbstractProcessor
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSession
+import org.apache.nifi.processor.exception.ProcessException
-import java.util.ArrayList;
-import java.util.List;
+class TestControllerServiceProcessor extends AbstractProcessor {
-public class TestControllerServiceProcessor extends AbstractProcessor {
-
- static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("Client Service")
.description("ElasticSearchClientServiceImpl")
.identifiesControllerService(ElasticSearchClientServiceImpl.class)
.required(true)
- .build();
+ .build()
+ public static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
+ .name("Lookup Service")
+ .description("ElasticSearchClientServiceImpl")
+ .identifiesControllerService(ElasticSearchLookupService.class)
+ .required(false)
+ .build()
@Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@Override
protected List getSupportedPropertyDescriptors() {
- List propDescs = new ArrayList<>();
- propDescs.add(CLIENT_SERVICE);
- return propDescs;
+ [ CLIENT_SERVICE, LOOKUP_SERVICE ]
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
new file mode 100644
index 000000000000..3b5fc0a15a8a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.elasticsearch.DeleteOperationResponse
+import org.apache.nifi.elasticsearch.ElasticSearchClientService
+import org.apache.nifi.elasticsearch.IndexOperationRequest
+import org.apache.nifi.elasticsearch.IndexOperationResponse
+import org.apache.nifi.elasticsearch.SearchResponse
+
+class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
+ @Override
+ IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
+ return null
+ }
+
+ @Override
+ IndexOperationResponse add(List operations) throws IOException {
+ return null
+ }
+
+ @Override
+ DeleteOperationResponse deleteById(String index, String type, String id) throws IOException {
+ return null
+ }
+
+ @Override
+ DeleteOperationResponse deleteById(String index, String type, List ids) throws IOException {
+ return null
+ }
+
+ @Override
+ DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException {
+ return null
+ }
+
+ @Override
+ Map get(String index, String type, String id) throws IOException {
+ return null
+ }
+
+ @Override
+ SearchResponse search(String query, String index, String type) throws IOException {
+ List hits = [[
+ "_source": [
+ "username": "john.smith",
+ "password": "testing1234",
+ "email": "john.smith@test.com",
+ "position": "Software Engineer"
+ ]
+ ]]
+ return new SearchResponse(hits, null, 1, 100, false)
+ }
+
+ @Override
+ String getTransitUrl(String index, String type) {
+ return ""
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestSchemaRegistry.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestSchemaRegistry.groovy
new file mode 100644
index 000000000000..2a0bd8eca5b3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestSchemaRegistry.groovy
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch.integration
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.schema.access.SchemaField
+import org.apache.nifi.schemaregistry.services.SchemaRegistry
+import org.apache.nifi.serialization.SimpleRecordSchema
+import org.apache.nifi.serialization.record.RecordField
+import org.apache.nifi.serialization.record.RecordFieldType
+import org.apache.nifi.serialization.record.RecordSchema
+import org.apache.nifi.serialization.record.SchemaIdentifier
+
+class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
+ @Override
+ RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) {
+ new SimpleRecordSchema([
+ new RecordField("msg", RecordFieldType.STRING.dataType)
+ ])
+ }
+
+ @Override
+ Set getSuppliedSchemaFields() {
+ [ SchemaField.SCHEMA_NAME ]
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/.gitignore b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/.gitignore
new file mode 100644
index 000000000000..00aca0c29684
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/.gitignore
@@ -0,0 +1 @@
+# This is a placeholder to force Maven to compile the groovy code.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
deleted file mode 100644
index 687faf03b67d..000000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.elasticsearch.integration;
-
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
-import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
-import org.apache.nifi.elasticsearch.IndexOperationRequest;
-import org.apache.nifi.elasticsearch.SearchResponse;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ElasticSearchClientService_IT {
-
- private TestRunner runner;
- private ElasticSearchClientServiceImpl service;
-
- static final String INDEX = "messages";
- static final String TYPE = "message";
-
- @Before
- public void before() throws Exception {
- runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
- service = new ElasticSearchClientServiceImpl();
- runner.addControllerService("Client Service", service);
- runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400");
- runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
- runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
- runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000");
- try {
- runner.enableControllerService(service);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- }
-
- Map expected = new HashMap<>();
- expected.put("one", 1);
- expected.put("two", 2);
- expected.put("three", 3);
- expected.put("four", 4);
- expected.put("five", 5);
-
-
- int index = 1;
- List docs = new ArrayList<>();
- for (Map.Entry entry : expected.entrySet()) {
- for (int idx = 0; idx < entry.getValue(); idx++) {
- Map fields = new HashMap<>();
- fields.put("msg", entry.getKey());
- IndexOperationRequest ior = new IndexOperationRequest(INDEX, TYPE, String.valueOf(index++), fields);
- docs.add(ior);
- }
- }
- service.add(docs);
- }
-
- @After
- public void after() throws Exception {
- service.onDisabled();
- }
-
- @Test
- public void testBasicSearch() throws Exception {
- String query = "{\n" +
- "\t\"size\": 10,\n" +
- "\t\"query\": {\n" +
- "\t\t\"match_all\": {}\n" +
- "\t},\n" +
- "\t\"aggs\": {\n" +
- "\t\t\"term_counts\": {\n" +
- "\t\t\t\"terms\": {\n" +
- "\t\t\t\t\"field\": \"msg.keyword\",\n" +
- "\t\t\t\t\"size\": 5\n" +
- "\t\t\t}\n" +
- "\t\t}\n" +
- "\t}\n" +
- "}";
- SearchResponse response = service.search(query, INDEX, TYPE);
- Assert.assertNotNull("Response was null", response);
-
- Assert.assertEquals("Wrong count", 15, response.getNumberOfHits());
- Assert.assertFalse("Timed out", response.isTimedOut());
- Assert.assertNotNull("Hits was null", response.getHits());
- Assert.assertEquals("Wrong number of hits", 10, response.getHits().size());
- Assert.assertNotNull("Aggregations are missing", response.getAggregations());
- Assert.assertEquals("Aggregation count is wrong", 1, response.getAggregations().size());
-
- Map termCounts = (Map) response.getAggregations().get("term_counts");
- Assert.assertNotNull("Term counts was missing", termCounts);
- List