Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into zk-enable-autopurge
Browse files Browse the repository at this point in the history
  • Loading branch information
friedemannf committed Nov 8, 2024
2 parents 18ff38d + 68bff71 commit 8f708a7
Show file tree
Hide file tree
Showing 26 changed files with 262 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/small-gifts-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#updated Operator UI to support StreamSpec job definition
42 changes: 42 additions & 0 deletions .github/workflows/delete-caches.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Cleanup Caches

# See:
# https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries

on:
pull_request:
types:
- closed

jobs:
cleanup-branch-caches:
runs-on: ubuntu-latest
permissions:
# `actions:write` permission is required to delete caches
# See also: https://docs.github.com/en/rest/actions/cache?apiVersion=2022-11-28#delete-a-github-actions-cache-for-a-repository-using-a-cache-id
actions: write
contents: read
steps:
- name: Check out code
uses: actions/checkout@v4.1.2

- name: Cleanup Branch Caches
run: |
gh extension install actions/gh-actions-cache
REPO=${{ github.repository }}
BRANCH=refs/pull/${{ github.event.pull_request.number }}/merge
echo "Fetching list of cache key"
cacheKeysForPR=$(gh actions-cache list -R $REPO -B $BRANCH | cut -f 1 )
## Setting this to not fail the workflow while deleting cache keys.
set +e
echo "Deleting caches..."
for cacheKey in $cacheKeysForPR
do
gh actions-cache delete $cacheKey -R $REPO -B $BRANCH --confirm
done
echo "Done"
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ jobs:
contents: read
needs: [build-chainlink, changes]
if: github.event_name == 'pull_request' && ( needs.changes.outputs.core_changes == 'true' || needs.changes.outputs.github_ci_changes == 'true')
uses: smartcontractkit/.github/.github/workflows/run-e2e-tests.yml@ca50645120f0f07ed8c0df08175a5d6b3e4ac454 #ctf-run-tests@0.1.2
uses: smartcontractkit/.github/.github/workflows/run-e2e-tests.yml@5412507526722a7b1c5d719fa686eed5a1bc4035 #ctf-run-tests@0.2.0
with:
workflow_name: Run Core E2E Tests For PR
chainlink_version: ${{ inputs.evm-ref || github.sha }}
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/integration_tests/keystone/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ consensus:
observations:
- "$(trigger.outputs)"
config:
report_id: "0001"
report_id: "0001"
key_id: "evm"
aggregation_method: "data_feeds"
aggregation_config:
feeds:
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241106142051-c7bded1c08ae
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241107134205-25e45ecd73ba
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.14.0-mercury-20240807.0.20241106193309-5560cd76211a
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241106140121-4c9ee21ab422 h1:VfH/AW5NtTmroY9zz6OYCPFbFTqpMyJ2ubgT9ahYf3U=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241106140121-4c9ee21ab422/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241106142051-c7bded1c08ae h1:uqce0bjNVYzFrrVLafXgyn8SVNdfOtZekLfAwQihHiA=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241106142051-c7bded1c08ae/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241107134205-25e45ecd73ba h1:S9RFy8UzhZ/kWXVbTauzZCyhsodTDrB7i0w6ULYQi/0=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241107134205-25e45ecd73ba/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestDefaultEvmBatchCaller_batchCallLimit(t *testing.T) {
{numCalls: 10, batchSize: 100, parallelRpcCallsLimit: 10},
{numCalls: 1, batchSize: 100, parallelRpcCallsLimit: 10},
{numCalls: 1000, batchSize: 10, parallelRpcCallsLimit: 2},
{numCalls: rand.Uint() % 1000, batchSize: rand.Uint() % 500, parallelRpcCallsLimit: rand.Uint() % 500},
{numCalls: 1 + rand.Uint()%1000, batchSize: 1 + rand.Uint()%500, parallelRpcCallsLimit: 1 + rand.Uint()%500},
}

