Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added max async export support using separate AsyncBatchSpan/LogProcessor #1306

Merged
merged 11 commits into from
May 4, 2022
16 changes: 8 additions & 8 deletions exporters/otlp/test/otlp_http_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
resource_attributes["vec_string_value"] = std::vector<std::string>{"vector", "string"};
auto resource = resource::Resource::Create(resource_attributes);

auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions();
processor_opts.max_export_batch_size = 5;
processor_opts.max_queue_size = 5;
processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);
auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions();
processor_opts.options.max_export_batch_size = 5;
processor_opts.options.max_queue_size = 5;
processor_opts.options.schedule_delay_millis = std::chrono::milliseconds(256);

auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts));
Expand Down Expand Up @@ -366,10 +366,10 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
resource_attributes["vec_string_value"] = std::vector<std::string>{"vector", "string"};
auto resource = resource::Resource::Create(resource_attributes);

auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions();
processor_opts.max_export_batch_size = 5;
processor_opts.max_queue_size = 5;
processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);
auto processor_opts = sdk::trace::AsyncBatchSpanProcessorOptions();
processor_opts.options.max_export_batch_size = 5;
processor_opts.options.max_queue_size = 5;
processor_opts.options.schedule_delay_millis = std::chrono::milliseconds(256);

auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts));
Expand Down
256 changes: 212 additions & 44 deletions exporters/otlp/test/otlp_http_log_exporter_test.cc

Large diffs are not rendered by default.

112 changes: 10 additions & 102 deletions sdk/include/opentelemetry/sdk/logs/async_batch_log_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
# ifdef ENABLE_ASYNC_EXPORT

# include "opentelemetry/sdk/common/circular_buffer.h"
# include "opentelemetry/sdk/logs/batch_log_processor.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/processor.h"

# include <atomic>
# include <condition_variable>
# include <cstdint>
# include <memory>
# include <queue>
# include <thread>
# ifdef ENABLE_ASYNC_EXPORT
# include <queue>
# endif

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand All @@ -30,20 +28,7 @@ namespace logs
*/
struct AsyncBatchLogProcessorOptions
{
/**
* The maximum buffer/queue size. After the size is reached, spans are
* dropped.
*/
size_t max_queue_size = 2048;

/* The time interval between two consecutive exports. */
std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000);

/**
* The maximum batch size of every export. It must be smaller or
* equal to max_queue_size.
*/
size_t max_export_batch_size = 512;
BatchLogProcessorOptions options;

