Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add fatal errors #1811

Merged
merged 16 commits into from
Aug 30, 2024
Merged
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ linters:
# - errorlint
- exhaustive
# - exhaustivestruct
- exportloopref
- copyloopvar
# - forbidigo
# - forcetypeassert
# - funlen
Expand Down
1 change: 0 additions & 1 deletion pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func TestService_Check(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
db.EXPECT().GetKeys(gomock.Any(), gomock.Any()).Return(nil, nil)
Expand Down
51 changes: 51 additions & 0 deletions pkg/foundation/cerrors/fatal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cerrors

import (
"fmt"
)

// FatalError is an error type that will differentiate these from other errors that could be retried.
type FatalError struct {
raulb marked this conversation as resolved.
Show resolved Hide resolved
Err error
}

// NewFatalError creates a new FatalError.
func NewFatalError(err error) *FatalError {
raulb marked this conversation as resolved.
Show resolved Hide resolved
return &FatalError{Err: err}
}

// Unwrap returns the wrapped error.
func (f *FatalError) Unwrap() error {
if f == nil {
raulb marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
return f.Err
}

// Error returns the error message.
func (f *FatalError) Error() string {
if f.Err == nil {
return ""
}
return fmt.Sprintf("fatal error: %v", f.Err)
}

// IsFatalError checks if the error is a FatalError.
func IsFatalError(err error) bool {
var fatalErr *FatalError
return As(err, &fatalErr)
}
86 changes: 86 additions & 0 deletions pkg/foundation/cerrors/fatal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cerrors_test

import (
"fmt"
"testing"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/matryer/is"
)

func TestNewFatalError(t *testing.T) {
is := is.New(t)

err := cerrors.New("test error")
fatalErr := cerrors.NewFatalError(err)
wantErr := fmt.Sprintf("fatal error: %v", err)

is.Equal(fatalErr.Error(), wantErr)
}

func TestIsFatalError(t *testing.T) {
is := is.New(t)
err := cerrors.New("test error")

testCases := []struct {
name string
err error
want bool
}{
{
name: "when it's a FatalError",
err: cerrors.NewFatalError(err),
want: true,
},
{
name: "when it's wrapped in",
err: fmt.Errorf("something went wrong: %w", cerrors.NewFatalError(cerrors.New("fatal error"))),
want: true,
},
{
name: "when it's not a FatalError",
err: err,
want: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := cerrors.IsFatalError(tc.err)
is.Equal(got, tc.want)
})
}
}

func TestUnwrap(t *testing.T) {
is := is.New(t)

err := cerrors.New("test error")
fatalErr := cerrors.NewFatalError(err)
raulb marked this conversation as resolved.
Show resolved Hide resolved

is.Equal(cerrors.Unwrap(fatalErr), err)
}

func TestFatalError(t *testing.T) {
is := is.New(t)

err := cerrors.New("test error")
fatalErr := cerrors.NewFatalError(err)
wantErr := fmt.Sprintf("fatal error: %v", err)

is.Equal(fatalErr.Error(), wantErr)
}
1 change: 0 additions & 1 deletion pkg/pipeline/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func TestService_Check(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/stream/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ func (n *DLQHandlerNode) Nack(msg *Message, nackMetadata NackMetadata) (err erro

ok := n.window.Nack()
if !ok {
return cerrors.Errorf(
return cerrors.NewFatalError(cerrors.Errorf(
"DLQ nack threshold exceeded (%d/%d), original error: %w",
n.WindowNackThreshold, n.WindowSize, nackMetadata.Reason,
)
))
}

defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
case sdk.ErrorRecord:
err = msg.Nack(v.Error, n.ID())
if err != nil {
return cerrors.Errorf("error executing processor: %w", err)
return cerrors.NewFatalError(cerrors.Errorf("error executing processor: %w", err))
}
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/processor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestService_Check(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr)
Expand Down