diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java
new file mode 100644
index 0000000000000..4e809aa5b444e
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest;
+
+import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.metrics.MeanMetric;
+
+/**
+ *
Metrics to measure ingest actions.
+ *
This counts measure documents and timings for a given scope.
+ * The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline,
+ * or you can use this class to count documents for a given pipeline or a specific processor.
+ * This class does not make assumptions about it's given scope.
+ */
+class IngestMetric {
+
+ /**
+ * The time it takes to complete the measured item.
+ */
+ private final MeanMetric ingestTime = new MeanMetric();
+ /**
+ * The current count of things being measure. Should most likely ever be 0 or 1.
+ * Useful when aggregating multiple metrics to see how many things are in flight.
+ */
+ private final CounterMetric ingestCurrent = new CounterMetric();
+ /**
+ * The ever increasing count of things being measured
+ */
+ private final CounterMetric ingestCount = new CounterMetric();
+ /**
+ * The only increasing count of failures
+ */
+ private final CounterMetric ingestFailed = new CounterMetric();
+
+ /**
+ * Call this prior to the ingest action.
+ */
+ void preIngest() {
+ ingestCurrent.inc();
+ }
+
+ /**
+ * Call this after the performing the ingest action, even if the action failed.
+ * @param ingestTimeInMillis The time it took to perform the action.
+ */
+ void postIngest(long ingestTimeInMillis) {
+ ingestCurrent.dec();
+ ingestTime.inc(ingestTimeInMillis);
+ ingestCount.inc();
+ }
+
+ /**
+ * Call this if the ingest action failed.
+ */
+ void ingestFailed() {
+ ingestFailed.inc();
+ }
+
+ /**
+ *
Add two sets of metrics together.
+ *
Note - this method does not add the current count values.
+ * The current count value is ephemeral and requires a increase/decrease operation pairs to keep the value correct.
+ *
+ * @param metrics The metric to add.
+ */
+ void add(IngestMetric metrics) {
+ ingestCount.inc(metrics.ingestCount.count());
+ ingestTime.inc(metrics.ingestTime.sum());
+ ingestFailed.inc(metrics.ingestFailed.count());
+ }
+
+ /**
+ * Creates a serializable representation for these metrics.
+ */
+ IngestStats.Stats createStats() {
+ return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.count(), ingestFailed.count());
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 3cba98a45016a..5bc24a367da33 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -23,16 +23,16 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@@ -50,8 +50,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -80,8 +78,7 @@ public class IngestService implements ClusterStateApplier {
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map pipelines = new HashMap<>();
private final ThreadPool threadPool;
- private final StatsHolder totalStats = new StatsHolder();
- private volatile Map statsHolderPerPipeline = Collections.emptyMap();
+ private final IngestMetric totalMetrics = new IngestMetric();
public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
@@ -258,10 +255,16 @@ Map pipelines() {
@Override
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
+ Map originalPipelines = pipelines;
innerUpdatePipelines(event.previousState(), state);
- IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
- if (ingestMetadata != null) {
- updatePipelineStats(ingestMetadata);
+ //pipelines changed, so add the old metrics to the new metrics
+ if (originalPipelines != pipelines) {
+ pipelines.forEach((id, pipeline) -> {
+ Pipeline originalPipeline = originalPipelines.get(id);
+ if (originalPipeline != null) {
+ pipeline.getMetrics().add(originalPipeline.getMetrics());
+ }
+ });
}
}
@@ -326,6 +329,7 @@ void validatePipeline(Map ingestInfos, PutPipelineReq
public void executeBulkRequest(Iterable> actionRequests,
BiConsumer itemFailureHandler, Consumer completionHandler,
Consumer itemDroppedHandler) {
+
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
@@ -368,37 +372,11 @@ protected void doRun() {
}
public IngestStats stats() {
- Map statsHolderPerPipeline = this.statsHolderPerPipeline;
- Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
- for (Map.Entry entry : statsHolderPerPipeline.entrySet()) {
- statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
- }
+ Map statsPerPipeline =
+ pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));
- return new IngestStats(totalStats.createStats(), statsPerPipeline);
- }
-
- void updatePipelineStats(IngestMetadata ingestMetadata) {
- boolean changed = false;
- Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
- Iterator iterator = newStatsPerPipeline.keySet().iterator();
- while (iterator.hasNext()) {
- String pipeline = iterator.next();
- if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
- iterator.remove();
- changed = true;
- }
- }
- for (String pipeline : ingestMetadata.getPipelines().keySet()) {
- if (newStatsPerPipeline.containsKey(pipeline) == false) {
- newStatsPerPipeline.put(pipeline, new StatsHolder());
- changed = true;
- }
- }
-
- if (changed) {
- statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
- }
+ return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
}
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception {
@@ -409,10 +387,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
long startTimeInNanos = System.nanoTime();
// the pipeline specific stat holder may not exist and that is fine:
// (e.g. the pipeline may have been removed while we're ingesting a document
- Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
try {
- totalStats.preIngest();
- pipelineStats.ifPresent(StatsHolder::preIngest);
+ totalMetrics.preIngest();
String index = indexRequest.index();
String type = indexRequest.type();
String id = indexRequest.id();
@@ -438,13 +414,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
indexRequest.source(ingestDocument.getSourceAndMetadata());
}
} catch (Exception e) {
- totalStats.ingestFailed();
- pipelineStats.ifPresent(StatsHolder::ingestFailed);
+ totalMetrics.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
- totalStats.postIngest(ingestTimeInMillis);
- pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
+ totalMetrics.postIngest(ingestTimeInMillis);
}
}
@@ -481,27 +455,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
- private static class StatsHolder {
-
- private final MeanMetric ingestMetric = new MeanMetric();
- private final CounterMetric ingestCurrent = new CounterMetric();
- private final CounterMetric ingestFailed = new CounterMetric();
-
- void preIngest() {
- ingestCurrent.inc();
- }
-
- void postIngest(long ingestTimeInMillis) {
- ingestCurrent.dec();
- ingestMetric.inc(ingestTimeInMillis);
- }
-
- void ingestFailed() {
- ingestFailed.inc();
- }
-
- IngestStats.Stats createStats() {
- return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
- }
- }
}
diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
index 1d345ea5f7884..8d5f6d6ff7c54 100644
--- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
+++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
@@ -22,10 +22,12 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
+import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+
import org.elasticsearch.script.ScriptService;
/**
@@ -44,12 +46,21 @@ public final class Pipeline {
@Nullable
private final Integer version;
private final CompoundProcessor compoundProcessor;
+ private final IngestMetric metrics;
+ private final Clock clock;
public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
+ this(id, description, version, compoundProcessor, Clock.systemUTC());
+ }
+
+ //package private for testing
+ Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) {
this.id = id;
this.description = description;
this.compoundProcessor = compoundProcessor;
this.version = version;
+ this.metrics = new IngestMetric();
+ this.clock = clock;
}
public static Pipeline create(String id, Map config,
@@ -78,7 +89,17 @@ public static Pipeline create(String id, Map config,
* Modifies the data of a document to be indexed based on the processor this pipeline holds
*/
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
- return compoundProcessor.execute(ingestDocument);
+ long startTimeInMillis = clock.millis();
+ try {
+ metrics.preIngest();
+ return compoundProcessor.execute(ingestDocument);
+ } catch (Exception e) {
+ metrics.ingestFailed();
+ throw e;
+ } finally {
+ long ingestTimeInMillis = clock.millis() - startTimeInMillis;
+ metrics.postIngest(ingestTimeInMillis);
+ }
}
/**
@@ -135,4 +156,11 @@ public List getOnFailureProcessors() {
public List flattenAllProcessors() {
return compoundProcessor.flattenProcessors();
}
+
+ /**
+ * The metrics associated with this pipeline.
+ */
+ public IngestMetric getMetrics() {
+ return metrics;
+ }
}
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index 8d1302a2ada0e..afae36427ad17 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -19,16 +19,6 @@
package org.elasticsearch.ingest;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
@@ -59,13 +49,22 @@
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@@ -769,16 +768,14 @@ public void testStats() {
previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
- final Map configurationMap = new HashMap<>();
- configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
- configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
- ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
+
@SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class);
final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1");
+ indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
@@ -793,23 +790,21 @@ public void testStats() {
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L));
- }
- // issue: https://github.com/elastic/elasticsearch/issues/18126
- public void testUpdatingStatsWhenRemovingPipelineWorks() {
- IngestService ingestService = createWithProcessors();
- Map configurationMap = new HashMap<>();
- configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
- configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
- ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
- assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id1"));
- assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id2"));
-
- configurationMap = new HashMap<>();
- configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON));
- ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
- assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id1")));
- assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
+ //update cluster state and ensure that new stats are added to old stats
+ putRequest = new PutPipelineRequest("_id1",
+ new BytesArray("{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON);
+ previousClusterState = clusterState;
+ clusterState = IngestService.innerPut(putRequest, clusterState);
+ ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+ indexRequest.setPipeline("_id1");
+ ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
+ final IngestStats afterThirdRequestStats = ingestService.stats();
+ assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2));
+ assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L));
+ assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
+ assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L));
+
}
private IngestDocument eqIndexTypeId(final Map source) {
diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java
index 99fa7633d085a..018ded346d4fc 100644
--- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java
@@ -18,20 +18,17 @@
*/
package org.elasticsearch.ingest;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.test.ESTestCase;
+
+import java.time.Clock;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ingest.CompoundProcessor;
-import org.elasticsearch.ingest.IngestDocument;
-import org.elasticsearch.ingest.IngestService;
-import org.elasticsearch.ingest.Pipeline;
-import org.elasticsearch.ingest.PipelineProcessor;
-import org.elasticsearch.ingest.Processor;
-import org.elasticsearch.ingest.RandomDocumentPicks;
-import org.elasticsearch.test.ESTestCase;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -130,4 +127,81 @@ innerPipelineId, null, null, new CompoundProcessor()
outerProc.execute(testIngestDocument);
outerProc.execute(testIngestDocument);
}
+
+ public void testPipelineProcessorWithPipelineChain() throws Exception {
+ String pipeline1Id = "pipeline1";
+ String pipeline2Id = "pipeline2";
+ String pipeline3Id = "pipeline3";
+ IngestService ingestService = mock(IngestService.class);
+ PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
+
+ Map pipeline1ProcessorConfig = new HashMap<>();
+ pipeline1ProcessorConfig.put("pipeline", pipeline2Id);
+ PipelineProcessor pipeline1Processor = factory.create(Collections.emptyMap(), null, pipeline1ProcessorConfig);
+
+ Map pipeline2ProcessorConfig = new HashMap<>();
+ pipeline2ProcessorConfig.put("pipeline", pipeline3Id);
+ PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig);
+
+ Clock clock = mock(Clock.class);
+ when(clock.millis()).thenReturn(0L).thenReturn(0L);
+ Pipeline pipeline1 = new Pipeline(
+ pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock
+ );
+
+ String key1 = randomAlphaOfLength(10);
+ clock = mock(Clock.class);
+ when(clock.millis()).thenReturn(0L).thenReturn(3L);
+ Pipeline pipeline2 = new Pipeline(
+ pipeline2Id, null, null, new CompoundProcessor(true,
+ Arrays.asList(
+ new TestProcessor(ingestDocument -> {
+ ingestDocument.setFieldValue(key1, randomInt());
+ }),
+ pipeline2Processor),
+ Collections.emptyList()),
+ clock
+ );
+ clock = mock(Clock.class);
+ when(clock.millis()).thenReturn(0L).thenReturn(2L);
+ Pipeline pipeline3 = new Pipeline(
+ pipeline3Id, null, null, new CompoundProcessor(
+ new TestProcessor(ingestDocument -> {
+ throw new RuntimeException("error");
+ })), clock
+ );
+ when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1);
+ when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);
+ when(ingestService.getPipeline(pipeline3Id)).thenReturn(pipeline3);
+
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
+ //start the chain
+ ingestDocument.executePipeline(pipeline1);
+ assertNotNull(ingestDocument.getSourceAndMetadata().get(key1));
+
+ //check the stats
+ IngestStats.Stats pipeline1Stats = pipeline1.getMetrics().createStats();
+ IngestStats.Stats pipeline2Stats = pipeline2.getMetrics().createStats();
+ IngestStats.Stats pipeline3Stats = pipeline3.getMetrics().createStats();
+
+ //current
+ assertThat(pipeline1Stats.getIngestCurrent(), equalTo(0L));
+ assertThat(pipeline2Stats.getIngestCurrent(), equalTo(0L));
+ assertThat(pipeline3Stats.getIngestCurrent(), equalTo(0L));
+
+ //count
+ assertThat(pipeline1Stats.getIngestCount(), equalTo(1L));
+ assertThat(pipeline2Stats.getIngestCount(), equalTo(1L));
+ assertThat(pipeline3Stats.getIngestCount(), equalTo(1L));
+
+ //time
+ assertThat(pipeline1Stats.getIngestTimeInMillis(), equalTo(0L));
+ assertThat(pipeline2Stats.getIngestTimeInMillis(), equalTo(3L));
+ assertThat(pipeline3Stats.getIngestTimeInMillis(), equalTo(2L));
+
+ //failure
+ assertThat(pipeline1Stats.getIngestFailedCount(), equalTo(0L));
+ assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L));
+ assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
+ }
}