for _, tc := range testCases {
Expand Down
52 changes: 38 additions & 14 deletions core/services/standardcapabilities/standard_capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package standardcapabilities
import (
"context"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -12,6 +14,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/plugins"
)

const defaultStartTimeout = 3 * time.Minute

type standardCapabilities struct {
services.StateMachine
log logger.Logger
Expand All @@ -26,6 +30,10 @@ type standardCapabilities struct {
oracleFactory core.OracleFactory

capabilitiesLoop *loop.StandardCapabilitiesService

wg sync.WaitGroup
stopChan services.StopChan
startTimeout time.Duration
}

func newStandardCapabilities(
Expand All @@ -51,6 +59,7 @@ func newStandardCapabilities(
pipelineRunner: pipelineRunner,
relayerSet: relayerSet,
oracleFactory: oracleFactory,
stopChan: make(chan struct{}),
}
}

Expand All @@ -63,38 +72,53 @@ func (s *standardCapabilities) Start(ctx context.Context) error {
Cmd: cmdName,
Env: nil,
})

if err != nil {
return fmt.Errorf("error registering loop: %v", err)
}

s.capabilitiesLoop = loop.NewStandardCapabilitiesService(s.log, opts, cmdFn)

if err = s.capabilitiesLoop.Start(ctx); err != nil {
return fmt.Errorf("error starting standard capabilities service: %v", err)
}

if err = s.capabilitiesLoop.WaitCtx(ctx); err != nil {
return fmt.Errorf("error waiting for standard capabilities service to start: %v", err)
}
s.wg.Add(1)
go func() {
defer s.wg.Done()

if err = s.capabilitiesLoop.Service.Initialise(ctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog,
s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil {
return fmt.Errorf("error initialising standard capabilities service: %v", err)
}
if s.startTimeout == 0 {
s.startTimeout = defaultStartTimeout
}

capabilityInfos, err := s.capabilitiesLoop.Service.Infos(ctx)
if err != nil {
return fmt.Errorf("error getting standard capabilities service info: %v", err)
}
cctx, cancel := s.stopChan.CtxWithTimeout(s.startTimeout)
defer cancel()

if err = s.capabilitiesLoop.WaitCtx(cctx); err != nil {
s.log.Errorf("error waiting for standard capabilities service to start: %v", err)
return
}

if err = s.capabilitiesLoop.Service.Initialise(cctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog,
s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil {
s.log.Errorf("error initialising standard capabilities service: %v", err)
return
}

capabilityInfos, err := s.capabilitiesLoop.Service.Infos(cctx)
if err != nil {
s.log.Errorf("error getting standard capabilities service info: %v", err)
return
}

s.log.Info("Started standard capabilities for job spec", "spec", s.spec, "capabilities", capabilityInfos)
s.log.Info("Started standard capabilities for job spec", "spec", s.spec, "capabilities", capabilityInfos)
}()

return nil
})
}

