From dab927149367816a18d7af09f88dcab9558e0874 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Fri, 28 Sep 2018 12:33:27 -0500 Subject: [PATCH 1/5] ingest: better support for conditionals with simulate?verbose This commit introduces two corrections to the way simulate?verbose handles conditionals on processors. 1) Prior to this change when executing simulate?verbose for processors with conditionals that evaluate to false, that processor would still be displayed in the result set. What was displayed was correct, such that no changes to the document occurred. However, if the conditional evaluates to false, the processor should not even be displayed. 2) Prior to this change when executing simulate?verbose for pipeline processors with conditionals, the individual steps would no longer be displayed. Commit e37e5df addressed the issue, but failed account for a conditional on the pipeline processor. Since a pipeline processor can introduce cycles and is effectively a single processor that encapsulates multiple other processors that are potentially guarded by a single conditional, special handling is needed to for pipeline and conditional pipeline processors. --- .../test/ingest/210_pipeline_processor.yml | 4 +- .../rest-api-spec/test/ingest/90_simulate.yml | 7 +- .../ingest/SimulateExecutionService.java | 10 +- .../ingest/ConditionalProcessor.java | 16 +- .../ingest/IngestCycleException.java | 7 + .../elasticsearch/ingest/IngestDocument.java | 3 +- .../ingest/TrackingResultProcessor.java | 68 +++-- .../ingest/TrackingResultProcessorTests.java | 250 +++++++++++++++--- 8 files changed, 292 insertions(+), 73 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index c7c5df1e06f99..f55d698e632b3 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -102,7 +102,7 @@ teardown: - match: { acknowledged: true } - do: - catch: /illegal_state_exception/ + catch: /ingest_cycle_exception/ index: index: test type: test @@ -110,4 +110,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: org.elasticsearch.ingest.IngestCycleException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 46c4fb0a69e58..5fbebf6a21f23 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -641,7 +641,6 @@ teardown: - match: { acknowledged: true } - do: - catch: /illegal_state_exception/ ingest.simulate: verbose: true body: > @@ -667,8 +666,10 @@ teardown: } ] } -- match: { error.root_cause.0.type: "illegal_state_exception" } -- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } +- length: { docs: 1 } +- length: { docs.0.processor_results: 1 } +- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: org.elasticsearch.ingest.IngestCycleException: Cycle detected for pipeline: outer" } +- match: { docs.0.processor_results.0.error.caused_by.caused_by.reason: "Cycle detected for pipeline: outer" } --- "Test verbose simulate with Pipeline Processor with Multiple Pipelines": diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index c081707f4dbda..e2b44ae2a7a60 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -21,17 +21,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; -import java.util.Collections; -import java.util.IdentityHashMap; import java.util.List; -import java.util.Set; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; @@ -46,11 +42,9 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { - // Prevent cycles in pipeline decoration - final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); try { Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index b6f6612344a39..061a5b3f6b44d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -50,16 +50,26 @@ public class ConditionalProcessor extends AbstractProcessor { this.processor = processor; } + @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - IngestConditionalScript script = - scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); - if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { + if (evaluate(ingestDocument)) { return processor.execute(ingestDocument); } return ingestDocument; } + boolean evaluate(IngestDocument ingestDocument) { + IngestConditionalScript script = + scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); + return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata())); + } + + //tODO remove + public Processor getProcessor() { + return processor; + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java b/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java new file mode 100644 index 0000000000000..b603ae5499b5f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java @@ -0,0 +1,7 @@ +package org.elasticsearch.ingest; + +class IngestCycleException extends RuntimeException { + IngestCycleException(String message) { + super(message); + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 719558edbf748..054d8cf465f30 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -643,11 +643,12 @@ private static Object deepCopy(Object value) { * for this document. * @param pipeline Pipeline to execute * @throws Exception On exception in pipeline execution + * @throws IngestCycleException If an cycle (infinite loops) are found between pipelines */ public IngestDocument executePipeline(Pipeline pipeline) throws Exception { try { if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); + throw new IngestCycleException("Cycle detected for pipeline: " + pipeline.getId()); } return pipeline.execute(this); } finally { diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 41a984be5adad..4f97d110448c6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -19,11 +19,11 @@ package org.elasticsearch.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; -import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -42,14 +42,46 @@ public final class TrackingResultProcessor implements Processor { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + Processor processor = actualProcessor; try { - actualProcessor.execute(ingestDocument); - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); + if (processor instanceof ConditionalProcessor) { + ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor; + if (conditionalProcessor.evaluate(ingestDocument) == false) { + return ingestDocument; + } + if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) { + processor = conditionalProcessor.getProcessor(); + } + } + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + Pipeline pipeline = pipelineProcessor.getPipeline(); + //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines + try { + IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); + } catch (ElasticsearchException elasticsearchException) { + if (elasticsearchException.getCause().getCause() instanceof IngestCycleException) { + throw elasticsearchException; + } + //else do nothing, let the tracking processors throw the exception while recording the path up to the failure + } catch (Exception e) { + // do nothing, let the tracking processors throw the exception while recording the path up to the failure + } + //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); + } else { + processor.execute(ingestDocument); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + } } catch (Exception e) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); } else { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); } throw e; } @@ -66,35 +98,19 @@ public String getTag() { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, - Set pipelinesSeen) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); - } - processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); - pipelinesSeen.remove(pipelineProcessor); - } else if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); + if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); - } - onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, - pipelinesSeen)); - pipelinesSeen.remove(pipelineProcessor); - } else if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); + if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 7a7f9b773727f..80a3d3fb32f4d 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -21,17 +21,22 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; @@ -39,10 +44,11 @@ import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -50,13 +56,11 @@ public class TrackingResultProcessorTests extends ESTestCase { private IngestDocument ingestDocument; private List resultList; - private Set pipelinesSeen; @Before public void init() { ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); resultList = new ArrayList<>(); - pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); } public void testActualProcessor() throws Exception { @@ -76,9 +80,9 @@ public void testActualProcessor() throws Exception { public void testActualCompoundProcessorWithoutOnFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); try { trackingProcessor.execute(ingestDocument); @@ -97,14 +101,14 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { public void testActualCompoundProcessorWithOnFailure() throws Exception { RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); CompoundProcessor actualProcessor = new CompoundProcessor(false, Arrays.asList(new CompoundProcessor(false, Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); @@ -139,10 +143,10 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); @@ -154,6 +158,45 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); } + public void testActualCompoundProcessorWithFalseConditional() throws Exception { + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + CompoundProcessor compoundProcessor = new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); + + CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); + trackingProcessor.execute(ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); + + //the step for key 2 is never executed due to conditional and thus not part of the result set + assertThat(resultList.size(), equalTo(2)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + } + public void testActualPipelineProcessor() throws Exception { String pipelineId = "pipeline1"; IngestService ingestService = mock(IngestService.class); @@ -176,13 +219,13 @@ pipelineId, null, null, new CompoundProcessor( PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(3)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -198,6 +241,142 @@ pipelineId, null, null, new CompoundProcessor( assertThat(resultList.get(2).getProcessorTag(), nullValue()); } + public void testActualPipelineProcessorWithTrueConditional() throws Exception { + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("pipeline", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("pipeline", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("pipeline", pipelineId2); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + factory.create(Collections.emptyMap(), null, pipelineConfig2)), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) + ) + ); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); }))); + + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + + trackingProcessor.execute(ingestDocument); + + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithFalseConditional() throws Exception { + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("pipeline", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("pipeline", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("pipeline", pipelineId2); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + factory.create(Collections.emptyMap(), null, pipelineConfig2)), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) + ) + ); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); }))); + + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + + trackingProcessor.execute(ingestDocument); + + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); + verify(ingestService, Mockito.never()).getPipeline(pipelineId2); + assertThat(resultList.size(), equalTo(2)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + } + public void testActualPipelineProcessorWithHandledFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); @@ -226,13 +405,13 @@ pipelineId, null, null, new CompoundProcessor( PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(4)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -253,25 +432,36 @@ pipelineId, null, null, new CompoundProcessor( } public void testActualPipelineProcessorWithCycle() throws Exception { - String pipelineId = "pipeline1"; + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; IngestService ingestService = mock(IngestService.class); - Map pipelineConfig = new HashMap<>(); - pipelineConfig.put("pipeline", pipelineId); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("pipeline", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("pipeline", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("pipeline", pipelineId2); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); - PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); - Pipeline pipeline = new Pipeline( - pipelineId, null, null, new CompoundProcessor(pipelineProcessor) - ); - when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig2))); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig1))); + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> decorate(actualProcessor, resultList, pipelinesSeen)); - assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); - } + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); + assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(exception.getCause().getCause(), instanceOf(IngestCycleException.class)); + assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); + } public void testActualPipelineProcessorRepeatedInvocation() throws Exception { String pipelineId = "pipeline1"; @@ -284,19 +474,19 @@ public void testActualPipelineProcessorRepeatedInvocation() throws Exception { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); Pipeline pipeline = new Pipeline( pipelineId, null, null, new CompoundProcessor( - new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) ); when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService, times(2)).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(2)); assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); From c39857bb75cce8efb4b70b266e706591cc95a188 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Fri, 28 Sep 2018 13:34:18 -0500 Subject: [PATCH 2/5] add missing license text --- .../ingest/IngestCycleException.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java b/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java index b603ae5499b5f..187b68a7a1a03 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java @@ -1,5 +1,27 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.ingest; +/** + * Exception thrown if there are cycles found in the execution of a pipeline + */ class IngestCycleException extends RuntimeException { IngestCycleException(String message) { super(message); From 5e14aaa0453252e615fe186bdfcd8b99d18b5436 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Fri, 28 Sep 2018 13:52:12 -0500 Subject: [PATCH 3/5] fix todo comment --- .../java/org/elasticsearch/ingest/ConditionalProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 061a5b3f6b44d..1ec3977f2466a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -65,8 +65,7 @@ boolean evaluate(IngestDocument ingestDocument) { return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata())); } - //tODO remove - public Processor getProcessor() { + Processor getProcessor() { return processor; } From 6150ab0bedff03928ace7358bc1c0e86bdac0992 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Sat, 20 Oct 2018 17:35:12 -0500 Subject: [PATCH 4/5] removed custom exception --- .../test/ingest/210_pipeline_processor.yml | 4 +-- .../rest-api-spec/test/ingest/90_simulate.yml | 2 +- .../ingest/IngestCycleException.java | 29 ------------------- .../elasticsearch/ingest/IngestDocument.java | 3 +- .../ingest/TrackingResultProcessor.java | 2 +- .../ingest/TrackingResultProcessorTests.java | 2 +- 6 files changed, 6 insertions(+), 36 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index f55d698e632b3..c7c5df1e06f99 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -102,7 +102,7 @@ teardown: - match: { acknowledged: true } - do: - catch: /ingest_cycle_exception/ + catch: /illegal_state_exception/ index: index: test type: test @@ -110,4 +110,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: org.elasticsearch.ingest.IngestCycleException: Cycle detected for pipeline: inner" } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 5fbebf6a21f23..4e0e46f0f2b24 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -668,7 +668,7 @@ teardown: } - length: { docs: 1 } - length: { docs.0.processor_results: 1 } -- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: org.elasticsearch.ingest.IngestCycleException: Cycle detected for pipeline: outer" } +- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: outer" } - match: { docs.0.processor_results.0.error.caused_by.caused_by.reason: "Cycle detected for pipeline: outer" } --- diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java b/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java deleted file mode 100644 index 187b68a7a1a03..0000000000000 --- a/server/src/main/java/org/elasticsearch/ingest/IngestCycleException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -/** - * Exception thrown if there are cycles found in the execution of a pipeline - */ -class IngestCycleException extends RuntimeException { - IngestCycleException(String message) { - super(message); - } -} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 054d8cf465f30..719558edbf748 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -643,12 +643,11 @@ private static Object deepCopy(Object value) { * for this document. * @param pipeline Pipeline to execute * @throws Exception On exception in pipeline execution - * @throws IngestCycleException If an cycle (infinite loops) are found between pipelines */ public IngestDocument executePipeline(Pipeline pipeline) throws Exception { try { if (this.executedPipelines.add(pipeline) == false) { - throw new IngestCycleException("Cycle detected for pipeline: " + pipeline.getId()); + throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); } return pipeline.execute(this); } finally { diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 4f97d110448c6..4b78715144649 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -61,7 +61,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); } catch (ElasticsearchException elasticsearchException) { - if (elasticsearchException.getCause().getCause() instanceof IngestCycleException) { + if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) { throw elasticsearchException; } //else do nothing, let the tracking processors throw the exception while recording the path up to the failure diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 80a3d3fb32f4d..e3edcd4465a6a 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -459,7 +459,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception { ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); - assertThat(exception.getCause().getCause(), instanceOf(IngestCycleException.class)); + assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); } From a4407bb32142a61adab7c1587342b81302d4112f Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Sat, 20 Oct 2018 17:52:42 -0500 Subject: [PATCH 5/5] fix merge --- .../ingest/ConditionalProcessor.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 0e370ba12377b..2493f291bcddf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -60,20 +60,9 @@ public class ConditionalProcessor extends AbstractProcessor { this.relativeTimeProvider = relativeTimeProvider; } - @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (evaluate(ingestDocument)) { - return processor.execute(ingestDocument); - } - return ingestDocument; - } - - boolean evaluate(IngestDocument ingestDocument) { - IngestConditionalScript script = - scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); - if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - // Only record metric if the script evaluates to true long startTimeInNanos = relativeTimeProvider.getAsLong(); try { metric.preIngest(); @@ -89,6 +78,12 @@ boolean evaluate(IngestDocument ingestDocument) { return ingestDocument; } + boolean evaluate(IngestDocument ingestDocument) { + IngestConditionalScript script = + scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); + return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata())); + } + Processor getProcessor() { return processor; }