Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-5051 Created ElasticSearch lookup service. #2615

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
37d3aeb
NIFI-5051 Created ElasticSearch lookup service.
MikeThomsen Apr 7, 2018
d071fab
NIFI-5051 Fixed checkstyle issue.
MikeThomsen Apr 10, 2018
8c74b9b
NIFI-5051 Converted ES lookup service to use a SchemaRegistry.
MikeThomsen Apr 25, 2018
cb98bec
NIFI-5051 Cleaned up POM and added a simple unit test that uses a moc…
MikeThomsen Apr 27, 2018
0214461
NIFI-5051 Added change; waiting for feedback.
MikeThomsen Apr 28, 2018
17d3d9e
NIFI-5051 Changed query setup based on code review. Changed tests to …
MikeThomsen May 22, 2018
01aca62
NIFI-5051 fixed a checkstyle issue.
MikeThomsen May 22, 2018
b741d53
NIFI-5051 Rebased to cleanup merge issues
MikeThomsen May 24, 2018
1a3db2f
NIFI-5051 Added changes from a code review.
MikeThomsen May 31, 2018
cbd4886
NIFI-5051 Fixed a checkstyle issue.
MikeThomsen May 31, 2018
b07337b
NIFI-5051 Added coverage generator for tests.
MikeThomsen May 31, 2018
f4f6dc0
Rebased.
MikeThomsen Jun 15, 2018
b86e80e
NIFI-5051 Updated service and switched it over to JsonInferenceSchema…
MikeThomsen Aug 13, 2018
54be2be
NIFI-5051 Removed dead code.
MikeThomsen Aug 13, 2018
19ef6e2
NIFI-5051 Fixed checkstyle errors.
MikeThomsen Aug 14, 2018
fd26466
NIFI-5051 Refactored query builder.
MikeThomsen Aug 14, 2018
d8951cb
NIFI-5051 Added placeholder gitignore to force test compile.
MikeThomsen Aug 14, 2018
5556677
NIFI-5051 Added note explaining why the .gitignore file was needed.
MikeThomsen Aug 15, 2018
72c833c
NIFI-5051 Made constructor public.
MikeThomsen Sep 14, 2018
3027d31
NIFI-5051 Fixed path issue in client service integration tests.
MikeThomsen Sep 16, 2018
e0f2db6
NIFI-5051 Added additional mapping capabilities to let users massage …
MikeThomsen Sep 17, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down Expand Up @@ -133,6 +139,33 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>1.8.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.8.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

<profiles>
Expand All @@ -150,6 +183,8 @@
<httpPort>9400</httpPort>
<version>5.6.2</version>
<timeout>90</timeout>
<logLevel>ERROR</logLevel>
<pathInitScript>${project.basedir}/src/test/resources/setup.script</pathInitScript>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("el-rest-client-service")
.displayName("Client Service")
.description("An ElasticSearch client service to use for running queries.")
.identifiesControllerService(ElasticSearchClientService.class)
.required(true)
.build();
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("el-lookup-index")
.displayName("Index")
.description("The name of the index to read from")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the lookup is performed on an incoming flow file, is there any reason the Index, Type, etc. properties couldn't support attributes coming from the flow file? If it is this way because the ES Client Service CS can't use them, perhaps we should write up a separate improvement Jira to do something like NIFI-5121.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could definitely see some value to that, but since this is a LookupService implementation, we should discuss it in that context. NIFI-5121 only describes one particular interface, and LookupService is more expansive in use.

.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("el-lookup-type")
.displayName("Type")
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

private ElasticSearchClientService clientService;

private String index;
private String type;
private ObjectMapper mapper;

private final List<PropertyDescriptor> DESCRIPTORS;

public ElasticSearchLookupService() {
List<PropertyDescriptor> _desc = new ArrayList<>();
_desc.addAll(super.getSupportedPropertyDescriptors());
_desc.add(CLIENT_SERVICE);
_desc.add(INDEX);
_desc.add(TYPE);
DESCRIPTORS = Collections.unmodifiableList(_desc);
}

private volatile ConcurrentHashMap<String, RecordPath> mappings;

@Override
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
type = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
mapper = new ObjectMapper();

List<PropertyDescriptor> dynamic = context.getProperties().entrySet().stream()
.filter( e -> e.getKey().isDynamic())
.map(e -> e.getKey())
.collect(Collectors.toList());

Map<String, RecordPath> _temp = new HashMap<>();
for (PropertyDescriptor desc : dynamic) {
String value = context.getProperty(desc).getValue();
String name = desc.getName();
_temp.put(name, RecordPath.compile(value));
}

mappings = new ConcurrentHashMap<>(_temp);

super.onEnabled(context);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}

@Override
public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String name) {
return new PropertyDescriptor.Builder()
.name(name)
.addValidator((subject, input, context) -> {
ValidationResult.Builder builder = new ValidationResult.Builder();
try {
JsonPath.parse(input);
builder.valid(true);
} catch (Exception ex) {
builder.explanation(ex.getMessage())
.valid(false)
.subject(subject);
}

return builder.build();
})
.dynamic(true)
.build();
}

