Commit 9f80982: make `async_commit_queue` a scope annotation, rather than an intrinsic

masahi committed Jun 27, 2022 (1 parent: c12d395)
Showing 1 changed file, `rfcs/0077-async-pipeline.md`, with 86 additions and 90 deletions.

- Start Date: (2022-06-17)

# Summary
This RFC proposes TIR constructs for invoking and synchronizing asynchronous operations, to express asynchrony **within the device code**.
Building on the proposed constructs, we introduce an "asynchronous stage" in the TIR software pipeline.
Asynchrony is prevalent on the host (runtime) side, and this proposal is the first step toward bringing the notion of an asynchronous operation into the
generated code.

The most important component we should agree on is the model of synchronization: coming up with a design that is general enough to be useful for diverse backends, while making sure that the chosen design can be translated to the low-level synchronization model of a particular backend, is highly non-trivial.
The approach described in this document is motivated by a use case for NVIDIA GPUs, but we took care to ensure that the design can be adopted by other backends. For example, if a backend has an asynchronous DMA engine and asynchronous vector and tensor units, we can specify that each of them runs asynchronously in a different stage of a pipeline, with the necessary synchronization between them.

# Asynchronous stage in a software pipeline

### Background: What is a software pipeline, and what does the TIR software pipeline transform do?
...

```python
sch.annotate(i, "software_pipeline_stage", [0, 1])
sch.annotate(i, "software_pipeline_async_stages", [0])
```

we generate the following IR. An asynchronous block is decorated with the `async_scope` attribute, and further enclosed in the scope `async_commit_queue(0)`. An intrinsic `async_wait_queue(0, 1)` is inserted to express synchronization.

```python
B = alloc([2])

# Prologue
async_commit_queue(0):
   async_scope:
      B[0] = A[0] + 1

# Body
for i in range(15):
   async_commit_queue(0):
      async_scope:
         B[(i + 1) % 2] = A[i + 1] + 1

   async_wait_queue(0, 1)
   C[i] = B[i % 2] + 1

# Epilogue
async_wait_queue(0, 0)
C[15] = B[1] + 1
```
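
To spell out the synchronization in this example (our reading of it): body iteration `i` commits the group producing `B[(i + 1) % 2]`, so queue 0 may now hold two in-flight groups. `async_wait_queue(0, 1)` allows only the single newest group to remain in flight, forcing the group that produced `B[i % 2]` to complete before `C[i]` reads it.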

The proposed async constructs are intentionally more general / abstract than what's needed by the TIR software pipeline, in the hope that
they will find uses in more general settings. In particular, synchronization is done in terms of a “queue”: an abstract entity
associated with each asynchronous unit, which tracks invocations and completions of asynchronous operations in FIFO order.

**Semantics of the proposed constructs**
- `async_commit_queue(i)` scope annotation: Group one or more invocations of async operations in the given scope, and “commit” (or push) them to the queue `i`. The exact interpretation of “committing” can be up to each backend, but informally it signifies that a group of async operations is now in flight. A group of operations committed together is awaited as one chunk, and thus it constitutes the granularity at which the synchronization intrinsic discussed next operates. Groups committed to the same queue complete in FIFO order.

- `async_wait_queue(i, N)` intrinsic: Block until only the `N` **most recent** committed groups are still in-flight in the queue `i`. In other words, if there are `M` committed groups in-flight in the queue `i` at the invocation of `async_wait_queue(i, N)`, the `M - N` oldest committed groups are forced to complete. `N` does not have to be a constant, but some backends may require a constant count (e.g. PTX). A minimal Python sketch of these queue semantics is given right after this list.

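To make the contract concrete, here is a minimal Python sketch of the queue semantics described above. The `AsyncQueue` class and its method names are ours, purely for illustration; they are not part of the proposal.

```python
from collections import deque

class AsyncQueue:
    """Abstract per-unit queue: committed groups complete in FIFO order."""

    def __init__(self):
        self.in_flight = deque()  # committed groups, oldest on the left

    def commit(self, group):
        # async_commit_queue(i): push one group of async ops; it is now in-flight.
        self.in_flight.append(list(group))

    def wait(self, n):
        # async_wait_queue(i, n): force all but the n most recently
        # committed groups to complete, oldest first.
        completed = []
        while len(self.in_flight) > n:
            completed.extend(self.in_flight.popleft())
        return completed

q = AsyncQueue()
for i in range(3):
    q.commit([f"copy B[{i}]"])
assert q.wait(1) == ["copy B[0]", "copy B[1]"]  # the two oldest groups must finish
```
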
They are inspired by the corresponding async data movement instructions in CUDA (PTX): [`cp.async.commit_group`](https://docs.nvidia.com/cuda/parallel-thread-execution/index.html#data-movement-and-conversion-instructions-cp-async-commit-group) and [`cp.async.wait_group`](https://docs.nvidia.com/cuda/parallel-thread-execution/index.html#data-movement-and-conversion-instructions-cp-async-wait-group).
The CUDA counterparts do not have the notion of a “queue”, since there is only one kind of async operation (copy from global to shared memory) supported by the current generation of NVIDIA GPUs (Ampere, at the time of writing). So “commit” and “wait” always refer to the same internal queue.

To support more general cases where there can be multiple kinds of asynchronous units, each with its own queue(s), `async_commit_queue` and `async_wait_queue` take the “queue” parameter. It can be an arbitrary integer, as long as it is used consistently. Moreover, it does not have to be a constant. However, in the current usage by the TIR software pipeline, “queue” coincides with the notion of “stage”, and thus it is always an integer constant.

**The role of async_scope**. `async_scope` is represented by an `AttrStmt` with the key `tir::attr::async_scope`. It is inserted to let later transform passes know that the enclosed statement is intended to run asynchronously. This way, the actual lowering to target-dependent asynchronous instructions
can happen much later in the compilation flow, rather than before the software pipeline transform (e.g. via tensorization). For example, rewriting a global to shared memory copy into the CUDA-specific `cp.async` can be made simpler if the rewrite happens after the buffer flattening and loop vectorization passes.
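
As an illustration of how a later pass might locate these blocks, here is a sketch in TVM's Python API. It assumes the attribute key is exposed to Python as the string `"async_scope"`; the helper name is ours and not part of the proposal.

```python
import tvm
from tvm import tir

def collect_async_scopes(func: tir.PrimFunc):
    """Collect the bodies of all statements wrapped in async_scope (sketch)."""
    found = []

    def visit(node):
        # AttrStmt is how async_scope is represented in the TIR AST.
        if isinstance(node, tir.AttrStmt) and node.attr_key == "async_scope":
            found.append(node.body)

    tir.stmt_functor.post_order_visit(func.body, visit)
    return found
```
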
...

```python
B = alloc([2])
C = alloc([2])

# Prologue
for i in range(2):
   async_commit_queue(0):
      async_scope:
         B[i % 2] = A[i] + 1

   if 1 <= i:
      async_commit_queue(1):
         async_wait_queue(0, 1)
         async_scope:
            C[(i - 1) % 2] = B[(i - 1) % 2] + 1

# Body
for i in range(14):
   # Stage 0
   async_commit_queue(0):
      async_scope:
         B[(i + 2) % 2] = A[i + 2] + 1

   # Stage 1
   async_commit_queue(1):
      async_wait_queue(0, 1)
      async_scope:
         C[(i + 1) % 2] = B[(i + 1) % 2] + 1

   # Stage 2
   async_wait_queue(1, 1)
   ...

# Epilogue
for i in range(2):
   if i < 1:
      async_commit_queue(1):
         async_wait_queue(0, 0)
         async_scope:
            C[(i + 15) % 2] = B[(i + 15) % 2] + 1

   if i < 1:
      async_wait_queue(1, 1)

   ...
```
...

```python
sch.annotate(k1, ann_key="software_pipeline_stage", ann_val=[0, 0, 1])
sch.annotate(k1, ann_key="software_pipeline_order", ann_val=[0, 1, 2])
```

`async_commit_queue` encloses both copies to `A_shared` and `B_shared`, so that the two copies can be awaited as one chunk.

```python

A_shared = [4, ...]
B_shared = [4, ...]

for i in range(3):
   async_commit_queue(0):
      async_scope:
         A_shared[i] <- global[...]

      async_scope:
         B_shared[i] <- global[...]

if 2 <= i:
async_wait_queue(0, 2)
      ...

# Body
for i in range(125):
   async_commit_queue(0):
      async_scope:
         A_shared[(i + 3) % 4] <- global[...]

      async_scope:
         B_shared[(i + 3) % 4] <- global[...]

async_wait_queue(0, 2)
   ...
```
...

The current design started from and has stayed with CUDA’s implicit synchronization…
It’s also worth noting some cons of token-based synchronization in the context of the TIR software pipeline. First, it is not obvious how a token should be represented at all: it would probably be an integer, but each backend would likely have its own representation. Second, expressing dependencies via tokens would be natural if what we generated were a DAG-like structure, but the output of the TIR software pipeline transform is still a loop, without unrolling. We would need to be able to refer to, say, “the token associated with an async operation from three iterations ago”, which is awkward to express. We would end up maintaining a circular buffer of tokens, in addition to the circular buffer of multi-versioned buffer copies.
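
To illustrate the last point, here is a hypothetical, runnable Python sketch of what token-based synchronization would look like for such a pipelined loop. `async_copy` is a stand-in for an asynchronous copy and not a proposed API; the returned future plays the role of a token.

```python
from concurrent.futures import Future, ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def async_copy(version: int) -> Future:
    # Stand-in for launching an asynchronous copy of one buffer version.
    return pool.submit(lambda: version)

# The circular buffer of tokens that has to be maintained alongside the
# circular buffer of multi-versioned data (4 versions here).
tokens = [async_copy(i) for i in range(3)] + [None]

for i in range(125):
    tokens[(i + 3) % 4] = async_copy(i + 3)  # the producer runs 3 iterations ahead
    tokens[i % 4].result()                   # "the token from three iterations ago"
    # ... consume version i % 4 ...

pool.shutdown()
```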

### Where to put `async_commit_queue`?
Although `async_commit_queue` can be attached to each async operation individually, we group multiple async invocations into one `async_commit_queue` if there are multiple async operations in the same stage.

For example, given the annotation,

```python
sch.annotate(k0, ann_key="software_pipeline_stage", ann_val=[0, 0, 3])
sch.annotate(k0, ann_key="software_pipeline_async_stages", ann_val=[0])
```

`async_commit_queue(0)` would enclose the first two blocks.
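
Schematically, in the same pseudo-code used throughout this document (the bodies are placeholders), the grouping looks like:

```python
async_commit_queue(0):
   async_scope:
      ...  # first async block of stage 0

   async_scope:
      ...  # second async block of stage 0
```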

However, if the order is given by

```python
sch.annotate(loop, ann_key="software_pipeline_order", ann_val=[0, 2, 1])
```

, the two async blocks are interleaved with their consumer block in the middle. In such cases, we need to attach `async_commit_queue` to each async block separately. An example transformation is shown toward the bottom of this document.

### Where to put `async_wait_queue`?

We must put the wait before the consumer of the async ops. For example, the following is correct:

```python
for i in range(3):
   async_commit_queue(0):
      async_scope:
         A_shared[i] <- global[...]
         ...

   if 2 <= i:
async_wait_queue(0, 2)
A_local[0] <- A_shared[0, ...]


for i in range(125):
   async_commit_queue(0):
      async_scope:
         A_shared[(i + 3) % 4] <- global[...]
         ...

async_wait_queue(0, 3)
A_local[1] <- A_shared[i % 4, ...]

   async_wait_queue(0, 2)
   A_local[0] <- A_shared[(i + 1) % 4, ...]
```

But the second wait subsumes the first one (since it allows fewer in-flight ops), so the first wait can be dropped:

```python
for i in range(125):
   async_commit_queue(0):
      async_scope:
         A_shared[(i + 3) % 4] <- global[...]
         ...

async_wait_queue(0, 2)
A_local[1] <- A_shared[i % 4, ...]
   A_local[0] <- A_shared[(i + 1) % 4, ...]
```

...

For example, below the async producer writes to `i + 3`, and two consumers read the chunks at `i` and `i + 3`:

```python
for i in range(125):
   async_commit_queue(0):
      async_scope:
         A_shared[(i + 3) % 4] <- global[...]
         ...

async_wait_queue(0, 3)
A_local[1] <- A_shared[i % 4, ...]
   ...
```

In such cases, we need to force all pending async ops to complete before the access:

```python
for i in range(125):
   async_commit_queue(0):
      async_scope:
         A_shared[(i + 3) % 4] <- global[...]
         ...

async_wait_queue(0, 0)
A_local[1] <- A_shared[i % 4, ...]
...
```

Note that the index `i + 3` is both produced and consumed in the same iteration. So before we access `A_shared[(i + 3) % 4, ...]`, we need to put `async_wait_queue(0, 0)`. `(i + 3) - (i + 3) = 0`, so the math checks out here too.

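The wait counts used in the examples above can be summarized with a small helper. This is our own illustrative Python, not part of the proposal; it assumes one commit group per producer iteration.

```python
def wait_count(producer_head: int, consumer_index: int) -> int:
    """In-flight count N for async_wait_queue: every group committed after
    the one producing `consumer_index` may stay in flight."""
    return producer_head - consumer_index

i = 7  # any body iteration; the producer has committed up to version i + 3
assert wait_count(i + 3, i) == 3      # before reading A_shared[i % 4, ...]
assert wait_count(i + 3, i + 1) == 2  # before reading A_shared[(i + 1) % 4, ...]
assert wait_count(i + 3, i + 3) == 0  # same-iteration consumption
# Multiple consumers in one stage: the smallest count subsumes the others.
assert min(wait_count(i + 3, i), wait_count(i + 3, i + 1)) == 2
```
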
**Waiting across the pipeline body and epilogue boundary**. In this example, there is no async producer in the epilogue. Since the prologue and body have issued 128 async ops in total, the producer head can be determined statically as 127. Two consumers access copies at the indices `i + 125` and `i + 126`.

```python
for i in range(3):
   async_commit_queue(0):
      async_scope:
         shared[i] = ...

...

for i in range(125):
   async_commit_queue(0):
      async_scope:
         shared[(i + 3) % 4] = ...

...

```

...

```python
A_shared = alloc([4])
B_shared = alloc([4])

for i in range(3):
   async_commit_queue(0):
      async_scope:
         A_shared[i] = A[...]

      async_scope:
         B_shared[i] = B[...]

for i in range(13):
   async_commit_queue(0):
      async_scope:
         A_shared[(i + 3) % 4] = A[...]

async_wait_queue(0, 5)
compute(A_shared[i], B_shared[i])

   async_commit_queue(0):
      async_scope:
         B_shared[(i + 3) % 4] = B[...]

for i in range(3):
...
```
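
One way to sanity-check the wait count of 5 above (our reading; it is not derived explicitly in this hunk): the prologue commits one group per iteration (both copies together), while each body iteration commits two groups, the `A` copy before the compute and the `B` copy after it. When `compute(A_shared[i], B_shared[i])` runs in body iteration `i >= 3`, the newest group that must have completed is the `B` copy for version `i`, committed in body iteration `i - 3`. The groups committed after it are the two groups of iterations `i - 2` and `i - 1`, plus the `A` copy of iteration `i`: `2 + 2 + 1 = 5` groups that may legitimately remain in flight.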
