Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: better support for conditionals with simulate?verbose #34155

Merged
merged 8 commits into from
Oct 23, 2018

Conversation

jakelandis
Copy link
Contributor

This commit introduces two corrections to the way simulate?verbose
handles conditionals on processors.

  1. Prior to this change when executing simulate?verbose for
    processors with conditionals that evaluate to false, that processor
    would still be displayed in the result set. What was displayed was
    correct, such that no changes to the document occurred. However, if the
    conditional evaluates to false, the processor should not even be
    displayed.

  2. Prior to this change when executing simulate?verbose for
    pipeline processors with conditionals, the individual steps would no
    longer be displayed. Commit e37e5df addressed the issue, but
    failed account for a conditional on the pipeline processor. Since
    a pipeline processor can introduce cycles and is effectively a
    single processor that encapsulates multiple other processors that
    are potentially guarded by a single conditional, special handling is
    needed to for pipeline and conditional pipeline processors.


  1. showing steps when conditional evaluates to false
PUT _ingest/pipeline/mypipeline0
{
  "processors": [
    {
      "set": {
        "if":"true",
        "tag": "set1",
        "field": "set1",
        "value": true
      }
    },
    {
      "set": {
        "if":"false",
        "tag": "set2",
        "field": "set2",
        "value": true
      }
    }
  ]
}
POST _ingest/pipeline/mypipeline0/_simulate?verbose
{
  "docs": [
    {
      "_source": {}
    }
  ]
}

Results before this change:

{
  "docs": [
    {
      "processor_results": [
        {
          "tag": "set1",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:31:19.365249Z"
            }
          }
        },
        {
          "tag": "set2",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:31:19.365249Z"
            }
          }
        }
      ]
    }
  ]
}

^^ set2 is displayed in the verbose results even though the conditional evaluated to false.

Results after this change:

{
  "docs": [
    {
      "processor_results": [
        {
          "tag": "set1",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:30:21.328835Z"
            }
          }
        }
      ]
    }
  ]
}

  1. showing all steps for conditional pipelines

The main change here is that due the conditional around a pipeline, we do not know until runtime if a pipeline can execute. This also means that due to the conditional we can NOT detect cycles until runtime. So the code has been changed to remove the static cycle check and build out the pipeline processors during runtime and for each pipeline run it through a non-tracking simulation looking for cycles. If cycles are found propagate the exception to stop the processing and display the error in the correct step, else let the tracking simulation continue.

PUT _ingest/pipeline/mypipeline1
{
  "processors": [
     {
      "set": {
        "tag": "set1",
        "field": "set1",
        "value": true
      }
    },
    {
      "pipeline": {
        "tag": "call-pipeline2",
        "pipeline": "mypipeline2"
      }
    }
  ]
}

PUT _ingest/pipeline/mypipeline2
{
  "processors": [
    {
      "set": {
        "tag": "set2",
        "field": "set2",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "true",
        "tag": "call-pipeline3",
        "pipeline": "mypipeline3"
      }
    }
  ]
}

PUT _ingest/pipeline/mypipeline3
{
  "processors": [
    {
      "set": {
        "tag": "set3",
        "field": "set3",
        "value": true
      }
    },
    {
      "set": {
        "tag": "set4",
        "field": "set4",
        "value": true
      }
    }
  ]
}

POST _ingest/pipeline/mypipeline1/_simulate?verbose
{
  "docs": [
    {
      "_source": {}
    }
  ]
}

Results before this change:

{
  "docs": [
    {
      "processor_results": [
        {
          "tag": "set1",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:26:41.524915Z"
            }
          }
        },
        {
          "tag": "set2",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true,
              "set2": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:26:41.524915Z"
            }
          }
        },
        {
          "tag": "call-pipeline3",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set3": true,
              "set2": true,
              "set4": true,
              "set1": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:26:41.524915Z"
            }
          }
        }
      ]
    }
  ]
}

