Skip to content

Commit

Permalink
Add EngineConfig extensions to EnginePlugin (#1387)
Browse files Browse the repository at this point in the history
This commit adds an extension point to EngineConfig through EnginePlugin using
a new EngineConfigFactory mechanism. EnginePlugin provides interface methods to
override configurations in EngineConfig. The EngineConfigFactory produces a new
instance of the EngineConfig using these overrides. Defaults are used absent
overridden configurations.

This serves as a mechanism to override Engine configurations (e.g., CodecService,
TranslogConfig) enabling Plugins to have higher fidelity for changing Engine
behavior without having to override the entire Engine (which is only permitted for
a single plugin).

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize authored Oct 20, 2021
1 parent 2da858c commit ecac8d3
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ public static final IndexShard newIndexShard(
indexService.mapperService(),
indexService.similarityService(),
shard.getEngineFactory(),
shard.getEngineConfigFactory(),
indexService.getIndexEventListener(),
wrapper,
indexService.getThreadPool(),
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.cache.query.IndexQueryCache;
import org.opensearch.index.cache.query.QueryCache;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -165,6 +166,7 @@ public final class IndexModule {
private final IndexSettings indexSettings;
private final AnalysisRegistry analysisRegistry;
private final EngineFactory engineFactory;
private final EngineConfigFactory engineConfigFactory;
private SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderWrapper =
new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
Expand All @@ -191,6 +193,7 @@ public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final EngineConfigFactory engineConfigFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
Expand All @@ -199,6 +202,7 @@ public IndexModule(
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
Expand Down Expand Up @@ -496,6 +500,7 @@ public IndexService newIndexService(
shardStoreDeleter,
indexAnalyzers,
engineFactory,
engineConfigFactory,
circuitBreakerService,
bigArrays,
threadPool,
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.cache.query.QueryCache;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.fielddata.IndexFieldDataCache;
import org.opensearch.index.fielddata.IndexFieldDataService;
Expand Down Expand Up @@ -138,6 +139,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NamedWriteableRegistry namedWriteableRegistry;
private final SimilarityService similarityService;
private final EngineFactory engineFactory;
private final EngineConfigFactory engineConfigFactory;
private final IndexWarmer warmer;
private volatile Map<Integer, IndexShard> shards = emptyMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -174,6 +176,7 @@ public IndexService(
ShardStoreDeleter shardStoreDeleter,
IndexAnalyzers indexAnalyzers,
EngineFactory engineFactory,
EngineConfigFactory engineConfigFactory,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
Expand Down Expand Up @@ -254,6 +257,7 @@ public IndexService(
this.directoryFactory = directoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.readerWrapper = wrapperFactory.apply(this);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
Expand Down Expand Up @@ -507,6 +511,7 @@ public synchronized IndexShard createShard(
mapperService,
similarityService,
engineFactory,
engineConfigFactory,
eventListener,
readerWrapper,
threadPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* A factory to create an EngineConfig based on custom plugin overrides
*/
public class EngineConfigFactory {
private final Optional<CodecService> codecService;

/** default ctor primarily used for tests without plugins */
public EngineConfigFactory(IndexSettings idxSettings) {
this(Collections.emptyList(), idxSettings);
}

/**
* Construct a factory using the plugin service and provided index settings
*/
public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSettings) {
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings);
}

/* private constructor to construct the factory from specific EnginePlugins and IndexSettings */
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings) {
Optional<CodecService> codecService = Optional.empty();
String codecServiceOverridingPlugin = null;
for (EnginePlugin enginePlugin : enginePlugins) {
// get overriding codec service from EnginePlugin
if (codecService.isPresent() == false) {
codecService = enginePlugin.getCustomCodecService(idxSettings);
codecServiceOverridingPlugin = enginePlugin.getClass().getName();
} else {
throw new IllegalStateException(
"existing codec service already overridden in: "
+ codecServiceOverridingPlugin
+ " attempting to override again by: "
+ enginePlugin.getClass().getName()
);
}
}
this.codecService = codecService;
}

/** Insantiates a new EngineConfig from the provided custom overrides */
public EngineConfig newEngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
) {

return new EngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
mergePolicy,
analyzer,
similarity,
this.codecService.isPresent() == true ? this.codecService.get() : codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier
);
}
}
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.Engine.GetResult;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.ReadOnlyEngine;
Expand Down Expand Up @@ -238,6 +239,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;
final EngineConfigFactory engineConfigFactory;

private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
Expand Down Expand Up @@ -302,7 +304,8 @@ public IndexShard(
final IndexCache indexCache,
final MapperService mapperService,
final SimilarityService similarityService,
final @Nullable EngineFactory engineFactory,
final EngineFactory engineFactory,
final EngineConfigFactory engineConfigFactory,
final IndexEventListener indexEventListener,
final CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
final ThreadPool threadPool,
Expand All @@ -323,6 +326,7 @@ public IndexShard(
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
this.store = store;
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
Expand Down Expand Up @@ -3179,7 +3183,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
this.warmer.warm(reader);
}
};
return new EngineConfig(
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
indexSettings,
Expand Down Expand Up @@ -3705,6 +3709,10 @@ EngineFactory getEngineFactory() {
return engineFactory;
}

EngineConfigFactory getEngineConfigFactory() {
return engineConfigFactory;
}

// for tests
ReplicationTracker getReplicationTracker() {
return replicationTracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.opensearch.index.cache.request.ShardRequestCache;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NoOpEngine;
Expand Down Expand Up @@ -707,6 +708,7 @@ private synchronized IndexService createIndexService(
idxSettings,
analysisRegistry,
getEngineFactory(idxSettings),
getEngineConfigFactory(idxSettings),
directoryFactories,
() -> allowExpensiveQueries,
indexNameExpressionResolver,
Expand Down Expand Up @@ -739,6 +741,10 @@ private synchronized IndexService createIndexService(
);
}

private EngineConfigFactory getEngineConfigFactory(final IndexSettings idxSettings) {
return new EngineConfigFactory(this.pluginsService, idxSettings);
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final IndexMetadata indexMetadata = idxSettings.getIndexMetadata();
if (indexMetadata != null && indexMetadata.getState() == IndexMetadata.State.CLOSE) {
Expand Down Expand Up @@ -781,6 +787,7 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe
idxSettings,
analysisRegistry,
getEngineFactory(idxSettings),
getEngineConfigFactory(idxSettings),
directoryFactories,
() -> allowExpensiveQueries,
indexNameExpressionResolver,
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/plugins/EnginePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.plugins;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.EngineFactory;

import java.util.Optional;
Expand All @@ -52,4 +53,14 @@ public interface EnginePlugin {
*/
Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings);

/**
* EXPERT:
* When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings
* to determine if a custom {@link CodecService} should be provided for the given index. A plugin that is not overriding
* the {@link CodecService} through the plugin can ignore this method and the Codec specified in the {@link IndexSettings}
* will be used.
*/
default Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.index.cache.query.IndexQueryCache;
import org.opensearch.index.cache.query.QueryCache;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.InternalEngineTests;
import org.opensearch.index.fielddata.IndexFieldDataCache;
Expand Down Expand Up @@ -218,6 +219,7 @@ public void testWrapperIsBound() throws IOException {
indexSettings,
emptyAnalysisRegistry,
engineFactory,
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand All @@ -243,6 +245,7 @@ public void testRegisterIndexStore() throws IOException {
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
indexStoreFactories,
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand Down Expand Up @@ -570,6 +573,7 @@ public void testRegisterCustomRecoveryStateFactory() throws IOException {
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand Down Expand Up @@ -601,6 +605,7 @@ private static IndexModule createIndexModule(IndexSettings indexSettings, Analys
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void testRecoverFromNoOp() throws IOException {
indexShard,
initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE),
indexShard.indexSettings().getIndexMetadata(),
NoOpEngine::new
NoOpEngine::new,
new EngineConfigFactory(indexShard.indexSettings())
);
recoverShardFromStore(primary);
assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
Expand Down
Loading

0 comments on commit ecac8d3

Please sign in to comment.