Skip to content

Commit

Permalink
feat: add DRY_RUN config option
Browse files Browse the repository at this point in the history
Add a `DRY_RUN` configuration option that controls whether the service
executes operations or simply logs the received events.

This is a temporary feature needed to help test the recently added logic
to monitor contract events using http connections without causing any
side-effects.
  • Loading branch information
gustavogama-cll committed Nov 15, 2024
1 parent f623949 commit ba27050
Show file tree
Hide file tree
Showing 14 changed files with 271 additions and 161 deletions.
9 changes: 8 additions & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func startCommand() *cobra.Command {

nodeURL, privateKey, timelockAddress, callProxyAddress string
fromBlock, pollPeriod, eventListenerPollPeriod int64
dryRun bool
)

// Initialize timelock-worker configuration.
Expand All @@ -40,6 +41,7 @@ func startCommand() *cobra.Command {
startCmd.Flags().Int64Var(&fromBlock, "from-block", timelockConf.FromBlock, "Start watching from this block")
startCmd.Flags().Int64Var(&pollPeriod, "poll-period", timelockConf.PollPeriod, "Poll period in seconds")
startCmd.Flags().Int64Var(&eventListenerPollPeriod, "event-listener-poll-period", timelockConf.EventListenerPollPeriod, "Event Listener poll period in seconds")
startCmd.Flags().BoolVar(&dryRun, "dry-run", timelockConf.DryRun, "Enable \"dry run\" mode -- monitor events but don't trigger any calls")

return &startCmd
}
Expand Down Expand Up @@ -86,8 +88,13 @@ func startTimelock(cmd *cobra.Command) {
logs.Fatal().Msgf("value of poll-period not set: %s", err.Error())
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
logs.Fatal().Msgf("value of dry-run not set: %s", err.Error())
}

