-
Notifications
You must be signed in to change notification settings - Fork 266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
wasi: use File.Poll for all blocking FDs in poll_oneoff #1606
Changes from all commits
697d7e4
a24e05f
ddeeabc
70f38cc
943afcf
dc50c2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ import ( | |
// - sys.ENOTSUP: a parameters is valid, but not yet supported. | ||
// - sys.EFAULT: there is not enough memory to read the subscriptions or | ||
// write results. | ||
// - sys.EINTR: an OS interrupt has occurred while invoking the syscall. | ||
// | ||
// # Notes | ||
// | ||
|
@@ -42,11 +43,15 @@ var pollOneoff = newHostFunc( | |
"in", "out", "nsubscriptions", "result.nevents", | ||
) | ||
|
||
type event struct { | ||
type pollEvent struct { | ||
eventType byte | ||
userData []byte | ||
errno wasip1.Errno | ||
outOffset uint32 | ||
} | ||
|
||
type filePollEvent struct { | ||
f *internalsys.FileEntry | ||
e *pollEvent | ||
} | ||
|
||
func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno { | ||
|
@@ -86,36 +91,34 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno | |
|
||
// Extract FS context, used in the body of the for loop for FS access. | ||
fsc := mod.(*wasm.ModuleInstance).Sys.FS() | ||
// Slice of events that are processed out of the loop (blocking stdin subscribers). | ||
var blockingStdinSubs []*event | ||
// Slice of events that are processed out of the loop (blocking subscribers). | ||
var blockingSubs []*filePollEvent | ||
// The timeout is initialized at max Duration, the loop will find the minimum. | ||
var timeout time.Duration = 1<<63 - 1 | ||
// Count of all the clock subscribers that have been already written back to outBuf. | ||
clockEvents := uint32(0) | ||
// Count of all the non-clock subscribers that have been already written back to outBuf. | ||
readySubs := uint32(0) | ||
// Count of all the subscriptions that have been already written back to outBuf. | ||
// nevents*32 returns at all times the offset where the next event should be written: | ||
// this way we ensure that there are no gaps between records. | ||
nevents := uint32(0) | ||
|
||
// Layout is subscription_u: Union | ||
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u | ||
for i := uint32(0); i < nsubscriptions; i++ { | ||
inOffset := i * 48 | ||
outOffset := i * 32 | ||
outOffset := nevents * 32 | ||
|
||
eventType := inBuf[inOffset+8] // +8 past userdata | ||
// +8 past userdata +8 contents_offset | ||
argBuf := inBuf[inOffset+8+8:] | ||
userData := inBuf[inOffset : inOffset+8] | ||
|
||
evt := &event{ | ||
evt := &pollEvent{ | ||
eventType: eventType, | ||
userData: userData, | ||
errno: wasip1.ErrnoSuccess, | ||
outOffset: outOffset, | ||
} | ||
|
||
switch eventType { | ||
case wasip1.EventTypeClock: // handle later | ||
clockEvents++ | ||
newTimeout, err := processClockEvent(argBuf) | ||
if err != 0 { | ||
return err | ||
|
@@ -125,24 +128,25 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno | |
timeout = newTimeout | ||
} | ||
// Ack the clock event to the outBuf. | ||
writeEvent(outBuf, evt) | ||
writeEvent(outBuf[outOffset:], evt) | ||
nevents++ | ||
case wasip1.EventTypeFdRead: | ||
fd := int32(le.Uint32(argBuf)) | ||
if fd < 0 { | ||
return sys.EBADF | ||
} | ||
if file, ok := fsc.LookupFile(fd); !ok { | ||
evt.errno = wasip1.ErrnoBadf | ||
writeEvent(outBuf, evt) | ||
readySubs++ | ||
continue | ||
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() { | ||
// if the fd is Stdin, and it is in non-blocking mode, | ||
// do not ack yet, append to a slice for delayed evaluation. | ||
blockingStdinSubs = append(blockingStdinSubs, evt) | ||
writeEvent(outBuf[outOffset:], evt) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. writeEvents has been simplified, we pass the buffer at the right offset already |
||
nevents++ | ||
} else if file.File.IsNonblock() { | ||
writeEvent(outBuf[outOffset:], evt) | ||
nevents++ | ||
} else { | ||
writeEvent(outBuf, evt) | ||
readySubs++ | ||
// If the fd is blocking, do not ack yet, | ||
// append to a slice for delayed evaluation. | ||
fe := &filePollEvent{f: file, e: evt} | ||
blockingSubs = append(blockingSubs, fe) | ||
Comment on lines
+142
to
+149
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these have been reordered for clarity; first we avoid the double negation ( |
||
} | ||
case wasip1.EventTypeFdWrite: | ||
fd := int32(le.Uint32(argBuf)) | ||
|
@@ -154,47 +158,46 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno | |
} else { | ||
evt.errno = wasip1.ErrnoBadf | ||
} | ||
readySubs++ | ||
writeEvent(outBuf, evt) | ||
nevents++ | ||
writeEvent(outBuf[outOffset:], evt) | ||
default: | ||
return sys.EINVAL | ||
} | ||
} | ||
|
||
// If there are subscribers with data ready, we have already written them to outBuf, | ||
// and we don't need to wait for the timeout: clear it. | ||
if readySubs != 0 { | ||
timeout = 0 | ||
sysCtx := mod.(*wasm.ModuleInstance).Sys | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the section below has been reordered for clarity |
||
if nevents == nsubscriptions { | ||
// We already wrote back all the results. We already wrote this number | ||
// earlier to offset `resultNevents`. | ||
// We only need to observe the timeout (nonzero if there are clock subscriptions) | ||
// and return. | ||
if timeout > 0 { | ||
sysCtx.Nanosleep(int64(timeout)) | ||
} | ||
return 0 | ||
} | ||
|
||
// If nevents != nsubscriptions, then there are blocking subscribers. | ||
// We check these fds once using poll. | ||
n, errno := pollFileEventsOnce(blockingSubs, outBuf[nevents*32:]) | ||
if errno != 0 { | ||
return errno | ||
} | ||
nevents += n | ||
|
||
// If there are blocking stdin subscribers, check for data with given timeout. | ||
if len(blockingStdinSubs) > 0 { | ||
stdin, ok := fsc.LookupFile(internalsys.FdStdin) | ||
if !ok { | ||
return sys.EBADF | ||
} | ||
// Wait for the timeout to expire, or for some data to become available on Stdin. | ||
stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds())) | ||
// If the previous poll returned n == 0 (no data) but the timeout is nonzero | ||
// (i.e. there are clock subscriptions), we poll until either the timeout expires | ||
// or any File.Poll() returns true ("ready"); otherwise we are done. | ||
if n == 0 && timeout > 0 { | ||
n, errno = pollFileEventsUntil(sysCtx, timeout, blockingSubs, outBuf[nevents*32:]) | ||
if errno != 0 { | ||
return errno | ||
} | ||
if stdinReady { | ||
// stdin has data ready to for reading, write back all the events | ||
for i := range blockingStdinSubs { | ||
readySubs++ | ||
evt := blockingStdinSubs[i] | ||
evt.errno = 0 | ||
writeEvent(outBuf, evt) | ||
} | ||
} | ||
} else { | ||
// No subscribers, just wait for the given timeout. | ||
sysCtx := mod.(*wasm.ModuleInstance).Sys | ||
sysCtx.Nanosleep(int64(timeout)) | ||
nevents += n | ||
} | ||
|
||
if readySubs != nsubscriptions { | ||
if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) { | ||
if nevents != nsubscriptions { | ||
if !mod.Memory().WriteUint32Le(resultNevents, nevents) { | ||
return sys.EFAULT | ||
} | ||
} | ||
|
@@ -233,10 +236,60 @@ func processClockEvent(inBuf []byte) (time.Duration, sys.Errno) { | |
|
||
// writeEvent writes the event corresponding to the processed subscription. | ||
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct | ||
func writeEvent(outBuf []byte, evt *event) { | ||
copy(outBuf[evt.outOffset:], evt.userData) // userdata | ||
outBuf[evt.outOffset+8] = byte(evt.errno) // uint16, but safe as < 255 | ||
outBuf[evt.outOffset+9] = 0 | ||
le.PutUint32(outBuf[evt.outOffset+10:], uint32(evt.eventType)) | ||
func writeEvent(outBuf []byte, evt *pollEvent) { | ||
copy(outBuf, evt.userData) // userdata | ||
outBuf[8] = byte(evt.errno) // uint16, but safe as < 255 | ||
outBuf[9] = 0 | ||
le.PutUint32(outBuf[10:], uint32(evt.eventType)) | ||
// TODO: When FD events are supported, write outOffset+16 | ||
} | ||
|
||
// closeChAfter closes a channel after the given timeout. | ||
// It is similar to time.After but it uses sysCtx.Nanosleep. | ||
func closeChAfter(sysCtx *internalsys.Context, timeout time.Duration, timeoutCh chan struct{}) { | ||
sysCtx.Nanosleep(int64(timeout)) | ||
close(timeoutCh) | ||
} | ||
|
||
// pollFileEventsOnce invokes Poll on each sys.FileEntry in the given slice | ||
// and writes back the result to outBuf for each file reported "ready"; | ||
// i.e., when Poll() returns true, and no error. | ||
func pollFileEventsOnce(evts []*filePollEvent, outBuf []byte) (n uint32, errno sys.Errno) { | ||
// For simplicity, we assume that there are no multiple subscriptions for the same file. | ||
for _, e := range evts { | ||
isReady, errno := e.f.File.Poll(sys.POLLIN, 0) | ||
if errno != 0 { | ||
return 0, errno | ||
} | ||
if isReady { | ||
e.e.errno = 0 | ||
writeEvent(outBuf[n*32:], e.e) | ||
n++ | ||
} | ||
} | ||
return | ||
} | ||
|
||
// pollFileEventsUntil repeatedly invokes pollFileEventsOnce until the given timeout is reached. | ||
// The poll interval is currently fixed at 100 millis. | ||
func pollFileEventsUntil(sysCtx *internalsys.Context, timeout time.Duration, blockingSubs []*filePollEvent, outBuf []byte) (n uint32, errno sys.Errno) { | ||
timeoutCh := make(chan struct{}, 1) | ||
go closeChAfter(sysCtx, timeout, timeoutCh) | ||
|
||
pollInterval := 100 * time.Millisecond | ||
ticker := time.NewTicker(pollInterval) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi closeChAfter we are intentionally using the context clock, but this will use a real one.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whoops! |
||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-timeoutCh: | ||
// Give one last chance before returning. | ||
return pollFileEventsOnce(blockingSubs, outBuf) | ||
case <-ticker.C: | ||
n, errno = pollFileEventsOnce(blockingSubs, outBuf) | ||
if errno != 0 || n > 0 { | ||
return | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand this. why not poll with 100ms vs poll zero+sleep? Are you suggesting that the poll implementation blocks too long even if you put 100ms? If so maybe the above paragraph needs to clarify this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, if we put a 100ms timeout, then the syscall will block for 100ms, which means it will also block the underlying OS thread. I will add a clarification.