Skip to content

Commit

Permalink
Support arena rpc pb message factory (#2751)
Browse files Browse the repository at this point in the history
* Support arena rpc pb message factory

* Update server document

* Fix comment
  • Loading branch information
chenBright authored Sep 8, 2024
1 parent cdc8101 commit 9643150
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 35 deletions.
20 changes: 18 additions & 2 deletions docs/cn/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ public:
Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类,通过`new`来创建请求/响应message和`delete`来销毁请求/响应message。
如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并通过`ServerOptions.rpc_pb_message_factory`。
如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并设置`ServerOptions.rpc_pb_message_factory`为自定义的`RpcPBMessageFactory`。注意:server启动后,server拥有了`RpcPBMessageFactory`的所有权
接口如下:
Expand All @@ -1027,20 +1027,36 @@ Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};
// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetResponsePrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* messages) = 0;
};
```

### Protobuf arena

Protobuf arena是一种Protobuf message内存管理机制,有着提高内存分配效率、减少内存碎片、对缓存友好等优点。详细信息见[C++ Arena Allocation Guide](https://protobuf.dev/reference/cpp/arenas/)

如果用户希望使用protobuf arena来管理Protobuf message内存,可以设置`ServerOptions.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory();`,使用默认的`start_block_size`(256 bytes)和`max_block_size`(8192 bytes)来创建arena。用户可以调用`brpc::GetArenaRpcPBMessageFactory<StartBlockSize, MaxBlockSize>();`自定义arena大小。

注意:从Protobuf v3.14.0开始,[默认开启arena](https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0https://github.com/protocolbuffers/protobuf/releases/tag/v3.14.0)。但是Protobuf v3.14.0之前的版本,用户需要再proto文件中加上选项:`option cc_enable_arenas = true;`,所以为了兼容性,可以统一都加上该选项。

# FAQ

### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF是什么意思
Expand Down
43 changes: 43 additions & 0 deletions docs/en/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,49 @@ public:
...
```
## RPC Protobuf message factory
`DefaultRpcPBMessageFactory' is used at server-side by default. It is a simple factory class that uses `new' to create request/response messages and `delete' to destroy request/response messages. Currently, the baidu_std protocol and HTTP protocol support this feature.
Users can implement `RpcPBMessages' (encapsulation of request/response message) and `RpcPBMessageFactory' (factory class) to customize the creation and destruction mechanism of protobuf message, and then set to `ServerOptions.rpc_pb_message_factory`. Note: After the server is started, the server owns the `RpcPBMessageFactory`.
The interface is as follows:
```c++
// Inherit this class to customize rpc protobuf messages,
// include request and response.
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};
// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetResponsePrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* protobuf_message) = 0;
};
```

### Protobuf arena

