diff --git a/.golangci.yml b/.golangci.yml index 320a3d081..b06ed5b7d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -73,7 +73,7 @@ linters: # - errorlint - exhaustive # - exhaustivestruct - - exportloopref + - copyloopvar # - forbidigo # - forcetypeassert # - funlen diff --git a/pkg/connector/service_test.go b/pkg/connector/service_test.go index 9b47af728..30b5f6268 100644 --- a/pkg/connector/service_test.go +++ b/pkg/connector/service_test.go @@ -119,7 +119,6 @@ func TestService_Check(t *testing.T) { } for _, tc := range testCases { - tc := tc t.Run(tc.name, func(t *testing.T) { is := is.New(t) db.EXPECT().GetKeys(gomock.Any(), gomock.Any()).Return(nil, nil) diff --git a/pkg/foundation/cerrors/fatal.go b/pkg/foundation/cerrors/fatal.go new file mode 100644 index 000000000..2223007b6 --- /dev/null +++ b/pkg/foundation/cerrors/fatal.go @@ -0,0 +1,48 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cerrors + +import ( + "fmt" +) + +// fatalError is an error type that will differentiate these from other errors that could be retried. +type fatalError struct { + Err error +} + +// NewFatalError creates a new fatalError. +func NewFatalError(err error) *fatalError { + return &fatalError{Err: err} +} + +// Unwrap returns the wrapped error. +func (f *fatalError) Unwrap() error { + return f.Err +} + +// Error returns the error message. +func (f *fatalError) Error() string { + if f.Err == nil { + return "" + } + return fmt.Sprintf("fatal error: %v", f.Err) +} + +// IsFatalError checks if the error is a fatalError. +func IsFatalError(err error) bool { + var fatalErr *fatalError + return As(err, &fatalErr) +} diff --git a/pkg/foundation/cerrors/fatal_test.go b/pkg/foundation/cerrors/fatal_test.go new file mode 100644 index 000000000..b3108bf12 --- /dev/null +++ b/pkg/foundation/cerrors/fatal_test.go @@ -0,0 +1,86 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cerrors_test + +import ( + "fmt" + "testing" + + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/matryer/is" +) + +func TestNewFatalError(t *testing.T) { + is := is.New(t) + + err := cerrors.New("test error") + fatalErr := cerrors.NewFatalError(err) + wantErr := fmt.Sprintf("fatal error: %v", err) + + is.Equal(fatalErr.Error(), wantErr) +} + +func TestIsFatalError(t *testing.T) { + is := is.New(t) + err := cerrors.New("test error") + + testCases := []struct { + name string + err error + want bool + }{ + { + name: "when it's a fatalError", + err: cerrors.NewFatalError(err), + want: true, + }, + { + name: "when it's wrapped in", + err: fmt.Errorf("something went wrong: %w", cerrors.NewFatalError(cerrors.New("fatal error"))), + want: true, + }, + { + name: "when it's not a fatalError", + err: err, + want: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := cerrors.IsFatalError(tc.err) + is.Equal(got, tc.want) + }) + } +} + +func TestUnwrap(t *testing.T) { + is := is.New(t) + + err := cerrors.New("test error") + fatalErr := cerrors.NewFatalError(err) + + is.Equal(cerrors.Unwrap(fatalErr), err) +} + +func TestFatalError(t *testing.T) { + is := is.New(t) + + err := cerrors.New("test error") + fatalErr := cerrors.NewFatalError(err) + wantErr := fmt.Sprintf("fatal error: %v", err) + + is.Equal(fatalErr.Error(), wantErr) +} diff --git a/pkg/pipeline/service_test.go b/pkg/pipeline/service_test.go index 1ea79a1ad..a5a80502a 100644 --- a/pkg/pipeline/service_test.go +++ b/pkg/pipeline/service_test.go @@ -77,7 +77,6 @@ func TestService_Check(t *testing.T) { } for _, tc := range testCases { - tc := tc t.Run(tc.name, func(t *testing.T) { is := is.New(t) db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr) diff --git a/pkg/pipeline/stream/dlq.go b/pkg/pipeline/stream/dlq.go index f1aa4a350..d480ed402 100644 --- a/pkg/pipeline/stream/dlq.go +++ b/pkg/pipeline/stream/dlq.go @@ -146,10 +146,10 @@ func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err erro ok := n.window.Nack() if !ok { - return cerrors.Errorf( + return cerrors.NewFatalError(cerrors.Errorf( "DLQ nack threshold exceeded (%d/%d), original error: %w", n.WindowNackThreshold, n.WindowSize, nackMetadata.Reason, - ) + )) } defer func() { diff --git a/pkg/pipeline/stream/processor.go b/pkg/pipeline/stream/processor.go index 8079b8d28..81ac298aa 100644 --- a/pkg/pipeline/stream/processor.go +++ b/pkg/pipeline/stream/processor.go @@ -112,7 +112,7 @@ func (n *ProcessorNode) Run(ctx context.Context) error { case sdk.ErrorRecord: err = msg.Nack(v.Error, n.ID()) if err != nil { - return cerrors.Errorf("error executing processor: %w", err) + return cerrors.NewFatalError(cerrors.Errorf("error executing processor: %w", err)) } } } diff --git a/pkg/processor/service_test.go b/pkg/processor/service_test.go index cc2e69220..f1eb59382 100644 --- a/pkg/processor/service_test.go +++ b/pkg/processor/service_test.go @@ -87,7 +87,6 @@ func TestService_Check(t *testing.T) { } for _, tc := range testCases { - tc := tc t.Run(tc.name, func(t *testing.T) { is := is.New(t) db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr)