Skip to content

Commit

Permalink
limit total streams oncomsume bytes in one socket
Browse files Browse the repository at this point in the history
  • Loading branch information
chenbay committed Oct 24, 2022
1 parent 6457ad7 commit 1874f5a
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 11 deletions.
8 changes: 8 additions & 0 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ DEFINE_int64(socket_max_unwritten_bytes, 64 * 1024 * 1024,
"Max unwritten bytes in each socket, if the limit is reached,"
" Socket.Write fails with EOVERCROWDED");

DEFINE_int64(socket_max_streams_unconsumed_bytes, 0,
"Max stream receivers' unconsumed bytes in one socket,"
" it used in stream for receiver buffer control.");

DEFINE_int32(max_connection_pool_size, 100,
"Max number of pooled connections to a single endpoint");
BRPC_VALIDATE_GFLAG(max_connection_pool_size, PassValidate);
Expand Down Expand Up @@ -455,6 +459,7 @@ Socket::Socket(Forbidden)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
, _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
{
CreateVarsOnce();
Expand Down Expand Up @@ -640,6 +645,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
Expand Down Expand Up @@ -2149,6 +2155,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\n_additional_ref_status="
<< ptr->_additional_ref_status.load(butil::memory_order_relaxed)
<< "\ntotal_streams_buffer_size="
<< ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed)
<< "\nninflight_app_health_check="
<< ptr->_ninflight_app_health_check.load(butil::memory_order_relaxed)
<< "\nagent_socket_id=";
Expand Down
1 change: 1 addition & 0 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ friend void DereferenceSocket(Socket*);

butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set;
butil::atomic<int64_t> _total_streams_unconsumed_size;

