Skip to content

Commit

Permalink
Move async task maintenance service to core plugin (#57700)
Browse files Browse the repository at this point in the history
The async task task maintenance service is used by both async search plugin
as well as EQL plugin. So it needs to reside in the core.

Relates to #49638
  • Loading branch information
imotov authored Jun 11, 2020
1 parent 8072646 commit 509c674
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 95 deletions.
20 changes: 20 additions & 0 deletions x-pack/plugin/async-search/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,25 @@ archivesBaseName = 'x-pack-async-search'
compileJava.options.compilerArgs << "-Xlint:-rawtypes"
compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// add all sub-projects of the qa sub-project
gradle.projectsEvaluated {
project.subprojects
Expand All @@ -28,6 +47,7 @@ dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('ilm'))
testCompile project(path: xpackModule('async'))
}

dependencyLicenses {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,28 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;

public final class AsyncSearch extends Plugin implements ActionPlugin {
private final Settings settings;

public AsyncSearch(Settings settings) {
this.settings = settings;
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand All @@ -71,31 +50,6 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
);
}

@Override
public Collection<Object> createComponents(Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
if (DiscoveryNode.isDataNode(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncTaskIndexService<AsyncSearchResponse> indexService =
new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService);
return Collections.singletonList(maintenanceService);
} else {
return Collections.emptyList();
}
}

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -30,10 +31,11 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
Expand All @@ -50,7 +52,7 @@
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -79,7 +81,7 @@ public List<AggregationSpec> getAggregations() {

@Before
public void startMaintenanceService() {
for (AsyncSearchMaintenanceService service : internalCluster().getDataNodeInstances(AsyncSearchMaintenanceService.class)) {
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
if (service.lifecycleState() == Lifecycle.State.STOPPED) {
// force the service to start again
service.start();
Expand All @@ -91,7 +93,7 @@ public void startMaintenanceService() {

@After
public void stopMaintenanceService() {
for (AsyncSearchMaintenanceService service : internalCluster().getDataNodeInstances(AsyncSearchMaintenanceService.class)) {
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
service.stop();
}
}
Expand All @@ -103,7 +105,7 @@ public void releaseQueryLatch() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, AsyncSearch.class, IndexLifecycle.class,
return Arrays.asList(LocalStateCompositeXPackPlugin.class, AsyncSearch.class, AsyncResultsIndexPlugin.class, IndexLifecycle.class,
SearchTestPlugin.class, ReindexPlugin.class);
}

Expand Down
42 changes: 42 additions & 0 deletions x-pack/plugin/async/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

evaluationDependsOn(xpackModule('core'))

apply plugin: 'elasticsearch.esplugin'

esplugin {
name 'x-pack-async'
description 'A module which handles common async operations'
classname 'org.elasticsearch.xpack.async.AsyncResultsIndexPlugin'
extendedPlugins = ['x-pack-core']
}
archivesBaseName = 'x-pack-async'

dependencies {
compileOnly project(":server")
compileOnly project(path: xpackModule('core'), configuration: 'default')
}

dependencyLicenses {
ignoreSha 'x-pack-core'
}

integTest.enabled = false

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.async;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

public class AsyncResultsIndexPlugin extends Plugin implements SystemIndexPlugin {

protected final Settings settings;

public AsyncResultsIndexPlugin(Settings settings) {
this.settings = settings;
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(new SystemIndexDescriptor(XPackPlugin.ASYNC_RESULTS_INDEX, this.getClass().getSimpleName()));
}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
List<Object> components = new ArrayList<>();
if (DiscoveryNode.isDataNode(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncTaskIndexService<AsyncSearchResponse> indexService = new AsyncTaskIndexService<>(
XPackPlugin.ASYNC_RESULTS_INDEX,
clusterService,
threadPool.getThreadContext(),
client,
ASYNC_SEARCH_ORIGIN,
AsyncSearchResponse::new,
namedWriteableRegistry
);
AsyncTaskMaintenanceService maintenanceService = new AsyncTaskMaintenanceService(
clusterService,
nodeEnvironment.nodeId(),
settings,
threadPool,
indexService
);
components.add(maintenanceService);
}
return components;
}
}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.async;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

public class AsyncResultsIndexPluginTests extends ESTestCase {

public void testDummy() {
// This is a dummy test case to satisfy the conventions
AsyncResultsIndexPlugin plugin = new AsyncResultsIndexPlugin(Settings.EMPTY);
assertThat(plugin.getSystemIndexDescriptors(Settings.EMPTY), Matchers.hasSize(1));
}
}
Loading

0 comments on commit 509c674

Please sign in to comment.