tWorker, err := timelock.NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey,
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, logs)
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, dryRun, logs)
if err != nil {
logs.Fatal().Msgf("error creating the timelock-worker: %s", err.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/ethereum/go-ethereum v1.13.15
github.com/prometheus/client_golang v1.19.0
github.com/rs/zerolog v1.31.0
github.com/samber/lo v1.47.0
github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240917103524-56f1a8d2cd4b
github.com/smartcontractkit/chain-selectors v1.0.17
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -95,9 +96,9 @@ require (
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ github.com/sagikazarmark/locafero v0.3.0 h1:zT7VEGWC2DTflmccN/5T1etyKvxSxpHsjb9c
github.com/sagikazarmark/locafero v0.3.0/go.mod h1:w+v7UsPNFwzF1cHuOajOOzoq4U7v/ig1mpRjqV+Bu1U=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4=
Expand Down Expand Up @@ -559,8 +561,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -630,8 +632,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
8 changes: 8 additions & 0 deletions pkg/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package cli
import (
"fmt"
"os"
"slices"
"strconv"
"strings"

"github.com/spf13/viper"
)
Expand All @@ -17,6 +19,7 @@ type Config struct {
FromBlock int64 `mapstructure:"FROM_BLOCK"`
PollPeriod int64 `mapstructure:"POLL_PERIOD"`
EventListenerPollPeriod int64 `mapstructure:"EVENT_LISTENER_POLL_PERIOD"`
DryRun bool `mapstructure:"DRY_RUN"`
}

// NewTimelockCLI return a new Timelock instance configured.
Expand Down Expand Up @@ -80,5 +83,10 @@ func NewTimelockCLI() (*Config, error) {
c.EventListenerPollPeriod = int64(pp)
}

if os.Getenv("DRY_RUN") != "" {
trueValues := []string{"true", "yes", "on", "enabled", "1"}
c.DryRun = slices.Contains(trueValues, strings.ToLower(os.Getenv("DRY_RUN")))
}

return &c, nil
}
12 changes: 9 additions & 3 deletions pkg/cli/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Test_NewTimelockCLI(t *testing.T) {
name: "load from file",
setup: func(t *testing.T) {
unsetenvs(t, "NODE_URL", "TIMELOCK_ADDRESS", "CALL_PROXY_ADDRESS", "PRIVATE_KEY", "FROM_BLOCK",
"POLL_PERIOD", "EVENT_LISTENER_POLL_PERIOD")
"POLL_PERIOD", "EVENT_LISTENER_POLL_PERIOD", "DRY_RUN")

err := os.WriteFile(configFileName, []byte(string(
"NODE_URL=wss://goerli/test\n"+
Expand All @@ -30,7 +30,8 @@ func Test_NewTimelockCLI(t *testing.T) {
"PRIVATE_KEY=9876543210\n"+
"FROM_BLOCK=1\n"+
"POLL_PERIOD=2\n"+
"EVENT_LISTENER_POLL_PERIOD=3\n",
"EVENT_LISTENER_POLL_PERIOD=3\n"+
"DRY_RUN=true\n",
)), os.FileMode(0644))
require.NoError(t, err)

Expand All @@ -44,6 +45,7 @@ func Test_NewTimelockCLI(t *testing.T) {
FromBlock: 1,
PollPeriod: 2,
EventListenerPollPeriod: 3,
DryRun: true,
},
},
{
Expand All @@ -56,7 +58,8 @@ func Test_NewTimelockCLI(t *testing.T) {
"PRIVATE_KEY=9876543210\n"+
"FROM_BLOCK=1\n"+
"POLL_PERIOD=2\n"+
"EVENT_LISTENER_POLL_PERIOD=3\n",
"EVENT_LISTENER_POLL_PERIOD=3\n"+
"DRY_RUN=true\n",
)), os.FileMode(0644))
require.NoError(t, err)

Expand All @@ -67,6 +70,7 @@ func Test_NewTimelockCLI(t *testing.T) {
t.Setenv("FROM_BLOCK", "4")
t.Setenv("POLL_PERIOD", "5")
t.Setenv("EVENT_LISTENER_POLL_PERIOD", "6")
t.Setenv("DRY_RUN", "false")

t.Cleanup(func() { os.Remove(configFileName) })
},
Expand Down Expand Up @@ -95,6 +99,7 @@ func Test_NewTimelockCLI(t *testing.T) {
t.Setenv("FROM_BLOCK", "4")
t.Setenv("POLL_PERIOD", "5")
t.Setenv("EVENT_LISTENER_POLL_PERIOD", "6")
t.Setenv("DRY_RUN", "yes")

t.Cleanup(func() { os.Remove(configFileName) })
},
Expand All @@ -106,6 +111,7 @@ func Test_NewTimelockCLI(t *testing.T) {
FromBlock: 4,
PollPeriod: 5,
EventListenerPollPeriod: 6,
DryRun: true,
},
},
{
Expand Down
1 change: 1 addition & 0 deletions pkg/timelock/const_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ var (
testFromBlock = big.NewInt(0)
testPollPeriod = 5
testEventListenerPollPeriod = 0
testDryRun = false
testLogger = logger.Logger("info", "human")
)
8 changes: 4 additions & 4 deletions pkg/timelock/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func Test_isOperation(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down Expand Up @@ -56,7 +56,7 @@ func Test_isOperation(t *testing.T) {

func Test_isReady(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down Expand Up @@ -100,7 +100,7 @@ func Test_isReady(t *testing.T) {

func Test_isDone(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down Expand Up @@ -144,7 +144,7 @@ func Test_isDone(t *testing.T) {

func Test_isPending(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down
4 changes: 4 additions & 0 deletions pkg/timelock/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,17 @@ func (tw *Worker) updateSchedulerDelay(t time.Duration) {

// addToScheduler adds a new CallSchedule operation safely to the store.
func (tw *Worker) addToScheduler(op *contract.TimelockCallScheduled) {
tw.mu.Lock()
defer tw.mu.Unlock()
tw.logger.Debug().Msgf("scheduling operation: %x", op.Id)
tw.add <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

// delFromScheduler deletes an operation safely from the store.
func (tw *Worker) delFromScheduler(op operationKey) {
tw.mu.Lock()
defer tw.mu.Unlock()
tw.logger.Debug().Msgf("de-scheduling operation: %v", op)
tw.del <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
Expand Down
8 changes: 4 additions & 4 deletions pkg/timelock/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Test_newScheduler(t *testing.T) {

func TestWorker_updateSchedulerDelay(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

// Should never fail
testWorker.updateSchedulerDelay(1 * time.Second)
Expand All @@ -56,7 +56,7 @@ func TestWorker_updateSchedulerDelay(t *testing.T) {

func TestWorker_isSchedulerBusy(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

isBusy := testWorker.isSchedulerBusy()
assert.Equal(t, false, isBusy, "scheduler should be busy by default")
Expand All @@ -72,7 +72,7 @@ func TestWorker_isSchedulerBusy(t *testing.T) {

func TestWorker_setSchedulerBusy(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

testWorker.setSchedulerBusy()
isBusy := testWorker.isSchedulerBusy()
Expand All @@ -81,7 +81,7 @@ func TestWorker_setSchedulerBusy(t *testing.T) {

func TestWorker_setSchedulerFree(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

testWorker.setSchedulerFree()
isBusy := testWorker.isSchedulerBusy()
Expand Down
18 changes: 13 additions & 5 deletions pkg/timelock/timelock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Worker struct {
fromBlock *big.Int
pollPeriod int64
listenerPollPeriod int64
dryRun bool
logger *zerolog.Logger
privateKey *ecdsa.PrivateKey
scheduler
Expand All @@ -49,7 +50,7 @@ var validNodeUrlSchemes = []string{"http", "https", "ws", "wss"}
// It's a singleton, so further executions will retrieve the same timelockWorker.
func NewTimelockWorker(
nodeURL, timelockAddress, callProxyAddress, privateKey string, fromBlock *big.Int,
pollPeriod int64, listenerPollPeriod int64, logger *zerolog.Logger,
pollPeriod int64, listenerPollPeriod int64, dryRun bool, logger *zerolog.Logger,
) (*Worker, error) {
// Sanity check on each provided variable before allocating more resources.
u, err := url.ParseRequestURI(nodeURL)
Expand Down Expand Up @@ -126,6 +127,7 @@ func NewTimelockWorker(
fromBlock: fromBlock,
pollPeriod: pollPeriod,
listenerPollPeriod: listenerPollPeriod,
dryRun: dryRun,
logger: logger,
privateKey: privateKeyECDSA,
scheduler: *newScheduler(time.Duration(pollPeriod) * time.Second),
Expand Down Expand Up @@ -196,7 +198,7 @@ func (tw *Worker) setupFilterQuery(fromBlock *big.Int) ethereum.FilterQuery {
// retrieveNewLogs returns a "control channel" and a "logs channels". The logs channel is where
// new log events will be asynchronously pushed to.
//
// The actual retrieveal is performed by either `subscribeNewLogs`, if the node connection
// The actual retrieval is performed by either `subscribeNewLogs`, if the node connection
// supports subscriptions, or `pollNewLogs` otherwise. In practice, the ethclient library
// simply checks if the given node URL is "http(s)" or not.
func (tw *Worker) retrieveNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
Expand Down Expand Up @@ -449,7 +451,9 @@ func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {

if !isDone(ctx, tw.contract, cs.Id) && isOperation(ctx, tw.contract, cs.Id) {
tw.logger.Info().Hex(fieldTXHash, cs.Raw.TxHash[:]).Uint64(fieldBlockNumber, cs.Raw.BlockNumber).Msgf("%s received", eventCallScheduled)
tw.addToScheduler(cs)
if !tw.dryRun {
tw.addToScheduler(cs)
}
}

// A CallExecuted which is in Done status should delete the task in the scheduler store.
Expand All @@ -461,7 +465,9 @@ func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {

if isDone(ctx, tw.contract, cs.Id) {
tw.logger.Info().Hex(fieldTXHash, cs.Raw.TxHash[:]).Uint64(fieldBlockNumber, cs.Raw.BlockNumber).Msgf("%s received, skipping operation", eventCallExecuted)
tw.delFromScheduler(cs.Id)
if !tw.dryRun {
tw.delFromScheduler(cs.Id)
}
}

// A Cancelled which is in Done status should delete the task in the scheduler store.
Expand All @@ -473,7 +479,9 @@ func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {

if isDone(ctx, tw.contract, cs.Id) {
tw.logger.Info().Hex(fieldTXHash, cs.Raw.TxHash[:]).Uint64(fieldBlockNumber, cs.Raw.BlockNumber).Msgf("%s received, cancelling operation", eventCancelled)
tw.delFromScheduler(cs.Id)
if !tw.dryRun {
tw.delFromScheduler(cs.Id)
}
}
default:
tw.logger.Info().Str("event", event.Name).Msgf("discarding event")
Expand Down
Loading

0 comments on commit ba27050

Please sign in to comment.