butil::atomic<int64_t> _ninflight_app_health_check;
};
Expand Down
54 changes: 45 additions & 9 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
namespace brpc {

DECLARE_bool(usercode_in_pthread);
DECLARE_int64(socket_max_streams_unconsumed_bytes);

const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;

Expand All @@ -45,6 +46,7 @@ Stream::Stream()
, _closed(false)
, _produced(0)
, _remote_consumed(0)
, _cur_buf_size(0)
, _local_consumed(0)
, _parse_rpc_response(false)
, _pending_buf(NULL)
Expand Down Expand Up @@ -72,6 +74,17 @@ int Stream::Create(const StreamOptions &options,
s->_connected = false;
s->_options = options;
s->_closed = false;
s->_cur_buf_size = options.max_buf_size;
if (options.max_buf_size > 0 && options.min_buf_size > options.max_buf_size)
{
// set 0 if min_buf_size is invalid.
s->_options.min_buf_size = 0;
LOG(WARNING) << "options.min_buf_size is larger than options.max_buf_size, it will be set to 0.";
}
if (FLAGS_socket_max_streams_unconsumed_bytes > 0 && s->_options.min_buf_size > 0) {
s->_cur_buf_size = s->_options.min_buf_size;
}

if (remote_settings != NULL) {
s->_remote_settings.MergeFrom(*remote_settings);
s->_parse_rpc_response = false;
Expand Down Expand Up @@ -260,17 +273,17 @@ void Stream::TriggerOnConnectIfNeed() {
}

int Stream::AppendIfNotFull(const butil::IOBuf &data) {
if (_options.max_buf_size > 0) {
if (_cur_buf_size > 0) {
std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex);
if (_produced >= _remote_consumed + (size_t)_options.max_buf_size) {
if (_produced >= _remote_consumed + _cur_buf_size) {
const size_t saved_produced = _produced;
const size_t saved_remote_consumed = _remote_consumed;
lck.unlock();
RPC_VLOG << "Stream=" << _id << " is full"
<< "_produced=" << saved_produced
<< " _remote_consumed=" << saved_remote_consumed
<< " gap=" << saved_produced - saved_remote_consumed
<< " max_buf_size=" << _options.max_buf_size;
<< " max_buf_size=" << _cur_buf_size;
return 1;
}
_produced += data.length();
Expand All @@ -284,21 +297,43 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {
_produced -= data.length();
return -1;
}
if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
_host_socket->_total_streams_unconsumed_size += data.length();
}
return 0;
}

void Stream::SetRemoteConsumed(size_t new_remote_consumed) {
CHECK(_options.max_buf_size > 0);
CHECK(_cur_buf_size > 0);
bthread_id_list_t tmplist;
bthread_id_list_init(&tmplist, 0, 0);
bthread_mutex_lock(&_congestion_control_mutex);
if (_remote_consumed >= new_remote_consumed) {
bthread_mutex_unlock(&_congestion_control_mutex);
return;
}
const bool was_full = _produced >= _remote_consumed + (size_t)_options.max_buf_size;
const bool was_full = _produced >= _remote_consumed + _cur_buf_size;

if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
_host_socket->_total_streams_unconsumed_size -= new_remote_consumed - _remote_consumed;
if (_host_socket->_total_streams_unconsumed_size > FLAGS_socket_max_streams_unconsumed_bytes) {
if (_options.min_buf_size > 0) {
_cur_buf_size = _options.min_buf_size;
} else {
_cur_buf_size /= 2;
}
LOG(INFO) << "stream consumers on socket " << _host_socket->id() << " is crowded, " << "cut stream " << id() << " buffer to " << _cur_buf_size;
} else if (_produced >= new_remote_consumed + _cur_buf_size && (_options.max_buf_size == 0 || _cur_buf_size < _options.max_buf_size)) {
if (_options.max_buf_size > 0 && _cur_buf_size * 2 > _options.max_buf_size) {
_cur_buf_size = _options.max_buf_size;
} else {
_cur_buf_size *= 2;
}
}
}

_remote_consumed = new_remote_consumed;
const bool is_full = _produced >= _remote_consumed + (size_t)_options.max_buf_size;
const bool is_full = _produced >= _remote_consumed + _cur_buf_size;
if (was_full && !is_full) {
bthread_id_list_swap(&tmplist, &_writable_wait_list);
}
Expand Down Expand Up @@ -374,8 +409,8 @@ void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg,
}
}
bthread_mutex_lock(&_congestion_control_mutex);
if (_options.max_buf_size <= 0
|| _produced < _remote_consumed + (size_t)_options.max_buf_size) {
if (_cur_buf_size <= 0
|| _produced < _remote_consumed + _cur_buf_size) {
bthread_mutex_unlock(&_congestion_control_mutex);
CHECK_EQ(0, TriggerOnWritable(wait_id, wm, 0));
return;
Expand Down Expand Up @@ -524,6 +559,7 @@ int Stream::Consume(void *meta, bthread::TaskIterator<butil::IOBuf*>& iter) {
}
}
mb.flush();

if (s->_remote_settings.need_feedback() && mb.total_length() > 0) {
s->_local_consumed += mb.total_length();
s->SendFeedback();
Expand Down Expand Up @@ -560,7 +596,7 @@ int Stream::SetHostSocket(Socket *host_socket) {

void Stream::FillSettings(StreamSettings *settings) {
settings->set_stream_id(id());
settings->set_need_feedback(_options.max_buf_size > 0);
settings->set_need_feedback(_cur_buf_size > 0);
settings->set_writable(_options.handler != NULL);
}

Expand Down
10 changes: 8 additions & 2 deletions src/brpc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,22 @@ class StreamInputHandler {

struct StreamOptions {
StreamOptions()
: max_buf_size(2 * 1024 * 1024)
: min_buf_size(1024 * 1024)
, max_buf_size(2 * 1024 * 1024)
, idle_timeout_ms(-1)
, messages_in_batch(128)
, handler(NULL)
{}

// stream max buffer size limit in [min_buf_size, max_buf_size]
// If |min_buf_size| <= 0, there's no min size limit of buf size
// default: 1048576 (1M)
size_t min_buf_size;

// The max size of unconsumed data allowed at remote side.
// If |max_buf_size| <= 0, there's no limit of buf size
// default: 2097152 (2M)
int max_buf_size;
size_t max_buf_size;

// Notify user when there's no data for at least |idle_timeout_ms|
// milliseconds since the last time that HandleIdleTimeout or HandleInput
Expand Down
1 change: 1 addition & 0 deletions src/brpc/stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ friend class MessageBatcher;
bthread_mutex_t _congestion_control_mutex;
size_t _produced;
size_t _remote_consumed;
size_t _cur_buf_size;
bthread_id_list_t _writable_wait_list;

int64_t _local_consumed;
Expand Down

0 comments on commit 1874f5a

Please sign in to comment.