Skip to content

Commit

Permalink
better errors and test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Dec 8, 2024
1 parent d35d183 commit 420d30d
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 54 deletions.
3 changes: 2 additions & 1 deletion cmd/cli/node/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/cobra"

"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func (n *NodeActionCmd) run(cmd *cobra.Command, args []string, api client.API) e
Message: n.message,
})
if err != nil {
util.Fatal(cmd, fmt.Errorf("could not %s node %s: %w", n.action, nodeID, err), 1)
return bacerrors.Wrap(err, "failed to %s node %s", n.action, nodeID)
}

if response.Success {
Expand Down
13 changes: 7 additions & 6 deletions cmd/cli/node/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func (s *NodeActionSuite) TestListNodes() {
nodeID := cells[0]

// Try to approve, expect failure
_, out, err = s.ExecuteTestCobraCommand(
_, _, err = s.ExecuteTestCobraCommand(
"node",
"approve",
nodeID,
)
s.Require().NoError(err)
s.Require().Contains(out, "node already approved")
s.Require().Contains(out, nodeID)
s.Require().Error(err)
s.Require().ErrorContains(err, "already approved")
s.Require().ErrorContains(err, nodeID)

// Now reject the node
_, out, err = s.ExecuteTestCobraCommand(
Expand All @@ -65,8 +65,9 @@ func (s *NodeActionSuite) TestListNodes() {
"reject",
nodeID,
)
s.Require().NoError(err)
s.Require().Contains(out, "node already rejected")
s.Require().Error(err)
s.Require().ErrorContains(err, "already rejected")
s.Require().ErrorContains(err, nodeID)

// Set it to approve again
_, out, err = s.ExecuteTestCobraCommand(
Expand Down
74 changes: 55 additions & 19 deletions pkg/orchestrator/nodes/errors.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,71 @@
package nodes

import (
"fmt"
"net/http"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
)

const errComponent = "NodesManager"

const (
MultipleNodesFound bacerrors.ErrorCode = "MultipleNodesFound"
ConflictNodeState bacerrors.ErrorCode = "ConflictNodeState"
HandshakeRequired bacerrors.ErrorCode = "HandshakeRequired"
ConcurrentUpdate bacerrors.ErrorCode = "ConcurrentUpdate"
)

// ErrNodeNotFound is returned when nodeInfo was not found for a requested node id
type ErrNodeNotFound struct {
nodeID string
// NewErrNodeNotFound returns a standardized error for when a node is not found
func NewErrNodeNotFound(nodeID string) bacerrors.Error {
return bacerrors.New("node not found: %s", nodeID).
WithCode(bacerrors.NotFoundError).
WithComponent(errComponent)
}

func NewErrNodeNotFound(nodeID string) ErrNodeNotFound {
return ErrNodeNotFound{nodeID: nodeID}
// NewErrMultipleNodesFound returns a standardized error for when multiple nodes match a prefix
func NewErrMultipleNodesFound(nodeIDPrefix string, matchingNodeIDs []string) bacerrors.Error {
if len(matchingNodeIDs) > 3 {
matchingNodeIDs = matchingNodeIDs[:3]
matchingNodeIDs = append(matchingNodeIDs, "...")
}
return bacerrors.New("multiple nodes found for prefix: %s, matching IDs: %v", nodeIDPrefix, matchingNodeIDs).
WithCode(MultipleNodesFound).
WithHTTPStatusCode(http.StatusConflict).
WithComponent(errComponent).
WithHint("Use a more specific node ID prefix")
}

func (e ErrNodeNotFound) Error() string {
return fmt.Errorf("nodeInfo not found for nodeID: %s", e.nodeID).Error()
// NewErrHandshakeRequired returns a standardized error for when a handshake is required
func NewErrHandshakeRequired(nodeID string) bacerrors.Error {
return bacerrors.New("node %s not connected - handshake required", nodeID).
WithCode(HandshakeRequired).
WithComponent(errComponent).
WithHTTPStatusCode(http.StatusPreconditionRequired).
WithRetryable()
}

// ErrMultipleNodesFound is returned when multiple nodes were found for a requested node id prefix
type ErrMultipleNodesFound struct {
nodeIDPrefix string
matchingNodeIDs []string
// NewErrNodeAlreadyApproved returns a standardized error for when a node is already approved
func NewErrNodeAlreadyApproved(nodeID string) bacerrors.Error {
return bacerrors.New("node %s already approved", nodeID).
WithCode(ConflictNodeState).
WithHTTPStatusCode(http.StatusConflict).
WithComponent(errComponent)
}

func NewErrMultipleNodesFound(nodeIDPrefix string, matchingNodeIDs []string) ErrMultipleNodesFound {
if len(matchingNodeIDs) > 3 {
matchingNodeIDs = append(matchingNodeIDs[:3], "...")
}
return ErrMultipleNodesFound{nodeIDPrefix: nodeIDPrefix, matchingNodeIDs: matchingNodeIDs}
// NewErrNodeAlreadyRejected returns a standardized error for when a node is already rejected
func NewErrNodeAlreadyRejected(nodeID string) bacerrors.Error {
return bacerrors.New("node %s already rejected", nodeID).
WithCode(ConflictNodeState).
WithHTTPStatusCode(http.StatusConflict).
WithComponent(errComponent)
}

func (e ErrMultipleNodesFound) Error() string {
return fmt.Errorf("multiple nodes found for nodeID prefix: %s, matching nodeIDs: %v", e.nodeIDPrefix, e.matchingNodeIDs).Error()
// NewErrConcurrentModification returns a standardized error for concurrent update conflicts
func NewErrConcurrentModification() bacerrors.Error {
return bacerrors.New("concurrent modification detected").
WithCode(ConcurrentUpdate).
WithHTTPStatusCode(http.StatusConflict).
WithComponent(errComponent).
WithRetryable().
WithHint("Request should be retried")
}
5 changes: 2 additions & 3 deletions pkg/orchestrator/nodes/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package inmemory

import (
"context"
"errors"
"sync"
"time"

"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes"
)
Expand Down Expand Up @@ -74,8 +74,7 @@ func (r *NodeStore) GetByPrefix(ctx context.Context, prefix string) (models.Node
return state, nil
}
// return the error if it's not a node not found error
var errNotFound nodes.ErrNodeNotFound
if !errors.As(err, &errNotFound) {
if !bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError) {
return models.NodeState{}, err
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/orchestrator/nodes/inmemory/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/suite"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes/inmemory"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (s *InMemoryNodeStoreSuite) Test_GetNotFound() {
ctx := context.Background()
_, err := s.store.Get(ctx, nodeIDs[0])
s.Error(err)
s.IsType(nodes.ErrNodeNotFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))

}

Expand All @@ -80,14 +81,14 @@ func (s *InMemoryNodeStoreSuite) Test_GetByPrefix_MultipleMatches() {

_, err := s.store.GetByPrefix(ctx, "Qm")
s.Error(err)
s.IsType(nodes.ErrMultipleNodesFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, nodes.MultipleNodesFound))
}

func (s *InMemoryNodeStoreSuite) Test_GetByPrefix_NoMatch() {
ctx := context.Background()
_, err := s.store.GetByPrefix(ctx, "nonexistent")
s.Error(err)
s.IsType(nodes.ErrNodeNotFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))
}

func (s *InMemoryNodeStoreSuite) Test_GetByPrefix_ExpiredNode() {
Expand All @@ -104,7 +105,7 @@ func (s *InMemoryNodeStoreSuite) Test_GetByPrefix_ExpiredNode() {

_, err := store.GetByPrefix(ctx, "QmdZQ7")
s.Error(err)
s.IsType(nodes.ErrNodeNotFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))
}

func (s *InMemoryNodeStoreSuite) Test_List() {
Expand Down Expand Up @@ -210,7 +211,7 @@ func (s *InMemoryNodeStoreSuite) Test_Eviction() {
time.Sleep(ttl + 100*time.Millisecond)
_, err = s.store.Get(ctx, nodeInfo0.Info.ID())
s.Error(err)
s.IsType(nodes.ErrNodeNotFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))
}

func generateNodeState(peerID string, engines ...string) models.NodeState {
Expand Down
2 changes: 0 additions & 2 deletions pkg/orchestrator/nodes/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
const (
// BucketNameCurrent is the bucket name for bacalhau version v1.3.1 and beyond.
BucketNameCurrent = "node_v1"
// BucketNameV0 is the bucket name for bacalhau version v1.3.0 and below.
BucketNameV0 = "nodes"
)

type NodeStoreParams struct {
Expand Down
9 changes: 5 additions & 4 deletions pkg/orchestrator/nodes/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/stretchr/testify/suite"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes/kvstore"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (s *KVNodeInfoStoreSuite) Test_GetNotFound() {
ctx := context.Background()
_, err := s.store.Get(ctx, nodeIDs[0])
s.Error(err)
s.IsType(nodes.ErrNodeNotFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))
}

func (s *KVNodeInfoStoreSuite) Test_GetByPrefix_SingleMatch() {
Expand All @@ -99,14 +100,14 @@ func (s *KVNodeInfoStoreSuite) Test_GetByPrefix_MultipleMatches() {

_, err := s.store.GetByPrefix(ctx, "Qm")
s.Error(err)
s.IsType(nodes.ErrMultipleNodesFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, nodes.MultipleNodesFound))
}

func (s *KVNodeInfoStoreSuite) Test_GetByPrefix_NoMatch_Empty() {
ctx := context.Background()
_, err := s.store.GetByPrefix(ctx, "nonexistent")
s.Error(err)
s.IsType(nodes.ErrNodeNotFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))
}

func (s *KVNodeInfoStoreSuite) Test_GetByPrefix_NoMatch_NotEmpty() {
Expand All @@ -117,7 +118,7 @@ func (s *KVNodeInfoStoreSuite) Test_GetByPrefix_NoMatch_NotEmpty() {

_, err := s.store.GetByPrefix(ctx, "nonexistent")
s.Error(err)
s.IsType(nodes.ErrNodeNotFound{}, err)
s.True(bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))
}

func (s *KVNodeInfoStoreSuite) Test_List() {
Expand Down
22 changes: 11 additions & 11 deletions pkg/orchestrator/nodes/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package nodes

import (
"context"
"fmt"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
)
Expand Down Expand Up @@ -164,7 +163,9 @@ func (n *nodesManager) Start(ctx context.Context) error {
defer n.mu.Unlock()

if n.running {
return errors.New("node manager already running")
return bacerrors.New("node manager already running").
WithCode(bacerrors.BadRequestError).
WithComponent(errComponent)
}

// Initialize clean state
Expand Down Expand Up @@ -478,10 +479,9 @@ func (n *nodesManager) UpdateNodeInfo(
ctx context.Context, request messages.UpdateNodeInfoRequest) (messages.UpdateNodeInfoResponse, error) {
existing, err := n.Get(ctx, request.NodeInfo.ID())
if err != nil {
if errors.As(err, &ErrNodeNotFound{}) {
if bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError) {
// return an error that a handshake is first required
return messages.UpdateNodeInfoResponse{},
fmt.Errorf("node %s not yet registered - handshake required", request.NodeInfo.ID())
return messages.UpdateNodeInfoResponse{}, NewErrHandshakeRequired(request.NodeInfo.ID())
}
return messages.UpdateNodeInfoResponse{}, err
}
Expand Down Expand Up @@ -519,12 +519,12 @@ func (n *nodesManager) Heartbeat(
// Get existing live state
existingEntry, ok := n.liveState.Load(request.NodeID)
if !ok {
return messages.HeartbeatResponse{}, fmt.Errorf("node %s not connected - handshake required", request.NodeID)
return messages.HeartbeatResponse{}, NewErrHandshakeRequired(request.NodeID)
}

existing := existingEntry.(*trackedLiveState)
if existing.connectionState.Status != models.NodeStates.CONNECTED {
return messages.HeartbeatResponse{}, fmt.Errorf("node %s not connected - handshake required", request.NodeID)
return messages.HeartbeatResponse{}, NewErrHandshakeRequired(request.NodeID)
}

// updated connection state
Expand All @@ -550,7 +550,7 @@ func (n *nodesManager) Heartbeat(
LastComputeSeqNum: updated.LastComputeSeqNum,
}, nil
}
return messages.HeartbeatResponse{}, errors.New("concurrent update conflict")
return messages.HeartbeatResponse{}, NewErrConcurrentModification()
}

// ApproveNode approves a node for cluster participation.
Expand All @@ -568,7 +568,7 @@ func (n *nodesManager) ApproveNode(ctx context.Context, nodeID string) error {
}

if state.Membership == models.NodeMembership.APPROVED {
return errors.New("node already approved")
return NewErrNodeAlreadyApproved(nodeID)
}

state.Membership = models.NodeMembership.APPROVED
Expand All @@ -593,7 +593,7 @@ func (n *nodesManager) RejectNode(ctx context.Context, nodeID string) error {
}

if state.Membership == models.NodeMembership.REJECTED {
return errors.New("node already rejected")
return NewErrNodeAlreadyRejected(nodeID)
}

// Update persistent state first
Expand Down
5 changes: 2 additions & 3 deletions pkg/orchestrator/nodes/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package nodes_test

import (
"context"
"errors"
"fmt"
"sync"
"testing"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes"
Expand Down Expand Up @@ -502,8 +502,7 @@ func (s *NodeManagerTestSuite) TestNodeDeletion() {
// Verify node is deleted
_, err = s.manager.Get(s.ctx, nodeInfo.ID())
assert.Error(s.T(), err)
var notFound nodes.ErrNodeNotFound
assert.True(s.T(), errors.As(err, &notFound))
assert.True(s.T(), bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError))
}

func (s *NodeManagerTestSuite) TestConnectionStateChangeEvents() {
Expand Down

0 comments on commit 420d30d

Please sign in to comment.