Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch log processor implementation with test coverage #434

Merged
merged 16 commits into from
Dec 22, 2020
Merged
139 changes: 139 additions & 0 deletions sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/logs/exporter.h"
#include "opentelemetry/sdk/logs/processor.h"

#include <atomic>
#include <condition_variable>
#include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{

namespace logs
{

/**
* This is an implementation of the LogProcessor which creates batches of finished logs and passes
* the export-friendly log data representations to the configured LogExporter.
*/
class BatchLogProcessor : public LogProcessor
{
public:
/**
* Creates a batch log processor by configuring the specified exporter and other parameters
* as per the official, language-agnostic opentelemetry specs.
*
* @param exporter - The backend exporter to pass the logs to
* @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are
* dropped.
* @param scheduled_delay_millis - The time interval between two consecutive exports.
* @param max_export_batch_size - The maximum batch size of every export. It must be smaller or
* equal to max_queue_size
*/
explicit BatchLogProcessor(
std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size = 2048,
const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
const size_t max_export_batch_size = 512);

/** Makes a new recordable **/
std::unique_ptr<Recordable> MakeRecordable() noexcept override;

/**
* Called when the Logger's log method creates a log record
* @param record the log record
*/

kxyr marked this conversation as resolved.
Show resolved Hide resolved
void OnReceive(std::unique_ptr<Recordable> &&record) noexcept override;

/**
* Export all log records that have not been exported yet.
*
* NOTE: Timeout functionality not supported yet.
*/
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its logs and passes them to the exporter. Any subsequent calls to
* ForceFlush or Shutdown will return immediately without doing anything.
*
* NOTE: Timeout functionality not supported yet.
*/
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Class destructor which invokes the Shutdown() method.
*/
virtual ~BatchLogProcessor() override;

private:
/**
* The background routine performed by the worker thread.
*/
void DoBackgroundWork();

/**
* Exports all logs to the configured exporter.
*
* @param was_force_flush_called - A flag to check if the current export is the result
* of a call to ForceFlush method. If true, then we have to
* notify the main thread to wake it up in the ForceFlush
* method.
*/
void Export(const bool was_for_flush_called);

/**
* Called when Shutdown() is invoked. Completely drains the queue of all log records and
* passes them to the exporter.
*/
void DrainQueue();

/* The configured backend log exporter */
std::unique_ptr<LogExporter> exporter_;

/* Configurable parameters as per the official *trace* specs */
const size_t max_queue_size_;
const std::chrono::milliseconds scheduled_delay_millis_;
const size_t max_export_batch_size_;

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;

/* The buffer/queue to which the ended logs are added */
common::CircularBuffer<Recordable> buffer_;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_shutdown_{false};
std::atomic<bool> is_force_flush_{false};
std::atomic<bool> is_force_flush_notified_{false};

/* The background worker thread */
std::thread worker_thread_;
};

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
2 changes: 1 addition & 1 deletion sdk/src/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_library(opentelemetry_logs logger_provider.cc logger.cc
simple_log_processor.cc)
simple_log_processor.cc batch_log_processor.cc)

target_link_libraries(opentelemetry_logs opentelemetry_common)
213 changes: 213 additions & 0 deletions sdk/src/logs/batch_log_processor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "opentelemetry/sdk/logs/batch_log_processor.h"

#include <vector>
using opentelemetry::sdk::common::AtomicUniquePtr;
using opentelemetry::sdk::common::CircularBufferRange;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace logs
{
BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size,
const std::chrono::milliseconds scheduled_delay_millis,
const size_t max_export_batch_size)
: exporter_(std::move(exporter)),
max_queue_size_(max_queue_size),
scheduled_delay_millis_(scheduled_delay_millis),
max_export_batch_size_(max_export_batch_size),
buffer_(max_queue_size_),
worker_thread_(&BatchLogProcessor::DoBackgroundWork, this)
{}

std::unique_ptr<Recordable> BatchLogProcessor::MakeRecordable() noexcept
{
return exporter_->MakeRecordable();
}

void BatchLogProcessor::OnReceive(std::unique_ptr<Recordable> &&record) noexcept
{
if (is_shutdown_.load() == true)
{
return;
}

if (buffer_.Add(record) == false)
{
return;
}

// If the queue gets at least half full a preemptive notification is
// sent to the worker thread to start a new export cycle.
if (buffer_.size() >= max_queue_size_ / 2)
{
// signal the worker thread
cv_.notify_one();
}
}

bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
maxgolov marked this conversation as resolved.
Show resolved Hide resolved
if (is_shutdown_.load() == true)
{
return false;
}

is_force_flush_ = true;

// Keep attempting to wake up the worker thread
maxgolov marked this conversation as resolved.
Show resolved Hide resolved
while (is_force_flush_.load() == true)
{
cv_.notify_one();
}

// Now wait for the worker thread to signal back from the Export method
std::unique_lock<std::mutex> lk(force_flush_cv_m_);
while (is_force_flush_notified_.load() == false)
{
force_flush_cv_.wait(lk);
}

// Notify the worker thread
is_force_flush_notified_ = false;

return true;
}

void BatchLogProcessor::DoBackgroundWork()
{
auto timeout = scheduled_delay_millis_;

while (true)
{
// Wait for `timeout` milliseconds
std::unique_lock<std::mutex> lk(cv_m_);
cv_.wait_for(lk, timeout);

if (is_shutdown_.load() == true)
{
DrainQueue();
return;
}

bool was_force_flush_called = is_force_flush_.load();

// Check if this export was the result of a force flush.
if (was_force_flush_called == true)
{
// Since this export was the result of a force flush, signal the
// main thread that the worker thread has been notified
is_force_flush_ = false;
}
else
{
// If the buffer was empty during the entire `timeout` time interval,
// go back to waiting. If this was a spurious wake-up, we export only if
// `buffer_` is not empty. This is acceptable because batching is a best
// mechanism effort here.
if (buffer_.empty() == true)
{
continue;
}
}

auto start = std::chrono::steady_clock::now();
Export(was_force_flush_called);
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

// Subtract the duration of this export call from the next `timeout`.
timeout = scheduled_delay_millis_ - duration;
}
}

kxyr marked this conversation as resolved.
Show resolved Hide resolved
void BatchLogProcessor::Export(const bool was_force_flush_called)
{
std::vector<std::unique_ptr<Recordable>> records_arr;

size_t num_records_to_export;

if (was_force_flush_called == true)
{
num_records_to_export = buffer_.size();
}
else
{
num_records_to_export =
buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size();
}

buffer_.Consume(
num_records_to_export, [&](CircularBufferRange<AtomicUniquePtr<Recordable>> range) noexcept {
range.ForEach([&](AtomicUniquePtr<Recordable> &ptr) {
std::unique_ptr<Recordable> swap_ptr = std::unique_ptr<Recordable>(nullptr);
ptr.Swap(swap_ptr);
records_arr.push_back(std::unique_ptr<Recordable>(swap_ptr.release()));
return true;
});
});

exporter_->Export(
nostd::span<std::unique_ptr<Recordable>>(records_arr.data(), records_arr.size()));

// Notify the main thread in case this export was the result of a force flush.
if (was_force_flush_called == true)
{
is_force_flush_notified_ = true;
while (is_force_flush_notified_.load() == true)
maxgolov marked this conversation as resolved.
Show resolved Hide resolved
{
force_flush_cv_.notify_one();
}
}
}

void BatchLogProcessor::DrainQueue()
{
while (buffer_.empty() == false)
{
Export(false);
}
}

bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true);

cv_.notify_one();
worker_thread_.join();
if (exporter_ != nullptr)
{
return exporter_->Shutdown();
}

return true;
}

BatchLogProcessor::~BatchLogProcessor()
{
if (is_shutdown_.load() == false)
kxyr marked this conversation as resolved.
Show resolved Hide resolved
{
Shutdown();
}
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
11 changes: 11 additions & 0 deletions sdk/test/logs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,14 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "batch_log_processor_test",
srcs = [
"batch_log_processor_test.cc",
],
deps = [
"//sdk/src/logs",
"@com_google_googletest//:gtest_main",
],
)
4 changes: 2 additions & 2 deletions sdk/test/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
foreach(testname logger_provider_sdk_test logger_sdk_test
simple_log_processor_test log_record_test)
foreach(testname logger_provider_sdk_test logger_sdk_test log_record_test
simple_log_processor_test batch_log_processor_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs)
Expand Down
Loading