Skip to content

Commit

Permalink
Merge branch 'master' into allow-set-section-in-setup
Browse files Browse the repository at this point in the history
* master:
  Revert "ingest: processor stats (elastic#34202)"
  • Loading branch information
jasontedor committed Oct 21, 2018
2 parents 2c15af8 + 0577703 commit f4b68da
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 652 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@
package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
Expand All @@ -43,33 +40,16 @@ public class CompoundProcessor implements Processor {
private final boolean ignoreFailure;
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
private final LongSupplier relativeTimeProvider;

CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
}

public CompoundProcessor(Processor... processor) {
this(false, Arrays.asList(processor), Collections.emptyList());
}

public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
}
CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors,
LongSupplier relativeTimeProvider) {
super();
this.ignoreFailure = ignoreFailure;
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.processorsWithMetrics = new ArrayList<>(processors.size());
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
}

List<Tuple<Processor, IngestMetric>> getProcessorsWithMetrics() {
return processorsWithMetrics;
}

public boolean isIgnoreFailure() {
Expand Down Expand Up @@ -114,17 +94,12 @@ public String getTag() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
long startTimeInNanos = relativeTimeProvider.getAsLong();
for (Processor processor : processors) {
try {
metric.preIngest();
if (processor.execute(ingestDocument) == null) {
return null;
}
} catch (Exception e) {
metric.ingestFailed();
if (ignoreFailure) {
continue;
}
Expand All @@ -137,15 +112,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
executeOnFailure(ingestDocument, compoundProcessorException);
break;
}
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
}
return ingestDocument;
}


void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.elasticsearch.script.IngestConditionalScript;
import org.elasticsearch.script.Script;
Expand All @@ -44,51 +42,24 @@ public class ConditionalProcessor extends AbstractProcessor {
private final ScriptService scriptService;

private final Processor processor;
private final IngestMetric metric;
private final LongSupplier relativeTimeProvider;

ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
this(tag, script, scriptService, processor, System::nanoTime);
}

ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) {
super(tag);
this.condition = script;
this.scriptService = scriptService;
this.processor = processor;
this.metric = new IngestMetric();
this.relativeTimeProvider = relativeTimeProvider;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
// Only record metric if the script evaluates to true
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
return processor.execute(ingestDocument);
} catch (Exception e) {
metric.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
}
return processor.execute(ingestDocument);
}
return ingestDocument;
}

Processor getProcessor() {
return processor;
}

IngestMetric getMetric() {
return metric;
}

@Override
public String getType() {
return TYPE;
Expand Down
112 changes: 16 additions & 96 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@

package org.elasticsearch.ingest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
Expand All @@ -36,7 +49,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand All @@ -49,19 +61,6 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Holder class for several ingest related services.
*/
Expand Down Expand Up @@ -263,59 +262,11 @@ public void applyClusterState(final ClusterChangedEvent event) {
Pipeline originalPipeline = originalPipelines.get(id);
if (originalPipeline != null) {
pipeline.getMetrics().add(originalPipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
//consistent id's per processor and/or semantic equals for each processor will be needed.
if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
String type = compositeMetric.v1().getType();
IngestMetric metric = compositeMetric.v2();
if (oldMetricsIterator.hasNext()) {
Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
String oldType = oldCompositeMetric.v1().getType();
IngestMetric oldMetric = oldCompositeMetric.v2();
if (type.equals(oldType)) {
metric.add(oldMetric);
}
}
}
}
}
});
}
}

/**
* Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
* wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
* @param compoundProcessor The compound processor to start walking the non-failure processors
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
* @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor
*/
private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
List<Tuple<Processor, IngestMetric>> processorMetrics) {
//only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure
for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
if (processor instanceof CompoundProcessor) {
getProcessorMetrics((CompoundProcessor) processor, processorMetrics);
} else {
//Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
if (processor instanceof ConditionalProcessor) {
metric = ((ConditionalProcessor) processor).getMetric();
}
processorMetrics.add(new Tuple<>(processor, metric));
}
}
return processorMetrics;
}

private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
Expand Down Expand Up @@ -420,42 +371,11 @@ protected void doRun() {
}

public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);
pipelines.forEach((id, pipeline) -> {
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
getProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
});
});
return statsBuilder.build();
}

//package private for testing
static String getProcessorName(Processor processor){
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getProcessor();
}
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());
Map<String, IngestStats.Stats> statsPerPipeline =
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));

if(processor instanceof PipelineProcessor){
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
sb.append(":");
sb.append(pipelineName);
}
String tag = processor.getTag();
if(tag != null && !tag.isEmpty()){
sb.append(":");
sb.append(tag);
}
return sb.toString();
return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
}

private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
Expand Down
Loading

0 comments on commit f4b68da

Please sign in to comment.