Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: support simulate with verbose for pipeline processor #33839

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.grok.ThreadWatchdog;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,150 @@ teardown:
- length: { docs.0.processor_results.1: 2 }
- match: { docs.0.processor_results.1.tag: "rename-1" }
- match: { docs.0.processor_results.1.doc._source.new_status: 200 }

---
"Test verbose simulate with Pipeline Processor with Circular Pipelines":
- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"pipeline": "inner"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "inner"
body: >
{
"description" : "inner pipeline",
"processors" : [
{
"pipeline" : {
"pipeline": "outer"
}
}
]
}
- match: { acknowledged: true }

- do:
catch: /illegal_state_exception/
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"processors" : [
{
"pipeline" : {
"pipeline": "outer"
}
}
]
}
,
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"field1": "123.42 400 <foo>"
}
}
]
}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Recursive invocation of pipeline [inner] detected." }

---
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
- do:
ingest.put_pipeline:
id: "pipeline1"
body: >
{
"processors": [
{
"set": {
"field": "pipeline1",
"value": true
}
},
{
"pipeline": {
"pipeline": "pipeline2"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "pipeline2"
body: >
{
"processors": [
{
"set": {
"field": "pipeline2",
"value": true
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"processors": [
{
"set": {
"field": "pipeline0",
"value": true
}
},
{
"pipeline": {
"pipeline": "pipeline1"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"field1": "123.42 400 <foo>"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 3 }
- match: { docs.0.processor_results.0.doc._source.pipeline0: true }
- is_false: docs.0.processor_results.0.doc._source.pipeline1
- is_false: docs.0.processor_results.0.doc._source.pipeline2
- match: { docs.0.processor_results.1.doc._source.pipeline0: true }
- match: { docs.0.processor_results.1.doc._source.pipeline1: true }
- is_false: docs.0.processor_results.1.doc._source.pipeline2
- match: { docs.0.processor_results.2.doc._source.pipeline0: true }
- match: { docs.0.processor_results.2.doc._source.pipeline1: true }
- match: { docs.0.processor_results.2.doc._source.pipeline2: true }

Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;

class SimulateExecutionService {

Expand All @@ -42,11 +46,15 @@ class SimulateExecutionService {
}

SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
// Prevent cycles in pipeline decoration
final Set<PipelineProcessor> pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen);
try {
verbosePipelineProcessor.execute(ingestDocument);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline);
return new SimulateDocumentVerboseResult(processorResultList);
} catch (Exception e) {
return new SimulateDocumentVerboseResult(processorResultList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,9 @@
* under the License.
*/

package org.elasticsearch.ingest.common;
package org.elasticsearch.ingest;

import java.util.Map;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;

public class PipelineProcessor extends AbstractProcessor {

Expand All @@ -50,6 +44,10 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument.executePipeline(pipeline);
}

Pipeline getPipeline(){
return ingestService.getPipeline(pipelineName);
}

@Override
public String getType() {
return TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
* under the License.
*/

package org.elasticsearch.action.ingest;
package org.elasticsearch.ingest;

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.action.ingest.SimulateProcessorResult;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
Expand All @@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor {
private final List<SimulateProcessorResult> processorResultList;
private final boolean ignoreFailure;

public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
this.ignoreFailure = ignoreFailure;
this.processorResultList = processorResultList;
this.actualProcessor = actualProcessor;
Expand Down Expand Up @@ -67,19 +66,35 @@ public String getTag() {
return actualProcessor.getTag();
}

public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList,
Set<PipelineProcessor> pipelinesSeen) {
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getProcessors()) {
if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList));
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Recursive invocation of pipeline ["
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not correct because it would also throw for repeated invocation of the same pipeline right? (Since PipelineProcessor doesn't implement equals you will probably only see this in some crazier scenarios (A calls B calls C and then another step of just B which would call C again making it throw on seeing C twice even though it's not a recursive invocation)) See #33419 for more here. I think the easiest fix to get out of that problem is to do the same thing that the pipeline processor does and simply track the current stack of pipelines in pipelinesSeen instead of all the pipelines ever seen (in this case this means just removing a pipeline proc from the pipelinesSeen after it has been unwrapped)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. fixed and updated in latest commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks looks good :)

+ pipelineProcessor.getPipeline().getId() + "] detected.");
}
processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen));
} else if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
} else {
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
}
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Recursive invocation of pipeline ["
+ pipelineProcessor.getPipeline().getId() + "] detected.");
}
onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList,
pipelinesSeen));
} else if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
} else {
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.common;
package org.elasticsearch.ingest;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -27,6 +27,7 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
Expand Down
Loading