Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EngineConfig extensions to EnginePlugin #1387

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ public static final IndexShard newIndexShard(
indexService.mapperService(),
indexService.similarityService(),
shard.getEngineFactory(),
shard.getEngineConfigFactory(),
indexService.getIndexEventListener(),
wrapper,
indexService.getThreadPool(),
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.cache.query.IndexQueryCache;
import org.opensearch.index.cache.query.QueryCache;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -165,6 +166,7 @@ public final class IndexModule {
private final IndexSettings indexSettings;
private final AnalysisRegistry analysisRegistry;
private final EngineFactory engineFactory;
private final EngineConfigFactory engineConfigFactory;
private SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderWrapper =
new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
Expand All @@ -191,6 +193,7 @@ public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final EngineConfigFactory engineConfigFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
Expand All @@ -199,6 +202,7 @@ public IndexModule(
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
Expand Down Expand Up @@ -496,6 +500,7 @@ public IndexService newIndexService(
shardStoreDeleter,
indexAnalyzers,
engineFactory,
engineConfigFactory,
circuitBreakerService,
bigArrays,
threadPool,
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.cache.query.QueryCache;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.fielddata.IndexFieldDataCache;
import org.opensearch.index.fielddata.IndexFieldDataService;
Expand Down Expand Up @@ -138,6 +139,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NamedWriteableRegistry namedWriteableRegistry;
private final SimilarityService similarityService;
private final EngineFactory engineFactory;
private final EngineConfigFactory engineConfigFactory;
private final IndexWarmer warmer;
private volatile Map<Integer, IndexShard> shards = emptyMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -174,6 +176,7 @@ public IndexService(
ShardStoreDeleter shardStoreDeleter,
IndexAnalyzers indexAnalyzers,
EngineFactory engineFactory,
EngineConfigFactory engineConfigFactory,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
Expand Down Expand Up @@ -254,6 +257,7 @@ public IndexService(
this.directoryFactory = directoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.readerWrapper = wrapperFactory.apply(this);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
Expand Down Expand Up @@ -507,6 +511,7 @@ public synchronized IndexShard createShard(
mapperService,
similarityService,
engineFactory,
engineConfigFactory,
eventListener,
readerWrapper,
threadPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* A factory to create an EngineConfig based on custom plugin overrides
*/
public class EngineConfigFactory {
private final Optional<CodecService> codecService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be declared with Optional

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was mostly for the logic on L111.

this.codecService.isPresent() ? this.codecService.get() : codecService;

instead of

this.codecService != null ? this.codecService : codecService;

Which feels a little clumsy?


/** default ctor primarily used for tests without plugins */
public EngineConfigFactory(IndexSettings idxSettings) {
this(Collections.emptyList(), idxSettings);
}

/**
* Construct a factory using the plugin service and provided index settings
*/
public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSettings) {
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings);
}

/* private constructor to construct the factory from specific EnginePlugins and IndexSettings */
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings) {
setiah marked this conversation as resolved.
Show resolved Hide resolved
Optional<CodecService> codecService = Optional.empty();
String codecServiceOverridingPlugin = null;
for (EnginePlugin enginePlugin : enginePlugins) {
// get overriding codec service from EnginePlugin
if (codecService.isPresent() == false) {
codecService = enginePlugin.getCustomCodecService(idxSettings);
codecServiceOverridingPlugin = enginePlugin.getClass().getName();
} else {
throw new IllegalStateException(
"existing codec service already overridden in: "
+ codecServiceOverridingPlugin
+ " attempting to override again by: "
+ enginePlugin.getClass().getName()
);
}
}
this.codecService = codecService;
}

/** Insantiates a new EngineConfig from the provided custom overrides */
public EngineConfig newEngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
) {

return new EngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
mergePolicy,
analyzer,
similarity,
this.codecService.isPresent() == true ? this.codecService.get() : codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.Engine.GetResult;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.ReadOnlyEngine;
Expand Down Expand Up @@ -238,6 +239,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;
final EngineConfigFactory engineConfigFactory;

private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
Expand Down Expand Up @@ -303,6 +305,7 @@ public IndexShard(
final MapperService mapperService,
final SimilarityService similarityService,
final @Nullable EngineFactory engineFactory,
final @Nullable EngineConfigFactory engineConfigFactory,
nknize marked this conversation as resolved.
Show resolved Hide resolved
final IndexEventListener indexEventListener,
final CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
final ThreadPool threadPool,
Expand All @@ -323,6 +326,7 @@ public IndexShard(
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
this.store = store;
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
Expand Down Expand Up @@ -3179,7 +3183,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
this.warmer.warm(reader);
}
};
return new EngineConfig(
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
indexSettings,
Expand Down Expand Up @@ -3705,6 +3709,10 @@ EngineFactory getEngineFactory() {
return engineFactory;
}

EngineConfigFactory getEngineConfigFactory() {
return engineConfigFactory;
}

// for tests
ReplicationTracker getReplicationTracker() {
return replicationTracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.opensearch.index.cache.request.ShardRequestCache;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NoOpEngine;
Expand Down Expand Up @@ -707,6 +708,7 @@ private synchronized IndexService createIndexService(
idxSettings,
analysisRegistry,
getEngineFactory(idxSettings),
getEngineConfigFactory(idxSettings),
directoryFactories,
() -> allowExpensiveQueries,
indexNameExpressionResolver,
Expand Down Expand Up @@ -739,6 +741,10 @@ private synchronized IndexService createIndexService(
);
}

private EngineConfigFactory getEngineConfigFactory(final IndexSettings idxSettings) {
return new EngineConfigFactory(this.pluginsService, idxSettings);
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final IndexMetadata indexMetadata = idxSettings.getIndexMetadata();
if (indexMetadata != null && indexMetadata.getState() == IndexMetadata.State.CLOSE) {
Expand Down Expand Up @@ -781,6 +787,7 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe
idxSettings,
analysisRegistry,
getEngineFactory(idxSettings),
getEngineConfigFactory(idxSettings),
directoryFactories,
() -> allowExpensiveQueries,
indexNameExpressionResolver,
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/plugins/EnginePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.plugins;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.EngineFactory;

import java.util.Optional;
Expand All @@ -52,4 +53,14 @@ public interface EnginePlugin {
*/
Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings);

/**
* EXPERT:
* When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings
* to determine if a custom {@link CodecService} should be provided for the given index. A plugin that is not overriding
* the {@link CodecService} through the plugin can ignore this method and the Codec specified in the {@link IndexSettings}
* will be used.
*/
default Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.index.cache.query.IndexQueryCache;
import org.opensearch.index.cache.query.QueryCache;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.InternalEngineTests;
import org.opensearch.index.fielddata.IndexFieldDataCache;
Expand Down Expand Up @@ -218,6 +219,7 @@ public void testWrapperIsBound() throws IOException {
indexSettings,
emptyAnalysisRegistry,
engineFactory,
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand All @@ -243,6 +245,7 @@ public void testRegisterIndexStore() throws IOException {
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
indexStoreFactories,
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand Down Expand Up @@ -570,6 +573,7 @@ public void testRegisterCustomRecoveryStateFactory() throws IOException {
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand Down Expand Up @@ -601,6 +605,7 @@ private static IndexModule createIndexModule(IndexSettings indexSettings, Analys
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void testRecoverFromNoOp() throws IOException {
indexShard,
initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE),
indexShard.indexSettings().getIndexMetadata(),
NoOpEngine::new
NoOpEngine::new,
new EngineConfigFactory(indexShard.indexSettings())
);
recoverShardFromStore(primary);
assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
Expand Down
Loading