diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index 4f380612f..459137f62 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -19,7 +19,10 @@ import ( "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host/adapters/beholder" wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb" ) @@ -100,6 +103,8 @@ type ModuleConfig struct { IsUncompressed bool Fetch func(*wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) + Emitter sdk.Emitter + // If Determinism is set, the module will override the random_get function in the WASI API with // the provided seed to ensure deterministic behavior. Determinism *DeterminismConfig @@ -132,6 +137,12 @@ func WithDeterminism() func(*ModuleConfig) { } } +// TODO(mstreet3): New module error + +// TODO(mstreet3): Clean up functionlal options for New Module + +// TODO(mstreet3): Add func option for setting emitter + func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*Module, error) { // Apply options to the module config. for _, opt := range opts { @@ -148,6 +159,10 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) } } + if modCfg.Emitter == nil { + modCfg.Emitter = beholder.NewEmitter() + } + logger := modCfg.Logger if modCfg.TickInterval == 0 { @@ -288,10 +303,11 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) err = linker.FuncWrap( "env", "emit", - func(caller *wasmtime.Caller, msgptr int32, msglen int32, labelsPtr int32, labelsLen int32) int32 { - return ErrnoNotsup - }, + createEmitFn(logger, modCfg.Emitter), ) + if err != nil { + return nil, fmt.Errorf("error wrapping emit func: %w", err) + } m := &Module{ engine: engine, @@ -453,3 +469,46 @@ func fetchFn(logger logger.Logger, modCfg *ModuleConfig) func(caller *wasmtime.C return ErrnoSuccess } } + +func createEmitFn( + l logger.Logger, + e sdk.Emitter, +) func(caller *wasmtime.Caller, msgptr, msglen int32) int32 { + logErr := func(err error) { + l.Errorw("error emitting message", "error", err) + } + + return func(caller *wasmtime.Caller, msgptr, msglen int32) int32 { + b, err := safeMem(caller, msgptr, msglen) + if err != nil { + logErr(err) + return ErrnoFault + } + + msg := &wasmpb.CustomEmitMessage{} + err = proto.Unmarshal(b, msg) + if err != nil { + logErr(err) + return ErrnoFault + } + + vl, err := values.FromMapValueProto(msg.Labels) + if err != nil { + logErr(err) + return ErrnoFault + } + + var labels map[string]any + if err := vl.UnwrapTo(&labels); err != nil { + logErr(err) + return ErrnoFault + } + + if err := e.Emit(msg.Message, labels); err != nil { + logErr(err) + return ErrnoFault + } + + return ErrnoSuccess + } +}