@Override
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
Map<String, String> context = coordinates.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().toString()
));
return lookup(coordinates, context);
}

@Override
public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
validateCoordinates(coordinates);

try {
Record record;
if (coordinates.containsKey("_id")) {
record = getById((String)coordinates.get("_id"), context);
} else {
record = getByQuery(coordinates, context);
}

return record == null ? Optional.empty() : Optional.of(record);
} catch (Exception ex) {
getLogger().error("Error during lookup.", ex);
throw new LookupFailureException(ex);
}
}

private void validateCoordinates(Map coordinates) throws LookupFailureException {
List<String> reasons = new ArrayList<>();

if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
reasons.add("_id was supplied, but it was not a String.");
}

if (coordinates.containsKey("_id") && coordinates.size() > 1) {
reasons.add("When _id is used, it can be the only key used in the lookup.");
}

if (reasons.size() > 0) {
String error = String.join("\n", reasons);
throw new LookupFailureException(error);
}
}

private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
Map<String, Object> query = new HashMap<String, Object>(){{
put("query", new HashMap<String, Object>() {{
put("match", new HashMap<String, String>(){{
put("_id", _id);
}});
}});
}};

String json = mapper.writeValueAsString(query);

SearchResponse response = clientService.search(json, index, type);

if (response.getNumberOfHits() > 1) {
throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
response.getNumberOfHits(), json));
} else if (response.getNumberOfHits() == 0) {
return null;
}

final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");

RecordSchema toUse = getSchema(context, source, null);

Record record = new MapRecord(toUse, source);

if (mappings.size() > 0) {
record = applyMappings(record, source);
}

return record;
}

Map<String, Object> getNested(String key, Object value) {
String path = key.substring(0, key.lastIndexOf("."));

return new HashMap<String, Object>(){{
put("path", path);
put("query", new HashMap<String, Object>(){{
put("match", new HashMap<String, Object>(){{
put(key, value);
}});
}});
}};
}

private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
Map<String, Object> query = new HashMap<String, Object>(){{
put("bool", new HashMap<String, Object>(){{
put("must", coordinates.entrySet().stream()
.map(e -> new HashMap<String, Object>(){{
if (e.getKey().contains(".")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've run the unit and integration tests and the code LGTM, but I'd feel better if I could get an example going where I do the lookup on a field that's not at the top level. I have a document containing a "user" field, which contains other fields such as "name", and "name" contains other fields like "first" and "last". I tried using this with a simple CSV input containing an id and a first name, and tried to use the lookup service to match "user.name.first" and return the value of "user.name.last", but got an error saying I was trying to do a nested query on a field that wasn't nested. I didn't add an explicit mapping for the index, just put the complex JSON docs into ES. Am I configuring it wrong, or is this not supported, or could there be a bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into that. Should be able to get something resolved this weekend.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but got an error saying I was trying to do a nested query on a field that wasn't nested.

I think you are. ES can be weird about detecting nested documents. I've only had consistent good results when explicitly defining them. I'll try to set up a test example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I got it working and will share some artifacts tomorrow if I get a chance so you can watch them in action. I'm thinking some of the behavior still needs a second opinion on the flexibility/user-friendliness.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still getting the same error (nested object under path [user.name] is not of nested type) on my flow. I tried yours but I don't have any documents/mappings in ES (such as a doc with "subfield.longfield), can you share an example doc I can put in there? I have my own ES so I didn't start up the Docker Compose stuff you attached.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below. It has a sample flow, the commands for Kibana and a docker compose file.

put("nested", getNested(e.getKey(), e.getValue()));
} else {
put("match", new HashMap<String, Object>() {{
put(e.getKey(), e.getValue());
}});
}
}}).collect(Collectors.toList())
);
}});
}};

Map<String, Object> outter = new HashMap<String, Object>(){{
put("size", 1);
put("query", query);
}};

return outter;
}

private Record getByQuery(final Map<String, Object> query, Map<String, String> context) throws LookupFailureException {
try {
final String json = mapper.writeValueAsString(buildQuery(query));

SearchResponse response = clientService.search(json, index, type);

if (response.getNumberOfHits() == 0) {
return null;
} else {
final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);

if (mappings.size() > 0) {
record = applyMappings(record, source);
}

return record;
}

} catch (Exception e) {
throw new LookupFailureException(e);
}
}

private Record applyMappings(Record record, Map<String, Object> source) {
Record _rec = new MapRecord(record.getSchema(), new HashMap<>());

mappings.entrySet().forEach(entry -> {
try {
Object o = JsonPath.read(source, entry.getKey());
RecordPath path = entry.getValue();
Optional<FieldValue> first = path.evaluate(_rec).getSelectedFields().findFirst();
if (first.isPresent()) {
first.get().updateValue(o);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});

return _rec;
}

@Override
public Class<?> getValueType() {
return Record.class;
}

@Override
public Set<String> getRequiredKeys() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.elasticsearch.ElasticSearchLookupService
org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
Loading