Skip to content

Commit

Permalink
Pipeline the Pipeline (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzaffi committed Aug 21, 2023
1 parent 1816271 commit 0095fc9
Show file tree
Hide file tree
Showing 9 changed files with 966 additions and 151 deletions.
75 changes: 74 additions & 1 deletion conduit/pipeline/common.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,83 @@
package pipeline

import log "github.com/sirupsen/logrus"
import (
"context"
"fmt"
"time"

log "github.com/sirupsen/logrus"

"github.com/algorand/conduit/conduit/data"
)

// HandlePanic recovers an in-flight panic, if any, and reports it through
// logger.Panicf using a message prefix shared by all pipeline components.
// It must be invoked directly by a defer statement, because recover only has
// an effect when called from the deferred function itself.
// NOTE(review): logrus documents Panicf as logging and then panicking again,
// so the panic presumably keeps propagating after it is logged — confirm.
func HandlePanic(logger *log.Logger) {
	r := recover()
	if r == nil {
		// nothing was panicking; leave normal control flow untouched
		return
	}
	logger.Panicf("conduit pipeline experienced a panic: %v", r)
}

// empty is a zero-size placeholder result type. RetriesNoOutput uses it as the
// output type when adapting a function that returns only an error to Retries.
type empty struct{}

// pluginInput is the type constraint for the argument handed to a function
// wrapped by Retries / RetriesNoOutput. Exactly these types are admitted:
// uint64, data.BlockData, or string.
type pluginInput interface {
uint64 | data.BlockData | string
}

// pluginOutput is the type constraint for the value returned by a function
// wrapped by Retries: any pluginInput type, or empty for functions that
// produce no value besides an error.
type pluginOutput interface {
pluginInput | empty
}

// Retries is a wrapper for retrying a function call f(x) with a cancellation context,
// a delay and a max retry count. It attempts to call the wrapped function at least once
// and only after the first attempt will pay attention to a context cancellation.
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish
// the round with at least one attempt for each pipeline component.
//
// Retry behavior is configured by p.cfg.RetryCount and p.cfg.RetryDelay:
//   - when RetryCount == 0, the function retries forever or until p.ctx is canceled
//   - when RetryCount > 0, f is called at most RetryCount+1 times before giving up
//   - between attempts the function waits RetryDelay; canceling p.ctx interrupts the wait
//
// Upon success:
//   - a nil error is returned even if there were intermediate failures
//   - the returned duration dur measures only the time spent in the call to f that succeeded
//
// Upon failure:
//   - the return value y is the zero value of type Y and a non-nil error is returned
//   - when the retries are exhausted, the error wraps the last error returned by f
//   - when p.ctx is canceled, the error wraps both the last error returned by f
//     and context.Cause(p.ctx)
//   - the returned duration dur is the total time spent in the function, including retries
func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
	var zero Y
	start := time.Now()

	for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
		// the first time through, we don't wait or mind ctx's done signal
		if i > 0 && !waitForRetry(p.ctx, p.cfg.RetryDelay) {
			// never hand back a partial result together with an error
			y = zero
			dur = time.Since(start)
			err = fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, context.Cause(p.ctx))
			return
		}

		opStart := time.Now()
		y, err = f(x)
		if err == nil {
			dur = time.Since(opStart)
			return
		}
		p.logger.Infof("%s: retry number %d/%d with err: %v", msg, i, p.cfg.RetryCount, err)
	}

	// never hand back a partial result together with an error
	y = zero
	dur = time.Since(start)
	err = fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err)
	return
}

// waitForRetry blocks for delay before the next retry attempt. It reports true
// when the full delay elapsed and false when ctx was done before or during the wait.
func waitForRetry(ctx context.Context, delay time.Duration) bool {
	// Check up front so that an already-canceled context always wins, even when
	// the delay is zero and the timer would be ready immediately.
	if ctx.Err() != nil {
		return false
	}

	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// RetriesNoOutput runs f under the same retry policy as Retries, for functions
// that report only an error. The function is adapted to Retries' signature by
// pairing its error with a placeholder empty value, which is then discarded.
// It returns the duration and error reported by Retries.
func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
	adapted := func(in X) (empty, error) {
		return empty{}, f(in)
	}

	_, elapsed, err := Retries(adapted, a, p, msg)
	return elapsed, err
}
213 changes: 213 additions & 0 deletions conduit/pipeline/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package pipeline

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/algorand/conduit/conduit/data"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

