From e1a88638fb5028262396d0e6d687e66c5abd4da2 Mon Sep 17 00:00:00 2001 From: youcheng huang <690394136@qq.com> Date: Wed, 25 Oct 2023 15:01:45 +0800 Subject: [PATCH] =?UTF-8?q?send=20response=E4=B9=8B=E5=90=8E=EF=BC=8Creque?= =?UTF-8?q?st/response=E5=AF=B9=E8=B1=A1=E6=9E=90=E6=9E=84=E4=B9=8B?= =?UTF-8?q?=E5=89=8D=E6=89=A7=E8=A1=8C=E4=B8=80=E4=BA=9B=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E9=80=BB=E8=BE=91=20(#2328)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * call_after_rpc_resp * fix function name & add examples & add ut * fix mistake * fix compile error * fix compile error * update ut * complete ut * update ut * modify function name --------- Co-authored-by: yuncheng --- example/asynchronous_echo_c++/server.cpp | 20 +++++++++++- example/echo_c++/server.cpp | 19 +++++++++++ example/http_c++/http_server.cpp | 21 ++++++++++++- src/brpc/controller.cpp | 8 +++++ src/brpc/controller.h | 11 +++++++ src/brpc/policy/baidu_rpc_protocol.cpp | 8 +++-- src/brpc/policy/http_rpc_protocol.cpp | 6 +++- test/brpc_channel_unittest.cpp | 40 ++++++++++++++++++++++-- 8 files changed, 125 insertions(+), 8 deletions(-) diff --git a/example/asynchronous_echo_c++/server.cpp b/example/asynchronous_echo_c++/server.cpp index d95d9dfe16..8c7ced67cb 100644 --- a/example/asynchronous_echo_c++/server.cpp +++ b/example/asynchronous_echo_c++/server.cpp @@ -39,10 +39,15 @@ class EchoServiceImpl : public example::EchoService { // This object helps you to call done->Run() in RAII style. If you need // to process the request asynchronously, pass done_guard.release(). brpc::ClosureGuard done_guard(done); - + brpc::Controller* cntl = static_cast(cntl_base); + // optional: set a callback function which is called after response is sent + // and before cntl/req/res is destructed. + cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + // The purpose of following logs is to help you to understand // how clients interact with servers more intuitively. You should // remove these logs in performance-sensitive servers. @@ -64,6 +69,19 @@ class EchoServiceImpl : public example::EchoService { cntl->response_attachment().append("bar"); } } + + // optional + static void CallAfterRpc(brpc::Controller* cntl, + const google::protobuf::Message* req, + const google::protobuf::Message* res) { + // at this time res is already sent to client, but cntl/req/res is not destructed + std::string req_str; + std::string res_str; + json2pb::ProtoMessageToJson(*req, &req_str, NULL); + json2pb::ProtoMessageToJson(*res, &res_str, NULL); + LOG(INFO) << "req:" << req_str + << " res:" << res_str; + } }; int main(int argc, char* argv[]) { diff --git a/example/echo_c++/server.cpp b/example/echo_c++/server.cpp index 08b3e9d689..d1f0605178 100644 --- a/example/echo_c++/server.cpp +++ b/example/echo_c++/server.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "echo.pb.h" DEFINE_bool(echo_attachment, true, "Echo attachment as well"); @@ -48,6 +49,11 @@ class EchoServiceImpl : public EchoService { brpc::Controller* cntl = static_cast(cntl_base); + // optional: set a callback function which is called after response is sent + // and before cntl/req/res is destructed. + cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + // The purpose of following logs is to help you to understand // how clients interact with servers more intuitively. You should // remove these logs in performance-sensitive servers. @@ -70,6 +76,19 @@ class EchoServiceImpl : public EchoService { cntl->response_attachment().append(cntl->request_attachment()); } } + + // optional + static void CallAfterRpc(brpc::Controller* cntl, + const google::protobuf::Message* req, + const google::protobuf::Message* res) { + // at this time res is already sent to client, but cntl/req/res is not destructed + std::string req_str; + std::string res_str; + json2pb::ProtoMessageToJson(*req, &req_str, NULL); + json2pb::ProtoMessageToJson(*res, &res_str, NULL); + LOG(INFO) << "req:" << req_str + << " res:" << res_str; + } }; } // namespace example diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp index 9a1595b97f..202f55d162 100644 --- a/example/http_c++/http_server.cpp +++ b/example/http_c++/http_server.cpp @@ -45,9 +45,15 @@ class HttpServiceImpl : public HttpService { // This object helps you to call done->Run() in RAII style. If you need // to process the request asynchronously, pass done_guard.release(). brpc::ClosureGuard done_guard(done); - + brpc::Controller* cntl = static_cast(cntl_base); + + // optional: set a callback function which is called after response is sent + // and before cntl/req/res is destructed. + cntl->set_after_rpc_resp_fn(std::bind(&HttpServiceImpl::CallAfterRpc, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + // Fill response. cntl->http_response().set_content_type("text/plain"); butil::IOBufBuilder os; @@ -59,6 +65,19 @@ class HttpServiceImpl : public HttpService { os << "\nbody: " << cntl->request_attachment() << '\n'; os.move_to(cntl->response_attachment()); } + + // optional + static void CallAfterRpc(brpc::Controller* cntl, + const google::protobuf::Message* req, + const google::protobuf::Message* res) { + // at this time res is already sent to client, but cntl/req/res is not destructed + std::string req_str; + std::string res_str; + json2pb::ProtoMessageToJson(*req, &req_str, NULL); + json2pb::ProtoMessageToJson(*res, &res_str, NULL); + LOG(INFO) << "req:" << req_str + << " res:" << res_str; + } }; // Service with dynamic path. diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 5392f16c0b..47dafb6f1b 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -228,6 +228,7 @@ void Controller::ResetNonPods() { } delete _remote_stream_settings; _thrift_method_name.clear(); + _after_rpc_resp_fn = nullptr; CHECK(_unfinished_call == NULL); } @@ -1500,6 +1501,13 @@ int Controller::GetSockOption(int level, int optname, void* optval, socklen_t* o } } +void Controller::CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res) { + if (_after_rpc_resp_fn) { + _after_rpc_resp_fn(this, req, res); + _after_rpc_resp_fn = nullptr; + } +} + #if defined(OS_MACOSX) typedef sig_t SignalHandler; #else diff --git a/src/brpc/controller.h b/src/brpc/controller.h index a949881884..708ff8c64f 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -22,6 +22,7 @@ // To brpc developers: This is a header included by user, don't depend // on internal structures, use opaque pointers instead. +#include // std::function #include // Users often need gflags #include #include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr @@ -579,6 +580,14 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // -1 means no deadline. int64_t deadline_us() const { return _deadline_us; } + using AfterRpcRespFnType = std::function; + + void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) { _after_rpc_resp_fn = fn; } + + void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res); + private: struct CompletionInfo { CallId id; // call_id of the corresponding request @@ -834,6 +843,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); std::string _thrift_method_name; uint32_t _auth_flags; + + AfterRpcRespFnType _after_rpc_resp_fn; }; // Advises the RPC system that the caller desires that the RPC call be diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index ef1ab7d467..7fafa2181f 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -149,10 +149,14 @@ void SendRpcResponse(int64_t correlation_id, span->set_start_send_us(butil::cpuwide_time_us()); } Socket* sock = accessor.get_sending_socket(); - std::unique_ptr recycle_cntl(cntl); - ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); + std::unique_ptr recycle_req(req); std::unique_ptr recycle_res(res); + + std::unique_ptr recycle_cntl(cntl); + ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); + + ClosureGuard guard(brpc::NewCallback(cntl, &Controller::CallAfterRpcResp, req, res)); StreamId response_stream_id = accessor.response_stream(); diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 9120ba5879..979e386166 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -736,7 +736,11 @@ friend class HttpResponseSenderAsDone; class HttpResponseSenderAsDone : public google::protobuf::Closure { public: HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {} - void Run() override { delete this; } + void Run() override { + _sender._cntl->CallAfterRpcResp(_sender._req.get(), _sender._res.get()); + delete this; + } + private: HttpResponseSender _sender; }; diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 6d7d374b8c..d43a0f4b95 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -40,7 +40,11 @@ #include "brpc/selective_channel.h" #include "brpc/socket_map.h" #include "brpc/controller.h" +#if BAZEL_TEST +#include "test/echo.pb.h" +#else #include "echo.pb.h" +#endif // BAZEL_TEST #include "brpc/options.pb.h" namespace brpc { @@ -129,6 +133,22 @@ static bool VerifyMyRequest(const brpc::InputMessageBase* msg_base) { return true; } +class CallAfterRpcObject { +public: + explicit CallAfterRpcObject() {} + + ~CallAfterRpcObject() { + EXPECT_EQ(str, "CallAfterRpcRespTest"); + } + + void Append(const std::string& s) { + str.append(s); + } + +private: + std::string str; +}; + class MyEchoService : public ::test::EchoService { void Echo(google::protobuf::RpcController* cntl_base, const ::test::EchoRequest* req, @@ -136,6 +156,9 @@ class MyEchoService : public ::test::EchoService { google::protobuf::Closure* done) { brpc::Controller* cntl = static_cast(cntl_base); + std::shared_ptr str_test(new CallAfterRpcObject()); + cntl->set_after_rpc_resp_fn(std::bind(&MyEchoService::CallAfterRpc, str_test, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); brpc::ClosureGuard done_guard(done); if (req->server_fail()) { cntl->SetFailed(req->server_fail(), "Server fail1"); @@ -157,6 +180,17 @@ class MyEchoService : public ::test::EchoService { } res->set_receiving_socket_id(cntl->_current_call.sending_sock->id()); } + static void CallAfterRpc(std::shared_ptr str, + brpc::Controller* cntl, + const google::protobuf::Message* req, + const google::protobuf::Message* res) { + const test::EchoRequest* request = static_cast(req); + const test::EchoResponse* response = static_cast(res); + str->Append("CallAfterRpcRespTest"); + EXPECT_TRUE(nullptr != cntl); + EXPECT_TRUE(nullptr != request); + EXPECT_TRUE(nullptr != response); + } }; pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT; @@ -247,7 +281,7 @@ class ChannelTest : public ::testing::Test{ const brpc::Server*, brpc::MethodStatus*, int64_t>( &brpc::policy::SendRpcResponse, - meta.correlation_id(), cntl, NULL, res, + meta.correlation_id(), cntl, req, res, &ts->_dummy, NULL, -1); ts->_svc.CallMethod(method, cntl, req, res, done); } @@ -1564,7 +1598,7 @@ class ChannelTest : public ::testing::Test{ } StopAndJoin(); } - + void RPCThread(brpc::ChannelBase* channel, bool async) { brpc::Controller cntl; test::EchoRequest req; @@ -1583,7 +1617,7 @@ class ChannelTest : public ::testing::Test{ test::EchoResponse res; req.set_message(__FUNCTION__); CallMethod(channel, &cntl, &req, &res, async); - + ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText(); ASSERT_EQ("received " + std::string(__FUNCTION__), res.message()); cntl.Reset();