Using Multiple Streams

Matt Norman edited this page Mar 24, 2023 · 9 revisions

Accelerator devices typically have a concept of a "stream" or "queue": work within a stream is performed in order, but work in different streams has no guaranteed order relative to each other. This provides a means of overlapping multiple kernels on the same device. One situation where this is beneficial is when each kernel has too few threads to span the entire accelerator device but still contains a fairly large amount of work. Executing such kernels in parallel streams allows more of the device to be used at a time.

Coding Multiple Streams with YAKL

YAKL exposes streams through a fairly simple API, and all asynchronous kernels, functions, and intrinsics accept an optional stream parameter.

First, to create a YAKL stream, you use the yakl::create_stream() function:

yakl::Stream my_stream = yakl::create_stream();

This object can be passed as an optional parameter to functions that are asynchronous, e.g.:

array1.deep_copy_to(array2,my_stream);
yakl::intrinsics::sum(array2,my_stream);

You can place a barrier until all work in a stream is completed by using the stream object's fence() method, and you can test whether all previously queued work in the stream is completed with the completed() method:

my_stream.fence();
if (my_stream.completed()) {
  // Work that depends on completion of my_stream's work
}

A stream object refers to the "default stream" unless you call stream_object.create() on it or obtain it from yakl::create_stream(). You can test whether a given object is the default stream with my_stream.is_default_stream().
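
The difference between fence() and completed() can be illustrated without a device. The following is a minimal sketch in standard C++, not YAKL code: ToyStream, progress(), and run_toy_stream_demo() are hypothetical names invented for illustration, and the model is single-threaded, so "asynchronous" work simply sits in a queue until something drains it. It only mirrors the semantics described above: work within one stream runs in order, completed() is a non-blocking query, and fence() blocks until all queued work has finished.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of a stream; this is NOT YAKL's implementation.
// enqueue() only stores work, which mimics an asynchronous launch returning early.
class ToyStream {
public:
  void enqueue(std::function<void()> task) { queue_.push_back(std::move(task)); }

  // Run the oldest queued task, if any (stands in for the device making progress)
  void progress() {
    if (queue_.empty()) return;
    std::function<void()> task = std::move(queue_.front());
    queue_.pop_front();
    task();
  }

  // Non-blocking query: has everything queued so far finished?
  bool completed() const { return queue_.empty(); }

  // Blocking barrier: return only once everything queued so far has finished
  void fence() { while (!completed()) progress(); }

private:
  std::deque<std::function<void()>> queue_;  // FIFO order keeps work in a stream in order
};

struct ToyStreamDemoResult {
  std::vector<int> order;   // IDs of the tasks in the order they ran
  bool done_before_fence;
  bool done_after_fence;
};

ToyStreamDemoResult run_toy_stream_demo() {
  ToyStream stream;
  ToyStreamDemoResult result{};
  for (int id = 0; id < 3; ++id) {
    stream.enqueue([&result, id] { result.order.push_back(id); });
  }
  stream.progress();                              // only the first task has run
  result.done_before_fence = stream.completed();  // two tasks are still queued
  stream.fence();                                 // drains the rest in FIFO order
  result.done_after_fence = stream.completed();
  assert(result.order.size() == 3);
  return result;
}
```

In this model, run_toy_stream_demo() reports the tasks running in the order 0, 1, 2, with completed() false before the fence and true after it. A real stream makes progress on its own, so host code would normally only call fence() or poll completed().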

Launching parallel_for and parallel_outer in a stream

To perform a parallel_for or parallel_outer launch in a given stream, you need to pass the optional LaunchConfig parameter. The most convenient way to do this is yakl::DefaultLaunchConfig().set_stream(my_stream). E.g.:

yakl::c::parallel_for( YAKL_AUTO_LABEL() , yakl::c::Bounds<2>(nx,ny) ,
                       YAKL_LAMBDA (int i, int j) {
  // Some work
} , yakl::DefaultLaunchConfig().set_stream(my_stream) );

Stream Events

There are times when you need to have streams synchronize on certain parts of the workflow. This is where Event objects come in. You can create an Event object with yakl::record_event(Stream):

yakl::Event my_event = yakl::record_event(my_stream);

You can block the host until the event completes with my_event.fence(), test whether it has completed with my_event.completed(), and make a stream hold all of its future work until the event completes with my_stream2.wait_on_event(my_event). If you already have an event object and want to record a new event that overwrites the old one, you can use event.record().
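
To make the event semantics concrete, here is a minimal sketch in standard C++ rather than YAKL code. ToyStream, ToyEvent, progress(), and run_toy_event_demo() are hypothetical names, and the single-threaded scheduler is an assumption made purely for illustration. The model treats an event as a marker for "everything queued in a stream up to the point of recording", and a stream that waits on the event cannot advance past the wait until that marker has been reached.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <vector>

// Hypothetical single-threaded model; this is NOT YAKL's implementation.
class ToyStream;

// Marks "everything queued in a stream up to the point of recording"
struct ToyEvent {
  const ToyStream *stream;  // stream the event was recorded in
  std::size_t      target;  // items that must finish for the event to complete
  bool completed() const;
};

class ToyStream {
public:
  void enqueue(std::function<void()> task) {
    queue_.push_back([task] { task(); return true; });
    ++submitted_;
  }
  ToyEvent record_event() const { return ToyEvent{this, submitted_}; }
  // Queue an item that cannot finish until the event has completed
  void wait_on_event(ToyEvent ev) {
    queue_.push_back([ev] { return ev.completed(); });
    ++submitted_;
  }
  // Try to finish the oldest item; returns true if the stream advanced
  bool progress() {
    if (queue_.empty() || !queue_.front()()) return false;
    queue_.pop_front();
    ++finished_;
    return true;
  }
  std::size_t finished() const { return finished_; }

private:
  std::deque<std::function<bool()>> queue_;  // each item returns true once it is done
  std::size_t submitted_ = 0;
  std::size_t finished_  = 0;
};

bool ToyEvent::completed() const { return stream->finished() >= target; }

std::vector<std::string> run_toy_event_demo() {
  ToyStream s1, s2;
  std::vector<std::string> log;
  s1.enqueue([&log] { log.push_back("s1: produce"); });
  ToyEvent produced = s1.record_event();   // covers "produce" only
  s1.enqueue([&log] { log.push_back("s1: unrelated"); });
  s2.wait_on_event(produced);              // s2 stalls until "produce" finishes
  s2.enqueue([&log] { log.push_back("s2: consume"); });

  // Deliberately favor s2, so it would run first if nothing held it back
  bool progressed = true;
  while (progressed) {
    progressed = s2.progress();
    if (!progressed) progressed = s1.progress();
  }
  assert(produced.completed());
  return log;
}
```

In this model the log comes out as "s1: produce", "s2: consume", "s1: unrelated". The second stream waits only for the work recorded before the event, not for everything later queued in the first stream, which is what distinguishes waiting on an event from fencing a whole stream.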

Getting the underlying backend types

You can grab the underlying CUDA, HIP, SYCL, etc. stream and event type with my_stream.get_real_stream() and my_event.get_real_event().

Compiling to enable multiple streams

You need to add -DYAKL_ENABLE_STREAMS to the compiler flags (as a C preprocessor macro definition) to enable multiple streams. If you do not pass this flag, all streams will be the default stream, even if you call yakl::create_stream(). Streams are opt-in because enabling them can degrade performance in some places, such as non-streamed operations like setting an array equal to a scalar value. Operator overloads cannot take a stream parameter and must be performed in the default stream, so once streams are enabled, my_array = 5; will have yakl::fence(); before and after the operation to guarantee correctness with respect to work in other streams.

IMPORTANT: Pool allocator and streams

When using streams with the pool allocator enabled (which it is by default), you need to remember that the pool allocator is non-blocking and shared among all streams. Therefore, if you allocate and deallocate during runtime while multiple streams are active, you may run into pointer aliasing issues between your streams. For instance, the following sequence can cause a problem:

  • Launch kernel that uses array a in stream1
  • Deallocate a
  • Allocate b -- At this point, b might have the same pointer as a
  • Launch kernel that uses b in stream2

The kernel in stream2 can run at the same time as the kernel in stream1, and b's memory can coincide with or overlap a's, which creates a race condition between the two kernels. To avoid this, if you allocate and deallocate during runtime with multiple streams active, you need to register which streams a given array is used in before you deallocate it. You can do this any time after the variable is created with a.add_stream_dependency(stream1). This way, a won't be released from the pool until all work queued in stream1 up to the point where you deallocate a has completed. Therefore, its pointer cannot alias with another variable until it is actually done being used.

int nx = 1024;  // Number of elements in each array
auto stream1 = yakl::create_stream();
auto stream2 = yakl::create_stream();
yakl::Array<float,1> a("a",nx);
yakl::c::parallel_for( YAKL_AUTO_LABEL() , nx , YAKL_LAMBDA (int i) { a(i) = 1.; } ,
                       yakl::DefaultLaunchConfig().set_stream(stream1) );
// Tells YAKL that whenever "a" is deallocated, wait until existing work in stream1 completes
//    before releasing its pointer from the pool.
a.add_stream_dependency( stream1 );
// A separate event is created in each dependent stream at this deallocation point. The pointer
//    will not actually be released from the pool until all of those created events complete.
//    Therefore, the data "a" points to cannot be aliased until all previous streams using "a"
//    complete.
a.deallocate();
// We can now safely allocate "b" because we know it cannot alias any part of "a" while "a"
//    is still being used.
yakl::Array<float,1> b("b",1024);
yakl::c::parallel_for( YAKL_AUTO_LABEL() , nx , YAKL_LAMBDA (int i) { b(i) = 2.; } ,
                       yakl::DefaultLaunchConfig().set_stream(stream2) );
// Similarly, we don't want "b" aliased until it's done being used.
b.add_stream_dependency(stream2);
// ...

Without the line a.add_stream_dependency(stream1) in the previous example, the result after the second kernel is indeterminate: b may alias a, which creates a race condition between the two kernels.
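
The aliasing itself is easy to reproduce with a toy pool on the host. The sketch below is standard C++ and is not YAKL's allocator: ToyPool, release_when(), and both demo functions are hypothetical names, and the "stream finished" condition is reduced to a plain boolean. It only illustrates the idea described above, namely that a block handed back immediately can be given straight to the next allocation, while a block whose release is deferred until outstanding work completes cannot.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical fixed-block pool; this is NOT YAKL's allocator.
class ToyPool {
public:
  ToyPool(std::size_t num_blocks, std::size_t block_bytes)
    : storage_(num_blocks * block_bytes) {
    // Push in reverse so that the lowest address is handed out first
    for (std::size_t i = num_blocks; i-- > 0; ) {
      free_.push_back(storage_.data() + i * block_bytes);
    }
  }

  char *allocate() {
    reclaim();
    assert(!free_.empty() && "toy pool exhausted");
    char *p = free_.back();
    free_.pop_back();
    return p;
  }

  // Immediate release: the block can be handed out again right away
  void release(char *p) { free_.push_back(p); }

  // Deferred release: the block is reusable only once work_done() reports true
  void release_when(char *p, std::function<bool()> work_done) {
    pending_.push_back({p, std::move(work_done)});
  }

private:
  struct Pending { char *ptr; std::function<bool()> work_done; };

  // Move blocks whose outstanding work has finished back onto the free list
  void reclaim() {
    std::vector<Pending> still_pending;
    for (auto &entry : pending_) {
      if (entry.work_done()) free_.push_back(entry.ptr);
      else                   still_pending.push_back(std::move(entry));
    }
    pending_ = std::move(still_pending);
  }

  std::vector<char>    storage_;
  std::vector<char *>  free_;
  std::vector<Pending> pending_;
};

// Returns true if "b" received the pointer that "a" just gave back
bool aliases_with_immediate_release() {
  ToyPool pool(2, 1024);
  char *a = pool.allocate();
  pool.release(a);             // handed back while stream1 may still be using it
  char *b = pool.allocate();
  return a == b;
}

bool aliases_with_deferred_release() {
  ToyPool pool(2, 1024);
  bool stream1_done = false;   // stands in for "work queued in stream1 has finished"
  char *a = pool.allocate();
  pool.release_when(a, [&stream1_done] { return stream1_done; });
  char *b = pool.allocate();   // stream1 is still busy, so "a" is not reusable yet
  return a == b;
}
```

With this toy pool, aliases_with_immediate_release() returns true and aliases_with_deferred_release() returns false. The deferred path plays the role that add_stream_dependency() plays in the text: the block stays out of circulation until the work that uses it is known to be finished.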