^^ Note that steps for set3 and set4 are not displayed. This is because the prior fix (e37e5df) did not account for the conditional that wrapped the pipeline processor. e37e5df only works if there is not a conditional on the pipeline (as evident by set2).

After this change:

{
  "docs": [
    {
      "processor_results": [
        {
          "tag": "set1",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:28:56.499409Z"
            }
          }
        },
        {
          "tag": "set2",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true,
              "set2": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:28:56.499409Z"
            }
          }
        },
        {
          "tag": "set3",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set1": true,
              "set3": true,
              "set2": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:28:56.499409Z"
            }
          }
        },
        {
          "tag": "set4",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "set3": true,
              "set2": true,
              "set4": true,
              "set1": true
            },
            "_ingest": {
              "timestamp": "2018-09-28T18:28:56.499409Z"
            }
          }
        }
      ]
    }
  ]
}

This commit introduces two corrections to the way simulate?verbose
handles conditionals on processors.

1) Prior to this change when executing simulate?verbose for
processors with conditionals that evaluate to false, that processor
would still be displayed in the result set. What was displayed was
correct, such that no changes to the document occurred. However, if the
conditional evaluates to false, the processor should not even be
displayed.

2) Prior to this change when executing simulate?verbose for
pipeline processors with conditionals, the individual steps would no
longer be displayed. Commit e37e5df addressed the issue, but
failed account for a conditional on the pipeline processor. Since
a pipeline processor can introduce cycles and is effectively a
single processor that encapsulates multiple other processors that
are potentially guarded by a single conditional, special handling is
needed to for pipeline and conditional pipeline processors.
@jakelandis jakelandis added :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v7.0.0 v6.5.0 labels Sep 28, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-infra

Copy link
Member

@rjernst rjernst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than further complicating compound pipelines, this seems to me like a reason to make conditional first class members of processors, rather than the current implementation as a wrapper processor.

/**
* Exception thrown if there are cycles found in the execution of a pipeline
*/
class IngestCycleException extends RuntimeException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a new exception? We generally have been trying to reduce the number of custom exceptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to make a decision based on the exception type while running a simulation : https://github.com/elastic/elasticsearch/pull/34155/files#diff-5777058d5b7e6c57d4f9e519ecdb0d24R64

Basically while running a simulation, I need to explicitly check for cycles and stop the simulation if a cycle is found. If an exception thrown, and it is not a IngestCycleException I let the simulation continue and fail normally (not in the cycle cycle) to allow the recording of the steps that led to the actual exception to be recorded and displayed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In offline discussions, I will remove this custom exception in favor of the original.

