Skip to content

Commit

Permalink
circuit breaker with half open state (#2634)
Browse files Browse the repository at this point in the history
* circuit breaker with half open state

* add switch for half open state

* add half open ut

* add some description

* record all lats of the half open window

* fix some typo

---------

Co-authored-by: jiangyuting <jiangyutingwangyi@163.com>
  • Loading branch information
jiangyt-git and jiangyuting authored Jun 3, 2024
1 parent bdc141c commit 976c588
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 3 deletions.
32 changes: 29 additions & 3 deletions src/brpc/circuit_breaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <gflags/gflags.h>

#include "brpc/errno.pb.h"
#include "brpc/reloadable_flags.h"
#include "butil/time.h"

namespace brpc {
Expand All @@ -45,6 +46,12 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000,
"Maximum isolation duration in milliseconds");
DEFINE_double(circuit_breaker_epsilon_value, 0.02,
"ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)");
// Size of the half-open window used by CircuitBreaker.
// When this value is > 0, CircuitBreaker::Reset() puts the breaker into the
// half-open state. While half-open, CircuitBreaker::OnCallEnd() marks the
// breaker as broken again on the first call that ends with a non-zero error
// code, and leaves the half-open state once this many successful calls have
// been counted. When the value is 0 neither code path is taken.
// NOTE(review): the validator presumably rejects negative values and makes the
// flag reloadable at runtime -- confirm against brpc/reloadable_flags.h.
DEFINE_int32(circuit_breaker_half_open_window_size, 0,
"The limited number of requests allowed to pass through by the half-open "
"window. Only if all of them are successful, the circuit breaker will "
"go to the closed state. Otherwise, it goes back to the open state. "
"Values == 0 disables this feature");
BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, NonNegativeInteger);

