Skip to content

Commit

Permalink
Support user fields of baidu protocol (#2406)
Browse files Browse the repository at this point in the history
* Support user fields of baidu protocol

* Use request_user_fields before RPC
  • Loading branch information
chenBright authored Dec 20, 2023
1 parent 73b307a commit 2098dd3
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ void Controller::ResetNonPods() {
_request_buf.clear();
delete _http_request;
delete _http_response;
delete _request_user_fields;
delete _response_user_fields;
_request_attachment.clear();
_response_attachment.clear();
if (_wpa) {
Expand Down Expand Up @@ -283,6 +285,8 @@ void Controller::ResetPods() {
_idl_result = IDL_VOID_RESULT;
_http_request = NULL;
_http_response = NULL;
_request_user_fields = NULL;
_response_user_fields = NULL;
_request_stream = INVALID_STREAM_ID;
_response_stream = INVALID_STREAM_ID;
_remote_stream_settings = NULL;
Expand Down
26 changes: 26 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ enum StopStyle {

const int32_t UNSET_MAGIC_NUM = -123456789;

typedef butil::FlatMap<std::string, std::string> UserFieldsMap;

// A Controller mediates a single method call. The primary purpose of
// the controller is to provide a way to manipulate settings per RPC-call
// and to find out about RPC-level errors.
Expand Down Expand Up @@ -255,6 +257,26 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
return tmp;
}

UserFieldsMap* request_user_fields() {
if (!_request_user_fields) {
_request_user_fields = new UserFieldsMap;
_request_user_fields->init(29);
}
return _request_user_fields;
}

bool has_request_user_fields() const { return _request_user_fields; }

UserFieldsMap* response_user_fields() {
if (!_response_user_fields) {
_response_user_fields = new UserFieldsMap;
_response_user_fields->init(29);
}
return _response_user_fields;
}

bool has_response_user_fields() const { return _response_user_fields; }

// User attached data or body of http request, which is wired to network
// directly instead of being serialized into protobuf messages.
butil::IOBuf& request_attachment() { return _request_attachment; }
Expand Down Expand Up @@ -820,6 +842,10 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
HttpHeader* _http_request;
HttpHeader* _http_response;

// User fields of baidu_std protocol.
UserFieldsMap* _request_user_fields;
UserFieldsMap* _response_user_fields;

std::unique_ptr<KVMap> _session_kv;

// Fields with large size but low access frequency
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/policy/baidu_rpc_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ message RpcMeta {
optional int32 attachment_size = 5;
optional ChunkInfo chunk_info = 6;
optional bytes authentication_data = 7;
optional StreamSettings stream_settings = 8;
optional StreamSettings stream_settings = 8;
map<string, string> user_fields = 9;
}

message RpcRequestMeta {
Expand Down
29 changes: 29 additions & 0 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,15 @@ void SendRpcResponse(int64_t correlation_id,
}
}

if (cntl->has_response_user_fields() &&
!cntl->response_user_fields()->empty()) {
::google::protobuf::Map<std::string, std::string>& user_fields
= *meta.mutable_user_fields();
user_fields.insert(cntl->response_user_fields()->begin(),
cntl->response_user_fields()->end());

}

butil::IOBuf res_buf;
SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
if (append_body) {
Expand Down Expand Up @@ -380,6 +389,12 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
accessor.set_remote_stream_settings(meta.release_stream_settings());
}

if (!meta.user_fields().empty()) {
for (const auto& it : meta.user_fields()) {
(*cntl->request_user_fields())[it.first] = it.second;
}
}

// Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
Expand Down Expand Up @@ -595,6 +610,13 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
accessor.set_remote_stream_settings(
new StreamSettings(meta.stream_settings()));
}

if (!meta.user_fields().empty()) {
for (const auto& it : meta.user_fields()) {
(*cntl->response_user_fields())[it.first] = it.second;
}
}

Span* span = accessor.span();
if (span) {
span->set_base_real_us(msg->base_real_us());
Expand Down Expand Up @@ -694,6 +716,13 @@ void PackRpcRequest(butil::IOBuf* req_buf,
s->FillSettings(meta.mutable_stream_settings());
}

if (cntl->has_request_user_fields() && !cntl->request_user_fields()->empty()) {
::google::protobuf::Map<std::string, std::string>& user_fields
= *meta.mutable_user_fields();
user_fields.insert(cntl->request_user_fields()->begin(),
cntl->request_user_fields()->end());
}

// Don't use res->ByteSize() since it may be compressed
const size_t req_size = request_body.length();
const size_t attached_size = cntl->request_attachment().length();
Expand Down
36 changes: 36 additions & 0 deletions test/brpc_server_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ bool g_delete = false;
const std::string EXP_REQUEST = "hello";
const std::string EXP_RESPONSE = "world";
const std::string EXP_REQUEST_BASE64 = "aGVsbG8=";
const std::string EXP_USER_FIELD_KEY = "hello";
const std::string EXP_USER_FIELD_VALUE = "world";

class EchoServiceImpl : public test::EchoService {
public:
Expand All @@ -118,6 +120,13 @@ class EchoServiceImpl : public test::EchoService {
} else {
LOG(INFO) << "No sleep, protocol=" << cntl->request_protocol();
}
if (cntl->has_request_user_fields()) {
ASSERT_TRUE(!cntl->request_user_fields()->empty());
std::string* val = cntl->request_user_fields()->seek(EXP_USER_FIELD_KEY);
ASSERT_TRUE(val != NULL);
ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
cntl->response_user_fields()->insert(EXP_USER_FIELD_KEY, EXP_USER_FIELD_VALUE);
}
}

virtual void ComboEcho(google::protobuf::RpcController*,
Expand Down Expand Up @@ -1620,4 +1629,31 @@ TEST_F(ServerTest, max_concurrency) {
stub.Echo(&cntl4, &req, NULL, NULL);
ASSERT_FALSE(cntl4.Failed()) << cntl4.ErrorText();
}

TEST_F(ServerTest, user_fields) {
const int port = 9200;
brpc::Server server;
EchoServiceImpl service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(port, NULL));

brpc::Channel channel;
ASSERT_EQ(0, channel.Init("0.0.0.0", port, NULL));
test::EchoService_Stub stub(&channel);

brpc::Controller cntl;
cntl.request_user_fields()->insert(EXP_USER_FIELD_KEY, EXP_USER_FIELD_VALUE);
test::EchoRequest req;
test::EchoResponse res;
req.set_message("hello");
stub.Echo(&cntl, &req, &res, NULL);

ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.has_response_user_fields());
ASSERT_TRUE(!cntl.response_user_fields()->empty());
std::string* val = cntl.response_user_fields()->seek(EXP_USER_FIELD_KEY);
ASSERT_TRUE(val != NULL);
ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
}

} //namespace

0 comments on commit 2098dd3

Please sign in to comment.