Skip to content

Commit

Permalink
Add error buffer util (#8620)
Browse files Browse the repository at this point in the history
  • Loading branch information
essamhassan authored Mar 6, 2023
1 parent e4a2ab7 commit f07b8cc
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 10 deletions.
65 changes: 55 additions & 10 deletions core/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import (
"sync/atomic"
"time"

"errors"

cryptop2p "github.com/libp2p/go-libp2p-core/crypto"
"golang.org/x/exp/constraints"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/jpillora/backoff"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"
"github.com/robfig/cron/v3"
uuid "github.com/satori/go.uuid"
"golang.org/x/crypto/bcrypt"
Expand Down Expand Up @@ -118,7 +120,7 @@ func NewSecret(n int) string {
b := make([]byte, n)
_, err := rand.Read(b)
if err != nil {
panic(errors.Wrap(err, "generating secret failed"))
panic(pkgerrors.Wrap(err, "generating secret failed"))
}
return base64.StdEncoding.EncodeToString(b)
}
Expand Down Expand Up @@ -296,7 +298,7 @@ func Sha256(in string) (string, error) {
hasher := sha3.New256()
_, err := hasher.Write([]byte(in))
if err != nil {
return "", errors.Wrap(err, "sha256 write error")
return "", pkgerrors.Wrap(err, "sha256 write error")
}
return hex.EncodeToString(hasher.Sum(nil)), nil
}
Expand Down Expand Up @@ -374,7 +376,7 @@ func CheckUint256(n *big.Int) error {
func HexToUint256(s string) (*big.Int, error) {
rawNum, err := hexutil.Decode(s)
if err != nil {
return nil, errors.Wrapf(err, "while parsing %s as hex: ", s)
return nil, pkgerrors.Wrapf(err, "while parsing %s as hex: ", s)
}
rv := big.NewInt(0).SetBytes(rawNum) // can't be negative number
if err := CheckUint256(rv); err != nil {
Expand Down Expand Up @@ -625,7 +627,7 @@ func (q *BoundedPriorityQueue[T]) Empty() bool {
// }
func WrapIfError(err *error, msg string) {
if *err != nil {
*err = errors.Wrap(*err, msg)
*err = pkgerrors.Wrap(*err, msg)
}
}

Expand Down Expand Up @@ -744,7 +746,7 @@ func ValidateCronSchedule(schedule string) error {
}
parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
_, err := parser.Parse(schedule)
return errors.Wrapf(err, "invalid cron schedule '%v'", schedule)
return pkgerrors.Wrapf(err, "invalid cron schedule '%v'", schedule)
}

// ResettableTimer stores a timer
Expand Down Expand Up @@ -862,7 +864,7 @@ func (once *StartStopOnce) StartOnce(name string, fn func() error) error {
success := once.state.CompareAndSwap(int32(StartStopOnce_Unstarted), int32(StartStopOnce_Starting))

if !success {
return errors.Errorf("%v has already been started once; state=%v", name, StartStopOnceState(once.state.Load()))
return pkgerrors.Errorf("%v has already been started once; state=%v", name, StartStopOnceState(once.state.Load()))
}

once.Lock()
Expand Down Expand Up @@ -899,11 +901,11 @@ func (once *StartStopOnce) StopOnce(name string, fn func() error) error {
state := once.state.Load()
switch state {
case int32(StartStopOnce_Stopped):
return errors.Wrapf(ErrAlreadyStopped, "%s has already been stopped", name)
return pkgerrors.Wrapf(ErrAlreadyStopped, "%s has already been stopped", name)
case int32(StartStopOnce_Unstarted):
return errors.Wrapf(ErrCannotStopUnstarted, "%s has not been started", name)
return pkgerrors.Wrapf(ErrCannotStopUnstarted, "%s has not been started", name)
default:
return errors.Errorf("%v cannot be stopped from this state; state=%v", name, StartStopOnceState(state))
return pkgerrors.Errorf("%v cannot be stopped from this state; state=%v", name, StartStopOnceState(state))
}
}

Expand Down Expand Up @@ -1112,3 +1114,46 @@ func MinKey[U any, T constraints.Ordered](elems []U, key func(U) T) T {

return min
}

// ErrorBuffer uses joinedErrors interface to join multiple errors into a single error.
// This is useful to track the most recent N errors in a service and flush them as a single error.

type ErrorBuffer struct {
// buffer is a slice of errors
buffer []error
// Cap is the maximum number of errors that the buffer can hold.
// Exceeding the cap results in discarding the oldest error
Cap int

sync.RWMutex
}

func (eb *ErrorBuffer) Flush() (err error) {
eb.RLock()
defer eb.RUnlock()
err = errors.Join(eb.buffer...)
eb.buffer = nil
return
}

func (eb *ErrorBuffer) Append(incoming error) {
eb.Lock()
defer eb.Unlock()
// if at capacity, drop first element
if len(eb.buffer) == eb.Cap && eb.Cap != 0 {
eb.buffer = append(eb.buffer[1:], incoming)
return
}
eb.buffer = append(eb.buffer, incoming)
}

// UnwrapError returns a list of underlying errors if passed error implements joinedError or return the err in a single-element list otherwise.
//
//nolint:errorlint // error type checks will fail on wrapped errors. Disabled since we are not doing checks on error types.
func UnwrapError(err error) []error {
joined, ok := err.(interface{ Unwrap() []error })
if !ok {
return []error{err}
}
return joined.Unwrap()
}
54 changes: 54 additions & 0 deletions core/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,3 +983,57 @@ func TestTryParseHex(t *testing.T) {
assert.Equal(t, []byte{0x1, 0x23}, b)
})
}

func TestErrorBuffer(t *testing.T) {
t.Parallel()

err1 := errors.New("err1")
err2 := errors.New("err2")
err3 := errors.New("err3")

t.Run("happy path", func(t *testing.T) {
t.Parallel()
buff := utils.ErrorBuffer{}
buff.Append(err1)
buff.Append(err2)
combined := buff.Flush()
errs := utils.UnwrapError(combined)
assert.Equal(t, 2, len(errs))
assert.Equal(t, err1.Error(), errs[0].Error())
assert.Equal(t, err2.Error(), errs[1].Error())
})

t.Run("ovewrite oldest error when cap exceeded", func(t *testing.T) {
t.Parallel()
buff := utils.ErrorBuffer{Cap: 2}
buff.Append(err1)
buff.Append(err2)
buff.Append(err3)
combined := buff.Flush()
errs := utils.UnwrapError(combined)
assert.Equal(t, 2, len(errs))
assert.Equal(t, err2.Error(), errs[0].Error())
assert.Equal(t, err3.Error(), errs[1].Error())
})

t.Run("does not overwrite the buffer if cap == 0", func(t *testing.T) {
t.Parallel()
buff := utils.ErrorBuffer{}
for i := 1; i <= 20; i++ {
buff.Append(errors.Errorf("err#%d", i))
}

combined := buff.Flush()
errs := utils.UnwrapError(combined)
assert.Equal(t, 20, len(errs))
assert.Equal(t, "err#20", errs[19].Error())
})

t.Run("UnwrapError returns the a single element err array if passed err is not a joinedError", func(t *testing.T) {
t.Parallel()
errs := utils.UnwrapError(err1)
assert.Equal(t, 1, len(errs))
assert.Equal(t, err1.Error(), errs[0].Error())
})

}

0 comments on commit f07b8cc

Please sign in to comment.