Skip to content

Commit

Permalink
send response之后,request/response对象析构之前执行一些自定义逻辑 (#2328)
Browse files Browse the repository at this point in the history
* call_after_rpc_resp

* fix function name & add examples & add ut

* fix mistake

* fix compile error

* fix compile error

* update ut

* complete ut

* update ut

* modify function name

---------

Co-authored-by: yuncheng <yuncheng@pinduoduo.com>
  • Loading branch information
yockie and yuncheng authored Oct 25, 2023
1 parent d142274 commit e1a8863
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 8 deletions.
20 changes: 19 additions & 1 deletion example/asynchronous_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ class EchoServiceImpl : public example::EchoService {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);

brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);

// optional: set a callback function which is called after response is sent
// and before cntl/req/res is destructed.
cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// The purpose of following logs is to help you to understand
// how clients interact with servers more intuitively. You should
// remove these logs in performance-sensitive servers.
Expand All @@ -64,6 +69,19 @@ class EchoServiceImpl : public example::EchoService {
cntl->response_attachment().append("bar");
}
}

// optional
static void CallAfterRpc(brpc::Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res) {
// at this time res is already sent to client, but cntl/req/res is not destructed
std::string req_str;
std::string res_str;
json2pb::ProtoMessageToJson(*req, &req_str, NULL);
json2pb::ProtoMessageToJson(*res, &res_str, NULL);
LOG(INFO) << "req:" << req_str
<< " res:" << res_str;
}
};

int main(int argc, char* argv[]) {
Expand Down
19 changes: 19 additions & 0 deletions example/echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <json2pb/pb_to_json.h>
#include "echo.pb.h"

DEFINE_bool(echo_attachment, true, "Echo attachment as well");
Expand Down Expand Up @@ -48,6 +49,11 @@ class EchoServiceImpl : public EchoService {
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);

// optional: set a callback function which is called after response is sent
// and before cntl/req/res is destructed.
cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// The purpose of following logs is to help you to understand
// how clients interact with servers more intuitively. You should
// remove these logs in performance-sensitive servers.
Expand All @@ -70,6 +76,19 @@ class EchoServiceImpl : public EchoService {
cntl->response_attachment().append(cntl->request_attachment());
}
}

// optional
static void CallAfterRpc(brpc::Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res) {
// at this time res is already sent to client, but cntl/req/res is not destructed
std::string req_str;
std::string res_str;
json2pb::ProtoMessageToJson(*req, &req_str, NULL);
json2pb::ProtoMessageToJson(*res, &res_str, NULL);
LOG(INFO) << "req:" << req_str
<< " res:" << res_str;
}
};
} // namespace example

Expand Down
21 changes: 20 additions & 1 deletion example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,15 @@ class HttpServiceImpl : public HttpService {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);

brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);

// optional: set a callback function which is called after response is sent
// and before cntl/req/res is destructed.
cntl->set_after_rpc_resp_fn(std::bind(&HttpServiceImpl::CallAfterRpc,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// Fill response.
cntl->http_response().set_content_type("text/plain");
butil::IOBufBuilder os;
Expand All @@ -59,6 +65,19 @@ class HttpServiceImpl : public HttpService {
os << "\nbody: " << cntl->request_attachment() << '\n';
os.move_to(cntl->response_attachment());
}

// optional
static void CallAfterRpc(brpc::Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res) {
// at this time res is already sent to client, but cntl/req/res is not destructed
std::string req_str;
std::string res_str;
json2pb::ProtoMessageToJson(*req, &req_str, NULL);
json2pb::ProtoMessageToJson(*res, &res_str, NULL);
LOG(INFO) << "req:" << req_str
<< " res:" << res_str;
}
};

// Service with dynamic path.
Expand Down
8 changes: 8 additions & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ void Controller::ResetNonPods() {
}
delete _remote_stream_settings;
_thrift_method_name.clear();
_after_rpc_resp_fn = nullptr;

CHECK(_unfinished_call == NULL);
}
Expand Down Expand Up @@ -1500,6 +1501,13 @@ int Controller::GetSockOption(int level, int optname, void* optval, socklen_t* o
}
}

void Controller::CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res) {
if (_after_rpc_resp_fn) {
_after_rpc_resp_fn(this, req, res);
_after_rpc_resp_fn = nullptr;
}
}

#if defined(OS_MACOSX)
typedef sig_t SignalHandler;
#else
Expand Down
11 changes: 11 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.

