Skip to content

Commit

Permalink
NIFI-5051 Updated service and switched it over to JsonInferenceSchema…
Browse files Browse the repository at this point in the history
…RegistryService.
  • Loading branch information
MikeThomsen committed Aug 13, 2018
1 parent f4f6dc0 commit b86e80e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>1.7.0-SNAPSHOT</version>
<version>1.8.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand All @@ -159,51 +159,11 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<!-- Only run for tests -->
<execution>
<id>groovy-tests</id>
<goals>
<goal>testCompile</goal>
</goals>
<configuration>
<compilerId>groovy-eclipse-compiler</compilerId>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-eclipse-compiler</artifactId>
<version>2.9.2-01</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-eclipse-batch</artifactId>
<version>2.4.3-01</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/groovy</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
Expand All @@ -218,31 +178,6 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.1</version>
<configuration>
<destFile>${basedir}/target/coverage-reports/jacoco-unit.exec</destFile>
<dataFile>${basedir}/target/coverage-reports/jacoco-unit.exec</dataFile>
</configuration>
<executions>
<execution>
<id>jacoco-initialize</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>jacoco-site</id>
<phase>package</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
Expand All @@ -46,7 +46,7 @@
import java.util.Set;
import java.util.stream.Collectors;

public class ElasticSearchLookupService extends SchemaRegistryService implements LookupService<Record> {
public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("el-rest-client-service")
.displayName("Client Service")
Expand Down Expand Up @@ -78,28 +78,41 @@ public class ElasticSearchLookupService extends SchemaRegistryService implements
private String type;
private ObjectMapper mapper;

private final List<PropertyDescriptor> DESCRIPTORS;

ElasticSearchLookupService() {
List<PropertyDescriptor> _desc = new ArrayList<>();
_desc.addAll(super.getSupportedPropertyDescriptors());
_desc.add(CLIENT_SERVICE);
_desc.add(INDEX);
_desc.add(TYPE);
DESCRIPTORS = Collections.unmodifiableList(_desc);
}

@Override
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
public void onEnabled(final ConfigurationContext context) {
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
type = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
mapper = new ObjectMapper();

super.onEnabled(context);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> _desc = new ArrayList<>();
_desc.addAll(super.getSupportedPropertyDescriptors());
_desc.add(CLIENT_SERVICE);
_desc.add(INDEX);
_desc.add(TYPE);

return Collections.unmodifiableList(_desc);
return DESCRIPTORS;
}

@Override
public Optional<Record> lookup(Map coordinates) throws LookupFailureException {
return Optional.empty();
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
Map<String, String> context = coordinates.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().toString()
));
return lookup(coordinates, context);
}

@Override
Expand All @@ -108,11 +121,10 @@ public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, Stri

try {
Record record;
RecordSchema schema = getSchema(context, null);
if (coordinates.containsKey("_id")) {
record = getById((String)coordinates.get("_id"), schema);
record = getById((String)coordinates.get("_id"), context);
} else {
record = getByQuery(coordinates, schema);
record = getByQuery(coordinates, context);
}

return record == null ? Optional.empty() : Optional.of(record);
Expand Down Expand Up @@ -155,7 +167,7 @@ private void validateCoordinates(Map coordinates) throws LookupFailureException
}
}

private Record getById(final String _id, RecordSchema recordSchema) throws IOException, LookupFailureException {
private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
Map<String, Object> query = new HashMap<String, Object>(){{
put("query", new HashMap<String, Object>() {{
put("match", new HashMap<String, String>(){{
Expand All @@ -177,7 +189,7 @@ private Record getById(final String _id, RecordSchema recordSchema) throws IOExc

final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");

RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source);
RecordSchema toUse = getSchema(context, source, null);

return new MapRecord(toUse, source);
}
Expand All @@ -203,7 +215,7 @@ private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
return outter;
}

private Record getByQuery(final Map<String, Object> query, RecordSchema recordSchema) throws LookupFailureException {
private Record getByQuery(final Map<String, Object> query, Map<String, String> context) throws LookupFailureException {
try {
final String json = mapper.writeValueAsString(buildQuery(query));

Expand All @@ -213,11 +225,11 @@ private Record getByQuery(final Map<String, Object> query, RecordSchema recordSc
return null;
} else {
final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
RecordSchema toUse = recordSchema != null ? recordSchema : convertSchema(source);
RecordSchema toUse = getSchema(context, source, null);
return new MapRecord(toUse, source);
}

} catch (IOException e) {
} catch (Exception e) {
throw new LookupFailureException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import org.junit.Assert
import org.junit.Before
import org.junit.Test

import static groovy.json.JsonOutput.*
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson

class ElasticSearch5ClientService_IT {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.nifi.elasticsearch.integration

import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.ElasticSearchLookupService
import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
Expand All @@ -35,14 +36,19 @@ class ElasticSearchLookupServiceTest {
void setup() throws Exception {
mockClientService = new TestElasticSearchClientService()
lookupService = new ElasticSearchLookupService()
def registry = new TestSchemaRegistry()
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
runner.addControllerService("clientService", mockClientService)
runner.addControllerService("lookupService", lookupService)
runner.addControllerService("registry", registry)
runner.enableControllerService(mockClientService)
runner.enableControllerService(registry)
runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "clientService")
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "users")
runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService")
runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService")
runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INFER_SCHEMA)
runner.enableControllerService(lookupService)
}

Expand Down

0 comments on commit b86e80e

Please sign in to comment.