// TestRetries tests the retry logic of Retries and RetriesNoOutput.
// Each table case is run twice, once per function, as parallel subtests.
// A case fixes the configured RetryCount and describes when (if ever) the
// wrapped function starts succeeding.
func TestRetries(t *testing.T) {
// errSentinelCause is wrapped by every failure of the wrapped functions so
// that the assertions can recognize it with errors.Is.
errSentinelCause := errors.New("succeed after has failed")

// succeedAfterFactory builds a function for Retries that fails on each call
// until it has failed succeedAfter times, and always fails when never is true.
// The closure keeps its own call counter in tries.
succeedAfterFactory := func(succeedAfter uint64, never bool) func(uint64) (uint64, error) {
tries := uint64(0)

return func(x uint64) (uint64, error) {
if tries >= succeedAfter && !never {
// ensure not to return the zero value on success
return tries + 1, nil
}
tries++
return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1)
}
}

// succeedAfterFactoryNoOutput is the error-only counterpart used with RetriesNoOutput.
succeedAfterFactoryNoOutput := func(succeedAfter uint64, never bool) func(uint64) error {
tries := uint64(0)

return func(x uint64) error {
if tries >= succeedAfter && !never {
return nil
}
tries++
return fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1)
}
}

// retryCount == 0 means "retry forever"; succeedAfter is the number of
// failures before the first success.
cases := []struct {
name string
retryCount uint64
succeedAfter uint64
neverSucceed bool // neverSucceed trumps succeedAfter
}{
{
name: "retry forever succeeds after 0",
retryCount: 0,
succeedAfter: 0,
neverSucceed: false,
},
{
name: "retry forever succeeds after 1",
retryCount: 0,
succeedAfter: 1,
neverSucceed: false,
},
{
name: "retry forever succeeds after 7",
retryCount: 0,
succeedAfter: 7,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 0",
retryCount: 5,
succeedAfter: 0,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 1",
retryCount: 5,
succeedAfter: 1,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 5",
retryCount: 5,
succeedAfter: 5,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 7",
retryCount: 5,
succeedAfter: 7,
neverSucceed: false,
},
{
name: "retry 5 never succeeds",
retryCount: 5,
succeedAfter: 0,
neverSucceed: true,
},
{
name: "retry foerever never succeeds",
retryCount: 0,
succeedAfter: 0,
neverSucceed: true,
},
}

for _, tc := range cases {
// take a per-iteration copy: the parallel subtests below run after the loop
// has advanced (required with the shared loop variable of Go versions before 1.22)
tc := tc

// run cases for Retries()
t.Run("Retries() "+tc.name, func(t *testing.T) {
t.Parallel()
// each subtest gets its own cancelable context and minimal pipeline
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfter := succeedAfterFactory(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

// unbuffered channels: the goroutine sends y then err, and the test
// receives them in that same order below
yChan := make(chan uint64)
errChan := make(chan error)
go func() {
y, _, err := Retries(succeedAfter, 0, p, "test")
yChan <- y
errChan <- err
}()
// let a few retries happen before cancelling
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
y := <-yChan
err := <-errChan
// the returned error must wrap both the cancellation cause and the last failure
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
return
}

y, _, err := Retries(succeedAfter, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

// note we subtract 1 from y below because succeedAfter has added 1 to its output
// to disambiguate with the zero value which occurs on failure
require.Equal(t, tc.succeedAfter, y-1, tc.name)
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
// the give-up error message includes the configured retry count
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
require.Equal(t, tc.succeedAfter, y-1, tc.name)
}
}
})

// run cases for RetriesNoOutput()
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) {
t.Parallel()
// each subtest gets its own cancelable context and minimal pipeline
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfterNoOutput := succeedAfterFactoryNoOutput(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

errChan := make(chan error)
go func() {
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
errChan <- err
}()
// let a few retries happen before cancelling
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
err := <-errChan
// the returned error must wrap both the cancellation cause and the last failure
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
return
}

_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
// the give-up error message includes the configured retry count
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
}
}
})
}
}
Loading

0 comments on commit 0095fc9

Please sign in to comment.