Skip to content

Commit

Permalink
wasi: add more test cases to poll_oneoff, cleanup impl
Browse files Browse the repository at this point in the history
Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
  • Loading branch information
evacchi committed Aug 4, 2023
1 parent 90f58bc commit e17b921
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 76 deletions.
98 changes: 47 additions & 51 deletions imports/wasi_snapshot_preview1/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type event struct {
eventType byte
userData []byte
errno wasip1.Errno
outOffset uint32
}

func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno {
Expand Down Expand Up @@ -90,16 +89,16 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
var blockingStdinSubs []*event
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
clockEvents := uint32(0)
// Count of all the non-clock subscribers that have been already written back to outBuf.
readySubs := uint32(0)
// Count of all the subscriptions that have been already written back to outBuf.
// nevents*32 returns at all times the offset where the next event should be written:
// this way we ensure that there are no gaps between records.
nevents := uint32(0)

// Layout is subscription_u: Union
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u
for i := uint32(0); i < nsubscriptions; i++ {
inOffset := i * 48
outOffset := i * 32
outOffset := nevents * 32

eventType := inBuf[inOffset+8] // +8 past userdata
// +8 past userdata +8 contents_offset
Expand All @@ -110,12 +109,10 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
eventType: eventType,
userData: userData,
errno: wasip1.ErrnoSuccess,
outOffset: outOffset,
}

switch eventType {
case wasip1.EventTypeClock: // handle later
clockEvents++
newTimeout, err := processClockEvent(argBuf)
if err != 0 {
return err
Expand All @@ -125,24 +122,24 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
timeout = newTimeout
}
// Ack the clock event to the outBuf.
writeEvent(outBuf, evt)
writeEvent(outBuf[outOffset:], evt)
nevents++
case wasip1.EventTypeFdRead:
fd := int32(le.Uint32(argBuf))
if fd < 0 {
return sys.EBADF
}
if file, ok := fsc.LookupFile(fd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf, evt)
readySubs++
continue
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() {
// if the fd is Stdin, and it is in non-blocking mode,
writeEvent(outBuf[outOffset:], evt)
nevents++
} else if fd != internalsys.FdStdin && file.File.IsNonblock() {
writeEvent(outBuf[outOffset:], evt)
nevents++
} else {
// if the fd is Stdin, and it is in blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
} else {
writeEvent(outBuf, evt)
readySubs++
}
case wasip1.EventTypeFdWrite:
fd := int32(le.Uint32(argBuf))
Expand All @@ -154,47 +151,46 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
} else {
evt.errno = wasip1.ErrnoBadf
}
readySubs++
writeEvent(outBuf, evt)
nevents++
writeEvent(outBuf[outOffset:], evt)
default:
return sys.EINVAL
}
}

// If there are subscribers with data ready, we have already written them to outBuf,
// and we don't need to wait for the timeout: clear it.
if readySubs != 0 {
timeout = 0
sysCtx := mod.(*wasm.ModuleInstance).Sys
if nevents == nsubscriptions {
// We already wrote back all the results. We already wrote this number
// earlier to offset `resultNevents`.
// We only need to observe the timeout (nonzero if there are clock subscriptions)
// and return.
if timeout > 0 {
sysCtx.Nanosleep(int64(timeout))
}
return 0
}

// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return sys.EBADF
}
// Wait for the timeout to expire, or for some data to become available on Stdin.
stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds()))
if errno != 0 {
return errno
}
if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range blockingStdinSubs {
readySubs++
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
}
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return sys.EBADF
}
// Wait for the timeout to expire, or for some data to become available on Stdin.

if stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds())); errno != 0 {
return errno
} else if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range blockingStdinSubs {
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf[nevents*32:], evt)
nevents++
}
} else {
// No subscribers, just wait for the given timeout.
sysCtx := mod.(*wasm.ModuleInstance).Sys
sysCtx.Nanosleep(int64(timeout))
}

