Skip to content

Commit

Permalink
feat(wasm): emits error response
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Oct 16, 2024
1 parent b4a3a83 commit 9cc6e69
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 59 deletions.
8 changes: 1 addition & 7 deletions pkg/workflows/sdk/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@ func (f EmitterFunc) Emit(msg string, labels map[string]any) error {
return f(msg, labels)
}

type ContextEmitter interface {
// EmitContext sends a message with the given message and labels to the configured collector.
//
// TODO(mstreet3): Do we want to support context here?
EmitContext(ctx string, msg string, labels map[string]any) error
}

// Guest interface
type Runtime interface {
Logger() logger.Logger
Fetch(req FetchRequest) (FetchResponse, error)
Expand Down
43 changes: 30 additions & 13 deletions pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

// UnsafeDataGetter abstractly defines the behavior of unsafely fetching WASM memory.
type UnsafeDataGetter func(*wasmtime.Caller) []byte
type UnsafeWriterFunc func(c *wasmtime.Caller, src []byte, ptr, len int32) int64

type UnsafeFixedLengthWriterFunc func(c *wasmtime.Caller, ptr int32, val uint32) int64

// UnsafeReaderFunc abstractly defines the behavior of reading from WASM memory.
type UnsafeReaderFunc func(c *wasmtime.Caller, ptr, len int32) ([]byte, error)
Expand Down Expand Up @@ -150,6 +151,7 @@ type Module struct {
module *wasmtime.Module
linker *wasmtime.Linker

// respStore collects responses from sendResponse mapped by request ID
r *respStore

cfg *ModuleConfig
Expand All @@ -172,12 +174,6 @@ func WithDeterminism() func(*ModuleConfig) {
}
}

// TODO(mstreet3): New module error

// TODO(mstreet3): Clean up functionlal options for New Module

// TODO(mstreet3): Add func option for setting emitter

func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*Module, error) {
// Apply options to the module config.
for _, opt := range opts {
Expand Down Expand Up @@ -338,7 +334,7 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
err = linker.FuncWrap(
"env",
"emit",
createEmitFn(logger, modCfg.Emitter, wasmRead),
createEmitFn(logger, modCfg.Emitter, wasmRead, wasmWrite, wasmWriteUInt32),
)
if err != nil {
return nil, fmt.Errorf("error wrapping emit func: %w", err)
Expand Down Expand Up @@ -500,16 +496,20 @@ func fetchFn(logger logger.Logger, modCfg *ModuleConfig) func(caller *wasmtime.C
}
}

// createEmitFn injects dependencies and builds the emit function used within the WASM. Errors in
// Emit, if any, are returned in the Error Message of the response.
func createEmitFn(
l logger.Logger,
e sdk.Emitter,
reader UnsafeReaderFunc,
) func(caller *wasmtime.Caller, msgptr, msglen int32) int32 {
writer UnsafeWriterFunc,
sizeWriter UnsafeFixedLengthWriterFunc,
) func(caller *wasmtime.Caller, respptr, resplenptr, msgptr, msglen int32) int32 {
logErr := func(err error) {
l.Errorf("error emitting message: %s", err)
}

return func(caller *wasmtime.Caller, msgptr, msglen int32) int32 {
return func(caller *wasmtime.Caller, respptr, resplenptr, msgptr, msglen int32) int32 {
b, err := reader(caller, msgptr, msglen)
if err != nil {
logErr(err)
Expand Down Expand Up @@ -541,8 +541,25 @@ func createEmitFn(
}

if err := e.Emit(msg.Message, labels); err != nil {
logErr(err)
return ErrnoFault
respBytes, err := proto.Marshal(&wasmpb.EmitMessageResponse{

Check failure on line 544 in pkg/workflows/wasm/host/module.go

View workflow job for this annotation

GitHub Actions / golangci-lint

shadow: declaration of "err" shadows declaration at line 543 (govet)
Error: &wasmpb.Error{
Message: err.Error(),
},
})
if err != nil {
logErr(err)
return ErrnoFault
}

if size := writer(caller, respBytes, respptr, int32(len(respBytes))); size == -1 {
logErr(errors.New("failed to write response"))
return ErrnoFault
}

if size := sizeWriter(caller, resplenptr, uint32(len(respBytes))); size == -1 {
logErr(errors.New("failed to write response length"))
return ErrnoFault
}
}

return ErrnoSuccess
Expand Down
34 changes: 28 additions & 6 deletions pkg/workflows/wasm/host/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ func Test_createEmitFn(t *testing.T) {
assert.NoError(t, err)
return b, nil
}),
UnsafeWriterFunc(func(c *wasmtime.Caller, src []byte, ptr, len int32) int64 {
return 0
}),
UnsafeFixedLengthWriterFunc(func(c *wasmtime.Caller, ptr int32, val uint32) int64 {
return 0
}),
)
gotCode := emitFn(new(wasmtime.Caller), 0, 0)
gotCode := emitFn(new(wasmtime.Caller), 0, 0, 0, 0)
assert.Equal(t, ErrnoSuccess, gotCode)
})

Expand All @@ -52,8 +58,14 @@ func Test_createEmitFn(t *testing.T) {
assert.NoError(t, err)
return b, nil
}),
UnsafeWriterFunc(func(c *wasmtime.Caller, src []byte, ptr, len int32) int64 {
return 0
}),
UnsafeFixedLengthWriterFunc(func(c *wasmtime.Caller, ptr int32, val uint32) int64 {
return 0
}),
)
gotCode := emitFn(new(wasmtime.Caller), 0, 0)
gotCode := emitFn(new(wasmtime.Caller), 0, 0, 0, 0)
assert.Equal(t, ErrnoSuccess, gotCode)
})

