Skip to content

Commit

Permalink
Version 3.0 is on its way (#17)
Browse files Browse the repository at this point in the history
v3.0 complete
  • Loading branch information
benny-edlund authored Feb 3, 2023
1 parent 4a68c49 commit cf0740d
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 44 deletions.
36 changes: 23 additions & 13 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
# v3.0
Refined api after trials with various of allocator implementations.

* Require fully specified allocator type for pool definition.
* Allowing the pool to be defined using only the allocator template was convinient however ultimately unhelpful when rebinding allocators that require many template arguments. This required a interface change to the pool class and hence bumps the major version but there are no interface changes other then this so v2.0 code will be source compatible.
* Corrected detection of member function pointers used as task with lazy arguments.
* This was a blind spot in the test suite
* Allowing pipelines to be explicitly detatched
* Added some example applications

# v2.0
Refined version reducing overload set by binding allocator to the pool itself and adding pipe operator and future api for function composition

- [x] Remove allocator variants of task_pool::submit in favour of templating the task bool so that we do not need to pass allocators all the time.
- [x] Change external API wait_for_tasks to wait()
- [x] Pools should be future-like
- [x] Dropping support for callables with templated call operators
- [x] Reduced overload set of `submit` and `make_deferred_task`
- [x] Allow pipeline tasks to take allocators and stop_tokens
- [x] Changed `task_pool.h` to simply `pool.h`
- [x] API is now const and noexcept correct ( to the best of my mortal abilities )
* Remove allocator variants of task_pool::submit in favour of templating the task bool so that we do not need to pass allocators all the time.
* Change external API wait_for_tasks to wait()
* Pools should be future-like
* Dropping support for callables with templated call operators
* Reduced overload set of `submit` and `make_deferred_task`
* Allow pipeline tasks to take allocators and stop_tokens
* Changed `task_pool.h` to simply `pool.h`
* API is now const and noexcept correct ( to the best of my mortal abilities )
# v1.0
Initial version supporting full set of features

- [x] Allow tasks from function pointers, free and member
- [x] Allow cancelling tasks after started by passing token type
- [x] Allow using custom allocators to allocate future states
- [x] Allow using custom allocators to allocate tasks storage
- [x] Allow using custom allocators inside tasks perform allocations
* Allow tasks from function pointers, free and member
* Allow cancelling tasks after started by passing token type
* Allow using custom allocators to allocate future states
* Allow using custom allocators to allocate tasks storage
* Allow using custom allocators inside tasks perform allocations
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ include(${_project_options_SOURCE_DIR}/Index.cmake)
# Set the project name and language
project(
task_pool
VERSION 1.0.0
VERSION 3.0.0
DESCRIPTION ""
HOMEPAGE_URL "%%https://github.com/benny-edlund/task-pool%%"
LANGUAGES CXX C)
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ Describes build the library from source and it's various configuration options.
 
### [Usage tutorial](docs/tutorial.md)
Comprehensive description on how to use the features of this library to build asynchronous applications.

 
### [Examples](examples/README.md)
Example programs using the library for asynchronous tasking
34 changes: 23 additions & 11 deletions docs/tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void process_frame( data& frame) {
}
}
```
Here we have a user defined type and we pass its `run` function pointer by value along with pointers to the instances we wish to execute the method on. Additionally a reference to some data structure is passed to the processors. With member functions as well as when passing data by references we need to make sure we are careful to observe the lifetime requirements of our dataset.
Here we have a user defined type and we pass its `run` function pointer by value along with pointers to the instances we wish to execute the method on. Additionally a reference to some data structure is passed to the processors. With member functions as well as when passing data by references we need to make sure we are careful to observe the lifetime requirements of our dataset. It is typically safer to build value orient pipelines in asynchronous applications when ever possible.
 
## Return values
Expand All @@ -76,7 +76,7 @@ auto result = pool.submit( task );
auto the_awnser = result.get();
```

