Skip to content

Commit

Permalink
CABI refactor: merge borrow_count and num_async_subtasks counters
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Oct 2, 2024
1 parent a7c94fc commit f2ddd27
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 74 deletions.
68 changes: 25 additions & 43 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,21 +394,19 @@ class Task(CallContext):
caller: Optional[Task]
on_return: Optional[Callable]
on_block: OnBlockCallback
borrow_count: int
need_to_drop: int
events: list[EventCallback]
has_events: asyncio.Event
num_async_subtasks: int

def __init__(self, opts, inst, ft, caller, on_return, on_block):
super().__init__(opts, inst, self)
self.ft = ft
self.caller = caller
self.on_return = on_return
self.on_block = on_block
self.borrow_count = 0
self.need_to_drop = 0
self.events = []
self.has_events = asyncio.Event()
self.num_async_subtasks = 0
```
The fields of `Task` are introduced in groups of related `Task` methods next.
Using a conservative syntactic analysis of the component-level definitions of
Expand Down Expand Up @@ -570,34 +568,6 @@ external I/O (as emulated in the Python code by awaiting `sleep(0)`:
await self.wait_on(asyncio.sleep(0))
```

All `Task`s (whether lifted `async` or not) are allowed to call `async`-lowered
imports. Calling an `async`-lowered import stores a `Subtask` (defined below)
in the current component instance's `async_subtasks` table. The current task
tracks the number of live async subtasks and guards this to be in `Task.exit`
(below) to ensure [structured concurrency].
```python
def add_async_subtask(self, subtask):
assert(subtask.task is self and not subtask.notify_supertask)
subtask.notify_supertask = True
self.num_async_subtasks += 1
return self.inst.async_subtasks.add(subtask)
```
The `notify_supertask` flag signals to the methods of `Subtask` (defined below)
to notify this `Task` when the async call makes progress.

The `borrow_count` field is used by the following methods to track the number
of borrowed handles that were passed as parameters to the export that have not
yet been dropped (and thus might dangle if the caller destroys the resource
after this export call finishes):
```python
def create_borrow(self):
self.borrow_count += 1

def drop_borrow(self):
assert(self.borrow_count > 0)
self.borrow_count -= 1
```

