Skip to content

Commit

Permalink
Make future and stream element types optional
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Jan 16, 2025
1 parent b87bcf4 commit 039f1f2
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 18 deletions.
12 changes: 12 additions & 0 deletions design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,17 @@ readable and writable ends of streams and futures each have a well-defined
parent `Task` that will receive "progress" events on all child streams/futures
that have previously blocked.

The `T` element type of streams and futures is optional, such that `future` and
`stream` can be written in WIT without a trailing `<T>`. In this case, the
asynchronous "values(s)" being delivered are effectively meaningless [unit]
values. However, the *timing* of delivery is meaningful and thus `future` and
`stream` can used to convey timing-related information. Note that, since
functions are asynchronous by default, a plain `f: func()` conveys completion
without requiring an explicit `future` return type. Thus, a function like
`f2: func() -> future` would convey *two* events: first, the return of `f2`, at
which point the caller receives the readable end of a `future` that, when
successfully read, conveys the completion of a second event.

From a [structured-concurrency](#structured-concurrency) perspective, the
readable and writable ends of streams and futures are leaves of the async call
tree. Unlike subtasks, the parent of the readable ends of streams and future
Expand Down Expand Up @@ -606,6 +617,7 @@ comes after:
[CPS Transform]: https://en.wikipedia.org/wiki/Continuation-passing_style
[Event Loop]: https://en.wikipedia.org/wiki/Event_loop
[Structured Concurrency]: https://en.wikipedia.org/wiki/Structured_concurrency
[Unit]: https://en.wikipedia.org/wiki/Unit_type

[AST Explainer]: Explainer.md
[Lift and Lower Definitions]: Explainer.md#canonical-definitions
Expand Down
4 changes: 2 additions & 2 deletions design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ defvaltype ::= pvt:<primvaltype> => pvt
| 0x6a t?:<valtype>? u?:<valtype>? => (result t? (error u)?)
| 0x69 i:<typeidx> => (own i)
| 0x68 i:<typeidx> => (borrow i)
| 0x66 i:<typeidx> => (stream i)
| 0x65 i:<typeidx> => (future i)
| 0x66 i?:<typeidx>? => (stream i?)
| 0x65 i?:<typeidx>? => (future i?)
labelvaltype ::= l:<label'> t:<valtype> => l t
case ::= l:<label'> t?:<valtype>? 0x00 => (case l t?)
label' ::= len:<u32> l:<label> => l (if len = |l|)
Expand Down
24 changes: 18 additions & 6 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,9 @@ class BufferGuestImpl(Buffer):

def __init__(self, cx, t, ptr, length):
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
if t:
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
self.cx = cx
self.t = t
self.ptr = ptr
Expand All @@ -774,18 +775,29 @@ class BufferGuestImpl(Buffer):
class ReadableBufferGuestImpl(BufferGuestImpl):
def lift(self, n):
assert(n <= self.remain())
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
self.ptr += n * elem_size(self.t)
if self.t:
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
self.ptr += n * elem_size(self.t)
else:
vs = n * [()]
self.progress += n
return vs

class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
def lower(self, vs):
assert(len(vs) <= self.remain())
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
self.ptr += len(vs) * elem_size(self.t)
if self.t:
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
self.ptr += len(vs) * elem_size(self.t)
else:
assert(all(v == () for v in vs))
self.progress += len(vs)
```
Note that when `t` is `None` (arising from `stream` and `future` with empty
element types), the core-wasm-supplied `ptr` is entirely ignored, while the
`length` and `progress` are still semantically meaningful. Source bindings may
represent this case with a generic stream/future of [unit] type or a distinct
type that conveys events without values.

The `ReadableStreamGuestImpl` class implements `ReadableStream` for a stream
created by wasm (via `canon stream.new`) and encapsulates the synchronization
Expand Down
13 changes: 11 additions & 2 deletions design/mvp/Explainer.md
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,8 @@ defvaltype ::= bool
| (result <valtype>? (error <valtype>)?)
| (own <typeidx>)
| (borrow <typeidx>)
| (stream <typeidx>)
| (future <typeidx>)
| (stream <typeidx>?)
| (future <typeidx>?)
valtype ::= <typeidx>
| <defvaltype>
resourcetype ::= (resource (rep i32) (dtor async? <funcidx> (callback <funcidx>)?)?)
Expand Down Expand Up @@ -733,6 +733,14 @@ futures are useful in more advanced scenarios where a parameter or result
value may not be ready at the same time as the other synchronous parameters or
results.

The `T` element type of `stream` and `future` is an optional `valtype`. As with
variant-case payloads and function results, when `T` is absent, the "value(s)"
being asynchronously passed can be thought of as [unit] values. In such cases,
there is no representation of the value in Core WebAssembly (pointers into
linear memory are ignored) however the *timing* of completed reads and writes
is observable and meaningful. Thus, empty futures and streams can be useful for
timing-related APIs.

Currently, validation rejects `(stream T)` and `(future T)` when `T`
transitively contains a `borrow`. This restriction could be relaxed in the
future by extending the call-scoping rules of `borrow` to streams and futures.
Expand Down Expand Up @@ -2672,6 +2680,7 @@ For some use-case-focused, worked examples, see:
[Subtyping]: https://en.wikipedia.org/wiki/Subtyping
[Universal Types]: https://en.wikipedia.org/wiki/System_F
[Existential Types]: https://en.wikipedia.org/wiki/System_F
[Unit]: https://en.wikipedia.org/wiki/Unit_type

[Generative]: https://www.researchgate.net/publication/2426300_A_Syntactic_Theory_of_Type_Generativity_and_Sharing
[Avoidance Problem]: https://counterexamples.org/avoidance.html
Expand Down
11 changes: 11 additions & 0 deletions design/mvp/WIT.md
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,8 @@ ty ::= 'u8' | 'u16' | 'u32' | 'u64'
| option
| result
| handle
| future
| stream
| id
tuple ::= 'tuple' '<' tuple-list '>'
Expand All @@ -1574,6 +1576,12 @@ result ::= 'result' '<' ty ',' ty '>'
| 'result' '<' '_' ',' ty '>'
| 'result' '<' ty '>'
| 'result'
future ::= 'future' '<' ty '>'
| 'future'
stream ::= 'stream' '<' ty '>'
| 'stream'
```

The `tuple` type is semantically equivalent to a `record` with numerical fields,
Expand Down Expand Up @@ -1608,6 +1616,9 @@ variant result {
These types are so frequently used and frequently have language-specific
meanings though so they're also provided as first-class types.

The `future` and `stream` types are described as part of the [async
explainer](Async.md#streams-and-futures).

Finally the last case of a `ty` is simply an `id` which is intended to refer to
another type or resource defined in the document. Note that definitions can come
through a `use` statement or they can be defined locally.
Expand Down
23 changes: 15 additions & 8 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ class BorrowType(ValType):

@dataclass
class StreamType(ValType):
t: ValType
t: Optional[ValType]

@dataclass
class FutureType(ValType):
t: ValType
t: Optional[ValType]

### Lifting and Lowering Context

Expand Down Expand Up @@ -534,8 +534,9 @@ class BufferGuestImpl(Buffer):

def __init__(self, cx, t, ptr, length):
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
if t:
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
self.cx = cx
self.t = t
self.ptr = ptr
Expand All @@ -548,16 +549,22 @@ def remain(self):
class ReadableBufferGuestImpl(BufferGuestImpl):
def lift(self, n):
assert(n <= self.remain())
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
self.ptr += n * elem_size(self.t)
if self.t:
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
self.ptr += n * elem_size(self.t)
else:
vs = n * [()]
self.progress += n
return vs

class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
def lower(self, vs):
assert(len(vs) <= self.remain())
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
self.ptr += len(vs) * elem_size(self.t)
if self.t:
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
self.ptr += len(vs) * elem_size(self.t)
else:
assert(all(v == () for v in vs))
self.progress += len(vs)

class ReadableStreamGuestImpl(ReadableStream):
Expand Down
86 changes: 86 additions & 0 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,91 @@ async def core_func2(task, args):
await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:())


async def test_wasm_to_wasm_stream_empty():
fut1, fut2, fut3, fut4 = asyncio.Future(), asyncio.Future(), asyncio.Future(), asyncio.Future()

inst1 = ComponentInstance()
opts1 = mk_opts(memory=None, sync=False)
ft1 = FuncType([], [StreamType(None)])
async def core_func1(task, args):
assert(not args)
[wsi] = await canon_stream_new(None, task)
[] = await canon_task_return(task, [StreamType(None)], opts1, [wsi])

await task.on_block(fut1)

[ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2)
assert(ret == 2)
[ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2)
assert(ret == 2)

await task.on_block(fut2)

[ret] = await canon_stream_write(None, opts1, task, wsi, 0, 8)
assert(ret == definitions.BLOCKED)

fut3.set_result(None)

event, p1, p2 = await task.wait(sync = False)
assert(event == EventCode.STREAM_WRITE)
assert(p1 == wsi)
assert(p2 == 4)

fut4.set_result(None)

[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
[] = await canon_stream_close_writable(None, task, wsi, errctxi)
[] = await canon_error_context_drop(task, errctxi)
return []

func1 = partial(canon_lift, opts1, inst1, ft1, core_func1)

inst2 = ComponentInstance()
heap2 = Heap(10)
mem2 = heap2.memory
opts2 = mk_opts(memory=heap2.memory, realloc=heap2.realloc, sync=False)
ft2 = FuncType([], [])
async def core_func2(task, args):
assert(not args)
[] = await canon_task_return(task, [], opts2, [])

retp = 0
[ret] = await canon_lower(opts2, ft1, func1, task, [retp])
assert(ret == 0)
rsi = mem2[0]
assert(rsi == 1)

[ret] = await canon_stream_read(None, opts2, task, rsi, 0, 8)
assert(ret == definitions.BLOCKED)

fut1.set_result(None)

event, p1, p2 = await task.wait(sync = False)
assert(event == EventCode.STREAM_READ)
assert(p1 == rsi)
assert(p2 == 4)

fut2.set_result(None)
await task.on_block(fut3)

[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
assert(ret == 2)
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
assert(ret == 2)

await task.on_block(fut4)

[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
errctxi = 1
assert(ret == (definitions.CLOSED | errctxi))
[] = await canon_stream_close_readable(None, task, rsi)
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
[] = await canon_error_context_drop(task, errctxi)
return []

await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:())


async def test_cancel_copy():
inst = ComponentInstance()
mem = bytearray(10)
Expand Down Expand Up @@ -1612,6 +1697,7 @@ async def run_async_tests():
await test_host_partial_reads_writes()
await test_async_stream_ops()
await test_wasm_to_wasm_stream()
await test_wasm_to_wasm_stream_empty()
await test_cancel_copy()
await test_futures()

Expand Down

0 comments on commit 039f1f2

Please sign in to comment.