From 5a1062c45e59a65db4863e29d8192c04461450ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 28 Aug 2024 13:27:18 +0200 Subject: [PATCH] feat: add fatal errors --- pkg/foundation/cerrors/fatal.go | 50 ++++++++++++++++++++++++++++++++ pkg/pipeline/stream/dlq.go | 4 +-- pkg/pipeline/stream/processor.go | 2 +- 3 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 pkg/foundation/cerrors/fatal.go diff --git a/pkg/foundation/cerrors/fatal.go b/pkg/foundation/cerrors/fatal.go new file mode 100644 index 000000000..87de86792 --- /dev/null +++ b/pkg/foundation/cerrors/fatal.go @@ -0,0 +1,50 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package cerrors contains functions related to error handling. +// +// The standard library's errors package is missing some functionality which we need, +// such as stack traces. To be certain that all errors created in Conduit are created +// with the additional information, usage of this package is mandatory. +// +// At present, the package acts as a "thin forwarding layer", where we "mix and match" +// functions from different packages. + +package cerrors + +// FatalError is an error type that will diferentiate these from other errors that could be retried. +type FatalError struct { + Err error +} + +// NewFatalError creates a new FatalError. +func NewFatalError(err error) *FatalError { + return &FatalError{Err: err} +} + +// Unwrap returns the wrapped error. +func (f *FatalError) Unwrap() error { + return f.Err +} + +// Error returns the error message. +func (f *FatalError) Error() string { + return f.Err.Error() +} + +// IsFatalError checks if the error is a FatalError. +func IsFatalError(err error) bool { + _, ok := err.(*FatalError) + return ok +} diff --git a/pkg/pipeline/stream/dlq.go b/pkg/pipeline/stream/dlq.go index f1aa4a350..d480ed402 100644 --- a/pkg/pipeline/stream/dlq.go +++ b/pkg/pipeline/stream/dlq.go @@ -146,10 +146,10 @@ func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err erro ok := n.window.Nack() if !ok { - return cerrors.Errorf( + return cerrors.NewFatalError(cerrors.Errorf( "DLQ nack threshold exceeded (%d/%d), original error: %w", n.WindowNackThreshold, n.WindowSize, nackMetadata.Reason, - ) + )) } defer func() { diff --git a/pkg/pipeline/stream/processor.go b/pkg/pipeline/stream/processor.go index 8079b8d28..81ac298aa 100644 --- a/pkg/pipeline/stream/processor.go +++ b/pkg/pipeline/stream/processor.go @@ -112,7 +112,7 @@ func (n *ProcessorNode) Run(ctx context.Context) error { case sdk.ErrorRecord: err = msg.Nack(v.Error, n.ID()) if err != nil { - return cerrors.Errorf("error executing processor: %w", err) + return cerrors.NewFatalError(cerrors.Errorf("error executing processor: %w", err)) } } }