Protobuf arena is a Protobuf message memory management mechanism with the advantages of improving memory allocation efficiency, reducing memory fragmentation, and being cache-friendly. For more information, see [C++ Arena Allocation Guide](https://protobuf.dev/reference/cpp/arenas/).

Users can set `ServerOptions.rpc_pb_message_factory = brpc::GetArenaRpcPBMessageFactory();` to manage Protobuf message memory, with the default `start_block_size` (256 bytes) and `max_block_size` (8192 bytes). Alternatively, users can use `brpc::GetArenaRpcPBMessageFactory<StartBlockSize, MaxBlockSize>();` to customize the arena size.

Note: Since Protocol Buffers v3.14.0, Arenas are now unconditionally enabled. However, for versions prior to Protobuf v3.14.0, users need to add the option `option cc_enable_arenas = true;` to the proto file. so for compatibility, this option can be added uniformly.

# FAQ

### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got EOF
Expand Down
36 changes: 23 additions & 13 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,30 +710,33 @@ friend class HttpResponseSenderAsDone;
HttpResponseSender()
: HttpResponseSender(NULL) {}
explicit HttpResponseSender(Controller* cntl/*own*/)
: _cntl(cntl), _method_status(NULL), _received_us(0), _h2_stream_id(-1) {}
: _cntl(cntl)
, _messages(NULL)
, _method_status(NULL)
, _received_us(0)
, _h2_stream_id(-1) {}

HttpResponseSender(HttpResponseSender&& s) noexcept
: _cntl(std::move(s._cntl))
, _req(std::move(s._req))
, _res(std::move(s._res))
, _messages(s._messages)
, _method_status(s._method_status)
, _received_us(s._received_us)
, _h2_stream_id(s._h2_stream_id) {
s._messages = NULL;
s._method_status = NULL;
s._received_us = 0;
s._h2_stream_id = -1;
}
~HttpResponseSender();

void own_request(google::protobuf::Message* req) { _req.reset(req); }
void own_response(google::protobuf::Message* res) { _res.reset(res); }
void set_messages(RpcPBMessages* messages) { _messages = messages; }
void set_method_status(MethodStatus* ms) { _method_status = ms; }
void set_received_us(int64_t t) { _received_us = t; }
void set_h2_stream_id(int id) { _h2_stream_id = id; }

private:
std::unique_ptr<Controller, LogErrorTextAndDelete> _cntl;
std::unique_ptr<google::protobuf::Message> _req;
std::unique_ptr<google::protobuf::Message> _res;
RpcPBMessages* _messages;
MethodStatus* _method_status;
int64_t _received_us;
int _h2_stream_id;
Expand All @@ -743,7 +746,8 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
explicit HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
void Run() override {
_sender._cntl->CallAfterRpcResp(_sender._req.get(), _sender._res.get());
_sender._cntl->CallAfterRpcResp(
_sender._messages->Request(), _sender._messages->Response());
delete this;
}

Expand All @@ -752,6 +756,12 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure {
};

HttpResponseSender::~HttpResponseSender() {
// Return messages to factory at the end.
BRPC_SCOPE_EXIT {
if (NULL != _messages) {
_cntl->server()->options().rpc_pb_message_factory->Return(_messages);
}
};
Controller* cntl = _cntl.get();
if (cntl == NULL) {
return;
Expand All @@ -763,7 +773,7 @@ HttpResponseSender::~HttpResponseSender() {
}
ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
Socket* socket = accessor.get_sending_socket();
const google::protobuf::Message* res = _res.get();
const google::protobuf::Message* res = NULL != _messages ? _messages->Response() : NULL;

if (cntl->IsCloseConnection()) {
socket->SetFailed();
Expand Down Expand Up @@ -1488,10 +1498,10 @@ void ProcessHttpRequest(InputMessageBase *msg) {
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);
google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
resp_sender.own_request(req);
google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
resp_sender.own_response(res);
RpcPBMessages* messages = server->options().rpc_pb_message_factory->Get(*svc, *method);;
resp_sender.set_messages(messages);
google::protobuf::Message* req = messages->Request();
google::protobuf::Message* res = messages->Response();

if (__builtin_expect(!req || !res, 0)) {
PLOG(FATAL) << "Fail to new req or res";
Expand Down
1 change: 0 additions & 1 deletion src/brpc/rpc_pb_message_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

#include "brpc/rpc_pb_message_factory.h"
#include "butil/object_pool.h"

namespace brpc {

Expand Down
87 changes: 85 additions & 2 deletions src/brpc/rpc_pb_message_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
#include <google/protobuf/arena.h>
#include "butil/object_pool.h"

namespace brpc {

Expand All @@ -29,17 +31,24 @@ namespace brpc {
class RpcPBMessages {
public:
virtual ~RpcPBMessages() = default;
// Get protobuf request message.
virtual google::protobuf::Message* Request() = 0;
// Get protobuf response message.
virtual google::protobuf::Message* Response() = 0;
};

// Factory to manage `RpcPBMessages'.
class RpcPBMessageFactory {
public:
virtual ~RpcPBMessageFactory() = default;
// Get `RpcPBMessages' according to `service' and `method'.
// Common practice to create protobuf message:
// service.GetRequestPrototype(&method).New() -> request;
// service.GetResponsePrototype(&method).New() -> response.
virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) = 0;
virtual void Return(RpcPBMessages* protobuf_message) = 0;
// Return `RpcPBMessages' to factory.
virtual void Return(RpcPBMessages* messages) = 0;
};

class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
Expand All @@ -49,6 +58,80 @@ class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
void Return(RpcPBMessages* messages) override;
};

namespace internal {

// Allocate protobuf message from arena.
// The arena is created with `StartBlockSize' and `MaxBlockSize' options.
// For more details, see `google::protobuf::ArenaOptions'.
template<size_t StartBlockSize, size_t MaxBlockSize>
struct ArenaRpcPBMessages : public RpcPBMessages {
struct ArenaOptionsWrapper {
public:
ArenaOptionsWrapper() {
options.start_block_size = StartBlockSize;
options.max_block_size = MaxBlockSize;
}

private:
friend struct ArenaRpcPBMessages;
::google::protobuf::ArenaOptions options;
};

explicit ArenaRpcPBMessages(ArenaOptionsWrapper options_wrapper)
: arena(options_wrapper.options)
, request(NULL)
, response(NULL) {}

::google::protobuf::Message* Request() override { return request; }
::google::protobuf::Message* Response() override { return response; }

::google::protobuf::Arena arena;
::google::protobuf::Message* request;
::google::protobuf::Message* response;
};

template<size_t StartBlockSize, size_t MaxBlockSize>
class ArenaRpcPBMessageFactory : public RpcPBMessageFactory {
typedef ::brpc::internal::ArenaRpcPBMessages<StartBlockSize, MaxBlockSize>
ArenaRpcPBMessages;
public:
ArenaRpcPBMessageFactory() {
_arena_options.start_block_size = StartBlockSize;
_arena_options.max_block_size = MaxBlockSize;
}

RpcPBMessages* Get(const ::google::protobuf::Service& service,
const ::google::protobuf::MethodDescriptor& method) override {
typename ArenaRpcPBMessages::ArenaOptionsWrapper options_wrapper;
auto messages = butil::get_object<ArenaRpcPBMessages>(options_wrapper);
messages->request = service.GetRequestPrototype(&method).New(&messages->arena);
messages->response = service.GetResponsePrototype(&method).New(&messages->arena);
return messages;
}

void Return(RpcPBMessages* messages) override {
auto arena_messages = static_cast<ArenaRpcPBMessages*>(messages);
arena_messages->request = NULL;
arena_messages->response = NULL;
butil::return_object(arena_messages);
}

private:
::google::protobuf::ArenaOptions _arena_options;
};

}

template<size_t StartBlockSize, size_t MaxBlockSize>
RpcPBMessageFactory* GetArenaRpcPBMessageFactory() {
return new ::brpc::internal::ArenaRpcPBMessageFactory<StartBlockSize, MaxBlockSize>();
}

BUTIL_FORCE_INLINE RpcPBMessageFactory* GetArenaRpcPBMessageFactory() {
// Default arena options, same as `google::protobuf::ArenaOptions'.
return GetArenaRpcPBMessageFactory<256, 8192>();
}

} // namespace brpc

#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H
#endif // BRPC_RPC_PB_MESSAGE_FACTORY_H
Loading

0 comments on commit 9643150

Please sign in to comment.