if readySubs != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) {
if nevents != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, nevents) {
return sys.EFAULT
}
}
Expand Down Expand Up @@ -234,9 +230,9 @@ func processClockEvent(inBuf []byte) (time.Duration, sys.Errno) {
// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
copy(outBuf[evt.outOffset:], evt.userData) // userdata
outBuf[evt.outOffset+8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[evt.outOffset+9] = 0
le.PutUint32(outBuf[evt.outOffset+10:], uint32(evt.eventType))
copy(outBuf, evt.userData) // userdata
outBuf[8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[9] = 0
le.PutUint32(outBuf[10:], uint32(evt.eventType))
// TODO: When FD events are supported, write outOffset+16
}
81 changes: 70 additions & 11 deletions imports/wasi_snapshot_preview1/poll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wasi_snapshot_preview1_test

import (
"io/fs"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -150,6 +151,12 @@ func Test_pollOneoff_Errors(t *testing.T) {
}

func Test_pollOneoff_Stdin(t *testing.T) {
w, r, err := os.Pipe()
require.NoError(t, err)
defer w.Close()
defer r.Close()
_, _ = w.Write([]byte("wazero"))

tests := []struct {
name string
in, out, nsubscriptions, resultNevents uint32
Expand Down Expand Up @@ -192,7 +199,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
Expand Down Expand Up @@ -227,7 +233,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
Expand Down Expand Up @@ -262,7 +267,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
Expand Down Expand Up @@ -297,7 +301,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
Expand Down Expand Up @@ -332,7 +335,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),

expectedErrno: wasip1.ErrnoSuccess,
Expand All @@ -357,6 +359,52 @@ func Test_pollOneoff_Stdin(t *testing.T) {
expectedLog: `
==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=2)
<== (nevents=1,errno=ESUCCESS)
`,
},
{
name: "pollable pipe, multiple subs, events returned out of order",
nsubscriptions: 3,
expectedNevents: 3,
mem: concat(
fdReadSub,
clockNsSub(20*1000*1000),
// Illegal file fd with custom user data to recognize it in the event buffer.
fdReadSubFdWithUserData(100, []byte{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77})),
stdin: &sys.StdinFile{Reader: w},
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
resultNevents: 512, // past out
expectedMem: []byte{
// Clock is acknowledged first.
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit
wasip1.EventTypeClock, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,

// Then an illegal file with custom user data.
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, // userdata
byte(wasip1.ErrnoBadf), 0x0, // errno is 16 bit
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,

// Stdin pipes are delayed to invoke sysfs.poll
// thus, they are written back last.
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,

'?', // stopped after encoding
},
expectedLog: `
==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=3)
<== (nevents=3,errno=ESUCCESS)
`,
},
}
Expand Down Expand Up @@ -420,7 +468,6 @@ func Test_pollOneoff_Zero(t *testing.T) {
concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
)

Expand Down Expand Up @@ -460,7 +507,6 @@ func Test_pollOneoff_Zero(t *testing.T) {
concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
)

Expand Down Expand Up @@ -491,10 +537,6 @@ func Test_pollOneoff_Zero(t *testing.T) {
require.Equal(t, uint32(1), nevents)
}

func singleton(b byte) []byte {
return []byte{b}
}

func concat(bytes ...[]byte) []byte {
var res []byte
for i := range bytes {
Expand Down Expand Up @@ -522,9 +564,26 @@ func fdReadSubFd(fd byte) []byte {
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
fd, 0x0, 0x0, 0x0, // valid readable FD
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, // pad to 32 bytes
}
}

func fdReadSubFdWithUserData(fd byte, userdata []byte) []byte {
return concat(
userdata,
[]byte{
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
fd, 0x0, 0x0, 0x0, // valid readable FD
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, // pad to 32 bytes
})
}

// subscription for an EventTypeFdRead on stdin
var fdReadSub = fdReadSubFd(byte(sys.FdStdin))

Expand Down
27 changes: 27 additions & 0 deletions internal/sysfs/sock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,30 @@ func TestTcpConnFile_Stat(t *testing.T) {
_, errno := file.Stat()
require.Zero(t, errno, "Stat should not fail")
}

func TestTcpConnFile_SetNonblock(t *testing.T) {
listen, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer listen.Close()

lf := newTCPListenerFile(listen.(*net.TCPListener))

tcpAddr, err := net.ResolveTCPAddr("tcp", listen.Addr().String())
require.NoError(t, err)
tcp, err := net.DialTCP("tcp", nil, tcpAddr)
require.NoError(t, err)
defer tcp.Close() //nolint

errno := lf.SetNonblock(true)
require.EqualErrno(t, 0, errno)
require.True(t, lf.IsNonblock())

conn, errno := lf.Accept()
require.EqualErrno(t, 0, errno)
defer conn.Close()

file := newTcpConn(tcp)
errno = file.SetNonblock(true)
require.EqualErrno(t, 0, errno)
require.True(t, file.IsNonblock())
}
Loading

0 comments on commit e17b921

Please sign in to comment.