Expand All @@ -64,8 +76,10 @@ func Test_createEmitFn(t *testing.T) {
UnsafeReaderFunc(func(_ *wasmtime.Caller, _, _ int32) ([]byte, error) {
return nil, assert.AnError
}),
nil,
nil,
)
gotCode := emitFn(new(wasmtime.Caller), 0, 0)
gotCode := emitFn(new(wasmtime.Caller), 0, 0, 0, 0)
assert.Equal(t, ErrnoFault, gotCode)
})

Expand All @@ -80,9 +94,15 @@ func Test_createEmitFn(t *testing.T) {
assert.NoError(t, err)
return b, nil
}),
UnsafeWriterFunc(func(c *wasmtime.Caller, src []byte, ptr, len int32) int64 {
return 0
}),
UnsafeFixedLengthWriterFunc(func(c *wasmtime.Caller, ptr int32, val uint32) int64 {
return 0
}),
)
gotCode := emitFn(new(wasmtime.Caller), 0, 0)
assert.Equal(t, ErrnoFault, gotCode)
gotCode := emitFn(new(wasmtime.Caller), 0, 0, 0, 0)
assert.Equal(t, ErrnoSuccess, gotCode)
})

t.Run("bad read failure to unmarshal protos", func(t *testing.T) {
Expand All @@ -92,8 +112,10 @@ func Test_createEmitFn(t *testing.T) {
UnsafeReaderFunc(func(_ *wasmtime.Caller, _, _ int32) ([]byte, error) {
return []byte("not proto bufs"), nil
}),
nil,
nil,
)
gotCode := emitFn(new(wasmtime.Caller), 0, 0)
gotCode := emitFn(new(wasmtime.Caller), 0, 0, 0, 0)
assert.Equal(t, ErrnoFault, gotCode)
})
}
Expand Down
110 changes: 110 additions & 0 deletions pkg/workflows/wasm/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package wasm

