Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip shard refreshes if shard is search idle #27500

Merged
merged 22 commits into from
Nov 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -86,6 +88,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void asyncShardOperation(ExplainRequest request, ShardId shardId, ActionListener<ExplainResponse> listener) throws IOException {
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}

@Override
protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException {
ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -38,6 +38,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the get operation.
*/
Expand Down Expand Up @@ -76,6 +78,23 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
listener.onResponse(shardOperation(request, shardId));
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}

@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
Expand All @@ -47,6 +48,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
Expand Down Expand Up @@ -78,7 +81,7 @@ protected TransportSingleShardAction(Settings settings, String actionName, Threa
if (!isSubAction()) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
}
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
}

/**
Expand All @@ -97,6 +100,19 @@ protected void doExecute(Request request, ActionListener<Response> listener) {

protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;

protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(this.executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, shardId));
}
});
}
protected abstract Response newResponse();

protected abstract boolean resolveIndex(Request request);
Expand Down Expand Up @@ -291,11 +307,27 @@ public void messageReceived(final Request request, final TransportChannel channe
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
asyncShardOperation(request, request.internalShardId, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (IOException e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
}
}

/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
Expand All @@ -37,6 +38,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the get operation.
*/
Expand Down Expand Up @@ -82,6 +85,23 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId, ActionListener<TermVectorsResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
listener.onResponse(shardOperation(request, shardId));
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}

@Override
protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
40 changes: 29 additions & 11 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -624,6 +625,27 @@ public synchronized void updateMetaData(final IndexMetaData metadata) {
}
}
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
// docs to become visible immediately. This also flushes all pending indexing / search reqeusts
// that are waiting for a refresh.
threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("forced refresh failed after interval change", e);
}

@Override
protected void doRun() throws Exception {
maybeRefreshEngine(true);
}

@Override
public boolean isForceExecution() {
return true;
}
});
rescheduleRefreshTasks();
}
final Translog.Durability durability = indexSettings.getTranslogDurability();
Expand Down Expand Up @@ -686,17 +708,13 @@ private void maybeFSyncTranslogs() {
}
}

private void maybeRefreshEngine() {
if (indexSettings.getRefreshInterval().millis() > 0) {
private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
if (shard.isReadAllowed()) {
try {
if (shard.isRefreshNeeded()) {
shard.refresh("schedule");
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
try {
shard.scheduledRefresh();
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
}
}
Expand Down Expand Up @@ -896,7 +914,7 @@ final class AsyncRefreshTask extends BaseAsyncTask {

@Override
protected void runInternal() {
indexService.maybeRefreshEngine();
indexService.maybeRefreshEngine(false);
}

@Override
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public final class IndexSettings {
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING =
Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100),
Property.IndexScope);
public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER =
Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30),
TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic);
public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING =
new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(),
(value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope);
Expand Down Expand Up @@ -262,6 +265,8 @@ public final class IndexSettings {
private volatile int maxNgramDiff;
private volatile int maxShingleDiff;
private volatile boolean TTLPurgeDisabled;
private volatile TimeValue searchIdleAfter;

/**
* The maximum number of refresh listeners allows on this shard.
*/
Expand Down Expand Up @@ -371,6 +376,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered
if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) {
throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: ["
Expand Down Expand Up @@ -411,8 +417,11 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }

private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}
Expand Down Expand Up @@ -752,4 +761,16 @@ public IndexSortConfig getIndexSortConfig() {
}

public IndexScopedSettings getScopedSettings() { return scopedSettings;}

/**
* Returns true iff the refresh setting exists or in other words is explicitly set.
*/
public boolean isExplicitRefresh() {
return INDEX_REFRESH_INTERVAL_SETTING.exists(settings);
}

/**
* Returns the time that an index shard becomes search idle unless it's accessed in between
*/
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
}
Loading