Skip to content

Commit

Permalink
timelock: removes global waitgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed May 1, 2024
1 parent 6a9f80f commit 68576f6
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 68 deletions.
16 changes: 8 additions & 8 deletions pkg/timelock/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
// - The predecessor operation is finished
// - The operation is ready to be executed
// Otherwise the operation will throw an info log and wait for a future tick.
func (tw *Worker) execute(ctx context.Context, op []*contract.TimelockCallScheduled) {
func (tw *worker) execute(ctx context.Context, op []*contract.TimelockCallScheduled) {
if !isReady(ctx, tw.contract, op[0].Id) {
tw.logger.Info().Msgf("skipping operation %x: not ready", op[0].Id)
}

tw.logger.Debug().Msgf("execute operation %x", op[0].Id)
tx, err := executeCallSchedule(ctx, &tw.executeContract.TimelockTransactor, op, tw.privateKey)
tx, err := tw.executeCallSchedule(ctx, &tw.executeContract.TimelockTransactor, op, tw.privateKey)
if err != nil || tx == nil {
tw.logger.Error().Msgf("execute operation %x error: %s", op[0].Id, err.Error())
} else {
Expand All @@ -35,7 +35,7 @@ func (tw *Worker) execute(ctx context.Context, op []*contract.TimelockCallSchedu
}

// executeCallScheduleOperation is the handler to execute a CallScheduled operation.
func executeCallSchedule(ctx context.Context, c *contract.TimelockTransactor, cs []*contract.TimelockCallScheduled, privateKey *ecdsa.PrivateKey) (*types.Transaction, error) {
func (tw *worker) executeCallSchedule(ctx context.Context, c *contract.TimelockTransactor, cs []*contract.TimelockCallScheduled, privateKey *ecdsa.PrivateKey) (*types.Transaction, error) {
fromAddress, err := privateKeyToAddress(privateKey)
if err != nil {
return nil, err
Expand All @@ -56,7 +56,7 @@ func executeCallSchedule(ctx context.Context, c *contract.TimelockTransactor, cs
tx, err := c.ExecuteBatch(
&bind.TransactOpts{
From: fromAddress,
Signer: signTx,
Signer: tw.signTx,
Context: ctx},
calls,
cs[0].Predecessor,
Expand Down Expand Up @@ -112,17 +112,17 @@ func isPending(ctx context.Context, c *contract.Timelock, id [32]byte) bool {
}

// signTx is a function that implements the type SignerFn, so can be passed as a Signer method.
func signTx(address common.Address, tx *types.Transaction) (*types.Transaction, error) {
if tWorker == nil {
func (tw *worker) signTx(address common.Address, tx *types.Transaction) (*types.Transaction, error) {
if tw == nil {
return nil, fmt.Errorf("timelockWorker can't be instantiated")
}

chainID, err := tWorker.ethClient.NetworkID(context.Background())
chainID, err := tw.ethClient.NetworkID(context.Background())
if err != nil {
return nil, err
}

signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(chainID), tWorker.privateKey)
signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(chainID), tw.privateKey)
if err != nil {
return nil, err
}
Expand Down
26 changes: 15 additions & 11 deletions pkg/timelock/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func newScheduler(tick time.Duration) *scheduler {
// call them this way so no process is allowd to add/delete from
// the store, which could cause race conditions like adding/deleting
// while the operation is being executed.
func (tw *Worker) runScheduler(ctx context.Context) {
wg.Add(1)
func (tw *worker) runScheduler(ctx context.Context) <-chan struct{} {
done := make(chan struct{})

go func() {
defer wg.Done()
defer close(done)

for {
select {
case <-tw.ticker.C:
Expand Down Expand Up @@ -94,10 +96,12 @@ func (tw *Worker) runScheduler(ctx context.Context) {
}
}
}()

return done
}

// updateSchedulerDelay updates the internal ticker delay, so it can be reconfigured while running.
func (tw *Worker) updateSchedulerDelay(t time.Duration) {
func (tw *worker) updateSchedulerDelay(t time.Duration) {
if t <= 0 {
tw.logger.Debug().Msgf("internal min delay not changed, invalid duration: %v", t.String())
return
Expand All @@ -108,40 +112,40 @@ func (tw *Worker) updateSchedulerDelay(t time.Duration) {
}

// addToScheduler adds a new CallSchedule operation safely to the store.
func (tw *Worker) addToScheduler(op *contract.TimelockCallScheduled) {
func (tw *worker) addToScheduler(op *contract.TimelockCallScheduled) {
tw.logger.Debug().Msgf("scheduling operation: %x", op.Id)
tw.add <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

// delFromScheduler deletes an operation safely from the store.
func (tw *Worker) delFromScheduler(op operationKey) {
func (tw *worker) delFromScheduler(op operationKey) {
tw.logger.Debug().Msgf("de-scheduling operation: %v", op)
tw.del <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

func (tw *Worker) setSchedulerBusy() {
func (tw *worker) setSchedulerBusy() {
tw.logger.Debug().Msgf("setting scheduler busy")
tw.mu.Lock()
tw.busy = true
tw.mu.Unlock()
}

func (tw *Worker) setSchedulerFree() {
func (tw *worker) setSchedulerFree() {
tw.logger.Debug().Msgf("setting scheduler free")
tw.mu.Lock()
tw.busy = false
tw.mu.Unlock()
}

func (tw *Worker) isSchedulerBusy() bool {
func (tw *worker) isSchedulerBusy() bool {
return tw.busy
}

// dumpOperationStore dumps to the logger and to the log file the current scheduled unexecuted operations.
// maps in go don't guarantee order, so that's why we have to find the earliest block.
func (tw *Worker) dumpOperationStore(now func() time.Time) {
func (tw *worker) dumpOperationStore() {
if len(tw.store) <= 0 {
tw.logger.Info().Msgf("no operations to dump")
return
Expand All @@ -164,7 +168,7 @@ func (tw *Worker) dumpOperationStore(now func() time.Time) {

w := bufio.NewWriter(f)

writeOperationStore(w, tw.logger, tw.store, blocks[0], now)
writeOperationStore(w, tw.logger, tw.store, blocks[0], tw.now)

w.Flush()
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/timelock/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package timelock
import (
"fmt"
"os"
"path"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestWorker_setSchedulerFree(t *testing.T) {
// Test_dumpOperationStore tests the dumpOperationStore method and ensures that it writes the correct contents to the file.
func Test_dumpOperationStore(t *testing.T) {
var (
fName = logPath + logFile
fName = path.Join(logPath, logFile)
logger = zerolog.Nop()
earliestBlock = 42
opKeys = generateOpKeys(t, []string{"1", "2"})
Expand All @@ -113,7 +114,7 @@ func Test_dumpOperationStore(t *testing.T) {
opKeys[1]: {following},
}

worker = &Worker{
worker = &worker{
logger: &logger,
scheduler: scheduler{
store: store,
Expand All @@ -132,10 +133,12 @@ func Test_dumpOperationStore(t *testing.T) {
return date
}

worker.now = nowFunc

wantPrefix := fmt.Sprintf("Process stopped at %v\n", nowFunc().In(time.UTC))

// Write the store to the file.
worker.dumpOperationStore(nowFunc)
worker.dumpOperationStore()

// Read the file and compare the contents.
gotRead, err := os.ReadFile(fName)
Expand Down
Loading

0 comments on commit 68576f6

Please sign in to comment.