/* Denotes the maximum number of async exports to continue
*/
Expand All @@ -54,27 +39,9 @@ struct AsyncBatchLogProcessorOptions
* This is an implementation of the LogProcessor which creates batches of finished logs and passes
* the export-friendly log data representations to the configured LogExporter.
*/
class AsyncBatchLogProcessor : public LogProcessor
class AsyncBatchLogProcessor : public BatchLogProcessor
{
public:
/**
* Creates a batch log processor by configuring the specified exporter and other parameters
* as per the official, language-agnostic opentelemetry specs.
*
* @param exporter - The backend exporter to pass the logs to
* @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are
* dropped.
* @param scheduled_delay_millis - The time interval between two consecutive exports.
* @param max_export_batch_size - The maximum batch size of every export. It must be smaller or
* equal to max_queue_size
*/
explicit AsyncBatchLogProcessor(
std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size = 2048,
const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
const size_t max_export_batch_size = 512,
const size_t max_export_async = 8);

/**
* Creates a batch log processor by configuring the specified exporter and other parameters
* as per the official, language-agnostic opentelemetry specs.
Expand All @@ -85,24 +52,6 @@ class AsyncBatchLogProcessor : public LogProcessor
explicit AsyncBatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
const AsyncBatchLogProcessorOptions &options);

/** Makes a new recordable **/
std::unique_ptr<Recordable> MakeRecordable() noexcept override;

/**
* Called when the Logger's log method creates a log record
* @param record the log record
*/

void OnReceive(std::unique_ptr<Recordable> &&record) noexcept override;

/**
* Export all log records that have not been exported yet.
*
* NOTE: Timeout functionality not supported yet.
*/
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its logs and passes them to the exporter. Any subsequent calls to
Expand All @@ -119,49 +68,25 @@ class AsyncBatchLogProcessor : public LogProcessor
virtual ~AsyncBatchLogProcessor() override;

private:
/**
* The background routine performed by the worker thread.
*/
void DoBackgroundWork();

/**
* Exports all logs to the configured exporter.
*
*/
void Export();

/**
* Called when Shutdown() is invoked. Completely drains the queue of all log records and
* passes them to the exporter.
*/
void DrainQueue();
void Export() override;

struct ExportDataStorage
{
std::queue<size_t> export_ids;
std::vector<bool> export_ids_flag;

std::condition_variable async_export_waker;
std::mutex async_export_data_m;
};
std::shared_ptr<ExportDataStorage> export_data_storage_;

const size_t max_export_async_;
static constexpr size_t kInvalidExportId = static_cast<size_t>(-1);

struct SynchronizationData
{
/* Synchronization primitives */
std::condition_variable cv, force_flush_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<bool> is_shutdown;

std::condition_variable async_export_waker;
std::mutex async_export_data_m;
};

/**
* @brief Notify completion of shutdown and force flush. This may be called from the any thread at
* any time
Expand All @@ -170,25 +95,8 @@ class AsyncBatchLogProcessor : public LogProcessor
* @param synchronization_data Synchronization data to be notified.
*/
static void NotifyCompletion(bool notify_force_flush,
const std::shared_ptr<SynchronizationData> &synchronization_data);

void GetWaitAdjustedTime(std::chrono::microseconds &timeout,
std::chrono::time_point<std::chrono::system_clock> &start_time);

/* The configured backend log exporter */
std::unique_ptr<LogExporter> exporter_;

/* Configurable parameters as per the official *trace* specs */
const size_t max_queue_size_;
const std::chrono::milliseconds scheduled_delay_millis_;
const size_t max_export_batch_size_;
/* The buffer/queue to which the ended logs are added */
common::CircularBuffer<Recordable> buffer_;

std::shared_ptr<SynchronizationData> synchronization_data_;

/* The background worker thread */
std::thread worker_thread_;
const std::shared_ptr<SynchronizationData> &synchronization_data,
const std::shared_ptr<ExportDataStorage> &export_data_storage);
};

} // namespace logs
Expand Down
12 changes: 6 additions & 6 deletions sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,21 @@ class BatchLogProcessor : public LogProcessor
const BatchLogProcessorOptions &options);

/** Makes a new recordable **/
std::unique_ptr<Recordable> MakeRecordable() noexcept override;
virtual std::unique_ptr<Recordable> MakeRecordable() noexcept override;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not add virtual keyword for overried methods. Clang will warning this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, will make the change.


/**
* Called when the Logger's log method creates a log record
* @param record the log record
*/

void OnReceive(std::unique_ptr<Recordable> &&record) noexcept override;
virtual void OnReceive(std::unique_ptr<Recordable> &&record) noexcept override;

/**
* Export all log records that have not been exported yet.
*
* NOTE: Timeout functionality not supported yet.
*/
bool ForceFlush(
virtual bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
Expand All @@ -101,15 +101,15 @@ class BatchLogProcessor : public LogProcessor
*
* NOTE: Timeout functionality not supported yet.
*/
bool Shutdown(
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Class destructor which invokes the Shutdown() method.
*/
virtual ~BatchLogProcessor() override;

private:
protected:
/**
* The background routine performed by the worker thread.
*/
Expand All @@ -119,7 +119,7 @@ class BatchLogProcessor : public LogProcessor
* Exports all logs to the configured exporter.
*
*/
void Export();
virtual void Export();

/**
* Called when Shutdown() is invoked. Completely drains the queue of all log records and
Expand Down
Loading