Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasi: fix nonblocking sockets on *NIX (gotip net/http) #1503

Merged
merged 8 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions imports/wasi_snapshot_preview1/fs_test.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notice something (unrelated?) happened to flags when logging

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related, probably intended? /cc @codefromthecrypt

Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,9 @@ func Test_fdFdstatSetFlags(t *testing.T) {
// Let's remove O_APPEND.
requireErrnoResult(t, wasip1.ErrnoSuccess, mod, wasip1.FdFdstatSetFlagsName, uint64(fd), uint64(0))
require.Equal(t, `
==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=0)
==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=)
<== errno=ESUCCESS
`, "\n"+log.String())
`, "\n"+log.String()) // FIXME? flags==0 prints 'flags='
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine IMHO because there are no flags and the formatted flags are pipe delimited. that said it is easy to change to special case none differently. @achille-roussel @ncruces any preferences on this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this default applies to every flag type we have in wasi, btw.

log.Reset()

// Without O_APPEND flag, the data is written at the beginning.
Expand All @@ -568,9 +568,9 @@ func Test_fdFdstatSetFlags(t *testing.T) {
// Restore the O_APPEND flag.
requireErrnoResult(t, wasip1.ErrnoSuccess, mod, wasip1.FdFdstatSetFlagsName, uint64(fd), uint64(wasip1.FD_APPEND))
require.Equal(t, `
==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=1)
==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=APPEND)
<== errno=ESUCCESS
`, "\n"+log.String())
`, "\n"+log.String()) // FIXME? flags==1 prints 'flags=APPEND'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to docs, flags is an fdflags type, so usually in log formatting we format the flag name vs have people need to look up what it was and do the bit flag math in their head. Maybe it was surprising that the logging "fixed" in a different change? https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-fdflags-flagsu16

log.Reset()

// with O_APPEND flag, the data is appended to buffer.
Expand Down
50 changes: 50 additions & 0 deletions imports/wasi_snapshot_preview1/testdata/gotip/wasi.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package main

import (
"fmt"
"io"
"net"
"net/http"
"os"
"syscall"
)

func main() {
Expand All @@ -12,9 +15,14 @@ func main() {
if err := mainSock(); err != nil {
panic(err)
}
case "http":
if err := mainHTTP(); err != nil {
panic(err)
}
}
}

