Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the flow of request to fix #1638 for circuit breakers #1645

Merged
merged 4 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
*/
package com.alibaba.csp.sentinel;

import java.util.Iterator;
import java.util.LinkedList;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.context.NullContext;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.util.function.BiConsumer;

/**
* Linked entry within current context.
Expand All @@ -35,6 +40,8 @@ class CtEntry extends Entry {

protected ProcessorSlot<Object> chain;
protected Context context;
protected LinkedList<BiConsumer<Context, Entry>> exitHandlers;


CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
Expand Down Expand Up @@ -102,10 +109,32 @@ protected void exitForContext(Context context, int count, Object... args) throws
protected void clearEntryContext() {
this.context = null;
}

@Override
public void whenComplete(BiConsumer<Context, Entry> consumer) {
if (this.exitHandlers == null) {
this.exitHandlers = new LinkedList<>();
}
this.exitHandlers.add(consumer);
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(context, count, args);

if (this.exitHandlers != null) {
Iterator<BiConsumer<Context, Entry>> it = this.exitHandlers.iterator();
BiConsumer<Context, Entry> cur;
while (it.hasNext()) {
cur = it.next();
try {
cur.accept(this.context, this);
} catch (Exception e) {
RecordLog.warn("Error invoking exit handler", e);
}
}
this.exitHandlers = null;
}

return parent;
}
Expand Down
10 changes: 10 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.util.function.BiConsumer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
Expand Down Expand Up @@ -178,4 +179,13 @@ public void setOriginNode(Node originNode) {
this.originNode = originNode;
}

/**
* Like `CompletableFuture` since JDK8 it guarantees specified consumer
* is invoked when this entry exited.
* Use it when you did some STATEFUL operations on entries.
*
* @param consumer
*/
public abstract void whenComplete(BiConsumer<Context, Entry> consumer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.spi.SpiOrder;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* A {@link ProcessorSlot} dedicates to circuit breaking.
Expand All @@ -40,18 +39,18 @@ public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(resourceWrapper);
performChecking(context, resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(ResourceWrapper r) throws BlockException {
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass()) {
if (!cb.tryPass(context, r)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
Expand All @@ -71,14 +70,9 @@ public void exit(Context context, ResourceWrapper r, int count, Object... args)
}

if (curEntry.getBlockError() == null) {
long completeTime = curEntry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - curEntry.getCreateTimestamp();
Throwable error = curEntry.getError();
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(rt, error);
circuitBreaker.afterRequestPassed(context, r);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.util.function.BiConsumer;

/**
* @author Eric Zhao
Expand Down Expand Up @@ -61,14 +65,14 @@ public State currentState() {
}

@Override
public boolean tryPass() {
public boolean tryPass(Context context, ResourceWrapper r) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for trial.
return retryTimeoutArrived() && fromOpenToHalfOpen();
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
Expand Down Expand Up @@ -99,11 +103,24 @@ protected boolean fromCloseToOpen(double snapshotValue) {
return false;
}

protected boolean fromOpenToHalfOpen() {
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null);
}
Entry entry = context.getCurEntry();
entry.whenComplete(new BiConsumer<Context, Entry>() {

@Override
public void accept(Context context, Entry entry) {
if (entry.getBlockError() != null) {
// blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
return;
}

}
});
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;

/**
Expand All @@ -32,11 +34,13 @@ public interface CircuitBreaker {
DegradeRule getRule();

/**
* Acquires permission of an invocation only if it is available at the time of invocation.
* Acquires permission of an invocation only if it is available at the time of invoking.
*
* @param context
* @param r
* @return {@code true} if permission was acquired and {@code false} otherwise
*/
boolean tryPass();
boolean tryPass(Context context, ResourceWrapper r);

/**
* Get current state of the circuit breaker.
Expand All @@ -46,13 +50,12 @@ public interface CircuitBreaker {
State currentState();

/**
* Record a completed request with the given response time and error (if present) and
* handle state transformation of the circuit breaker.
* Called when a passed invocation finished.
*
* @param rt the response time of this entry
* @param error the error of this entry (if present)
* @param context context of current invocation
* @param wrapper current resource
*/
void onRequestComplete(long rt, Throwable error);
void afterRequestPassed(Context context, ResourceWrapper wrapper);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually IMHO "complete" might be more concrete. In Sentinel, "pass" means allowed by Sentinel (not started but pending), and "complete" means an invocation has completed (the blocked requests never started, so never completed).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the ResourceWrapper seems unused? (it could be taken from Entry in Context)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pending

Yeah naming issues often have more than one answer mostly. I tried to make it clear for requests passed by sentinel(passed) and it will be invoked as it completed(after). Surely we can have other picks. Is onRequestComplete implicit whether sentinel guarantees it would be called no matter whether it was blocked?

Talk to the parameters in definition it just follows similar designs in related logics and try to make it more stable in future iterations. ResourceWrapper will contains the resource information which it's useful especially in implementations in future. My opinion is more like to keep it as stability of interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah naming issues often have more than one answer mostly. I tried to make it clear for requests passed by sentinel(passed) and it will be invoked as it completed(after). Surely we can have other picks. Is onRequestComplete implicit whether sentinel guarantees it would be called no matter whether it was blocked?

Originally the semantics of onComplete means the invocation has completed, so it MUST have passed Sentinel's checking. The rejected invocations never passed, so it would never complete.

Talk to the parameters in definition it just follows similar designs in related logics and try to make it more stable in future iterations. ResourceWrapper will contains the resource information which it's useful especially in implementations in future. My opinion is more like to keep it as stability of interface.

Okay for me. Actually I'd like the circuit breaker to be a more generic design (e.g. the context could be generic, and metadata could be retrieved from it).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay for me. Actually I'd like the circuit breaker to be a more generic design (e.g. the context could be generic, and metadata could be retrieved from it).

Naming of callback has been reverted and it has more comments on it.
Talk to the generic context the idea makes sense but it also may introduce more memory footprints. Maybe it can be decided in future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we may discuss it later in a new issue.


/**
* Circuit breaker state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
Expand Down Expand Up @@ -60,7 +63,12 @@ protected void resetStat() {
}

@Override
public void onRequestComplete(long rt, Throwable error) {
public void afterRequestPassed(Context context, ResourceWrapper r) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
Expand All @@ -74,14 +82,17 @@ private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}

if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}

List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author Eric Zhao
Expand Down Expand Up @@ -57,8 +61,17 @@ public void resetStat() {
}

@Override
public void onRequestComplete(long rt, Throwable error) {
public void afterRequestPassed(Context context, ResourceWrapper wrapper) {
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - entry.getCreateTimestamp();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
Expand All @@ -71,7 +84,9 @@ private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}

if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.util.function;

/**
* BiConsumer interface from JDK 8.
*/
public interface BiConsumer<T, U> {

void accept(T t, U u);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.alibaba.csp.sentinel;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.util.function.BiConsumer;

import org.junit.Test;

Expand Down Expand Up @@ -64,5 +66,10 @@ protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeExcepti
public Node getLastNode() {
return null;
}

@Override
public void whenComplete(BiConsumer<Context, Entry> consumer) {
// do nothing
}
}
}
}
Loading