This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit
#23: Implemented GetInfluxDatabase_2
bednar committed Jul 23, 2019
1 parent a2595cf commit 585a9da
Showing 11 changed files with 588 additions and 40 deletions.
@@ -24,6 +24,7 @@

import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -41,7 +42,7 @@
@WritesAttributes({
@WritesAttribute(attribute = GetInfluxDatabase_2.INFLUXDB_ORG_NAME, description = "The organization where the results came from."),
})

@EventDriven
public class GetInfluxDatabase_2 extends AbstractGetInfluxDatabase {

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
@@ -69,6 +70,7 @@ public class GetInfluxDatabase_2 extends AbstractGetInfluxDatabase {
propertyDescriptors.add(DIALECT_COMMENT_PREFIX);
propertyDescriptors.add(DIALECT_DATE_TIME_FORMAT);

propertyDescriptors.add(RECORDS_PER_FLOWFILE);
propertyDescriptors.add(ENABLE_GZIP);
propertyDescriptors.add(LOG_LEVEL);

@@ -16,19 +16,33 @@
*/
package org.influxdata.nifi.processors.internal;

import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

import org.influxdata.Cancellable;
import org.influxdata.client.domain.Dialect;
import org.influxdata.client.domain.Query;
import org.influxdata.exceptions.InfluxException;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_FAIL_TO_QUERY;

/**
* @author Jakub Bednar (bednar@github) (19/07/2019 10:23)
@@ -118,17 +132,206 @@ public abstract class AbstractGetInfluxDatabase extends AbstractInfluxDatabaseProcessor {
.allowableValues("RFC3339", "RFC3339Nano")
.build();

public static final PropertyDescriptor RECORDS_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("influxdb-records-per-flowfile")
.displayName("Results Per FlowFile")
.description("How many records to put into a FlowFile at once. The whole body will be treated as a CSV file.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
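
// Usage sketch (illustrative, the "1000" value is arbitrary): a flow or test can
// cap FlowFile size through this property, e.g.
//
//   runner.setProperty(AbstractGetInfluxDatabase.RECORDS_PER_FLOWFILE, "1000");
//
// When the property is unset, the whole raw CSV response ends up in a single FlowFile.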

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successful Flux queries are routed to this relationship").build();

public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed Flux queries are routed to this relationship").build();

public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("Failed queries that are retryable exception are routed to this relationship").build();

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {


FlowFile flowFile;
// If there are incoming connections, prepare query params from flow file
if (context.hasIncomingConnection()) {
FlowFile incomingFlowFile = session.get();

if (incomingFlowFile == null && context.hasNonLoopConnection()) {
return;
}

flowFile = incomingFlowFile;

} else {
flowFile = session.create();
}


String org = context.getProperty(ORG).evaluateAttributeExpressions(flowFile).getValue();
String flux = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();

boolean dialectHeader = context.getProperty(DIALECT_HEADER).asBoolean();
String dialectDelimiter = context.getProperty(DIALECT_DELIMITER).getValue();
String dialectCommentPrefix = context.getProperty(DIALECT_COMMENT_PREFIX).getValue();
Dialect.DateTimeFormatEnum dialectTimeFormat = Dialect.DateTimeFormatEnum
.fromValue(context.getProperty(DIALECT_DATE_TIME_FORMAT).getValue());

List<Dialect.AnnotationsEnum> dialectAnnotations = new ArrayList<>();
String dialectAnnotationsValue = context.getProperty(DIALECT_ANNOTATIONS).getValue();
if (dialectAnnotationsValue != null && !dialectAnnotationsValue.isEmpty()) {
Arrays.stream(dialectAnnotationsValue.split(","))
.map(String::trim)
.filter(annotation -> !annotation.isEmpty())
.forEach(annotation -> dialectAnnotations.add(Dialect.AnnotationsEnum.fromValue(annotation)));
}
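
// Example (illustrative): a property value of "group,datatype,default" selects the
// three annotation rows of Flux annotated CSV and is parsed into
// [AnnotationsEnum.GROUP, AnnotationsEnum.DATATYPE, AnnotationsEnum.DEFAULT].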


long sizePerBatch = -1;
if (context.getProperty(RECORDS_PER_FLOWFILE).isSet()) {
sizePerBatch = context.getProperty(RECORDS_PER_FLOWFILE).evaluateAttributeExpressions().asLong();
}
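
// sizePerBatch == -1 is the sentinel for "no batching": the complete query result
// is written to a single FlowFile.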


try {

Dialect dialect = new Dialect()
.header(dialectHeader)
.delimiter(dialectDelimiter)
.commentPrefix(dialectCommentPrefix)
.dateTimeFormat(dialectTimeFormat)
.annotations(dialectAnnotations);

Query query = new Query()
.query(flux)
.dialect(dialect);

new QueryProcessor(org, query, sizePerBatch, flowFile, context, session).doQueryRaw();

} catch (Exception e) {
catchException(e, Collections.singletonList(flowFile), context, session);
}
}

private void catchException(final Throwable e,
final List<FlowFile> flowFiles,
final ProcessContext context,
final ProcessSession session) {

String message = INFLUX_DB_FAIL_TO_QUERY;
Object status = e.getClass().getSimpleName();
Relationship relationship = REL_FAILURE;

if (e instanceof InfluxException) {

InfluxException ie = (InfluxException) e;
status = ie.status();

// Retryable: HTTP 429 (Too Many Requests), HTTP 503 (Service Unavailable), or a socket timeout
if (Arrays.asList(429, 503).contains(ie.status()) || ie.getCause() instanceof SocketTimeoutException) {
message += " ... retry";
relationship = REL_RETRY;
}
}

for (FlowFile flowFile : flowFiles) {

if (REL_RETRY.equals(relationship)) {
flowFile = session.penalize(flowFile);
}

// keep the FlowFile revision returned by the session; transferring a stale
// reference after putAttribute would fail
flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, e.getMessage());
session.transfer(flowFile, relationship);
}

getLogger().error(message, new Object[]{status, e.getLocalizedMessage()}, e);
context.yield();
}

private class QueryProcessor {
private FlowFile flowFile;
private final List<FlowFile> flowFiles = new ArrayList<>();
private final String org;
private final long sizePerBatch;
private final Query query;
private final ProcessContext context;
private final ProcessSession session;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final StopWatch stopWatch = new StopWatch();

private long recordIndex = 0;

private QueryProcessor(final String org,
final Query query,
final long sizePerBatch,
final FlowFile flowFile,
final ProcessContext context,
final ProcessSession session) {
this.flowFiles.add(flowFile);
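// With batching enabled the incoming FlowFile stays in the transfer list and the
// results start in a freshly created FlowFile; without batching the results are
// appended directly to the incoming FlowFile.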
if (sizePerBatch == -1) {
this.flowFile = flowFile;
} else {
this.flowFile = session.create();
this.flowFiles.add(this.flowFile);
}
this.org = org;
this.sizePerBatch = sizePerBatch;
this.query = query;
this.context = context;
this.session = session;

this.stopWatch.start();
}

void doQueryRaw() {
try {

getInfluxDBClient(context)
.getQueryApi()
.queryRaw(query, org, this::onResponse, this::onError, this::onComplete);

// queryRaw dispatches the query asynchronously; block on the latch (counted down
// in onError and onComplete) so the session is still open when the results are
// transferred. This is presumably the purpose of the otherwise unused countDownLatch.
countDownLatch.await();

} catch (Exception e) {
catchException(e, flowFiles, context, session);
}
}

private void onResponse(Cancellable cancellable, String record) {

recordIndex++;
if (sizePerBatch != -1 && recordIndex > sizePerBatch) {
flowFile = session.create();
flowFiles.add(flowFile);
recordIndex = 1;
}

// Separate successive records with '\n'; the first record of each FlowFile
// gets no leading newline.
session.append(flowFile, out -> {
if (recordIndex > 1) {
out.write('\n');
}
out.write(record.getBytes());
});
}

private void onError(Throwable throwable) {
countDownLatch.countDown();
stopWatch.stop();

catchException(throwable, flowFiles, context, session);
}

private void onComplete() {
countDownLatch.countDown();
stopWatch.stop();

session.transfer(flowFiles, REL_SUCCESS);
for (FlowFile flowFile : flowFiles) {
session.getProvenanceReporter()
.send(flowFile, influxDatabaseService.getDatabaseURL(), stopWatch.getElapsed(MILLISECONDS));
}

getLogger().debug("Query {} fetched in {}", new Object[]{query, stopWatch.getDuration()});
}
}
}
@@ -131,6 +131,7 @@ public abstract class AbstractInfluxDatabaseProcessor extends AbstractProcessor {
public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message";
public static final String INFLUX_DB_ERROR_MESSAGE_LOG = "Failed processing flow file {} due to {}";
public static final String INFLUX_DB_FAIL_TO_INSERT = "Failed to insert into influxDB due to {}";
public static final String INFLUX_DB_FAIL_TO_QUERY = "Failed to execute Flux query ({}) due to {}";
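// Filled in catchException with the HTTP status (or the exception class name)
// followed by the exception's localized message.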

protected AtomicReference<InfluxDB> influxDB = new AtomicReference<>();
protected long maxRecordsSize;
@@ -144,7 +144,10 @@ public void onScheduled(@NonNull final ProcessContext context) {
Objects.requireNonNull(context, "ProcessContext is required");

influxDatabaseService = context.getProperty(INFLUX_DB_SERVICE).asControllerService(InfluxDatabaseService_2.class);

// MAX_RECORDS_SIZE is only meaningful for processors that declare it (the query
// processors do not), so guard the lookup.
if (getSupportedPropertyDescriptors().contains(MAX_RECORDS_SIZE)) {
maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
}
}

@OnStopped
@@ -18,16 +18,27 @@

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.influxdata.Cancellable;
import org.influxdata.client.InfluxDBClient;
import org.influxdata.client.QueryApi;
import org.influxdata.client.domain.Query;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;

import com.google.common.collect.Lists;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import static org.influxdata.nifi.services.InfluxDatabaseService_2.INFLUX_DB_ACCESS_TOKEN;

@@ -37,13 +48,47 @@
abstract class AbstractTestGetInfluxDatabaseSettings_2 {

TestRunner runner;
Answer queryAnswer = invocation -> Void.class;
Exception queryOnErrorValue = null;
List<String> queryOnResponseRecords = Lists.newArrayList();
MockComponentLog logger;
QueryApi mockQueryApi;
GetInfluxDatabase_2 processor;

@Before
public void before() throws IOException, GeneralSecurityException, InitializationException {

InfluxDBClient mockInfluxDBClient = Mockito.mock(InfluxDBClient.class);
mockQueryApi = Mockito.mock(QueryApi.class);
Mockito.doAnswer(invocation -> mockQueryApi).when(mockInfluxDBClient).getQueryApi();
Mockito.doAnswer(invocation -> {
if (queryOnErrorValue != null) {
//noinspection unchecked
Consumer<Exception> onError = invocation.getArgumentAt(3, Consumer.class);
onError.accept(queryOnErrorValue);
}

queryOnResponseRecords.forEach(record -> {
//noinspection unchecked
BiConsumer<Cancellable, String> onRecord = invocation.getArgumentAt(2, BiConsumer.class);
onRecord.accept(Mockito.mock(Cancellable.class), record);
});

boolean wasException = queryOnErrorValue != null;
try {
return queryAnswer.answer(invocation);
} catch (Exception e) {
wasException = true;
throw e;
} finally {
if (!wasException) {
Runnable onComplete = invocation.getArgumentAt(4, Runnable.class);
onComplete.run();
}
}
}).when(mockQueryApi).queryRaw(Mockito.any(Query.class), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.any());
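
// The stub above mirrors QueryApi.queryRaw's callback contract: argument 2 is the
// per-record BiConsumer, argument 3 the error Consumer and argument 4 the completion
// Runnable, so tests can script behaviour via queryOnResponseRecords,
// queryOnErrorValue and queryAnswer.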

processor = Mockito.spy(new GetInfluxDatabase_2());

runner = TestRunners.newTestRunner(processor);
runner.setProperty(GetInfluxDatabase_2.ORG, "my-org");
Expand All @@ -56,5 +101,11 @@ public void before() throws IOException, GeneralSecurityException, Initializatio
runner.addControllerService("influxdb-service", influxDatabaseService);
runner.setProperty(influxDatabaseService, INFLUX_DB_ACCESS_TOKEN, "my-token");
runner.enableControllerService(influxDatabaseService);

MockProcessContext context = new MockProcessContext(processor);
MockProcessorInitializationContext initContext = new MockProcessorInitializationContext(processor, context);
logger = initContext.getLogger();
processor.initialize(initContext);
processor.onScheduled(runner.getProcessContext());
}
}