import (
"encoding/base64"
"encoding/binary"
"testing"
"unsafe"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictrigger"
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
Expand Down Expand Up @@ -202,3 +205,110 @@ func TestRunner_Run_GetWorkflowSpec(t *testing.T) {
gotSpec.Triggers[0].Config["number"] = int64(gotSpec.Triggers[0].Config["number"].(uint64))
assert.Equal(t, &gotSpec, spc)
}

func Test_createEmitFn(t *testing.T) {
var (
l = logger.Test(t)
sdkConfig = &RuntimeConfig{
MaxFetchResponseSizeBytes: 1_000,
}
)

t.Run("success", func(t *testing.T) {
var (
giveMsg = "testing guest"
giveLabels = map[string]any{
"some-key": "some-value",
}
err error
emitFn = createEmitFn(sdkConfig, l, func(respptr, resplenptr, reqptr unsafe.Pointer, reqptrlen int32) int32 {
return 0
})
)

err = emitFn(giveMsg, giveLabels)
assert.NoError(t, err)
})

t.Run("successfully read error message when emit fails", func(t *testing.T) {
var (
giveMsg = "testing guest"
giveLabels = map[string]any{
"some-key": "some-value",
}
err error
resp = &wasmpb.EmitMessageResponse{
Error: &wasmpb.Error{
Message: assert.AnError.Error(),
},
}
)

emitFn := createEmitFn(sdkConfig, l, func(respptr, resplenptr, reqptr unsafe.Pointer, reqptrlen int32) int32 {
// marshall the protobufs
b, err := proto.Marshal(resp)

Check failure on line 249 in pkg/workflows/wasm/runner_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

shadow: declaration of "err" shadows declaration at line 239 (govet)
assert.NoError(t, err)

// write the marshalled response message to memory
resp := unsafe.Slice((*byte)(respptr), len(b))
for i, v := range b {

Check failure on line 254 in pkg/workflows/wasm/runner_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

S1001: should use copy(to, from) instead of a loop (gosimple)
resp[i] = v
}

// write the length of the response to memory in little endian
respLen := unsafe.Slice((*byte)(resplenptr), uint32Size)
binary.LittleEndian.PutUint32(respLen, uint32(len(b)))

return 0
})
err = emitFn(giveMsg, giveLabels)
assert.Error(t, err)
assert.ErrorContains(t, err, assert.AnError.Error())
})

t.Run("fail to deserialize response from memory", func(t *testing.T) {
var (
giveMsg = "testing guest"
giveLabels = map[string]any{
"some-key": "some-value",
}
err error
resp = assert.AnError.Error()
)

emitFn := createEmitFn(sdkConfig, l, func(respptr, resplenptr, reqptr unsafe.Pointer, reqptrlen int32) int32 {
b := []byte(resp)
// write the marshalled response message to memory
resp := unsafe.Slice((*byte)(respptr), len(b))
for i, v := range b {

Check failure on line 283 in pkg/workflows/wasm/runner_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

S1001: should use copy(to, from) instead of a loop (gosimple)
resp[i] = v
}

// write the length of the response to memory in little endian
respLen := unsafe.Slice((*byte)(resplenptr), uint32Size)
binary.LittleEndian.PutUint32(respLen, uint32(len(b)))

return 0
})
err = emitFn(giveMsg, giveLabels)
assert.Error(t, err)
assert.ErrorContains(t, err, "failed to unmarshal")
})

t.Run("fail with nonzero code from emit", func(t *testing.T) {
var (
giveMsg = "testing guest"
giveLabels = map[string]any{
"some-key": "some-value",
}
err error
)

emitFn := createEmitFn(sdkConfig, l, func(respptr, resplenptr, reqptr unsafe.Pointer, reqptrlen int32) int32 {
return 42
})
err = emitFn(giveMsg, giveLabels)
assert.Error(t, err)
assert.ErrorContains(t, err, "failed to emit")
})
}
35 changes: 2 additions & 33 deletions pkg/workflows/wasm/runner_wasip1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func log(respptr unsafe.Pointer, respptrlen int32)
func fetch(respptr unsafe.Pointer, resplenptr unsafe.Pointer, reqptr unsafe.Pointer, reqptrlen int32) int32

//go:wasmimport env emit
func emit(pbptr unsafe.Pointer, pblen int32) int32
func emit(respptr unsafe.Pointer, resplenptr unsafe.Pointer, reqptr unsafe.Pointer, reqptrlen int32) int32

func NewRunner() *Runner {
l := logger.NewWithSync(&wasmWriteSyncer{})
Expand All @@ -35,7 +35,7 @@ func NewRunner() *Runner {
return &Runtime{
logger: l,
fetchFn: createFetchFn(sdkConfig, l),
emitFn: emitFn,
emitFn: createEmitFn(sdkConfig, l, emit),
}
},
args: os.Args,
Expand Down Expand Up @@ -126,7 +126,6 @@ func createFetchFn(
}

return sdk.FetchResponse{
Success: response.Success,
StatusCode: uint8(response.StatusCode),
Headers: headersResp,
Body: response.Body,
Expand All @@ -135,29 +134,6 @@ func createFetchFn(
return fetchFn
}

func emitFn(msg string, labels map[string]any) error {
vm, err := values.NewMap(labels)
if err != nil {
return err
}

b, err := proto.Marshal(&wasmpb.EmitMessageRequest{
Message: msg,
Labels: values.ProtoMap(vm),
})
if err != nil {
return err
}

ptr, ptrlen := bufferToPointerLen(b)
errno := emit(ptr, ptrlen)
if errno != 0 {
os.Exit(CodeRunnerErr)
}

return nil
}

type wasmWriteSyncer struct{}

// Write is used to proxy log requests from the WASM binary back to the host
Expand All @@ -166,10 +142,3 @@ func (wws *wasmWriteSyncer) Write(p []byte) (n int, err error) {
log(ptr, ptrlen)
return int(ptrlen), nil
}

const uint32Size = int32(4)

// bufferToPointerLen returns a pointer to the first element of the buffer and the length of the buffer.
func bufferToPointerLen(buf []byte) (unsafe.Pointer, int32) {
return unsafe.Pointer(&buf[0]), int32(len(buf))
}
Loading

0 comments on commit 9cc6e69

Please sign in to comment.