Skip to content

Commit

Permalink
feat: add fatal errors
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb committed Aug 28, 2024
1 parent f54797d commit 534d1c4
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 3 deletions.
50 changes: 50 additions & 0 deletions pkg/foundation/cerrors/fatal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cerrors contains functions related to error handling.
//
// The standard library's errors package is missing some functionality which we need,
// such as stack traces. To be certain that all errors created in Conduit are created
// with the additional information, usage of this package is mandatory.
//
// At present, the package acts as a "thin forwarding layer", where we "mix and match"
// functions from different packages.

package cerrors

// FatalError is an error type that will diferentiate these from other errors that could be retried.
type FatalError struct {
Err error
}

// NewFatalError creates a new FatalError.
func NewFatalError(err error) *FatalError {
return &FatalError{Err: err}
}

// Unwrap returns the wrapped error.
func (f *FatalError) Unwrap() error {
return f.Err
}

// Error returns the error message.
func (f *FatalError) Error() string {
return f.Err.Error()
}

// IsFatalError checks if the error is a FatalError.
func IsFatalError(err error) bool {
_, ok := err.(*FatalError)
return ok
}
4 changes: 2 additions & 2 deletions pkg/pipeline/stream/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err erro

ok := n.window.Nack()
if !ok {
return cerrors.Errorf(
return cerrors.NewFatalError(cerrors.Errorf(
"DLQ nack threshold exceeded (%d/%d), original error: %w",
n.WindowNackThreshold, n.WindowSize, nackMetadata.Reason,
)
))
}

defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
case sdk.ErrorRecord:
err = msg.Nack(v.Error, n.ID())
if err != nil {
return cerrors.Errorf("error executing processor: %w", err)
return cerrors.NewFatalError(cerrors.Errorf("error executing processor: %w", err))
}
}
}
Expand Down

0 comments on commit 534d1c4

Please sign in to comment.