From 0577703183ec3e341b88ff3ff43410683f5bbad9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 21 Oct 2018 13:16:15 -0400 Subject: [PATCH] Revert "ingest: processor stats (#34202)" This reverts commit 65677296000558f0d848b67b19d5d588110af5bb. --- .../ingest/CompoundProcessor.java | 31 +--- .../ingest/ConditionalProcessor.java | 31 +--- .../elasticsearch/ingest/IngestService.java | 112 ++---------- .../org/elasticsearch/ingest/IngestStats.java | 169 +++--------------- .../org/elasticsearch/ingest/Pipeline.java | 16 +- .../ingest/PipelineProcessor.java | 4 - .../cluster/node/stats/NodeStatsTests.java | 58 ++---- .../ingest/CompoundProcessorTests.java | 86 ++------- .../ingest/ConditionalProcessorTests.java | 51 +----- .../ingest/IngestServiceTests.java | 136 +++----------- .../ingest/IngestStatsTests.java | 83 +++------ .../ingest/PipelineProcessorTests.java | 21 ++- 12 files changed, 146 insertions(+), 652 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 3b8281bd471d2..e1a413f6aa9bb 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -20,15 +20,12 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.collect.Tuple; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import java.util.stream.Collectors; /** @@ -43,33 +40,16 @@ public class CompoundProcessor implements Processor { private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; - private final List> processorsWithMetrics; - private final LongSupplier relativeTimeProvider; - - CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) { - this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider); - } public CompoundProcessor(Processor... processor) { this(false, Arrays.asList(processor), Collections.emptyList()); } public CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors) { - this(ignoreFailure, processors, onFailureProcessors, System::nanoTime); - } - CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors, - LongSupplier relativeTimeProvider) { super(); this.ignoreFailure = ignoreFailure; this.processors = processors; this.onFailureProcessors = onFailureProcessors; - this.relativeTimeProvider = relativeTimeProvider; - this.processorsWithMetrics = new ArrayList<>(processors.size()); - processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric()))); - } - - List> getProcessorsWithMetrics() { - return processorsWithMetrics; } public boolean isIgnoreFailure() { @@ -114,17 +94,12 @@ public String getTag() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - for (Tuple processorWithMetric : processorsWithMetrics) { - Processor processor = processorWithMetric.v1(); - IngestMetric metric = processorWithMetric.v2(); - long startTimeInNanos = relativeTimeProvider.getAsLong(); + for (Processor processor : processors) { try { - metric.preIngest(); if (processor.execute(ingestDocument) == null) { return null; } } catch (Exception e) { - metric.ingestFailed(); if (ignoreFailure) { continue; } @@ -137,15 +112,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { executeOnFailure(ingestDocument, compoundProcessorException); break; } - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); } } return ingestDocument; } - void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { try { putFailureMetadata(ingestDocument, exception); diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 9078dc86c1b07..b6f6612344a39 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -28,8 +28,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import java.util.stream.Collectors; import org.elasticsearch.script.IngestConditionalScript; import org.elasticsearch.script.Script; @@ -44,20 +42,12 @@ public class ConditionalProcessor extends AbstractProcessor { private final ScriptService scriptService; private final Processor processor; - private final IngestMetric metric; - private final LongSupplier relativeTimeProvider; ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { - this(tag, script, scriptService, processor, System::nanoTime); - } - - ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) { super(tag); this.condition = script; this.scriptService = scriptService; this.processor = processor; - this.metric = new IngestMetric(); - this.relativeTimeProvider = relativeTimeProvider; } @Override @@ -65,30 +55,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { IngestConditionalScript script = scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - // Only record metric if the script evaluates to true - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metric.preIngest(); - return processor.execute(ingestDocument); - } catch (Exception e) { - metric.ingestFailed(); - throw e; - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); - } + return processor.execute(ingestDocument); } return ingestDocument; } - Processor getProcessor() { - return processor; - } - - IngestMetric getMetric() { - return metric; - } - @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 705e77028a1ef..6c46a9b2354f6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,6 +19,19 @@ package org.elasticsearch.ingest; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -36,7 +49,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -49,19 +61,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - /** * Holder class for several ingest related services. */ @@ -263,59 +262,11 @@ public void applyClusterState(final ClusterChangedEvent event) { Pipeline originalPipeline = originalPipelines.get(id); if (originalPipeline != null) { pipeline.getMetrics().add(originalPipeline.getMetrics()); - List> oldPerProcessMetrics = new ArrayList<>(); - List> newPerProcessMetrics = new ArrayList<>(); - getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics); - getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics); - //Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since - //the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and - //consistent id's per processor and/or semantic equals for each processor will be needed. - if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) { - Iterator> oldMetricsIterator = oldPerProcessMetrics.iterator(); - for (Tuple compositeMetric : newPerProcessMetrics) { - String type = compositeMetric.v1().getType(); - IngestMetric metric = compositeMetric.v2(); - if (oldMetricsIterator.hasNext()) { - Tuple oldCompositeMetric = oldMetricsIterator.next(); - String oldType = oldCompositeMetric.v1().getType(); - IngestMetric oldMetric = oldCompositeMetric.v2(); - if (type.equals(oldType)) { - metric.add(oldMetric); - } - } - } - } } }); } } - /** - * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as - * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric. - * @param compoundProcessor The compound processor to start walking the non-failure processors - * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples. - * @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor - */ - private static List> getProcessorMetrics(CompoundProcessor compoundProcessor, - List> processorMetrics) { - //only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure - for (Tuple processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) { - Processor processor = processorWithMetric.v1(); - IngestMetric metric = processorWithMetric.v2(); - if (processor instanceof CompoundProcessor) { - getProcessorMetrics((CompoundProcessor) processor, processorMetrics); - } else { - //Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true. - if (processor instanceof ConditionalProcessor) { - metric = ((ConditionalProcessor) processor).getMetric(); - } - processorMetrics.add(new Tuple<>(processor, metric)); - } - } - return processorMetrics; - } - private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; @@ -420,42 +371,11 @@ protected void doRun() { } public IngestStats stats() { - IngestStats.Builder statsBuilder = new IngestStats.Builder(); - statsBuilder.addTotalMetrics(totalMetrics); - pipelines.forEach((id, pipeline) -> { - CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); - statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); - List> processorMetrics = new ArrayList<>(); - getProcessorMetrics(rootProcessor, processorMetrics); - processorMetrics.forEach(t -> { - Processor processor = t.v1(); - IngestMetric processorMetric = t.v2(); - statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); - }); - }); - return statsBuilder.build(); - } - //package private for testing - static String getProcessorName(Processor processor){ - // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name - if(processor instanceof ConditionalProcessor){ - processor = ((ConditionalProcessor) processor).getProcessor(); - } - StringBuilder sb = new StringBuilder(5); - sb.append(processor.getType()); + Map statsPerPipeline = + pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats())); - if(processor instanceof PipelineProcessor){ - String pipelineName = ((PipelineProcessor) processor).getPipelineName(); - sb.append(":"); - sb.append(pipelineName); - } - String tag = processor.getTag(); - if(tag != null && !tag.isEmpty()){ - sb.append(":"); - sb.append(tag); - } - return sb.toString(); + return new IngestStats(totalMetrics.createStats(), statsPerPipeline); } private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index e3d671bc8b2a0..c4c1520fd19d4 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -27,28 +27,17 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class IngestStats implements Writeable, ToXContentFragment { private final Stats totalStats; - private final List pipelineStats; - private final Map> processorStats; + private final Map statsPerPipeline; - /** - * @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats, - * and pipeline stats are logically the sum of the processor stats. - * @param pipelineStats - The stats for a given ingest pipeline. - * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier. - */ - public IngestStats(Stats totalStats, List pipelineStats, Map> processorStats) { + public IngestStats(Stats totalStats, Map statsPerPipeline) { this.totalStats = totalStats; - this.pipelineStats = pipelineStats; - this.processorStats = processorStats; + this.statsPerPipeline = statsPerPipeline; } /** @@ -57,43 +46,37 @@ public IngestStats(Stats totalStats, List pipelineStats, Map(size); - this.processorStats = new HashMap<>(size); + this.statsPerPipeline = new HashMap<>(size); for (int i = 0; i < size; i++) { - String pipelineId = in.readString(); - Stats pipelineStat = new Stats(in); - this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); - int processorsSize = in.readVInt(); - List processorStatsPerPipeline = new ArrayList<>(processorsSize); - for (int j = 0; j < processorsSize; j++) { - String processorName = in.readString(); - Stats processorStat = new Stats(in); - processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); - } - this.processorStats.put(pipelineId, processorStatsPerPipeline); + statsPerPipeline.put(in.readString(), new Stats(in)); } } @Override public void writeTo(StreamOutput out) throws IOException { totalStats.writeTo(out); - out.writeVInt(pipelineStats.size()); - for (PipelineStat pipelineStat : pipelineStats) { - out.writeString(pipelineStat.getPipelineId()); - pipelineStat.getStats().writeTo(out); - List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); - if(processorStatsForPipeline == null) { - out.writeVInt(0); - }else{ - out.writeVInt(processorStatsForPipeline.size()); - for (ProcessorStat processorStat : processorStatsForPipeline) { - out.writeString(processorStat.getName()); - processorStat.getStats().writeTo(out); - } - } + out.writeVInt(statsPerPipeline.size()); + for (Map.Entry entry : statsPerPipeline.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out); } } + + /** + * @return The accumulated stats for all pipelines + */ + public Stats getTotalStats() { + return totalStats; + } + + /** + * @return The stats on a per pipeline basis + */ + public Map getStatsPerPipeline() { + return statsPerPipeline; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("ingest"); @@ -101,21 +84,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws totalStats.toXContent(builder, params); builder.endObject(); builder.startObject("pipelines"); - for (PipelineStat pipelineStat : pipelineStats) { - builder.startObject(pipelineStat.getPipelineId()); - pipelineStat.getStats().toXContent(builder, params); - List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); - builder.startArray("processors"); - if (processorStatsForPipeline != null) { - for (ProcessorStat processorStat : processorStatsForPipeline) { - builder.startObject(); - builder.startObject(processorStat.getName()); - processorStat.getStats().toXContent(builder, params); - builder.endObject(); - builder.endObject(); - } - } - builder.endArray(); + for (Map.Entry entry : statsPerPipeline.entrySet()) { + builder.startObject(entry.getKey()); + entry.getValue().toXContent(builder, params); builder.endObject(); } builder.endObject(); @@ -123,18 +94,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public Stats getTotalStats() { - return totalStats; - } - - public List getPipelineStats() { - return pipelineStats; - } - - public Map> getProcessorStats() { - return processorStats; - } - public static class Stats implements Writeable, ToXContentFragment { private final long ingestCount; @@ -175,6 +134,7 @@ public long getIngestCount() { } /** + * * @return The total time spent of ingest preprocessing in millis. */ public long getIngestTimeInMillis() { @@ -204,77 +164,4 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } } - - /** - * Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects - */ - static class Builder { - private Stats totalStats; - private List pipelineStats = new ArrayList<>(); - private Map> processorStats = new HashMap<>(); - - - Builder addTotalMetrics(IngestMetric totalMetric) { - this.totalStats = totalMetric.createStats(); - return this; - } - - Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { - this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats())); - return this; - } - - Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { - this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) - .add(new ProcessorStat(processorName, metric.createStats())); - return this; - } - - IngestStats build() { - return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats), - Collections.unmodifiableMap(processorStats)); - } - } - - /** - * Container for pipeline stats. - */ - public static class PipelineStat { - private final String pipelineId; - private final Stats stats; - - public PipelineStat(String pipelineId, Stats stats) { - this.pipelineId = pipelineId; - this.stats = stats; - } - - public String getPipelineId() { - return pipelineId; - } - - public Stats getStats() { - return stats; - } - } - - /** - * Container for processor stats. - */ - public static class ProcessorStat { - private final String name; - private final Stats stats; - - public ProcessorStat(String name, Stats stats) { - this.name = name; - this.stats = stats; - } - - public String getName() { - return name; - } - - public Stats getStats() { - return stats; - } - } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index fc5311be5cbde..8d5f6d6ff7c54 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -22,12 +22,11 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; +import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import org.elasticsearch.script.ScriptService; @@ -48,21 +47,20 @@ public final class Pipeline { private final Integer version; private final CompoundProcessor compoundProcessor; private final IngestMetric metrics; - private final LongSupplier relativeTimeProvider; + private final Clock clock; public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { - this(id, description, version, compoundProcessor, System::nanoTime); + this(id, description, version, compoundProcessor, Clock.systemUTC()); } //package private for testing - Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, - LongSupplier relativeTimeProvider) { + Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) { this.id = id; this.description = description; this.compoundProcessor = compoundProcessor; this.version = version; this.metrics = new IngestMetric(); - this.relativeTimeProvider = relativeTimeProvider; + this.clock = clock; } public static Pipeline create(String id, Map config, @@ -91,7 +89,7 @@ public static Pipeline create(String id, Map config, * Modifies the data of a document to be indexed based on the processor this pipeline holds */ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - long startTimeInNanos = relativeTimeProvider.getAsLong(); + long startTimeInMillis = clock.millis(); try { metrics.preIngest(); return compoundProcessor.execute(ingestDocument); @@ -99,7 +97,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { metrics.ingestFailed(); throw e; } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + long ingestTimeInMillis = clock.millis() - startTimeInMillis; metrics.postIngest(ingestTimeInMillis); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 16324e8dee6c7..918ff6b8aefee 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -53,10 +53,6 @@ public String getType() { return TYPE; } - String getPipelineName() { - return pipelineName; - } - public static final class Factory implements Processor.Factory { private final IngestService ingestService; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 8f51fb08dd23f..3384efcf836c6 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -53,6 +53,7 @@ import static java.util.Collections.emptySet; public class NodeStatsTests extends ESTestCase { + public void testSerialization() throws IOException { NodeStats nodeStats = createNodeStats(); try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -270,29 +271,14 @@ public void testSerialization() throws IOException { assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent()); assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount()); assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis()); - assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size()); - for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) { - String pipelineId = pipelineStat.getPipelineId(); - IngestStats.Stats deserializedPipelineStats = - getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId); - assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount()); - assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); - assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); - assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount()); - List processorStats = ingestStats.getProcessorStats().get(pipelineId); - //intentionally validating identical order - Iterator it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator(); - for (IngestStats.ProcessorStat processorStat : processorStats) { - IngestStats.ProcessorStat deserializedProcessorStat = it.next(); - assertEquals(processorStat.getStats().getIngestFailedCount(), - deserializedProcessorStat.getStats().getIngestFailedCount()); - assertEquals(processorStat.getStats().getIngestTimeInMillis(), - deserializedProcessorStat.getStats().getIngestTimeInMillis()); - assertEquals(processorStat.getStats().getIngestCurrent(), - deserializedProcessorStat.getStats().getIngestCurrent()); - assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount()); - } - assertFalse(it.hasNext()); + assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size()); + for (Map.Entry entry : ingestStats.getStatsPerPipeline().entrySet()) { + IngestStats.Stats stats = entry.getValue(); + IngestStats.Stats deserializedStats = deserializedIngestStats.getStatsPerPipeline().get(entry.getKey()); + assertEquals(stats.getIngestFailedCount(), deserializedStats.getIngestFailedCount()); + assertEquals(stats.getIngestTimeInMillis(), deserializedStats.getIngestTimeInMillis()); + assertEquals(stats.getIngestCurrent(), deserializedStats.getIngestCurrent()); + assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount()); } } AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats(); @@ -443,24 +429,14 @@ private static NodeStats createNodeStats() { if (frequently()) { IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - int numPipelines = randomIntBetween(0, 10); - int numProcessors = randomIntBetween(0, 10); - List ingestPipelineStats = new ArrayList<>(numPipelines); - Map> ingestProcessorStats = new HashMap<>(numPipelines); - for (int i = 0; i < numPipelines; i++) { - String pipelineId = randomAlphaOfLengthBetween(3, 10); - ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats - (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()))); - List processorPerPipeline = new ArrayList<>(numProcessors); - for (int j =0; j < numProcessors;j++) { - IngestStats.Stats processorStats = new IngestStats.Stats - (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); - } - ingestProcessorStats.put(pipelineId,processorPerPipeline); + int numStatsPerPipeline = randomIntBetween(0, 10); + Map statsPerPipeline = new HashMap<>(); + for (int i = 0; i < numStatsPerPipeline; i++) { + statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new IngestStats.Stats(randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); } - ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats); + ingestStats = new IngestStats(totalStats, statsPerPipeline); } AdaptiveSelectionStats adaptiveSelectionStats = null; if (frequently()) { @@ -489,8 +465,4 @@ private static NodeStats createNodeStats() { fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, ingestStats, adaptiveSelectionStats); } - - private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { - return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); - } } diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index dabcae533a0bf..aaede49a36d57 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -27,17 +27,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class CompoundProcessorTests extends ESTestCase { private IngestDocument ingestDocument; @@ -55,29 +49,18 @@ public void testEmpty() throws Exception { } public void testSingleProcessor() throws Exception { - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); - TestProcessor processor = new TestProcessor(ingestDocument ->{ - assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0); - }); - CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); - ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly hack to assert current count = 1 + TestProcessor processor = new TestProcessor(ingestDocument -> {}); + CompoundProcessor compoundProcessor = new CompoundProcessor(processor); assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); - assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); compoundProcessor.execute(ingestDocument); - verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor.getInvokedCounter(), equalTo(1)); - assertStats(compoundProcessor, 1, 0, 1); - } public void testSingleProcessorWithException() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); + CompoundProcessor compoundProcessor = new CompoundProcessor(processor); assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); @@ -88,22 +71,15 @@ public void testSingleProcessorWithException() throws Exception { assertThat(e.getRootCause().getMessage(), equalTo("error")); } assertThat(processor.getInvokedCounter(), equalTo(1)); - assertStats(compoundProcessor, 1, 1, 0); - } public void testIgnoreFailure() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");}); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor compoundProcessor = - new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList()); compoundProcessor.execute(ingestDocument); assertThat(processor1.getInvokedCounter(), equalTo(1)); - assertStats(0, compoundProcessor, 0, 1, 1, 0); assertThat(processor2.getInvokedCounter(), equalTo(1)); - assertStats(1, compoundProcessor, 0, 1, 0, 0); assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value")); } @@ -117,15 +93,11 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); }); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), - Collections.singletonList(processor2), relativeTimeProvider); + Collections.singletonList(processor2)); compoundProcessor.execute(ingestDocument); - verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor1.getInvokedCounter(), equalTo(1)); - assertStats(compoundProcessor, 1, 1, 1); assertThat(processor2.getInvokedCounter(), equalTo(1)); } @@ -146,17 +118,14 @@ public void testSingleProcessorWithNestedFailures() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); }); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail), - Collections.singletonList(lastProcessor), relativeTimeProvider); + Collections.singletonList(lastProcessor)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); + Collections.singletonList(compoundOnFailProcessor)); compoundProcessor.execute(ingestDocument); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); - assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception { @@ -168,24 +137,21 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1")); }); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor); + CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor), relativeTimeProvider); + Collections.singletonList(secondProcessor)); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); - assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFail() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -194,24 +160,21 @@ public void testCompoundProcessorExceptionFail() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(failProcessor), relativeTimeProvider); + Collections.singletonList(failProcessor)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor), relativeTimeProvider); + Collections.singletonList(secondProcessor)); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); - assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -220,44 +183,27 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor))); + Collections.singletonList(new CompoundProcessor(failProcessor))); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor), relativeTimeProvider); + Collections.singletonList(secondProcessor)); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); - assertStats(compoundProcessor, 1, 1, 0); } public void testBreakOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");}); TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");}); TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {}); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), - Collections.singletonList(onFailureProcessor), relativeTimeProvider); + Collections.singletonList(onFailureProcessor)); pipeline.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); - assertStats(pipeline, 1, 1, 0); - } - - private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) { - assertStats(0, compoundProcessor, 0L, count, failed, time); - } - private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) { - IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats(); - assertThat(stats.getIngestCount(), equalTo(count)); - assertThat(stats.getIngestCurrent(), equalTo(current)); - assertThat(stats.getIngestFailedCount(), equalTo(failed)); - assertThat(stats.getIngestTimeInMillis(), equalTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index c5548ae559400..c7d4dfa4e68cd 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -33,18 +33,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.LongSupplier; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.Is.is; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class ConditionalProcessorTests extends ESTestCase { @@ -66,8 +60,6 @@ public void testChecksCondition() throws Exception { new HashMap<>(ScriptModule.CORE_CONTEXTS) ); Map document = new HashMap<>(); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2)); ConditionalProcessor processor = new ConditionalProcessor( randomAlphaOfLength(10), new Script( @@ -75,10 +67,7 @@ public void testChecksCondition() throws Exception { scriptName, Collections.emptyMap()), scriptService, new Processor() { @Override - public IngestDocument execute(final IngestDocument ingestDocument){ - if(ingestDocument.hasField("error")){ - throw new RuntimeException("error"); - } + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { ingestDocument.setFieldValue("foo", "bar"); return ingestDocument; } @@ -92,37 +81,20 @@ public String getType() { public String getTag() { return null; } - }, relativeTimeProvider); + }); - //false, never call processor never increments metrics - String falseValue = "falsy"; IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); - ingestDocument.setFieldValue(conditionalField, falseValue); - processor.execute(ingestDocument); - assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); - assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); - assertStats(processor, 0, 0, 0); - - ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); - ingestDocument.setFieldValue(conditionalField, falseValue); - ingestDocument.setFieldValue("error", true); - processor.execute(ingestDocument); - assertStats(processor, 0, 0, 0); - - //true, always call processor and increments metrics - ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); - assertStats(processor, 1, 0, 1); + String falseValue = "falsy"; ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); - ingestDocument.setFieldValue(conditionalField, trueValue); - ingestDocument.setFieldValue("error", true); - IngestDocument finalIngestDocument = ingestDocument; - expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); - assertStats(processor, 2, 1, 2); + ingestDocument.setFieldValue(conditionalField, falseValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); + assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); } @SuppressWarnings("unchecked") @@ -169,14 +141,5 @@ private static void assertMutatingCtxThrows(Consumer> mutati Exception e = expectedException.get(); assertThat(e, instanceOf(UnsupportedOperationException.class)); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); - assertStats(processor, 0, 0, 0); - } - - private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) { - IngestStats.Stats stats = conditionalProcessor.getMetric().createStats(); - assertThat(stats.getIngestCount(), equalTo(count)); - assertThat(stats.getIngestCurrent(), equalTo(0L)); - assertThat(stats.getIngestFailedCount(), equalTo(failed)); - assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3dde7babb0a96..4de39349dc517 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -63,7 +63,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -747,23 +746,16 @@ public void testBulkRequestExecution() { verify(completionHandler, times(1)).accept(null); } - public void testStats() throws Exception { + public void testStats() { final Processor processor = mock(Processor.class); - final Processor processorFailure = mock(Processor.class); - when(processor.getType()).thenReturn("mock"); - when(processor.getTag()).thenReturn("mockTag"); - when(processorFailure.getType()).thenReturn("failure-mock"); - //avoid returning null and dropping the document - when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); - when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); - Map map = new HashMap<>(2); - map.put("mock", (factories, tag, config) -> processor); - map.put("failure-mock", (factories, tag, config) -> processorFailure); - IngestService ingestService = createWithProcessors(map); - + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); final IngestStats initialStats = ingestService.stats(); - assertThat(initialStats.getPipelineStats().size(), equalTo(0)); - assertStats(initialStats.getTotalStats(), 0, 0, 0); + assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); + assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); PutPipelineRequest putRequest = new PutPipelineRequest("_id1", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); @@ -777,6 +769,7 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); @@ -785,33 +778,18 @@ public void testStats() throws Exception { indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); - assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); - - afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); - afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); - - //total - assertStats(afterFirstRequestStats.getTotalStats(), 1, 0 ,0); - //pipeline - assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0); - assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0); - //processor - assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); - + assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); + assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); indexRequest.setPipeline("_id2"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); - assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); - //total - assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0); - //pipeline - assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0); - assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0); - //processor - assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0); + assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); //update cluster state and ensure that new stats are added to old stats putRequest = new PutPipelineRequest("_id1", @@ -822,66 +800,13 @@ public void testStats() throws Exception { indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); - assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); - //total - assertStats(afterThirdRequestStats.getTotalStats(), 3, 0 ,0); - //pipeline - assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0); - assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0); - //The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is - //due to the parallel array's used to identify which metrics to carry forward. With out unique ids or semantic equals for each - //processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases, - //like this one it may not readily obvious why the metrics were not carried forward. - assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0); - - //test a failure, and that the processor stats are added from the old stats - putRequest = new PutPipelineRequest("_id1", - new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"), - XContentType.JSON); - previousClusterState = clusterState; - clusterState = IngestService.innerPut(putRequest, clusterState); - ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - final IngestStats afterForthRequestStats = ingestService.stats(); - assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); - //total - assertStats(afterForthRequestStats.getTotalStats(), 4, 0 ,0); - //pipeline - assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0); - assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0); - //processor - assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed - assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats - assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0); - } + assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L)); + assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); + assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L)); - public void testStatName(){ - Processor processor = mock(Processor.class); - String name = randomAlphaOfLength(10); - when(processor.getType()).thenReturn(name); - assertThat(IngestService.getProcessorName(processor), equalTo(name)); - String tag = randomAlphaOfLength(10); - when(processor.getTag()).thenReturn(tag); - assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag)); - - ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class); - when(conditionalProcessor.getProcessor()).thenReturn(processor); - assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag)); - - PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); - String pipelineName = randomAlphaOfLength(10); - when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName); - name = PipelineProcessor.TYPE; - when(pipelineProcessor.getType()).thenReturn(name); - assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName)); - when(pipelineProcessor.getTag()).thenReturn(tag); - assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag)); } - public void testExecuteWithDrop() { Map factories = new HashMap<>(); factories.put("drop", new DropProcessor.Factory()); @@ -1010,23 +935,4 @@ public boolean matches(Object o) { return false; } } - - private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) { - assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time); - } - - private void assertPipelineStats(List pipelineStats, String pipelineId, long count, long failed, long time) { - assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time); - } - - private void assertStats(IngestStats.Stats stats, long count, long failed, long time) { - assertThat(stats.getIngestCount(), equalTo(count)); - assertThat(stats.getIngestCurrent(), equalTo(0L)); - assertThat(stats.getIngestFailedCount(), equalTo(failed)); - assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); - } - - private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { - return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); - } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 3d39faf9a7447..9974dd568a8c7 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -19,75 +19,44 @@ package org.elasticsearch.ingest; -import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - public class IngestStatsTests extends ESTestCase { public void testSerialization() throws IOException { - //total - IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); - //pipeline - IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); - IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); - IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); - List pipelineStats = - Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); - //processor - IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); - IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); - IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); - //pipeline1 -> processor1,processor2; pipeline2 -> processor3 - Map> processorStats = MapBuilder.>newMapBuilder() - .put(pipeline1Stats.getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) - .put(pipeline2Stats.getPipelineId(), Collections.singletonList(processor3Stat)) - .map(); - - IngestStats ingestStats = new IngestStats(totalStats,pipelineStats, processorStats); - - IngestStats serializedStats = serialize(ingestStats); - assertNotSame(ingestStats, serializedStats); - assertNotSame(totalStats, serializedStats.getTotalStats()); - assertNotSame(pipelineStats, serializedStats.getPipelineStats()); - assertNotSame(processorStats, serializedStats.getProcessorStats()); + IngestStats.Stats total = new IngestStats.Stats(5, 10, 20, 30); + IngestStats.Stats foo = new IngestStats.Stats(50, 100, 200, 300); + IngestStats ingestStats = new IngestStats(total, Collections.singletonMap("foo", foo)); + IngestStats serialize = serialize(ingestStats); + assertNotSame(serialize, ingestStats); + assertNotSame(serialize.getTotalStats(), total); + assertEquals(total.getIngestCount(), serialize.getTotalStats().getIngestCount()); + assertEquals(total.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount()); + assertEquals(total.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis()); + assertEquals(total.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent()); - assertStats(totalStats, serializedStats.getTotalStats()); - assertEquals(serializedStats.getPipelineStats().size(), 3); + assertEquals(ingestStats.getStatsPerPipeline().size(), 1); + assertTrue(ingestStats.getStatsPerPipeline().containsKey("foo")); - for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) { - assertStats(getPipelineStats(pipelineStats, serializedPipelineStat.getPipelineId()), serializedPipelineStat.getStats()); - List serializedProcessorStats = - serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); - List processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); - if(processorStat != null) { - Iterator it = processorStat.iterator(); - //intentionally enforcing the identical ordering - for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { - IngestStats.ProcessorStat ps = it.next(); - assertEquals(ps.getName(), serializedProcessorStat.getName()); - assertStats(ps.getStats(), serializedProcessorStat.getStats()); - } - assertFalse(it.hasNext()); - } - } - } + Map left = ingestStats.getStatsPerPipeline(); + Map right = serialize.getStatsPerPipeline(); - private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) { - assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount()); - assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount()); - assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis()); - assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent()); + assertEquals(right.size(), 1); + assertTrue(right.containsKey("foo")); + assertEquals(left.size(), 1); + assertTrue(left.containsKey("foo")); + IngestStats.Stats leftStats = left.get("foo"); + IngestStats.Stats rightStats = right.get("foo"); + assertEquals(leftStats.getIngestCount(), rightStats.getIngestCount()); + assertEquals(leftStats.getIngestFailedCount(), rightStats.getIngestFailedCount()); + assertEquals(leftStats.getIngestTimeInMillis(), rightStats.getIngestTimeInMillis()); + assertEquals(leftStats.getIngestCurrent(), rightStats.getIngestCurrent()); } private IngestStats serialize(IngestStats stats) throws IOException { @@ -96,8 +65,4 @@ private IngestStats serialize(IngestStats stats) throws IOException { StreamInput in = out.bytes().streamInput(); return new IngestStats(in); } - - private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { - return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); - } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index eea0f03fa647f..018ded346d4fc 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -21,13 +21,12 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.test.ESTestCase; +import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; @@ -144,15 +143,15 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { pipeline2ProcessorConfig.put("pipeline", pipeline3Id); PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig); - LongSupplier relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L); + Clock clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(0L); Pipeline pipeline1 = new Pipeline( - pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider + pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock ); String key1 = randomAlphaOfLength(10); - relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3)); + clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(3L); Pipeline pipeline2 = new Pipeline( pipeline2Id, null, null, new CompoundProcessor(true, Arrays.asList( @@ -161,15 +160,15 @@ pipeline2Id, null, null, new CompoundProcessor(true, }), pipeline2Processor), Collections.emptyList()), - relativeTimeProvider + clock ); - relativeTimeProvider = mock(LongSupplier.class); - when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2)); + clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(2L); Pipeline pipeline3 = new Pipeline( pipeline3Id, null, null, new CompoundProcessor( new TestProcessor(ingestDocument -> { throw new RuntimeException("error"); - })), relativeTimeProvider + })), clock ); when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);