func (s *standardCapabilities) Close() error {
close(s.stopChan)
s.wg.Wait()
return s.StopOnce("StandardCapabilities", func() error {
if s.capabilitiesLoop != nil {
return s.capabilitiesLoop.Close()
Expand Down
99 changes: 99 additions & 0 deletions core/services/standardcapabilities/standard_capabilities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package standardcapabilities

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

func TestStandardCapabilityStart(t *testing.T) {
t.Run("NOK-not_found_binary_does_not_block", func(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.TestLogger(t)

pluginRegistrar := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return &plugins.RegisteredLoop{}, nil }, func(loopId string) {})
registry := mocks.NewCapabilitiesRegistry(t)

spec := &job.StandardCapabilitiesSpec{
Command: "not/found/path/to/binary",
OracleFactory: job.OracleFactoryConfig{
Enabled: true,
BootstrapPeers: []string{
"12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690",
},
OCRContractAddress: "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6",
ChainID: "31337",
Network: "evm",
}}

standardCapability := newStandardCapabilities(lggr, spec, pluginRegistrar, &telemetryServiceMock{}, &kvstoreMock{}, registry, &errorLogMock{}, &pipelineRunnerServiceMock{}, &relayerSetMock{}, &oracleFactoryMock{})
standardCapability.startTimeout = 1 * time.Second
err := standardCapability.Start(ctx)
require.NoError(t, err)

standardCapability.wg.Wait()
})
}

type telemetryServiceMock struct{}

func (t *telemetryServiceMock) Send(ctx context.Context, network string, chainID string, contractID string, telemetryType string, payload []byte) error {
return nil
}

type kvstoreMock struct{}

func (k *kvstoreMock) Store(ctx context.Context, key string, val []byte) error {
return nil
}
func (k *kvstoreMock) Get(ctx context.Context, key string) ([]byte, error) {
return nil, nil
}

type errorLogMock struct{}

func (e *errorLogMock) SaveError(ctx context.Context, msg string) error {
return nil
}

type relayerSetMock struct{}

func (r *relayerSetMock) Get(ctx context.Context, relayID types.RelayID) (core.Relayer, error) {
return nil, nil
}
func (r *relayerSetMock) List(ctx context.Context, relayIDs ...types.RelayID) (map[types.RelayID]core.Relayer, error) {
return nil, nil
}

type pipelineRunnerServiceMock struct{}

func (p *pipelineRunnerServiceMock) ExecuteRun(ctx context.Context, spec string, vars core.Vars, options core.Options) (core.TaskResults, error) {
return nil, nil
}

type oracleFactoryMock struct{}

func (o *oracleFactoryMock) NewOracle(ctx context.Context, args core.OracleArgs) (core.Oracle, error) {
return &oracleMock{}, nil
}

type oracleMock struct{}

func (o *oracleMock) Start(ctx context.Context) error {
return nil
}
func (o *oracleMock) Close(ctx context.Context) error {
return nil
}
2 changes: 1 addition & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ var (
)

func (e *Engine) resumeInProgressExecutions(ctx context.Context) error {
wipExecutions, err := e.executionStates.GetUnfinished(ctx, defaultOffset, defaultLimit)
wipExecutions, err := e.executionStates.GetUnfinished(ctx, e.workflow.id, defaultOffset, defaultLimit)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
)

const testWorkflowId = "<workflow-id>"
Expand Down Expand Up @@ -149,6 +148,12 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,
return newTestEngine(t, reg, sdkSpec, opts...)
}

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
return map[string]string{}, nil
}

// newTestEngine creates a new engine with some test defaults.
func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec, opts ...func(c *Config)) (*Engine, *testHooks) {
initFailed := make(chan struct{})
Expand All @@ -174,7 +179,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec
onExecutionFinished: func(weid string) {
executionFinished <- weid
},
SecretsFetcher: syncer.NewWorkflowRegistry(),
SecretsFetcher: mockSecretsFetcher{},
clock: clock,
}
for _, o := range opts {
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Store interface {
UpsertStep(ctx context.Context, step *WorkflowExecutionStep) (WorkflowExecution, error)
UpdateStatus(ctx context.Context, executionID string, status string) error
Get(ctx context.Context, executionID string) (WorkflowExecution, error)
GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error)
GetUnfinished(ctx context.Context, workflowID string, offset, limit int) ([]WorkflowExecution, error)
}

var _ Store = (*DBStore)(nil)
21 changes: 11 additions & 10 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ type DBStore struct {
// `workflowExecutionRow` describes a row
// of the `workflow_executions` table
type workflowExecutionRow struct {
ID string
WorkflowID *string
Status string
CreatedAt *time.Time
UpdatedAt *time.Time
FinishedAt *time.Time
ID string `db:"id"`
WorkflowID *string `db:"workflow_id"`
Status string `db:"status"`
CreatedAt *time.Time `db:"created_at"`
UpdatedAt *time.Time `db:"updated_at"`
FinishedAt *time.Time `db:"finished_at"`
}

// `workflowStepRow` describes a row
Expand Down Expand Up @@ -362,7 +362,7 @@ func (d *DBStore) transact(ctx context.Context, fn func(*DBStore) error) error {
)
}

func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error) {
func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset, limit int) ([]WorkflowExecution, error) {
sql := `
SELECT
workflow_steps.workflow_execution_id AS ws_workflow_execution_id,
Expand All @@ -382,12 +382,13 @@ func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]Workf
JOIN workflow_steps
ON workflow_steps.workflow_execution_id = workflow_executions.id
WHERE workflow_executions.status = $1
AND workflow_executions.workflow_id = $2
ORDER BY workflow_executions.created_at DESC
LIMIT $2
OFFSET $3
LIMIT $3
OFFSET $4
`
var joinRecords []workflowExecutionWithStep
err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, limit, offset)
err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, workflowID, limit, offset)
if err != nil {
return []WorkflowExecution{}, err
}
Expand Down
Loading

0 comments on commit 8f708a7

Please sign in to comment.