Skip to content

Commit

Permalink
wip(host): implement host side emit
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Oct 14, 2024
1 parent ddd3f75 commit f304f4d
Showing 1 changed file with 62 additions and 3 deletions.
65 changes: 62 additions & 3 deletions pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host/adapters/beholder"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

Expand Down Expand Up @@ -100,6 +103,8 @@ type ModuleConfig struct {
IsUncompressed bool
Fetch func(*wasmpb.FetchRequest) (*wasmpb.FetchResponse, error)

Emitter sdk.Emitter

// If Determinism is set, the module will override the random_get function in the WASI API with
// the provided seed to ensure deterministic behavior.
Determinism *DeterminismConfig
Expand Down Expand Up @@ -132,6 +137,12 @@ func WithDeterminism() func(*ModuleConfig) {
}
}

// TODO(mstreet3): New module error

// TODO(mstreet3): Clean up functionlal options for New Module

// TODO(mstreet3): Add func option for setting emitter

func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig)) (*Module, error) {
// Apply options to the module config.
for _, opt := range opts {
Expand All @@ -148,6 +159,10 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
}
}

if modCfg.Emitter == nil {
modCfg.Emitter = beholder.NewEmitter()
}

logger := modCfg.Logger

if modCfg.TickInterval == 0 {
Expand Down Expand Up @@ -288,10 +303,11 @@ func NewModule(modCfg *ModuleConfig, binary []byte, opts ...func(*ModuleConfig))
err = linker.FuncWrap(
"env",
"emit",
func(caller *wasmtime.Caller, msgptr int32, msglen int32, labelsPtr int32, labelsLen int32) int32 {
return ErrnoNotsup
},
createEmitFn(logger, modCfg.Emitter),
)
if err != nil {
return nil, fmt.Errorf("error wrapping emit func: %w", err)
}

m := &Module{
engine: engine,
Expand Down Expand Up @@ -453,3 +469,46 @@ func fetchFn(logger logger.Logger, modCfg *ModuleConfig) func(caller *wasmtime.C
return ErrnoSuccess
}
}

func createEmitFn(
l logger.Logger,
e sdk.Emitter,
) func(caller *wasmtime.Caller, msgptr, msglen int32) int32 {
logErr := func(err error) {
l.Errorw("error emitting message", "error", err)
}

return func(caller *wasmtime.Caller, msgptr, msglen int32) int32 {
b, err := safeMem(caller, msgptr, msglen)
if err != nil {
logErr(err)
return ErrnoFault
}

msg := &wasmpb.CustomEmitMessage{}
err = proto.Unmarshal(b, msg)
if err != nil {
logErr(err)
return ErrnoFault
}

vl, err := values.FromMapValueProto(msg.Labels)
if err != nil {
logErr(err)
return ErrnoFault
}

var labels map[string]any
if err := vl.UnwrapTo(&labels); err != nil {
logErr(err)
return ErrnoFault
}

if err := e.Emit(msg.Message, labels); err != nil {
logErr(err)
return ErrnoFault
}

return ErrnoSuccess
}
}

0 comments on commit f304f4d

Please sign in to comment.