diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java index 818100539849a..c7c0f21eb0876 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java @@ -91,7 +91,6 @@ void executeDocument( } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { - threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { final AtomicInteger counter = new AtomicInteger(); final List responses = new CopyOnWriteArrayList<>( diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java index 22075076f0cf3..eff038b109aea 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java @@ -38,7 +38,6 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -89,7 +88,7 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe return; } - IngestPipelineValidator.validateIngestPipeline(simulateRequest.getPipeline(), ingestService.getClusterService()); + ingestService.validateProcessorCountForIngestPipeline(simulateRequest.getPipeline()); executionService.execute(simulateRequest, listener); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 4d605dcf5d819..251e9124218b1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -132,7 +132,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.store.IndicesStore; -import org.opensearch.ingest.IngestPipelineValidator; +import org.opensearch.ingest.IngestService; import org.opensearch.monitor.fs.FsHealthService; import org.opensearch.monitor.fs.FsService; import org.opensearch.monitor.jvm.JvmGcMonitorService; @@ -406,7 +406,7 @@ public void apply(Settings value, Settings current, Settings previous) { ClusterService.USER_DEFINED_METADATA, ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS, + IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java b/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java deleted file mode 100644 index ff3abee9f5a11..0000000000000 --- a/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ingest; - -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Setting; - -import java.util.List; - -/** - * This class contains methods to validate the ingest pipeline. - */ -public class IngestPipelineValidator { - - /** - * Defines the limit for the number of processors which can run on a given document during ingestion. - */ - public static final Setting MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting( - "cluster.ingest.max_number_processors", - Integer.MAX_VALUE, - 1, - Integer.MAX_VALUE, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - - /** - * Validates that the number of compound processors in the pipeline does not exceed the configured limit. - * - * @param pipeline - * @param clusterService - */ - public static void validateIngestPipeline(Pipeline pipeline, ClusterService clusterService) { - - List processors = pipeline.getCompoundProcessor().getProcessors(); - int maxNumberOfIngestProcessorsAllowed = clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS); - - if (processors.size() > maxNumberOfIngestProcessorsAllowed) { - throw new IllegalStateException( - "Cannot use more than the maximum processors allowed. Number of processors configured is [" - + processors.size() - + "] which exceeds the maximum allowed configuration of [" - + maxNumberOfIngestProcessorsAllowed - + "] processors." - ); - } - } -} diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index d614badcaea2d..99b0774df7009 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -62,6 +62,7 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.common.regex.Regex; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -107,6 +108,18 @@ public class IngestService implements ClusterStateApplier, ReportingService MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting( + "cluster.ingest.max_number_processors", + Integer.MAX_VALUE, + 1, + Integer.MAX_VALUE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private static final Logger logger = LogManager.getLogger(IngestService.class); private final ClusterService clusterService; @@ -123,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService processorFactories(List ingestPlugins, Processor.Parameters parameters) { @@ -495,7 +515,7 @@ void validatePipeline(Map ingestInfos, PutPipelineReq Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2(); Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); - IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService); + validateProcessorCountForIngestPipeline(pipeline); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { @@ -510,6 +530,20 @@ void validatePipeline(Map ingestInfos, PutPipelineReq ExceptionsHelper.rethrowAndSuppress(exceptions); } + public void validateProcessorCountForIngestPipeline(Pipeline pipeline) { + List processors = pipeline.getCompoundProcessor().getProcessors(); + + if (processors.size() > maxIngestProcessorCount) { + throw new IllegalStateException( + "Cannot use more than the maximum processors allowed. Number of processors being configured is [" + + processors.size() + + "] which exceeds the maximum allowed configuration of [" + + maxIngestProcessorCount + + "] processors." + ); + } + } + public void executeBulkRequest( int numberOfActionRequests, Iterable> actionRequests, @@ -1102,7 +1136,6 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { processorFactories, scriptService ); - IngestPipelineValidator.validateIngestPipeline(newPipeline, clusterService); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); if (previous == null) { diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java index f7e6d065dca67..4d88683826af7 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java @@ -10,7 +10,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.junit.After; @@ -38,16 +37,4 @@ public void testDeprecatedGetMasterServiceBWC() { assertThat(masterService, equalTo(clusterManagerService)); } } - - public void testUpdateMaxIngestProcessorCountSetting() { - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - - // verify defaults - assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); - - // verify update max processor - Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build(); - clusterSettings.applySettings(newSettings); - assertEquals(3, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); - } } diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 1f4b1d635d438..e4e21b7654e79 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -58,6 +58,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.metrics.OperationStats; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.xcontent.XContentType; @@ -2058,6 +2059,18 @@ public void testPrepareBatches_different_index_pipeline() { assertEquals(4, batches.size()); } + public void testUpdateMaxIngestProcessorCountSetting() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + // verify defaults + assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + + // verify update max processor + Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build(); + clusterSettings.applySettings(newSettings); + assertEquals(3, clusterSettings.get(IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + } + private IngestService.IndexRequestWrapper createIndexRequestWrapper(String index, List pipelines) { IndexRequest indexRequest = new IndexRequest(index); return new IngestService.IndexRequestWrapper(0, indexRequest, pipelines, true);