diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java new file mode 100644 index 000000000..57381f5b8 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java @@ -0,0 +1,308 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.actions; + + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.AppContext; +import org.opensearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; + +public class SearchBackPressureAction extends SuppressibleAction { + private static final Logger LOG = LogManager.getLogger(SearchBackPressureAction.class); + public static final String NAME = "SearchBackPressureAction"; + private static final ImpactVector NO_IMPACT = new ImpactVector(); + + /* + * Time to wait since last recommendation, before suggesting this action again + */ + private static final long DEFAULT_COOL_OFF_PERIOD_IN_MILLIS = TimeUnit.DAYS.toMillis(1); + + /* From Config Per Diumension Type + * canUpdate: whether the action should be emitted + * coolOffPeriodInMillis: how long the CoolOffPeriod the action should before reemit + * thresholdName: the name of threshold we are tuning (e.g. node_duress.cpu_threshold, search_heap_threshold) + * dimension: indicates whether the resource unit is caused by shard/task level searchbackpressure cancellation stats + * Step Size in percentage: how much should the threshold change in percentage + */ + private boolean canUpdate; + private long coolOffPeriodInMillis; + private String thresholdName; + + private SearchbpDimension dimension; + private SearchbpThresholdActionDirection direction; + private double stepSizeInPercentage; + + public SearchBackPressureAction( + final AppContext appContext, + final boolean canUpdate, + final long coolOffPeriodInMillis, + final String thresholdName, + final SearchbpDimension dimension, + final SearchbpThresholdActionDirection direction, + final double stepSizeInPercentage) { + super(appContext); + this.canUpdate = canUpdate; + this.coolOffPeriodInMillis = coolOffPeriodInMillis; + this.thresholdName = thresholdName; + this.dimension = dimension; + this.direction = direction; + this.stepSizeInPercentage = stepSizeInPercentage; + } + + @Override + public String name() { + return NAME; + } + + @Override + public boolean canUpdate() { + return canUpdate; + } + + @Override + public long coolOffPeriodInMillis() { + return coolOffPeriodInMillis; + } + + @Override + public List impactedNodes() { + // all nodes are impacted by this change + return appContext.getDataNodeInstances().stream() + .map(NodeKey::new) + .collect(Collectors.toList()); + } + + /* Search Back Pressure Decider/Policy only tunes searchbackpressure related thresholds (e.g. search_backpressure.search_task_heap_threshold) + * and it does not correlate directly with any current dimension in the ImpactVector (e.g. CPU/HEAP). + * And the current Searchbp actions only adjust heap related Searchbp Thresholds for now. + * Dimensions in ImpactVector is used by collator to determine which action should be emitted to Publisher, + * eventually which actions should the downstream class execute. So if there are 2 actions emitting in the same time, one increase CPU and one decrease it, the collator cancel out the actions. + * However, since for Searchbp Actions we only tune the searchbp threshold once per time (it's impossible for 2 actions emitting in the same time that increase and decrease searchbackpressure heap usage threshold). + * Therefore, we put no Impact for ImpactVector for Searchbp Actions. + */ + @Override + public Map impact() { + Map impact = new HashMap<>(); + for (NodeKey key : impactedNodes()) { + impact.put(key, NO_IMPACT); + } + return impact; + } + + public String getThresholdName() { + return thresholdName; + } + + public String getDimension() { + return dimension.toString(); + } + + public String getDirection() { + return direction.toString(); + } + + public double getStepSizeInPercentage() { + return stepSizeInPercentage; + } + + @Override + public String summary() { + Summary summary = + new Summary( + thresholdName, + dimension.toString(), + direction.toString(), + stepSizeInPercentage, + DEFAULT_COOL_OFF_PERIOD_IN_MILLIS, + canUpdate); + return summary.toJson(); + } + + public static final class Builder { + public static final boolean DEFAULT_CAN_UPDATE = true; + + private final AppContext appContext; + private final String thresholdName; + private final SearchbpDimension dimension; + private final SearchbpThresholdActionDirection direction; + private boolean canUpdate; + private double stepSizeInPercentage; + private long coolOffPeriodInMillis; + + private Builder( + final AppContext appContext, + final String thresholdName, + final SearchbpDimension dimension, + final SearchbpThresholdActionDirection direction, + final long coolOffPeriodInMillis) { + this.appContext = appContext; + this.thresholdName = thresholdName; + this.dimension = dimension; + this.direction = direction; + this.coolOffPeriodInMillis = coolOffPeriodInMillis; + this.canUpdate = DEFAULT_CAN_UPDATE; + } + + public Builder stepSizeInPercentage(double stepSizeInPercentage) { + this.stepSizeInPercentage = stepSizeInPercentage; + return this; + } + + public Builder coolOffPeriodInMillis(long coolOffPeriodInMillis) { + this.coolOffPeriodInMillis = coolOffPeriodInMillis; + return this; + } + + public SearchBackPressureAction build() { + return new SearchBackPressureAction( + appContext, + canUpdate, + coolOffPeriodInMillis, + thresholdName, + dimension, + direction, + stepSizeInPercentage); + } + } + + /* Write Static Class Summary to conver the Searchbp Action POJO to JSON Object + * Key fields to be included + * 1. ThresholdName: name of the SearchBackPressure threshold to be tuned + * 2. Dimension of the action (Shard/Task) + * 3. Direction of the action (Increase/Decrease) + * 3. StepSizeInPercentage to change the threshold + * 4. CoolOffPeriodInMillis for the action + * 5. canUpdate (whether the action should be emitted) + */ + public static class Summary { + public static final String THRESHOLD_NAME = "thresholdName"; + public static final String SEARCHBP_DIMENSION = "searchbpDimension"; + public static final String DIRECTION = "direction"; + public static final String STEP_SIZE_IN_PERCENTAGE = "stepSizeInPercentage"; + public static final String COOL_OFF_PERIOD = "coolOffPeriodInMillis"; + public static final String CAN_UPDATE = "canUpdate"; + + @SerializedName(value = THRESHOLD_NAME) + private String thresholdName; + + @SerializedName(value = SEARCHBP_DIMENSION) + private String searchbpSettingDimension; + + @SerializedName(value = DIRECTION) + private String direction; + + @SerializedName(value = STEP_SIZE_IN_PERCENTAGE) + private double stepSizeInPercentage; + + @SerializedName(value = COOL_OFF_PERIOD) + private long coolOffPeriodInMillis; + + @SerializedName(value = CAN_UPDATE) + private boolean canUpdate; + + public Summary( + String thresholdName, + String searchbpSettingDimension, + String direction, + double stepSizeInPercentage, + long coolOffPeriodInMillis, + boolean canUpdate) { + this.thresholdName = thresholdName; + this.searchbpSettingDimension = searchbpSettingDimension; + this.direction = direction; + this.stepSizeInPercentage = stepSizeInPercentage; + this.coolOffPeriodInMillis = coolOffPeriodInMillis; + this.canUpdate = canUpdate; + } + + /* + * ThresholdName is the name of the setting to be modified + * e.g. node_duress.cpu_threshold, node_duress.search_heap_threshold + */ + public String getThresholdName() { + return thresholdName; + } + + public String getSearchbpSettingDimension() { + return searchbpSettingDimension; + } + + public String getDirection() { + return direction; + } + + public double getStepSizeInPercentage() { + return stepSizeInPercentage; + } + + public long getCoolOffPeriodInMillis() { + return coolOffPeriodInMillis; + } + + public boolean getCanUpdate() { + return canUpdate; + } + + public String toJson() { + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(this); + } + } + + // enum to indicate to increase/decrease the threshold + public enum SearchbpThresholdActionDirection { + INCREASE(SearchbpThresholdActionDirection.Constants.INCREASE), + DECREASE(SearchbpThresholdActionDirection.Constants.DECREASE); + + private final String value; + + SearchbpThresholdActionDirection(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + public static class Constants { + public static final String INCREASE = "increase"; + public static final String DECREASE = "decrease"; + } + } + + // enum to indicate to whether the action is caused by shard/task level searchbackpressure + // cancellation + public enum SearchbpDimension { + SHARD(SearchbpDimension.Constants.SHARD), + TASK(SearchbpDimension.Constants.TASK); + + private final String value; + + SearchbpDimension(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + public static class Constants { + public static final String SHARD = "shard"; + public static final String TASK = "task"; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java index c2b3ed444..096c45917 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java @@ -8,6 +8,7 @@ import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.jvm.OldGenDecisionPolicyConfig; import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.jvm.young_gen.JvmGenTuningPolicyConfig; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.searchbackpressure.SearchBackPressurePolicyConfig; import org.opensearch.performanceanalyzer.rca.framework.core.NestedConfig; import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; @@ -28,11 +29,14 @@ public class DeciderConfig { private static final String OLD_GEN_DECISION_POLICY_CONFIG_NAME = "old-gen-decision-policy-config"; private static final String JVM_GEN_TUNING_POLICY_CONFIG_NAME = "jvm-gen-tuning-policy-config"; + private static final String SEARCH_BACK_PRESSURE_POLICY_CONFIG_NAME = + "search-back-pressure-policy-config"; private final CachePriorityOrderConfig cachePriorityOrderConfig; private final WorkLoadTypeConfig workLoadTypeConfig; private final OldGenDecisionPolicyConfig oldGenDecisionPolicyConfig; private final JvmGenTuningPolicyConfig jvmGenTuningPolicyConfig; + private final SearchBackPressurePolicyConfig searchBackPressurePolicyConfig; public DeciderConfig(final RcaConf rcaConf) { cachePriorityOrderConfig = @@ -51,6 +55,11 @@ public DeciderConfig(final RcaConf rcaConf) { new NestedConfig( JVM_GEN_TUNING_POLICY_CONFIG_NAME, rcaConf.getDeciderConfigSettings())); + searchBackPressurePolicyConfig = + new SearchBackPressurePolicyConfig( + new NestedConfig( + SEARCH_BACK_PRESSURE_POLICY_CONFIG_NAME, + rcaConf.getDeciderConfigSettings())); } public CachePriorityOrderConfig getCachePriorityOrderConfig() { @@ -68,4 +77,8 @@ public OldGenDecisionPolicyConfig getOldGenDecisionPolicyConfig() { public JvmGenTuningPolicyConfig getJvmGenTuningPolicyConfig() { return jvmGenTuningPolicyConfig; } + + public SearchBackPressurePolicyConfig getSearchBackPressurePolicyConfig() { + return searchBackPressurePolicyConfig; + } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/searchbackpressure/SearchBackPressurePolicyConfig.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/searchbackpressure/SearchBackPressurePolicyConfig.java new file mode 100644 index 000000000..11f7593e6 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/searchbackpressure/SearchBackPressurePolicyConfig.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.searchbackpressure; + + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBackPressurePolicy; +import org.opensearch.performanceanalyzer.rca.framework.core.Config; +import org.opensearch.performanceanalyzer.rca.framework.core.NestedConfig; + +/** + * Configures various thresholds for the {@link SearchBackPressurePolicy} + * + *

The config follows the format below "decider-config-settings": { + * "search-back-pressure-policy-config": { "enabled": true, // whether the + * serch-back-pressure-policy should be enabled "hour-breach-threshold": 30, // threshold for hourly + * received unhealthy cluster level rca flow units, if above, then the below thresholds should be + * modified, "threshold_count": 1, // how many thresholds to be changed, in this case + * search-heap-threshold, "searchbp-heap-stepsize-in-percentage": 5, } } + * "searchbp-heap-stepsize-in-percentage" defines the step size to change heap related threshold (in + * percentage). + */ +public class SearchBackPressurePolicyConfig { + private static final Logger LOG = LogManager.getLogger(SearchBackPressurePolicyConfig.class); + + // Field Names + private static final String ENABLED = "enabled"; + private static final String HOUR_BREACH_THRESHOLD = "hour-breach-threshold"; + private static final String THRESHOLD_COUNT = "threshold_count"; + private static final String SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE = + "searchbp-heap-stepsize-in-percentage"; + + // Default values + public static final boolean DEFAULT_ENABLED = true; + + // TO DO: Decide the Default Hour breach threshold + public static final int DEFAULT_HOUR_BREACH_THRESHOLD = 2; + public static final int HOUR_MONITOR_WINDOW_SIZE_MINUTES = (int) TimeUnit.HOURS.toMinutes(1); + public static final int HOUR_MONITOR_BUCKET_SIZE_MINUTES = 1; + public static final double DEFAULT_SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE = 5; + + private Config hourBreachThreshold; + private Config enabled; + private Config searchbpHeapStepsizeInPercentage; + + public SearchBackPressurePolicyConfig(NestedConfig config) { + enabled = new Config<>(ENABLED, config.getValue(), DEFAULT_ENABLED, Boolean.class); + hourBreachThreshold = + new Config<>( + HOUR_BREACH_THRESHOLD, + config.getValue(), + DEFAULT_HOUR_BREACH_THRESHOLD, + Integer.class); + LOG.debug( + "SearchBackPressurePolicyConfig hour breach threshold is: {}", + hourBreachThreshold.getValue()); + + searchbpHeapStepsizeInPercentage = + new Config<>( + SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE, + config.getValue(), + DEFAULT_SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE, + Double.class); + LOG.debug( + "searchbpHeapStepsizeInPercentage is {}", + searchbpHeapStepsizeInPercentage.getValue()); + } + + /** + * Whether or not to enable the policy. A disabled policy will not emit any actions. + * + * @return Whether or not to enable the policy + */ + public boolean isEnabled() { + return enabled.getValue(); + } + + public int getHourBreachThreshold() { + return hourBreachThreshold.getValue(); + } + + public int getHourMonitorWindowSizeMinutes() { + return HOUR_MONITOR_WINDOW_SIZE_MINUTES; + } + + public int getHourMonitorBucketSizeMinutes() { + return HOUR_MONITOR_BUCKET_SIZE_MINUTES; + } + + public double getSearchbpHeapStepsizeInPercentage() { + return searchbpHeapStepsizeInPercentage.getValue(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressureDecider.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressureDecider.java new file mode 100644 index 000000000..68fbb3370 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressureDecider.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure; + + +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.AppContext; +import org.opensearch.performanceanalyzer.decisionmaker.actions.Action; +import org.opensearch.performanceanalyzer.decisionmaker.actions.SearchBackPressureAction; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.Decider; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.Decision; +import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; +import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureClusterRCA; + +/** decider to change the dynamic settings of SearchBackPressure In-flight Cancellation */ +public class SearchBackPressureDecider extends Decider { + private static final Logger LOG = LogManager.getLogger(SearchBackPressureDecider.class); + public static final String NAME = "SearchBackPressureDecider"; + + private final SearchBackPressurePolicy searchBackPressurePolicy; + + private int currentIteration = 0; + private SearchBackPressureClusterRCA searchBackPressureClusterRCA; + + public SearchBackPressureDecider( + long evalIntervalSeconds, + int decisionFrequency, + SearchBackPressureClusterRCA searchBackPressureClusterRCA) { + super(evalIntervalSeconds, decisionFrequency); + this.searchBackPressureClusterRCA = searchBackPressureClusterRCA; + this.searchBackPressurePolicy = new SearchBackPressurePolicy(searchBackPressureClusterRCA); + LOG.debug("SearchBackPressureDecider created"); + } + + @Override + public String name() { + return NAME; + } + + @Override + public Decision operate() { + LOG.debug( + "SearchBackPressureDecider#2 operate() with currentIteration: {}", + currentIteration); + + Decision decision = new Decision(System.currentTimeMillis(), NAME); + currentIteration += 1; + if (currentIteration < decisionFrequency) { + return decision; + } + + // reset the currentIteration for next action emitting cycle + currentIteration = 0; + + // SearchBackPressure Policy is always accepted since Searchbp Decider only use the actions + // suggested by Searchbp Policy + List searchBackPressureActions = searchBackPressurePolicy.evaluate(); + + // loop through the actions and print the action threshold name, dimension, + // increase/decrease + searchBackPressureActions.stream() + .forEach( + (action) -> { + LOG.debug( + "searchBackPressureActions details, threshold name: {}, dimension: {}, increase/decrease: {}, stepsize: {}", + ((SearchBackPressureAction) action).getThresholdName(), + ((SearchBackPressureAction) action).getDimension(), + ((SearchBackPressureAction) action).getDirection(), + ((SearchBackPressureAction) action).getStepSizeInPercentage()); + }); + + searchBackPressureActions.forEach(decision::addAction); + + LOG.debug("decision action size is {}", decision.getActions().size()); + return decision; + } + + /* Read RCA Config to fill the dynamic threshold settings for the SearchBackPressure Service */ + @Override + public void readRcaConf(RcaConf conf) { + super.readRcaConf(conf); + searchBackPressurePolicy.setRcaConf(conf); + } + + /* Set AppContext for SearchBackPressurePolicy */ + @Override + public void setAppContext(final AppContext appContext) { + super.setAppContext(appContext); + searchBackPressurePolicy.setAppContext(appContext); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java new file mode 100644 index 000000000..67bd14db3 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java @@ -0,0 +1,315 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure; + +import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_SHARD; +import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_TASK; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.AppContext; +import org.opensearch.performanceanalyzer.decisionmaker.actions.Action; +import org.opensearch.performanceanalyzer.decisionmaker.actions.SearchBackPressureAction; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.DecisionPolicy; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.searchbackpressure.SearchBackPressurePolicyConfig; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureIssue; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureSearchTaskIssue; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureSearchTaskIssue.SearchbpTaskAlarmMonitorMapKeys; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureShardIssue; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureShardIssue.SearchbpShardAlarmMonitorMapKeys; +import org.opensearch.performanceanalyzer.grpc.Resource; +import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.BucketizedSlidingWindowConfig; +import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; +import org.opensearch.performanceanalyzer.rca.framework.util.RcaConsts; +import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureClusterRCA; + +/** + * Decides if the SearchBackPressure threshold should be modified suggests actions to take to + * achieve improved performance. + */ +public class SearchBackPressurePolicy implements DecisionPolicy { + private static final Logger LOG = LogManager.getLogger(SearchBackPressurePolicy.class); + + // Default COOLOFF Period for the action (1 DAY) + private static final long DEAFULT_COOLOFF_PERIOD_IN_MILLIS = 24L * 60L * 60L * 1000L; + private static final String HEAP_THRESHOLD_STR = "heap_usage"; + private static final String SHARD_DIMENSION_STR = "SHARD"; + private static final String TASK_DIMENSION_STR = "TASK"; + private static final double DEFAULT_HEAP_CHANGE_IN_PERCENTAGE = 5.0; + + private static final Path SEARCHBP_DATA_FILE_PATH = + Paths.get(RcaConsts.CONFIG_DIR_PATH, "SearchBackPressurePolicy_heap"); + + /* TODO: Specify a path to store SearchBackpressurePolicy_Autotune Stats */ + + private AppContext appContext; + private RcaConf rcaConf; + private SearchBackPressurePolicyConfig policyConfig; + private SearchBackPressureClusterRCA searchBackPressureClusterRCA; + + /* Alarm for heap usage */ + static final List HEAP_SEARCHBP_SHARD_SIGNALS = + Lists.newArrayList(SEARCHBACKPRESSURE_SHARD); + static final List HEAP_SEARCHBP_TASK_SIGNALS = + Lists.newArrayList(SEARCHBACKPRESSURE_TASK); + + SearchBackPressureIssue searchBackPressureIssue; + + /* alarm monitors per threshold */ + // shard-level alarms + @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureShardHeapIncreaseAlarm; + @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureShardHeapDecreaseAlarm; + HashMap searchBackPressureShardAlarmMonitorMap; + + // task-level alarms + @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureTaskHeapIncreaseAlarm; + @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureTaskHeapDecreaseAlarm; + HashMap searchBackPressureTaskAlarmMonitorMap; + + public SearchBackPressurePolicy( + SearchBackPressureClusterRCA searchBackPressureClusterRCA, + SearchBpActionsAlarmMonitor searchBackPressureShardHeapIncreaseAlarm, + SearchBpActionsAlarmMonitor searchBackPressureShardHeapDecreaseAlarm, + SearchBpActionsAlarmMonitor searchBackPressureTaskHeapIncreaseAlarm, + SearchBpActionsAlarmMonitor searchBackPressureTaskHeapDecreaseAlarm) { + this.searchBackPressureClusterRCA = searchBackPressureClusterRCA; + this.searchBackPressureShardHeapIncreaseAlarm = searchBackPressureShardHeapIncreaseAlarm; + this.searchBackPressureShardHeapDecreaseAlarm = searchBackPressureShardHeapDecreaseAlarm; + this.searchBackPressureTaskHeapIncreaseAlarm = searchBackPressureTaskHeapIncreaseAlarm; + this.searchBackPressureTaskHeapDecreaseAlarm = searchBackPressureTaskHeapDecreaseAlarm; + } + + public SearchBackPressurePolicy(SearchBackPressureClusterRCA searchBackPressureClusterRCA) { + this(searchBackPressureClusterRCA, null, null, null, null); + } + + /** + * records issues which the policy cares about and discards others + * + * @param issue an issue with the application + */ + private void record(HotResourceSummary summary) { + if (HEAP_SEARCHBP_SHARD_SIGNALS.contains(summary.getResource())) { + searchBackPressureIssue = + new SearchBackPressureShardIssue( + summary, searchBackPressureShardAlarmMonitorMap); + searchBackPressureIssue.recordIssueBySummaryType(summary); + } + + if (HEAP_SEARCHBP_TASK_SIGNALS.contains(summary.getResource())) { + searchBackPressureIssue = + new SearchBackPressureSearchTaskIssue( + summary, searchBackPressureTaskAlarmMonitorMap); + searchBackPressureIssue.recordIssueBySummaryType(summary); + } + } + + /** gathers and records all issues observed in the application */ + private void recordIssues() { + LOG.debug("SearchBackPressurePolicy#recordIssues()"); + + if (searchBackPressureClusterRCA.getFlowUnits().isEmpty()) { + LOG.debug("No flow units in searchBackPressureClusterRCA"); + return; + } + + for (ResourceFlowUnit flowUnit : + searchBackPressureClusterRCA.getFlowUnits()) { + if (!flowUnit.hasResourceSummary()) { + continue; + } + + HotClusterSummary clusterSummary = flowUnit.getSummary(); + clusterSummary.getHotNodeSummaryList().stream() + .flatMap((nodeSummary) -> nodeSummary.getHotResourceSummaryList().stream()) + .forEach((resourceSummary) -> record(resourceSummary)); + } + } + + public boolean isShardHeapThresholdTooSmall() { + return !searchBackPressureShardHeapIncreaseAlarm.isHealthy(); + } + + public boolean isShardHeapThresholdTooLarge() { + return !searchBackPressureShardHeapDecreaseAlarm.isHealthy(); + } + + public boolean isTaskHeapThresholdTooSmall() { + return !searchBackPressureTaskHeapIncreaseAlarm.isHealthy(); + } + + public boolean isTaskHeapThresholdTooLarge() { + return !searchBackPressureTaskHeapDecreaseAlarm.isHealthy(); + } + + // create alarm monitor from config + public SearchBpActionsAlarmMonitor createAlarmMonitor(Path persistenceBasePath) { + LOG.debug( + "createAlarmMonitor with hour window: {}, bucket size: {}, hour threshold: {}, stepsize: {}", + policyConfig.getHourMonitorWindowSizeMinutes(), + policyConfig.getHourMonitorBucketSizeMinutes(), + policyConfig.getHourBreachThreshold(), + policyConfig.getSearchbpHeapStepsizeInPercentage()); + BucketizedSlidingWindowConfig hourMonitorConfig = + new BucketizedSlidingWindowConfig( + policyConfig.getHourMonitorWindowSizeMinutes(), + policyConfig.getHourMonitorBucketSizeMinutes(), + TimeUnit.MINUTES, + persistenceBasePath); + + // TODO: Check whether we need a persistence path to write our data + return new SearchBpActionsAlarmMonitor( + policyConfig.getHourBreachThreshold(), null, hourMonitorConfig); + } + + // initalize all alarm monitors + public void initialize() { + // initialize shard level alarm for resounce unit that suggests to increase jvm threshold + searchBackPressureShardHeapIncreaseAlarm = + initializeAlarmMonitor(searchBackPressureShardHeapIncreaseAlarm); + + // initialize shard level alarm for resounce unit that suggests to decrease jvm threshold + searchBackPressureShardHeapDecreaseAlarm = + initializeAlarmMonitor(searchBackPressureShardHeapDecreaseAlarm); + + // initialize task level alarm for resounce unit that suggests to increase jvm threshold + searchBackPressureTaskHeapIncreaseAlarm = + initializeAlarmMonitor(searchBackPressureTaskHeapIncreaseAlarm); + + // initialize task level alarm for resounce unit that suggests to decrease jvm threhsold + searchBackPressureTaskHeapDecreaseAlarm = + initializeAlarmMonitor(searchBackPressureTaskHeapDecreaseAlarm); + + initializeAlarmMonitorMap(); + } + + private SearchBpActionsAlarmMonitor initializeAlarmMonitor( + SearchBpActionsAlarmMonitor alarmMonitor) { + if (alarmMonitor == null) { + return createAlarmMonitor(SEARCHBP_DATA_FILE_PATH); + } else { + return alarmMonitor; + } + } + + private void initializeAlarmMonitorMap() { + // add shard level monitors to shardAlarmMonitorMap + searchBackPressureShardAlarmMonitorMap = new HashMap(); + searchBackPressureShardAlarmMonitorMap.put( + SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_INCREASE_ALARM.toString(), + searchBackPressureShardHeapIncreaseAlarm); + searchBackPressureShardAlarmMonitorMap.put( + SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_DECREASE_ALARM.toString(), + searchBackPressureShardHeapDecreaseAlarm); + + // add task level monitors to taskAlarmMonitorMap + searchBackPressureTaskAlarmMonitorMap = new HashMap(); + searchBackPressureTaskAlarmMonitorMap.put( + SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_INCREASE_ALARM.toString(), + searchBackPressureTaskHeapIncreaseAlarm); + searchBackPressureTaskAlarmMonitorMap.put( + SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_DECREASE_ALARM.toString(), + searchBackPressureTaskHeapDecreaseAlarm); + } + + @Override + public List evaluate() { + List actions = new ArrayList<>(); + if (rcaConf == null || appContext == null) { + LOG.error("rca conf/app context is null, return empty action list"); + return actions; + } + + policyConfig = rcaConf.getDeciderConfig().getSearchBackPressurePolicyConfig(); + if (!policyConfig.isEnabled()) { + LOG.debug("SearchBackPressurePolicy is disabled"); + return actions; + } + + initialize(); + + recordIssues(); + + checkShardAlarms(actions); + checkTaskAlarms(actions); + + // print current size of the actions + LOG.debug("SearchBackPressurePolicy#evaluate() action size: {}", actions.size()); + + return actions; + } + + private void checkShardAlarms(List actions) { + if (isShardHeapThresholdTooSmall()) { + LOG.debug("isShardHeapThresholdTooSmall action Added"); + actions.add( + new SearchBackPressureAction( + appContext, + true, + DEAFULT_COOLOFF_PERIOD_IN_MILLIS, + HEAP_THRESHOLD_STR, + SearchBackPressureAction.SearchbpDimension.SHARD, + SearchBackPressureAction.SearchbpThresholdActionDirection.INCREASE, + policyConfig.getSearchbpHeapStepsizeInPercentage())); + } else if (isShardHeapThresholdTooLarge()) { + LOG.debug("isShardHeapThresholdTooLarge action Added"); + actions.add( + new SearchBackPressureAction( + appContext, + true, + DEAFULT_COOLOFF_PERIOD_IN_MILLIS, + HEAP_THRESHOLD_STR, + SearchBackPressureAction.SearchbpDimension.SHARD, + SearchBackPressureAction.SearchbpThresholdActionDirection.DECREASE, + policyConfig.getSearchbpHeapStepsizeInPercentage())); + } + } + + private void checkTaskAlarms(List actions) { + if (isTaskHeapThresholdTooSmall()) { + LOG.debug("isTaskHeapThresholdTooSmall action Added"); + actions.add( + new SearchBackPressureAction( + appContext, + true, + DEAFULT_COOLOFF_PERIOD_IN_MILLIS, + HEAP_THRESHOLD_STR, + SearchBackPressureAction.SearchbpDimension.TASK, + SearchBackPressureAction.SearchbpThresholdActionDirection.INCREASE, + policyConfig.getSearchbpHeapStepsizeInPercentage())); + } else if (isTaskHeapThresholdTooLarge()) { + LOG.debug("isTaskHeapThresholdTooLarge action Added"); + actions.add( + new SearchBackPressureAction( + appContext, + true, + DEAFULT_COOLOFF_PERIOD_IN_MILLIS, + HEAP_THRESHOLD_STR, + SearchBackPressureAction.SearchbpDimension.TASK, + SearchBackPressureAction.SearchbpThresholdActionDirection.DECREASE, + policyConfig.getSearchbpHeapStepsizeInPercentage())); + } + } + + public void setAppContext(AppContext appContext) { + this.appContext = appContext; + } + + public void setRcaConf(final RcaConf rcaConf) { + this.rcaConf = rcaConf; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBpActionsAlarmMonitor.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBpActionsAlarmMonitor.java new file mode 100644 index 000000000..666bb3a02 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBpActionsAlarmMonitor.java @@ -0,0 +1,132 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure; + + +import com.google.common.annotations.VisibleForTesting; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.AlarmMonitor; +import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.BucketizedSlidingWindow; +import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.BucketizedSlidingWindowConfig; +import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.SlidingWindowData; + +public class SearchBpActionsAlarmMonitor implements AlarmMonitor { + private static final Logger LOG = LogManager.getLogger(SearchBpActionsAlarmMonitor.class); + /* Current design uses hour monitor to evaluate the health of the searchbackpressure service + * if there are more than 30 bad resournce units in one hour, then the alarm shows a Unhealthy Signal + */ + + private static final int DEFAULT_HOUR_BREACH_THRESHOLD = 30; + private static final int DEFAULT_BUCKET_WINDOW_SIZE = 1; + private static final String HOUR_PREFIX = "hour-"; + + public static final int HOUR_MONITOR_BUCKET_WINDOW_MINUTES = 5; + + private BucketizedSlidingWindow hourMonitor; + private int hourBreachThreshold; + + private boolean alarmHealthy = true; + + @Override + public boolean isHealthy() { + evaluateAlarm(); + return alarmHealthy; + } + + public SearchBpActionsAlarmMonitor( + int hourBreachThreshold, + @Nullable Path persistencePath, + @Nullable BucketizedSlidingWindowConfig hourMonitorConfig) { + Path hourMonitorPath = null; + if (persistencePath != null) { + Path persistenceBase = persistencePath.getParent(); + Path persistenceFile = persistencePath.getFileName(); + if (persistenceBase != null && persistenceFile != null) { + hourMonitorPath = + Paths.get( + persistenceBase.toString(), + HOUR_PREFIX + persistenceFile.toString()); + } + } + // initialize hourly alarm monitor + if (hourMonitorConfig == null) { + /* + * Bucket Window Size means the number of issues can exist in a bucket + * when you consider about the size of the BucketizedSlidingWindow, the size is the + * number of buckets, not issues + */ + hourMonitor = + new BucketizedSlidingWindow( + (int) TimeUnit.HOURS.toMinutes(1), + DEFAULT_BUCKET_WINDOW_SIZE, + TimeUnit.MINUTES, + hourMonitorPath); + } else { + hourMonitor = new BucketizedSlidingWindow(hourMonitorConfig); + } + + this.hourBreachThreshold = hourBreachThreshold; + } + + public SearchBpActionsAlarmMonitor(int hourBreachThreshold, @Nullable Path persistencePath) { + this(hourBreachThreshold, persistencePath, null); + } + + public SearchBpActionsAlarmMonitor(int hourBreachThreshold) { + this(hourBreachThreshold, null, null); + } + + public SearchBpActionsAlarmMonitor(@Nullable Path persistencePath) { + this(DEFAULT_HOUR_BREACH_THRESHOLD, persistencePath); + } + + public SearchBpActionsAlarmMonitor() { + this(DEFAULT_HOUR_BREACH_THRESHOLD); + } + + @Override + public void recordIssue(long timeStamp, double value) { + SlidingWindowData dataPoint = new SlidingWindowData(timeStamp, value); + LOG.debug("Search Backpressure Actions Alarm is recording a new issue at {}", timeStamp); + hourMonitor.next(dataPoint); + } + + private void evaluateAlarm() { + if (alarmHealthy) { + if (hourMonitor.size() >= hourBreachThreshold) { + LOG.debug( + "Search Backpressure Actions Alarm is Unhealthy because hourMonitor.size() is {}, and threshold is {}", + hourMonitor.size(), + hourBreachThreshold); + alarmHealthy = false; + } + } else { + if (hourMonitor.size() == 0) { + LOG.debug("SearchBackpressure Hour Monitor is now healthy for zero capacity"); + alarmHealthy = true; + } + } + } + + public int getHourBreachThreshold() { + return hourBreachThreshold; + } + + @VisibleForTesting + BucketizedSlidingWindow getHourMonitor() { + return hourMonitor; + } + + @VisibleForTesting + void setAlarmHealth(boolean isHealthy) { + this.alarmHealthy = isHealthy; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureIssue.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureIssue.java new file mode 100644 index 000000000..423ad429f --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureIssue.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model; + + +import java.util.HashMap; +import java.util.Map; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBpActionsAlarmMonitor; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; + +/* + * SearchBackPressureIssue is the interface for all types of SearchBackPressure Issue (e.g. issue caused by overflow of shard-level heap usage) + */ +public abstract class SearchBackPressureIssue { + public HotResourceSummary hotResourceSummary; + public Map actionsAlarmMonitorMap; + + // constructor + SearchBackPressureIssue( + HotResourceSummary hotResourceSummary, + HashMap actionsAlarmMonitorMap) { + this.hotResourceSummary = hotResourceSummary; + this.actionsAlarmMonitorMap = actionsAlarmMonitorMap; + } + + public abstract void recordIssueBySummaryType(HotResourceSummary hotResourceSummary); +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureSearchTaskIssue.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureSearchTaskIssue.java new file mode 100644 index 000000000..8169cc024 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureSearchTaskIssue.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model; + + +import java.util.HashMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBpActionsAlarmMonitor; +import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; + +public class SearchBackPressureSearchTaskIssue extends SearchBackPressureIssue { + private static final Logger LOG = LogManager.getLogger(SearchBackPressureSearchTaskIssue.class); + + public SearchBackPressureSearchTaskIssue( + HotResourceSummary hotResourceSummary, + HashMap actionsAlarmMonitorMap) { + super(hotResourceSummary, actionsAlarmMonitorMap); + } + + @Override + public void recordIssueBySummaryType(HotResourceSummary summary) { + + if (summary.getMetaData() == SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR) { + LOG.debug("recording increase-level issue for task"); + actionsAlarmMonitorMap + .get(SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_INCREASE_ALARM.toString()) + .recordIssue(); + } + + // decrease alarm for heap-related threshold + if (summary.getMetaData() == SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR) { + LOG.debug("recording decrease-level issue for task"); + actionsAlarmMonitorMap + .get(SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_DECREASE_ALARM.toString()) + .recordIssue(); + } + } + + public enum SearchbpTaskAlarmMonitorMapKeys { + TASK_HEAP_INCREASE_ALARM( + SearchbpTaskAlarmMonitorMapKeys.Constants.TASK_HEAP_INCREASE_ALARM), + TASK_HEAP_DECREASE_ALARM( + SearchbpTaskAlarmMonitorMapKeys.Constants.TASK_HEAP_DECREASE_ALARM); + + private final String value; + + SearchbpTaskAlarmMonitorMapKeys(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + public static class Constants { + public static final String TASK_HEAP_INCREASE_ALARM = + "searchBackPressureTaskHeapIncreaseAlarm"; + public static final String TASK_HEAP_DECREASE_ALARM = + "searchBackPressureTaskHeapDecreaseAlarm"; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java new file mode 100644 index 000000000..bb13e6b31 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model; + + +import java.util.HashMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBpActionsAlarmMonitor; +import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; +import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; + +public class SearchBackPressureShardIssue extends SearchBackPressureIssue { + private static final Logger LOG = LogManager.getLogger(SearchBackPressureShardIssue.class); + + public SearchBackPressureShardIssue( + HotResourceSummary hotResourceSummary, + HashMap actionsAlarmMonitorMap) { + super(hotResourceSummary, actionsAlarmMonitorMap); + } + + @Override + public void recordIssueBySummaryType(HotResourceSummary summary) { + if (summary.getMetaData() == SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR) { + LOG.debug("recording increase-level issue for shard"); + LOG.debug("size of the HashMap: {}", actionsAlarmMonitorMap.size()); + actionsAlarmMonitorMap + .get(SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_INCREASE_ALARM.toString()) + .recordIssue(); + } + + // decrease alarm for heap-related threshold + if (summary.getMetaData() == SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR) { + LOG.debug("recording decrease-level issue for shard"); + actionsAlarmMonitorMap + .get(SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_DECREASE_ALARM.toString()) + .recordIssue(); + } + } + + public enum SearchbpShardAlarmMonitorMapKeys { + SHARD_HEAP_INCREASE_ALARM( + SearchbpShardAlarmMonitorMapKeys.Constants.SHARD_HEAP_DECREASE_ALARM), + SHARD_HEAP_DECREASE_ALARM( + SearchbpShardAlarmMonitorMapKeys.Constants.SHARD_HEAP_DECREASE_ALARM); + + private final String value; + + SearchbpShardAlarmMonitorMapKeys(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + public static class Constants { + public static final String SHARD_HEAP_INCREASE_ALARM = + "searchBackPressureShardHeapIncreaseAlarm"; + public static final String SHARD_HEAP_DECREASE_ALARM = + "searchBackPressureShardHeapDecreaseAlarm"; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java index 1b11c014e..a9423ccd0 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java @@ -19,6 +19,7 @@ import org.opensearch.performanceanalyzer.decisionmaker.deciders.QueueHealthDecider; import org.opensearch.performanceanalyzer.decisionmaker.deciders.collator.Collator; import org.opensearch.performanceanalyzer.decisionmaker.deciders.jvm.HeapHealthDecider; +import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBackPressureDecider; import org.opensearch.performanceanalyzer.metricsdb.MetricsDB; import org.opensearch.performanceanalyzer.plugins.PluginController; import org.opensearch.performanceanalyzer.plugins.PluginControllerConfig; @@ -443,28 +444,11 @@ public void construct() { shardRequestCacheClusterRca, highHeapUsageClusterRca)); - // Search Back Pressure Service RCA enabled - SearchBackPressureRCA searchBackPressureRCA = - new SearchBackPressureRCA(RCA_PERIOD, heapMax, heapUsed, searchbp_Stats); - searchBackPressureRCA.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); - searchBackPressureRCA.addAllUpstreams(Arrays.asList(heapMax, heapUsed, searchbp_Stats)); - - // Search Back Pressure Service Cluster RCA enabled - SearchBackPressureClusterRCA searchBackPressureClusterRCA = - new SearchBackPressureClusterRCA(RCA_PERIOD, searchBackPressureRCA); - searchBackPressureClusterRCA.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); - searchBackPressureClusterRCA.addAllUpstreams( - Collections.singletonList(searchBackPressureRCA)); - searchBackPressureClusterRCA.addTag( - RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM, - RcaConsts.RcaTagConstants.LOCUS_DATA_NODE); - - // TODO: Add SearchBackPressure Decider + // SearchBackPressure RCA Decider + SearchBackPressureDecider searchBackPressureDecider = + buildSearchBackPressureDecider(heapMax, heapUsed, searchbp_Stats); + // AdmissionControl RCA Decider AdmissionControlDecider admissionControlDecider = buildAdmissionControlDecider(heapUsed, heapMax); @@ -478,7 +462,8 @@ public void construct() { queueHealthDecider, cacheHealthDecider, heapHealthDecider, - admissionControlDecider); + admissionControlDecider, + searchBackPressureDecider); collator.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); @@ -487,7 +472,8 @@ public void construct() { queueHealthDecider, cacheHealthDecider, heapHealthDecider, - admissionControlDecider)); + admissionControlDecider, + searchBackPressureDecider)); // Publisher - Executes decisions output from collator Publisher publisher = new Publisher(EVALUATION_INTERVAL_SECONDS, collator); @@ -502,6 +488,40 @@ public void construct() { pluginController.initPlugins(); } + private SearchBackPressureDecider buildSearchBackPressureDecider( + Metric heapMax, Metric heapUsed, Metric searchbp_Stats) { + // Enbale SearchBackPressure node-level RCA + SearchBackPressureRCA searchBackPressureRCA = + new SearchBackPressureRCA(RCA_PERIOD, heapMax, heapUsed, searchbp_Stats); + searchBackPressureRCA.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); + searchBackPressureRCA.addAllUpstreams(Arrays.asList(heapMax, heapUsed, searchbp_Stats)); + + // Enable SearchBackPressure cluster-level RCA + SearchBackPressureClusterRCA searchBackPressureClusterRCA = + new SearchBackPressureClusterRCA(RCA_PERIOD, searchBackPressureRCA); + searchBackPressureClusterRCA.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); + searchBackPressureClusterRCA.addAllUpstreams( + Collections.singletonList(searchBackPressureRCA)); + searchBackPressureClusterRCA.addTag( + RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM, + RcaConsts.RcaTagConstants.LOCUS_DATA_NODE); + + // Enabel SearchBackPressureDecider + SearchBackPressureDecider searchBackPressureDecider = + new SearchBackPressureDecider( + EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, searchBackPressureClusterRCA); + searchBackPressureDecider.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); + searchBackPressureDecider.addAllUpstreams( + Collections.singletonList(searchBackPressureClusterRCA)); + return searchBackPressureDecider; + } + private AdmissionControlDecider buildAdmissionControlDecider(Metric heapUsed, Metric heapMax) { AdmissionControlRca admissionControlRca = new AdmissionControlRca(RCA_PERIOD, heapUsed, heapMax); diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java index e2056eec8..a7b2c25ac 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java @@ -6,8 +6,10 @@ package org.opensearch.performanceanalyzer.rca.store.rca; +import java.util.Deque; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.Field; @@ -259,33 +261,24 @@ public double readMin() { * be implemented as minSlidingWindow or maxSlidingWindow depending on the need. */ public static class MinMaxSlidingWindow extends SlidingWindow { - boolean isMinSlidingWindow; + BiConsumer, SlidingWindowData> nextElementFunc; public MinMaxSlidingWindow( int SLIDING_WINDOW_SIZE_IN_TIMESTAMP, TimeUnit timeUnit, - boolean isMinSlidingWindow) { + BiConsumer, SlidingWindowData> nextElementFunc) { super(SLIDING_WINDOW_SIZE_IN_TIMESTAMP, timeUnit); - this.isMinSlidingWindow = isMinSlidingWindow; + + // get the Biconsumer lambda function passed in + this.nextElementFunc = nextElementFunc; } @Override public void next(SlidingWindowData e) { - if (isMinSlidingWindow) { - // monotonically decreasing sliding window - while (!windowDeque.isEmpty() - && windowDeque.peekFirst().getValue() >= e.getValue()) { - windowDeque.pollFirst(); - } - } else { - // monotonically increasing sliding window - while (!windowDeque.isEmpty() - && windowDeque.peekFirst().getValue() < e.getValue()) { - windowDeque.pollFirst(); - } - } + // use the passed in lambda function to accept next element + nextElementFunc.accept(windowDeque, e); - windowDeque.addFirst(e); + // evict elements in sliding window outside the sliding window size while (!windowDeque.isEmpty() && TimeUnit.MILLISECONDS.toSeconds( e.getTimeStamp() - windowDeque.peekLast().getTimeStamp()) diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java index a7b95bada..2f2ea88a5 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java @@ -21,5 +21,6 @@ public class SearchBackPressureClusterRCA extends BaseClusterRca { public >> SearchBackPressureClusterRCA( final int rcaPeriod, final R SearchBackPressureRCA) { super(rcaPeriod, SearchBackPressureRCA); + LOG.info("SearchBackPressureClusterRCA enabeld."); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java index d274e6f52..0457ccad1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java @@ -11,8 +11,10 @@ import java.time.Clock; import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.Field; @@ -98,6 +100,28 @@ public class SearchBackPressureRCA extends Rca> // Current time protected Clock clock; + // lambda function to add nextElement to monotonically decreasing sliding window + BiConsumer, SlidingWindowData> minSlidingWindowNextElement = + (windowDeque, nextElement) -> { + while (!windowDeque.isEmpty() + && windowDeque.peekFirst().getValue() >= nextElement.getValue()) { + windowDeque.pollFirst(); + } + + windowDeque.addFirst(nextElement); + }; + + // lambda function to add nextElement to monotonically increasing sliding window + BiConsumer, SlidingWindowData> maxSlidingWindowNextElement = + (windowDeque, nextElement) -> { + while (!windowDeque.isEmpty() + && windowDeque.peekFirst().getValue() < nextElement.getValue()) { + windowDeque.pollFirst(); + } + + windowDeque.addFirst(nextElement); + }; + public SearchBackPressureRCA( final int rcaPeriod, final M heapMax, final M heapUsed, M searchbp_Stats) { super(EVAL_INTERVAL_IN_S); @@ -129,9 +153,11 @@ public SearchBackPressureRCA( // sliding window for heap usage this.minHeapUsageSlidingWindow = - new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, true); + new MinMaxSlidingWindow( + SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, minSlidingWindowNextElement); this.maxHeapUsageSlidingWindow = - new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, false); + new MinMaxSlidingWindow( + SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, maxSlidingWindowNextElement); // sliding window for JVM this.shardJVMCancellationSlidingWindow =