Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add fatal errors #1811

Merged
merged 16 commits into from
Aug 30, 2024
Merged
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ linters:
# - errorlint
- exhaustive
# - exhaustivestruct
- exportloopref
- copyloopvar
# - forbidigo
# - forcetypeassert
# - funlen
Expand Down
1 change: 0 additions & 1 deletion pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func TestService_Check(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
db.EXPECT().GetKeys(gomock.Any(), gomock.Any()).Return(nil, nil)
Expand Down
41 changes: 41 additions & 0 deletions pkg/foundation/cerrors/fatal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cerrors

// FatalError is an error type that will diferentiate these from other errors that could be retried.
raulb marked this conversation as resolved.
Show resolved Hide resolved
type FatalError struct {
raulb marked this conversation as resolved.
Show resolved Hide resolved
Err error
}

// NewFatalError creates a new FatalError.
func NewFatalError(err error) *FatalError {
raulb marked this conversation as resolved.
Show resolved Hide resolved
return &FatalError{Err: err}
}

// Unwrap returns the wrapped error.
func (f *FatalError) Unwrap() error {
return f.Err
}

// Error returns the error message.
func (f *FatalError) Error() string {
return f.Err.Error()
raulb marked this conversation as resolved.
Show resolved Hide resolved
}

// IsFatalError checks if the error is a FatalError.
func IsFatalError(err error) bool {
_, ok := err.(*FatalError)
raulb marked this conversation as resolved.
Show resolved Hide resolved
return ok
}
78 changes: 78 additions & 0 deletions pkg/foundation/cerrors/fatal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cerrors_test

import (
"testing"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
)

func TestNewFatalError(t *testing.T) {
err := cerrors.New("test error")
fatalErr := cerrors.NewFatalError(err)

if fatalErr.Error() != err.Error() {
raulb marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("expected error message to be %s, got %s", err.Error(), fatalErr.Error())
}
}

func TestIsFatalError(t *testing.T) {
err := cerrors.New("test error")

testCases := []struct {
name string
err error
want bool
}{
{
name: "FatalError",
err: cerrors.NewFatalError(err),
want: true,
},
{
name: "No Fatal Error",
err: cerrors.New("test error"),
raulb marked this conversation as resolved.
Show resolved Hide resolved
want: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := cerrors.IsFatalError(tc.err)
if got != tc.want {
raulb marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("IsFatalError(%v) = %v; want %v", tc.err, got, tc.want)
}
})
}
}

func TestUnwrap(t *testing.T) {
err := cerrors.New("test error")
fatalErr := cerrors.NewFatalError(err)
raulb marked this conversation as resolved.
Show resolved Hide resolved

if cerrors.Unwrap(fatalErr) != err {
t.Errorf("expected error to unwrap to %s, got %s", err.Error(), cerrors.Unwrap(fatalErr).Error())
}
}

func TestFatalError(t *testing.T) {
err := cerrors.New("test error")
fatalErr := cerrors.NewFatalError(err)

if fatalErr.Error() != err.Error() {
t.Errorf("expected error message to be %s, got %s", err.Error(), fatalErr.Error())
}
}
1 change: 0 additions & 1 deletion pkg/pipeline/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func TestService_Check(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/stream/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err erro

ok := n.window.Nack()
if !ok {
return cerrors.Errorf(
return cerrors.NewFatalError(cerrors.Errorf(
"DLQ nack threshold exceeded (%d/%d), original error: %w",
n.WindowNackThreshold, n.WindowSize, nackMetadata.Reason,
)
))
}

defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
case sdk.ErrorRecord:
err = msg.Nack(v.Error, n.ID())
if err != nil {
return cerrors.Errorf("error executing processor: %w", err)
return cerrors.NewFatalError(cerrors.Errorf("error executing processor: %w", err))
}
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/processor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestService_Check(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr)
Expand Down