Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limiting max bytes in stream comsume queue with the same host socket #1958

Merged
merged 1 commit into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ DEFINE_int64(socket_max_unwritten_bytes, 64 * 1024 * 1024,
"Max unwritten bytes in each socket, if the limit is reached,"
" Socket.Write fails with EOVERCROWDED");

DEFINE_int64(socket_max_streams_unconsumed_bytes, 0,
"Max stream receivers' unconsumed bytes in one socket,"
" it used in stream for receiver buffer control.");

DEFINE_int32(max_connection_pool_size, 100,
"Max number of pooled connections to a single endpoint");
BRPC_VALIDATE_GFLAG(max_connection_pool_size, PassValidate);
Expand Down Expand Up @@ -455,6 +459,7 @@ Socket::Socket(Forbidden)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
, _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
{
CreateVarsOnce();
Expand Down Expand Up @@ -640,6 +645,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
Expand Down Expand Up @@ -2149,6 +2155,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\n_additional_ref_status="
<< ptr->_additional_ref_status.load(butil::memory_order_relaxed)
<< "\ntotal_streams_buffer_size="
<< ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed)
<< "\nninflight_app_health_check="
<< ptr->_ninflight_app_health_check.load(butil::memory_order_relaxed)
<< "\nagent_socket_id=";
Expand Down
1 change: 1 addition & 0 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ friend void DereferenceSocket(Socket*);

butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set;
butil::atomic<int64_t> _total_streams_unconsumed_size;

butil::atomic<int64_t> _ninflight_app_health_check;
};
Expand Down
57 changes: 47 additions & 10 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
namespace brpc {

DECLARE_bool(usercode_in_pthread);
DECLARE_int64(socket_max_streams_unconsumed_bytes);

const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;

Expand All @@ -45,6 +46,7 @@ Stream::Stream()
, _closed(false)
, _produced(0)
, _remote_consumed(0)
, _cur_buf_size(0)
, _local_consumed(0)
, _parse_rpc_response(false)
, _pending_buf(NULL)
Expand Down Expand Up @@ -72,6 +74,16 @@ int Stream::Create(const StreamOptions &options,
s->_connected = false;
s->_options = options;
s->_closed = false;
s->_cur_buf_size = options.max_buf_size;
if (options.max_buf_size > 0 && options.min_buf_size > options.max_buf_size) {
// set 0 if min_buf_size is invalid.
s->_options.min_buf_size = 0;
LOG(WARNING) << "options.min_buf_size is larger than options.max_buf_size, it will be set to 0.";
}
if (FLAGS_socket_max_streams_unconsumed_bytes > 0 && s->_options.min_buf_size > 0) {
s->_cur_buf_size = s->_options.min_buf_size;
}

if (remote_settings != NULL) {
s->_remote_settings.MergeFrom(*remote_settings);
s->_parse_rpc_response = false;
Expand Down Expand Up @@ -260,45 +272,69 @@ void Stream::TriggerOnConnectIfNeed() {
}

int Stream::AppendIfNotFull(const butil::IOBuf &data) {
if (_options.max_buf_size > 0) {
if (_cur_buf_size > 0) {
std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex);
if (_produced >= _remote_consumed + (size_t)_options.max_buf_size) {
if (_produced >= _remote_consumed + _cur_buf_size) {
const size_t saved_produced = _produced;
const size_t saved_remote_consumed = _remote_consumed;
lck.unlock();
RPC_VLOG << "Stream=" << _id << " is full"
<< "_produced=" << saved_produced
<< " _remote_consumed=" << saved_remote_consumed
<< " gap=" << saved_produced - saved_remote_consumed
<< " max_buf_size=" << _options.max_buf_size;
<< " max_buf_size=" << _cur_buf_size;
return 1;
}
_produced += data.length();
}

size_t data_length = data.length();
butil::IOBuf copied_data(data);
const int rc = _fake_socket_weak_ref->Write(&copied_data);
if (rc != 0) {
// Stream may be closed by peer before
LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
BAIDU_SCOPED_LOCK(_congestion_control_mutex);
_produced -= data.length();
_produced -= data_length;
return -1;
}
if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
_host_socket->_total_streams_unconsumed_size += data_length;
}
return 0;
}

void Stream::SetRemoteConsumed(size_t new_remote_consumed) {
CHECK(_options.max_buf_size > 0);
CHECK(_cur_buf_size > 0);
bthread_id_list_t tmplist;
bthread_id_list_init(&tmplist, 0, 0);
bthread_mutex_lock(&_congestion_control_mutex);
if (_remote_consumed >= new_remote_consumed) {
bthread_mutex_unlock(&_congestion_control_mutex);
return;
}
const bool was_full = _produced >= _remote_consumed + (size_t)_options.max_buf_size;
const bool was_full = _produced >= _remote_consumed + _cur_buf_size;

if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
_host_socket->_total_streams_unconsumed_size -= new_remote_consumed - _remote_consumed;
if (_host_socket->_total_streams_unconsumed_size > FLAGS_socket_max_streams_unconsumed_bytes) {
if (_options.min_buf_size > 0) {
_cur_buf_size = _options.min_buf_size;
} else {
_cur_buf_size /= 2;
}
LOG(INFO) << "stream consumers on socket " << _host_socket->id() << " is crowded, " << "cut stream " << id() << " buffer to " << _cur_buf_size;
} else if (_produced >= new_remote_consumed + _cur_buf_size && (_options.max_buf_size <= 0 || _cur_buf_size < (size_t)_options.max_buf_size)) {
if (_options.max_buf_size > 0 && _cur_buf_size * 2 > (size_t)_options.max_buf_size) {
_cur_buf_size = _options.max_buf_size;
} else {
_cur_buf_size *= 2;
}
}
}

_remote_consumed = new_remote_consumed;
const bool is_full = _produced >= _remote_consumed + (size_t)_options.max_buf_size;
const bool is_full = _produced >= _remote_consumed + _cur_buf_size;
if (was_full && !is_full) {
bthread_id_list_swap(&tmplist, &_writable_wait_list);
}
Expand Down Expand Up @@ -374,8 +410,8 @@ void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg,
}
}
bthread_mutex_lock(&_congestion_control_mutex);
if (_options.max_buf_size <= 0
|| _produced < _remote_consumed + (size_t)_options.max_buf_size) {
if (_cur_buf_size <= 0
|| _produced < _remote_consumed + _cur_buf_size) {
bthread_mutex_unlock(&_congestion_control_mutex);
CHECK_EQ(0, TriggerOnWritable(wait_id, wm, 0));
return;
Expand Down Expand Up @@ -524,6 +560,7 @@ int Stream::Consume(void *meta, bthread::TaskIterator<butil::IOBuf*>& iter) {
}
}
mb.flush();

if (s->_remote_settings.need_feedback() && mb.total_length() > 0) {
s->_local_consumed += mb.total_length();
s->SendFeedback();
Expand Down Expand Up @@ -560,7 +597,7 @@ int Stream::SetHostSocket(Socket *host_socket) {

void Stream::FillSettings(StreamSettings *settings) {
settings->set_stream_id(id());
settings->set_need_feedback(_options.max_buf_size > 0);
settings->set_need_feedback(_cur_buf_size > 0);
settings->set_writable(_options.handler != NULL);
}

Expand Down
8 changes: 7 additions & 1 deletion src/brpc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@ class StreamInputHandler {

struct StreamOptions {
StreamOptions()
: max_buf_size(2 * 1024 * 1024)
: min_buf_size(1024 * 1024)
, max_buf_size(2 * 1024 * 1024)
, idle_timeout_ms(-1)
, messages_in_batch(128)
, handler(NULL)
{}

// stream max buffer size limit in [min_buf_size, max_buf_size]
// If |min_buf_size| <= 0, there's no min size limit of buf size
// default: 1048576 (1M)
int min_buf_size;

// The max size of unconsumed data allowed at remote side.
// If |max_buf_size| <= 0, there's no limit of buf size
// default: 2097152 (2M)
Expand Down
1 change: 1 addition & 0 deletions src/brpc/stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ friend class MessageBatcher;
bthread_mutex_t _congestion_control_mutex;
size_t _produced;
size_t _remote_consumed;
size_t _cur_buf_size;
bthread_id_list_t _writable_wait_list;

int64_t _local_consumed;
Expand Down