Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make IndexStoreListener a pluggable interface #16583

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))
Expand Down
15 changes: 1 addition & 14 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
Expand Down Expand Up @@ -1412,18 +1413,4 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.internal
*/
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;

import java.util.Collections;
import java.util.List;

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.api
*/
@PublicApi(since = "2.19.0")
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*
* @opensearch.api
*/
@PublicApi(since = "2.19.0")
final class CompositeIndexStoreListener implements IndexStoreListener {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
jed326 marked this conversation as resolved.
Show resolved Hide resolved
private final List<IndexStoreListener> listeners;
private final static Logger logger = LogManager.getLogger(CompositeIndexStoreListener.class);

public CompositeIndexStoreListener(List<IndexStoreListener> listeners) {
this.listeners = Collections.unmodifiableList(listeners);
}

@Override
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeShardPathDeleted(shardId, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeShardPathDeleted listener [{}] failed", listener), e);

Check warning on line 57 in server/src/main/java/org/opensearch/index/store/IndexStoreListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/IndexStoreListener.java#L56-L57

Added lines #L56 - L57 were not covered by tests
andrross marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

@Override
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeIndexPathDeleted(index, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeIndexPathDeleted listener [{}] failed", listener), e);

Check warning on line 68 in server/src/main/java/org/opensearch/index/store/IndexStoreListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/IndexStoreListener.java#L67-L68

Added lines #L67 - L68 were not covered by tests
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand All @@ -33,7 +34,7 @@
*
* @opensearch.internal
*/
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
public class FileCacheCleaner implements IndexStoreListener {
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);

private final Provider<FileCache> fileCacheProvider;
Expand Down
22 changes: 20 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
Expand Down Expand Up @@ -548,10 +549,27 @@ protected Node(
*/
this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
Environment.assertEquivalent(initialEnvironment, this.environment);
Stream<IndexStoreListener> indexStoreListenerStream = pluginsService.filterPlugins(IndexStorePlugin.class)
jed326 marked this conversation as resolved.
Show resolved Hide resolved
.stream()
.map(IndexStorePlugin::getIndexStoreListener)
.filter(Optional::isPresent)
.map(Optional::get);
// FileCache is only initialized on search nodes, so we only create FileCacheCleaner on search nodes as well
if (DiscoveryNode.isSearchNode(settings) == false) {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
nodeEnvironment = new NodeEnvironment(
settings,
environment,
new IndexStoreListener.CompositeIndexStoreListener(indexStoreListenerStream.collect(Collectors.toList()))
);
} else {
nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache));
nodeEnvironment = new NodeEnvironment(
settings,
environment,
new IndexStoreListener.CompositeIndexStoreListener(
jed326 marked this conversation as resolved.
Show resolved Hide resolved
Stream.concat(indexStoreListenerStream, Stream.of(new FileCacheCleaner(this::fileCache)))
.collect(Collectors.toList())
)
);
}
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.indices.recovery.RecoveryState;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
* A plugin that provides alternative directory implementations.
Expand Down Expand Up @@ -105,4 +107,11 @@ interface RecoveryStateFactory {
default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.emptyMap();
}

/**
* The {@link IndexStoreListener}s for this plugin which are triggered upon shard/index path deletion
*/
default Optional<IndexStoreListener> getIndexStoreListener() {
return Optional.empty();
}
}
42 changes: 31 additions & 11 deletions server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.node.Node;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.NodeRoles;
Expand Down Expand Up @@ -360,24 +361,39 @@ protected void doRun() throws Exception {
}

public void testIndexStoreListener() throws Exception {
final AtomicInteger shardCounter = new AtomicInteger(0);
final AtomicInteger indexCounter = new AtomicInteger(0);
final AtomicInteger shardCounter1 = new AtomicInteger(0);
final AtomicInteger shardCounter2 = new AtomicInteger(0);
final AtomicInteger indexCounter1 = new AtomicInteger(0);
final AtomicInteger indexCounter2 = new AtomicInteger(0);
final Index index = new Index("foo", "fooUUID");
final ShardId shardId = new ShardId(index, 0);
final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() {
final IndexStoreListener listener1 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter.incrementAndGet();
shardCounter1.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter.incrementAndGet();
indexCounter1.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(listener);
final IndexStoreListener listener2 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter2.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter2.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(new IndexStoreListener.CompositeIndexStoreListener(List.of(listener1, listener2)));

for (Path path : env.indexPaths(index)) {
Files.createDirectories(path.resolve("0"));
Expand All @@ -386,26 +402,30 @@ public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, N
for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path.resolve("0")));
}
assertEquals(0, shardCounter.get());
assertEquals(0, shardCounter1.get());
assertEquals(0, shardCounter2.get());

env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path.resolve("0")));
}
assertEquals(1, shardCounter.get());
assertEquals(1, shardCounter1.get());
assertEquals(1, shardCounter2.get());

for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path));
}
assertEquals(0, indexCounter.get());
assertEquals(0, indexCounter1.get());
assertEquals(0, indexCounter2.get());

env.deleteIndexDirectorySafe(index, 5000, idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path));
}
assertEquals(1, indexCounter.get());
assertEquals(1, indexCounter1.get());
assertEquals(1, indexCounter2.get());
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
env.close();
}
Expand Down Expand Up @@ -680,7 +700,7 @@ public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(Settings.EMPTY);
}

public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException {
public NodeEnvironment newNodeEnvironment(IndexStoreListener listener) throws IOException {
Settings build = buildEnvSettings(Settings.EMPTY);
return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener);
}
Expand Down
Loading