// mainSock is an explicit test of a blocking socket.
func mainSock() error {
// Get a listener from the pre-opened file descriptor.
// The listener is the first pre-open, with a file-descriptor of 3.
Expand Down Expand Up @@ -43,3 +51,45 @@ func mainSock() error {
fmt.Println(string(buf[:n]))
return nil
}

// mainHTTP implicitly tests non-blocking sockets, as they are needed for
// middleware.
func mainHTTP() error {
// Get the file representing a pre-opened TCP socket.
// The socket (listener) is the first pre-open, with a file-descriptor of
// 3 because the host didn't add any pre-opened files.
listenerFD := 3
f := os.NewFile(uintptr(listenerFD), "")

// Wasm runs similarly to GOMAXPROCS=1, so multiple goroutines cannot work
// in parallel. non-blocking allows the poller to park the go-routine
// accepting connections while work is done on one.
if err := syscall.SetNonblock(listenerFD, true); err != nil {
return err
}

// Convert the file representing the pre-opened socket to a listener, so
// that we can integrate it with HTTP middleware.
ln, err := net.FileListener(f)
defer f.Close()
if err != nil {
return err
}
defer ln.Close()

// Serve middleware that echos the request body to the response.
return http.Serve(ln, echo{})
}

type echo struct{}

func (echo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Copy up to 32 bytes from the request to the response, appending a newline.
// Note: the test should write: "wazero", so that's all we should read.
var buf [32]byte
if n, err := r.Body.Read(buf[:]); err != nil && err != io.EOF {
panic(err)
} else if n, err = w.Write(append(buf[:n], '\n')); err != nil {
panic(err)
}
}
72 changes: 71 additions & 1 deletion imports/wasi_snapshot_preview1/wasi_stdlib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"io/fs"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"testing"
Expand All @@ -23,6 +25,10 @@ import (
"github.com/tetratelabs/wazero/sys"
)

// sleepALittle directly slows down test execution. So, use this sparingly and
// only when so where proper signals are unavailable.
var sleepALittle = func() { time.Sleep(500 * time.Millisecond) }

// This file ensures that the behavior we've implemented not only the wasi
// spec, but also at least two compilers use of sdks.

Expand Down Expand Up @@ -383,7 +389,7 @@ func testSock(t *testing.T, bin []byte) {
tcpAddr := <-tcpAddrCh

// Give a little time for _start to complete
time.Sleep(800 * time.Millisecond)
sleepALittle()

// Now dial to the initial address, which should be now held by wazero.
conn, err := net.Dial("tcp", tcpAddr.String())
Expand All @@ -396,3 +402,67 @@ func testSock(t *testing.T, bin []byte) {
require.NoError(t, err)
require.Equal(t, "wazero\n", console)
}

func Test_HTTP(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("syscall.Nonblocking() is not supported on wasip1+windows.")
}
toolchains := map[string][]byte{}
if wasmGotip != nil {
toolchains["gotip"] = wasmGotip
}

for toolchain, bin := range toolchains {
toolchain := toolchain
bin := bin
t.Run(toolchain, func(t *testing.T) {
testHTTP(t, bin)
})
}
}

func testHTTP(t *testing.T, bin []byte) {
sockCfg := experimentalsock.NewConfig().WithTCPListener("127.0.0.1", 0)
ctx := experimentalsock.WithConfig(testCtx, sockCfg)
ctx, cancelFunc := context.WithCancel(ctx)
// Set context to one that has an experimental listener that logs all host functions.
// ctx = context.WithValue(ctx, experimental.FunctionListenerFactoryKey{},
// logging.NewHostLoggingListenerFactory(os.Stdout, logging.LogScopeAll))

moduleConfig := wazero.NewModuleConfig().
WithSysWalltime().WithSysNanotime(). // HTTP middleware uses both clocks
WithArgs("wasi", "http")
tcpAddrCh := make(chan *net.TCPAddr, 1)
ch := make(chan string, 1)
go func() {
ch <- compileAndRunWithPreStart(t, ctx, moduleConfig, bin, func(t *testing.T, mod api.Module) {
tcpAddrCh <- requireTCPListenerAddr(t, mod)
})
}()
tcpAddr := <-tcpAddrCh

// Give a little time for _start to complete
sleepALittle()

// Now, send a POST to the address which we had pre-opened.
body := bytes.NewReader([]byte("wazero"))
req, err := http.NewRequest(http.MethodPost, "http://"+tcpAddr.String(), body)
require.NoError(t, err)

// TODO: test hangs here
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's the hang. from the log, the request is never read.

Copy link
Contributor

@evacchi evacchi Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like it's stuck in a loop here:

// poll/fd_unix.go
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	...
	for {
		...
		case syscall.EAGAIN:
			if fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		...

TBH I don't know if you can actually leave that loop ever 😬

	for {
		s, rsa, errcall, err := accept(fd.Sysfd)
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
		case syscall.EINTR:
			continue
		case syscall.EAGAIN:
			if fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		case syscall.ECONNABORTED:
			// This means that a socket on the listen
			// queue was closed before we Accept()ed it;
			// it's a silly error, so try again.
			continue
		}
		return -1, nil, errcall, err
	}

I mean, sure. But it doesn't look like there is a way to interrupt gracefully.

Stupidest I can think, SetDeadline()...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I am now calling SetDeadline(), catching the timeout error, and returning EAGAIN instead. This I think causes the Wasm module to do the loop above 😬 so the wasm module is effectively unstuck but I think it's still looping until the connection becomes available; OTOH this lets other goroutines interleave; in fact I can see a poll_oneoff happening:

	==> wasi_snapshot_preview1.clock_time_get(id=monotonic,precision=0)
	<== (timestamp=81477104823666,errno=ESUCCESS)
	==> wasi_snapshot_preview1.sock_accept(fd=3,flags=)
	<== (fd=,errno=EAGAIN)
	==> wasi_snapshot_preview1.poll_oneoff(in=21422080,out=21446656,nsubscriptions=2)
	<== (nevents=2,errno=ESUCCESS)
	==> wasi_snapshot_preview1.sock_accept(fd=3,flags=)
	<== (fd=,errno=EAGAIN)
	==> wasi_snapshot_preview1.poll_oneoff(in=21422080,out=21446656,nsubscriptions=2)
	<== (nevents=2,errno=ESUCCESS)
	==> wasi_snapshot_preview1.sock_accept(fd=3,flags=)
	<== (fd=,errno=EAGAIN)
	==> wasi_snapshot_preview1.poll_oneoff(in=21422080,out=21446656,nsubscriptions=2)
	<== (nevents=2,errno=ESUCCESS)
	==> wasi_snapshot_preview1.sock_accept(fd=3,flags=)
	<== (fd=,errno=EAGAIN)
	==> wasi_snapshot_preview1.poll_oneoff(in=21422080,out=21446656,nsubscriptions=2)
	<== (nevents=2,errno=ESUCCESS)
	==> wasi_snapshot_preview1.sock_accept(fd=3,flags=)
	<== (fd=,errno=EAGAIN)
	==> wasi_snapshot_preview1.poll_oneoff(in=21422080,out=21446656,nsubscriptions=2)
	<== (nevents=2,errno=ESUCCESS)
	==> wasi_snapshot_preview1.sock_accept(fd=3,flags=)
	<== (fd=,errno=EAGAIN)
	==> wasi_snapshot_preview1.poll_oneoff(in=21422080,out=21446656,nsubscriptions=2)
	<== (nevents=2,errno=ESUCCESS)
	==> wasi_snapshot_preview1.sock_accept(fd=3,flags=)

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, 200, resp.StatusCode)
b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "wazero\n", string(b))

