diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/StoredScriptsIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/StoredScriptsIT.java index b15467d24ba2b..242c2d9237dff 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/StoredScriptsIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/StoredScriptsIT.java @@ -47,7 +47,7 @@ public void testGetStoredScript() throws Exception { Collections.singletonMap(Script.CONTENT_TYPE_OPTION, XContentType.JSON.mediaType())); PutStoredScriptRequest request = - new PutStoredScriptRequest(id, "search", new BytesArray("{}"), XContentType.JSON, scriptSource); + new PutStoredScriptRequest(id, "score", new BytesArray("{}"), XContentType.JSON, scriptSource); assertAcked(execute(request, highLevelClient()::putScript, highLevelClient()::putScriptAsync)); GetStoredScriptRequest getRequest = new GetStoredScriptRequest("calculate-score"); @@ -66,7 +66,7 @@ public void testDeleteStoredScript() throws Exception { Collections.singletonMap(Script.CONTENT_TYPE_OPTION, XContentType.JSON.mediaType())); PutStoredScriptRequest request = - new PutStoredScriptRequest(id, "search", new BytesArray("{}"), XContentType.JSON, scriptSource); + new PutStoredScriptRequest(id, "score", new BytesArray("{}"), XContentType.JSON, scriptSource); assertAcked(execute(request, highLevelClient()::putScript, highLevelClient()::putScriptAsync)); DeleteStoredScriptRequest deleteRequest = new DeleteStoredScriptRequest(id); @@ -89,7 +89,7 @@ public void testPutScript() throws Exception { Collections.singletonMap(Script.CONTENT_TYPE_OPTION, XContentType.JSON.mediaType())); PutStoredScriptRequest request = - new PutStoredScriptRequest(id, "search", new BytesArray("{}"), XContentType.JSON, scriptSource); + new PutStoredScriptRequest(id, "score", new BytesArray("{}"), XContentType.JSON, scriptSource); assertAcked(execute(request, highLevelClient()::putScript, 
highLevelClient()::putScriptAsync)); Map script = getAsMap("/_scripts/" + id); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/StoredScriptsDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/StoredScriptsDocumentationIT.java index 9165c5cf10d0e..ac4f2269f9c79 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/StoredScriptsDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/StoredScriptsDocumentationIT.java @@ -307,7 +307,7 @@ public void onFailure(Exception e) { private void putStoredScript(String id, StoredScriptSource scriptSource) throws IOException { PutStoredScriptRequest request = - new PutStoredScriptRequest(id, "search", new BytesArray("{}"), XContentType.JSON, scriptSource); + new PutStoredScriptRequest(id, "score", new BytesArray("{}"), XContentType.JSON, scriptSource); assertAcked(execute(request, highLevelClient()::putScript, highLevelClient()::putScriptAsync)); } } diff --git a/docs/build.gradle b/docs/build.gradle index 045744de9f54c..dd1846dc0459c 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -36,8 +36,11 @@ buildRestTests.expectedUnconvertedCandidates = [ ] integTestCluster { - /* Enable regexes in painless so our tests don't complain about example - * snippets that use them. 
*/ + if ("zip".equals(integTestCluster.distribution)) { + setting 'xpack.license.self_generated.type', 'trial' + } + + // enable regexes in painless so our tests don't complain about example snippets that use them setting 'script.painless.regex.enabled', 'true' Closure configFile = { extraConfigFile it, "src/test/cluster/config/$it" diff --git a/docs/painless/painless-contexts/painless-field-context.asciidoc b/docs/painless/painless-contexts/painless-field-context.asciidoc index 4c767ca389115..80307b25ea545 100644 --- a/docs/painless/painless-contexts/painless-field-context.asciidoc +++ b/docs/painless/painless-contexts/painless-field-context.asciidoc @@ -14,13 +14,10 @@ a customized value for each document in the results of a query. Contains the fields of the specified document where each field is a `List` of values. -{ref}/mapping-source-field.html[`ctx['_source']`] (`Map`):: +{ref}/mapping-source-field.html[`params['_source']`] (`Map`, read-only):: Contains extracted JSON in a `Map` and `List` structure for the fields existing in a stored document. -`_score` (`double` read-only):: - The original score of the specified document. - *Return* `Object`:: @@ -28,4 +25,4 @@ a customized value for each document in the results of a query. *API* -The standard <> is available. \ No newline at end of file +The standard <> is available. 
diff --git a/docs/reference/licensing/get-license.asciidoc b/docs/reference/licensing/get-license.asciidoc index bf094d99f2f5a..2a7b2da117a5b 100644 --- a/docs/reference/licensing/get-license.asciidoc +++ b/docs/reference/licensing/get-license.asciidoc @@ -39,7 +39,7 @@ For more information, see [float] ==== Examples -The following example provides information about a basic license: +The following example provides information about a trial license: [source,js] -------------------------------------------------- @@ -53,9 +53,11 @@ GET _xpack/license "license" : { "status" : "active", "uid" : "cbff45e7-c553-41f7-ae4f-9205eabd80xx", - "type" : "basic", - "issue_date" : "2018-02-22T23:12:05.550Z", - "issue_date_in_millis" : 1519341125550, + "type" : "trial", + "issue_date" : "2018-10-20T22:05:12.332Z", + "issue_date_in_millis" : 1540073112332, + "expiry_date" : "2018-11-19T22:05:12.332Z", + "expiry_date_in_millis" : 1542665112332, "max_nodes" : 1000, "issued_to" : "test", "issuer" : "elasticsearch", @@ -65,8 +67,10 @@ GET _xpack/license -------------------------------------------------- // TESTRESPONSE[s/"cbff45e7-c553-41f7-ae4f-9205eabd80xx"/$body.license.uid/] // TESTRESPONSE[s/"basic"/$body.license.type/] -// TESTRESPONSE[s/"2018-02-22T23:12:05.550Z"/$body.license.issue_date/] -// TESTRESPONSE[s/1519341125550/$body.license.issue_date_in_millis/] +// TESTRESPONSE[s/"2018-10-20T22:05:12.332Z"/$body.license.issue_date/] +// TESTRESPONSE[s/1540073112332/$body.license.issue_date_in_millis/] +// TESTRESPONSE[s/"2018-11-19T22:05:12.332Z"/$body.license.expiry_date/] +// TESTRESPONSE[s/1542665112332/$body.license.expiry_date_in_millis/] // TESTRESPONSE[s/1000/$body.license.max_nodes/] // TESTRESPONSE[s/"test"/$body.license.issued_to/] // TESTRESPONSE[s/"elasticsearch"/$body.license.issuer/] diff --git a/docs/reference/rest-api/info.asciidoc b/docs/reference/rest-api/info.asciidoc index 1cf4ab563b185..125a7c3d1272e 100644 --- a/docs/reference/rest-api/info.asciidoc +++ 
b/docs/reference/rest-api/info.asciidoc @@ -57,24 +57,25 @@ Example response: }, "license" : { "uid" : "893361dc-9749-4997-93cb-xxx", - "type" : "basic", - "mode" : "basic", - "status" : "active" + "type" : "trial", + "mode" : "trial", + "status" : "active", + "expiry_date_in_millis" : 1542665112332 }, "features" : { "graph" : { "description" : "Graph Data Exploration for the Elastic Stack", - "available" : false, + "available" : true, "enabled" : true }, "logstash" : { "description" : "Logstash management component for X-Pack", - "available" : false, + "available" : true, "enabled" : true }, "ml" : { "description" : "Machine Learning for the Elastic Stack", - "available" : false, + "available" : true, "enabled" : true, "native_code_info" : { "version" : "7.0.0-alpha1-SNAPSHOT", @@ -93,12 +94,12 @@ Example response: }, "security" : { "description" : "Security for the Elastic Stack", - "available" : false, - "enabled" : true + "available" : true, + "enabled" : false }, "watcher" : { "description" : "Alerting, Notification and Automation for the Elastic Stack", - "available" : false, + "available" : true, "enabled" : true } }, @@ -108,7 +109,7 @@ Example response: // TESTRESPONSE[s/"hash" : "2798b1a3ce779b3611bb53a0082d4d741e4d3168",/"hash" : "$body.build.hash",/] // TESTRESPONSE[s/"date" : "2015-04-07T13:34:42Z"/"date" : "$body.build.date"/] // TESTRESPONSE[s/"uid" : "893361dc-9749-4997-93cb-xxx",/"uid": "$body.license.uid",/] -// TESTRESPONSE[s/"expiry_date_in_millis" : 1914278399999/"expiry_date_in_millis" : "$body.license.expiry_date_in_millis"/] +// TESTRESPONSE[s/"expiry_date_in_millis" : 1542665112332/"expiry_date_in_millis" : "$body.license.expiry_date_in_millis"/] // TESTRESPONSE[s/"version" : "7.0.0-alpha1-SNAPSHOT",/"version": "$body.features.ml.native_code_info.version",/] // TESTRESPONSE[s/"build_hash" : "99a07c016d5a73"/"build_hash": "$body.features.ml.native_code_info.build_hash"/] // So much s/// but at least we test that the layout is close to 
matching.... diff --git a/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionFieldScript.java b/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionFieldScript.java new file mode 100644 index 0000000000000..14fd1dd6c1242 --- /dev/null +++ b/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionFieldScript.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.script.expression; + +import org.apache.lucene.expressions.Expression; +import org.apache.lucene.expressions.SimpleBindings; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DoubleValues; +import org.apache.lucene.search.DoubleValuesSource; +import org.elasticsearch.script.FieldScript; +import org.elasticsearch.script.GeneralScriptException; + +import java.io.IOException; + +public class ExpressionFieldScript implements FieldScript.LeafFactory { + private final Expression exprScript; + private final DoubleValuesSource source; + + ExpressionFieldScript(Expression e, SimpleBindings b) { + this.exprScript = e; + this.source = exprScript.getDoubleValuesSource(b); + } + + @Override + public FieldScript newInstance(final LeafReaderContext leaf) throws IOException { + return new FieldScript() { + + // Fake the scorer until setScorer is called. + DoubleValues values = source.getValues(leaf, null); + + @Override + public Object execute() { + try { + return values.doubleValue(); + } catch (Exception exception) { + throw new GeneralScriptException("Error evaluating " + exprScript, exception); + } + } + + @Override + public void setDocument(int d) { + try { + values.advanceExact(d); + } catch (IOException e) { + throw new IllegalStateException("Can't advance to doc using " + exprScript, e); + } + } + }; + } +} diff --git a/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java b/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java index 0ece3834653de..d719f7a2cbcd8 100644 --- a/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java +++ b/modules/lang-expression/src/main/java/org/elasticsearch/script/expression/ExpressionScriptEngine.java @@ -41,6 +41,7 @@ import org.elasticsearch.script.BucketAggregationScript; import 
org.elasticsearch.script.BucketAggregationSelectorScript; import org.elasticsearch.script.ClassPermission; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.script.FilterScript; import org.elasticsearch.script.NumberSortScript; import org.elasticsearch.script.ScoreScript; @@ -139,6 +140,9 @@ public boolean execute() { } else if (context.instanceClazz.equals(NumberSortScript.class)) { NumberSortScript.Factory factory = (p, lookup) -> newSortScript(expr, lookup, p); return context.factoryClazz.cast(factory); + } else if (context.instanceClazz.equals(FieldScript.class)) { + FieldScript.Factory factory = (p, lookup) -> newFieldScript(expr, lookup, p); + return context.factoryClazz.cast(factory); } throw new IllegalArgumentException("expression engine does not know how to handle script context [" + context.name + "]"); } @@ -289,6 +293,23 @@ private AggregationScript.LeafFactory newAggregationScript(Expression expr, Sear return new ExpressionAggregationScript(expr, bindings, specialValue); } + private FieldScript.LeafFactory newFieldScript(Expression expr, SearchLookup lookup, @Nullable Map vars) { + SimpleBindings bindings = new SimpleBindings(); + for (String variable : expr.variables) { + try { + if (vars != null && vars.containsKey(variable)) { + bindFromParams(vars, bindings, variable); + } else { + final ValueSource valueSource = getDocValueSource(variable, lookup); + bindings.add(variable, valueSource.asDoubleValuesSource()); + } + } catch (Exception e) { + throw convertToScriptException("link error", expr.sourceText, variable, e); + } + } + return new ExpressionFieldScript(expr, bindings); + } + /** * This is a hack for filter scripts, which must return booleans instead of doubles as expression do. * See https://github.com/elastic/elasticsearch/issues/26429. 
diff --git a/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/ExpressionTests.java b/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/ExpressionFieldScriptTests.java similarity index 75% rename from modules/lang-expression/src/test/java/org/elasticsearch/script/expression/ExpressionTests.java rename to modules/lang-expression/src/test/java/org/elasticsearch/script/expression/ExpressionFieldScriptTests.java index 33e6239002eb1..b1872b30f1f17 100644 --- a/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/ExpressionTests.java +++ b/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/ExpressionFieldScriptTests.java @@ -24,10 +24,9 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType; -import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.script.ScriptException; -import org.elasticsearch.script.SearchScript; import org.elasticsearch.search.lookup.SearchLookup; import org.elasticsearch.test.ESTestCase; @@ -35,12 +34,13 @@ import java.text.ParseException; import java.util.Collections; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ExpressionTests extends ESTestCase { +public class ExpressionFieldScriptTests extends ESTestCase { private ExpressionScriptEngine service; private SearchLookup lookup; @@ -48,7 +48,7 @@ public class ExpressionTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); - NumberFieldType fieldType = new 
NumberFieldType(NumberType.DOUBLE); + NumberFieldMapper.NumberFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE); MapperService mapperService = mock(MapperService.class); when(mapperService.fullName("field")).thenReturn(fieldType); when(mapperService.fullName("alias")).thenReturn(fieldType); @@ -68,18 +68,11 @@ public void setUp() throws Exception { lookup = new SearchLookup(mapperService, ignored -> fieldData, null); } - private SearchScript.LeafFactory compile(String expression) { - SearchScript.Factory factory = service.compile(null, expression, SearchScript.CONTEXT, Collections.emptyMap()); + private FieldScript.LeafFactory compile(String expression) { + FieldScript.Factory factory = service.compile(null, expression, FieldScript.CONTEXT, Collections.emptyMap()); return factory.newFactory(Collections.emptyMap(), lookup); } - public void testNeedsScores() { - assertFalse(compile("1.2").needs_score()); - assertFalse(compile("doc['field'].value").needs_score()); - assertTrue(compile("1/_score").needs_score()); - assertTrue(compile("doc['field'].value * _score").needs_score()); - } - public void testCompileError() { ScriptException e = expectThrows(ScriptException.class, () -> { compile("doc['field'].value * *@#)(@$*@#$ + 4"); @@ -95,18 +88,18 @@ public void testLinkError() { } public void testFieldAccess() throws IOException { - SearchScript script = compile("doc['field'].value").newInstance(null); + FieldScript script = compile("doc['field'].value").newInstance(null); script.setDocument(1); - double result = script.runAsDouble(); - assertEquals(2.718, result, 0.0); + Object result = script.execute(); + assertThat(result, equalTo(2.718)); } public void testFieldAccessWithFieldAlias() throws IOException { - SearchScript script = compile("doc['alias'].value").newInstance(null); + FieldScript script = compile("doc['alias'].value").newInstance(null); script.setDocument(1); - double result = script.runAsDouble(); - 
assertEquals(2.718, result, 0.0); + Object result = script.execute(); + assertThat(result, equalTo(2.718)); } } diff --git a/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/StoredExpressionTests.java b/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/StoredExpressionTests.java index 4bf8af9a12eda..7f7f30f271acf 100644 --- a/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/StoredExpressionTests.java +++ b/modules/lang-expression/src/test/java/org/elasticsearch/script/expression/StoredExpressionTests.java @@ -70,7 +70,7 @@ public void testAllOpsDisabledIndexedScripts() throws IOException { .setIndices("test").setTypes("scriptTest").get(); fail("search script should have been rejected"); } catch(Exception e) { - assertThat(e.toString(), containsString("cannot execute scripts using [search] context")); + assertThat(e.toString(), containsString("cannot execute scripts using [field] context")); } try { client().prepareSearch("test") diff --git a/modules/lang-painless/src/test/java/org/elasticsearch/painless/ScriptTestCase.java b/modules/lang-painless/src/test/java/org/elasticsearch/painless/ScriptTestCase.java index e69a1ad5dcfd0..5a4c5de015bc1 100644 --- a/modules/lang-painless/src/test/java/org/elasticsearch/painless/ScriptTestCase.java +++ b/modules/lang-painless/src/test/java/org/elasticsearch/painless/ScriptTestCase.java @@ -27,7 +27,6 @@ import org.elasticsearch.painless.spi.Whitelist; import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptException; -import org.elasticsearch.script.SearchScript; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -67,7 +66,6 @@ protected Settings scriptEngineSettings() { */ protected Map, List> scriptContexts() { Map, List> contexts = new HashMap<>(); - contexts.put(SearchScript.CONTEXT, Whitelist.BASE_WHITELISTS); contexts.put(PainlessTestScript.CONTEXT, Whitelist.BASE_WHITELISTS); return contexts; } diff 
--git a/modules/lang-painless/src/test/resources/rest-api-spec/test/painless/16_update2.yml b/modules/lang-painless/src/test/resources/rest-api-spec/test/painless/16_update2.yml index 253676bda8e38..999733ff14be7 100644 --- a/modules/lang-painless/src/test/resources/rest-api-spec/test/painless/16_update2.yml +++ b/modules/lang-painless/src/test/resources/rest-api-spec/test/painless/16_update2.yml @@ -38,12 +38,12 @@ catch: bad_request put_script: id: "1" - context: "search" + context: "score" body: { "script": {"lang": "painless", "source": "_score * foo bar + doc['myParent.weight'].value"} } - do: catch: /compile error/ put_script: id: "1" - context: "search" + context: "score" body: { "script": {"lang": "painless", "source": "_score * foo bar + doc['myParent.weight'].value"} } diff --git a/plugins/examples/painless-whitelist/src/main/java/org/elasticsearch/example/painlesswhitelist/ExampleWhitelistExtension.java b/plugins/examples/painless-whitelist/src/main/java/org/elasticsearch/example/painlesswhitelist/ExampleWhitelistExtension.java index 9e3bc66e7d58d..ca35db5a81b3d 100644 --- a/plugins/examples/painless-whitelist/src/main/java/org/elasticsearch/example/painlesswhitelist/ExampleWhitelistExtension.java +++ b/plugins/examples/painless-whitelist/src/main/java/org/elasticsearch/example/painlesswhitelist/ExampleWhitelistExtension.java @@ -19,15 +19,15 @@ package org.elasticsearch.example.painlesswhitelist; -import java.util.Collections; -import java.util.List; -import java.util.Map; - import org.elasticsearch.painless.spi.PainlessExtension; import org.elasticsearch.painless.spi.Whitelist; import org.elasticsearch.painless.spi.WhitelistLoader; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.script.ScriptContext; -import org.elasticsearch.script.SearchScript; + +import java.util.Collections; +import java.util.List; +import java.util.Map; /** An extension of painless which adds a whitelist. 
*/ public class ExampleWhitelistExtension implements PainlessExtension { @@ -37,6 +37,6 @@ public class ExampleWhitelistExtension implements PainlessExtension { @Override public Map, List> getContextWhitelists() { - return Collections.singletonMap(SearchScript.CONTEXT, Collections.singletonList(WHITELIST)); + return Collections.singletonMap(FieldScript.CONTEXT, Collections.singletonList(WHITELIST)); } } diff --git a/server/src/main/java/org/elasticsearch/index/query/InnerHitContextBuilder.java b/server/src/main/java/org/elasticsearch/index/query/InnerHitContextBuilder.java index 58d271bb8206c..1fe781f38ce27 100644 --- a/server/src/main/java/org/elasticsearch/index/query/InnerHitContextBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/InnerHitContextBuilder.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.query; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.script.SearchScript; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext; import org.elasticsearch.search.fetch.subphase.InnerHitsContext; @@ -88,10 +88,10 @@ protected void setupInnerHitsContext(QueryShardContext queryShardContext, if (innerHitBuilder.getScriptFields() != null) { for (SearchSourceBuilder.ScriptField field : innerHitBuilder.getScriptFields()) { QueryShardContext innerContext = innerHitsContext.getQueryShardContext(); - SearchScript.Factory factory = innerContext.getScriptService().compile(field.script(), SearchScript.CONTEXT); - SearchScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), innerHitsContext.lookup()); + FieldScript.Factory factory = innerContext.getScriptService().compile(field.script(), FieldScript.CONTEXT); + FieldScript.LeafFactory fieldScript = factory.newFactory(field.script().getParams(), innerHitsContext.lookup()); innerHitsContext.scriptFields().add(new 
org.elasticsearch.search.fetch.subphase.ScriptFieldsContext.ScriptField( - field.fieldName(), searchScript, field.ignoreFailure())); + field.fieldName(), fieldScript, field.ignoreFailure())); } } if (innerHitBuilder.getFetchSourceContext() != null) { diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index e1a413f6aa9bb..3b8281bd471d2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -20,12 +20,15 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.collect.Tuple; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.stream.Collectors; /** @@ -40,16 +43,33 @@ public class CompoundProcessor implements Processor { private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; + private final List> processorsWithMetrics; + private final LongSupplier relativeTimeProvider; + + CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) { + this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider); + } public CompoundProcessor(Processor... 
processor) { this(false, Arrays.asList(processor), Collections.emptyList()); } public CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors) { + this(ignoreFailure, processors, onFailureProcessors, System::nanoTime); + } + CompoundProcessor(boolean ignoreFailure, List processors, List onFailureProcessors, + LongSupplier relativeTimeProvider) { super(); this.ignoreFailure = ignoreFailure; this.processors = processors; this.onFailureProcessors = onFailureProcessors; + this.relativeTimeProvider = relativeTimeProvider; + this.processorsWithMetrics = new ArrayList<>(processors.size()); + processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric()))); + } + + List> getProcessorsWithMetrics() { + return processorsWithMetrics; } public boolean isIgnoreFailure() { @@ -94,12 +114,17 @@ public String getTag() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - for (Processor processor : processors) { + for (Tuple processorWithMetric : processorsWithMetrics) { + Processor processor = processorWithMetric.v1(); + IngestMetric metric = processorWithMetric.v2(); + long startTimeInNanos = relativeTimeProvider.getAsLong(); try { + metric.preIngest(); if (processor.execute(ingestDocument) == null) { return null; } } catch (Exception e) { + metric.ingestFailed(); if (ignoreFailure) { continue; } @@ -112,11 +137,15 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { executeOnFailure(ingestDocument, compoundProcessorException); break; } + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); } } return ingestDocument; } + void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { try { putFailureMetadata(ingestDocument, exception); diff --git 
a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index b6f6612344a39..9078dc86c1b07 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -28,6 +28,8 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import org.elasticsearch.script.IngestConditionalScript; import org.elasticsearch.script.Script; @@ -42,12 +44,20 @@ public class ConditionalProcessor extends AbstractProcessor { private final ScriptService scriptService; private final Processor processor; + private final IngestMetric metric; + private final LongSupplier relativeTimeProvider; ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { + this(tag, script, scriptService, processor, System::nanoTime); + } + + ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) { super(tag); this.condition = script; this.scriptService = scriptService; this.processor = processor; + this.metric = new IngestMetric(); + this.relativeTimeProvider = relativeTimeProvider; } @Override @@ -55,11 +65,30 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { IngestConditionalScript script = scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - return processor.execute(ingestDocument); + // Only record metric if the script evaluates to true + long startTimeInNanos = relativeTimeProvider.getAsLong(); + try { + metric.preIngest(); + return processor.execute(ingestDocument); + } catch (Exception e) { + 
metric.ingestFailed(); + throw e; + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); + } } return ingestDocument; } + Processor getProcessor() { + return processor; + } + + IngestMetric getMetric() { + return metric; + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 6c46a9b2354f6..705e77028a1ef 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,19 +19,6 @@ package org.elasticsearch.ingest; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; - import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -49,6 +36,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -61,6 +49,19 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import 
java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * Holder class for several ingest related services. */ @@ -262,11 +263,59 @@ public void applyClusterState(final ClusterChangedEvent event) { Pipeline originalPipeline = originalPipelines.get(id); if (originalPipeline != null) { pipeline.getMetrics().add(originalPipeline.getMetrics()); + List> oldPerProcessMetrics = new ArrayList<>(); + List> newPerProcessMetrics = new ArrayList<>(); + getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics); + getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics); + //Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since + //the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and + //consistent id's per processor and/or semantic equals for each processor will be needed. + if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) { + Iterator> oldMetricsIterator = oldPerProcessMetrics.iterator(); + for (Tuple compositeMetric : newPerProcessMetrics) { + String type = compositeMetric.v1().getType(); + IngestMetric metric = compositeMetric.v2(); + if (oldMetricsIterator.hasNext()) { + Tuple oldCompositeMetric = oldMetricsIterator.next(); + String oldType = oldCompositeMetric.v1().getType(); + IngestMetric oldMetric = oldCompositeMetric.v2(); + if (type.equals(oldType)) { + metric.add(oldMetric); + } + } + } + } } }); } } + /** + * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as + * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric. 
+ * @param compoundProcessor The compound processor to start walking the non-failure processors + * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples. + * @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor + */ + private static List> getProcessorMetrics(CompoundProcessor compoundProcessor, + List> processorMetrics) { + //only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure + for (Tuple processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) { + Processor processor = processorWithMetric.v1(); + IngestMetric metric = processorWithMetric.v2(); + if (processor instanceof CompoundProcessor) { + getProcessorMetrics((CompoundProcessor) processor, processorMetrics); + } else { + //Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true. + if (processor instanceof ConditionalProcessor) { + metric = ((ConditionalProcessor) processor).getMetric(); + } + processorMetrics.add(new Tuple<>(processor, metric)); + } + } + return processorMetrics; + } + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; String type = e.getHeaderKeys().contains("processor_type") ? 
e.getHeader("processor_type").get(0) : "unknown"; @@ -371,11 +420,42 @@ protected void doRun() { } public IngestStats stats() { + IngestStats.Builder statsBuilder = new IngestStats.Builder(); + statsBuilder.addTotalMetrics(totalMetrics); + pipelines.forEach((id, pipeline) -> { + CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); + statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); + List> processorMetrics = new ArrayList<>(); + getProcessorMetrics(rootProcessor, processorMetrics); + processorMetrics.forEach(t -> { + Processor processor = t.v1(); + IngestMetric processorMetric = t.v2(); + statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); + }); + }); + return statsBuilder.build(); + } - Map statsPerPipeline = - pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats())); + //package private for testing + static String getProcessorName(Processor processor){ + // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name + if(processor instanceof ConditionalProcessor){ + processor = ((ConditionalProcessor) processor).getProcessor(); + } + StringBuilder sb = new StringBuilder(5); + sb.append(processor.getType()); - return new IngestStats(totalMetrics.createStats(), statsPerPipeline); + if(processor instanceof PipelineProcessor){ + String pipelineName = ((PipelineProcessor) processor).getPipelineName(); + sb.append(":"); + sb.append(pipelineName); + } + String tag = processor.getTag(); + if(tag != null && !tag.isEmpty()){ + sb.append(":"); + sb.append(tag); + } + return sb.toString(); } private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index c4c1520fd19d4..e3d671bc8b2a0 100644 
--- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -27,17 +27,28 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class IngestStats implements Writeable, ToXContentFragment { private final Stats totalStats; - private final Map statsPerPipeline; + private final List pipelineStats; + private final Map> processorStats; - public IngestStats(Stats totalStats, Map statsPerPipeline) { + /** + * @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats, + * and pipeline stats are logically the sum of the processor stats. + * @param pipelineStats - The stats for a given ingest pipeline. + * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier. 
+ */ + public IngestStats(Stats totalStats, List pipelineStats, Map> processorStats) { this.totalStats = totalStats; - this.statsPerPipeline = statsPerPipeline; + this.pipelineStats = pipelineStats; + this.processorStats = processorStats; } /** @@ -46,37 +57,43 @@ public IngestStats(Stats totalStats, Map statsPerPipeline) { public IngestStats(StreamInput in) throws IOException { this.totalStats = new Stats(in); int size = in.readVInt(); - this.statsPerPipeline = new HashMap<>(size); + this.pipelineStats = new ArrayList<>(size); + this.processorStats = new HashMap<>(size); for (int i = 0; i < size; i++) { - statsPerPipeline.put(in.readString(), new Stats(in)); + String pipelineId = in.readString(); + Stats pipelineStat = new Stats(in); + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); + int processorsSize = in.readVInt(); + List processorStatsPerPipeline = new ArrayList<>(processorsSize); + for (int j = 0; j < processorsSize; j++) { + String processorName = in.readString(); + Stats processorStat = new Stats(in); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + } + this.processorStats.put(pipelineId, processorStatsPerPipeline); } } @Override public void writeTo(StreamOutput out) throws IOException { totalStats.writeTo(out); - out.writeVInt(statsPerPipeline.size()); - for (Map.Entry entry : statsPerPipeline.entrySet()) { - out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + out.writeVInt(pipelineStats.size()); + for (PipelineStat pipelineStat : pipelineStats) { + out.writeString(pipelineStat.getPipelineId()); + pipelineStat.getStats().writeTo(out); + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + if(processorStatsForPipeline == null) { + out.writeVInt(0); + }else{ + out.writeVInt(processorStatsForPipeline.size()); + for (ProcessorStat processorStat : processorStatsForPipeline) { + out.writeString(processorStat.getName()); + 
processorStat.getStats().writeTo(out); + } + } } } - - /** - * @return The accumulated stats for all pipelines - */ - public Stats getTotalStats() { - return totalStats; - } - - /** - * @return The stats on a per pipeline basis - */ - public Map getStatsPerPipeline() { - return statsPerPipeline; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("ingest"); @@ -84,9 +101,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws totalStats.toXContent(builder, params); builder.endObject(); builder.startObject("pipelines"); - for (Map.Entry entry : statsPerPipeline.entrySet()) { - builder.startObject(entry.getKey()); - entry.getValue().toXContent(builder, params); + for (PipelineStat pipelineStat : pipelineStats) { + builder.startObject(pipelineStat.getPipelineId()); + pipelineStat.getStats().toXContent(builder, params); + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + builder.startArray("processors"); + if (processorStatsForPipeline != null) { + for (ProcessorStat processorStat : processorStatsForPipeline) { + builder.startObject(); + builder.startObject(processorStat.getName()); + processorStat.getStats().toXContent(builder, params); + builder.endObject(); + builder.endObject(); + } + } + builder.endArray(); builder.endObject(); } builder.endObject(); @@ -94,6 +123,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public Stats getTotalStats() { + return totalStats; + } + + public List getPipelineStats() { + return pipelineStats; + } + + public Map> getProcessorStats() { + return processorStats; + } + public static class Stats implements Writeable, ToXContentFragment { private final long ingestCount; @@ -134,7 +175,6 @@ public long getIngestCount() { } /** - * * @return The total time spent of ingest preprocessing in millis. 
*/ public long getIngestTimeInMillis() { @@ -164,4 +204,77 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } } + + /** + * Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects + */ + static class Builder { + private Stats totalStats; + private List pipelineStats = new ArrayList<>(); + private Map> processorStats = new HashMap<>(); + + + Builder addTotalMetrics(IngestMetric totalMetric) { + this.totalStats = totalMetric.createStats(); + return this; + } + + Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats())); + return this; + } + + Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { + this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStat(processorName, metric.createStats())); + return this; + } + + IngestStats build() { + return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats), + Collections.unmodifiableMap(processorStats)); + } + } + + /** + * Container for pipeline stats. + */ + public static class PipelineStat { + private final String pipelineId; + private final Stats stats; + + public PipelineStat(String pipelineId, Stats stats) { + this.pipelineId = pipelineId; + this.stats = stats; + } + + public String getPipelineId() { + return pipelineId; + } + + public Stats getStats() { + return stats; + } + } + + /** + * Container for processor stats. 
+ */ + public static class ProcessorStat { + private final String name; + private final Stats stats; + + public ProcessorStat(String name, Stats stats) { + this.name = name; + this.stats = stats; + } + + public String getName() { + return name; + } + + public Stats getStats() { + return stats; + } + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 8d5f6d6ff7c54..fc5311be5cbde 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -22,11 +22,12 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; -import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import org.elasticsearch.script.ScriptService; @@ -47,20 +48,21 @@ public final class Pipeline { private final Integer version; private final CompoundProcessor compoundProcessor; private final IngestMetric metrics; - private final Clock clock; + private final LongSupplier relativeTimeProvider; public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { - this(id, description, version, compoundProcessor, Clock.systemUTC()); + this(id, description, version, compoundProcessor, System::nanoTime); } //package private for testing - Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) { + Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, + LongSupplier relativeTimeProvider) { this.id = id; this.description = description; this.compoundProcessor = compoundProcessor; this.version = version; this.metrics = new IngestMetric(); - this.clock = clock; + 
this.relativeTimeProvider = relativeTimeProvider; } public static Pipeline create(String id, Map config, @@ -89,7 +91,7 @@ public static Pipeline create(String id, Map config, * Modifies the data of a document to be indexed based on the processor this pipeline holds */ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - long startTimeInMillis = clock.millis(); + long startTimeInNanos = relativeTimeProvider.getAsLong(); try { metrics.preIngest(); return compoundProcessor.execute(ingestDocument); @@ -97,7 +99,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { metrics.ingestFailed(); throw e; } finally { - long ingestTimeInMillis = clock.millis() - startTimeInMillis; + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metrics.postIngest(ingestTimeInMillis); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 918ff6b8aefee..16324e8dee6c7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -53,6 +53,10 @@ public String getType() { return TYPE; } + String getPipelineName() { + return pipelineName; + } + public static final class Factory implements Processor.Factory { private final IngestService ingestService; diff --git a/server/src/main/java/org/elasticsearch/script/FieldScript.java b/server/src/main/java/org/elasticsearch/script/FieldScript.java new file mode 100644 index 0000000000000..98649dbb33043 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/script/FieldScript.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.script; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.index.fielddata.ScriptDocValues; +import org.elasticsearch.search.lookup.LeafSearchLookup; +import org.elasticsearch.search.lookup.SearchLookup; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A script to produce dynamic values for return fields. + */ +public abstract class FieldScript { + + public static final String[] PARAMETERS = {}; + + private static final Map DEPRECATIONS; + + static { + Map deprecations = new HashMap<>(); + deprecations.put( + "doc", + "Accessing variable [doc] via [params.doc] from within a field script " + + "is deprecated in favor of directly accessing [doc]." + ); + deprecations.put( + "_doc", + "Accessing variable [doc] via [params._doc] from within a field script " + + "is deprecated in favor of directly accessing [doc]." + ); + DEPRECATIONS = Collections.unmodifiableMap(deprecations); + } + + /** The generic runtime parameters for the script. */ + private final Map params; + + /** A leaf lookup for the bound segment this script will operate on. 
*/ + private final LeafSearchLookup leafLookup; + + public FieldScript(Map params, SearchLookup lookup, LeafReaderContext leafContext) { + this.leafLookup = lookup.getLeafSearchLookup(leafContext); + params = new HashMap<>(params); + params.putAll(leafLookup.asMap()); + this.params = new ParameterMap(params, DEPRECATIONS); + } + + // for expression engine + protected FieldScript() { + params = null; + leafLookup = null; + } + + public abstract Object execute(); + + /** The leaf lookup for the Lucene segment this script was created for. */ + protected final LeafSearchLookup getLeafLookup() { + return leafLookup; + } + + /** Return the parameters for this script. */ + public Map getParams() { + return params; + } + + /** The doc lookup for the Lucene segment this script was created for. */ + public final Map> getDoc() { + return leafLookup.doc(); + } + + /** Set the current document to run the script on next. */ + public void setDocument(int docid) { + leafLookup.setDocument(docid); + } + + /** A factory to construct {@link SearchScript} instances. */ + public interface LeafFactory { + FieldScript newInstance(LeafReaderContext ctx) throws IOException; + } + + public interface Factory { + LeafFactory newFactory(Map params, SearchLookup lookup); + } + + /** The context used to compile {@link FieldScript} factories. 
*/ + public static final ScriptContext CONTEXT = new ScriptContext<>("field", Factory.class); +} diff --git a/server/src/main/java/org/elasticsearch/script/ScriptModule.java b/server/src/main/java/org/elasticsearch/script/ScriptModule.java index 24dd491e3a183..1c53ef133de86 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptModule.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptModule.java @@ -40,7 +40,7 @@ public class ScriptModule { public static final Map> CORE_CONTEXTS; static { CORE_CONTEXTS = Stream.of( - SearchScript.CONTEXT, + FieldScript.CONTEXT, AggregationScript.CONTEXT, ScoreScript.CONTEXT, NumberSortScript.CONTEXT, diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index d8829bd11d386..6512e25fc0b78 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -58,8 +58,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.node.ResponseCollectorService; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.script.SearchScript; import org.elasticsearch.search.aggregations.AggregationInitializationException; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -863,8 +863,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc + IndexSettings.MAX_SCRIPT_FIELDS_SETTING.getKey() + "] index level setting."); } for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) { - SearchScript.Factory factory = scriptService.compile(field.script(), SearchScript.CONTEXT); - SearchScript.LeafFactory searchScript = 
factory.newFactory(field.script().getParams(), context.lookup()); + FieldScript.Factory factory = scriptService.compile(field.script(), FieldScript.CONTEXT); + FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), context.lookup()); context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure())); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregationBuilder.java index c2add245058b2..debbacdc6196c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregationBuilder.java @@ -28,8 +28,8 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.script.Script; -import org.elasticsearch.script.SearchScript; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationInitializationException; @@ -566,8 +566,8 @@ protected TopHitsAggregatorFactory doBuild(SearchContext context, AggregatorFact if (scriptFields != null) { for (ScriptField field : scriptFields) { QueryShardContext shardContext = context.getQueryShardContext(); - SearchScript.Factory factory = shardContext.getScriptService().compile(field.script(), SearchScript.CONTEXT); - SearchScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), shardContext.lookup()); + FieldScript.Factory factory = shardContext.getScriptService().compile(field.script(), FieldScript.CONTEXT); + FieldScript.LeafFactory searchScript = 
factory.newFactory(field.script().getParams(), shardContext.lookup()); fields.add(new org.elasticsearch.search.fetch.subphase.ScriptFieldsContext.ScriptField( field.fieldName(), searchScript, field.ignoreFailure())); } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsContext.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsContext.java index 9e43b0bdd32b4..4c1bd4aacb5cc 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsContext.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsContext.java @@ -19,7 +19,7 @@ package org.elasticsearch.search.fetch.subphase; -import org.elasticsearch.script.SearchScript; +import org.elasticsearch.script.FieldScript; import java.util.ArrayList; import java.util.List; @@ -28,10 +28,10 @@ public class ScriptFieldsContext { public static class ScriptField { private final String name; - private final SearchScript.LeafFactory script; + private final FieldScript.LeafFactory script; private final boolean ignoreException; - public ScriptField(String name, SearchScript.LeafFactory script, boolean ignoreException) { + public ScriptField(String name, FieldScript.LeafFactory script, boolean ignoreException) { this.name = name; this.script = script; this.ignoreException = ignoreException; @@ -41,7 +41,7 @@ public String name() { return name; } - public SearchScript.LeafFactory script() { + public FieldScript.LeafFactory script() { return this.script; } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsFetchSubPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsFetchSubPhase.java index 022a97b3f8dfc..532441c0cf934 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsFetchSubPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/ScriptFieldsFetchSubPhase.java @@ -23,7 +23,7 @@ import 
org.apache.lucene.index.ReaderUtil; import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.script.SearchScript; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.internal.SearchContext; @@ -49,7 +49,7 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) throws IOExcept Arrays.sort(hits, Comparator.comparingInt(SearchHit::docId)); int lastReaderId = -1; - SearchScript[] leafScripts = null; + FieldScript[] leafScripts = null; List scriptFields = context.scriptFields().fields(); final IndexReader reader = context.searcher().getIndexReader(); for (SearchHit hit : hits) { @@ -64,7 +64,7 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) throws IOExcept leafScripts[i].setDocument(docId); final Object value; try { - value = leafScripts[i].run(); + value = leafScripts[i].execute(); CollectionUtils.ensureNoSelfReferences(value, "ScriptFieldsFetchSubPhase leaf script " + i); } catch (RuntimeException e) { if (scriptFields.get(i).ignoreException()) { @@ -91,9 +91,9 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) throws IOExcept } } - private SearchScript[] createLeafScripts(LeafReaderContext context, - List scriptFields) { - SearchScript[] scripts = new SearchScript[scriptFields.size()]; + private FieldScript[] createLeafScripts(LeafReaderContext context, + List scriptFields) { + FieldScript[] scripts = new FieldScript[scriptFields.size()]; for (int i = 0; i < scripts.length; i++) { try { scripts[i] = scriptFields.get(i).script().newInstance(context); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 3384efcf836c6..8f51fb08dd23f 100644 --- 
a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -53,7 +53,6 @@ import static java.util.Collections.emptySet; public class NodeStatsTests extends ESTestCase { - public void testSerialization() throws IOException { NodeStats nodeStats = createNodeStats(); try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -271,14 +270,29 @@ public void testSerialization() throws IOException { assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent()); assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount()); assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis()); - assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size()); - for (Map.Entry entry : ingestStats.getStatsPerPipeline().entrySet()) { - IngestStats.Stats stats = entry.getValue(); - IngestStats.Stats deserializedStats = deserializedIngestStats.getStatsPerPipeline().get(entry.getKey()); - assertEquals(stats.getIngestFailedCount(), deserializedStats.getIngestFailedCount()); - assertEquals(stats.getIngestTimeInMillis(), deserializedStats.getIngestTimeInMillis()); - assertEquals(stats.getIngestCurrent(), deserializedStats.getIngestCurrent()); - assertEquals(stats.getIngestCount(), deserializedStats.getIngestCount()); + assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size()); + for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) { + String pipelineId = pipelineStat.getPipelineId(); + IngestStats.Stats deserializedPipelineStats = + getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId); + assertEquals(pipelineStat.getStats().getIngestFailedCount(), 
deserializedPipelineStats.getIngestFailedCount()); + assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); + assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); + assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount()); + List processorStats = ingestStats.getProcessorStats().get(pipelineId); + //intentionally validating identical order + Iterator it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator(); + for (IngestStats.ProcessorStat processorStat : processorStats) { + IngestStats.ProcessorStat deserializedProcessorStat = it.next(); + assertEquals(processorStat.getStats().getIngestFailedCount(), + deserializedProcessorStat.getStats().getIngestFailedCount()); + assertEquals(processorStat.getStats().getIngestTimeInMillis(), + deserializedProcessorStat.getStats().getIngestTimeInMillis()); + assertEquals(processorStat.getStats().getIngestCurrent(), + deserializedProcessorStat.getStats().getIngestCurrent()); + assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount()); + } + assertFalse(it.hasNext()); } } AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats(); @@ -429,14 +443,24 @@ private static NodeStats createNodeStats() { if (frequently()) { IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + int numPipelines = randomIntBetween(0, 10); + int numProcessors = randomIntBetween(0, 10); + List ingestPipelineStats = new ArrayList<>(numPipelines); + Map> ingestProcessorStats = new HashMap<>(numPipelines); + for (int i = 0; i < numPipelines; i++) { + String pipelineId = randomAlphaOfLengthBetween(3, 10); + ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats + (randomNonNegativeLong(), 
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()))); - int numStatsPerPipeline = randomIntBetween(0, 10); - Map statsPerPipeline = new HashMap<>(); - for (int i = 0; i < numStatsPerPipeline; i++) { - statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new IngestStats.Stats(randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); + List processorPerPipeline = new ArrayList<>(numProcessors); + for (int j =0; j < numProcessors;j++) { + IngestStats.Stats processorStats = new IngestStats.Stats + (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); + } + ingestProcessorStats.put(pipelineId,processorPerPipeline); } - ingestStats = new IngestStats(totalStats, statsPerPipeline); + ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats); } AdaptiveSelectionStats adaptiveSelectionStats = null; if (frequently()) { @@ -465,4 +489,8 @@ private static NodeStats createNodeStats() { fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, ingestStats, adaptiveSelectionStats); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 412b91aaef200..d3f0db6204d4e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -150,6 +150,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f } } + 
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34667") public void testDedupByPrimaryTerm() throws Exception { Map latestOperations = new HashMap<>(); List terms = Arrays.asList(between(1, 1000), between(1000, 2000)); diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index aaede49a36d57..dabcae533a0bf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -27,11 +27,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class CompoundProcessorTests extends ESTestCase { private IngestDocument ingestDocument; @@ -49,18 +55,29 @@ public void testEmpty() throws Exception { } public void testSingleProcessor() throws Exception { - TestProcessor processor = new TestProcessor(ingestDocument -> {}); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); + TestProcessor processor = new TestProcessor(ingestDocument ->{ + assertStats(0, ingestDocument.getFieldValue("compoundProcessor", CompoundProcessor.class), 1, 0, 0, 0); + }); + CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); + ingestDocument.setFieldValue("compoundProcessor", compoundProcessor); //ugly 
hack to assert current count = 1 assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); + assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); compoundProcessor.execute(ingestDocument); + verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 0, 1); + } public void testSingleProcessorWithException() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor); assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); @@ -71,15 +88,22 @@ public void testSingleProcessorWithException() throws Exception { assertThat(e.getRootCause().getMessage(), equalTo("error")); } assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); + } public void testIgnoreFailure() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");}); - CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList()); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + 
CompoundProcessor compoundProcessor = + new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); assertThat(processor2.getInvokedCounter(), equalTo(1)); + assertStats(1, compoundProcessor, 0, 1, 0, 0); assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value")); } @@ -93,11 +117,15 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), - Collections.singletonList(processor2)); + Collections.singletonList(processor2), relativeTimeProvider); compoundProcessor.execute(ingestDocument); + verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 1); assertThat(processor2.getInvokedCounter(), equalTo(1)); } @@ -118,14 +146,17 @@ public void testSingleProcessorWithNestedFailures() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail), - Collections.singletonList(lastProcessor)); + Collections.singletonList(lastProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new 
CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(compoundOnFailProcessor)); + Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception { @@ -137,21 +168,24 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); - CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor); + CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFail() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); 
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -160,21 +194,24 @@ public void testCompoundProcessorExceptionFail() throws Exception { assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(failProcessor)); + Collections.singletonList(failProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor failProcessor = - new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); + new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); assertThat(ingestMetadata.entrySet(), hasSize(3)); @@ -183,27 +220,44 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { 
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); }); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor), - Collections.singletonList(new CompoundProcessor(failProcessor))); + Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor))); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), - Collections.singletonList(secondProcessor)); + Collections.singletonList(secondProcessor), relativeTimeProvider); compoundProcessor.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); } public void testBreakOnFailure() throws Exception { TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");}); TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");}); TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), - Collections.singletonList(onFailureProcessor)); + Collections.singletonList(onFailureProcessor), relativeTimeProvider); pipeline.execute(ingestDocument); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + assertStats(pipeline, 1, 1, 0); + } + + private void assertStats(CompoundProcessor 
compoundProcessor, long count, long failed, long time) { + assertStats(0, compoundProcessor, 0L, count, failed, time); + } + private void assertStats(int processor, CompoundProcessor compoundProcessor, long current, long count, long failed, long time) { + IngestStats.Stats stats = compoundProcessor.getProcessorsWithMetrics().get(processor).v2().createStats(); + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(current)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), equalTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index c7d4dfa4e68cd..c5548ae559400 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -33,12 +33,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.LongSupplier; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.Is.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ConditionalProcessorTests extends ESTestCase { @@ -60,6 +66,8 @@ public void testChecksCondition() throws Exception { new HashMap<>(ScriptModule.CORE_CONTEXTS) ); Map document = new HashMap<>(); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1), 0L, TimeUnit.MILLISECONDS.toNanos(2)); ConditionalProcessor processor = 
new ConditionalProcessor( randomAlphaOfLength(10), new Script( @@ -67,7 +75,10 @@ public void testChecksCondition() throws Exception { scriptName, Collections.emptyMap()), scriptService, new Processor() { @Override - public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(final IngestDocument ingestDocument){ + if(ingestDocument.hasField("error")){ + throw new RuntimeException("error"); + } ingestDocument.setFieldValue("foo", "bar"); return ingestDocument; } @@ -81,20 +92,37 @@ public String getType() { public String getTag() { return null; } - }); + }, relativeTimeProvider); + //false, never call processor never increments metrics + String falseValue = "falsy"; IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); + assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + assertStats(processor, 0, 0, 0); + + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + ingestDocument.setFieldValue("error", true); + processor.execute(ingestDocument); + assertStats(processor, 0, 0, 0); + + //true, always call processor and increments metrics + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); processor.execute(ingestDocument); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); + assertStats(processor, 1, 0, 1); - String falseValue = "falsy"; ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); - ingestDocument.setFieldValue(conditionalField, falseValue); - 
processor.execute(ingestDocument); - assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); - assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + ingestDocument.setFieldValue(conditionalField, trueValue); + ingestDocument.setFieldValue("error", true); + IngestDocument finalIngestDocument = ingestDocument; + expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); + assertStats(processor, 2, 1, 2); } @SuppressWarnings("unchecked") @@ -141,5 +169,14 @@ private static void assertMutatingCtxThrows(Consumer> mutati Exception e = expectedException.get(); assertThat(e, instanceOf(UnsupportedOperationException.class)); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); + assertStats(processor, 0, 0, 0); + } + + private static void assertStats(ConditionalProcessor conditionalProcessor, long count, long failed, long time) { + IngestStats.Stats stats = conditionalProcessor.getMetric().createStats(); + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(0L)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 4de39349dc517..3dde7babb0a96 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -63,6 +63,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -746,16 +747,23 @@ public 
void testBulkRequestExecution() { verify(completionHandler, times(1)).accept(null); } - public void testStats() { + public void testStats() throws Exception { final Processor processor = mock(Processor.class); - IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> processor)); + final Processor processorFailure = mock(Processor.class); + when(processor.getType()).thenReturn("mock"); + when(processor.getTag()).thenReturn("mockTag"); + when(processorFailure.getType()).thenReturn("failure-mock"); + //avoid returning null and dropping the document + when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); + when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); + Map map = new HashMap<>(2); + map.put("mock", (factories, tag, config) -> processor); + map.put("failure-mock", (factories, tag, config) -> processorFailure); + IngestService ingestService = createWithProcessors(map); + final IngestStats initialStats = ingestService.stats(); - assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); - assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); + assertThat(initialStats.getPipelineStats().size(), equalTo(0)); + assertStats(initialStats.getTotalStats(), 0, 0, 0); PutPipelineRequest putRequest = new PutPipelineRequest("_id1", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); @@ -769,7 +777,6 @@ public void testStats() { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") final 
BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); @@ -778,18 +785,33 @@ public void testStats() { indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); - assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); - assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); + + afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + + //total + assertStats(afterFirstRequestStats.getTotalStats(), 1, 0 ,0); + //pipeline + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0); + //processor + assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); + indexRequest.setPipeline("_id2"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); - assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - 
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); + assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0); + //pipeline + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //processor + assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0); //update cluster state and ensure that new stats are added to old stats putRequest = new PutPipelineRequest("_id1", @@ -800,13 +822,66 @@ indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); - assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L)); - assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L)); + assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterThirdRequestStats.getTotalStats(), 3, 0 ,0); + //pipeline + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0); + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is + //due to the parallel arrays used to identify which metrics to carry forward. 
Without unique ids or semantic equals for each + //processor, parallel arrays are the best option for carrying forward metrics between pipeline changes. However, in some cases, + //like this one it may not be readily obvious why the metrics were not carried forward. + assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0); + + //test a failure, and that the processor stats are added from the old stats + putRequest = new PutPipelineRequest("_id1", + new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"), + XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + indexRequest.setPipeline("_id1"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final IngestStats afterForthRequestStats = ingestService.stats(); + assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); + //total + assertStats(afterForthRequestStats.getTotalStats(), 4, 0 ,0); + //pipeline + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0); + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0); + //processor + assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed + assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats + assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0); + } + public void testStatName(){ + Processor processor = mock(Processor.class); + String name = randomAlphaOfLength(10); + when(processor.getType()).thenReturn(name); + 
assertThat(IngestService.getProcessorName(processor), equalTo(name)); + String tag = randomAlphaOfLength(10); + when(processor.getTag()).thenReturn(tag); + assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag)); + + ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class); + when(conditionalProcessor.getProcessor()).thenReturn(processor); + assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag)); + + PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); + String pipelineName = randomAlphaOfLength(10); + when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName); + name = PipelineProcessor.TYPE; + when(pipelineProcessor.getType()).thenReturn(name); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName)); + when(pipelineProcessor.getTag()).thenReturn(tag); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag)); } + public void testExecuteWithDrop() { Map factories = new HashMap<>(); factories.put("drop", new DropProcessor.Factory()); @@ -935,4 +1010,23 @@ public boolean matches(Object o) { return false; } } + + private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) { + assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time); + } + + private void assertPipelineStats(List pipelineStats, String pipelineId, long count, long failed, long time) { + assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time); + } + + private void assertStats(IngestStats.Stats stats, long count, long failed, long time) { + assertThat(stats.getIngestCount(), equalTo(count)); + assertThat(stats.getIngestCurrent(), equalTo(0L)); + assertThat(stats.getIngestFailedCount(), equalTo(failed)); + assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); + 
} + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 9974dd568a8c7..3d39faf9a7447 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -19,44 +19,75 @@ package org.elasticsearch.ingest; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + public class IngestStatsTests extends ESTestCase { public void testSerialization() throws IOException { - IngestStats.Stats total = new IngestStats.Stats(5, 10, 20, 30); - IngestStats.Stats foo = new IngestStats.Stats(50, 100, 200, 300); - IngestStats ingestStats = new IngestStats(total, Collections.singletonMap("foo", foo)); - IngestStats serialize = serialize(ingestStats); - assertNotSame(serialize, ingestStats); - assertNotSame(serialize.getTotalStats(), total); - assertEquals(total.getIngestCount(), serialize.getTotalStats().getIngestCount()); - assertEquals(total.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount()); - assertEquals(total.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis()); - assertEquals(total.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent()); + //total + IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + //pipeline + IngestStats.PipelineStat 
pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); + IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); + IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); + List pipelineStats = + Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); + //processor + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); + //pipeline1 -> processor1,processor2; pipeline2 -> processor3 + Map> processorStats = MapBuilder.>newMapBuilder() + .put(pipeline1Stats.getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) + .put(pipeline2Stats.getPipelineId(), Collections.singletonList(processor3Stat)) + .map(); + + IngestStats ingestStats = new IngestStats(totalStats,pipelineStats, processorStats); + + IngestStats serializedStats = serialize(ingestStats); + assertNotSame(ingestStats, serializedStats); + assertNotSame(totalStats, serializedStats.getTotalStats()); + assertNotSame(pipelineStats, serializedStats.getPipelineStats()); + assertNotSame(processorStats, serializedStats.getProcessorStats()); - assertEquals(ingestStats.getStatsPerPipeline().size(), 1); - assertTrue(ingestStats.getStatsPerPipeline().containsKey("foo")); + assertStats(totalStats, serializedStats.getTotalStats()); + assertEquals(serializedStats.getPipelineStats().size(), 3); - Map left = ingestStats.getStatsPerPipeline(); - Map right = serialize.getStatsPerPipeline(); + for (IngestStats.PipelineStat serializedPipelineStat : 
serializedStats.getPipelineStats()) { + assertStats(getPipelineStats(pipelineStats, serializedPipelineStat.getPipelineId()), serializedPipelineStat.getStats()); + List serializedProcessorStats = + serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + List processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + if(processorStat != null) { + Iterator it = processorStat.iterator(); + //intentionally enforcing the identical ordering + for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { + IngestStats.ProcessorStat ps = it.next(); + assertEquals(ps.getName(), serializedProcessorStat.getName()); + assertStats(ps.getStats(), serializedProcessorStat.getStats()); + } + assertFalse(it.hasNext()); + } + } + } - assertEquals(right.size(), 1); - assertTrue(right.containsKey("foo")); - assertEquals(left.size(), 1); - assertTrue(left.containsKey("foo")); - IngestStats.Stats leftStats = left.get("foo"); - IngestStats.Stats rightStats = right.get("foo"); - assertEquals(leftStats.getIngestCount(), rightStats.getIngestCount()); - assertEquals(leftStats.getIngestFailedCount(), rightStats.getIngestFailedCount()); - assertEquals(leftStats.getIngestTimeInMillis(), rightStats.getIngestTimeInMillis()); - assertEquals(leftStats.getIngestCurrent(), rightStats.getIngestCurrent()); + private void assertStats(IngestStats.Stats fromObject, IngestStats.Stats fromStream) { + assertEquals(fromObject.getIngestCount(), fromStream.getIngestCount()); + assertEquals(fromObject.getIngestFailedCount(), fromStream.getIngestFailedCount()); + assertEquals(fromObject.getIngestTimeInMillis(), fromStream.getIngestTimeInMillis()); + assertEquals(fromObject.getIngestCurrent(), fromStream.getIngestCurrent()); } private IngestStats serialize(IngestStats stats) throws IOException { @@ -65,4 +96,8 @@ private IngestStats serialize(IngestStats stats) throws IOException { StreamInput in = out.bytes().streamInput(); return 
new IngestStats(in); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 018ded346d4fc..eea0f03fa647f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -21,12 +21,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.test.ESTestCase; -import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; @@ -143,15 +144,15 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { pipeline2ProcessorConfig.put("pipeline", pipeline3Id); PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig); - Clock clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(0L); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); Pipeline pipeline1 = new Pipeline( - pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock + pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider ); String key1 = randomAlphaOfLength(10); - clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(3L); + relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(3)); 
Pipeline pipeline2 = new Pipeline( pipeline2Id, null, null, new CompoundProcessor(true, Arrays.asList( @@ -160,15 +161,15 @@ pipeline2Id, null, null, new CompoundProcessor(true, }), pipeline2Processor), Collections.emptyList()), - clock + relativeTimeProvider ); - clock = mock(Clock.class); - when(clock.millis()).thenReturn(0L).thenReturn(2L); + relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2)); Pipeline pipeline3 = new Pipeline( pipeline3Id, null, null, new CompoundProcessor( new TestProcessor(ingestDocument -> { throw new RuntimeException("error"); - })), clock + })), relativeTimeProvider ); when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2); diff --git a/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java b/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java index 5d1a2013a92b0..22a250710ba3c 100644 --- a/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java +++ b/server/src/test/java/org/elasticsearch/script/ScriptServiceTests.java @@ -150,22 +150,22 @@ public void testNotSupportedDisableDynamicSetting() throws IOException { public void testInlineScriptCompiledOnceCache() throws IOException { buildScriptService(Settings.EMPTY); Script script = new Script(ScriptType.INLINE, "test", "1+1", Collections.emptyMap()); - SearchScript.Factory factoryScript1 = scriptService.compile(script, SearchScript.CONTEXT); - SearchScript.Factory factoryScript2 = scriptService.compile(script, SearchScript.CONTEXT); + FieldScript.Factory factoryScript1 = scriptService.compile(script, FieldScript.CONTEXT); + FieldScript.Factory factoryScript2 = scriptService.compile(script, FieldScript.CONTEXT); assertThat(factoryScript1, sameInstance(factoryScript2)); } public void testAllowAllScriptTypeSettings() throws IOException { buildScriptService(Settings.EMPTY); - 
assertCompileAccepted("painless", "script", ScriptType.INLINE, SearchScript.CONTEXT); - assertCompileAccepted(null, "script", ScriptType.STORED, SearchScript.CONTEXT); + assertCompileAccepted("painless", "script", ScriptType.INLINE, FieldScript.CONTEXT); + assertCompileAccepted(null, "script", ScriptType.STORED, FieldScript.CONTEXT); } public void testAllowAllScriptContextSettings() throws IOException { buildScriptService(Settings.EMPTY); - assertCompileAccepted("painless", "script", ScriptType.INLINE, SearchScript.CONTEXT); + assertCompileAccepted("painless", "script", ScriptType.INLINE, FieldScript.CONTEXT); assertCompileAccepted("painless", "script", ScriptType.INLINE, AggregationScript.CONTEXT); assertCompileAccepted("painless", "script", ScriptType.INLINE, UpdateScript.CONTEXT); assertCompileAccepted("painless", "script", ScriptType.INLINE, IngestScript.CONTEXT); @@ -176,16 +176,16 @@ public void testAllowSomeScriptTypeSettings() throws IOException { builder.put("script.allowed_types", "inline"); buildScriptService(builder.build()); - assertCompileAccepted("painless", "script", ScriptType.INLINE, SearchScript.CONTEXT); - assertCompileRejected(null, "script", ScriptType.STORED, SearchScript.CONTEXT); + assertCompileAccepted("painless", "script", ScriptType.INLINE, FieldScript.CONTEXT); + assertCompileRejected(null, "script", ScriptType.STORED, FieldScript.CONTEXT); } public void testAllowSomeScriptContextSettings() throws IOException { Settings.Builder builder = Settings.builder(); - builder.put("script.allowed_contexts", "search, aggs"); + builder.put("script.allowed_contexts", "field, aggs"); buildScriptService(builder.build()); - assertCompileAccepted("painless", "script", ScriptType.INLINE, SearchScript.CONTEXT); + assertCompileAccepted("painless", "script", ScriptType.INLINE, FieldScript.CONTEXT); assertCompileAccepted("painless", "script", ScriptType.INLINE, AggregationScript.CONTEXT); assertCompileRejected("painless", "script", ScriptType.INLINE, 
UpdateScript.CONTEXT); } @@ -195,8 +195,8 @@ public void testAllowNoScriptTypeSettings() throws IOException { builder.put("script.allowed_types", "none"); buildScriptService(builder.build()); - assertCompileRejected("painless", "script", ScriptType.INLINE, SearchScript.CONTEXT); - assertCompileRejected(null, "script", ScriptType.STORED, SearchScript.CONTEXT); + assertCompileRejected("painless", "script", ScriptType.INLINE, FieldScript.CONTEXT); + assertCompileRejected(null, "script", ScriptType.STORED, FieldScript.CONTEXT); } public void testAllowNoScriptContextSettings() throws IOException { @@ -204,7 +204,7 @@ public void testAllowNoScriptContextSettings() throws IOException { builder.put("script.allowed_contexts", "none"); buildScriptService(builder.build()); - assertCompileRejected("painless", "script", ScriptType.INLINE, SearchScript.CONTEXT); + assertCompileRejected("painless", "script", ScriptType.INLINE, FieldScript.CONTEXT); assertCompileRejected("painless", "script", ScriptType.INLINE, AggregationScript.CONTEXT); } diff --git a/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java b/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java index 4bd12c9773491..1a86b3b1da283 100644 --- a/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java +++ b/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java @@ -106,7 +106,8 @@ protected Map, Object>> pluginScripts() { scripts.put("doc['num1'].value * factor", vars -> { Map doc = (Map) vars.get("doc"); ScriptDocValues.Doubles num1 = (ScriptDocValues.Doubles) doc.get("num1"); - Double factor = (Double) vars.get("factor"); + Map params = (Map) vars.get("params"); + Double factor = (Double) params.get("factor"); return num1.getValue() * factor; }); diff --git a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java index 64a9d97e3f429..3c4c0da6322fb 
100644 --- a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java @@ -93,6 +93,17 @@ public T compile(String name, String source, ScriptContext context, Map + ctx -> new FieldScript(parameters, lookup, ctx) { + @Override + public Object execute() { + Map vars = createVars(parameters); + vars.putAll(getLeafLookup().asMap()); + return script.apply(vars); + } + }; + return context.factoryClazz.cast(factory); } else if(context.instanceClazz.equals(TermsSetQueryScript.class)) { TermsSetQueryScript.Factory factory = (parameters, lookup) -> (TermsSetQueryScript.LeafFactory) ctx -> new TermsSetQueryScript(parameters, lookup, ctx) { @@ -276,6 +287,12 @@ public String execute() { throw new IllegalArgumentException("mock script engine does not know how to handle context [" + context.name + "]"); } + private Map createVars(Map params) { + Map vars = new HashMap<>(); + vars.put("params", params); + return vars; + } + public class MockCompiledScript { private final String name; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPainlessExtension.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPainlessExtension.java index f0ca0cb87308d..feacbabe5e29f 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPainlessExtension.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPainlessExtension.java @@ -10,15 +10,15 @@ import org.elasticsearch.painless.spi.WhitelistLoader; import org.elasticsearch.script.AggregationScript; import org.elasticsearch.script.BucketAggregationSelectorScript; +import org.elasticsearch.script.FieldScript; import org.elasticsearch.script.FilterScript; +import org.elasticsearch.script.NumberSortScript; import org.elasticsearch.script.ScriptContext; -import org.elasticsearch.script.SearchScript; +import 
org.elasticsearch.script.StringSortScript; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.elasticsearch.script.NumberSortScript; -import org.elasticsearch.script.StringSortScript; import static java.util.Collections.singletonList; @@ -32,7 +32,7 @@ public Map, List> getContextWhitelists() { List list = singletonList(WHITELIST); whitelist.put(FilterScript.CONTEXT, list); whitelist.put(AggregationScript.CONTEXT, list); - whitelist.put(SearchScript.CONTEXT, list); + whitelist.put(FieldScript.CONTEXT, list); whitelist.put(NumberSortScript.CONTEXT, list); whitelist.put(StringSortScript.CONTEXT, list); whitelist.put(BucketAggregationSelectorScript.CONTEXT, list);