diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java
index aee71ff543..57d20729f5 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java
@@ -15,19 +15,12 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade;
-import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.context.Context;
-import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slots.block.AbstractRule;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
-import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Objects;
/**
*
@@ -52,13 +45,10 @@
*
*
* @author jialiang.linjl
+ * @author Eric Zhao
*/
public class DegradeRule extends AbstractRule {
- @SuppressWarnings("PMD.ThreadPoolCreationRule")
- private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
- Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));
-
public DegradeRule() {}
public DegradeRule(String resourceName) {
@@ -66,33 +56,34 @@ public DegradeRule(String resourceName) {
}
/**
- * RT threshold or exception ratio threshold count.
+ * Circuit breaking strategy (0: average RT, 1: exception ratio, 2: exception count).
*/
- private double count;
+ private int grade = RuleConstant.DEGRADE_GRADE_RT;
/**
- * Degrade recover timeout (in seconds) when degradation occurs.
+ * Threshold count.
*/
- private int timeWindow;
+ private double count;
/**
- * Degrade strategy (0: average RT, 1: exception ratio, 2: exception count).
+ * Recovery timeout (in seconds) when circuit breaker opens. After the timeout, the circuit breaker will
+ * transform to half-open state for trying a few requests.
*/
- private int grade = RuleConstant.DEGRADE_GRADE_RT;
+ private int timeWindow;
/**
- * Minimum number of consecutive slow requests that can trigger RT circuit breaking.
+ * Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
*/
- private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
+ private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
/**
- * Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
- *
- * @since 1.7.0
+ * The threshold of slow request ratio in RT mode.
*/
- private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
+ private double slowRatioThreshold = 1.0d;
+
+ private int statIntervalMs = 1000;
public int getGrade() {
return grade;
@@ -121,21 +112,30 @@ public DegradeRule setTimeWindow(int timeWindow) {
return this;
}
- public int getRtSlowRequestAmount() {
- return rtSlowRequestAmount;
+ public int getMinRequestAmount() {
+ return minRequestAmount;
}
- public DegradeRule setRtSlowRequestAmount(int rtSlowRequestAmount) {
- this.rtSlowRequestAmount = rtSlowRequestAmount;
+ public DegradeRule setMinRequestAmount(int minRequestAmount) {
+ this.minRequestAmount = minRequestAmount;
return this;
}
- public int getMinRequestAmount() {
- return minRequestAmount;
+ public double getSlowRatioThreshold() {
+ return slowRatioThreshold;
}
- public DegradeRule setMinRequestAmount(int minRequestAmount) {
- this.minRequestAmount = minRequestAmount;
+ public DegradeRule setSlowRatioThreshold(double slowRatioThreshold) {
+ this.slowRatioThreshold = slowRatioThreshold;
+ return this;
+ }
+
+ public int getStatIntervalMs() {
+ return statIntervalMs;
+ }
+
+ public DegradeRule setStatIntervalMs(int statIntervalMs) {
+ this.statIntervalMs = statIntervalMs;
return this;
}
@@ -144,23 +144,19 @@ public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
if (!super.equals(o)) { return false; }
- DegradeRule that = (DegradeRule) o;
- return Double.compare(that.count, count) == 0 &&
- timeWindow == that.timeWindow &&
- grade == that.grade &&
- rtSlowRequestAmount == that.rtSlowRequestAmount &&
- minRequestAmount == that.minRequestAmount;
+ DegradeRule rule = (DegradeRule)o;
+ return Double.compare(rule.count, count) == 0 &&
+ timeWindow == rule.timeWindow &&
+ grade == rule.grade &&
+ minRequestAmount == rule.minRequestAmount &&
+ Double.compare(rule.slowRatioThreshold, slowRatioThreshold) == 0 &&
+ statIntervalMs == rule.statIntervalMs;
}
@Override
public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + new Double(count).hashCode();
- result = 31 * result + timeWindow;
- result = 31 * result + grade;
- result = 31 * result + rtSlowRequestAmount;
- result = 31 * result + minRequestAmount;
- return result;
+ return Objects.hash(super.hashCode(), count, timeWindow, grade, minRequestAmount,
+ slowRatioThreshold, statIntervalMs);
}
@Override
@@ -171,84 +167,15 @@ public String toString() {
", count=" + count +
", limitApp=" + getLimitApp() +
", timeWindow=" + timeWindow +
- ", rtSlowRequestAmount=" + rtSlowRequestAmount +
", minRequestAmount=" + minRequestAmount +
- "}";
+ ", slowRatioThreshold=" + slowRatioThreshold +
+ ", statIntervalMs=" + statIntervalMs +
+ '}';
}
- // Internal implementation (will be deprecated and moved outside).
-
- private AtomicLong passCount = new AtomicLong(0);
- private final AtomicBoolean cut = new AtomicBoolean(false);
-
@Override
- public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
- if (cut.get()) {
- return false;
- }
-
- ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
- if (clusterNode == null) {
- return true;
- }
-
- if (grade == RuleConstant.DEGRADE_GRADE_RT) {
- double rt = clusterNode.avgRt();
- if (rt < this.count) {
- passCount.set(0);
- return true;
- }
-
- // Sentinel will degrade the service only if count exceeds.
- if (passCount.incrementAndGet() < rtSlowRequestAmount) {
- return true;
- }
- } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
- double exception = clusterNode.exceptionQps();
- double success = clusterNode.successQps();
- double total = clusterNode.totalQps();
- // If total amount is less than minRequestAmount, the request will pass.
- if (total < minRequestAmount) {
- return true;
- }
-
- // In the same aligned statistic time window,
- // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
- double realSuccess = success - exception;
- if (realSuccess <= 0 && exception < minRequestAmount) {
- return true;
- }
-
- if (exception / success < count) {
- return true;
- }
- } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
- double exception = clusterNode.totalException();
- if (exception < count) {
- return true;
- }
- }
-
- if (cut.compareAndSet(false, true)) {
- ResetTask resetTask = new ResetTask(this);
- pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
- }
-
+ @Deprecated
+ public boolean passCheck(Context context, DefaultNode node, int count, Object... args) {
return false;
}
-
- private static final class ResetTask implements Runnable {
-
- private DegradeRule rule;
-
- ResetTask(DegradeRule rule) {
- this.rule = rule;
- }
-
- @Override
- public void run() {
- rule.passCount.set(0);
- rule.cut.set(false);
- }
- }
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java
index 6cb09f8e97..d7d6b6c9f9 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java
@@ -21,29 +21,29 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import com.alibaba.csp.sentinel.config.SentinelConfig;
-import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.log.RecordLog;
-import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
-import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
-import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
+import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
+import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker;
+import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ResponseTimeCircuitBreaker;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.StringUtil;
/**
+ * The rule manager for circuit breaking rules ({@link DegradeRule}).
+ *
* @author youji.zj
* @author jialiang.linjl
* @author Eric Zhao
*/
public final class DegradeRuleManager {
- private static final Map> degradeRules = new ConcurrentHashMap<>();
+ private static volatile Map> circuitBreakers = new HashMap<>();
+ private static volatile Map> ruleMap = new HashMap<>();
private static final RulePropertyListener LISTENER = new RulePropertyListener();
private static SentinelProperty> currentProperty
@@ -69,41 +69,37 @@ public static void register2Property(SentinelProperty> propert
}
}
- public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
- throws BlockException {
-
- Set rules = degradeRules.get(resource.getName());
- if (rules == null) {
- return;
- }
-
- for (DegradeRule rule : rules) {
- if (!rule.passCheck(context, node, count)) {
- throw new DegradeException(rule.getLimitApp(), rule);
- }
- }
+ static List getCircuitBreakers(String resourceName) {
+ return circuitBreakers.get(resourceName);
}
public static boolean hasConfig(String resource) {
if (resource == null) {
return false;
}
- return degradeRules.containsKey(resource);
+ return circuitBreakers.containsKey(resource);
}
/**
- * Get a copy of the rules.
+ * Get existing circuit breaking rules.
+ * Note: DO NOT modify the rules from the returned list directly.
+ * The behavior is undefined.
*
- * @return a new copy of the rules.
+ * @return list of existing circuit breaking rules, or empty list if no rules were loaded
*/
public static List getRules() {
List rules = new ArrayList<>();
- for (Map.Entry> entry : degradeRules.entrySet()) {
+ for (Map.Entry> entry : ruleMap.entrySet()) {
rules.addAll(entry.getValue());
}
return rules;
}
+ public static Set getRulesOfResource(String resource) {
+ AssertUtil.assertNotBlank(resource, "resource name cannot be blank");
+ return ruleMap.get(resource);
+ }
+
/**
* Load {@link DegradeRule}s, former rules will be replaced.
*
@@ -113,7 +109,7 @@ public static void loadRules(List rules) {
try {
currentProperty.updateValue(rules);
} catch (Throwable e) {
- RecordLog.warn("[DegradeRuleManager] Unexpected error when loading degrade rules", e);
+ RecordLog.error("[DegradeRuleManager] Unexpected error when loading degrade rules", e);
}
}
@@ -128,7 +124,7 @@ public static void loadRules(List rules) {
public static boolean setRulesForResource(String resourceName, Set rules) {
AssertUtil.notEmpty(resourceName, "resourceName cannot be empty");
try {
- Map> newRuleMap = new HashMap<>(degradeRules);
+ Map> newRuleMap = new HashMap<>(ruleMap);
if (rules == null) {
newRuleMap.remove(resourceName);
} else {
@@ -146,88 +142,127 @@ public static boolean setRulesForResource(String resourceName, Set
}
return currentProperty.updateValue(allRules);
} catch (Throwable e) {
- RecordLog.warn(
- "[DegradeRuleManager] Unexpected error when setting degrade rules for resource: " + resourceName, e);
+ RecordLog.error("[DegradeRuleManager] Unexpected error when setting circuit breaking"
+ + " rules for resource: " + resourceName, e);
+ return false;
+ }
+ }
+
+ private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) {
+ List cbs = getCircuitBreakers(rule.getResource());
+ if (cbs == null || cbs.isEmpty()) {
+ return newCircuitBreakerFrom(rule);
+ }
+ for (CircuitBreaker cb : cbs) {
+ if (rule.equals(cb.getRule())) {
+ // Reuse the circuit breaker if the rule remains unchanged.
+ return cb;
+ }
+ }
+ return newCircuitBreakerFrom(rule);
+ }
+
+ /**
+ * Create a circuit breaker instance from provided circuit breaking rule.
+ *
+ * @param rule a valid circuit breaking rule
+ * @return new circuit breaker based on provided rule; null if rule is invalid or unsupported type
+ */
+ private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
+ switch (rule.getGrade()) {
+ case RuleConstant.DEGRADE_GRADE_RT:
+ return new ResponseTimeCircuitBreaker(rule);
+ case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
+ case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
+ return new ExceptionCircuitBreaker(rule);
+ default:
+ return null;
+ }
+ }
+
+ public static boolean isValidRule(DegradeRule rule) {
+ boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
+ && rule.getCount() >= 0 && rule.getTimeWindow() > 0;
+ if (!baseValid) {
+ return false;
+ }
+ if (rule.getMinRequestAmount() <= 0 || rule.getStatIntervalMs() <= 0) {
return false;
}
+ switch (rule.getGrade()) {
+ case RuleConstant.DEGRADE_GRADE_RT:
+ return rule.getSlowRatioThreshold() >= 0 && rule.getSlowRatioThreshold() <= 1;
+ case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
+ return rule.getCount() <= 1;
+ case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
+ return true;
+ default:
+ return false;
+ }
}
private static class RulePropertyListener implements PropertyListener> {
+ private synchronized void reloadFrom(List list) {
+ Map> cbs = buildCircuitBreakers(list);
+ Map> rm = new HashMap<>(cbs.size());
+
+ for (Map.Entry> e : cbs.entrySet()) {
+ assert e.getValue() != null && !e.getValue().isEmpty();
+
+ Set rules = new HashSet<>(e.getValue().size());
+ for (CircuitBreaker cb : e.getValue()) {
+ rules.add(cb.getRule());
+ }
+ rm.put(e.getKey(), rules);
+ }
+
+ DegradeRuleManager.circuitBreakers = cbs;
+ DegradeRuleManager.ruleMap = rm;
+ }
+
@Override
public void configUpdate(List conf) {
- Map> rules = loadDegradeConf(conf);
- if (rules != null) {
- degradeRules.clear();
- degradeRules.putAll(rules);
- }
- RecordLog.info("[DegradeRuleManager] Degrade rules received: " + degradeRules);
+ reloadFrom(conf);
+ RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: " + ruleMap);
}
@Override
public void configLoad(List conf) {
- Map> rules = loadDegradeConf(conf);
- if (rules != null) {
- degradeRules.clear();
- degradeRules.putAll(rules);
- }
- RecordLog.info("[DegradeRuleManager] Degrade rules loaded: " + degradeRules);
+ reloadFrom(conf);
+ RecordLog.info("[DegradeRuleManager] Degrade rules loaded: " + ruleMap);
}
- private Map> loadDegradeConf(List list) {
- Map> newRuleMap = new ConcurrentHashMap<>();
-
+ private Map> buildCircuitBreakers(List list) {
+ Map> cbMap = new HashMap<>(8);
if (list == null || list.isEmpty()) {
- return newRuleMap;
+ return cbMap;
}
-
for (DegradeRule rule : list) {
if (!isValidRule(rule)) {
- RecordLog.warn(
- "[DegradeRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule);
+ RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
-
- String identity = rule.getResource();
- Set ruleSet = newRuleMap.get(identity);
- if (ruleSet == null) {
- ruleSet = new HashSet<>();
- newRuleMap.put(identity, ruleSet);
+ CircuitBreaker cb = getExistingSameCbOrNew(rule);
+ if (cb == null) {
+ RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: " + rule);
+ continue;
}
- ruleSet.add(rule);
- }
- return newRuleMap;
- }
- }
+ String resourceName = rule.getResource();
- public static boolean isValidRule(DegradeRule rule) {
- boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
- && rule.getCount() >= 0 && rule.getTimeWindow() > 0;
- if (!baseValid) {
- return false;
- }
- int maxAllowedRt = SentinelConfig.statisticMaxRt();
- if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT) {
- if (rule.getRtSlowRequestAmount() <= 0) {
- return false;
- }
- // Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}.
- if (rule.getCount() > maxAllowedRt) {
- RecordLog.warn(String.format("[DegradeRuleManager] WARN: setting large RT threshold (%.1f ms)"
- + " in RT mode will not take effect since it exceeds the max allowed value (%d ms)",
- rule.getCount(), maxAllowedRt));
+ List cbList = cbMap.get(resourceName);
+ if (cbList == null) {
+ cbList = new ArrayList<>();
+ cbMap.put(resourceName, cbList);
+ }
+ cbList.add(cb);
}
+ return cbMap;
}
-
- // Check exception ratio mode.
- if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
- return rule.getCount() <= 1 && rule.getMinRequestAmount() > 0;
- }
- return true;
}
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java
index 8e8dd19109..9bc8c793aa 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java
@@ -15,30 +15,73 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade;
+import java.util.List;
+
+import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
+import com.alibaba.csp.sentinel.slots.block.BlockException;
+import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.spi.SpiOrder;
+import com.alibaba.csp.sentinel.util.TimeUtil;
/**
- * A {@link ProcessorSlot} dedicates to {@link DegradeRule} checking.
+ * A {@link ProcessorSlot} dedicates to circuit breaking.
*
- * @author leyou
+ * @author Carpenter Lee
+ * @author Eric Zhao
*/
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot {
@Override
- public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
- throws Throwable {
- DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
+ public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
+ boolean prioritized, Object... args) throws Throwable {
+ performChecking(resourceWrapper);
+
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
+ void performChecking(ResourceWrapper r) throws BlockException {
+ List circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
+ if (circuitBreakers == null || circuitBreakers.isEmpty()) {
+ return;
+ }
+ for (CircuitBreaker cb : circuitBreakers) {
+ if (!cb.tryPass()) {
+ throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
+ }
+ }
+ }
+
@Override
- public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
- fireExit(context, resourceWrapper, count, args);
+ public void exit(Context context, ResourceWrapper r, int count, Object... args) {
+ Entry curEntry = context.getCurEntry();
+ if (curEntry.getBlockError() != null) {
+ fireExit(context, r, count, args);
+ return;
+ }
+ List circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
+ if (circuitBreakers == null || circuitBreakers.isEmpty()) {
+ fireExit(context, r, count, args);
+ return;
+ }
+
+ if (curEntry.getBlockError() == null) {
+ long completeTime = curEntry.getCompleteTimestamp();
+ if (completeTime <= 0) {
+ completeTime = TimeUtil.currentTimeMillis();
+ }
+ long rt = completeTime - curEntry.getCreateTimestamp();
+ Throwable error = curEntry.getError();
+ for (CircuitBreaker circuitBreaker : circuitBreakers) {
+ circuitBreaker.onRequestComplete(rt, error);
+ }
+ }
+
+ fireExit(context, r, count, args);
}
}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java
new file mode 100644
index 0000000000..2d3c2370af
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
+import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+import com.alibaba.csp.sentinel.util.TimeUtil;
+
+/**
+ * @author Eric Zhao
+ * @since 1.8.0
+ */
+public abstract class AbstractCircuitBreaker implements CircuitBreaker {
+
+ protected final DegradeRule rule;
+ protected final int recoveryTimeoutMs;
+
+ private final EventObserverRegistry observerRegistry;
+
+ protected final AtomicReference currentState = new AtomicReference<>(State.CLOSED);
+ protected volatile long nextRetryTimestamp;
+
+ public AbstractCircuitBreaker(DegradeRule rule) {
+ this(rule, EventObserverRegistry.getInstance());
+ }
+
+ AbstractCircuitBreaker(DegradeRule rule, EventObserverRegistry observerRegistry) {
+ AssertUtil.notNull(observerRegistry, "observerRegistry cannot be null");
+ if (!DegradeRuleManager.isValidRule(rule)) {
+ throw new IllegalArgumentException("Invalid DegradeRule: " + rule);
+ }
+ this.observerRegistry = observerRegistry;
+ this.rule = rule;
+ this.recoveryTimeoutMs = rule.getTimeWindow() * 1000;
+ }
+
+ @Override
+ public DegradeRule getRule() {
+ return rule;
+ }
+
+ @Override
+ public State currentState() {
+ return currentState.get();
+ }
+
+ @Override
+ public boolean tryPass() {
+ // Template implementation.
+ if (currentState.get() == State.CLOSED) {
+ return true;
+ }
+ if (currentState.get() == State.OPEN) {
+ // For half-open state we allow a request for trial.
+ return retryTimeoutArrived() && fromOpenToHalfOpen();
+ }
+ return false;
+ }
+
+ /**
+ * Reset the statistic data.
+ */
+ abstract void resetStat();
+
+ protected boolean retryTimeoutArrived() {
+ return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
+ }
+
+ protected void updateNextRetryTimestamp() {
+ this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
+ }
+
+ protected boolean fromCloseToOpen(double snapshotValue) {
+ State prev = State.CLOSED;
+ if (currentState.compareAndSet(prev, State.OPEN)) {
+ updateNextRetryTimestamp();
+
+ for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
+ observer.onStateChange(prev, State.OPEN, rule, snapshotValue);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean fromOpenToHalfOpen() {
+ if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
+ for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
+ observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean fromHalfOpenToOpen(double snapshotValue) {
+ if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
+ updateNextRetryTimestamp();
+ for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
+ observer.onStateChange(State.HALF_OPEN, State.OPEN, rule, snapshotValue);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean fromHalfOpenToClose() {
+ if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
+ resetStat();
+ for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
+ observer.onStateChange(State.HALF_OPEN, State.CLOSED, rule, null);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ protected void transformToOpen(double triggerValue) {
+ State cs = currentState.get();
+ switch (cs) {
+ case CLOSED:
+ fromCloseToOpen(triggerValue);
+ break;
+ case HALF_OPEN:
+ fromHalfOpenToOpen(triggerValue);
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java
new file mode 100644
index 0000000000..3e5faf43fc
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
+
+import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
+
+/**
+ * Basic circuit breaker interface.
+ *
+ * @author Eric Zhao
+ */
+public interface CircuitBreaker {
+
+ /**
+ * Get the associated circuit breaking rule.
+ *
+ * @return associated circuit breaking rule
+ */
+ DegradeRule getRule();
+
+ /**
+ * Acquires permission of an invocation only if it is available at the time of invocation.
+ *
+ * @return {@code true} if permission was acquired and {@code false} otherwise
+ */
+ boolean tryPass();
+
+ /**
+ * Get current state of the circuit breaker.
+ *
+ * @return current state of the circuit breaker
+ */
+ State currentState();
+
+ /**
+ * Record a completed request with the given response time and error (if present) and
+ * handle state transformation of the circuit breaker.
+ *
+ * @param rt the response time of this entry
+ * @param error the error of this entry (if present)
+ */
+ void onRequestComplete(long rt, Throwable error);
+
+ /**
+ * Circuit breaker state.
+ */
+ enum State {
+ /**
+ * In {@code OPEN} state, all requests will be rejected until the next recovery time point.
+ */
+ OPEN,
+ /**
+ * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
+ * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
+ * will re-transform to the {@code OPEN} state and wait for the next recovery time point;
+ * otherwise the resource will be regarded as "recovered" and the circuit breaker
+ * will cease cutting off requests and transform to {@code CLOSED} state.
+ */
+ HALF_OPEN,
+ /**
+ * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
+ * the circuit breaker will transform to {@code OPEN} state.
+ */
+ CLOSED
+ }
+}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java
new file mode 100644
index 0000000000..e7e5b2dc76
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
+
+import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
+
+/**
+ * @author Eric Zhao
+ * @since 1.8.0
+ */
+public interface CircuitBreakerStateChangeObserver {
+
+ /**
+ * Observer method triggered when circuit breaker state changed. The transformation could be:
+ *
+ * - From {@code CLOSED} to {@code OPEN} (with the triggered metric)
+ * - From {@code OPEN} to {@code HALF_OPEN}
+ * - From {@code OPEN} to {@code CLOSED}
+ * - From {@code HALF_OPEN} to {@code OPEN} (with the triggered metric)
+ *
+ *
+ * @param prevState previous state of the circuit breaker
+ * @param newState new state of the circuit breaker
+ * @param rule associated rule
+ * @param snapshotValue triggered value on circuit breaker opens (null if the new state is CLOSED or HALF_OPEN)
+ */
+ void onStateChange(CircuitBreaker.State prevState, CircuitBreaker.State newState, DegradeRule rule,
+ Double snapshotValue);
+}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java
new file mode 100644
index 0000000000..79d3b12a32
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 1999-2020 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
+
+/**
+ * @author Eric Zhao
+ * @since 1.8.0
+ */
+public enum CircuitBreakerStrategy {
+
+ /**
+ * Circuit breaker opens (cuts off) when slow request ratio exceeds the threshold.
+ */
+ SLOW_REQUEST_RATIO(0),
+ /**
+ * Circuit breaker opens (cuts off) when error ratio exceeds the threshold.
+ */
+ ERROR_RATIO(1),
+ /**
+ * Circuit breaker opens (cuts off) when error count exceeds the threshold.
+ */
+ ERROR_COUNT(2);
+
+ private int type;
+
+ CircuitBreakerStrategy(int type) {
+ this.type = type;
+ }
+
+ public int getType() {
+ return type;
+ }
+}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java
new file mode 100644
index 0000000000..2243eebaa5
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 1999-2020 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+/**
+ * Registry for circuit breaker event observers.
+ *
+ * @author Eric Zhao
+ * @since 1.8.0
+ */
+public class EventObserverRegistry {
+
+ private final Map stateChangeObserverMap = new HashMap<>();
+
+ /**
+ * Register a circuit breaker state change observer.
+ *
+ * @param name observer name
+ * @param observer a valid observer
+ */
+ public void addStateChangeObserver(String name, CircuitBreakerStateChangeObserver observer) {
+ AssertUtil.notNull(name, "name cannot be null");
+ AssertUtil.notNull(observer, "observer cannot be null");
+ stateChangeObserverMap.put(name, observer);
+ }
+
+ public boolean removeStateChangeObserver(String name) {
+ AssertUtil.notNull(name, "name cannot be null");
+ return stateChangeObserverMap.remove(name) != null;
+ }
+
+ /**
+ * Get all registered state chane observers.
+ *
+ * @return all registered state chane observers
+ */
+ public List getStateChangeObservers() {
+ return new ArrayList<>(stateChangeObserverMap.values());
+ }
+
+ public static EventObserverRegistry getInstance() {
+ return InstanceHolder.instance;
+ }
+
+ private static class InstanceHolder {
+ private static EventObserverRegistry instance = new EventObserverRegistry();
+ }
+
+ EventObserverRegistry() {}
+}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java
new file mode 100644
index 0000000000..d57a037b1d
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
+
+import java.util.List;
+
+import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
+import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
+import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
+import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+import static com.alibaba.csp.sentinel.slots.block.RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO;
+import static com.alibaba.csp.sentinel.slots.block.RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT;
+
+/**
+ * @author Eric Zhao
+ * @since 1.8.0
+ */
+public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
+
+ private final int strategy;
+ private final int minRequestAmount;
+ private final double threshold;
+
+ private final LeapArray stat;
+
+ public ExceptionCircuitBreaker(DegradeRule rule) {
+ this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
+ }
+
+ ExceptionCircuitBreaker(DegradeRule rule, LeapArray stat) {
+ super(rule);
+ this.strategy = rule.getGrade();
+ boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
+ AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
+ AssertUtil.notNull(stat, "stat cannot be null");
+ this.minRequestAmount = rule.getMinRequestAmount();
+ this.threshold = rule.getCount();
+ this.stat = stat;
+ }
+
+ @Override
+ protected void resetStat() {
+ // Reset current bucket (bucket count = 1).
+ stat.currentWindow().value().reset();
+ }
+
+ @Override
+ public void onRequestComplete(long rt, Throwable error) {
+ SimpleErrorCounter counter = stat.currentWindow().value();
+ if (error != null) {
+ counter.getErrorCount().add(1);
+ }
+ counter.getTotalCount().add(1);
+
+ handleStateChangeWhenThresholdExceeded(error);
+ }
+
+ private void handleStateChangeWhenThresholdExceeded(Throwable error) {
+ if (currentState.get() == State.OPEN) {
+ return;
+ }
+ if (currentState.get() == State.HALF_OPEN) {
+ if (error == null) {
+ fromHalfOpenToClose();
+ } else {
+ fromHalfOpenToOpen(1.0d);
+ }
+ return;
+ }
+ List counters = stat.values();
+ long errCount = 0;
+ long totalCount = 0;
+ for (SimpleErrorCounter counter : counters) {
+ errCount += counter.errorCount.sum();
+ totalCount += counter.totalCount.sum();
+ }
+ if (totalCount < minRequestAmount) {
+ return;
+ }
+ double curCount = errCount;
+ if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
+ // Use errorRatio
+ curCount = errCount * 1.0d / totalCount;
+ }
+ if (curCount > threshold) {
+ transformToOpen(curCount);
+ }
+ }
+
+ static class SimpleErrorCounter {
+ private LongAdder errorCount;
+ private LongAdder totalCount;
+
+ public SimpleErrorCounter() {
+ this.errorCount = new LongAdder();
+ this.totalCount = new LongAdder();
+ }
+
+ public LongAdder getErrorCount() {
+ return errorCount;
+ }
+
+ public LongAdder getTotalCount() {
+ return totalCount;
+ }
+
+ public SimpleErrorCounter reset() {
+ errorCount.reset();
+ totalCount.reset();
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleErrorCounter{" +
+ "errorCount=" + errorCount +
+ ", totalCount=" + totalCount +
+ '}';
+ }
+ }
+
+ static class SimpleErrorCounterLeapArray extends LeapArray {
+
+ public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
+ super(sampleCount, intervalInMs);
+ }
+
+ @Override
+ public SimpleErrorCounter newEmptyBucket(long timeMillis) {
+ return new SimpleErrorCounter();
+ }
+
+ @Override
+ protected WindowWrap resetWindowTo(WindowWrap w, long startTime) {
+ // Update the start time and reset value.
+ w.resetTo(startTime);
+ w.value().reset();
+ return w;
+ }
+ }
+}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java
new file mode 100644
index 0000000000..252aaa438f
--- /dev/null
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 1999-2019 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;
+
+import java.util.List;
+
+import com.alibaba.csp.sentinel.slots.block.RuleConstant;
+import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
+import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
+import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
+import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
+import com.alibaba.csp.sentinel.util.AssertUtil;
+
+/**
+ * @author Eric Zhao
+ * @since 1.8.0
+ */
+public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {
+
+ private final long maxAllowedRt;
+ private final double maxSlowRequestRatio;
+ private final int minRequestAmount;
+
+ private final LeapArray slidingCounter;
+
+ public ResponseTimeCircuitBreaker(DegradeRule rule) {
+ this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
+ }
+
+ ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray stat) {
+ super(rule);
+ AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT, "rule metric type should be RT");
+ AssertUtil.notNull(stat, "stat cannot be null");
+ this.maxAllowedRt = Math.round(rule.getCount());
+ this.maxSlowRequestRatio = rule.getSlowRatioThreshold();
+ this.minRequestAmount = rule.getMinRequestAmount();
+ this.slidingCounter = stat;
+ }
+
+ @Override
+ public void resetStat() {
+ // Reset current bucket (bucket count = 1).
+ slidingCounter.currentWindow().value().reset();
+ }
+
+ @Override
+ public void onRequestComplete(long rt, Throwable error) {
+ SlowRequestCounter counter = slidingCounter.currentWindow().value();
+ if (rt > maxAllowedRt) {
+ counter.slowCount.add(1);
+ }
+ counter.totalCount.add(1);
+
+ handleStateChangeWhenThresholdExceeded(rt);
+ }
+
+ private void handleStateChangeWhenThresholdExceeded(long rt) {
+ if (currentState.get() == State.OPEN) {
+ return;
+ }
+ if (currentState.get() == State.HALF_OPEN) {
+ // TODO: improve logic for half-open recovery
+ if (rt > maxAllowedRt) {
+ fromHalfOpenToOpen(1.0d);
+ } else {
+ fromHalfOpenToClose();
+ }
+ return;
+ }
+
+ List counters = slidingCounter.values();
+ long slowCount = 0;
+ long totalCount = 0;
+ for (SlowRequestCounter counter : counters) {
+ slowCount += counter.slowCount.sum();
+ totalCount += counter.totalCount.sum();
+ }
+ if (totalCount < minRequestAmount) {
+ return;
+ }
+ double currentRatio = slowCount * 1.0d / totalCount;
+ if (currentRatio > maxSlowRequestRatio) {
+ transformToOpen(currentRatio);
+ }
+ }
+
+ static class SlowRequestCounter {
+ private LongAdder slowCount;
+ private LongAdder totalCount;
+
+ public SlowRequestCounter() {
+ this.slowCount = new LongAdder();
+ this.totalCount = new LongAdder();
+ }
+
+ public LongAdder getSlowCount() {
+ return slowCount;
+ }
+
+ public LongAdder getTotalCount() {
+ return totalCount;
+ }
+
+ public SlowRequestCounter reset() {
+ slowCount.reset();
+ totalCount.reset();
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "SlowRequestCounter{" +
+ "slowCount=" + slowCount +
+ ", totalCount=" + totalCount +
+ '}';
+ }
+ }
+
+ static class SlowRequestLeapArray extends LeapArray {
+
+ public SlowRequestLeapArray(int sampleCount, int intervalInMs) {
+ super(sampleCount, intervalInMs);
+ }
+
+ @Override
+ public SlowRequestCounter newEmptyBucket(long timeMillis) {
+ return new SlowRequestCounter();
+ }
+
+ @Override
+ protected WindowWrap resetWindowTo(WindowWrap w, long startTime) {
+ w.resetTo(startTime);
+ w.value().reset();
+ return w;
+ }
+ }
+}