An easy-to-use and efficient C++ thread pool that supports task priorities, statistics collection, and task submission to specific threads.
- Task Priority - Support for tasks with different priorities
- Personal Task Queue - Support for assigning tasks to specific threads
- Statistics Collection - Detailed task execution statistics and execution time monitoring
- Dynamic Management - Support for adjusting thread count and maximum queue size at runtime
- Log Tracking - Customizable logging functionality
- Clone this repository to your local machine:
git clone https://github.com/moehoshio/threadPool.hpp.git
- Copy the contents of the
threadPool.hpp/include
folder into your project'sinclude
directory. - Add the following include directive in your source file:
#include "neko/core/threadPool.hpp"
#include "neko/core/threadPool.hpp"
#include <iostream>
int main() {
// Create thread pool (uses hardware thread count by default)
neko::core::thread::ThreadPool pool(2);
// Submit simple task
auto future = pool.submit([]() {
return 42;
});
// Get result
std::cout << "Result: " << future.get() << std::endl;
return 0;
}
#include "neko/core/threadPool.hpp"
#include <iostream>
int add(int a, int b) {
return a + b;
}
int main() {
neko::core::thread::ThreadPool pool;
// Submit function with parameters
auto future1 = pool.submit(add, 10, 20);
// Submit lambda expression
auto future2 = pool.submit([](int x, int y) {
return x * y;
}, 5, 6);
std::cout << "Addition result: " << future1.get() << std::endl; // 30
std::cout << "Multiplication result: " << future2.get() << std::endl; // 30
return 0;
}
#include "neko/core/threadPool.hpp"
#include "neko/schema/types.hpp"
#include <iostream>
int main() {
neko::core::thread::ThreadPool pool;
// High priority task
auto highPriority = pool.submitWithPriority(
neko::Priority::High,
[](){}
);
// Low priority task
auto lowPriority = pool.submitWithPriority(
neko::Priority::Low,
[](){}
);
}
#include "neko/core/threadPool.hpp"
#include <iostream>
int main() {
neko::core::thread::ThreadPool pool(4);
// Get available thread IDs
auto workerIds = pool.getWorkerIds();
if (!workerIds.empty()) {
auto targetThreadId = workerIds[0]; // Assign to the first available thread
pool.submitToWorker(targetThreadId, []() {
std::cout << "Task executing on specific thread\n";
});
// Submit to each worker thread
for (const auto & id : workerIds) {
auto future = pool.submitToWorker(id, [id]() {
std::cout << "Task executing on thread " << id << "\n";
});
}
}
return 0;
}
Note: Assigning to a non-existent thread ID will throw a neko::ex::OutOfRange
exception.
#include "neko/core/threadPool.hpp"
#include <chrono>
#include <iostream>
int main() {
neko::core::thread::ThreadPool pool;
// Submit multiple tasks
for (int i = 0; i < 10; ++i) {
pool.submit([i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Task " << i << " completed\n";
});
}
// Wait for all tasks to complete
pool.waitForAllTasksCompletion();
std::cout << "All tasks completed\n";
// Or use timeout waiting
bool completed = pool.waitForAllTasksCompletion(std::chrono::seconds(5));
if (completed) {
std::cout << "Tasks completed within 5 seconds\n";
} else {
std::cout << "Tasks not completed within 5 seconds\n";
}
return 0;
}
#include "neko/core/threadPool.hpp"
#include <iostream>
int main() {
neko::core::thread::ThreadPool pool;
// Submit some tasks
for (int i = 0; i < 100; ++i) {
pool.submit([i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return i * i;
});
}
// Wait for completion
pool.waitForAllTasksCompletion();
// Get statistics information
auto stats = pool.getTaskStats();
std::cout << "Completed tasks: " << stats.completedTasks << std::endl;
std::cout << "Failed tasks: " << stats.failedTasks << std::endl;
std::cout << "Total execution time: " << stats.totalExecutionTime.count() << "ms\n";
std::cout << "Max execution time: " << stats.maxExecutionTime.count() << "ms\n";
std::cout << "Average execution time: " << stats.avgExecutionTime.count() << "ms\n";
return 0;
}
#include "neko/core/threadPool.hpp"
#include <iostream>
#include <thread>
int main() {
neko::core::thread::ThreadPool pool(4);
// Submit long-running tasks
for (int i = 0; i < 10; ++i) {
pool.submit([]() {
std::this_thread::sleep_for(std::chrono::seconds(2));
});
}
// Monitor thread pool status
for (int i = 0; i < 5; ++i) {
std::cout << "Queue utilization: " << pool.getQueueUtilization() * 100 << "%\n";
std::cout << "Thread utilization: " << pool.getThreadUtilization() * 100 << "%\n";
std::cout << "Pending tasks: " << pool.getPendingTaskCount() << std::endl;
std::cout << "---\n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return 0;
}
neko::core::thread::ThreadPool pool;
// Set maximum queue size to 1000
pool.setMaxQueueSize(1000);
// Check if queue is full
if (pool.isQueueFull()) {
std::cout << "Queue is full, cannot submit more tasks\n";
}
neko::core::thread::ThreadPool pool(2);
std::cout << "Initial thread count: " << pool.getThreadCount() << std::endl;
// Increase threads
pool.setThreadCount(8);
std::cout << "Adjusted thread count: " << pool.getThreadCount() << std::endl;
// Decrease threads
pool.setThreadCount(4);
std::cout << "Final thread count: " << pool.getThreadCount() << std::endl;
neko::core::thread::ThreadPool pool;
// Set logger function
pool.setLogger([](const std::string& message) {
std::cout << "[ThreadPool] " << message << std::endl;
});
// Now the thread pool will log important events
neko::core::thread::ThreadPool pool;
// Disable statistics (improve performance)
pool.enableStatistics(false);
// Re-enable statistics
pool.enableStatistics(true);
// Reset statistics data
pool.resetStats();
#include "neko/core/threadPool.hpp"
#include "neko/schema/exception.hpp"
#include <iostream>
int main() {
try {
neko::core::thread::ThreadPool pool;
// Set small queue size
pool.setMaxQueueSize(2);
// Try to submit more tasks than queue size
for (int i = 0; i < 5; ++i) {
try {
pool.submit([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
});
std::cout << "Task " << i << " submitted\n";
} catch (const ex::TaskRejected& e) {
std::cout << "Task " << i << " rejected: " << e.what() << std::endl;
}
}
} catch (const std::exception& e) {
std::cout << "Error: " << e.what() << std::endl;
}
return 0;
}
Method | Description |
---|---|
submit(function, args...) |
Submit normal priority task |
submitWithPriority(priority, function, args...) |
Submit task with specified priority |
submitToWorker(workerId, function, args...) |
Submit task to specific thread |
waitForTasksEmpty() |
Wait for queue to be empty |
waitForAllTasksCompletion() |
Wait for all tasks to complete |
stop(waitForCompletion) |
Stop thread pool |
Method | Description |
---|---|
setThreadCount(count) |
Set thread count |
setMaxQueueSize(size) |
Set maximum queue size |
enableStatistics(enable) |
Enable/disable statistics |
setLogger(loggerFunc) |
Set logger function |
resetStats() |
Reset statistics data |
Method | Description |
---|---|
getThreadCount() |
Get thread count |
getPendingTaskCount() |
Get pending task count |
getTaskStats() |
Get task statistics |
getQueueUtilization() |
Get queue utilization |
getThreadUtilization() |
Get thread utilization |
isEmpty() |
Check if no tasks |
isQueueFull() |
Check if queue is full |
License MIT OR Apache-2.0