From f699483802cf1ec4fa900226bcf04423e1f320f4 Mon Sep 17 00:00:00 2001 From: ds Date: Sat, 22 Jan 2022 22:43:54 +0800 Subject: [PATCH 1/2] fix(input_messenger) client side retry policy client retry parse message when baidu_std fall to streaming_rpc --- src/brpc/input_messenger.cpp | 61 +++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index aff1b9815d..46af4a1c93 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -67,31 +67,42 @@ ParseResult InputMessenger::CutInputMessage( // selection or by client. if (preferred >= 0 && preferred <= max_index && _handlers[preferred].parse != NULL) { - ParseResult result = - _handlers[preferred].parse(&m->_read_buf, m, read_eof, _handlers[preferred].arg); - if (result.is_ok() || - result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) { - *index = preferred; - return result; - } else if (result.error() != PARSE_ERROR_TRY_OTHERS) { - // Critical error, return directly. - LOG_IF(ERROR, result.error() == PARSE_ERROR_TOO_BIG_DATA) - << "A message from " << m->remote_side() - << "(protocol=" << _handlers[preferred].name - << ") is bigger than " << FLAGS_max_body_size - << " bytes, the connection will be closed." - " Set max_body_size to allow bigger messages"; - return result; - } - if (m->CreatedByConnect() && - // baidu_std may fall to streaming_rpc - (ProtocolType)preferred != PROTOCOL_BAIDU_STD) { - // The protocol is fixed at client-side, no need to try others. - LOG(ERROR) << "Fail to parse response from " << m->remote_side() - << " by " << _handlers[preferred].name - << " at client-side"; - return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); - } + int cur_index = preferred; + do { + ParseResult result = + _handlers[cur_index].parse(&m->_read_buf, m, read_eof, _handlers[cur_index].arg); + if (result.is_ok() || + result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) { + *index = cur_index; + return result; + } else if (result.error() != PARSE_ERROR_TRY_OTHERS) { + // Critical error, return directly. + LOG_IF(ERROR, result.error() == PARSE_ERROR_TOO_BIG_DATA) + << "A message from " << m->remote_side() + << "(protocol=" << _handlers[cur_index].name + << ") is bigger than " << FLAGS_max_body_size + << " bytes, the connection will be closed." + " Set max_body_size to allow bigger messages"; + return result; + } + + if (m->CreatedByConnect()) { + if((ProtocolType)cur_index == PROTOCOL_BAIDU_STD) { + // baidu_std may fall to streaming_rpc. + cur_index = (int)PROTOCOL_STREAMING_RPC; + continue; + } else { + // The protocol is fixed at client-side, no need to try others. + LOG(ERROR) << "Fail to parse response from " << m->remote_side() + << " by " << _handlers[preferred].name + << " at client-side"; + return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); + } + } else { + // Try other protocols. + break; + } + } while (1); // Clear context before trying next protocol which probably has // an incompatible context with the current one. if (m->parsing_context()) { From a7a66a865855a08d9bb2209bfcfca9cd3258f369 Mon Sep 17 00:00:00 2001 From: ds Date: Fri, 4 Mar 2022 15:22:49 +0800 Subject: [PATCH 2/2] fix(input_messenger) update preferred index fix comments --- src/brpc/input_messenger.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index 46af4a1c93..d9b1a3a9ad 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -73,6 +73,7 @@ ParseResult InputMessenger::CutInputMessage( _handlers[cur_index].parse(&m->_read_buf, m, read_eof, _handlers[cur_index].arg); if (result.is_ok() || result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) { + m->set_preferred_index(cur_index); *index = cur_index; return result; } else if (result.error() != PARSE_ERROR_TRY_OTHERS) { @@ -102,7 +103,7 @@ ParseResult InputMessenger::CutInputMessage( // Try other protocols. break; } - } while (1); + } while (true); // Clear context before trying next protocol which probably has // an incompatible context with the current one. if (m->parsing_context()) {