cancelFunc()

// FIXME: this hangs because the example listens forever;
// however mod.Close() does not result in a clean shudown.
// console := <-ch
// require.Equal(t, "", console)
Copy link
Contributor

@evacchi evacchi Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solving by serving only 1 request and then exiting the Wasm routine; probably worth investigating what happens if we mod.Close() instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hah I was thinking about this route also.

}
9 changes: 4 additions & 5 deletions imports/wasi_snapshot_preview1/wasi_stdlib_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"strings"
"syscall"
"testing"
"time"

"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/internal/testing/require"
)

func Test_Nonblock(t *testing.T) {
func Test_NonblockingFile(t *testing.T) {
const fifo = "/test-fifo"
tempDir := t.TempDir()
fifoAbsPath := tempDir + fifo
Expand All @@ -29,9 +28,9 @@ func Test_Nonblock(t *testing.T) {
ch := make(chan string, 1)
go func() { ch <- compileAndRun(t, testCtx, moduleConfig, wasmZigCc) }()

// The test writes a few dots on the console until the pipe has data ready for reading,
// so we wait for a little to ensure those dots are printed.
time.Sleep(500 * time.Millisecond)
// The test writes a few dots on the console until the pipe has data ready
// for reading. So, so we wait to ensure those dots are printed.
sleepALittle()

f, err := os.OpenFile(fifoAbsPath, os.O_APPEND|os.O_WRONLY, 0)
require.NoError(t, err)
Expand Down
136 changes: 19 additions & 117 deletions internal/sysfs/sock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,149 +6,51 @@ import (
"syscall"

"github.com/tetratelabs/wazero/internal/fsapi"
"github.com/tetratelabs/wazero/internal/platform"
socketapi "github.com/tetratelabs/wazero/internal/sock"
)

// NewTCPListenerFile creates a socketapi.TCPSock for a given *net.TCPListener.
func NewTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
return &tcpListenerFile{tl: tl}
return newTCPListenerFile(tl)
}

var _ socketapi.TCPSock = (*tcpListenerFile)(nil)

type tcpListenerFile struct {
// baseSockFile implements base behavior for all TCPSock, TCPConn files,
// regardless the platform.
type baseSockFile struct {
fsapi.UnimplementedFile

tl *net.TCPListener
}

// Accept implements the same method as documented on socketapi.TCPSock
func (f *tcpListenerFile) Accept() (socketapi.TCPConn, syscall.Errno) {
conn, err := f.tl.Accept()
if err != nil {
return nil, platform.UnwrapOSError(err)
}
return &tcpConnFile{tc: conn.(*net.TCPConn)}, 0
}

// IsDir implements the same method as documented on File.IsDir
func (*tcpListenerFile) IsDir() (bool, syscall.Errno) {
// We need to override this method because WASI-libc prestats the FD
// and the default impl returns ENOSYS otherwise.
return false, 0
}

// Stat implements the same method as documented on File.Stat
func (f *tcpListenerFile) Stat() (fs fsapi.Stat_t, errno syscall.Errno) {
// The mode is not really important, but it should be neither a regular file nor a directory.
fs.Mode = os.ModeIrregular
return
}

// Close implements the same method as documented on fsapi.File
func (f *tcpListenerFile) Close() syscall.Errno {
return platform.UnwrapOSError(f.tl.Close())
}

// Addr is exposed for testing.
func (f *tcpListenerFile) Addr() *net.TCPAddr {
return f.tl.Addr().(*net.TCPAddr)
}

var _ socketapi.TCPConn = (*tcpConnFile)(nil)

type tcpConnFile struct {
fsapi.UnimplementedFile

tc *net.TCPConn

// closed is true when closed was called. This ensures proper syscall.EBADF
closed bool
}
var (
_ fsapi.File = (*baseSockFile)(nil)
_ socketapi.TCPSock = (*baseSockFile)(nil)
_ socketapi.TCPConn = (*baseSockFile)(nil)
)

// IsDir implements the same method as documented on File.IsDir
func (*tcpConnFile) IsDir() (bool, syscall.Errno) {
func (*baseSockFile) IsDir() (bool, syscall.Errno) {
// We need to override this method because WASI-libc prestats the FD
// and the default impl returns ENOSYS otherwise.
return false, 0
}

// Stat implements the same method as documented on File.Stat
func (f *tcpConnFile) Stat() (fs fsapi.Stat_t, errno syscall.Errno) {
func (f *baseSockFile) Stat() (fs fsapi.Stat_t, errno syscall.Errno) {
// The mode is not really important, but it should be neither a regular file nor a directory.
fs.Mode = os.ModeIrregular
return
}

// SetNonblock implements the same method as documented on fsapi.File
func (f *tcpConnFile) SetNonblock(enabled bool) (errno syscall.Errno) {
syscallConn, err := f.tc.SyscallConn()
if err != nil {
return platform.UnwrapOSError(err)
}

// Prioritize the error from setNonblock over Control
if controlErr := syscallConn.Control(func(fd uintptr) {
errno = platform.UnwrapOSError(setNonblock(fd, enabled))
}); errno == 0 {
errno = platform.UnwrapOSError(controlErr)
}
return
}

// Read implements the same method as documented on fsapi.File
func (f *tcpConnFile) Read(buf []byte) (n int, errno syscall.Errno) {
if n, errno = read(f.tc, buf); errno != 0 {
// Defer validation overhead until we've already had an error.
errno = fileError(f, f.closed, errno)
}
return
}

// Write implements the same method as documented on fsapi.File
func (f *tcpConnFile) Write(buf []byte) (n int, errno syscall.Errno) {
if n, errno = write(f.tc, buf); errno != 0 {
// Defer validation overhead until we've alwritey had an error.
errno = fileError(f, f.closed, errno)
}
return
// Accept implements the same method as documented on socketapi.TCPSock
func (f *baseSockFile) Accept() (socketapi.TCPConn, syscall.Errno) {
return nil, syscall.ENOSYS
}

// Recvfrom implements the same method as documented on socketapi.TCPConn
func (f *tcpConnFile) Recvfrom(p []byte, flags int) (n int, errno syscall.Errno) {
if flags != MSG_PEEK {
errno = syscall.EINVAL
return
}
return recvfromPeek(f.tc, p)
func (f *baseSockFile) Recvfrom(p []byte, flags int) (n int, errno syscall.Errno) {
return -1, syscall.ENOSYS
}

// Shutdown implements the same method as documented on fsapi.Conn
func (f *tcpConnFile) Shutdown(how int) syscall.Errno {
// FIXME: can userland shutdown listeners?
var err error
switch how {
case syscall.SHUT_RD:
err = f.tc.CloseRead()
case syscall.SHUT_WR:
err = f.tc.CloseWrite()
case syscall.SHUT_RDWR:
return f.close()
default:
return syscall.EINVAL
}
return platform.UnwrapOSError(err)
}

// Close implements the same method as documented on fsapi.File
func (f *tcpConnFile) Close() syscall.Errno {
return f.close()
}

func (f *tcpConnFile) close() syscall.Errno {
if f.closed {
return 0
}
f.closed = true
return f.Shutdown(syscall.SHUT_RDWR)
func (f *baseSockFile) Shutdown(how int) syscall.Errno {
return syscall.ENOSYS
}
Loading