Skip to content

Commit

Permalink
wip(host): implement host side emit
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Oct 14, 2024
1 parent 2b3ec3f commit 0f6ea6b
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 3 deletions.
65 changes: 62 additions & 3 deletions pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host/adapters/beholder"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

Expand Down Expand Up @@ -100,6 +103,8 @@ type ModuleConfig struct {
IsUncompressed bool
Fetch func(*wasmpb.FetchRequest) (*wasmpb.FetchResponse, error)

Emitter sdk.Emitter

// If Determinism is set, the module will override the random_get function in the WASI API with
// the provided seed to ensure deterministic behavior.
Determinism *DeterminismConfig
Expand Down Expand Up @@ -132,6 +137,12 @@ func WithDeterminism() func(*ModuleConfig) {
}
}

// TODO(mstreet3): New module error

// TODO(mstreet3): Clean up functionlal options for New Module

// TODO(mstreet3): Add func option for setting emitter

func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*Module, error) {
// Apply options to the module config.
for _, opt := range opts {
Expand All @@ -148,6 +159,10 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
}
}

if modCfg.Emitter == nil {
modCfg.Emitter = beholder.NewEmitter()
}

logger := modCfg.Logger

if modCfg.TickInterval == 0 {
Expand Down Expand Up @@ -288,10 +303,11 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
err = linker.FuncWrap(
"env",
"emit",
func(caller *wasmtime.Caller, msgptr int32, msglen int32, labelsPtr int32, labelsLen int32) int32 {
return ErrnoNotsup
},
createEmitFn(logger, modCfg.Emitter),
)
if err != nil {
return nil, fmt.Errorf("error wrapping emit func: %w", err)
}

m := &Module{
engine: engine,
Expand Down Expand Up @@ -453,3 +469,46 @@ func fetchFn(logger logger.Logger, modCfg *ModuleConfig) func(caller *wasmtime.C
return ErrnoSuccess
}
}

func createEmitFn(
l logger.Logger,
e sdk.Emitter,
) func(caller *wasmtime.Caller, msgptr, msglen int32) int32 {
logErr := func(err error) {
l.Errorf("error emitting message: %s", err)
}

return func(caller *wasmtime.Caller, msgptr, msglen int32) int32 {
b, err := safeMem(caller, msgptr, msglen)
if err != nil {
logErr(err)
return ErrnoFault
}

msg := &wasmpb.CustomEmitMessage{}
err = proto.Unmarshal(b, msg)
if err != nil {
logErr(err)
return ErrnoFault
}

vl, err := values.FromMapValueProto(msg.Labels)
if err != nil {
logErr(err)
return ErrnoFault
}

var labels map[string]any
if err := vl.UnwrapTo(&labels); err != nil {
logErr(err)
return ErrnoFault
}

if err := e.Emit(msg.Message, labels); err != nil {
logErr(err)
return ErrnoFault
}

return ErrnoSuccess
}
}
41 changes: 41 additions & 0 deletions pkg/workflows/wasm/host/test/emit/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//go:build wasip1

package main

import (
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictrigger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

func BuildWorkflow(config []byte) *sdk.WorkflowSpecFactory {
workflow := sdk.NewWorkflowSpecFactory(
sdk.NewWorkflowParams{},
)

triggerCfg := basictrigger.TriggerConfig{Name: "trigger", Number: 100}
trigger := triggerCfg.New(workflow)

sdk.Compute1[basictrigger.TriggerOutputs, bool](
workflow,
"transform",
sdk.Compute1Inputs[basictrigger.TriggerOutputs]{Arg0: trigger},
func(rsdk sdk.Runtime, outputs basictrigger.TriggerOutputs) (bool, error) {
err := rsdk.Emit("testing emit", map[string]any{
"test-string-field-key": "this is a test field content",
})
if err != nil {
return false, err
}
return true, nil
})

return workflow
}

func main() {
runner := wasm.NewRunner()
workflow := BuildWorkflow(runner.Config())
runner.Run(workflow)
}
80 changes: 80 additions & 0 deletions pkg/workflows/wasm/host/wasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
fetchBinaryCmd = "test/fetch/cmd"
randBinaryLocation = "test/rand/cmd/testmodule.wasm"
randBinaryCmd = "test/rand/cmd"
emitBinaryLocation = "test/emit/cmd/testmodule.wasm"
emitBinaryCmd = "test/emit/cmd"
)

func createTestBinary(outputPath, path string, compress bool, t *testing.T) []byte {
Expand Down Expand Up @@ -187,6 +189,84 @@ func Test_Compute_Logs(t *testing.T) {
}
}

func Test_Compute_Emit(t *testing.T) {
binary := createTestBinary(emitBinaryCmd, emitBinaryLocation, true, t)

lggr := logger.Test(t)

req := &wasmpb.Request{
Id: uuid.New().String(),
Message: &wasmpb.Request_ComputeRequest{
ComputeRequest: &wasmpb.ComputeRequest{
Request: &capabilitiespb.CapabilityRequest{
Inputs: &valuespb.Map{},
Config: &valuespb.Map{},
Metadata: &capabilitiespb.RequestMetadata{
ReferenceId: "transform",
},
},
},
},
}

t.Run("success", func(t *testing.T) {
m, err := NewModule(&ModuleConfig{
Logger: lggr,
Fetch: func(req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return nil, nil
},
Emitter: sdk.EmitterFunc(func(msg string, kvs map[string]any) error {
t.Helper()

assert.Equal(t, "testing emit", msg)
assert.Equal(t, "this is a test field content", kvs["test-string-field-key"])
return nil
}),
}, binary)
require.NoError(t, err)

m.Start()

_, err = m.Run(req)
assert.Nil(t, err)
})

t.Run("failure on emit", func(t *testing.T) {
lggr, logs := logger.TestObserved(t, zapcore.InfoLevel)

m, err := NewModule(&ModuleConfig{
Logger: lggr,
Fetch: func(req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return nil, nil
},
Emitter: sdk.EmitterFunc(func(msg string, kvs map[string]any) error {
t.Helper()

assert.Equal(t, "testing emit", msg)
assert.Equal(t, "this is a test field content", kvs["test-string-field-key"])
return assert.AnError
}),
}, binary)
require.NoError(t, err)

m.Start()

_, err = m.Run(req)
assert.Error(t, err)
require.Len(t, logs.AllUntimed(), 1)

expectedEntries := []Entry{
{
Log: zapcore.Entry{Level: zapcore.ErrorLevel, Message: fmt.Sprintf("error emitting message: %s", assert.AnError)},
},
}
for i := range expectedEntries {
assert.Equal(t, expectedEntries[i].Log.Level, logs.AllUntimed()[i].Entry.Level)
assert.Equal(t, expectedEntries[i].Log.Message, logs.AllUntimed()[i].Entry.Message)
}
})
}

func Test_Compute_Fetch(t *testing.T) {
binary := createTestBinary(fetchBinaryCmd, fetchBinaryLocation, true, t)

Expand Down

0 comments on commit 0f6ea6b

Please sign in to comment.