When using `task_pool` it is only required to capture the futures of tasks that return values however using `std::future<void>`from functions that do not return values to check for task completions using its [`std::future::wait_for`](https://en.cppreference.com/w/cpp/thread/future/wait_for) API is still recommended as it is typically well optimized by compilers.
When using `task_pool` it is only required to capture the futures of tasks that return values however using `std::future<void>`from functions that do not return values to check for task completions using its [`std::future::wait_for`](https://en.cppreference.com/w/cpp/thread/future/wait_for) API is still recommended for a fork-join workflow and is typically well optimized by compilers.

```cpp
be::task_pool pool;
Expand Down Expand Up @@ -117,7 +117,7 @@ void do_work( work_data& data ) {
```
This example will compile however the task function will not operate on the work_data value referenced into the `do_work` function. This is because the underlying bind expression must copy the `work_data` value passed by reference to submit into the task storage and when executed the task will reference this data instead.
The solution may be to use a [std::reference_wrapper](https://en.cppreference.com/w/cpp/utility/functional/reference_wrapper) value to hold the reference to the origial `work_data`.
A solution can be to use a [std::reference_wrapper](https://en.cppreference.com/w/cpp/utility/functional/reference_wrapper) value to hold the reference to the origial `work_data` however life time rules must be carefully observed.
```cpp
void process_data( work_data& );
Expand Down Expand Up @@ -153,7 +153,7 @@ while( result.wait_for(0s) != std::future_status::ready) {
```
In this slightly contrived exampled some external api is used to generate and process `LargeData` objects supposedly in an asynchronous way that is out of our control.

Instead of using an entire thread to wait for `future_data` to be ready we can immediately submit this future with the data processor to the pool and it will use the wait time in the pool to check when the future is ready and start the processing function as promptly as possible.
Instead of using an entire thread to wait for `future_data` to be ready we can immediately submit this future with the data processor to the pool and it will use the wait time in the pool to check when the future is ready and dispatch the function as promptly as possible.

This works for all future-like objects. [^2]

Expand Down Expand Up @@ -243,11 +243,11 @@ int main()
}

```
The code using the standard api quickly becomes a lot more involved to read and verify as more lines need to change and more data dependencies need to be understood. The version using pipes in contrast features a single addition in the pipeline that is neatly diffirentiated from its previous iteration.
The code using the standard api quickly becomes a lot more involved to read as more lines have changed and more data dependencies need to be understood. The version using pipes in contrast features a single addition in the pipeline that is neatly differentiated from its previous iteration.

As `be::task_pool`s pipe implementation utilizes the lazy task arguments we read about previously to pass values between pipeline stages it also quite naturally falls into value oriented programing which is a much safer way to structure asynchronous programs. Functions used in such a pipeline can simply take and return inputs by value and `be::task_pool` will convert these into futures passed lazily between stages using `submit` by wrapping each stage into a dynamically generated type that suits the function, the `Pipe`.
As the pipe implementation utilizes the lazy task arguments we read about previously to pass values between pipeline stages it also quite naturally falls into value oriented programing which is a much safer way to structure asynchronous programs. Functions used in such a pipeline can simply take and return inputs by value and `be::task_pool` will convert these into futures passed lazily between stages using `submit` by wrapping each stage into a dynamically generated type that suits the function, the `Pipe`.

This type contains only a reference to the executing pool and the `Future` of the previous stage. When a new stage is added this future is moved into the task_pool as a lazy argument and as such the resulting temporaries require very little storage. There is only ever one future on the loose that can control the conclusion of the entire pipeline.
This type contains only a reference to the executing pool and the `Future` of the previous stage. When a new stage is added this future is moved into the task_pool as a lazy argument and as such the resulting temporaries require very little storage. There is only ever one future on the loose that can control the conclusion of the entire pipeline to to that point.

Pipe object that hold valid futures will call `Future::wait()` on destruction which means that uncaptured pipelines will safely block as if they where direct function calls while allowing cancellation.
```cpp
Expand All @@ -260,6 +260,18 @@ int main()
```
As the last binary operator concludes it leaves a temporary Pipe object that owns a valid future and since the destruction of this temporary must occur prior to executing the next expression we are guarenteed no dangling work is left over when returning.

If desired pipelines may be detached from the current stack scope by chaining on `be::detach` to end of a pipeline

```cpp
void queue_process( be::task_pool& pool, Data x )
{
pool | [data=std::move(x)]{ return data; } | log_data | api::process_data | be::detach;
}
```
Now calls to `queue_process` will not block until the pipeline is completed before returning. Assuming no data is needed to be return from `api::process_data` this would still be safe with regards to the `Data` variable passed into the queue function.
`Pipe` objects that are captured are also `future-like` objects and can as such be used as lazy arguments to other tasks.
```cpp
Expand Down Expand Up @@ -511,20 +523,20 @@ First lets assume the `beans` library defines some allocator type that we would

```cpp

beans::pool_allocator<> alloc(1'000'000);
be::task_pool_t<beans::pool_allocator> pool(alloc);
beans::pool_allocator<LargeData> alloc(1'000'000);
be::task_pool_t<beans::pool_allocator<LargeData>> pool(alloc);

```
Here we initialize our (fictional) allocator with a million beans and then declare that our pool will use this allocator by passing it as a template parameter to `be::task_pool_t` following by constructing our pool taking a reference to the allocator instance. This would likely only be necessary if the allocator is stateful or if it can not be default constructible.
The default allocator for `be::task_pool_t` is perhaps unsupricingly [`std::allocator`](https://en.cppreference.com/w/cpp/memory/allocator) and it for example can be default constructed and hence does not need to be passed to the constructor. In fact `be::task_pool` that we have been using so far is just a type alias for `be::task_pool_t<std::allocator>`
The default allocator for `be::task_pool_t` is perhaps unsupricingly [`std::allocator`](https://en.cppreference.com/w/cpp/memory/allocator) and it for example can be default constructed and hence does not need to be passed to the constructor. In fact `be::task_pool` that we have been using so far is just a type alias for `be::task_pool_t<std::allocator<void>>`
```cpp
struct LargeData; // lets picture some large data there
using Vector = std::vector<LargeData, beans::pool_allocator<LargeData>>;
using Vector = std::vector<LargeData, beans::pool_allocator<LargeData>;
auto make_data = [](std::allocator_arg_t, Allocator const& alloc, std::size_t x) {
return Vector( x, alloc );
}
Expand Down
4 changes: 2 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Task pool examples
As most examples in tests and the documentation are rather contrived these examples here are provided to show task-pool workflows in real applications that actually do something real.
As most examples in tests and the documentation are rather contrived these examples here are provided to show task-pool workflows in real applications that actually do something.

## Building
First build the task-pool and install to some temporary location
First build task-pool and install to some temporary location

```bash
git clone https://github.com/benny-edlund/task-pool.git
Expand Down
3 changes: 3 additions & 0 deletions examples/image_processing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ find_package(CURL CONFIG REQUIRED)
find_package(libjpeg-turbo CONFIG REQUIRED)

add_executable(example_img main.cpp)
if (UNIX)
target_link_libraries(example_img PRIVATE pthread )
endif()
target_link_libraries(example_img PRIVATE task_pool::task_pool )
target_link_libraries(example_img PRIVATE fmt::fmt)
target_link_libraries(example_img PRIVATE libjpeg-turbo::turbojpeg-static)
Expand Down
1 change: 1 addition & 0 deletions examples/webserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ find_package(CLI11 CONFIG REQUIRED)
find_package(Boost CONFIG REQUIRED)

add_executable(example_http main.cpp server.cpp)
target_link_libraries(example_http PRIVATE pthread )
target_link_libraries(example_http PRIVATE task_pool::task_pool )
target_link_libraries(example_http PRIVATE Boost::headers )
target_link_libraries(example_http PRIVATE fmt::fmt)
Expand Down
14 changes: 7 additions & 7 deletions examples/webserver/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ void tcp_server::serve_forever()
{
// blocking call on main
auto socket = accept_connection();
// offload response to thread
auto work = m_pool | [=] { return socket; }
| &receive_data
| &parse_request
| &send_response;
// pipes block on destroy so last job we manually submit and disgard the future
auto end = m_pool.submit( &close_connection, std::move( work ) );
// offload response to detached pipeline
m_pool | [=] { return socket; }
| &receive_data
| &parse_request
| &send_response
| &close_connection
| be::detach;
}
}
// clang-format on
Expand Down
27 changes: 25 additions & 2 deletions lib/public/task_pool/pipes.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@

namespace be {

struct detach_t
{
template< typename Pipe, std::enable_if_t< is_pipe< Pipe >::value, bool > = true >
auto consume_future( Pipe& pipe ) const noexcept
{
auto future = std::move( pipe.future_ );
return future;
}
};
static detach_t detach{}; // NOLINT

template< typename Allocator, typename Func, typename... Args >
auto make_pipe( be::task_pool_t< Allocator >& pool, Func&& func, Args&&... args )
{
Expand Down Expand Up @@ -56,16 +67,28 @@ auto make_pipe( be::task_pool_t< Allocator >& pool, Func&& func, Args&&... args

template< typename TaskPool,
typename Func,
std::enable_if_t< is_pool< TaskPool >::value, bool > = true >
std::enable_if_t< is_pool< TaskPool >::value &&
!std::is_same< std::decay_t< Func >, be::detach_t >::value,
bool > = true >
auto operator|( TaskPool& pool, Func&& f )
{
return make_pipe( pool, std::forward< Func >( f ) );
}

template< typename Pipe, typename Func, std::enable_if_t< is_pipe< Pipe >::value, bool > = true >
template< typename Pipe,
typename Func,
std::enable_if_t< is_pipe< Pipe >::value &&
!std::is_same< std::decay_t< Func >, be::detach_t >::value,
bool > = true >
auto operator|( Pipe&& p, Func&& f )
{
return make_pipe( p.pool_, std::forward< Func >( f ), std::move( p.future_ ) );
}

template< typename Pipe, std::enable_if_t< is_pipe< Pipe >::value, bool > = true >
void operator|( Pipe&& p, detach_t const& x )
{
x.consume_future( p );
}

} // namespace be
11 changes: 3 additions & 8 deletions lib/public/task_pool/traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,16 +488,11 @@ struct is_pipe< T, be_void_t< pipe_api::pool_t< T >, pipe_api::future_t< T > > >
template< typename T,
typename Allocator,
std::enable_if_t< !std::is_void< T >::value, bool > = true >
auto rebind_alloc( Allocator const& x )
{
return typename std::allocator_traits< Allocator >::template rebind_alloc< T >( x );
}
auto rebind_alloc( Allocator const& x ) ->
typename std::allocator_traits< Allocator >::template rebind_alloc< T >;

template< typename T,
typename Allocator,
std::enable_if_t< std::is_void< T >::value, bool > = true >
auto rebind_alloc( Allocator const& x )
{
return Allocator( x );
}
auto rebind_alloc( Allocator const& x ) -> Allocator;
} // namespace be
65 changes: 65 additions & 0 deletions test/tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,50 @@ TEST_CASE( "submit( f, future )->void throws", "[task_pool][submit][throws]" )
REQUIRE_THROWS_AS( result.get(), test_exception );
}

struct test_processor
{
int run( int value ) const noexcept // NOLINT
{
return value;
}
};
TEST_CASE( "submit( f(member), instance, future )->int ", "[task_pool][submit]" )
{
const int expected = 42;
be::task_pool pool( 1 );
auto fun_a = []( int x ) -> int { return x; };
std::future< int > future = pool.submit( fun_a, expected );
test_processor instance;
std::future< int > result = pool.submit( &test_processor::run, &instance, std::move( future ) );
REQUIRE( result.get() == expected );
}
TEST_CASE( "submit( f(member), instance, future )->int throws ", "[task_pool][submit][throws]" )
{
const int expected = 42;
be::task_pool pool( 1 );
auto fun_a = []( int /*x*/ ) -> int { throw test_exception{}; };
std::future< int > future = pool.submit( fun_a, expected );
test_processor instance;
std::future< int > result = pool.submit( &test_processor::run, &instance, std::move( future ) );
REQUIRE_THROWS_AS( result.get(), test_exception );
}

void func_run_( int value, std::atomic_bool& called )
{
called = value != 0;
}
TEST_CASE( "submit( f(free func), instance, future )->int ", "[task_pool][submit]" )
{
std::atomic_bool called{ false };
const int expected = 42;
be::task_pool pool( 1 );
auto fun_a = []( int x ) -> int { return x; };
std::future< int > future = pool.submit( fun_a, expected );
std::future< void > result = pool.submit( &func_run_, std::move( future ), std::ref( called ) );
REQUIRE_NOTHROW( result.get() );
REQUIRE( called );
}

TEST_CASE( "submit( f, future )->int throws", "[task_pool][submit][throws]" )
{
const int expected = 42;
Expand Down Expand Up @@ -2034,6 +2078,27 @@ TEST_CASE( "pipe with allocator and stop_token", "[pipe][allocator][stop_token]"
REQUIRE( called );
}

TEST_CASE( "detach pipelines", "[pipe]" )
{
be::task_pool pool;

std::atomic_bool called{ false };
auto first = [] {
std::this_thread::sleep_for( 1us );
return 1;
};

auto second = [&]( std::allocator_arg_t /*x*/,
std::allocator< int > const& /*alloc*/,
int /*value*/,
be::stop_token /*token*/ ) { called = true; }; // NOLINT
{
pool | first | second | be::detach;
}
pool.wait();
REQUIRE( called );
}

//
// Task pools are futures!
//
Expand Down

0 comments on commit cf0740d

Please sign in to comment.