Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cappl-86] feat(workflows/wasm): emit msgs to beholder #845

Merged
merged 27 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
115f766
wip(wasm): adds Emit to Runtime interface
MStreet3 Oct 11, 2024
17cdef1
refactor(wasm): separte funcs out of NewRunner
MStreet3 Oct 11, 2024
305ab97
refactor(wasm): shifts logging related funcs around
MStreet3 Oct 11, 2024
c163b47
feat(wasm): adds custom pb message
MStreet3 Oct 11, 2024
380b8f9
feat(wasm): calls emit from guest runner
MStreet3 Oct 11, 2024
a99b631
refactor(workflows): splits out emitter interface + docstring
MStreet3 Oct 14, 2024
f774522
feat(host): defines a beholder adapter for emitter
MStreet3 Oct 14, 2024
37ff890
wip(host): implement host side emit
MStreet3 Oct 14, 2024
fbd410b
refactor(wasm/host): abstracts read and write to wasm
MStreet3 Oct 16, 2024
ac32e38
protos wip
MStreet3 Oct 16, 2024
f0fdee2
feat(wasm): emits error response
MStreet3 Oct 16, 2024
5c58c4e
refactor(wasm/host): write all failures from wasm to memory
MStreet3 Oct 17, 2024
b22b1ae
feat(wasm): inject metadata into module
MStreet3 Oct 17, 2024
29b1087
feat(events+wasm): pull emit md from req md
MStreet3 Oct 17, 2024
e59bec2
feat(custmsg): creates labels from map
MStreet3 Oct 18, 2024
ca5827b
feat(wasm): adds tests and validates labels
MStreet3 Oct 18, 2024
f8f40d9
feat(wasm/host): use custmsg implementation for calling beholder
MStreet3 Oct 18, 2024
1880358
chore(wasm+host): docstrings and lint
MStreet3 Oct 18, 2024
9b18900
chore(host): new emitter iface + private func types
MStreet3 Oct 21, 2024
cb9deba
chore(multi) review comments
MStreet3 Oct 21, 2024
34108ee
chore(wasm): add id and md to config directly
MStreet3 Oct 21, 2024
19c7e84
refactor(custmsg+host): adapter labeler from config for emit
MStreet3 Oct 21, 2024
fa2225e
refactor(wasm): remove emitter from mod config
MStreet3 Oct 21, 2024
5792179
refactor(custmsg+wasm): expose emitlabeler on guest
MStreet3 Oct 21, 2024
5f29a10
refactor(wasm+sdk): EmitLabeler to MessageEmitter
MStreet3 Oct 22, 2024
5a909db
refactor(wasm+events): share label keys
MStreet3 Oct 22, 2024
89a8a21
refactor(wasm+values): use map[string]string directly
MStreet3 Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions pkg/capabilities/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (

const (
// Duplicates the attributes in beholder/message.go::Metadata
labelWorkflowOwner = "workflow_owner_address"
labelWorkflowID = "workflow_id"
labelWorkflowExecutionID = "workflow_execution_id"
labelWorkflowName = "workflow_name"
labelCapabilityContractAddress = "capability_contract_address"
labelCapabilityID = "capability_id"
labelCapabilityVersion = "capability_version"
labelCapabilityName = "capability_name"
LabelWorkflowOwner = "workflow_owner_address"
LabelWorkflowID = "workflow_id"
LabelWorkflowExecutionID = "workflow_execution_id"
LabelWorkflowName = "workflow_name"
LabelCapabilityContractAddress = "capability_contract_address"
LabelCapabilityID = "capability_id"
LabelCapabilityVersion = "capability_version"
LabelCapabilityName = "capability_name"
)

type EmitMetadata struct {
Expand Down Expand Up @@ -93,35 +93,35 @@ func (e EmitMetadata) attrs() []any {
a := []any{}

if e.WorkflowOwner != "" {
a = append(a, labelWorkflowOwner, e.WorkflowOwner)
a = append(a, LabelWorkflowOwner, e.WorkflowOwner)
}

if e.WorkflowID != "" {
a = append(a, labelWorkflowID, e.WorkflowID)
a = append(a, LabelWorkflowID, e.WorkflowID)
}

if e.WorkflowExecutionID != "" {
a = append(a, labelWorkflowExecutionID, e.WorkflowExecutionID)
a = append(a, LabelWorkflowExecutionID, e.WorkflowExecutionID)
}

if e.WorkflowName != "" {
a = append(a, labelWorkflowName, e.WorkflowName)
a = append(a, LabelWorkflowName, e.WorkflowName)
}

if e.CapabilityContractAddress != "" {
a = append(a, labelCapabilityContractAddress, e.CapabilityContractAddress)
a = append(a, LabelCapabilityContractAddress, e.CapabilityContractAddress)
}

if e.CapabilityID != "" {
a = append(a, labelCapabilityID, e.CapabilityID)
a = append(a, LabelCapabilityID, e.CapabilityID)
}

if e.CapabilityVersion != "" {
a = append(a, labelCapabilityVersion, e.CapabilityVersion)
a = append(a, LabelCapabilityVersion, e.CapabilityVersion)
}

if e.CapabilityName != "" {
a = append(a, labelCapabilityName, e.CapabilityName)
a = append(a, LabelCapabilityName, e.CapabilityName)
}

return a
Expand Down
40 changes: 35 additions & 5 deletions pkg/custmsg/custom_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,35 @@ func NewLabeler() Labeler {
return Labeler{labels: make(map[string]string)}
}

// WithMapLabels adds multiple key-value pairs to the CustomMessageLabeler for transmission
// With SendLogAsCustomMessage
func (l Labeler) WithMapLabels(labels map[string]string) Labeler {
newCustomMessageLabeler := NewLabeler()

// Copy existing labels from the current agent
for k, v := range l.labels {
newCustomMessageLabeler.labels[k] = v
}

// Add new key-value pairs
for k, v := range labels {
newCustomMessageLabeler.labels[k] = v
}

return newCustomMessageLabeler
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c Labeler) With(keyValues ...string) Labeler {
func (l Labeler) With(keyValues ...string) Labeler {
newCustomMessageLabeler := NewLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
return l
}

// Copy existing labels from the current agent
for k, v := range c.labels {
for k, v := range l.labels {
newCustomMessageLabeler.labels[k] = v
}

Expand All @@ -43,10 +61,22 @@ func (c Labeler) With(keyValues ...string) Labeler {
return newCustomMessageLabeler
}

func (l Labeler) Emit(msg string) error {
return sendLogAsCustomMessageW(msg, l.labels)
}

func (l Labeler) Labels() map[string]string {
copied := make(map[string]string, len(l.labels))
for k, v := range l.labels {
copied[k] = v
}
return copied
}

// SendLogAsCustomMessage emits a BaseMessage With msg and labels as data.
// any key in labels that is not part of orderedLabelKeys will not be transmitted
func (c Labeler) SendLogAsCustomMessage(msg string) error {
return sendLogAsCustomMessageW(msg, c.labels)
func (l Labeler) SendLogAsCustomMessage(msg string) error {
return sendLogAsCustomMessageW(msg, l.labels)
}

func sendLogAsCustomMessageW(msg string, labels map[string]string) error {
Expand Down
16 changes: 15 additions & 1 deletion pkg/custmsg/custom_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,19 @@ func Test_CustomMessageAgent(t *testing.T) {
cma1 := cma.With("key1", "value1")
cma2 := cma1.With("key2", "value2")

assert.NotEqual(t, cma1.labels, cma2.labels)
assert.NotEqual(t, cma1.Labels(), cma2.Labels())
}

func Test_CustomMessageAgent_With(t *testing.T) {
cma := NewLabeler()
cma = cma.With("key1", "value1")

assert.Equal(t, cma.Labels(), map[string]string{"key1": "value1"})
}

func Test_CustomMessageAgent_WithMapLabels(t *testing.T) {
cma := NewLabeler()
cma = cma.WithMapLabels(map[string]string{"key1": "value1"})

assert.Equal(t, cma.Labels(), map[string]string{"key1": "value1"})
}
2 changes: 1 addition & 1 deletion pkg/values/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func EmptyMap() *Map {
}
}

func NewMap(m map[string]any) (*Map, error) {
func NewMap[T any](m map[string]T) (*Map, error) {
MStreet3 marked this conversation as resolved.
Show resolved Hide resolved
mv := map[string]Value{}
for k, v := range m {
val, err := Wrap(v)
Expand Down
13 changes: 13 additions & 0 deletions pkg/workflows/sdk/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,22 @@ import (

var BreakErr = capabilities.ErrStopExecution

type MessageEmitter interface {
// Emit sends a message to the labeler's destination.
Emit(string) error

// With sets the labels for the message to be emitted. Labels are passed as key-value pairs
// and are cumulative.
With(kvs ...string) MessageEmitter
}

// Guest interface
type Runtime interface {
Logger() logger.Logger
Fetch(req FetchRequest) (FetchResponse, error)

// Emitter sends the given message and labels to the configured collector.
Emitter() MessageEmitter
}

type FetchRequest struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/workflows/sdk/testutils/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ func (nr *NoopRuntime) Logger() logger.Logger {
l, _ := logger.New()
return l
}

func (nr *NoopRuntime) Emitter() sdk.MessageEmitter {
return nil
}
Loading
Loading