Skip to content

Commit

Permalink
[Profiling] Support index migrations
Browse files Browse the repository at this point in the history
With this commit we add the required infrastructure to detect whether
indices need to be migrated and define migrations to update mappings or
dynamic index settings.
  • Loading branch information
danielmitterdorfer committed Jul 19, 2023
1 parent ab65e6f commit 1180e8c
Show file tree
Hide file tree
Showing 6 changed files with 620 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
Expand All @@ -21,12 +27,21 @@
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public abstract class AbstractProfilingPersistenceManager<T extends AbstractProfilingPersistenceManager.ProfilingIndexAbstraction>
implements
Expand All @@ -35,11 +50,14 @@ public abstract class AbstractProfilingPersistenceManager<T extends AbstractProf
protected final Logger logger = LogManager.getLogger(getClass());

private final AtomicBoolean inProgress = new AtomicBoolean(false);

private final ClusterService clusterService;
protected final ThreadPool threadPool;
protected final Client client;
private volatile boolean templatesEnabled;

public AbstractProfilingPersistenceManager(ClusterService clusterService) {
public AbstractProfilingPersistenceManager(ThreadPool threadPool, Client client, ClusterService clusterService) {
this.threadPool = threadPool;
this.client = client;
this.clusterService = clusterService;
}

Expand Down Expand Up @@ -90,9 +108,9 @@ public final void clusterChanged(ClusterChangedEvent event) {
try (var refs = new RefCountingRunnable(() -> inProgress.set(false))) {
ClusterState clusterState = event.state();
for (T index : getManagedIndices()) {
Status status = getStatus(clusterState, index);
if (status.actionable) {
onStatus(clusterState, status, index, ActionListener.releasing(refs.acquire()));
IndexState<T> state = getIndexState(clusterState, index);
if (state.getStatus().actionable) {
onIndexState(clusterState, state, ActionListener.releasing(refs.acquire()));
}
}
}
Expand Down Expand Up @@ -120,29 +138,32 @@ protected boolean isAllResourcesCreated(ClusterChangedEvent event) {
* Handler that takes appropriate action for a certain index status.
*
* @param clusterState The current cluster state. Never <code>null</code>.
* @param status Status of the current index.
* @param index The current index.
* @param indexState The state of the current index.
* @param listener Listener to be called on completion / errors.
*/
protected abstract void onStatus(ClusterState clusterState, Status status, T index, ActionListener<? super ActionResponse> listener);
protected abstract void onIndexState(
ClusterState clusterState,
IndexState<T> indexState,
ActionListener<? super ActionResponse> listener
);

private Status getStatus(ClusterState state, T index) {
private IndexState<T> getIndexState(ClusterState state, T index) {
IndexMetadata metadata = indexMetadata(state, index);
if (metadata == null) {
return Status.NEEDS_CREATION;
return new IndexState<>(index, null, Status.NEEDS_CREATION);
}
if (metadata.getState() == IndexMetadata.State.CLOSE) {
logger.warn(
"Index [{}] is closed. This is likely to prevent Universal Profiling from functioning correctly",
metadata.getIndex()
);
return Status.CLOSED;
return new IndexState<>(index, metadata.getIndex(), Status.CLOSED);
}
final IndexRoutingTable routingTable = state.getRoutingTable().index(metadata.getIndex());
ClusterHealthStatus indexHealth = new ClusterIndexHealth(metadata, routingTable).getStatus();
if (indexHealth == ClusterHealthStatus.RED) {
logger.debug("Index [{}] health status is RED, any pending mapping upgrades will wait until this changes", metadata.getIndex());
return Status.UNHEALTHY;
return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY);
}
MappingMetadata mapping = metadata.mapping();
if (mapping != null) {
Expand All @@ -159,23 +180,108 @@ private Status getStatus(ClusterState state, T index) {
currentIndexVersion = getVersionField(metadata.getIndex(), meta, "index-version");
currentTemplateVersion = getVersionField(metadata.getIndex(), meta, "index-template-version");
if (currentIndexVersion == -1 || currentTemplateVersion == -1) {
return Status.UNHEALTHY;
return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY);
}
}
if (index.getVersion() > currentIndexVersion) {
return Status.NEEDS_VERSION_BUMP;
} else if (ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION > currentTemplateVersion) {
// TODO 8.10+: Check if there are any pending migrations. If none are pending we can consider the index up to date.
return Status.NEEDS_MAPPINGS_UPDATE;
return new IndexState<>(index, metadata.getIndex(), Status.NEEDS_VERSION_BUMP);
} else if (getIndexTemplateVersion() > currentTemplateVersion) {
// if there are no migrations we can consider the index up-to-date even if the index template version does not match.
List<Migration> pendingMigrations = index.getMigrations(currentTemplateVersion);
if (pendingMigrations.isEmpty()) {
logger.trace(
"Index [{}] with index template version [{}] (current is [{}]) is up-to-date (no pending migrations).",
metadata.getIndex(),
currentTemplateVersion,
getIndexTemplateVersion()
);
return new IndexState<>(index, metadata.getIndex(), Status.UP_TO_DATE);
}
logger.trace(
"Index [{}] with index template version [{}] (current is [{}]) has [{}] pending migrations.",
metadata.getIndex(),
currentTemplateVersion,
getIndexTemplateVersion(),
pendingMigrations.size()
);
return new IndexState<>(index, metadata.getIndex(), Status.NEEDS_MAPPINGS_UPDATE, pendingMigrations);
} else {
return Status.UP_TO_DATE;
return new IndexState<>(index, metadata.getIndex(), Status.UP_TO_DATE);
}
} else {
logger.warn("No mapping found for existing index [{}]. Index cannot be migrated.", metadata.getIndex());
return Status.UNHEALTHY;
return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY);
}
}

// overridable for testing
protected int getIndexTemplateVersion() {
return ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION;
}

protected final void applyMigrations(IndexState<T> indexState, ActionListener<? super ActionResponse> listener) {
String writeIndex = indexState.getWriteIndex().getName();
try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (Migration migration : indexState.getPendingMigrations()) {
logger.debug("Applying migration [{}] for [{}].", migration, writeIndex);
migration.apply(
writeIndex,
(r -> updateMapping(r, ActionListener.releasing(refs.acquire()))),
(r -> updateSettings(r, ActionListener.releasing(refs.acquire())))
);
}
}
}

protected final void updateMapping(PutMappingRequest request, ActionListener<AcknowledgedResponse> listener) {
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsync("put mapping", request, listener, (req, l) -> client.admin().indices().putMapping(req, l));
}

protected final void updateSettings(UpdateSettingsRequest request, ActionListener<AcknowledgedResponse> listener) {
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsync("update settings", request, listener, (req, l) -> client.admin().indices().updateSettings(req, l));
}

protected final <Request extends ActionRequest & IndicesRequest, Response extends AcknowledgedResponse> void executeAsync(
final String actionName,
final Request request,
final ActionListener<Response> listener,
BiConsumer<Request, ActionListener<Response>> consumer
) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ClientHelper.PROFILING_ORIGIN, request, new ActionListener<>() {
@Override
public void onResponse(Response response) {
if (response.isAcknowledged() == false) {
logger.error(
"Could not execute action [{}] for indices [{}] for [{}], request was not acknowledged",
actionName,
request.indices(),
ClientHelper.PROFILING_ORIGIN
);
}
listener.onResponse(response);
}

@Override
public void onFailure(Exception ex) {
logger.error(
() -> format(
"Could not execute action [%s] for indices [%s] for [%s]",
actionName,
request.indices(),
ClientHelper.PROFILING_ORIGIN
),
ex
);
listener.onFailure(ex);
}
}, consumer);
});
}

