Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error buffer util #8620

Merged
merged 3 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions core/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import (
"sync/atomic"
"time"

"errors"

cryptop2p "github.com/libp2p/go-libp2p-core/crypto"
"golang.org/x/exp/constraints"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/jpillora/backoff"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"
"github.com/robfig/cron/v3"
uuid "github.com/satori/go.uuid"
"golang.org/x/crypto/bcrypt"
Expand Down Expand Up @@ -118,7 +120,7 @@ func NewSecret(n int) string {
b := make([]byte, n)
_, err := rand.Read(b)
if err != nil {
panic(errors.Wrap(err, "generating secret failed"))
panic(pkgerrors.Wrap(err, "generating secret failed"))
}
return base64.StdEncoding.EncodeToString(b)
}
Expand Down Expand Up @@ -296,7 +298,7 @@ func Sha256(in string) (string, error) {
hasher := sha3.New256()
_, err := hasher.Write([]byte(in))
if err != nil {
return "", errors.Wrap(err, "sha256 write error")
return "", pkgerrors.Wrap(err, "sha256 write error")
}
return hex.EncodeToString(hasher.Sum(nil)), nil
}
Expand Down Expand Up @@ -374,7 +376,7 @@ func CheckUint256(n *big.Int) error {
func HexToUint256(s string) (*big.Int, error) {
rawNum, err := hexutil.Decode(s)
if err != nil {
return nil, errors.Wrapf(err, "while parsing %s as hex: ", s)
return nil, pkgerrors.Wrapf(err, "while parsing %s as hex: ", s)
}
rv := big.NewInt(0).SetBytes(rawNum) // can't be negative number
if err := CheckUint256(rv); err != nil {
Expand Down Expand Up @@ -625,7 +627,7 @@ func (q *BoundedPriorityQueue[T]) Empty() bool {
// }
func WrapIfError(err *error, msg string) {
if *err != nil {
*err = errors.Wrap(*err, msg)
*err = pkgerrors.Wrap(*err, msg)
}
}

Expand Down Expand Up @@ -744,7 +746,7 @@ func ValidateCronSchedule(schedule string) error {
}
parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
_, err := parser.Parse(schedule)
return errors.Wrapf(err, "invalid cron schedule '%v'", schedule)
return pkgerrors.Wrapf(err, "invalid cron schedule '%v'", schedule)
}

// ResettableTimer stores a timer
Expand Down Expand Up @@ -862,7 +864,7 @@ func (once *StartStopOnce) StartOnce(name string, fn func() error) error {
success := once.state.CompareAndSwap(int32(StartStopOnce_Unstarted), int32(StartStopOnce_Starting))

if !success {
return errors.Errorf("%v has already been started once; state=%v", name, StartStopOnceState(once.state.Load()))
return pkgerrors.Errorf("%v has already been started once; state=%v", name, StartStopOnceState(once.state.Load()))
}

once.Lock()
Expand Down Expand Up @@ -899,11 +901,11 @@ func (once *StartStopOnce) StopOnce(name string, fn func() error) error {
state := once.state.Load()
switch state {
case int32(StartStopOnce_Stopped):
return errors.Wrapf(ErrAlreadyStopped, "%s has already been stopped", name)
return pkgerrors.Wrapf(ErrAlreadyStopped, "%s has already been stopped", name)
case int32(StartStopOnce_Unstarted):
return errors.Wrapf(ErrCannotStopUnstarted, "%s has not been started", name)
return pkgerrors.Wrapf(ErrCannotStopUnstarted, "%s has not been started", name)
default:
return errors.Errorf("%v cannot be stopped from this state; state=%v", name, StartStopOnceState(state))
return pkgerrors.Errorf("%v cannot be stopped from this state; state=%v", name, StartStopOnceState(state))
}
}

Expand Down Expand Up @@ -1112,3 +1114,46 @@ func MinKey[U any, T constraints.Ordered](elems []U, key func(U) T) T {

return min
}

// ErrorBuffer uses joinedErrors interface to join multiple errors into a single error.
// This is useful to track the most recent N errors in a service and flush them as a single error.

type ErrorBuffer struct {
// buffer is a slice of errors
buffer []error
// Cap is the maximum number of errors that the buffer can hold.
// Exceeding the cap results in discarding the oldest error
Cap int

sync.RWMutex
}

func (eb *ErrorBuffer) Flush() (err error) {
eb.RLock()
defer eb.RUnlock()
err = errors.Join(eb.buffer...)
eb.buffer = nil
return
}

func (eb *ErrorBuffer) Append(incoming error) {
eb.Lock()
defer eb.Unlock()
// if at capacity, drop first element
if len(eb.buffer) == eb.Cap && eb.Cap != 0 {
eb.buffer = append(eb.buffer[1:], incoming)
return
}
eb.buffer = append(eb.buffer, incoming)
krehermann marked this conversation as resolved.
Show resolved Hide resolved
}

// UnwrapError returns a list of underlying errors if passed error implements joinedError or return the err in a single-element list otherwise.
//
//nolint:errorlint // error type checks will fail on wrapped errors. Disabled since we are not doing checks on error types.
func UnwrapError(err error) []error {
joined, ok := err.(interface{ Unwrap() []error })
if !ok {
return []error{err}
}
return joined.Unwrap()
}
54 changes: 54 additions & 0 deletions core/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,3 +983,57 @@ func TestTryParseHex(t *testing.T) {
assert.Equal(t, []byte{0x1, 0x23}, b)
})
}

func TestErrorBuffer(t *testing.T) {
t.Parallel()

err1 := errors.New("err1")
err2 := errors.New("err2")
err3 := errors.New("err3")

t.Run("happy path", func(t *testing.T) {
t.Parallel()
buff := utils.ErrorBuffer{}
buff.Append(err1)
buff.Append(err2)
combined := buff.Flush()
errs := utils.UnwrapError(combined)
assert.Equal(t, 2, len(errs))
assert.Equal(t, err1.Error(), errs[0].Error())
assert.Equal(t, err2.Error(), errs[1].Error())
})

t.Run("ovewrite oldest error when cap exceeded", func(t *testing.T) {
t.Parallel()
buff := utils.ErrorBuffer{Cap: 2}
buff.Append(err1)
buff.Append(err2)
buff.Append(err3)
combined := buff.Flush()
errs := utils.UnwrapError(combined)
assert.Equal(t, 2, len(errs))
assert.Equal(t, err2.Error(), errs[0].Error())
assert.Equal(t, err3.Error(), errs[1].Error())
})

t.Run("does not overwrite the buffer if cap == 0", func(t *testing.T) {
t.Parallel()
buff := utils.ErrorBuffer{}
for i := 1; i <= 20; i++ {
buff.Append(errors.Errorf("err#%d", i))
}

combined := buff.Flush()
errs := utils.UnwrapError(combined)
assert.Equal(t, 20, len(errs))
assert.Equal(t, "err#20", errs[19].Error())
})

t.Run("UnwrapError returns the a single element err array if passed err is not a joinedError", func(t *testing.T) {
t.Parallel()
errs := utils.UnwrapError(err1)
assert.Equal(t, 1, len(errs))
assert.Equal(t, err1.Error(), errs[0].Error())
})

}