Skip to content

Commit

Permalink
[fix](brpc) check failed socket before SetConnected (apache#32790)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Mar 25, 2024
1 parent 7169c07 commit 55691ad
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions thirdparty/patches/brpc-1.4.0-fix-stream-rpc-set-connected.patch
Original file line number Diff line number Diff line change
@@ -1,20 +1,48 @@
From 9acc6ef89d8770d5516953e2eadf0c27a7d424fc Mon Sep 17 00:00:00 2001
From 031194784d540235dfa6ba56bd196a7aad92e30c Mon Sep 17 00:00:00 2001
From: Kaijie Chen <ckj@apache.org>
Date: Sun, 28 Jan 2024 15:58:31 +0800
Subject: [PATCH] fix set connected for stream rpc

---
src/brpc/policy/baidu_rpc_protocol.cpp | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
src/brpc/policy/baidu_rpc_protocol.cpp | 25 ++++++++++++++++++-------
1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp
index 0239960e..c47903a6 100644
index 0239960e..e8a90e34 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -234,6 +234,11 @@ void SendRpcResponse(int64_t correlation_id,
@@ -25,6 +25,7 @@
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/raw_pack.h" // RawPacker RawUnpacker
#include "brpc/controller.h" // Controller
+#include "brpc/errno.pb.h"
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/span.h"
@@ -212,7 +213,9 @@ void SendRpcResponse(int64_t correlation_id,
if (Socket::Address(response_stream_id, &stream_ptr) == 0) {
Stream* s = (Stream*)stream_ptr->conn();
s->FillSettings(meta.mutable_stream_settings());
- s->SetHostSocket(sock);
+ if (s->SetHostSocket(sock) != 0) {
+ LOG(WARNING) << "SetHostSocket failed";
+ }
} else {
LOG(WARNING) << "Stream=" << response_stream_id
<< " was closed before sending response";
@@ -234,6 +237,20 @@ void SendRpcResponse(int64_t correlation_id,
// Send rpc response over stream even if server side failed to create
// stream for some reasons.
if(cntl->has_remote_stream()){
+ if (sock->Failed()) {
+ LOG(WARNING) << "Fail to write into " << *sock;
+ cntl->SetFailed(EFAILEDSOCKET, "Fail to write into %s",
+ sock->description().c_str());
+ if (stream_ptr) {
+ ((Stream *)stream_ptr->conn())->Close();
+ }
+ return;
+ }
+ if(stream_ptr) {
+ // Now it's ok the mark this server-side stream as connectted as all the
+ // written user data would follower the RPC response.
Expand All @@ -23,7 +51,7 @@ index 0239960e..c47903a6 100644
// Send the response over stream to notify that this stream connection
// is successfully built.
// Response_stream can be INVALID_STREAM_ID when error occurs.
@@ -249,12 +254,6 @@ void SendRpcResponse(int64_t correlation_id,
@@ -249,12 +266,6 @@ void SendRpcResponse(int64_t correlation_id,
}
return;
}
Expand Down

0 comments on commit 55691ad

Please sign in to comment.