*/
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
try {
if (this.executedPipelines.add(pipeline) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId());
throw new IngestCycleException("Cycle detected for pipeline: " + pipeline.getId());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be an IllegalArgumentException to produce a 400?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will get wrapped in a IllegalArgumentException.

"reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.IngestCycleException: Cycle detected for pipeline: pipeline2"

Since we can only detect cycles at runtime, it is the index request that will throw this exception e.g.

POST test/_doc?pipeline=pipeline1
{}

and results in the 500 HTTP response (not the pipeline put), and thus I think 500 is appropriate since there is nothing wrong that index request. Ideally we would check for cycles for on pipeline put and throw a 400, but we can't reliable check for cycles until runtime.

@jakelandis
Copy link
Contributor Author

Rather than further complicating compound pipelines, this seems to me like a reason to make conditional first class members of processors, rather than the current implementation as a wrapper processor.

I agree the conditionals as wrappers approach adds a bit of complication when working with the guts/execution of the pipeline and simulate. However, it is nice transparent functionality when simply working on a processor. I don't think re-implementing it is possible in the 6.5 timeframe, but will add as point of discussion to revisit this design choice now that we have some hindsight.

In the context of this PR, the main complexity is the need to check for cycles between pipelines at runtime. This is especially tricky for the simulate workflow since it needs to both record each step via static decoration and check for cycles at runtime. I am not sure that this specific complexity would be reduced much if we moved away from the wrapper design.

@jakelandis
Copy link
Contributor Author

@rjernst - Mind taking another look. No code changes, but replied to the original set of concerns.

@jakelandis
Copy link
Contributor Author

jakelandis commented Oct 22, 2018

@rjernst - mind taking another look ? Specifically the removal of the custom exception: 6150ab0

The branch conflict is due to #34202 getting reverted. I will re-introduce that PR (but without the bug) prior to merging this one.

EDIT: Re-introduction PR that will resolve the conflict (assuming it gets merged first) #34724

Copy link
Member

@rjernst rjernst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! LGTM

@jakelandis jakelandis merged commit 89dc07b into elastic:master Oct 23, 2018
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Oct 23, 2018
* master: (24 commits)
  ingest: better support for conditionals with simulate?verbose (elastic#34155)
  [Rollup] Job deletion should be invoked on the allocated task (elastic#34574)
  [DOCS] .Security index is never auto created (elastic#34589)
  CCR: Requires soft-deletes on the follower (elastic#34725)
  re-enable bwc tests (elastic#34743)
  Empty GetAliases authorization fix (elastic#34444)
  INGEST: Document Processor Conditional (elastic#33388)
  [CCR] Add total fetch time leader stat (elastic#34577)
  SQL: Support pattern against compatible indices (elastic#34718)
  [CCR] Auto follow pattern APIs adjustments (elastic#34518)
  [Test] Remove dead code from ExceptionSerializationTests (elastic#34713)
  A small typo in migration-assistance doc (elastic#34704)
  ingest: processor stats (elastic#34724)
  SQL: Implement IN(value1, value2, ...) expression. (elastic#34581)
  Tests: Add checks to GeoDistanceQueryBuilderTests (elastic#34273)
  INGEST: Rename Pipeline Processor Param. (elastic#34733)
  Core: Move IndexNameExpressionResolver to java time (elastic#34507)
  [DOCS] Force Merge: clarify execution and storage requirements (elastic#33882)
  TESTING.asciidoc fix examples using forbidden annotation (elastic#34515)
  SQL: Implement `CONVERT`, an alternative to `CAST` (elastic#34660)
  ...
jakelandis added a commit that referenced this pull request Oct 23, 2018
This commit introduces two corrections to the way simulate?verbose
handles conditionals on processors.

1) Prior to this change when executing simulate?verbose for
processors with conditionals that evaluate to false, that processor
would still be displayed in the result set. What was displayed was
correct, such that no changes to the document occurred. However, if the
conditional evaluates to false, the processor should not even be
displayed.

2) Prior to this change when executing simulate?verbose for
pipeline processors with conditionals, the individual steps would no
longer be displayed. Commit e37e5df addressed the issue, but
failed account for a conditional on the pipeline processor. Since
a pipeline processor can introduce cycles and is effectively a
single processor that encapsulates multiple other processors that
are potentially guarded by a single conditional, special handling is
needed to for pipeline and conditional pipeline processors.
@jakelandis
Copy link
Contributor Author

6.x backport: c4f03ae

kcm pushed a commit that referenced this pull request Oct 30, 2018
This commit introduces two corrections to the way simulate?verbose
handles conditionals on processors.

1) Prior to this change when executing simulate?verbose for
processors with conditionals that evaluate to false, that processor
would still be displayed in the result set. What was displayed was
correct, such that no changes to the document occurred. However, if the
conditional evaluates to false, the processor should not even be
displayed.

2) Prior to this change when executing simulate?verbose for
pipeline processors with conditionals, the individual steps would no
longer be displayed. Commit e37e5df addressed the issue, but
failed account for a conditional on the pipeline processor. Since
a pipeline processor can introduce cycles and is effectively a
single processor that encapsulates multiple other processors that
are potentially guarded by a single conditional, special handling is
needed to for pipeline and conditional pipeline processors.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement v6.5.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants