Skip to content

Commit

Permalink
Remove AsyncBatch*Processor, implement [opentelemetry-specification…
Browse files Browse the repository at this point in the history
…#2452](open-telemetry/opentelemetry-specification#2452)

Signed-off-by: WenTao Ou <admin@owent.net>
  • Loading branch information
owent committed Jun 16, 2022
1 parent b4d47e1 commit b75089a
Show file tree
Hide file tree
Showing 42 changed files with 161 additions and 1,785 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ struct ElasticsearchExporterOptions
// Whether to print the status of the exporter in the console
bool console_debug_;

# ifdef ENABLE_ASYNC_EXPORT
bool asynchronous_mode_ = false;
# endif

/**
* Constructor for the ElasticsearchExporterOptions. By default, the endpoint is
* localhost:9200/logs with a timeout of 30 seconds and disabled console debugging
Expand All @@ -49,12 +53,19 @@ struct ElasticsearchExporterOptions
int port = 9200,
std::string index = "logs",
int response_timeout = 30,
bool console_debug = false)
: host_{host},
port_{port},
index_{index},
response_timeout_{response_timeout},
console_debug_{console_debug}
bool console_debug = false
# ifdef ENABLE_ASYNC_EXPORT
,
bool asynchronous_mode = false
# endif
)
: host_{host}, port_{port}, index_{index}, response_timeout_{response_timeout}, console_debug_
{
console_debug
}
# ifdef ENABLE_ASYNC_EXPORT
, asynchronous_mode_ { asynchronous_mode }
# endif
{}
};

Expand Down Expand Up @@ -89,19 +100,6 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records to the Elasticsearch instance asynchronously.
* @param records A list of log records to send to Elasticsearch.
* @param result_callback callback function accepting ExportResult as argument
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
Expand Down
73 changes: 26 additions & 47 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,32 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
std::vector<uint8_t> body_vec(body.begin(), body.end());
request->SetBody(body_vec);

# ifdef ENABLE_ASYNC_EXPORT
if (options_.asynchronous_mode_)
{
// Send the request
std::size_t span_count = records.size();
auto handler = std::make_shared<AsyncResponseHandler>(
session,
[span_count](opentelemetry::sdk::common::ExportResult result) {
if (result != opentelemetry::sdk::common::ExportResult::kSuccess)
{
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] ERROR: Export "
<< span_count
<< " trace span(s) error: " << static_cast<int>(result));
}
else
{
OTEL_INTERNAL_LOG_DEBUG("[ES Trace Exporter] DEBUG: Export "
<< span_count << " trace span(s) success");
}
return true;
},
options_.console_debug_);
session->SendRequest(handler);
return sdk::common::ExportResult::kSuccess;
}
# endif
// Send the request
auto handler = std::make_shared<ResponseHandler>(options_.console_debug_);
session->SendRequest(handler);
Expand Down Expand Up @@ -283,53 +309,6 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void ElasticsearchLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
// Return failure if this exporter has been shutdown
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting "
<< records.size() << " log(s) failed, exporter is shutdown");
return;
}

// Create a connection to the ElasticSearch instance
auto session = http_client_->CreateSession(options_.host_ + std::to_string(options_.port_));
auto request = session->CreateRequest();

// Populate the request with headers and methods
request->SetUri(options_.index_ + "/_bulk?pretty");
request->SetMethod(http_client::Method::Post);
request->AddHeader("Content-Type", "application/json");
request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_));

// Create the request body
std::string body = "";
for (auto &record : records)
{
// Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified
// in URI
body += "{\"index\" : {}}\n";

// Add the context of the Recordable
auto json_record = std::unique_ptr<ElasticSearchRecordable>(
static_cast<ElasticSearchRecordable *>(record.release()));
body += json_record->GetJSON().dump() + "\n";
}
std::vector<uint8_t> body_vec(body.begin(), body.end());
request->SetBody(body_vec);

// Send the request
auto handler = std::make_shared<AsyncResponseHandler>(session, std::move(result_callback),
options_.console_debug_);
session->SendRequest(handler);
}
# endif

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,6 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
* @param result_callback callback function accepting ExportResult as argument
*/
void Export(const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shutdown the exporter.
* @param timeout an option timeout, default to max.
Expand Down
11 changes: 0 additions & 11 deletions exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,6 @@ sdk_common::ExportResult JaegerExporter::Export(
return sdk_common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call");
auto status = Export(spans);
result_callback(status);
}
#endif

void JaegerExporter::InitializeEndpoint()
{
if (options_.transport_format == TransportFormat::kThriftUdpCompact)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,6 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
* @param result_callback callback function accepting ExportResult as argument
*/
void Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override
{
OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call");
auto status = Export(spans);
result_callback(status);
}
#endif

/**
* @param timeout an optional value containing the timeout of the exporter
* note: passing custom timeout values is not currently supported for this exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a span of logs sent from the processor asynchronously.
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept;
# endif

/**
* Marks the OStream Log Exporter as shut down.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,6 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

Expand Down
11 changes: 0 additions & 11 deletions exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,6 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OStreamLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
// Do not have async support
auto result = Export(records);
result_callback(result);
}
# endif

bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
10 changes: 0 additions & 10 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,6 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OStreamSpanExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
auto result = Export(spans);
result_callback(result);
}
#endif

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,6 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
* @param result_callback callback function accepting ExportResult as argument
*/
virtual void Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,6 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
* @param result_callback callback function accepting ExportResult as argument
*/
virtual void Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ class OtlpHttpClient
*/
void ReleaseSession(const opentelemetry::ext::http::client::Session &session) noexcept;

/**
* Get options of current OTLP http client.
* @return options of current OTLP http client.
*/
inline const OtlpHttpClientOptions &GetOptions() const noexcept { return options_; }

private:
struct HttpSessionData
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,6 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
* @param result_callback callback function accepting ExportResult as argument
*/
virtual void Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

/**
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,6 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
* @param result_callback callback function accepting ExportResult as argument
*/
virtual void Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
Expand Down
12 changes: 0 additions & 12 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,6 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
OTEL_INTERNAL_LOG_WARN(
"[OTLP TRACE GRPC Exporter] async not supported. Making sync interface call");
auto status = Export(spans);
result_callback(status);
}
#endif

bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
12 changes: 0 additions & 12 deletions exporters/otlp/src/otlp_grpc_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,6 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
OTEL_INTERNAL_LOG_WARN(
"[OTLP LOG GRPC Exporter] async not supported. Making sync interface call");
auto status = Export(logs);
result_callback(status);
}
# endif

bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
Loading

0 comments on commit b75089a

Please sign in to comment.