Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate thread for cancellation tasks #1335

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 81 additions & 7 deletions olp-cpp-sdk-core/include/olp/core/client/TaskContext.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2021 HERE Europe B.V.
* Copyright (C) 2019-2022 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
#include <olp/core/client/CancellationContext.h>
#include <olp/core/client/CancellationToken.h>
#include <olp/core/client/Condition.h>
#include <olp/core/thread/TaskScheduler.h>

namespace olp {
namespace client {
Expand Down Expand Up @@ -89,6 +90,18 @@ class CORE_API TaskContext {
*/
client::CancellationToken CancelToken() const { return impl_->CancelToken(); }

/**
* @brief Provides a token which will schedule cancellation of the task.
*
* @param scheduler The task scheduler instance.
*
* @return The `CancellationToken` instance.
*/
client::CancellationToken CancelToken(
const std::shared_ptr<thread::TaskScheduler>& scheduler) const {
return impl_->CancelToken(scheduler);
}

/**
* @brief Checks whether the values of the `TaskContext` parameter are
* the same as the values of the `other` parameter.
Expand Down Expand Up @@ -156,6 +169,19 @@ class CORE_API TaskContext {
* @return The `CancellationToken` instance.
*/
virtual client::CancellationToken CancelToken() = 0;

/**
* @brief Provides a token which will schedule cancellation of the task.
*
* @param scheduler The task scheduler instance.
*
* @return The `CancellationToken` instance.
*/
virtual client::CancellationToken CancelToken(
const std::shared_ptr<thread::TaskScheduler>& scheduler) {
OLP_SDK_CORE_UNUSED(scheduler);
return CancelToken();
}
};

/**
Expand All @@ -164,10 +190,12 @@ class CORE_API TaskContext {
* Erases the type of the `Result` object produced by the `ExecuteFunc`
* function and passes it to the `UserCallback` instance.
*
* @tparam T The result type.
* @tparam Response The result type.
*/
template <typename Response>
class TaskContextImpl : public Impl {
class TaskContextImpl
: public Impl,
public std::enable_shared_from_this<TaskContextImpl<Response>> {
public:
/// The task that produces the `Response` instance.
using ExecuteFunc = std::function<Response(client::CancellationContext)>;
Expand All @@ -189,7 +217,7 @@ class CORE_API TaskContext {
context_(std::move(context)),
state_{State::PENDING} {}

~TaskContextImpl() override{};
~TaskContextImpl() override = default;

/**
* @brief Checks for the cancellation, executes the task, and calls
Expand All @@ -213,8 +241,7 @@ class CORE_API TaskContext {
callback = std::move(callback_);
}

Response user_response =
client::ApiError(client::ErrorCode::Cancelled, "Cancelled");
Response user_response = client::ApiError::Cancelled();

if (function && !context_.IsCancelled()) {
auto response = function(context_);
Expand All @@ -234,7 +261,7 @@ class CORE_API TaskContext {
callback(std::move(user_response));
}

// Resources need to be released before the notification, else lambas
// Resources need to be released before the notification, else lambdas
// would have captured resources like network or `TaskScheduler`.
function = nullptr;
callback = nullptr;
Expand Down Expand Up @@ -280,6 +307,53 @@ class CORE_API TaskContext {
[context]() mutable { context.CancelOperation(); });
}

/**
* @brief Provides a token which will schedule cancellation of the task.
*
* @param scheduler The task scheduler instance.
*
* @return The `CancellationToken` instance.
*/
client::CancellationToken CancelToken(
const std::shared_ptr<thread::TaskScheduler>& scheduler) override {
auto task_context = this->shared_from_this();
return client::CancellationToken([scheduler, task_context]() mutable {
if (scheduler) {
scheduler->ScheduleCancelTask([=] { task_context->Cancel(); });
} else {
task_context->Cancel();
}
});
}

/**
* @brief Cancels the operation and calls the callback with a 'Cancelled'
* error.
*/
void Cancel() {
context_.CancelOperation();

// Checks whether TaskContext has been executed
if (state_.load() == State::COMPLETED) {
return;
}

// If it hasn't -> mark as completed and proceed with user callback
state_.store(State::COMPLETED);

UserCallback callback{};

{
std::lock_guard<std::mutex> lock(mutex_);
callback = std::move(callback_);
execute_func_ = nullptr;
}

if (callback) {
callback(ApiError::Cancelled());
}
}

/**
* @brief Indicates the state of the request.
*/
Expand Down
29 changes: 28 additions & 1 deletion olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2021 HERE Europe B.V.
* Copyright (C) 2019-2022 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,18 @@ class CORE_API TaskScheduler {
EnqueueTask(std::move(func), priority);
}

/**
* @brief Schedules the asynchronous cancellation task.
*
* @note Tasks added with this method has Priority::NORMAL priority.
*
* @param[in] func The callable target that should be added to the scheduling
* pipeline.
*/
void ScheduleCancelTask(CallFuncType&& func) {
EnqueueCancelTask(std::move(func));
}

/**
* @brief Schedules the asynchronous cancellable task.
*
Expand Down Expand Up @@ -136,6 +148,21 @@ class CORE_API TaskScheduler {
OLP_SDK_CORE_UNUSED(priority);
EnqueueTask(std::forward<CallFuncType>(func));
}

/**
* @brief The enqueue cancellation task interface that is implemented by
* the subclass.
*
* Implement this method in the subclass that takes `TaskScheduler`
* as a base and provides a custom algorithm for scheduling tasks
* enqueued by the SDK.
*
* @note Tasks added trough this method should be scheduled with
* Priority::NORMAL priority.
*/
virtual void EnqueueCancelTask(CallFuncType&& func) {
EnqueueTask(std::forward<CallFuncType>(func));
}
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2021 HERE Europe B.V.
* Copyright (C) 2019-2022 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -81,13 +81,27 @@ class CORE_API ThreadPoolTaskScheduler final : public TaskScheduler {
void EnqueueTask(TaskScheduler::CallFuncType&& func,
uint32_t priority) override;

/**
* @brief Overrides the base class method to enqueue cancellation tasks and
* execute them on the next free thread from the thread pool.
*
* @note Tasks added with this method has Priority::NORMAL priority.
*
* @param func The rvalue reference of the task that should be enqueued.
* Move this task into your queue. No internal references are
* kept. Once this method is called, you own the task.
*/
void EnqueueCancelTask(TaskScheduler::CallFuncType&& func) override;

private:
class QueueImpl;

/// Thread pool created in constructor.
std::vector<std::thread> thread_pool_;
/// SyncQueue used to manage tasks.
std::unique_ptr<QueueImpl> queue_;
/// SyncQueue used to manage cancel tasks.
std::unique_ptr<QueueImpl> cancel_queue_;
};

} // namespace thread
Expand Down
37 changes: 30 additions & 7 deletions olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2021 HERE Europe B.V.
* Copyright (C) 2019-2022 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,8 +74,9 @@ struct ComparePrioritizedTask {
}
};

void SetExecutorName(size_t idx) {
std::string thread_name = "OLPSDKPOOL_" + std::to_string(idx);
void SetExecutorName(std::string name) {
std::string thread_name = "OLPSDKPOOL_" + std::move(name);

SetCurrentThreadName(thread_name);
OLP_SDK_LOG_INFO_F(kLogTag, "Starting thread '%s'", thread_name.c_str());
}
Expand All @@ -97,13 +98,15 @@ class ThreadPoolTaskScheduler::QueueImpl {
};

ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count)
: queue_{std::make_unique<QueueImpl>()} {
thread_pool_.reserve(thread_count);
: queue_{std::make_unique<QueueImpl>()},
cancel_queue_{std::make_unique<QueueImpl>()} {
constexpr auto kCancelThreadsCount = 1;

thread_pool_.reserve(thread_count + kCancelThreadsCount);

for (size_t idx = 0; idx < thread_count; ++idx) {
std::thread executor([this, idx]() {
// Set thread name for easy profiling and debugging
SetExecutorName(idx);
SetExecutorName(std::to_string(idx));

for (;;) {
PrioritizedTask task;
Expand All @@ -116,10 +119,25 @@ ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count)

thread_pool_.push_back(std::move(executor));
}

for (size_t idx = 0; idx < kCancelThreadsCount; ++idx) {
thread_pool_.emplace_back([this, idx] {
SetExecutorName("CANCEL_" + std::to_string(idx));

for (;;) {
PrioritizedTask task;
if (!cancel_queue_->Pull(task)) {
return;
}
task.function();
}
});
}
}

ThreadPoolTaskScheduler::~ThreadPoolTaskScheduler() {
queue_->Close();
cancel_queue_->Close();
for (auto& thread : thread_pool_) {
thread.join();
}
Expand All @@ -130,6 +148,11 @@ void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func) {
queue_->Push({std::move(func), thread::NORMAL});
}

void ThreadPoolTaskScheduler::EnqueueCancelTask(
TaskScheduler::CallFuncType&& func) {
cancel_queue_->Push({std::move(func), thread::NORMAL});
}

void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func,
uint32_t priority) {
queue_->Push({std::move(func), priority});
Expand Down
4 changes: 2 additions & 2 deletions olp-cpp-sdk-dataservice-read/src/TaskSink.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2020-2021 HERE Europe B.V.
* Copyright (C) 2020-2022 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,7 @@ client::CancellationToken TaskSink::AddTask(
[=](client::ApiResponse<bool, client::ApiError>) { func(context); },
context);
AddTaskImpl(task, priority);
return task.CancelToken();
return task.CancelToken(task_scheduler_);
}

bool TaskSink::AddTaskImpl(client::TaskContext task, uint32_t priority) {
Expand Down
6 changes: 3 additions & 3 deletions olp-cpp-sdk-dataservice-read/src/TaskSink.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2020-2021 HERE Europe B.V.
* Copyright (C) 2020-2022 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ class TaskSink {
auto context = client::TaskContext::Create(
std::move(task), std::move(callback), std::forward<Args>(args)...);
AddTaskImpl(context, priority);
return context.CancelToken();
return context.CancelToken(task_scheduler_);
}

template <typename Function, typename Callback, typename... Args>
Expand All @@ -64,7 +64,7 @@ class TaskSink {
if (!AddTaskImpl(context, priority)) {
return boost::none;
}
return context.CancelToken();
return context.CancelToken(task_scheduler_);
}

protected:
Expand Down