From 74d39f706af60f14941fbf971b0303d229738f2d Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 1 Oct 2024 17:35:46 -0500 Subject: [PATCH] pkg/workflows/sdk: add WorkflowSpec.FormatChart for mermaid flowcharts --- Makefile | 4 + go.mod | 4 +- pkg/capabilities/capabilities.go | 21 + pkg/workflows/dependency_graph.go | 2 +- pkg/workflows/dependency_graph_chart.go | 78 ++++ pkg/workflows/dependency_graph_chart_test.go | 405 ++++++++++++++++++ pkg/workflows/models_yaml_test.go | 2 +- pkg/workflows/sdk/builder_test.go | 76 +--- pkg/workflows/sdk/testutils/utils.go | 15 +- pkg/workflows/sdk/workflow_spec.go | 4 +- pkg/workflows/sdk/workflow_spec_test.go | 83 ++++ .../testdata/fixtures/charts/README.md | 8 + .../fixtures/charts/builder_parallel.md | 31 ++ .../fixtures/charts/notstreamssepolia.md | 13 + .../testdata/fixtures/charts/parallel.md | 33 ++ .../fixtures/charts/parallel_serialized.md | 31 ++ .../testdata/fixtures/charts/serial.md | 17 + 17 files changed, 741 insertions(+), 86 deletions(-) create mode 100644 pkg/workflows/dependency_graph_chart.go create mode 100644 pkg/workflows/dependency_graph_chart_test.go create mode 100644 pkg/workflows/sdk/workflow_spec_test.go create mode 100644 pkg/workflows/testdata/fixtures/charts/README.md create mode 100644 pkg/workflows/testdata/fixtures/charts/builder_parallel.md create mode 100644 pkg/workflows/testdata/fixtures/charts/notstreamssepolia.md create mode 100644 pkg/workflows/testdata/fixtures/charts/parallel.md create mode 100644 pkg/workflows/testdata/fixtures/charts/parallel_serialized.md create mode 100644 pkg/workflows/testdata/fixtures/charts/serial.md diff --git a/Makefile b/Makefile index 8955675da..4360e7744 100644 --- a/Makefile +++ b/Makefile @@ -47,3 +47,7 @@ lint-workspace: lint: @./script/lint.sh $(GOLANGCI_LINT_VERSION) "$(GOLANGCI_LINT_COMMON_OPTS)" $(GOLANGCI_LINT_DIRECTORY) "--new-from-rev=origin/main" + +.PHONY: test-quiet +test-quiet: + go test ./... 
| grep -v "\[no test files\]" | grep -v "\(cached\)" diff --git a/go.mod b/go.mod index 062c250d4..cc3a95ed9 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/smartcontractkit/chainlink-common -go 1.22.0 - -toolchain go1.22.7 +go 1.23 require ( github.com/andybalholm/brotli v1.1.0 diff --git a/pkg/capabilities/capabilities.go b/pkg/capabilities/capabilities.go index 4c4e6c215..8003ab613 100644 --- a/pkg/capabilities/capabilities.go +++ b/pkg/capabilities/capabilities.go @@ -1,6 +1,7 @@ package capabilities import ( + "cmp" "context" "fmt" "regexp" @@ -53,6 +54,26 @@ func (c CapabilityType) IsValid() error { return fmt.Errorf("invalid capability type: %s", c) } +func (c CapabilityType) cmpOrder() int { + switch c { + case CapabilityTypeTrigger: + return 0 + case CapabilityTypeAction: + return 1 + case CapabilityTypeConsensus: + return 2 + case CapabilityTypeTarget: + return 3 + case CapabilityTypeUnknown: + return 4 + default: + return 5 + } +} +func (c CapabilityType) Compare(c2 CapabilityType) int { + return cmp.Compare(c.cmpOrder(), c2.cmpOrder()) +} + // CapabilityResponse is a struct for the Execute response of a capability. 
type CapabilityResponse struct { Value *values.Map diff --git a/pkg/workflows/dependency_graph.go b/pkg/workflows/dependency_graph.go index a68a25860..3968f08f9 100644 --- a/pkg/workflows/dependency_graph.go +++ b/pkg/workflows/dependency_graph.go @@ -129,7 +129,7 @@ func BuildDependencyGraph(spec sdk.WorkflowSpec) (*DependencyGraph, error) { Graph: g, Triggers: triggerSteps, } - return wf, err + return wf, nil } var ( diff --git a/pkg/workflows/dependency_graph_chart.go b/pkg/workflows/dependency_graph_chart.go new file mode 100644 index 000000000..a637e827b --- /dev/null +++ b/pkg/workflows/dependency_graph_chart.go @@ -0,0 +1,78 @@ +package workflows + +import ( + "cmp" + "fmt" + "maps" + "slices" + "strings" + "text/template" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" +) + +func (g *DependencyGraph) FormatChart() (string, error) { + var sb strings.Builder + steps := slices.Clone(g.Spec.Triggers) + steps = append(steps, g.Spec.Steps()...) + slices.SortFunc(steps, func(a, b sdk.StepDefinition) int { + return cmp.Or( + a.CapabilityType.Compare(b.CapabilityType), + cmp.Compare(a.Ref, b.Ref), + cmp.Compare(a.ID, b.ID), + ) + }) + preds, err := g.Graph.PredecessorMap() + if err != nil { + return "", fmt.Errorf("failed to get graph predecessors: %w", err) + } + type stepAndOutput struct { + Step sdk.StepDefinition + Inputs []string + } + nodes := make([]stepAndOutput, len(steps)) + for i, step := range steps { + inputs := slices.Collect(maps.Keys(preds[step.Ref])) + if step.CapabilityType != capabilities.CapabilityTypeTrigger { + inputs = append(inputs, KeywordTrigger) + } + nodes[i] = stepAndOutput{Step: step, Inputs: inputs} + } + err = tmpl.Execute(&sb, nodes) + if err != nil { + return "", err + } + return sb.String(), nil +} + +var tmpl = template.Must(template.New("").Funcs(map[string]any{ + "replace": strings.ReplaceAll, +}).Parse(`flowchart +{{ range $i, $e := . 
}} + {{ $ref := .Step.Ref -}} + {{ $id := replace .Step.ID "@" "[at]" -}} + {{ $name := printf "%s
(%s)" .Step.CapabilityType $id -}} + {{ if .Step.Ref -}} + {{ $name = printf "%s
%s" .Step.Ref $name -}} + {{ else -}} + {{ $ref = printf "%s%d" "unnamed" $i -}} + {{ end -}} + {{ if eq .Step.CapabilityType "trigger" -}} + {{ $ref }}[\"{{$name}}"/] + {{ else if eq .Step.CapabilityType "consensus" -}} + {{ $ref }}[["{{$name}}"]] + {{ else if eq .Step.CapabilityType "target" -}} + {{ $ref }}[/"{{$name}}"\] + {{ else -}} + {{ $ref }}["{{$name}}"] + {{ end -}} + {{ if .Step.Inputs.OutputRef -}} + {{ .Step.Inputs.OutputRef }} --> {{ .Step.Ref }} + {{ else -}} + {{ range $out := .Inputs -}} + {{ $out }} --> {{ $ref }} + {{ end -}} + {{ end -}} +{{ end -}} +`)) diff --git a/pkg/workflows/dependency_graph_chart_test.go b/pkg/workflows/dependency_graph_chart_test.go new file mode 100644 index 000000000..1c7614005 --- /dev/null +++ b/pkg/workflows/dependency_graph_chart_test.go @@ -0,0 +1,405 @@ +package workflows_test + +import ( + "context" + "embed" + "flag" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictrigger" + ocr3 "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/ocr3cap" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/targets/chainwriter" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams" + "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" + "github.com/smartcontractkit/chainlink-common/pkg/workflows" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" +) + +var update = flag.Bool("update", false, "update golden files") + +func TestMain(m *testing.M) { + flag.Parse() + os.Exit(m.Run()) +} + +//go:generate go test -run=TestDependencyGraphFormatChart . 
-update +//go:embed testdata/fixtures/charts +var charts embed.FS + +// requireEqualChart compares the formatted workflow chart against the golden file testdata/fixtures/charts/.md, +// or updates it when update is true. +func requireEqualChart(t *testing.T, name string, workflow sdk.WorkflowSpec) { + t.Helper() + path := filepath.Join("testdata/fixtures/charts", name+".md") + + graph, err := workflows.BuildDependencyGraph(workflow) + require.NoError(t, err) + s, err := graph.FormatChart() + require.NoError(t, err) + + got := fmt.Sprintf("```mermaid\n%s\n```", s) + + if *update { + require.NoError(t, os.WriteFile(path, []byte(got), 0600)) + return + } + + b, err := charts.ReadFile(path) + require.NoError(t, err) + + require.Equal(t, string(b), got) +} + +// TestDependencyGraphFormatChart depends on charts golden files, and will regenerate them +// when the -update flag is used. +func TestDependencyGraphFormatChart(t *testing.T) { + for _, tt := range []struct { + name string + workflow sdk.WorkflowSpec + }{ + {"notstreamssepolia", notStreamSepoliaWorkflowSpec}, + {"serial", serialWorkflowSpec}, + {"parallel", parallelWorkflowSpec}, + {"builder_parallel", buildSimpleWorkflowSpec(t, + sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Owner: "test", Name: "parallel"}), + )}, + } { + t.Run(tt.name, func(t *testing.T) { + requireEqualChart(t, tt.name, tt.workflow) + }) + } +} + +func buildSimpleWorkflowSpec(t *testing.T, w *sdk.WorkflowSpecFactory) sdk.WorkflowSpec { + trigger := basictrigger.TriggerConfig{ + Name: "trigger", + Number: 100, + }.New(w) + + foo := sdk.Compute1(w, "get-foo", sdk.Compute1Inputs[string]{ + Arg0: trigger.CoolOutput(), + }, func(runtime sdk.Runtime, s string) (int64, error) { + ctx := context.Background() + req, err := http.NewRequest("GET", "https://foo.com/"+s, nil) + if err != nil { + return -1, fmt.Errorf("failed to create request for foo.com: %w", err) + } + req = req.WithContext(ctx) + resp, err := http.DefaultClient.Do(req) + if err != 
nil { + return -1, fmt.Errorf("failed to get data from foo.com: %w", err) + } + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + if err != nil { + return -1, fmt.Errorf("failed to read response from foo.com: %w", err) + } + return strconv.ParseInt(string(b), 10, 64) + }) + bar := sdk.Compute1(w, "get-bar", sdk.Compute1Inputs[string]{ + Arg0: trigger.CoolOutput(), + }, func(runtime sdk.Runtime, s string) (int64, error) { + ctx := context.Background() + req, err := http.NewRequest("GET", "https://bar.io/api/"+s, nil) + if err != nil { + return -1, fmt.Errorf("failed to create request for bar.io: %w", err) + } + req = req.WithContext(ctx) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return -1, fmt.Errorf("failed to get data from bar.io: %w", err) + } + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + if err != nil { + return -1, fmt.Errorf("failed to read response from bar.io: %w", err) + } + return strconv.ParseInt(string(b), 10, 64) + }) + baz := sdk.Compute1(w, "get-baz", sdk.Compute1Inputs[string]{ + Arg0: trigger.CoolOutput(), + }, func(runtime sdk.Runtime, s string) (int64, error) { + ctx := context.Background() + query := url.Values{"id": []string{s}}.Encode() + req, err := http.NewRequest("GET", "https://baz.com/v2/path/to/thing?"+query, nil) + if err != nil { + return -1, fmt.Errorf("failed to create request for baz.com/v2: %w", err) + } + req = req.WithContext(ctx) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return -1, fmt.Errorf("failed to get data from baz.com/v2: %w", err) + } + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + if err != nil { + return -1, fmt.Errorf("failed to read response from baz.com/v2: %w", err) + } + return strconv.ParseInt(string(b), 10, 64) + }) + + compute := sdk.Compute3(w, "compute", sdk.Compute3Inputs[int64, int64, int64]{ + Arg0: foo.Value(), + Arg1: bar.Value(), + Arg2: baz.Value(), + }, func(runtime sdk.Runtime, foo, bar, baz int64) ([]streams.Feed, error) 
{ + val, err := mathutil.Median(foo, bar, baz) + if err != nil { + return nil, fmt.Errorf("failed to calculate median: %w", err) + } + return []streams.Feed{{ + Metadata: streams.SignersMetadata{}, + Payload: []streams.FeedReport{ + {FullReport: []byte(strconv.FormatInt(val, 10))}, + }, + Timestamp: 0, + }}, nil + }) + + consensus := ocr3.DataFeedsConsensusConfig{}.New(w, "consensus", ocr3.DataFeedsConsensusInput{ + Observations: compute.Value(), + }) + + chainwriter.TargetConfig{ + Address: "0xfakeaddr", + }.New(w, "id", chainwriter.TargetInput{ + SignedReport: consensus, + }) + spec, err := w.Spec() + require.NoError(t, err) + return spec +} + +var notStreamSepoliaWorkflowSpec = sdk.WorkflowSpec{ + Name: "notccipethsep", + Owner: "0x00000000000000000000000000000000000000aa", + Triggers: []sdk.StepDefinition{ + { + ID: "notstreams@1.0.0", + Ref: "trigger", + Inputs: sdk.StepInputs{}, + Config: map[string]any{"maxFrequencyMs": 5000}, + CapabilityType: capabilities.CapabilityTypeTrigger, + }, + }, + Actions: make([]sdk.StepDefinition, 0), + Consensus: []sdk.StepDefinition{ + { + ID: "offchain_reporting@1.0.0", + Ref: "data-feeds-report", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"observations": []any{ + map[string]any{ + "Metadata": map[string]any{ + "MinRequiredSignatures": 1, + "Signers": []string{"$(trigger.outputs.Metadata.Signer)"}, + }, + "Payload": []map[string]any{ + { + "BenchmarkPrice": "$(trigger.outputs.Payload.BuyPrice)", + "FeedID": anyFakeFeedID, + "FullReport": "$(trigger.outputs.Payload.FullReport)", + "ObservationTimestamp": "$(trigger.outputs.Payload.ObservationTimestamp)", + "ReportContext": "$(trigger.outputs.Payload.ReportContext)", + "Signatures": []string{"$(trigger.outputs.Payload.Signature)"}, + }, + }, + "Timestamp": "$(trigger.outputs.Timestamp)", + }, + }}, + }, + Config: map[string]any{ + "aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{ + AllowedPartialStaleness: "0.5", + Feeds: map[string]ocr3.FeedValue{ 
+ anyFakeFeedID: { + Deviation: "0.5", + Heartbeat: 3600, + }, + }, + }, + "aggregation_method": "data_feeds", + "encoder": "EVM", + "encoder_config": ocr3.EncoderConfig{ + "Abi": "(bytes32 FeedID, uint224 Price, uint32 Timestamp)[] Reports", + }, + "report_id": "0001", + }, + CapabilityType: capabilities.CapabilityTypeConsensus, + }, + }, + Targets: []sdk.StepDefinition{ + { + ID: "write_ethereum-testnet-sepolia@1.0.0", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"}, + }, + Config: map[string]any{ + "address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416", + "deltaStage": "45s", + "schedule": "oneAtATime", + }, + CapabilityType: capabilities.CapabilityTypeTarget, + }, + }, +} + +var serialWorkflowSpec = sdk.WorkflowSpec{ + Name: "serial", + Owner: "owner", + Triggers: []sdk.StepDefinition{ + { + ID: "notstreams@1.0.0", + Ref: "trigger", + Inputs: sdk.StepInputs{}, + Config: map[string]any{"maxFrequencyMs": 5000}, + CapabilityType: capabilities.CapabilityTypeTrigger, + }, + }, + Actions: []sdk.StepDefinition{ + { + ID: "custom_compute@1.0.0", + Ref: "Compute", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"Arg0": "$(trigger.outputs)"}, + }, + Config: map[string]any{ + "binary": "$(ENV.binary)", + "config": "$(ENV.config)", + }, + CapabilityType: capabilities.CapabilityTypeAction, + }, + }, + Consensus: []sdk.StepDefinition{ + { + ID: "offchain_reporting@1.0.0", + Ref: "data-feeds-report", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"observations": "$(Compute.outputs.Value)"}, + }, + Config: map[string]any{ + "aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{ + AllowedPartialStaleness: "false", + Feeds: map[string]ocr3.FeedValue{ + anyFakeFeedID: { + Deviation: "0.5", + Heartbeat: 3600, + }, + }, + }, + "aggregation_method": "data_feeds", + "encoder": ocr3.EncoderEVM, + "encoder_config": ocr3.EncoderConfig{}, + "report_id": "0001", + }, + CapabilityType: 
capabilities.CapabilityTypeConsensus, + }, + }, + Targets: []sdk.StepDefinition{ + { + ID: "write_ethereum-testnet-sepolia@1.0.0", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"}, + }, + Config: map[string]any{ + "address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416", + "deltaStage": "45s", + "schedule": "oneAtATime", + }, + CapabilityType: capabilities.CapabilityTypeTarget, + }, + }, +} + +var parallelWorkflowSpec = sdk.WorkflowSpec{ + Name: "parallel", + Owner: "owner", + Triggers: []sdk.StepDefinition{ + { + ID: "chain_reader@1.0.0", + Ref: "trigger", + Inputs: sdk.StepInputs{}, + Config: map[string]any{"maxFrequencyMs": 5000}, + CapabilityType: capabilities.CapabilityTypeTrigger, + }, + }, + Actions: []sdk.StepDefinition{ + { + ID: "http@1.0.0", + Ref: "get-foo", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"Arg0": "$(trigger.outputs)"}, + }, + CapabilityType: capabilities.CapabilityTypeAction, + }, + { + ID: "custom_compute@1.0.0", + Ref: "compute-foo", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"Arg0": "$(get-foo.outputs)"}, + }, + CapabilityType: capabilities.CapabilityTypeAction, + }, + { + ID: "http@1.0.0", + Ref: "get-bar", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"Arg0": "$(trigger.outputs)"}, + }, + CapabilityType: capabilities.CapabilityTypeAction, + }, + { + ID: "custom_compute@1.0.0", + Ref: "compute-bar", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"Arg0": "$(get-bar.outputs)"}, + }, + CapabilityType: capabilities.CapabilityTypeAction, + }, + { + ID: "chain_reader@1.0.0", + Ref: "read-token-price", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"Arg0": "$(trigger.outputs)"}, + }, + CapabilityType: capabilities.CapabilityTypeAction, + }, + }, + Consensus: []sdk.StepDefinition{ + { + ID: "offchain_reporting@1.0.0", + Ref: "data-feeds-report", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{ + "observations": []string{ + "$(compute-foo.outputs.Value)", + 
"$(compute-bar.outputs.Value)", + }, + "token_price": "$(read-token-price.outputs.Value)", + }, + }, + CapabilityType: capabilities.CapabilityTypeConsensus, + }, + }, + Targets: []sdk.StepDefinition{ + { + ID: "write_ethereum-testnet-sepolia@1.0.0", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"}, + }, + CapabilityType: capabilities.CapabilityTypeTarget, + }, + }, +} + +const anyFakeFeedID = "0x0000000000000000000000000000000000000000000000000000000000000000" diff --git a/pkg/workflows/models_yaml_test.go b/pkg/workflows/models_yaml_test.go index 89ca32fff..7bd5e85e4 100644 --- a/pkg/workflows/models_yaml_test.go +++ b/pkg/workflows/models_yaml_test.go @@ -48,7 +48,7 @@ var transformJSON = cmp.FilterValues(func(x, y []byte) bool { return out })) -func TestWorkflowSpecMarshalling(t *testing.T) { +func TestWorkflowSpecYamlMarshalling(t *testing.T) { t.Parallel() fixtureReader := yamlFixtureReaderBytes(t, "marshalling") diff --git a/pkg/workflows/sdk/builder_test.go b/pkg/workflows/sdk/builder_test.go index 68133913f..bb9bc95c8 100644 --- a/pkg/workflows/sdk/builder_test.go +++ b/pkg/workflows/sdk/builder_test.go @@ -209,81 +209,7 @@ func TestBuilder_ValidSpec(t *testing.T) { actual, err := factory.Spec() require.NoError(t, err) - expected := sdk.WorkflowSpec{ - Name: "notccipethsep", - Owner: "0x00000000000000000000000000000000000000aa", - Triggers: []sdk.StepDefinition{ - { - ID: "notstreams@1.0.0", - Ref: "trigger", - Inputs: sdk.StepInputs{}, - Config: map[string]any{"maxFrequencyMs": 5000}, - CapabilityType: capabilities.CapabilityTypeTrigger, - }, - }, - Actions: make([]sdk.StepDefinition, 0), - Consensus: []sdk.StepDefinition{ - { - ID: "offchain_reporting@1.0.0", - Ref: "data-feeds-report", - Inputs: sdk.StepInputs{ - Mapping: map[string]any{"observations": []map[string]any{ - { - "Metadata": map[string]any{ - "MinRequiredSignatures": 1, - "Signers": []string{"$(trigger.outputs.Metadata.Signer)"}, - }, - 
"Payload": []map[string]any{ - { - "BenchmarkPrice": "$(trigger.outputs.Payload.BuyPrice)", - "FeedID": anyFakeFeedID, - "FullReport": "$(trigger.outputs.Payload.FullReport)", - "ObservationTimestamp": "$(trigger.outputs.Payload.ObservationTimestamp)", - "ReportContext": "$(trigger.outputs.Payload.ReportContext)", - "Signatures": []string{"$(trigger.outputs.Payload.Signature)"}, - }, - }, - "Timestamp": "$(trigger.outputs.Timestamp)", - }, - }}, - }, - Config: map[string]any{ - "aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{ - AllowedPartialStaleness: "0.5", - Feeds: map[string]ocr3.FeedValue{ - anyFakeFeedID: { - Deviation: "0.5", - Heartbeat: 3600, - }, - }, - }, - "aggregation_method": "data_feeds", - "encoder": "EVM", - "encoder_config": ocr3.EncoderConfig{ - "Abi": "(bytes32 FeedID, uint224 Price, uint32 Timestamp)[] Reports", - }, - "report_id": "0001", - }, - CapabilityType: capabilities.CapabilityTypeConsensus, - }, - }, - Targets: []sdk.StepDefinition{ - { - ID: "write_ethereum-testnet-sepolia@1.0.0", - Inputs: sdk.StepInputs{ - Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"}, - }, - Config: map[string]any{ - "address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416", - "deltaStage": "45s", - "schedule": "oneAtATime", - }, - CapabilityType: capabilities.CapabilityTypeTarget, - }, - }, - } - - testutils.AssertWorkflowSpec(t, expected, actual) + testutils.AssertWorkflowSpec(t, notStreamSepoliaWorkflowSpec, actual) }) t.Run("maps work correctly", func(t *testing.T) { diff --git a/pkg/workflows/sdk/testutils/utils.go b/pkg/workflows/sdk/testutils/utils.go index 21cb1a1f5..03dae10bd 100644 --- a/pkg/workflows/sdk/testutils/utils.go +++ b/pkg/workflows/sdk/testutils/utils.go @@ -1,6 +1,7 @@ package testutils import ( + "bytes" "encoding/json" "testing" @@ -11,11 +12,15 @@ import ( ) func AssertWorkflowSpec(t *testing.T, expectedSpec, testWorkflowSpec sdk.WorkflowSpec) { - expected, err := json.Marshal(expectedSpec) - 
require.NoError(t, err) + var b bytes.Buffer + e := json.NewEncoder(&b) + e.SetIndent("", " ") + require.NoError(t, e.Encode(expectedSpec)) + expected := b.String() - actual, err := json.Marshal(testWorkflowSpec) - require.NoError(t, err) + b.Reset() + require.NoError(t, e.Encode(testWorkflowSpec)) + actual := b.String() - assert.Equal(t, string(expected), string(actual)) + assert.Equal(t, expected, actual) } diff --git a/pkg/workflows/sdk/workflow_spec.go b/pkg/workflows/sdk/workflow_spec.go index 2f3338654..3c2f95d86 100644 --- a/pkg/workflows/sdk/workflow_spec.go +++ b/pkg/workflows/sdk/workflow_spec.go @@ -1,6 +1,8 @@ package sdk -import "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +import ( + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) type StepInputs struct { OutputRef string diff --git a/pkg/workflows/sdk/workflow_spec_test.go b/pkg/workflows/sdk/workflow_spec_test.go new file mode 100644 index 000000000..08c1b59e3 --- /dev/null +++ b/pkg/workflows/sdk/workflow_spec_test.go @@ -0,0 +1,83 @@ +package sdk_test + +import ( + _ "embed" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + ocr3 "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/ocr3cap" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" +) + +var notStreamSepoliaWorkflowSpec = sdk.WorkflowSpec{ + Name: "notccipethsep", + Owner: "0x00000000000000000000000000000000000000aa", + Triggers: []sdk.StepDefinition{ + { + ID: "notstreams@1.0.0", + Ref: "trigger", + Inputs: sdk.StepInputs{}, + Config: map[string]any{"maxFrequencyMs": 5000}, + CapabilityType: capabilities.CapabilityTypeTrigger, + }, + }, + Actions: make([]sdk.StepDefinition, 0), + Consensus: []sdk.StepDefinition{ + { + ID: "offchain_reporting@1.0.0", + Ref: "data-feeds-report", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"observations": []map[string]any{ + { + "Metadata": map[string]any{ + "MinRequiredSignatures": 1, + "Signers": 
[]string{"$(trigger.outputs.Metadata.Signer)"}, + }, + "Payload": []map[string]any{ + { + "BenchmarkPrice": "$(trigger.outputs.Payload.BuyPrice)", + "FeedID": anyFakeFeedID, + "FullReport": "$(trigger.outputs.Payload.FullReport)", + "ObservationTimestamp": "$(trigger.outputs.Payload.ObservationTimestamp)", + "ReportContext": "$(trigger.outputs.Payload.ReportContext)", + "Signatures": []string{"$(trigger.outputs.Payload.Signature)"}, + }, + }, + "Timestamp": "$(trigger.outputs.Timestamp)", + }, + }}, + }, + Config: map[string]any{ + "aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{ + AllowedPartialStaleness: "0.5", + Feeds: map[string]ocr3.FeedValue{ + anyFakeFeedID: { + Deviation: "0.5", + Heartbeat: 3600, + }, + }, + }, + "aggregation_method": "data_feeds", + "encoder": "EVM", + "encoder_config": ocr3.EncoderConfig{ + "Abi": "(bytes32 FeedID, uint224 Price, uint32 Timestamp)[] Reports", + }, + "report_id": "0001", + }, + CapabilityType: capabilities.CapabilityTypeConsensus, + }, + }, + Targets: []sdk.StepDefinition{ + { + ID: "write_ethereum-testnet-sepolia@1.0.0", + Inputs: sdk.StepInputs{ + Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"}, + }, + Config: map[string]any{ + "address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416", + "deltaStage": "45s", + "schedule": "oneAtATime", + }, + CapabilityType: capabilities.CapabilityTypeTarget, + }, + }, +} diff --git a/pkg/workflows/testdata/fixtures/charts/README.md b/pkg/workflows/testdata/fixtures/charts/README.md new file mode 100644 index 000000000..5291e530c --- /dev/null +++ b/pkg/workflows/testdata/fixtures/charts/README.md @@ -0,0 +1,8 @@ +# WorkflowSpec Charts + +This directory contains WorkflowSpec chart golden files. 
They are validated against test data by TestDependencyGraphFormatChart, +and can be regenerated by passing the `-update` flag: +```sh +go test -run=TestDependencyGraphFormatChart ./pkg/workflows/ -update +``` +You can also invoke go:generate on package workflows, which will do the same. diff --git a/pkg/workflows/testdata/fixtures/charts/builder_parallel.md b/pkg/workflows/testdata/fixtures/charts/builder_parallel.md new file mode 100644 index 000000000..451bcc7ee --- /dev/null +++ b/pkg/workflows/testdata/fixtures/charts/builder_parallel.md @@ -0,0 +1,31 @@ +```mermaid +flowchart + + trigger[\"trigger
trigger
(basic-test-trigger[at]1.0.0)"/] + + compute["compute
action
(custom_compute[at]1.0.0)"] + get-baz --> compute + get-foo --> compute + get-bar --> compute + trigger --> compute + + get-bar["get-bar
action
(custom_compute[at]1.0.0)"] + trigger --> get-bar + trigger --> get-bar + + get-baz["get-baz
action
(custom_compute[at]1.0.0)"] + trigger --> get-baz + trigger --> get-baz + + get-foo["get-foo
action
(custom_compute[at]1.0.0)"] + trigger --> get-foo + trigger --> get-foo + + consensus[["consensus
consensus
(offchain_reporting[at]1.0.0)"]] + compute --> consensus + trigger --> consensus + + unnamed6[/"target
(id)"\] + trigger --> unnamed6 + +``` \ No newline at end of file diff --git a/pkg/workflows/testdata/fixtures/charts/notstreamssepolia.md b/pkg/workflows/testdata/fixtures/charts/notstreamssepolia.md new file mode 100644 index 000000000..f13f56371 --- /dev/null +++ b/pkg/workflows/testdata/fixtures/charts/notstreamssepolia.md @@ -0,0 +1,13 @@ +```mermaid +flowchart + + trigger[\"trigger
trigger
(notstreams[at]1.0.0)"/] + + data-feeds-report[["data-feeds-report
consensus
(offchain_reporting[at]1.0.0)"]] + trigger --> data-feeds-report + trigger --> data-feeds-report + + unnamed2[/"target
(write_ethereum-testnet-sepolia[at]1.0.0)"\] + trigger --> unnamed2 + +``` \ No newline at end of file diff --git a/pkg/workflows/testdata/fixtures/charts/parallel.md b/pkg/workflows/testdata/fixtures/charts/parallel.md new file mode 100644 index 000000000..a5f17ee44 --- /dev/null +++ b/pkg/workflows/testdata/fixtures/charts/parallel.md @@ -0,0 +1,33 @@ +```mermaid +flowchart + + trigger[\"trigger
trigger
(chain_reader[at]1.0.0)"/] + + compute-bar["compute-bar
action
(custom_compute[at]1.0.0)"] + get-bar --> compute-bar + trigger --> compute-bar + + compute-foo["compute-foo
action
(custom_compute[at]1.0.0)"] + get-foo --> compute-foo + trigger --> compute-foo + + get-bar["get-bar
action
(http[at]1.0.0)"] + trigger --> get-bar + trigger --> get-bar + + get-foo["get-foo
action
(http[at]1.0.0)"] + trigger --> get-foo + trigger --> get-foo + + read-token-price["read-token-price
action
(chain_reader[at]1.0.0)"] + trigger --> read-token-price + trigger --> read-token-price + + data-feeds-report[["data-feeds-report
consensus
(offchain_reporting[at]1.0.0)"]] + read-token-price --> data-feeds-report + trigger --> data-feeds-report + + unnamed7[/"target
(write_ethereum-testnet-sepolia[at]1.0.0)"\] + trigger --> unnamed7 + +``` \ No newline at end of file diff --git a/pkg/workflows/testdata/fixtures/charts/parallel_serialized.md b/pkg/workflows/testdata/fixtures/charts/parallel_serialized.md new file mode 100644 index 000000000..a8f06c097 --- /dev/null +++ b/pkg/workflows/testdata/fixtures/charts/parallel_serialized.md @@ -0,0 +1,31 @@ +```mermaid +flowchart + + trigger-chain-event[\"trigger-chain-event
trigger
(chain_reader[at]1.0.0)"/] + + compute-bar["compute-bar
action
(custom_compute[at]1.0.0)"] + get-bar --> compute-bar + + compute-foo["compute-foo
action
(custom_compute[at]1.0.0)"] + get-foo --> compute-foo + + get-bar["get-bar
action
(http[at]1.0.0)"] + compute-foo -..-> get-bar + trigger-chain-event --> get-bar + + get-foo["get-foo
action
(http[at]1.0.0)"] + trigger-chain-event --> get-foo + + read-token-price["read-token-price
action
(chain_reader[at]1.0.0)"] + compute-bar -..-> read-token-price + trigger-chain-event --> read-token-price + + data-feeds-report[["data-feeds-report
consensus
(offchain_reporting[at]1.0.0)"]] + compute-bar -- Value --> data-feeds-report + compute-foo -- Value --> data-feeds-report + read-token-price -- Value --> data-feeds-report + + unnamed7[/"target
(write_ethereum-testnet-sepolia[at]1.0.0)"\] + data-feeds-report --> unnamed7 + +``` \ No newline at end of file diff --git a/pkg/workflows/testdata/fixtures/charts/serial.md b/pkg/workflows/testdata/fixtures/charts/serial.md new file mode 100644 index 000000000..a32f6ed7a --- /dev/null +++ b/pkg/workflows/testdata/fixtures/charts/serial.md @@ -0,0 +1,17 @@ +```mermaid +flowchart + + trigger[\"trigger
trigger
(notstreams[at]1.0.0)"/] + + Compute["Compute
action
(custom_compute[at]1.0.0)"] + trigger --> Compute + trigger --> Compute + + data-feeds-report[["data-feeds-report
consensus
(offchain_reporting[at]1.0.0)"]] + Compute --> data-feeds-report + trigger --> data-feeds-report + + unnamed3[/"target
(write_ethereum-testnet-sepolia[at]1.0.0)"\] + trigger --> unnamed3 + +``` \ No newline at end of file