Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timelock: handle log processing shutdown and Listen shutdown #55

Merged
merged 3 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pkg/isclosed/isclosed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package isclosed

import (
"context"
"sync"
)

// All returns a channel that is closed once every channel in done has either been closed or
// yielded a value, or as soon as ctx is canceled, whichever happens first. All is useful for
// implementing graceful shutdown of a number of Sends or other running goroutines that indicate
// their state via a returned read only channel. The graceful shutdown can be circumvented via the
// context passed to All to ensure shutdowns will not deadlock.
//
// A nil channel in done never becomes ready, so the returned channel then only closes when ctx is
// canceled. When done is empty the returned channel is closed without waiting on anything.
//
// All starts one goroutine per channel in done; each of them exits only when its channel becomes
// ready or ctx is canceled.
func All(ctx context.Context, done ...<-chan struct{}) <-chan struct{} {
	var (
		shutdown = make(chan struct{})
		wg       sync.WaitGroup
	)

	wg.Add(len(done))
	for _, ch := range done {
		// Hand ch to the goroutine as an argument so each goroutine waits on its own channel.
		// Before Go 1.22 the range variable is shared across iterations, and a closure capturing
		// it could observe a later channel than the one it was started for.
		go func(ch <-chan struct{}) {
			defer wg.Done()

			select {
			case <-ctx.Done():
			case <-ch:
			}
		}(ch)
	}

	// Close shutdown only after every per-channel goroutine has finished.
	go func() {
		defer close(shutdown)

		wg.Wait()
	}()

	return shutdown
}
96 changes: 96 additions & 0 deletions pkg/isclosed/isclosed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package isclosed

import (
"context"
"testing"
"time"
)

// maxTestTimeout is the duration the tests in this file pass to eventually as the upper bound on
// waiting for a done channel to close.
var maxTestTimeout = 3 * time.Second

// TestAll_DoneAfterAllClose verifies that the channel returned by All closes once every input
// channel has been closed, with a context that is never canceled.
func TestAll_DoneAfterAllClose(t *testing.T) {
	inputs := []chan struct{}{
		make(chan struct{}),
		make(chan struct{}),
		make(chan struct{}),
	}
	done := All(context.Background(), inputs[0], inputs[1], inputs[2])

	// Close every input channel, in argument order.
	for _, in := range inputs {
		close(in)
	}

	eventually(t, done, maxTestTimeout)
}

// TestAll_DoneAfterCtxCancel verifies that canceling the context closes the channel returned by
// All even though one of the input channels is never closed.
func TestAll_DoneAfterCtxCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	first, second, neverClosed := make(chan struct{}), make(chan struct{}), make(chan struct{})
	done := All(ctx, first, second, neverClosed)

	close(first)
	close(second)

	// neverClosed stays open; only the cancellation can release the last waiter.
	cancel()

	eventually(t, done, maxTestTimeout)
}

// TestAll_DoneAfterCtxCancelWithNilChannels verifies that the channel returned by All closes on
// context cancellation when every input channel is nil.
func TestAll_DoneAfterCtxCancelWithNilChannels(t *testing.T) {
	// The zero value of a channel type is nil.
	var unset <-chan struct{}

	ctx, cancel := context.WithCancel(context.Background())
	done := All(ctx, unset, unset, unset)

	cancel()

	eventually(t, done, maxTestTimeout)
}

// TestAll_DoneNonBlocking checks that the channel returned by All closes once all input channels
// have closed, independent of the order in which the inputs were passed as arguments.
func TestAll_DoneNonBlocking(t *testing.T) {
	first, second, third := make(chan struct{}), make(chan struct{}), make(chan struct{})
	trigger := make(chan struct{})

	// The context is never cancelled, so done can only close after all three inputs have closed.
	// The inputs are handed to All in the reverse of the order in which they will close.
	done := All(context.Background(), third, second, first)

	// first closes as soon as the trigger fires.
	go func() {
		<-trigger
		close(first)
	}()

	// second closes only after first has closed.
	go func() {
		<-first
		close(second)
	}()

	// third closes only after both first and second have closed.
	go func() {
		<-first
		<-second
		close(third)
	}()

	// Fire the trigger now that all closing goroutines have been started.
	close(trigger)

	// done must close without any context cancellation despite the ordering dependencies
	// between the input channels.
	eventually(t, done, maxTestTimeout)
}

// eventually waits for done to become ready and fails the test with t.Fatal if that has not
// happened within d.
func eventually(t *testing.T, done <-chan struct{}, d time.Duration) {
	t.Helper()

	deadline := time.NewTimer(d)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatal("timed out waiting for done to close")
	case <-done:
		return
	}
}
46 changes: 35 additions & 11 deletions pkg/timelock/timelock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/rs/zerolog"

"github.com/smartcontractkit/timelock-worker/pkg/isclosed"
"github.com/smartcontractkit/timelock-worker/pkg/timelock/contract"
)

Expand Down Expand Up @@ -122,7 +124,6 @@ func NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey st
// It handles the retrieval of old and new events, contexts and cancellations.
func (tw *Worker) Listen() error {
ctxwc, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

// Log timelock-worker configuration.
tw.startLog()
Expand Down Expand Up @@ -154,16 +155,18 @@ func (tw *Worker) Listen() error {
select {
case <-ctxwc.Done():
case <-processingDone:
cancel()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call cancel here to stop the scheduling routine

}

tw.logger.Info().Msg("shutting down timelock-worker")
tw.logger.Info().Msg("dumping operation store")
tw.dumpOperationStore(time.Now)

// Wait for all goroutines to finish.
<-historyDone
<-newDone
<-schedulingDone
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create a new context as the previous one is either done or was canceled

defer cancel()

<-isclosed.All(shutdownCtx, schedulingDone, historyDone, newDone, processingDone)

return nil
}
Expand Down Expand Up @@ -273,26 +276,47 @@ func (tw *Worker) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{},
// processLogs is implemented as a fan-in for all the log channels, merging all the data and handling logs sequentially.
// This function is thread safe.
func (tw *Worker) processLogs(ctx context.Context, oldLog, newLog <-chan types.Log) <-chan struct{} {
done := make(chan struct{})
var (
done, newDone, oldDone = make(chan struct{}), make(chan struct{}), make(chan struct{})
ctxwc, cancel = context.WithCancel(ctx)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create a child context we can cancel when both log channels are closed

)

// Cancel the context and shutdown the processing routine if no more logs are available.
go func() {
defer cancel()
<-isclosed.All(ctxwc, oldDone, newDone)
}()

// This is the goroutine watching over the subscribed and historical logs.
go func() {
defer close(done)

for {
select {
case log := <-newLog:
if err := tw.handleLog(ctx, log); err != nil {
case log, open := <-newLog:
if !open {
close(newDone)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if newLog is closed, signal that new logs are done and set the channel to nil so this branch is never selected again

newLog = nil
continue
}

if err := tw.handleLog(ctxwc, log); err != nil {
tw.logger.Error().Msgf("error processing new log: %v\n", log)
}

case log := <-oldLog:
if err := tw.handleLog(ctx, log); err != nil {
case log, open := <-oldLog:
if !open {
close(oldDone)
oldLog = nil
continue
}

if err := tw.handleLog(ctxwc, log); err != nil {
tw.logger.Error().Msgf("error processing historical log: %v\n", log)
}

case <-ctx.Done():
tw.logger.Info().Msgf("received OS signal")
case <-ctxwc.Done():
tw.logger.Info().Msgf("cancelled processing logs")
SetReadyStatus(HealthStatusError)
return
}
Expand Down
Loading