namespace {
// EPSILON is used to generate the smoothing coefficient when calculating EMA.
Expand Down Expand Up @@ -132,7 +139,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
if (ema_latency != 0) {
error_cost = std::min(ema_latency * max_mutiple, error_cost);
}
//Errorous response
// Errorous response
if (error_cost != 0) {
int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
Expand All @@ -142,7 +149,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
return ema_error_cost <= max_error_cost;
}

//Ordinary response
// Ordinary response
int64_t ema_error_cost = _ema_error_cost.load(butil::memory_order_relaxed);
do {
if (ema_error_cost == 0) {
Expand Down Expand Up @@ -171,7 +178,9 @@ CircuitBreaker::CircuitBreaker()
, _last_reset_time_ms(0)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
, _isolated_times(0)
, _broken(false) {
, _broken(false)
, _half_open(false)
, _half_open_success_count(0) {
}

bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
Expand All @@ -188,6 +197,19 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
if (_broken.load(butil::memory_order_relaxed)) {
return false;
}
if (FLAGS_circuit_breaker_half_open_window_size > 0
&& _half_open.load(butil::memory_order_relaxed)) {
if (error_code != 0) {
MarkAsBroken();
return false;
}
if (_half_open_success_count.fetch_add(1, butil::memory_order_relaxed)
+ 1 == FLAGS_circuit_breaker_half_open_window_size) {
_half_open.store(false, butil::memory_order_relaxed);
_half_open_success_count.store(0, butil::memory_order_relaxed);
}
}

if (_long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency)) {
return true;
Expand All @@ -201,6 +223,10 @@ void CircuitBreaker::Reset() {
_short_window.Reset();
_last_reset_time_ms = butil::cpuwide_time_ms();
_broken.store(false, butil::memory_order_release);
if (FLAGS_circuit_breaker_half_open_window_size > 0) {
_half_open.store(true, butil::memory_order_relaxed);
_half_open_success_count.store(0, butil::memory_order_relaxed);
}
}

void CircuitBreaker::MarkAsBroken() {
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/circuit_breaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class CircuitBreaker {
butil::atomic<int> _isolation_duration_ms;
butil::atomic<int> _isolated_times;
butil::atomic<bool> _broken;
butil::atomic<bool> _half_open;
butil::atomic<int32_t> _half_open_success_count;
};

} // namespace brpc
Expand Down
57 changes: 57 additions & 0 deletions test/brpc_circuit_breaker_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const int kErrorCodeForSucc = 0;
const int kErrorCost = 1000;
const int kLatency = 1000;
const int kThreadNum = 3;
const int kHalfWindowSize = 0;
} // namespace

namespace brpc {
Expand All @@ -54,6 +55,7 @@ DECLARE_int32(circuit_breaker_short_window_error_percent);
DECLARE_int32(circuit_breaker_long_window_error_percent);
DECLARE_int32(circuit_breaker_min_isolation_duration_ms);
DECLARE_int32(circuit_breaker_max_isolation_duration_ms);
DECLARE_int32(circuit_breaker_half_open_window_size);
} // namespace brpc

int main(int argc, char* argv[]) {
Expand All @@ -63,6 +65,7 @@ int main(int argc, char* argv[]) {
brpc::FLAGS_circuit_breaker_long_window_error_percent = kLongWindowErrorPercent;
brpc::FLAGS_circuit_breaker_min_isolation_duration_ms = kMinIsolationDurationMs;
brpc::FLAGS_circuit_breaker_max_isolation_duration_ms = kMaxIsolationDurationMs;
brpc::FLAGS_circuit_breaker_half_open_window_size = kHalfWindowSize;
testing::InitGoogleTest(&argc, argv);
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
Expand Down Expand Up @@ -160,6 +163,60 @@ TEST_F(CircuitBreakerTest, should_isolate) {
}
}

// Verifies isolation behaviour with the half-open window disabled and then
// enabled. Three rounds of feedback threads are run against the same breaker:
//   1. half-open disabled, fresh breaker;
//   2. half-open disabled, after Reset();
//   3. half-open enabled (window size 10), after Reset().
// Rounds 1 and 2 must report the same number of failed calls. In round 3 the
// number of failed calls must equal kLongWindowSize * 2 * kThreadNum.
TEST_F(CircuitBreakerTest, should_isolate_with_half_open) {
    // Sets the process-wide half-open flag for the lifetime of the guard and
    // puts the previous value back on destruction. ASSERT_* returns from the
    // test body on failure, so a plain assignment at the end of the round
    // would be skipped and the flag would leak into the tests that run later.
    struct HalfOpenWindowFlagGuard {
        explicit HalfOpenWindowFlagGuard(int window_size)
            : _saved(brpc::FLAGS_circuit_breaker_half_open_window_size) {
            brpc::FLAGS_circuit_breaker_half_open_window_size = window_size;
        }
        ~HalfOpenWindowFlagGuard() {
            brpc::FLAGS_circuit_breaker_half_open_window_size = _saved;
        }
        const int _saved;
    };

    std::vector<pthread_t> thread_list;
    std::vector<std::unique_ptr<FeedbackControl>> fc_list;

    // Round 1: half-open disabled.
    // NOTE(review): the last argument is presumably the error percentage fed
    // by each thread -- confirm against StartFeedbackThread.
    StartFeedbackThread(&thread_list, &fc_list, 100);
    int total_failed = 0;
    for (int i = 0; i < kThreadNum; ++i) {
        void* ret_data = nullptr;
        ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
        FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
        EXPECT_GT(fc->_unhealthy_cnt, 0);
        EXPECT_FALSE(fc->_healthy);
        total_failed += fc->_unhealthy_cnt;
    }
    _circuit_breaker.Reset();

    // Round 2: half-open still disabled, breaker has been reset.
    int total_failed1 = 0;
    StartFeedbackThread(&thread_list, &fc_list, 100);
    for (int i = 0; i < kThreadNum; ++i) {
        void* ret_data = nullptr;
        ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
        FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
        EXPECT_FALSE(fc->_healthy);
        EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
        EXPECT_GT(fc->_unhealthy_cnt, 0);
        total_failed1 += fc->_unhealthy_cnt;
    }

    // Round 3: enable the half-open state.
    // The first failed request causes _broken = true immediately.
    int total_failed2 = 0;
    {
        HalfOpenWindowFlagGuard half_open_enabled(10);
        _circuit_breaker.Reset();
        StartFeedbackThread(&thread_list, &fc_list, 100);
        for (int i = 0; i < kThreadNum; ++i) {
            void* ret_data = nullptr;
            ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
            FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
            EXPECT_FALSE(fc->_healthy);
            EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
            EXPECT_GT(fc->_unhealthy_cnt, 0);
            total_failed2 += fc->_unhealthy_cnt;
        }
    }  // The flag is restored here, before the final expectations run.

    EXPECT_EQ(kLongWindowSize * 2 * kThreadNum -
                  kShortWindowSize *
                      brpc::FLAGS_circuit_breaker_short_window_error_percent /
                      100,
              total_failed);

    EXPECT_EQ(total_failed1, total_failed);
    EXPECT_EQ(kLongWindowSize * 2 * kThreadNum, total_failed2);
}

TEST_F(CircuitBreakerTest, isolation_duration_grow_and_reset) {
std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list;
Expand Down

0 comments on commit 976c588

Please sign in to comment.