Skip to content

Commit

Permalink
Add an index->step cache to the PolicyStepsRegistry (#82316)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Jan 27, 2022
1 parent 9d0be14 commit c330790
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/82316.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 82316
summary: Add an index->step cache to the `PolicyStepsRegistry`
area: ILM+SLM
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,25 @@ public void clusterChanged(ClusterChangedEvent event) {
if (prevIsMaster != event.localNodeMaster()) {
this.isMaster = event.localNodeMaster();
if (this.isMaster) {
// we weren't the master, and now we are
onMaster(event.state());
} else {
// we were the master, and now we aren't
cancelJob();
policyRegistry.clear();
}
}

final IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE);
if (this.isMaster && lifecycleMetadata != null) {
triggerPolicies(event.state(), true);
// if we're the master, then process deleted indices and trigger policies
if (this.isMaster) {
for (Index index : event.indicesDeleted()) {
policyRegistry.delete(index);
}

final IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE);
if (lifecycleMetadata != null) {
triggerPolicies(event.state(), true);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xcontent.DeprecationHandler;
Expand Down Expand Up @@ -49,20 +50,27 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class PolicyStepsRegistry {
private static final Logger logger = LogManager.getLogger(PolicyStepsRegistry.class);

private final NamedXContentRegistry xContentRegistry;
private final Client client;
private final XPackLicenseState licenseState;

// keeps track of existing policies in the cluster state
private final SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap;
// keeps track of what the first step in a policy is, the key is policy name
private final Map<String, Step> firstStepMap;
// keeps track of a mapping from policy/step-name to respective Step, the key is policy name
private final Map<String, Map<Step.StepKey, Step>> stepMap;
private final NamedXContentRegistry xContentRegistry;

// tracks an index->step cache, where the indexmetadata is also tracked for cache invalidation/eviction purposes.
// for a given index, the step can be cached as long as the indexmetadata (and the policy!) hasn't changed. since
// policies change infrequently, the entire cache is cleared on policy change.
private final Map<Index, Tuple<IndexMetadata, Step>> cachedSteps = new ConcurrentHashMap<>();

public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client, XPackLicenseState licenseState) {
this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client, licenseState);
Expand Down Expand Up @@ -99,6 +107,9 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() {
public void update(IndexLifecycleMetadata meta) {
assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry";

// since the policies (may have) changed, the whole steps cache needs to be thrown out
cachedSteps.clear();

DiffableUtils.MapDiff<String, LifecyclePolicyMetadata, Map<String, LifecyclePolicyMetadata>> mapDiff = DiffableUtils.diff(
lifecyclePolicyMap,
meta.getPolicyMetadatas(),
Expand Down Expand Up @@ -155,6 +166,36 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
}
}

/**
* Remove the entry for an index from the index->step cache.
*
* We clear the map entirely when the master of the cluster changes, and when any
* policy changes, but in a long-lived cluster that doesn't happen to experience
* either of those events (and where indices are removed regularly) we still want
* the cache to trim deleted indices.
*
* n.b. even with this, there's still a pretty small chance that a given index
* could leak, if we're right in the middle of populating the cache for that
* index (in getStep) when we process the delete here, then we'll end up with an
* entry that doesn't get deleted until the master changes or a policy changes
* -- it's harmless enough
*/
public void delete(Index deleted) {
cachedSteps.remove(deleted);
}

/**
* Clear internal maps that were populated by update (and others).
*/
public void clear() {
// this is potentially large, so it's important to clear it
cachedSteps.clear();
// these are relatively small, but there's no harm in clearing them
lifecyclePolicyMap.clear();
firstStepMap.clear();
stepMap.clear();
}

/**
* Return all ordered steps for the current policy for the index. Does not
* resolve steps using the phase caching, but only for the currently existing policy.
Expand Down Expand Up @@ -267,6 +308,14 @@ private List<Step> parseStepsFromPhase(String policy, String currentPhase, Strin

@Nullable
public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKey) {
final Tuple<IndexMetadata, Step> cachedStep = cachedSteps.get(indexMetadata.getIndex());
// n.b. we're using instance equality here for the IndexMetadata rather than object equality because it's fast,
// this means that we're erring on the side of cache misses (if the IndexMetadata changed in any way, it'll be
// a new instance, so we'll miss-and-repopulate the cache for the index in question)
if (cachedStep != null && cachedStep.v1() == indexMetadata && cachedStep.v2().getKey().equals(stepKey)) {
return cachedStep.v2();
}

if (ErrorStep.NAME.equals(stepKey.getName())) {
return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
}
Expand Down Expand Up @@ -305,7 +354,9 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe
+ phaseSteps;

// Return the step that matches the given stepKey or else null if we couldn't find it
return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
final Step s = phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s));
return s;
}

/**
Expand Down

0 comments on commit c330790

Please sign in to comment.