From 63448b02763f9fcf31028e6e11076dc22db21220 Mon Sep 17 00:00:00 2001 From: owent Date: Mon, 14 Jun 2021 14:59:50 +0800 Subject: [PATCH 1/2] Optimize OtlpHttpExporter to support timeout and cancle Signed-off-by: owent --- .../exporters/otlp/otlp_http_exporter.h | 9 + exporters/otlp/src/otlp_http_exporter.cc | 344 +++++++++++++----- .../ext/http/common/url_parser.h | 8 +- 3 files changed, 270 insertions(+), 91 deletions(-) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 7aa73f9290..777221d23e 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -6,6 +6,9 @@ // We need include exporter.h first, which will include Windows.h with NOMINMAX on Windows #include "opentelemetry/sdk/trace/exporter.h" +#include "opentelemetry/ext/http/client/http_client.h" + +#include #include #include #include @@ -65,6 +68,7 @@ struct OtlpHttpExporterOptions // TODO: Enable/disable to verify SSL certificate // TODO: Reuqest timeout + std::chrono::milliseconds timeout = std::chrono::milliseconds(30000); }; /** @@ -113,6 +117,11 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter // The configuration options associated with this exporter. const OtlpHttpExporterOptions options_; + + // Object that stores the HTTP sessions that have been created + std::shared_ptr http_client_; + // Cached parsed URI + std::string http_uri_; }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 7d13a3a7a7..ea9cc98560 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -4,6 +4,7 @@ #include "opentelemetry/exporters/otlp/otlp_http_exporter.h" #include "opentelemetry/exporters/otlp/otlp_recordable.h" #include "opentelemetry/ext/http/client/http_client_factory.h" +#include "opentelemetry/ext/http/common/url_parser.h" #include "nlohmann/json.hpp" @@ -37,6 +38,208 @@ namespace otlp namespace { +/** + * This class handles the response message from the Elasticsearch request + */ +class ResponseHandler : public http_client::EventHandler +{ +public: + /** + * Creates a response handler, that by default doesn't display to console + */ + ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {} + + /** + * Automatically called when the response is received, store the body into a string and notify any + * threads blocked on this result + */ + void OnResponse(http_client::Response &response) noexcept override + { + // Lock the private members so they can't be read while being modified + { + std::unique_lock lk(mutex_); + + // Store the body of the request + body_ = std::string(response.GetBody().begin(), response.GetBody().end()); + + if (console_debug_) + { + std::cout << "[OTLP HTTP Exporter] Status:" << response.GetStatusCode() << std::endl + << "Header:" << std::endl; + response.ForEachHeader([](opentelemetry::nostd::string_view header_name, + opentelemetry::nostd::string_view header_value) { + std::cout << "\t" << header_name.data() << " : " << header_value.data() << std::endl; + return true; + }); + std::cout << "Body:" << std::endl << body_ << std::endl; + } + + // Set the response_received_ flag to true and notify any threads waiting on this result + response_received_ = true; + } + cv_.notify_all(); + } + + /** + * A method the user calls to block their thread until the response is received. The longest + * duration is the timeout of the request, set by SetTimeoutMs() + */ + bool waitForResponse() + { + std::unique_lock lk(mutex_); + cv_.wait(lk); + return response_received_; + } + + /** + * Returns the body of the response + */ + std::string GetResponseBody() + { + // Lock so that body_ can't be written to while returning it + std::unique_lock lk(mutex_); + return body_; + } + + // Callback method when an http event occurs + void OnEvent(http_client::SessionState state, + opentelemetry::nostd::string_view reason) noexcept override + { + // If any failure event occurs, release the condition variable to unblock main thread + switch (state) + { + case http_client::SessionState::CreateFailed: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: session create failed" << std::endl; + } + cv_.notify_all(); + break; + + case http_client::SessionState::Created: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: session created" << std::endl; + } + break; + + case http_client::SessionState::Destroyed: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: session destroyed" << std::endl; + } + break; + + case http_client::SessionState::Connecting: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: connecting to peer" << std::endl; + } + break; + + case http_client::SessionState::ConnectFailed: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: connection failed" << std::endl; + } + cv_.notify_all(); + break; + + case http_client::SessionState::Connected: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: connected" << std::endl; + } + break; + + case http_client::SessionState::Sending: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: sending request" << std::endl; + } + break; + + case http_client::SessionState::SendFailed: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: request send failed" << std::endl; + } + cv_.notify_all(); + break; + + case http_client::SessionState::Response: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: response received" << std::endl; + } + break; + + case http_client::SessionState::SSLHandshakeFailed: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: SSL handshake failed" << std::endl; + } + cv_.notify_all(); + break; + + case http_client::SessionState::TimedOut: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: request time out" << std::endl; + } + cv_.notify_all(); + break; + + case http_client::SessionState::NetworkError: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: network error" << std::endl; + } + cv_.notify_all(); + break; + + case http_client::SessionState::ReadError: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: error reading response" << std::endl; + } + break; + + case http_client::SessionState::WriteError: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: error writing request" << std::endl; + } + break; + + case http_client::SessionState::Cancelled: + if (console_debug_) + { + std::cerr << "[OTLP HTTP Exporter] Session state: (manually) cancelled" << std::endl; + } + cv_.notify_all(); + break; + + default: + break; + } + } + +private: + // Define a condition variable and mutex + std::condition_variable cv_; + std::mutex mutex_; + + // Whether the response from Elasticsearch has been received + bool response_received_ = false; + + // A string to store the response body + std::string body_ = ""; + + // Whether to print the results from the callback + bool console_debug_ = false; +}; + static inline char HexEncode(unsigned char byte) { if (byte >= 10) @@ -344,7 +547,9 @@ static void PopulateRequest( OtlpHttpExporter::OtlpHttpExporter() : OtlpHttpExporter(OtlpHttpExporterOptions()) {} -OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) : options_(options) {} +OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) + : options_(options), http_client_(http_client::HttpClientFactory::Create()) +{} // ----------------------------- Exporter methods ------------------------------ @@ -367,6 +572,31 @@ sdk::common::ExportResult OtlpHttpExporter::Export( return sdk::common::ExportResult::kFailure; } + // Parse uri and store it to cache + if (http_uri_.empty()) + { + auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url)); + if (!parse_url.success_) + { + if (options_.console_debug) + { + std::cerr << "[OTLP HTTP Exporter] Export failed, invalid url: " << options_.url + << std::endl; + } + + return sdk::common::ExportResult::kFailure; + } + + if (!parse_url.path_.empty() && parse_url.path_[0] == '/') + { + http_uri_ = parse_url.path_.substr(1); + } + else + { + http_uri_ = parse_url.path_; + } + } + proto::collector::trace::v1::ExportTraceServiceRequest service_request; PopulateRequest(spans, &service_request); @@ -413,100 +643,36 @@ sdk::common::ExportResult OtlpHttpExporter::Export( } // Send the request - auto client = http_client::HttpClientFactory::CreateSync(); - // TODO: Set timeout - auto result = client->Post(options_.url, body_vec, {{"Content-Type", content_type}}); + auto session = http_client_->CreateSession(options_.url); + auto request = session->CreateRequest(); + request->SetUri(http_uri_); + request->SetTimeoutMs(options_.timeout); + request->SetMethod(http_client::Method::Post); + request->SetBody(body_vec); + request->AddHeader("Content-Type", content_type); - // If an error occurred with the HTTP request - if (!result) - { - if (options_.console_debug) - { - switch (result.GetSessionState()) - { - case http_client::SessionState::CreateFailed: - std::cerr << "[OTLP HTTP Exporter] session state: session create failed" << std::endl; - break; - - case http_client::SessionState::Created: - std::cerr << "[OTLP HTTP Exporter] session state: session created" << std::endl; - break; - - case http_client::SessionState::Destroyed: - std::cerr << "[OTLP HTTP Exporter] session state: session destroyed" << std::endl; - break; - - case http_client::SessionState::Connecting: - std::cerr << "[OTLP HTTP Exporter] session state: connecting to peer" << std::endl; - break; - - case http_client::SessionState::ConnectFailed: - std::cerr << "[OTLP HTTP Exporter] session state: connection failed" << std::endl; - break; - - case http_client::SessionState::Connected: - std::cerr << "[OTLP HTTP Exporter] session state: connected" << std::endl; - break; - - case http_client::SessionState::Sending: - std::cerr << "[OTLP HTTP Exporter] session state: sending request" << std::endl; - break; - - case http_client::SessionState::SendFailed: - std::cerr << "[OTLP HTTP Exporter] session state: request send failed" << std::endl; - break; - - case http_client::SessionState::Response: - std::cerr << "[OTLP HTTP Exporter] session state: response received" << std::endl; - break; - - case http_client::SessionState::SSLHandshakeFailed: - std::cerr << "[OTLP HTTP Exporter] session state: SSL handshake failed" << std::endl; - break; - - case http_client::SessionState::TimedOut: - std::cerr << "[OTLP HTTP Exporter] session state: request time out" << std::endl; - break; - - case http_client::SessionState::NetworkError: - std::cerr << "[OTLP HTTP Exporter] session state: network error" << std::endl; - break; - - case http_client::SessionState::ReadError: - std::cerr << "[OTLP HTTP Exporter] session state: error reading response" << std::endl; - break; + // Send the request + std::unique_ptr handler(new ResponseHandler(options_.console_debug)); + session->SendRequest(*handler); - case http_client::SessionState::WriteError: - std::cerr << "[OTLP HTTP Exporter] session state: error writing request" << std::endl; - break; + // Wait for the response to be received + if (options_.console_debug) + { + std::cout << "[OTLP HTTP Exporter] Waiting for response from " << options_.url + << " (timeout = " << options_.timeout.count() << " milliseconds)" << std::endl; + } + bool write_successful = handler->waitForResponse(); - case http_client::SessionState::Cancelled: - std::cerr << "[OTLP HTTP Exporter] session state: (manually) cancelled" << std::endl; - break; + // End the session + session->FinishSession(); - default: - break; - } - } + // If an error occurred with the HTTP request + if (!write_successful) + { // TODO: retry logic return sdk::common::ExportResult::kFailure; } - if (options_.console_debug) - { - std::cout << "[OTLP HTTP Exporter] Status:" << result.GetResponse().GetStatusCode() << std::endl - << "Header:" << std::endl; - result.GetResponse().ForEachHeader([](opentelemetry::nostd::string_view header_name, - opentelemetry::nostd::string_view header_value) { - std::cout << "\t" << header_name.data() << " : " << header_value.data() << std::endl; - return true; - }); - std::cout << "Body:" << std::endl - << std::string(result.GetResponse().GetBody().begin(), - result.GetResponse().GetBody().end()) - << std::endl; - } - return sdk::common::ExportResult::kSuccess; } @@ -514,7 +680,9 @@ bool OtlpHttpExporter::Shutdown(std::chrono::microseconds) noexcept { is_shutdown_ = true; - // TODO: Shutdown the http request + // Shutdown the session manager + http_client_->CancelAllSessions(); + http_client_->FinishAllSessions(); return true; } diff --git a/ext/include/opentelemetry/ext/http/common/url_parser.h b/ext/include/opentelemetry/ext/http/common/url_parser.h index 491a961756..9ca45eda09 100644 --- a/ext/include/opentelemetry/ext/http/common/url_parser.h +++ b/ext/include/opentelemetry/ext/http/common/url_parser.h @@ -27,7 +27,7 @@ class UrlParser std::string host_; std::string scheme_; std::string path_; - size_t port_; + uint16_t port_; std::string query_; bool success_; @@ -81,7 +81,8 @@ class UrlParser path_ = "/"; // use default path if (is_port) { - port_ = std::stoi(std::string(url_.begin() + cpos, url_.begin() + url_.length())); + port_ = static_cast( + std::stoi(std::string(url_.begin() + cpos, url_.begin() + url_.length()))); } else { @@ -91,7 +92,8 @@ class UrlParser } if (is_port) { - port_ = std::stoi(std::string(url_.begin() + cpos, url_.begin() + pos)); + port_ = + static_cast(std::stoi(std::string(url_.begin() + cpos, url_.begin() + pos))); } else { From 1ea95031b9804b61f8ac67ba93b8daed279987b7 Mon Sep 17 00:00:00 2001 From: owent Date: Mon, 14 Jun 2021 16:02:13 +0800 Subject: [PATCH 2/2] Fix includes Signed-off-by: owent --- exporters/otlp/src/otlp_http_exporter.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index ea9cc98560..dec5268165 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -17,8 +17,10 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" +#include #include #include +#include #include #include