#include <functional> // std::function
#include <gflags/gflags.h> // Users often need gflags
#include <string>
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
Expand Down Expand Up @@ -579,6 +580,14 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// -1 means no deadline.
int64_t deadline_us() const { return _deadline_us; }

using AfterRpcRespFnType = std::function<void(Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res)>;

void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) { _after_rpc_resp_fn = fn; }

void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res);

private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
Expand Down Expand Up @@ -834,6 +843,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
std::string _thrift_method_name;

uint32_t _auth_flags;

AfterRpcRespFnType _after_rpc_resp_fn;
};

// Advises the RPC system that the caller desires that the RPC call be
Expand Down
8 changes: 6 additions & 2 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,14 @@ void SendRpcResponse(int64_t correlation_id,
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);

std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

ClosureGuard guard(brpc::NewCallback(cntl, &Controller::CallAfterRpcResp, req, res));

StreamId response_stream_id = accessor.response_stream();

Expand Down
6 changes: 5 additions & 1 deletion src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,11 @@ friend class HttpResponseSenderAsDone;
class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
void Run() override { delete this; }
void Run() override {
_sender._cntl->CallAfterRpcResp(_sender._req.get(), _sender._res.get());
delete this;
}

private:
HttpResponseSender _sender;
};
Expand Down
40 changes: 37 additions & 3 deletions test/brpc_channel_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
#include "brpc/selective_channel.h"
#include "brpc/socket_map.h"
#include "brpc/controller.h"
#if BAZEL_TEST
#include "test/echo.pb.h"
#else
#include "echo.pb.h"
#endif // BAZEL_TEST
#include "brpc/options.pb.h"

namespace brpc {
Expand Down Expand Up @@ -129,13 +133,32 @@ static bool VerifyMyRequest(const brpc::InputMessageBase* msg_base) {
return true;
}

class CallAfterRpcObject {
public:
explicit CallAfterRpcObject() {}

~CallAfterRpcObject() {
EXPECT_EQ(str, "CallAfterRpcRespTest");
}

void Append(const std::string& s) {
str.append(s);
}

private:
std::string str;
};

class MyEchoService : public ::test::EchoService {
void Echo(google::protobuf::RpcController* cntl_base,
const ::test::EchoRequest* req,
::test::EchoResponse* res,
google::protobuf::Closure* done) {
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
std::shared_ptr<CallAfterRpcObject> str_test(new CallAfterRpcObject());
cntl->set_after_rpc_resp_fn(std::bind(&MyEchoService::CallAfterRpc, str_test,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
brpc::ClosureGuard done_guard(done);
if (req->server_fail()) {
cntl->SetFailed(req->server_fail(), "Server fail1");
Expand All @@ -157,6 +180,17 @@ class MyEchoService : public ::test::EchoService {
}
res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
}
static void CallAfterRpc(std::shared_ptr<CallAfterRpcObject> str,
brpc::Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res) {
const test::EchoRequest* request = static_cast<const test::EchoRequest*>(req);
const test::EchoResponse* response = static_cast<const test::EchoResponse*>(res);
str->Append("CallAfterRpcRespTest");
EXPECT_TRUE(nullptr != cntl);
EXPECT_TRUE(nullptr != request);
EXPECT_TRUE(nullptr != response);
}
};

pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT;
Expand Down Expand Up @@ -247,7 +281,7 @@ class ChannelTest : public ::testing::Test{
const brpc::Server*,
brpc::MethodStatus*, int64_t>(
&brpc::policy::SendRpcResponse,
meta.correlation_id(), cntl, NULL, res,
meta.correlation_id(), cntl, req, res,
&ts->_dummy, NULL, -1);
ts->_svc.CallMethod(method, cntl, req, res, done);
}
Expand Down Expand Up @@ -1564,7 +1598,7 @@ class ChannelTest : public ::testing::Test{
}
StopAndJoin();
}

void RPCThread(brpc::ChannelBase* channel, bool async) {
brpc::Controller cntl;
test::EchoRequest req;
Expand All @@ -1583,7 +1617,7 @@ class ChannelTest : public ::testing::Test{
test::EchoResponse res;
req.set_message(__FUNCTION__);
CallMethod(channel, &cntl, &req, &res, async);

ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
ASSERT_EQ("received " + std::string(__FUNCTION__), res.message());
cntl.Reset();
Expand Down

0 comments on commit e1a8863

Please sign in to comment.