From 24fc31eaa56d8b522975ca41ed012faf6cf77438 Mon Sep 17 00:00:00 2001 From: "Yang,Liming" Date: Mon, 26 Feb 2024 10:46:43 +0800 Subject: [PATCH] span for new bthread (#2519) --- example/rpcz_echo_c++/CMakeLists.txt | 141 ++++++++++++++++++++++ example/rpcz_echo_c++/client.cpp | 96 +++++++++++++++ example/rpcz_echo_c++/echo.proto | 33 ++++++ example/rpcz_echo_c++/server.cpp | 171 +++++++++++++++++++++++++++ src/brpc/builtin/rpcz_service.cpp | 19 ++- src/brpc/global.cpp | 8 ++ src/brpc/span.cpp | 80 ++++++++++--- src/brpc/span.h | 14 ++- src/brpc/span.proto | 1 + src/bthread/bthread.cpp | 9 ++ src/bthread/task_group.cpp | 13 +- src/bthread/unstable.h | 3 + test/brpc_alpn_protocol_unittest.cpp | 2 +- test/bthread_unittest.cpp | 51 +++++++- 14 files changed, 616 insertions(+), 25 deletions(-) create mode 100644 example/rpcz_echo_c++/CMakeLists.txt create mode 100644 example/rpcz_echo_c++/client.cpp create mode 100644 example/rpcz_echo_c++/echo.proto create mode 100644 example/rpcz_echo_c++/server.cpp diff --git a/example/rpcz_echo_c++/CMakeLists.txt b/example/rpcz_echo_c++/CMakeLists.txt new file mode 100644 index 0000000000..53c1669028 --- /dev/null +++ b/example/rpcz_echo_c++/CMakeLists.txt @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(rpcz_echo_c++ C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto) +# include PROTO_HEADER +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +# Search for libthrift* by best effort. If it is not found and brpc is +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + set(THRIFT_LIB "") +endif() + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +execute_process( + COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS +) +if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE") + execute_process( + COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'" + OUTPUT_VARIABLE GFLAGS_NS + ) +endif() +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}") +set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + ${THRIFT_LIB} + dl + ) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop" + "-Wl,-U,__Z13GetStackTracePPvii") +endif() + +add_executable(rpcz_echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}) +add_executable(rpcz_echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) + +target_link_libraries(rpcz_echo_client ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(rpcz_echo_server ${BRPC_LIB} ${DYNAMIC_LIB}) + diff --git a/example/rpcz_echo_c++/client.cpp b/example/rpcz_echo_c++/client.cpp new file mode 100644 index 0000000000..ad80de0f50 --- /dev/null +++ b/example/rpcz_echo_c++/client.cpp @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server every 1 second. + +#include +#include +#include +#include +#include "echo.pb.h" + +namespace brpc { +DECLARE_bool(enable_rpcz); +} +DEFINE_string(attachment, "", "Carry this along with requests"); +DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server, "0.0.0.0:8000", "IP Address of server"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // brpc::FLAGS_enable_rpcz = true; + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = FLAGS_protocol; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(&channel); + + // Send a request and wait for the response every 1 second. + int log_id = 0; + while (!brpc::IsAskedToQuit()) { + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + + request.set_message("hello world"); + + cntl.set_log_id(log_id ++); // set by user + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl.request_attachment().append(FLAGS_attachment); + + // Because `done'(last parameter) is NULL, this function waits until + // the response comes back or error occurs(including timedout). + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + LOG(INFO) << "Received response from " << cntl.remote_side() + << " to " << cntl.local_side() + << ": " << response.message() << " (attached=" + << cntl.response_attachment() << ")" + << " latency=" << cntl.latency_us() << "us"; + } else { + LOG(WARNING) << cntl.ErrorText(); + } + usleep(FLAGS_interval_ms * 1000L); + } + + LOG(INFO) << "EchoClient is going to quit"; + return 0; +} + diff --git a/example/rpcz_echo_c++/echo.proto b/example/rpcz_echo_c++/echo.proto new file mode 100644 index 0000000000..2b39627fe8 --- /dev/null +++ b/example/rpcz_echo_c++/echo.proto @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax="proto2"; +package example; + +option cc_generic_services = true; + +message EchoRequest { + required string message = 1; +}; + +message EchoResponse { + required string message = 1; +}; + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); +}; diff --git a/example/rpcz_echo_c++/server.cpp b/example/rpcz_echo_c++/server.cpp new file mode 100644 index 0000000000..0f5f7f26f5 --- /dev/null +++ b/example/rpcz_echo_c++/server.cpp @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A server to receive EchoRequest and send back EchoResponse. + +#include +#include +#include +#include +#include +#include "echo.pb.h" + +DEFINE_bool(echo_attachment, true, "Echo attachment as well"); +DEFINE_int32(port, 8000, "TCP Port of this server"); +DEFINE_string(listen_addr, "", + "Server listen address, may be IPV4/IPV6/UDS." + " If this is set, the flag port will be ignored"); +DEFINE_int32(idle_timeout_s, -1, + "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); +DEFINE_string(server, "0.0.0.0:8001", "IP Address of server"); +DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +// Your implementation of example::EchoService +// Notice that implementing brpc::Describable grants the ability to put +// additional information in /status. +namespace example { + +static const bthread_attr_t BTHREAD_ATTR_NORMAL_WITH_SPAN = { + BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL, BTHREAD_TAG_INVALID}; + +void* RunThreadFunc(void*) { + TRACEPRINTF("RunThreadFunc %lu", bthread_self()); + // brpc::FLAGS_enable_rpcz = true; + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = FLAGS_protocol; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_server.c_str(), "", &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return nullptr; + } + example::EchoService_Stub stub(&channel); + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + request.set_message("hello world"); + + // Because `done'(last parameter) is NULL, this function waits until + // the response comes back or error occurs(including timedout). + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + LOG(INFO) << "Received response from " << cntl.remote_side() << " to " << cntl.local_side() + << ": " << response.message() << " (attached=" << cntl.response_attachment() + << ")" + << " latency=" << cntl.latency_us() << "us"; + } else { + LOG(WARNING) << cntl.ErrorText(); + } + + return nullptr; +} + +class EchoServiceImpl : public EchoService { +public: + EchoServiceImpl() {} + virtual ~EchoServiceImpl() {} + virtual void Echo(google::protobuf::RpcController* cntl_base, const EchoRequest* request, + EchoResponse* response, google::protobuf::Closure* done) { + bthread_list_t list; + bthread_list_init(&list, 0, 0); + for (int i = 0; i < 2; ++i) { + bthread_t tid; + bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL_WITH_SPAN, RunThreadFunc, nullptr); + bthread_list_add(&list, tid); + } + bthread_list_join(&list); + + TRACEPRINTF("Handle request"); + + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + + brpc::Controller* cntl = static_cast(cntl_base); + + // The purpose of following logs is to help you to understand + // how clients interact with servers more intuitively. You should + // remove these logs in performance-sensitive servers. + LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from " + << cntl->remote_side() << " to " << cntl->local_side() << ": " + << request->message() << " (attached=" << cntl->request_attachment() << ")"; + + // Fill response. + response->set_message(request->message()); + + // You can compress the response by setting Controller, but be aware + // that compression may be costly, evaluate before turning on. + // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP); + + if (FLAGS_echo_attachment) { + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl->response_attachment().append(cntl->request_attachment()); + } + } +}; +} // namespace example + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + example::EchoServiceImpl echo_service_impl; + + // Add the service into server. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + butil::EndPoint point; + if (!FLAGS_listen_addr.empty()) { + if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) { + LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr; + return -1; + } + } else { + point = butil::EndPoint(butil::IP_ANY, FLAGS_port); + } + // Start the server. + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + if (server.Start(point, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; +} diff --git a/src/brpc/builtin/rpcz_service.cpp b/src/brpc/builtin/rpcz_service.cpp index 3625c0684b..b9e056376a 100644 --- a/src/brpc/builtin/rpcz_service.cpp +++ b/src/brpc/builtin/rpcz_service.cpp @@ -309,6 +309,17 @@ static void PrintClientSpan(std::ostream& os,const RpczSpan& span, PrintClientSpan(os, span, &last_time, NULL, use_html); } +static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* last_time, + SpanInfoExtractor* server_extr, bool use_html) { + SpanInfoExtractor client_extr(span.info().c_str()); + int num_extr = 0; + SpanInfoExtractor* extr[2]; + if (server_extr) { + extr[num_extr++] = server_extr; + } + extr[num_extr++] = &client_extr; + PrintAnnotations(os, std::numeric_limits::max(), last_time, extr, num_extr); +} static void PrintServerSpan(std::ostream& os, const RpczSpan& span, bool use_html) { @@ -351,8 +362,12 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span, const int nclient = span.client_spans_size(); for (int i = 0; i < nclient; ++i) { - PrintClientSpan(os, span.client_spans(i), &last_time, - &server_extr, use_html); + auto& client_span = span.client_spans(i); + if (client_span.type() == SPAN_TYPE_CLIENT) { + PrintClientSpan(os, client_span, &last_time, &server_extr, use_html); + } else { + PrintBthreadSpan(os, client_span, &last_time, &server_extr, use_html); + } } if (PrintAnnotationsAndRealTimeSpan( diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index fbd669e774..121b9d9776 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -50,6 +50,11 @@ #include "brpc/policy/hasher.h" #include "brpc/policy/dynpart_load_balancer.h" + +// Span +#include "brpc/span.h" +#include "bthread/unstable.h" + // Compress handlers #include "brpc/compress.h" #include "brpc/policy/gzip_compress.h" @@ -329,6 +334,9 @@ static void GlobalInitializeOrDieImpl() { // Make GOOGLE_LOG print to comlog device SetLogHandler(&BaiduStreamingLogHandler); + // Set bthread create span function + bthread_set_create_span_func(CreateBthreadSpan); + // Setting the variable here does not work, the profiler probably check // the variable before main() for only once. // setenv("TCMALLOC_SAMPLE_PARAMETER", "524288", 0); diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp index 6e0e64afef..356a7cd039 100644 --- a/src/brpc/span.cpp +++ b/src/brpc/span.cpp @@ -121,6 +121,7 @@ Span* Span::CreateClientSpan(const std::string& full_method_name, span->_start_send_real_us = 0; span->_sent_real_us = 0; span->_next_client = NULL; + span->_client_list = NULL; span->_tls_next = NULL; span->_full_method_name = full_method_name; span->_info.clear(); @@ -129,8 +130,8 @@ Span* Span::CreateClientSpan(const std::string& full_method_name, span->_trace_id = parent->trace_id(); span->_parent_span_id = parent->span_id(); span->_local_parent = parent; - span->_next_client = parent->_next_client; - parent->_next_client = span; + span->_next_client = parent->_client_list; + parent->_client_list = span; } else { span->_trace_id = GenerateTraceId(); span->_parent_span_id = 0; @@ -140,6 +141,47 @@ Span* Span::CreateClientSpan(const std::string& full_method_name, return span; } +Span* Span::CreateBthreadSpan(const std::string& full_method_name, + int64_t base_real_us) { + Span* parent = (Span*)bthread::tls_bls.rpcz_parent_span; + if (parent == NULL) { + return NULL; + } + Span* span = butil::get_object(Forbidden()); + if (__builtin_expect(span == NULL, 0)) { + return NULL; + } + span->_log_id = 0; + span->_base_cid = INVALID_BTHREAD_ID; + span->_ending_cid = INVALID_BTHREAD_ID; + span->_type = SPAN_TYPE_BTHREAD; + span->_async = false; + span->_protocol = PROTOCOL_UNKNOWN; + span->_error_code = 0; + span->_request_size = 0; + span->_response_size = 0; + span->_base_real_us = base_real_us; + span->_received_real_us = 0; + span->_start_parse_real_us = 0; + span->_start_callback_real_us = 0; + span->_start_send_real_us = 0; + span->_sent_real_us = 0; + span->_next_client = NULL; + span->_client_list = NULL; + span->_tls_next = NULL; + span->_full_method_name = full_method_name; + span->_info.clear(); + + span->_trace_id = parent->trace_id(); + span->_parent_span_id = parent->span_id(); + span->_local_parent = parent; + span->_next_client = parent->_client_list; + parent->_client_list = span; + + span->_span_id = GenerateSpanId(); + return span; +} + inline const std::string& unknown_span_name() { // thread-safe in gcc. static std::string s_unknown_method_name = "unknown_method"; @@ -173,6 +215,7 @@ Span* Span::CreateServerSpan( span->_start_send_real_us = 0; span->_sent_real_us = 0; span->_next_client = NULL; + span->_client_list = NULL; span->_tls_next = NULL; span->_full_method_name = (!full_method_name.empty() ? full_method_name : unknown_span_name()); @@ -195,15 +238,20 @@ void Span::ResetServerSpanName(const std::string& full_method_name) { void Span::destroy() { EndAsParent(); - Span* p = _next_client; - while (p) { - Span* p_next = p->_next_client; - p->_info.clear(); - butil::return_object(p); - p = p_next; + traversal(this, [](Span* r) { + r->_info.clear(); + butil::return_object(r); + }); +} + +void Span::traversal(Span* r, const std::function& f) const { + if (r == NULL) { + return; } - _info.clear(); - butil::return_object(this); + for (auto p = r->_client_list; p != NULL; p = p->_next_client) { + traversal(p, f); + } + f(r); } void Span::Annotate(const char* fmt, ...) { @@ -243,8 +291,8 @@ void Span::AnnotateCStr(const char* info, size_t length) { size_t Span::CountClientSpans() const { size_t n = 0; - for (Span* p = _next_client; p; p = p->_next_client, ++n); - return n; + traversal(const_cast(this), [&](Span*) { ++n; }); + return n - 1; } int64_t Span::GetStartRealTimeUs() const { @@ -577,9 +625,13 @@ leveldb::Status SpanDB::Index(const Span* span, std::string* value_buf) { value_proto.add_client_spans(); } size_t i = 0; - for (const Span* p = span->_next_client; p; p = p->_next_client, ++i) { + span->traversal(const_cast(span), [&](Span* p) { + if (span == p) { + return; + } Span2Proto(p, value_proto.mutable_client_spans(client_span_count - i - 1)); - } + ++i; + }); if (!value_proto.SerializeToString(value_buf)) { return leveldb::Status::InvalidArgument( leveldb::Slice("Fail to serialize RpczSpan")); diff --git a/src/brpc/span.h b/src/brpc/span.h index 43ede3d5eb..4427340592 100644 --- a/src/brpc/span.h +++ b/src/brpc/span.h @@ -69,6 +69,10 @@ friend class SpanDB; static Span* CreateClientSpan(const std::string& full_method_name, int64_t base_real_us); + // Create a span to track start bthread + static Span* CreateBthreadSpan(const std::string& full_method_name, + int64_t base_real_us); + static void Submit(Span* span, int64_t cpuwide_time_us); // Set tls parent. @@ -82,7 +86,7 @@ friend class SpanDB; void Annotate(const std::string& info); // When length <= 0, use strlen instead. void AnnotateCStr(const char* cstr, size_t length); - + // #child spans, Not O(1) size_t CountClientSpans() const; @@ -142,6 +146,7 @@ friend class SpanDB; void dump_and_destroy(size_t round_index); void destroy(); + void traversal(Span*, const std::function&) const; bvar::CollectorSpeedLimit* speed_limit(); bvar::CollectorPreprocessor* preprocessor(); @@ -179,6 +184,7 @@ friend class SpanDB; Span* _local_parent; Span* _next_client; + Span* _client_list; Span* _tls_next; }; @@ -233,6 +239,12 @@ inline bool IsTraceable(bool is_upstream_traced) { (FLAGS_enable_rpcz && bvar::is_collectable(&g_span_sl)); } +inline void* CreateBthreadSpan() { + const int64_t received_us = butil::cpuwide_time_us(); + const int64_t base_realtime = butil::gettimeofday_us() - received_us; + return Span::CreateBthreadSpan("Bthread", base_realtime); +} + } // namespace brpc diff --git a/src/brpc/span.proto b/src/brpc/span.proto index a77a9cf62b..d37a53e209 100644 --- a/src/brpc/span.proto +++ b/src/brpc/span.proto @@ -25,6 +25,7 @@ option java_outer_classname="Span"; enum SpanType { SPAN_TYPE_SERVER = 0; SPAN_TYPE_CLIENT = 1; + SPAN_TYPE_BTHREAD = 2; } // We don't unify RpczSpan and TracingSpan as one because the former one needs diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 608c1b5883..07233e6048 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -83,6 +83,7 @@ TaskControl* g_task_control = NULL; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; extern void (*g_worker_startfn)(); extern void (*g_tagged_worker_startfn)(bthread_tag_t); +extern void* (*g_create_span_func)(); inline TaskControl* get_task_control() { return g_task_control; @@ -489,6 +490,14 @@ int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) { return 0; } +int bthread_set_create_span_func(void* (*func)()) { + if (func == NULL) { + return EINVAL; + } + bthread::g_create_span_func = func; + return 0; +} + void bthread_stop_world() { bthread::TaskControl* c = bthread::get_task_control(); if (c != NULL) { diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 46bd2460ee..45bd89edaa 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -76,6 +76,15 @@ const size_t OFFSET_TABLE[] = { #include "bthread/offset_inl.list" }; +void* (*g_create_span_func)() = NULL; + +void* run_create_span_func() { + if (g_create_span_func) { + return g_create_span_func(); + } + return tls_bls.rpcz_parent_span; +} + int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) { TaskMeta* const m = address_meta(tid); if (m != NULL) { @@ -383,7 +392,7 @@ int TaskGroup::start_foreground(TaskGroup** pg, m->attr = using_attr; m->local_storage = LOCAL_STORAGE_INIT; if (using_attr.flags & BTHREAD_INHERIT_SPAN) { - m->local_storage.rpcz_parent_span = tls_bls.rpcz_parent_span; + m->local_storage.rpcz_parent_span = run_create_span_func(); } m->cpuwide_start_ns = start_ns; m->stat = EMPTY_STAT; @@ -443,7 +452,7 @@ int TaskGroup::start_background(bthread_t* __restrict th, m->attr = using_attr; m->local_storage = LOCAL_STORAGE_INIT; if (using_attr.flags & BTHREAD_INHERIT_SPAN) { - m->local_storage.rpcz_parent_span = tls_bls.rpcz_parent_span; + m->local_storage.rpcz_parent_span = run_create_span_func(); } m->cpuwide_start_ns = start_ns; m->stat = EMPTY_STAT; diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h index 5922cc2f6c..f5bdeecb49 100644 --- a/src/bthread/unstable.h +++ b/src/bthread/unstable.h @@ -91,6 +91,9 @@ extern int bthread_set_worker_startfn(void (*start_fn)()); // Add a startup function with tag extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)); +// Add a create span function +extern int bthread_set_create_span_func(void* (*func)()); + // Stop all bthread and worker pthreads. // You should avoid calling this function which may cause bthread after main() // suspend indefinitely. diff --git a/test/brpc_alpn_protocol_unittest.cpp b/test/brpc_alpn_protocol_unittest.cpp index 7884b3fe11..3040355a58 100644 --- a/test/brpc_alpn_protocol_unittest.cpp +++ b/test/brpc_alpn_protocol_unittest.cpp @@ -31,7 +31,7 @@ DEFINE_string(listen_addr, "0.0.0.0:8011", "Server listen address."); int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); - google::ParseCommandLineFlags(&argc, &argv, true); + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); return RUN_ALL_TESTS(); } diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp index 72b8da3f92..5ed8aba897 100644 --- a/test/bthread_unittest.cpp +++ b/test/bthread_unittest.cpp @@ -513,7 +513,7 @@ TEST_F(BthreadTest, bthread_usleep) { } static const bthread_attr_t BTHREAD_ATTR_NORMAL_WITH_SPAN = -{ BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL }; +{ BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL, BTHREAD_TAG_INVALID }; void* test_parent_span(void* p) { uint64_t *q = (uint64_t *)p; @@ -522,6 +522,32 @@ void* test_parent_span(void* p) { return NULL; } +void* test_grandson_parent_span(void* p) { + uint64_t* q = (uint64_t*)p; + *q = (uint64_t)(bthread::tls_bls.rpcz_parent_span); + LOG(INFO) << "parent span id in thread is " << *q; + return NULL; +} + +void* test_son_parent_span(void* p) { + uint64_t* q = (uint64_t*)p; + *q = (uint64_t)(bthread::tls_bls.rpcz_parent_span); + LOG(INFO) << "parent span id in thread is " << *q; + bthread_t th; + uint64_t multi_p; + bthread_start_urgent(&th, &BTHREAD_ATTR_NORMAL_WITH_SPAN, test_grandson_parent_span, &multi_p); + bthread_join(th, NULL); + return NULL; +} + +static uint64_t targets[] = {0xBADBEB0UL, 0xBADBEB1UL, 0xBADBEB2UL, 0xBADBEB3UL}; +void* create_span_func() { + static std::atomic index(0); + auto idx = index.fetch_add(1); + LOG(INFO) << "Bthread create span " << targets[idx]; + return (void*)targets[idx]; +} + TEST_F(BthreadTest, test_span) { uint64_t p1 = 0; uint64_t p2 = 0; @@ -531,17 +557,32 @@ TEST_F(BthreadTest, test_span) { bthread::tls_bls.rpcz_parent_span = (void*)target; bthread_t th1; - ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_NORMAL_WITH_SPAN, - test_parent_span, &p1)); + ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_NORMAL_WITH_SPAN, test_parent_span, &p1)); ASSERT_EQ(0, bthread_join(th1, NULL)); bthread_t th2; - ASSERT_EQ(0, bthread_start_background(&th2, NULL, - test_parent_span, &p2)); + ASSERT_EQ(0, bthread_start_background(&th2, NULL, test_parent_span, &p2)); ASSERT_EQ(0, bthread_join(th2, NULL)); ASSERT_EQ(p1, target); ASSERT_NE(p2, target); + + LOG(INFO) << "Test bthread create span"; + + bthread_set_create_span_func(create_span_func); + + bthread_t multi_th1; + bthread_t multi_th2; + uint64_t multi_p1; + uint64_t multi_p2; + ASSERT_EQ(0, bthread_start_background(&multi_th1, &BTHREAD_ATTR_NORMAL_WITH_SPAN, + test_son_parent_span, &multi_p1)); + ASSERT_EQ(0, bthread_start_background(&multi_th2, &BTHREAD_ATTR_NORMAL_WITH_SPAN, + test_son_parent_span, &multi_p2)); + ASSERT_EQ(0, bthread_join(multi_th1, NULL)); + ASSERT_EQ(0, bthread_join(multi_th2, NULL)); + ASSERT_EQ(multi_p1, targets[0]); + ASSERT_EQ(multi_p2, targets[1]); } void* dummy_thread(void*) {