diff --git a/example/baidu_proxy_and_generic_call/CMakeLists.txt b/example/baidu_proxy_and_generic_call/CMakeLists.txt new file mode 100644 index 0000000000..8cc9c0f135 --- /dev/null +++ b/example/baidu_proxy_and_generic_call/CMakeLists.txt @@ -0,0 +1,134 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(baidu_proxy_and_generic_call C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +execute_process( + COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS +) +if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") + execute_process( + COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS + ) +endif() +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}") +set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + dl + ) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop" + "-Wl,-U,__Z13GetStackTracePPvii") +endif() + +add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(proxy proxy.cpp) +add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) + +target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(proxy ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB}) diff --git a/example/baidu_proxy_and_generic_call/client.cpp b/example/baidu_proxy_and_generic_call/client.cpp new file mode 100644 index 0000000000..d8040740a8 --- /dev/null +++ b/example/baidu_proxy_and_generic_call/client.cpp @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server every 1 second. + +#include +#include +#include +#include +#include "echo.pb.h" + +DEFINE_int32(compress_type, 2, "The compress type of request"); +DEFINE_string(attachment, "", "Carry this along with requests"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(proxy_address, "0.0.0.0:8000", "IP Address of proxy"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_BAIDU_STD; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_proxy_address.c_str(), + FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(&channel); + + // Send a request and wait for the response every 1 second. + int log_id = 0; + while (!brpc::IsAskedToQuit()) { + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + + request.set_message("hello world"); + cntl.set_request_compress_type((brpc::CompressType)FLAGS_compress_type); + + cntl.set_log_id(log_id++); // set by user + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl.request_attachment().append(FLAGS_attachment); + + // Because `done'(last parameter) is NULL, this function waits until + // the response comes back or error occurs(including timedout). + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + LOG(INFO) << "Received response from " << cntl.remote_side() + << " to " << cntl.local_side() + << ": " << response.message() + << ", response compress type=" << cntl.response_compress_type() + << ", attached=" << cntl.response_attachment() + << ", latency=" << cntl.latency_us() << "us"; + } else { + LOG(WARNING) << cntl.ErrorText(); + } + usleep(FLAGS_interval_ms * 1000L); + } + + LOG(INFO) << "EchoClient is going to quit"; + return 0; +} diff --git a/example/baidu_proxy_and_generic_call/echo.proto b/example/baidu_proxy_and_generic_call/echo.proto new file mode 100644 index 0000000000..2b39627fe8 --- /dev/null +++ b/example/baidu_proxy_and_generic_call/echo.proto @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto2"; +package example; + +option cc_generic_services = true; + +message EchoRequest { + required string message = 1; +}; + +message EchoResponse { + required string message = 1; +}; + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +}; diff --git a/example/baidu_proxy_and_generic_call/proxy.cpp b/example/baidu_proxy_and_generic_call/proxy.cpp new file mode 100644 index 0000000000..81e1176ba2 --- /dev/null +++ b/example/baidu_proxy_and_generic_call/proxy.cpp @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// todo +// A proxy to receive EchoRequest and send back EchoResponse. + +#include +#include +#include +#include +#include +#include +#include + +DEFINE_int32(port, 8000, "TCP Port of this server"); +DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS." + " If this is set, the flag port will be ignored"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server_address, "0.0.0.0:8001", "IP Address of server"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); + +// Your implementation of example::EchoService +// Notice that implementing brpc::Describable grants the ability to put +// additional information in /status. +namespace example { +class BaiduMasterServiceImpl : public brpc::BaiduMasterService { +public: + void ProcessRpcRequest(brpc::Controller* cntl, + const brpc::SerializedRequest* request, + brpc::SerializedResponse* response, + ::google::protobuf::Closure* done) override { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_BAIDU_STD; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_server_address.c_str(), + FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + (*cntl->response_user_fields())["x-bd-proxy-error-code"] = + butil::IntToString(brpc::EINTERNAL); + (*cntl->response_user_fields())["x-bd-proxy-error-text"] = + "Fail to initialize channel"; + return; + } + + LOG(INFO) << "Received request[log_id=" << cntl->log_id() + << "] from " << cntl->remote_side() + << " to " << cntl->local_side() + << ", serialized request size=" << request->serialized_data().size() + << ", request compress type=" << cntl->request_compress_type() + << " (attached=" << cntl->request_attachment() << ")"; + + brpc::Controller call_cntl; + call_cntl.set_log_id(cntl->log_id()); + call_cntl.request_attachment().swap(cntl->request_attachment()); + call_cntl.set_request_compress_type(cntl->request_compress_type()); + call_cntl.reset_sampled_request(cntl->release_sampled_request()); + // It is ok to use request and response for sync rpc. + channel.CallMethod(NULL, &call_cntl, request, response, NULL); + (*cntl->response_user_fields())["x-bd-proxy-error-code"] = + butil::IntToString(call_cntl.ErrorCode()); + if (call_cntl.Failed()) { + (*cntl->response_user_fields())["x-bd-proxy-error-text"] = + call_cntl.ErrorText(); + LOG(ERROR) << "Fail to call service=" << call_cntl.sampled_request()->meta.service_name() + << ", method=" << call_cntl.sampled_request()->meta.method_name() + << ", error_code=" << call_cntl.ErrorCode() + << ", error_text=" << call_cntl.ErrorCode(); + return; + } else { + LOG(INFO) << "Received response from " << call_cntl.remote_side() + << " to " << call_cntl.local_side() + << ", serialized response size=" << response->serialized_data().size() + << ", response compress type=" << call_cntl.response_compress_type() + << ", attached=" << call_cntl.response_attachment() + << ", latency=" << call_cntl.latency_us() << "us"; + } + cntl->response_attachment().swap(call_cntl.response_attachment()); + cntl->set_response_compress_type(call_cntl.response_compress_type()); + } +}; +} // namespace example + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + butil::EndPoint point; + if (!FLAGS_listen_addr.empty()) { + if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) { + LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr; + return -1; + } + } else { + point = butil::EndPoint(butil::IP_ANY, FLAGS_port); + } + // Start the server. + brpc::ServerOptions options; + // Add the baidu master service into server. + // Notice new operator, because server will delete it in dtor of Server. + options.baidu_master_service = new example::BaiduMasterServiceImpl(); + options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (server.Start(point, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/example/baidu_proxy_and_generic_call/server.cpp b/example/baidu_proxy_and_generic_call/server.cpp new file mode 100644 index 0000000000..b3f161736e --- /dev/null +++ b/example/baidu_proxy_and_generic_call/server.cpp @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A server to receive EchoRequest and send back EchoResponse. + +#include +#include +#include +#include +#include "echo.pb.h" + +DEFINE_int32(compress_type, 2, "The compress type of response"); +DEFINE_bool(echo_attachment, true, "Echo attachment as well"); +DEFINE_int32(port, 8001, "TCP Port of this server"); +DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS." + " If this is set, the flag port will be ignored"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); + +// Your implementation of example::EchoService +// Notice that implementing brpc::Describable grants the ability to put +// additional information in /status. +namespace example { +class EchoServiceImpl : public EchoService { +public: + EchoServiceImpl() = default; + ~EchoServiceImpl() override = default; + void Echo(google::protobuf::RpcController* cntl_base, + const EchoRequest* request, + EchoResponse* response, + google::protobuf::Closure* done) override { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + + auto cntl = static_cast(cntl_base); + + // The purpose of following logs is to help you to understand + // how clients interact with servers more intuitively. You should + // remove these logs in performance-sensitive servers. + LOG(INFO) << "Received request[log_id=" << cntl->log_id() + << "] from " << cntl->remote_side() + << " to " << cntl->local_side() + << ": " << request->message() + << ", request compress type=" << cntl->request_compress_type() + << ", attached=" << cntl->request_attachment(); + + // Fill response. + response->set_message(request->message()); + cntl->set_response_compress_type((brpc::CompressType)FLAGS_compress_type); + + // You can compress the response by setting Controller, but be aware + // that compression may be costly, evaluate before turning on. + // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP); + + if (FLAGS_echo_attachment) { + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl->response_attachment().append(cntl->request_attachment()); + } + } +}; +} // namespace example + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + example::EchoServiceImpl echo_service_impl; + + // Add the service into server. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + if (server.AddService(&echo_service_impl, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + butil::EndPoint point; + if (!FLAGS_listen_addr.empty()) { + if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) { + LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr; + return -1; + } + } else { + point = butil::EndPoint(butil::IP_ANY, FLAGS_port); + } + // Start the server. + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (server.Start(point, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 1d9b1bb968..98e25ae2c6 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1427,6 +1427,12 @@ void Controller::reset_sampled_request(SampledRequest* req) { _sampled_request = req; } +SampledRequest* Controller::release_sampled_request() { + SampledRequest* saved_sampled_request = _sampled_request; + _sampled_request = NULL; + return saved_sampled_request; +} + void Controller::set_stream_creator(StreamCreator* sc) { if (_stream_creator) { LOG(FATAL) << "A StreamCreator has been set previously"; diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 813afce233..5b2132b4f2 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -305,7 +305,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Get/own SampledRequest for sending dumped requests. // Deleted along with controller. void reset_sampled_request(SampledRequest* req); - const SampledRequest* sampled_request() { return _sampled_request; } + const SampledRequest* sampled_request() const { return _sampled_request; } + SampledRequest* release_sampled_request(); + // Attach a StreamCreator to this RPC. Notice that the ownership of sc has // been transferred to cntl, and sc->DestroyStreamCreator() would be called diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 1c412eda37..6ce76467a3 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -476,6 +476,22 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { cntl->SetFailed(ENOMEM, "Fail to get sampled_request"); break; } + sampled_request->meta.set_service_name(request_meta.service_name()); + sampled_request->meta.set_method_name(request_meta.method_name()); + cntl->reset_sampled_request(sampled_request); + // Switch to service-specific error. + non_service_error.release(); + method_status = server->options().baidu_master_service->_status; + if (method_status) { + int rejected_cc = 0; + if (!method_status->OnRequested(&rejected_cc, cntl.get())) { + cntl->SetFailed( + ELIMIT, + "Rejected by %s's ConcurrencyLimiter, concurrency=%d", + butil::class_name(), rejected_cc); + break; + } + } if (span) { span->ResetServerSpanName(sampled_request->meta.method_name()); } @@ -490,19 +506,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { if (!msg->payload.empty()) { cntl->request_attachment().swap(msg->payload); } - // Switch to service-specific error. - non_service_error.release(); - method_status = server->options().baidu_master_service->_status; - if (method_status) { - int rejected_cc = 0; - if (!method_status->OnRequested(&rejected_cc, cntl.get())) { - cntl->SetFailed( - ELIMIT, - "Rejected by %s's ConcurrencyLimiter, concurrency=%d", - butil::class_name(), rejected_cc); - break; - } - } } else { // NOTE(gejun): jprotobuf sends service names without packages. So the // name should be changed to full when it's not. diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h index ec662b463d..1f2da79afe 100644 --- a/src/butil/memory/scope_guard.h +++ b/src/butil/memory/scope_guard.h @@ -78,7 +78,7 @@ ScopeGuard MakeScopeGuard(Callback&& callback) noexcept { } namespace internal { -// for BAIDU_SCOPE_EXIT. +// for BRPC_SCOPE_EXIT. enum class ScopeExitHelper {}; template diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index 16b072e9fc..cc98f11154 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -1666,6 +1666,12 @@ class BaiduMasterServiceImpl : public brpc::BaiduMasterService { // This object helps you to call done->Run() in RAII style. If you need // to process the request asynchronously, pass done_guard.release(). brpc::ClosureGuard done_guard(done); + ASSERT_NE(nullptr, cntl->sampled_request()); + ASSERT_TRUE(cntl->sampled_request()->meta.has_service_name()); + ASSERT_EQ(test::EchoService::descriptor()->full_name(), + cntl->sampled_request()->meta.service_name()); + ASSERT_TRUE(cntl->sampled_request()->meta.has_method_name()); + ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name()); test::EchoRequest echo_request; test::EchoResponse echo_response; brpc::CompressType type = cntl->request_compress_type(); @@ -1741,12 +1747,11 @@ TEST_F(ServerTest, generic_call) { req, &serialized_request.serialized_data(), type)); cntl.request_attachment().append(EXP_REQUEST); cntl.set_request_compress_type(type); - brpc::SampledRequest* sampled_request = new (std::nothrow) brpc::SampledRequest(); + auto sampled_request = new (std::nothrow) brpc::SampledRequest(); sampled_request->meta.set_service_name( test::EchoService::descriptor()->full_name()); sampled_request->meta.set_method_name( test::EchoService::descriptor()->FindMethodByName("Echo")->name()); - // sampled_request->meta.set_compress_type(cntl.request_compress_type()); cntl.reset_sampled_request(sampled_request); chan.CallMethod(NULL, &cntl, &serialized_request, &serialized_response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp index e0da1af14f..14d150a767 100644 --- a/test/endpoint_unittest.cpp +++ b/test/endpoint_unittest.cpp @@ -494,7 +494,7 @@ TEST(EndPointTest, tcp_connect) { } { butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1)); - ASSERT_EQ(-1, sockfd); + ASSERT_EQ(-1, sockfd) << "errno=" << errno; ASSERT_EQ(ETIMEDOUT, errno); }