Skip to content

Commit

Permalink
Add a node setting to disable recovery source (elastic#111824)
Browse files Browse the repository at this point in the history
This change adds a new setting called indices.recovery_source.enabled (defaults to true) that can be used to disable
the recovery source for engines that don't require peer recovery.
  • Loading branch information
jimczi authored Sep 6, 2024
1 parent 0fa6a57 commit a07d398
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;
Expand Down Expand Up @@ -139,6 +140,7 @@ public abstract class Engine implements Closeable {
protected final EventListener eventListener;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final SetOnce<Exception> failedEngine = new SetOnce<>();
protected final boolean enableRecoverySource;

private final AtomicBoolean isClosing = new AtomicBoolean();
private final SubscribableListener<Void> drainOnCloseListener = new SubscribableListener<>();
Expand Down Expand Up @@ -167,6 +169,9 @@ protected Engine(EngineConfig engineConfig) {
// we use the engine class directly here to make sure all subclasses have the same logger name
this.logger = Loggers.getLogger(Engine.class, engineConfig.getShardId());
this.eventListener = engineConfig.getEventListener();
this.enableRecoverySource = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(
engineConfig.getIndexSettings().getSettings()
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -3130,6 +3131,13 @@ public Translog.Snapshot newChangesSnapshot(
boolean singleConsumer,
boolean accessStats
) throws IOException {
if (enableRecoverySource == false) {
throw new IllegalStateException(
"Changes snapshot are unavailable when the "
+ RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.getKey()
+ " setting is disabled."
);
}
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.List;
import java.util.Locale;

import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING;

public class SourceFieldMapper extends MetadataFieldMapper {
public static final NodeFeature SYNTHETIC_SOURCE_FALLBACK = new NodeFeature("mapper.source.synthetic_source_fallback");
public static final NodeFeature SYNTHETIC_SOURCE_STORED_FIELDS_ADVANCE_FIX = new NodeFeature(
Expand All @@ -61,23 +63,53 @@ private enum Mode {
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
null
null,
true
);

private static final SourceFieldMapper DEFAULT_NO_RECOVERY_SOURCE = new SourceFieldMapper(
null,
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
null,
false
);

private static final SourceFieldMapper TSDB_DEFAULT = new SourceFieldMapper(
Mode.SYNTHETIC,
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
IndexMode.TIME_SERIES
IndexMode.TIME_SERIES,
true
);

private static final SourceFieldMapper TSDB_DEFAULT_NO_RECOVERY_SOURCE = new SourceFieldMapper(
Mode.SYNTHETIC,
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
IndexMode.TIME_SERIES,
false
);

private static final SourceFieldMapper LOGSDB_DEFAULT = new SourceFieldMapper(
Mode.SYNTHETIC,
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
IndexMode.LOGSDB
IndexMode.LOGSDB,
true
);

private static final SourceFieldMapper LOGSDB_DEFAULT_NO_RECOVERY_SOURCE = new SourceFieldMapper(
Mode.SYNTHETIC,
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
IndexMode.LOGSDB,
false
);

/*
Expand All @@ -89,7 +121,17 @@ private enum Mode {
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
IndexMode.TIME_SERIES
IndexMode.TIME_SERIES,
true
);

private static final SourceFieldMapper TSDB_LEGACY_DEFAULT_NO_RECOVERY_SOURCE = new SourceFieldMapper(
null,
Explicit.IMPLICIT_TRUE,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
IndexMode.TIME_SERIES,
false
);

public static class Defaults {
Expand Down Expand Up @@ -148,11 +190,19 @@ public static class Builder extends MetadataFieldMapper.Builder {

private final boolean supportsNonDefaultParameterValues;

public Builder(IndexMode indexMode, final Settings settings, boolean supportsCheckForNonDefaultParams) {
private final boolean enableRecoverySource;

public Builder(
IndexMode indexMode,
final Settings settings,
boolean supportsCheckForNonDefaultParams,
boolean enableRecoverySource
) {
super(Defaults.NAME);
this.indexMode = indexMode;
this.supportsNonDefaultParameterValues = supportsCheckForNonDefaultParams == false
|| settings.getAsBoolean(LOSSY_PARAMETERS_ALLOWED_SETTING_NAME, true);
this.enableRecoverySource = enableRecoverySource;
}

public Builder setSynthetic() {
Expand Down Expand Up @@ -218,7 +268,8 @@ public SourceFieldMapper build() {
enabled.get(),
includes.getValue().toArray(Strings.EMPTY_ARRAY),
excludes.getValue().toArray(Strings.EMPTY_ARRAY),
indexMode
indexMode,
enableRecoverySource
);
if (indexMode != null) {
indexMode.validateSourceFieldMapper(sourceFieldMapper);
Expand All @@ -230,23 +281,25 @@ public SourceFieldMapper build() {

public static final TypeParser PARSER = new ConfigurableTypeParser(c -> {
var indexMode = c.getIndexSettings().getMode();
boolean enableRecoverySource = INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(c.getSettings());
if (indexMode.isSyntheticSourceEnabled()) {
if (indexMode == IndexMode.TIME_SERIES) {
if (c.getIndexSettings().getIndexVersionCreated().onOrAfter(IndexVersions.V_8_7_0)) {
return TSDB_DEFAULT;
return enableRecoverySource ? TSDB_DEFAULT : TSDB_DEFAULT_NO_RECOVERY_SOURCE;
} else {
return TSDB_LEGACY_DEFAULT;
return enableRecoverySource ? TSDB_LEGACY_DEFAULT : TSDB_LEGACY_DEFAULT_NO_RECOVERY_SOURCE;
}
} else if (indexMode == IndexMode.LOGSDB) {
return LOGSDB_DEFAULT;
return enableRecoverySource ? LOGSDB_DEFAULT : LOGSDB_DEFAULT_NO_RECOVERY_SOURCE;
}
}
return DEFAULT;
return enableRecoverySource ? DEFAULT : DEFAULT_NO_RECOVERY_SOURCE;
},
c -> new Builder(
c.getIndexSettings().getMode(),
c.getSettings(),
c.indexVersionCreated().onOrAfter(IndexVersions.SOURCE_MAPPER_LOSSY_PARAMS_CHECK)
c.indexVersionCreated().onOrAfter(IndexVersions.SOURCE_MAPPER_LOSSY_PARAMS_CHECK),
INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(c.getSettings())
)
);

Expand Down Expand Up @@ -299,8 +352,16 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
private final SourceFilter sourceFilter;

private final IndexMode indexMode;

private SourceFieldMapper(Mode mode, Explicit<Boolean> enabled, String[] includes, String[] excludes, IndexMode indexMode) {
private final boolean enableRecoverySource;

private SourceFieldMapper(
Mode mode,
Explicit<Boolean> enabled,
String[] includes,
String[] excludes,
IndexMode indexMode,
boolean enableRecoverySource
) {
super(new SourceFieldType((enabled.explicit() && enabled.value()) || (enabled.explicit() == false && mode != Mode.DISABLED)));
assert enabled.explicit() == false || mode == null;
this.mode = mode;
Expand All @@ -313,6 +374,7 @@ private SourceFieldMapper(Mode mode, Explicit<Boolean> enabled, String[] include
}
this.complete = stored() && sourceFilter == null;
this.indexMode = indexMode;
this.enableRecoverySource = enableRecoverySource;
}

private static SourceFilter buildSourceFilter(String[] includes, String[] excludes) {
Expand Down Expand Up @@ -357,7 +419,7 @@ public void preParse(DocumentParserContext context) throws IOException {
context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource) {
if (enableRecoverySource && originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
Expand Down Expand Up @@ -385,7 +447,7 @@ protected String contentType() {

@Override
public FieldMapper.Builder getMergeBuilder() {
return new Builder(indexMode, Settings.EMPTY, false).init(this);
return new Builder(indexMode, Settings.EMPTY, false, enableRecoverySource).init(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.monitor.os.OsProbe;
import org.elasticsearch.node.NodeRoleSettings;

Expand Down Expand Up @@ -380,6 +381,16 @@ public Iterator<Setting<?>> settings() {
Setting.Property.NodeScope
);

/**
* Indicates whether the `recovery_source` should be enabled (see {@link SourceFieldMapper}).
* This setting is not registered and should be used exclusively in a serverless environment.
*/
public static final Setting<Boolean> INDICES_RECOVERY_SOURCE_ENABLED_SETTING = Setting.boolSetting(
"indices.recovery.recovery_source.enabled",
true,
Property.NodeScope
);

public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

private volatile ByteSizeValue maxBytesPerSec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogOperationsUtils;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.index.IndexVersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -7773,6 +7774,24 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I
}
}

public void testDisableRecoverySource() throws Exception {
Settings settings = Settings.builder()
.put(defaultSettings.getNodeSettings())
.put(RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.getKey(), false)
.build();
IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetadata(), settings, defaultSettings.getScopedSettings());
try (
Store store = createStore();
InternalEngine engine = createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE)
) {
IllegalStateException exc = expectThrows(
IllegalStateException.class,
() -> engine.newChangesSnapshot("test", 0, 1000, true, true, true)
);
assertThat(exc.getMessage(), containsString("unavailable"));
}
}

private static void assertCommitGenerations(Map<IndexCommit, Engine.IndexCommitRef> commits, List<Long> expectedGenerations) {
assertCommitGenerations(commits.values().stream().map(Engine.IndexCommitRef::getIndexCommit).toList(), expectedGenerations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testCreateDynamicStringFieldAsKeywordForDimension() throws IOExcepti
XContentParser parser = createParser(JsonXContent.jsonXContent, source);
SourceToParse sourceToParse = new SourceToParse("test", new BytesArray(source), XContentType.JSON);

SourceFieldMapper sourceMapper = new SourceFieldMapper.Builder(null, Settings.EMPTY, false).setSynthetic().build();
SourceFieldMapper sourceMapper = new SourceFieldMapper.Builder(null, Settings.EMPTY, false, true).setSynthetic().build();
RootObjectMapper root = new RootObjectMapper.Builder("_doc", Optional.empty()).add(
new PassThroughObjectMapper.Builder("labels").setPriority(0).setContainsDimensions().dynamic(ObjectMapper.Dynamic.TRUE)
).build(MapperBuilderContext.root(false, false));
Expand Down
Loading

0 comments on commit a07d398

Please sign in to comment.