From 3471d5ee18da71147605797072af678d96deafb4 Mon Sep 17 00:00:00 2001 From: Andrey Kashcheev Date: Wed, 8 Jun 2022 14:49:59 +0200 Subject: [PATCH] Separate thread for cancellation tasks This commit introduces separate thread specifically for the cancellation tasks. By doing so we may speed up final callback execution as task won't wait for its turn in the main busy queue and instead will be executed on the separate usually free thread. This is a quite bold move as we always allocate one additional thread regardless of user-specified total threads number. Relates-To: OLPSUP-17825 Signed-off-by: Andrey Kashcheev --- .../include/olp/core/client/TaskContext.h | 88 +++++++++++++++++-- .../include/olp/core/thread/TaskScheduler.h | 29 +++++- .../olp/core/thread/ThreadPoolTaskScheduler.h | 16 +++- .../src/thread/ThreadPoolTaskScheduler.cpp | 37 ++++++-- olp-cpp-sdk-dataservice-read/src/TaskSink.cpp | 4 +- olp-cpp-sdk-dataservice-read/src/TaskSink.h | 6 +- 6 files changed, 159 insertions(+), 21 deletions(-) diff --git a/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h b/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h index 56a098218..6fe498342 100644 --- a/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h +++ b/olp-cpp-sdk-core/include/olp/core/client/TaskContext.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ #include #include #include +#include namespace olp { namespace client { @@ -89,6 +90,18 @@ class CORE_API TaskContext { */ client::CancellationToken CancelToken() const { return impl_->CancelToken(); } + /** + * @brief Provides a token which will schedule cancellation of the task. + * + * @param scheduler The task scheduler instance. + * + * @return The `CancellationToken` instance. + */ + client::CancellationToken CancelToken( + const std::shared_ptr& scheduler) const { + return impl_->CancelToken(scheduler); + } + /** * @brief Checks whether the values of the `TaskContext` parameter are * the same as the values of the `other` parameter. @@ -156,6 +169,19 @@ class CORE_API TaskContext { * @return The `CancellationToken` instance. */ virtual client::CancellationToken CancelToken() = 0; + + /** + * @brief Provides a token which will schedule cancellation of the task. + * + * @param scheduler The task scheduler instance. + * + * @return The `CancellationToken` instance. + */ + virtual client::CancellationToken CancelToken( + const std::shared_ptr& scheduler) { + OLP_SDK_CORE_UNUSED(scheduler); + return CancelToken(); + } }; /** @@ -164,10 +190,12 @@ class CORE_API TaskContext { * Erases the type of the `Result` object produced by the `ExecuteFunc` * function and passes it to the `UserCallback` instance. * - * @tparam T The result type. + * @tparam Response The result type. */ template - class TaskContextImpl : public Impl { + class TaskContextImpl + : public Impl, + public std::enable_shared_from_this> { public: /// The task that produces the `Response` instance. using ExecuteFunc = std::function; @@ -189,7 +217,7 @@ class CORE_API TaskContext { context_(std::move(context)), state_{State::PENDING} {} - ~TaskContextImpl() override{}; + ~TaskContextImpl() override = default; /** * @brief Checks for the cancellation, executes the task, and calls @@ -213,8 +241,7 @@ class CORE_API TaskContext { callback = std::move(callback_); } - Response user_response = - client::ApiError(client::ErrorCode::Cancelled, "Cancelled"); + Response user_response = client::ApiError::Cancelled(); if (function && !context_.IsCancelled()) { auto response = function(context_); @@ -234,7 +261,7 @@ class CORE_API TaskContext { callback(std::move(user_response)); } - // Resources need to be released before the notification, else lambas + // Resources need to be released before the notification, else lambdas // would have captured resources like network or `TaskScheduler`. function = nullptr; callback = nullptr; @@ -280,6 +307,53 @@ class CORE_API TaskContext { [context]() mutable { context.CancelOperation(); }); } + /** + * @brief Provides a token which will schedule cancellation of the task. + * + * @param scheduler The task scheduler instance. + * + * @return The `CancellationToken` instance. + */ + client::CancellationToken CancelToken( + const std::shared_ptr& scheduler) override { + auto task_context = this->shared_from_this(); + return client::CancellationToken([scheduler, task_context]() mutable { + if (scheduler) { + scheduler->ScheduleCancelTask([=] { task_context->Cancel(); }); + } else { + task_context->Cancel(); + } + }); + } + + /** + * @brief Cancels the operation and calls the callback with a 'Cancelled' + * error. + */ + void Cancel() { + context_.CancelOperation(); + + // Checks whether TaskContext has been executed + if (state_.load() == State::COMPLETED) { + return; + } + + // If it hasn't -> mark as completed and proceed with user callback + state_.store(State::COMPLETED); + + UserCallback callback{}; + + { + std::lock_guard lock(mutex_); + callback = std::move(callback_); + execute_func_ = nullptr; + } + + if (callback) { + callback(ApiError::Cancelled()); + } + } + /** * @brief Indicates the state of the request. */ diff --git a/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h b/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h index 8190cadc7..d9561eb84 100644 --- a/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h +++ b/olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,18 @@ class CORE_API TaskScheduler { EnqueueTask(std::move(func), priority); } + /** + * @brief Schedules the asynchronous cancellation task. + * + * @note Tasks added with this method has Priority::NORMAL priority. + * + * @param[in] func The callable target that should be added to the scheduling + * pipeline. + */ + void ScheduleCancelTask(CallFuncType&& func) { + EnqueueCancelTask(std::move(func)); + } + /** * @brief Schedules the asynchronous cancellable task. * @@ -136,6 +148,21 @@ class CORE_API TaskScheduler { OLP_SDK_CORE_UNUSED(priority); EnqueueTask(std::forward(func)); } + + /** + * @brief The enqueue cancellation task interface that is implemented by + * the subclass. + * + * Implement this method in the subclass that takes `TaskScheduler` + * as a base and provides a custom algorithm for scheduling tasks + * enqueued by the SDK. + * + * @note Tasks added trough this method should be scheduled with + * Priority::NORMAL priority. + */ + virtual void EnqueueCancelTask(CallFuncType&& func) { + EnqueueTask(std::forward(func)); + } }; /** diff --git a/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h b/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h index 078b2b951..409a20f79 100644 --- a/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h +++ b/olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,6 +81,18 @@ class CORE_API ThreadPoolTaskScheduler final : public TaskScheduler { void EnqueueTask(TaskScheduler::CallFuncType&& func, uint32_t priority) override; + /** + * @brief Overrides the base class method to enqueue cancellation tasks and + * execute them on the next free thread from the thread pool. + * + * @note Tasks added with this method has Priority::NORMAL priority. + * + * @param func The rvalue reference of the task that should be enqueued. + * Move this task into your queue. No internal references are + * kept. Once this method is called, you own the task. + */ + void EnqueueCancelTask(TaskScheduler::CallFuncType&& func) override; + private: class QueueImpl; @@ -88,6 +100,8 @@ class CORE_API ThreadPoolTaskScheduler final : public TaskScheduler { std::vector thread_pool_; /// SyncQueue used to manage tasks. std::unique_ptr queue_; + /// SyncQueue used to manage cancel tasks. + std::unique_ptr cancel_queue_; }; } // namespace thread diff --git a/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp b/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp index d8a915990..282c97be8 100644 --- a/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp +++ b/olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2021 HERE Europe B.V. + * Copyright (C) 2019-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -74,8 +74,9 @@ struct ComparePrioritizedTask { } }; -void SetExecutorName(size_t idx) { - std::string thread_name = "OLPSDKPOOL_" + std::to_string(idx); +void SetExecutorName(std::string name) { + std::string thread_name = "OLPSDKPOOL_" + std::move(name); + SetCurrentThreadName(thread_name); OLP_SDK_LOG_INFO_F(kLogTag, "Starting thread '%s'", thread_name.c_str()); } @@ -97,13 +98,15 @@ class ThreadPoolTaskScheduler::QueueImpl { }; ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count) - : queue_{std::make_unique()} { - thread_pool_.reserve(thread_count); + : queue_{std::make_unique()}, + cancel_queue_{std::make_unique()} { + constexpr auto kCancelThreadsCount = 1; + + thread_pool_.reserve(thread_count + kCancelThreadsCount); for (size_t idx = 0; idx < thread_count; ++idx) { std::thread executor([this, idx]() { - // Set thread name for easy profiling and debugging - SetExecutorName(idx); + SetExecutorName(std::to_string(idx)); for (;;) { PrioritizedTask task; @@ -116,10 +119,25 @@ ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count) thread_pool_.push_back(std::move(executor)); } + + for (size_t idx = 0; idx < kCancelThreadsCount; ++idx) { + thread_pool_.emplace_back([this, idx] { + SetExecutorName("CANCEL_" + std::to_string(idx)); + + for (;;) { + PrioritizedTask task; + if (!cancel_queue_->Pull(task)) { + return; + } + task.function(); + } + }); + } } ThreadPoolTaskScheduler::~ThreadPoolTaskScheduler() { queue_->Close(); + cancel_queue_->Close(); for (auto& thread : thread_pool_) { thread.join(); } @@ -130,6 +148,11 @@ void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func) { queue_->Push({std::move(func), thread::NORMAL}); } +void ThreadPoolTaskScheduler::EnqueueCancelTask( + TaskScheduler::CallFuncType&& func) { + cancel_queue_->Push({std::move(func), thread::NORMAL}); +} + void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func, uint32_t priority) { queue_->Push({std::move(func), priority}); diff --git a/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp b/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp index 91d46200c..e8e7dc811 100644 --- a/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp +++ b/olp-cpp-sdk-dataservice-read/src/TaskSink.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2020-2021 HERE Europe B.V. + * Copyright (C) 2020-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ client::CancellationToken TaskSink::AddTask( [=](client::ApiResponse) { func(context); }, context); AddTaskImpl(task, priority); - return task.CancelToken(); + return task.CancelToken(task_scheduler_); } bool TaskSink::AddTaskImpl(client::TaskContext task, uint32_t priority) { diff --git a/olp-cpp-sdk-dataservice-read/src/TaskSink.h b/olp-cpp-sdk-dataservice-read/src/TaskSink.h index ed8b65df9..b5a74ac8d 100644 --- a/olp-cpp-sdk-dataservice-read/src/TaskSink.h +++ b/olp-cpp-sdk-dataservice-read/src/TaskSink.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2020-2021 HERE Europe B.V. + * Copyright (C) 2020-2022 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ class TaskSink { auto context = client::TaskContext::Create( std::move(task), std::move(callback), std::forward(args)...); AddTaskImpl(context, priority); - return context.CancelToken(); + return context.CancelToken(task_scheduler_); } template @@ -64,7 +64,7 @@ class TaskSink { if (!AddTaskImpl(context, priority)) { return boost::none; } - return context.CancelToken(); + return context.CancelToken(task_scheduler_); } protected: