Skip to content

Commit

Permalink
Merge branch 'main' into fix_cpp_stdlib_1071
Browse files Browse the repository at this point in the history
  • Loading branch information
marcalff authored Sep 28, 2023
2 parents 037bc31 + 5e96b80 commit 0c0b5c0
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ class OtlpRecordable final : public opentelemetry::sdk::trace::Recordable
/** Dynamically converts the resource of this span into a proto. */
proto::resource::v1::Resource ProtoResource() const noexcept;

const opentelemetry::sdk::resource::Resource *GetResource() const noexcept;
const std::string GetResourceSchemaURL() const noexcept;
const opentelemetry::sdk::instrumentationscope::InstrumentationScope *GetInstrumentationScope()
const noexcept;
const std::string GetInstrumentationLibrarySchemaURL() const noexcept;

proto::common::v1::InstrumentationScope GetProtoInstrumentationScope() const noexcept;
Expand Down
11 changes: 11 additions & 0 deletions exporters/otlp/src/otlp_recordable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ proto::resource::v1::Resource OtlpRecordable::ProtoResource() const noexcept
return proto;
}

const opentelemetry::sdk::resource::Resource *OtlpRecordable::GetResource() const noexcept
{
return resource_;
}

const std::string OtlpRecordable::GetResourceSchemaURL() const noexcept
{
std::string schema_url;
Expand All @@ -51,6 +56,12 @@ const std::string OtlpRecordable::GetResourceSchemaURL() const noexcept
return schema_url;
}

const opentelemetry::sdk::instrumentationscope::InstrumentationScope *
OtlpRecordable::GetInstrumentationScope() const noexcept
{
return instrumentation_scope_;
}

const std::string OtlpRecordable::GetInstrumentationLibrarySchemaURL() const noexcept
{
std::string schema_url;
Expand Down
49 changes: 42 additions & 7 deletions exporters/otlp/src/otlp_recordable_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,54 @@ void OtlpRecordableUtils::PopulateRequest(
return;
}

using spans_by_scope =
std::unordered_map<const opentelemetry::sdk::instrumentationscope::InstrumentationScope *,
std::vector<std::unique_ptr<OtlpRecordable>>>;
std::unordered_map<const opentelemetry::sdk::resource::Resource *, spans_by_scope> spans_index;

// Collect spans per resource and instrumentation scope
for (auto &recordable : spans)
{
auto rec = std::unique_ptr<OtlpRecordable>(static_cast<OtlpRecordable *>(recordable.release()));
auto resource_span = request->add_resource_spans();
auto scope_spans = resource_span->add_scope_spans();
auto resource = rec->GetResource();
auto instrumentation = rec->GetInstrumentationScope();

*scope_spans->add_spans() = std::move(rec->span());
*scope_spans->mutable_scope() = rec->GetProtoInstrumentationScope();
spans_index[resource][instrumentation].emplace_back(std::move(rec));
}

scope_spans->set_schema_url(rec->GetInstrumentationLibrarySchemaURL());
// Add all resource spans
for (auto &input_resource_spans : spans_index)
{
// Add the resource
auto resource_spans = request->add_resource_spans();
if (input_resource_spans.first)
{
proto::resource::v1::Resource resource_proto;
OtlpPopulateAttributeUtils::PopulateAttribute(&resource_proto, *input_resource_spans.first);
*resource_spans->mutable_resource() = resource_proto;
resource_spans->set_schema_url(input_resource_spans.first->GetSchemaURL());
}

*resource_span->mutable_resource() = rec->ProtoResource();
resource_span->set_schema_url(rec->GetResourceSchemaURL());
// Add all scope spans
for (auto &input_scope_spans : input_resource_spans.second)
{
// Add the instrumentation scope
auto scope_spans = resource_spans->add_scope_spans();
if (input_scope_spans.first)
{
proto::common::v1::InstrumentationScope instrumentation_scope_proto;
instrumentation_scope_proto.set_name(input_scope_spans.first->GetName());
instrumentation_scope_proto.set_version(input_scope_spans.first->GetVersion());
*scope_spans->mutable_scope() = instrumentation_scope_proto;
scope_spans->set_schema_url(input_scope_spans.first->GetSchemaURL());
}

// Add all spans to this scope spans
for (auto &input_span : input_scope_spans.second)
{
*scope_spans->add_spans() = std::move(input_span->span());
}
}
}
}

Expand Down
88 changes: 88 additions & 0 deletions exporters/otlp/test/otlp_recordable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/exporters/otlp/otlp_recordable.h"
#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h"
#include "opentelemetry/sdk/resource/resource.h"

#if defined(__GNUC__)
Expand Down Expand Up @@ -285,6 +286,93 @@ TEST(OtlpRecordable, SetArrayAttribute)
}
}

// Test otlp resource populate request util
TEST(OtlpRecordable, PopulateRequest)
{
auto rec1 = std::unique_ptr<sdk::trace::Recordable>(new OtlpRecordable);
auto resource1 = resource::Resource::Create({{"service.name", "one"}});
rec1->SetResource(resource1);
auto inst_lib1 = trace_sdk::InstrumentationScope::Create("one", "1");
rec1->SetInstrumentationScope(*inst_lib1);

auto rec2 = std::unique_ptr<sdk::trace::Recordable>(new OtlpRecordable);
auto resource2 = resource::Resource::Create({{"service.name", "two"}});
rec2->SetResource(resource2);
auto inst_lib2 = trace_sdk::InstrumentationScope::Create("two", "2");
rec2->SetInstrumentationScope(*inst_lib2);

// This has the same resource as rec2, but a different scope
auto rec3 = std::unique_ptr<sdk::trace::Recordable>(new OtlpRecordable);
rec3->SetResource(resource2);
auto inst_lib3 = trace_sdk::InstrumentationScope::Create("three", "3");
rec3->SetInstrumentationScope(*inst_lib3);

proto::collector::trace::v1::ExportTraceServiceRequest req;
std::vector<std::unique_ptr<sdk::trace::Recordable>> spans;
spans.push_back(std::move(rec1));
spans.push_back(std::move(rec2));
spans.push_back(std::move(rec3));
const nostd::span<std::unique_ptr<sdk::trace::Recordable>, 3> spans_span(spans.data(), 3);
OtlpRecordableUtils::PopulateRequest(spans_span, &req);

EXPECT_EQ(req.resource_spans().size(), 2);
for (auto resource_spans : req.resource_spans())
{
auto service_name = resource_spans.resource().attributes(0).value().string_value();
auto scope_spans_size = resource_spans.scope_spans().size();
if (service_name == "one")
{
EXPECT_EQ(scope_spans_size, 1);
EXPECT_EQ(resource_spans.scope_spans(0).scope().name(), "one");
}
if (service_name == "two")
{
EXPECT_EQ(scope_spans_size, 2);
}
}
}

// Test otlp resource populate request util with missing data
TEST(OtlpRecordable, PopulateRequestMissing)
{
// Missing scope
auto rec1 = std::unique_ptr<sdk::trace::Recordable>(new OtlpRecordable);
auto resource1 = resource::Resource::Create({{"service.name", "one"}});
rec1->SetResource(resource1);

// Missing resource
auto rec2 = std::unique_ptr<sdk::trace::Recordable>(new OtlpRecordable);
auto inst_lib2 = trace_sdk::InstrumentationScope::Create("two", "2");
rec2->SetInstrumentationScope(*inst_lib2);

proto::collector::trace::v1::ExportTraceServiceRequest req;
std::vector<std::unique_ptr<sdk::trace::Recordable>> spans;
spans.push_back(std::move(rec1));
spans.push_back(std::move(rec2));
const nostd::span<std::unique_ptr<sdk::trace::Recordable>, 2> spans_span(spans.data(), 2);
OtlpRecordableUtils::PopulateRequest(spans_span, &req);

EXPECT_EQ(req.resource_spans().size(), 2);
for (auto resource_spans : req.resource_spans())
{
// Both should have scope spans
EXPECT_EQ(resource_spans.scope_spans().size(), 1);
auto scope = resource_spans.scope_spans(0).scope();
// Select the one with missing scope
if (scope.name() == "")
{
// Version is also empty
EXPECT_EQ(scope.version(), "");
}
else
{
// The other has a name and version
EXPECT_EQ(scope.name(), "two");
EXPECT_EQ(scope.version(), "2");
}
}
}

template <typename T>
struct EmptyArrayAttributeTest : public testing::Test
{
Expand Down
11 changes: 9 additions & 2 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ void HttpClient::CleanupSession(uint64_t session_id)
}
}

bool need_wakeup_background_thread = false;
{
std::lock_guard<std::recursive_mutex> lock_guard{session_ids_m_};
pending_to_add_session_ids_.erase(session_id);
Expand All @@ -259,10 +260,15 @@ void HttpClient::CleanupSession(uint64_t session_id)
{
// If this session is already running, give it to the background thread for cleanup.
pending_to_abort_sessions_[session_id] = std::move(session);
wakeupBackgroundThread();
need_wakeup_background_thread = true;
}
}
}

if (need_wakeup_background_thread)
{
wakeupBackgroundThread();
}
}

void HttpClient::MaybeSpawnBackgroundThread()
Expand Down Expand Up @@ -519,7 +525,8 @@ bool HttpClient::doRemoveSessions()
std::lock_guard<std::recursive_mutex> session_id_lock_guard{session_ids_m_};
pending_to_remove_session_handles_.swap(pending_to_remove_session_handles);
pending_to_remove_sessions_.swap(pending_to_remove_sessions);

}
{
// If user callback do not call CancelSession or FinishSession, We still need to remove it
// from sessions_
std::lock_guard<std::mutex> session_lock_guard{sessions_m_};
Expand Down

0 comments on commit 0c0b5c0

Please sign in to comment.