The `return_` method is called by either `canon_task_return` or `canon_lift`
(both defined below) to lift and return results to the caller using the
`on_return` callback that was supplied by the caller to `canon_lift`. Using a
Expand All @@ -619,15 +589,16 @@ more than once which must be checked by `return_` and `exit`.
```

Lastly, when a task exits, the runtime enforces the guard conditions mentioned
above and allows a pending task to start.
above and allows a pending task to start. The `need_to_drop` counter is
incremented and decremented below as a way of ensuring that a task does
something (like dropping a resource or subtask handle) before the task exits.
```python
def exit(self):
assert(current_task.locked())
assert(not self.events)
assert(self.inst.num_tasks >= 1)
trap_if(self.on_return)
trap_if(self.borrow_count != 0)
trap_if(self.num_async_subtasks != 0)
trap_if(self.need_to_drop != 0)
self.inst.num_tasks -= 1
if self.opts.sync:
assert(not self.inst.interruptible.is_set())
Expand Down Expand Up @@ -1568,7 +1539,9 @@ def pack_flags_into_int(v, labels):
```

Finally, `own` and `borrow` handles are lowered by initializing new handle
elements in the current component instance's handle table:
elements in the current component instance's handle table. The increment of
`need_to_drop` is complemented by a decrement in `canon_resource_drop` and
ensures that all borrowed handles are dropped before the end of the task.
```python
def lower_own(cx, rep, t):
h = ResourceHandle(rep, own=True)
Expand All @@ -1579,7 +1552,7 @@ def lower_borrow(cx, rep, t):
if cx.inst is t.rt.impl:
return rep
h = ResourceHandle(rep, own=False, scope=cx)
cx.create_borrow()
cx.need_to_drop += 1
return cx.inst.resources.add(t.rt, h)
```
The special case in `lower_borrow` is an optimization, recognizing that, when
Expand All @@ -1588,6 +1561,7 @@ type, the only thing the borrowed handle is good for is calling
`resource.rep`, so lowering might as well avoid the overhead of creating an
intermediate borrow handle.


### Flattening

With only the definitions above, the Canonical ABI would be forced to place all
Expand Down Expand Up @@ -2166,16 +2140,24 @@ async def canon_lower(opts, ft, callee, task, flat_args):
await callee(task, subtask.on_start, subtask.on_return, on_block)
[] = subtask.finish()
if await call_and_handle_blocking(do_call):
i = task.add_async_subtask(subtask)
subtask.notify_supertask = True
task.need_to_drop += 1
i = task.inst.async_subtasks.add(subtask)
flat_results = [pack_async_result(i, subtask.state)]
else:
flat_results = [0]
return flat_results
```
In the asynchronous case, `Task.call_and_handle_blocking` returns `True` if the
call to `do_call` blocks. If the `callee` blocks, `on_start` and `on_return`
may be called after `canon_lower` has returned to the core wasm caller, which
is signaled via the `subtask.state` packed into the result `i32`:
call to `do_call` blocks. In this blocking case, the `Subtask` is added to
stored in an instance-wide table and given an `i32` index that is later
returned by `task.wait` to indicate that the subtask made progress. The
`need_to_drop` increment is matched by a decrement in `canon_subtask_drop` and
ensures that all subtasks of a supertask are allowed to complete before the
supertask completes. The `notify_supertask` flag is set to tell `Subtask`
methods (below) to asynchronously notify the supertask of progress. Lastly,
the current state of the subtask is eagerly returned to the caller, packed
with the `i32` subtask index:
```python
def pack_async_result(i, state):
assert(0 < i < 2**30)
Expand Down Expand Up @@ -2265,7 +2247,7 @@ async def canon_resource_drop(rt, sync, task, i):
else:
task.trap_if_on_the_stack(rt.impl)
else:
h.scope.drop_borrow()
h.scope.need_to_drop -= 1
return flat_results
```
In general, the call to a resource's destructor is treated like a
Expand Down Expand Up @@ -2440,7 +2422,7 @@ async def canon_subtask_drop(task, i):
subtask = task.inst.async_subtasks.remove(i)
trap_if(subtask.enqueued)
trap_if(subtask.state != CallState.DONE)
subtask.task.num_async_subtasks -= 1
subtask.task.need_to_drop -= 1
return []
```

Expand Down
32 changes: 9 additions & 23 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,21 +339,19 @@ class Task(CallContext):
caller: Optional[Task]
on_return: Optional[Callable]
on_block: OnBlockCallback
borrow_count: int
need_to_drop: int
events: list[EventCallback]
has_events: asyncio.Event
num_async_subtasks: int

def __init__(self, opts, inst, ft, caller, on_return, on_block):
super().__init__(opts, inst, self)
self.ft = ft
self.caller = caller
self.on_return = on_return
self.on_block = on_block
self.borrow_count = 0
self.need_to_drop = 0
self.events = []
self.has_events = asyncio.Event()
self.num_async_subtasks = 0

def trap_if_on_the_stack(self, inst):
c = self.caller
Expand Down Expand Up @@ -433,19 +431,6 @@ async def yield_(self):
self.maybe_start_pending_task()
await self.wait_on(asyncio.sleep(0))

def add_async_subtask(self, subtask):
assert(subtask.task is self and not subtask.notify_supertask)
subtask.notify_supertask = True
self.num_async_subtasks += 1
return self.inst.async_subtasks.add(subtask)

def create_borrow(self):
self.borrow_count += 1

def drop_borrow(self):
assert(self.borrow_count > 0)
self.borrow_count -= 1

def return_(self, flat_results):
trap_if(not self.on_return)
if self.opts.sync and not self.opts.sync_task_return:
Expand All @@ -462,8 +447,7 @@ def exit(self):
assert(not self.events)
assert(self.inst.num_tasks >= 1)
trap_if(self.on_return)
trap_if(self.borrow_count != 0)
trap_if(self.num_async_subtasks != 0)
trap_if(self.need_to_drop != 0)
self.inst.num_tasks -= 1
if self.opts.sync:
assert(not self.inst.interruptible.is_set())
Expand Down Expand Up @@ -1116,7 +1100,7 @@ def lower_borrow(cx, rep, t):
if cx.inst is t.rt.impl:
return rep
h = ResourceHandle(rep, own=False, scope=cx)
cx.create_borrow()
cx.need_to_drop += 1
return cx.inst.resources.add(t.rt, h)

### Flattening
Expand Down Expand Up @@ -1466,7 +1450,9 @@ async def do_call(on_block):
await callee(task, subtask.on_start, subtask.on_return, on_block)
[] = subtask.finish()
if await call_and_handle_blocking(do_call):
i = task.add_async_subtask(subtask)
subtask.notify_supertask = True
task.need_to_drop += 1
i = task.inst.async_subtasks.add(subtask)
flat_results = [pack_async_result(i, subtask.state)]
else:
flat_results = [0]
Expand Down Expand Up @@ -1508,7 +1494,7 @@ async def canon_resource_drop(rt, sync, task, i):
else:
task.trap_if_on_the_stack(rt.impl)
else:
h.scope.drop_borrow()
h.scope.need_to_drop -= 1
return flat_results

### `canon resource.rep`
Expand Down Expand Up @@ -1567,5 +1553,5 @@ async def canon_subtask_drop(task, i):
subtask = task.inst.async_subtasks.remove(i)
trap_if(subtask.enqueued)
trap_if(subtask.state != CallState.DONE)
subtask.task.num_async_subtasks -= 1
subtask.task.need_to_drop -= 1
return []
8 changes: 0 additions & 8 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,24 +560,20 @@ async def consumer(task, args):
ptr = consumer_heap.realloc(0, 0, 1, 1)
[ret] = await canon_lower(consumer_opts, eager_ft, eager_callee, task, [0, ptr])
assert(ret == 0)
assert(task.num_async_subtasks == 0)
u8 = consumer_heap.memory[ptr]
assert(u8 == 43)
[ret] = await canon_lower(consumer_opts, toggle_ft, toggle_callee, task, [])
assert(ret == (1 | (CallState.STARTED << 30)))
assert(task.num_async_subtasks == 1)
retp = ptr
consumer_heap.memory[retp] = 13
[ret] = await canon_lower(consumer_opts, blocking_ft, blocking_callee, task, [83, retp])
assert(ret == (2 | (CallState.STARTING << 30)))
assert(task.num_async_subtasks == 2)
assert(consumer_heap.memory[retp] == 13)
fut1.set_result(None)
event, callidx = await task.wait()
assert(event == EventCode.CALL_DONE)
assert(callidx == 1)
[] = await canon_subtask_drop(task, callidx)
assert(task.num_async_subtasks == 1)
event, callidx = await task.wait()
assert(event == EventCode.CALL_STARTED)
assert(callidx == 2)
Expand All @@ -588,12 +584,10 @@ async def consumer(task, args):
assert(callidx == 2)
assert(consumer_heap.memory[retp] == 44)
fut3.set_result(None)
assert(task.num_async_subtasks == 1)
event, callidx = await task.wait()
assert(event == EventCode.CALL_DONE)
assert(callidx == 2)
[] = await canon_subtask_drop(task, callidx)
assert(task.num_async_subtasks == 0)

dtor_fut = asyncio.Future()
dtor_value = None
Expand All @@ -609,14 +603,12 @@ async def dtor(task, args):
assert(dtor_value is None)
[ret] = await canon_resource_drop(rt, False, task, 1)
assert(ret == (2 | (CallState.STARTED << 30)))
assert(task.num_async_subtasks == 1)
assert(dtor_value is None)
dtor_fut.set_result(None)
event, callidx = await task.wait()
assert(event == CallState.DONE)
assert(callidx == 2)
[] = await canon_subtask_drop(task, callidx)
assert(task.num_async_subtasks == 0)

[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [42])
return []
Expand Down

0 comments on commit f2ddd27

Please sign in to comment.