private int getVersionField(Index index, Map<String, Object> meta, String fieldName) {
Object value = meta.get(fieldName);
if (value instanceof Integer) {
Expand All @@ -189,6 +295,40 @@ private int getVersionField(Index index, Map<String, Object> meta, String fieldN
return -1;
}

protected static final class IndexState<T extends ProfilingIndexAbstraction> {
private final T index;
private final Index writeIndex;
private final Status status;
private final List<Migration> pendingMigrations;

IndexState(T index, Index writeIndex, Status status) {
this(index, writeIndex, status, null);
}

IndexState(T index, Index writeIndex, Status status, List<Migration> pendingMigrations) {
this.index = index;
this.writeIndex = writeIndex;
this.status = status;
this.pendingMigrations = pendingMigrations;
}

public T getIndex() {
return index;
}

public Index getWriteIndex() {
return writeIndex;
}

public Status getStatus() {
return status;
}

public List<Migration> getPendingMigrations() {
return pendingMigrations;
}
}

enum Status {
CLOSED(false),
UNHEALTHY(false),
Expand All @@ -214,5 +354,7 @@ interface ProfilingIndexAbstraction {
String getName();

int getVersion();

List<Migration> getMigrations(int currentIndexTemplateVersion);
}
}
Loading

0 comments on commit 1180e8c

Please sign in to comment.