new node manager decoupled from transport layer #4728
Walkthrough

The changes in this pull request encompass a range of modifications across multiple files, primarily focusing on the integration of a new transport protocol and the restructuring of node management functionalities. Key updates include the introduction of new types and interfaces, modifications to existing method signatures to accommodate legacy message types, and the removal of outdated components. Additionally, the handling of node information has shifted from using state-based models to information-based models, enhancing the clarity of data representation. The updates also include new test cases and adjustments to existing tests to align with these structural changes.
Actionable comments posted: 8
🧹 Outside diff range and nitpick comments (28)
pkg/node/requester.go (2)
72-75: Handle potential error when creating NATS client

Ensure that the error returned by `transportLayer.CreateClient(ctx)` is properly wrapped or logged to provide more context in case of failures. This can aid in debugging connection issues with the NATS server.

Apply this diff to wrap the error with additional context:

```diff
 if err != nil {
-	return nil, err
+	return nil, fmt.Errorf("failed to create NATS client: %w", err)
 }
```
Line range hint 288-327: Ensure `nodesManager` is properly stopped during cleanup

Currently, the `nodesManager` is started in `createNodeManager` but not stopped in the `cleanupFunc`. This could lead to resource leaks or goroutines left running after the node shuts down. It's important to stop the `nodesManager` during cleanup to ensure resources are released properly.

Apply this diff to stop the `nodesManager` in the `cleanupFunc`:

```diff
 cleanupFunc := func(ctx context.Context) {
 	// existing cleanup code
+	// Stop the node manager
+	if cleanupErr = nodesManager.Stop(ctx); cleanupErr != nil {
+		logDebugIfContextCancelled(ctx, cleanupErr, "failed to stop node manager")
+	}
 }
```

pkg/orchestrator/nodes/manager.go (3)
135-169: Handle concurrent `Start` calls gracefully

The `Start` method currently returns an error if the `nodesManager` is already running. To make the method more robust, consider making it idempotent by simply returning if `Start` is called multiple times, without treating it as an error.

Apply this diff to make `Start` idempotent:

```diff
 func (n *nodesManager) Start(ctx context.Context) error {
 	n.mu.Lock()
 	defer n.mu.Unlock()
 	if n.running {
-		return errors.New("node manager already running")
+		return nil
 	}
 	// rest of the code
```
450-491: Improve concurrency handling in `Heartbeat` method

The `Heartbeat` method uses a retry loop with a fixed number of attempts to handle concurrent updates. This approach might not scale well under high contention. Consider using atomic operations or locks to manage concurrent access to `liveState`, ensuring thread safety and reducing the chances of conflicts; a sketch follows below.
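A minimal sketch of the lock-based alternative, assuming `liveState` holds per-node records keyed by node ID; the types and field names below are illustrative stand-ins, not the actual manager internals:

```go
package nodes

import "sync"

// Resources is a stand-in for models.Resources.
type Resources struct{ CPU, Memory float64 }

// liveEntry is an illustrative per-node live-state record.
type liveEntry struct {
	mu                sync.Mutex
	lastHeartbeatSeq  uint64
	availableCapacity Resources
}

// applyHeartbeat serializes concurrent heartbeats for one node under a
// per-entry lock, removing the need for a bounded retry loop.
func (e *liveEntry) applyHeartbeat(seq uint64, capacity Resources) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	if seq < e.lastHeartbeatSeq {
		return false // stale heartbeat; ignore
	}
	e.lastHeartbeatSeq = seq
	e.availableCapacity = capacity
	return true
}
```

Under high contention this trades the retry loop's optimistic concurrency for predictable per-node serialization, while heartbeats for different nodes still proceed in parallel.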
373-419: Validate node type before processing handshake

In the `Handshake` method, the check to validate that the node is a compute node happens after accessing the store to check for existing nodes. To optimize performance, consider validating the node type before accessing the store, which can reduce unnecessary I/O operations for invalid nodes.

Apply this diff to rearrange the validation:

```diff
 // Validate the node is compute type
 if !request.NodeInfo.IsComputeNode() {
 	return messages.HandshakeResponse{
 		Accepted: false,
 		Reason:   "node is not a compute node",
 	}, nil
 }

+// Check if node is already registered
 existing, err := n.store.Get(ctx, request.NodeInfo.ID())
 if err == nil {
 	// existing node logic
```
pkg/orchestrator/nodes/manager_test.go (1)
470-489: Add assertions for `QueueUsedCapacity` updates

In the `TestResourceUpdates` test, while `AvailableCapacity` is updated and asserted, `QueueUsedCapacity` is updated but not asserted. To ensure full coverage, consider adding assertions for `QueueUsedCapacity` to verify that it is updated correctly.

Apply this diff to add assertions:

```diff
 // Update resources via heartbeat
 newResources := models.Resources{CPU: 8, Memory: 16384, GPU: 2}
+newQueueUsed := models.Resources{CPU: 2, Memory: 4096}
 _, err = s.manager.Heartbeat(s.ctx, nodes.ExtendedHeartbeatRequest{
 	HeartbeatRequest: messages.HeartbeatRequest{
 		NodeID:            nodeInfo.ID(),
 		AvailableCapacity: newResources,
+		QueueUsedCapacity: newQueueUsed,
 	},
 })
 s.Require().NoError(err)

 // Verify resource update
 state, err := s.manager.Get(s.ctx, nodeInfo.ID())
 s.Require().NoError(err)
 assert.Equal(s.T(), newResources, state.Info.ComputeNodeInfo.AvailableCapacity)
+assert.Equal(s.T(), newQueueUsed, state.Info.ComputeNodeInfo.QueueUsedCapacity)
```

pkg/transport/bprotocol/types.go (1)
9-18: LGTM! Clean interface design that supports decoupling.

The `ManagementEndpoint` interface is well-designed with clear separation of concerns, focusing purely on transport-layer operations. The use of legacy types suggests proper backward compatibility handling.

Consider adding a version identifier to the interface name (e.g., `ManagementEndpointV1`) to make future interface evolution easier while maintaining backward compatibility; a sketch follows.
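A sketch of the suggested naming, assuming this repository's import path; the method set mirrors the legacy request/response pairs named elsewhere in this review, and the exact signatures in the repository may differ:

```go
package bprotocol

import (
	"context"

	"github.com/bacalhau-project/bacalhau/pkg/models/messages/legacy"
)

// ManagementEndpointV1 carries an explicit version suffix so a future
// V2 can coexist with it instead of breaking implementers.
type ManagementEndpointV1 interface {
	Register(ctx context.Context, request legacy.RegisterRequest) (*legacy.RegisterResponse, error)
	UpdateInfo(ctx context.Context, request legacy.UpdateInfoRequest) (*legacy.UpdateInfoResponse, error)
	UpdateResources(ctx context.Context, request legacy.UpdateResourcesRequest) (*legacy.UpdateResourcesResponse, error)
}
```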
16-18: Simplify constructor by removing unnecessary error return.

The constructor doesn't perform any operations that could fail, making the error return value unnecessary.

```diff
-func NewHeartbeatClient(nodeID string, publisher ncl.Publisher) (*HeartbeatClient, error) {
-	return &HeartbeatClient{publisher: publisher, nodeID: nodeID}, nil
+func NewHeartbeatClient(nodeID string, publisher ncl.Publisher) *HeartbeatClient {
+	return &HeartbeatClient{publisher: publisher, nodeID: nodeID}
 }
```
28-30: Document why Close is a no-op or implement cleanup.

The `Close` method is currently empty. Either document why no cleanup is needed or implement necessary cleanup operations.

```diff
 func (h *HeartbeatClient) Close(ctx context.Context) error {
+	// No cleanup needed as the publisher lifecycle is managed externally
 	return nil
 }
```
pkg/orchestrator/nodes/errors.go (2)
Line range hint 17-19: Simplify error string formatting

Building an error with `fmt.Errorf()` only to immediately call `.Error()` on it is roundabout; `fmt.Sprintf()` produces the string directly.

```diff
 func (e ErrNodeNotFound) Error() string {
-	return fmt.Errorf("nodeInfo not found for nodeID: %s", e.nodeID).Error()
+	return fmt.Sprintf("nodeInfo not found for nodeID: %s", e.nodeID)
 }
```
Line range hint 33-35: Simplify error string formatting

Similar to the above, the `.Error()` call is redundant.

```diff
 func (e ErrMultipleNodesFound) Error() string {
-	return fmt.Errorf("multiple nodes found for nodeID prefix: %s, matching nodeIDs: %v", e.nodeIDPrefix, e.matchingNodeIDs).Error()
+	return fmt.Sprintf("multiple nodes found for nodeID prefix: %s, matching nodeIDs: %v", e.nodeIDPrefix, e.matchingNodeIDs)
 }
```

pkg/models/messages/legacy/node.go (1)
43-43: Document the empty response type

Consider adding a comment explaining why `UpdateResourcesResponse` is empty. This helps future maintainers understand if this is intentional or if fields might be added later.

```diff
+// UpdateResourcesResponse is intentionally empty as the operation
+// is fire-and-forget and no response data is needed.
 type UpdateResourcesResponse struct{}
```

pkg/models/messages/node.go (1)
23-31: Consider adding timestamps for latency tracking.

The heartbeat mechanism could benefit from including timestamps to:

- Track network latency
- Detect clock skew between nodes
- Improve debugging capabilities

```diff
 type HeartbeatRequest struct {
 	NodeID                 string           `json:"NodeID"`
 	AvailableCapacity      models.Resources `json:"AvailableCapacity"`
 	QueueUsedCapacity      models.Resources `json:"QueueUsedCapacity"`
 	LastOrchestratorSeqNum uint64           `json:"LastOrchestratorSeqNum"`
+	Timestamp              time.Time        `json:"Timestamp"`
 }
```
pkg/models/node_info_provider.go (3)
33-45: Consider adding validation for nil decorator

While the registration methods are well-implemented, the decorator registration should validate its input.

Consider adding a nil check:

```diff
 func (n *BaseNodeInfoProvider) RegisterNodeInfoDecorator(decorator NodeInfoDecorator) {
+	if decorator == nil {
+		return
+	}
 	n.nodeInfoDecorators = append(n.nodeInfoDecorators, decorator)
 }
```
47-61: Consider adding context validation and timeout handling

The `GetNodeInfo` implementation looks good but could benefit from additional context handling.

Consider adding context validation and cancellation checks (returning the partially decorated info on cancellation rather than a bare `break`, which would only exit the `select`):

```diff
 func (n *BaseNodeInfoProvider) GetNodeInfo(ctx context.Context) NodeInfo {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	if ctx.Err() != nil {
+		return NodeInfo{
+			NodeID:          n.nodeID,
+			BacalhauVersion: n.bacalhauVersion,
+		}
+	}
 	info := NodeInfo{
 		NodeID:             n.nodeID,
 		BacalhauVersion:    n.bacalhauVersion,
 		Labels:             n.labelsProvider.GetLabels(ctx),
 		NodeType:           NodeTypeRequester,
 		SupportedProtocols: n.supportedProtocol,
 	}
 	for _, decorator := range n.nodeInfoDecorators {
-		info = decorator.DecorateNodeInfo(ctx, info)
+		select {
+		case <-ctx.Done():
+			return info
+		default:
+			info = decorator.DecorateNodeInfo(ctx, info)
+		}
 	}
 	return info
 }
```
19-19: Fix typo in field name

The field name should be plural to match its slice type.

```diff
-	supportedProtocol []Protocol
+	supportedProtocols []Protocol
```

pkg/compute/node_info_decorator.go (1)
Line range hint 49-60: LGTM! Good improvement in data ownership semantics

The change from pointer to value type for `ComputeNodeInfo` is a positive architectural improvement that:

- Makes data ownership explicit
- Prevents unintended state sharing between nodes
- Improves thread safety
- Aligns well with the goal of decoupling node management from transport layer
pkg/orchestrator/selection/selector/node_selector.go (1)
17-17: LGTM: Clean interface transition to nodes.Lookup

The change from `orchestrator.NodeDiscoverer` to `nodes.Lookup` aligns well with the PR objective of decoupling node management from the transport layer. This change improves modularity by providing a cleaner separation of concerns between node discovery and transport mechanisms.

Also applies to: 23-23
pkg/orchestrator/watchers/protocol_router.go (2)
17-17: LGTM: Consistent interface adoption

The change to use `nodes.Lookup` maintains consistency with the broader refactoring effort, completing the decoupling pattern across the codebase and ensuring consistent node management interfaces.

Also applies to: 23-23
Line range hint 67-73: Consider documenting protocol fallback behavior

The fallback to `ProtocolBProtocolV2` when no supported protocols are found should be documented, especially given the TODO comment about v1.5 support.

Add a comment explaining the fallback behavior and its temporary nature:

```diff
 if len(matchingProtocols) == 0 {
+	// Temporary fallback mechanism for backward compatibility
+	// This will be removed when v1.5 support is discontinued
 	preferredProtocol = models.ProtocolBProtocolV2
 } else {
```

pkg/nats/proxy/management_handler.go (1)
Line range hint 82-91: Consider adding validation for legacy request types

While the transition to legacy types is clean, consider adding validation for the legacy request types to ensure backward compatibility.

```diff
 func (h *ManagementHandler) processRegistration(ctx context.Context, msg *nats.Msg) (*legacy.RegisterResponse, error) {
 	request := new(legacy.RegisterRequest)
 	err := json.Unmarshal(msg.Data, request)
 	if err != nil {
 		log.Ctx(ctx).Error().Msgf("error decoding %s: %s", reflect.TypeOf(request), err)
 		return nil, err
 	}
+	if err := request.Validate(); err != nil {
+		log.Ctx(ctx).Error().Msgf("invalid request: %s", err)
+		return nil, err
+	}
 	return h.endpoint.Register(ctx, *request)
 }
```

cmd/cli/node/columns.go (1)
73-111: Consider performance implications of value semantics.

The shift from pointer to value semantics for `ComputeNodeInfo` could impact performance with large structs. However, in this UI context the impact should be minimal, as the data size is typically small. Consider benchmarking if this type is used in performance-critical paths; a sketch follows.
pkg/publicapi/endpoint/orchestrator/node.go (2)
210-220: Improve error handling with custom error types

Consider creating custom error types for unsupported actions instead of using `fmt.Errorf`. This would provide better error handling capabilities for API clients.

```diff
+type UnsupportedActionError struct {
+	Action string
+}
+
+func (e UnsupportedActionError) Error() string {
+	return fmt.Sprintf("unsupported action %s", e.Action)
+}
+
 var action func(context.Context, string) error
 if args.Action == string(apimodels.NodeActionApprove) {
 	action = e.nodeManager.ApproveNode
 } else if args.Action == string(apimodels.NodeActionReject) {
 	action = e.nodeManager.RejectNode
 } else if args.Action == string(apimodels.NodeActionDelete) {
 	action = e.nodeManager.DeleteNode
 } else {
 	action = func(context.Context, string) error {
-		return fmt.Errorf("unsupported action %s", args.Action)
+		return UnsupportedActionError{Action: args.Action}
 	}
 }
```
223-227: Consider adding action result details

The response only includes a success boolean. Consider adding more details about the action performed and its outcome to improve API usability.

```diff
 type PutNodeResponse struct {
 	Success bool
+	Action  string
+	Message string
 }

 if err := action(ctx, nodeID); err != nil {
 	return err
 }
 return c.JSON(http.StatusOK, apimodels.PutNodeResponse{
 	Success: true,
+	Action:  args.Action,
+	Message: fmt.Sprintf("Successfully performed %s action on node %s", args.Action, nodeID),
 })
```

pkg/compute/management_client.go (2)
23-23: Consider documenting the transport layer transition.

The shift to `bprotocol` types represents a significant architectural change. Consider adding documentation about:

- The rationale behind using bprotocol
- The relationship between ManagementEndpoint and HeartbeatClient
- A migration guide for implementations using the old interfaces

Also applies to: 28-28, 39-39, 45-45
90-92: Ensure consistent error handling in legacy request handling.

The error handling in the legacy request methods could be more informative. Consider:

- Including the original error in the wrapped error message
- Adding context about the failed operation

```diff
-	return errors.New("failed to register with requester node")
+	return fmt.Errorf("failed to register with requester node: %w", err)
```

Also applies to: 118-121, 134-138
pkg/orchestrator/selection/ranking/over_subscription_test.go (1)
33-33: LGTM! The transition to value types improves safety and clarity.

The change from pointer types to value types for `ComputeNodeInfo` is a good architectural decision as it:

- Eliminates potential nil pointer dereferences
- Makes data ownership more explicit
- Reduces memory indirection

Consider adding test cases for zero values of `ComputeNodeInfo` to ensure proper handling of default values, as the transition to value types makes this scenario more relevant; a sketch follows below.

Also applies to: 42-42, 55-55, 68-68, 81-81, 93-93, 105-105, 118-118, 131-131, 144-144, 157-157, 170-170, 183-183
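A self-contained sketch of such a zero-value case; the types are simplified stand-ins for the real models, and the -1 rank for unsuitable nodes follows the convention confirmed later in this review:

```go
package ranking_test

import "testing"

// Minimal stand-ins for the models used by the ranker tests;
// the real types live in pkg/models.
type resources struct{ CPU float64 }

func (r resources) IsZero() bool { return r == resources{} }

type computeNodeInfo struct{ MaxCapacity resources }

// rankNode mirrors the reviewed logic: a zero MaxCapacity means the
// node cannot be ranked for over-subscription and gets -1.
func rankNode(info computeNodeInfo) int {
	if info.MaxCapacity.IsZero() {
		return -1
	}
	return 0
}

// TestZeroValueComputeNodeInfo checks the zero value is handled
// gracefully now that ComputeNodeInfo is no longer a pointer.
func TestZeroValueComputeNodeInfo(t *testing.T) {
	var info computeNodeInfo // zero value, previously a nil pointer
	if got := rankNode(info); got != -1 {
		t.Fatalf("expected rank -1 for zero-value info, got %d", got)
	}
}
```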
pkg/transport/bprotocol/orchestrator/heartbeat_test.go (1)
231-232: Define test constants for magic numbers

Consider extracting these magic numbers into named constants for better maintainability and clarity.

```diff
+const (
+	testNumNodes             = 10
+	testNumHeartbeatsPerNode = 100
+)
+
-	numNodes := 10
-	numHeartbeatsPerNode := 100
+	numNodes := testNumNodes
+	numHeartbeatsPerNode := testNumHeartbeatsPerNode
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (69)
- .cspell/custom-dictionary.txt (1 hunks)
- cmd/cli/agent/node.go (1 hunks)
- cmd/cli/node/columns.go (3 hunks)
- pkg/compute/management_client.go (5 hunks)
- pkg/compute/mocks.go (0 hunks)
- pkg/compute/node_info_decorator.go (1 hunks)
- pkg/compute/types.go (0 hunks)
- pkg/models/messages/legacy/node.go (1 hunks)
- pkg/models/messages/node.go (1 hunks)
- pkg/models/node_info.go (2 hunks)
- pkg/models/node_info_provider.go (1 hunks)
- pkg/models/node_state.go (1 hunks)
- pkg/models/resource.go (1 hunks)
- pkg/nats/proxy/management_handler.go (5 hunks)
- pkg/nats/proxy/management_proxy.go (4 hunks)
- pkg/nats/transport/nats.go (4 hunks)
- pkg/node/compute.go (3 hunks)
- pkg/node/heartbeat/client.go (0 hunks)
- pkg/node/heartbeat/heartbeat_test.go (0 hunks)
- pkg/node/heartbeat/server.go (0 hunks)
- pkg/node/heartbeat/types.go (0 hunks)
- pkg/node/manager/node_manager.go (0 hunks)
- pkg/node/ncl.go (1 hunks)
- pkg/node/node.go (2 hunks)
- pkg/node/requester.go (13 hunks)
- pkg/orchestrator/interfaces.go (0 hunks)
- pkg/orchestrator/mocks.go (2 hunks)
- pkg/orchestrator/nodes/errors.go (1 hunks)
- pkg/orchestrator/nodes/inmemory/inmemory.go (6 hunks)
- pkg/orchestrator/nodes/inmemory/inmemory_test.go (11 hunks)
- pkg/orchestrator/nodes/kvstore/kvstore.go (6 hunks)
- pkg/orchestrator/nodes/kvstore/kvstore_test.go (9 hunks)
- pkg/orchestrator/nodes/manager.go (1 hunks)
- pkg/orchestrator/nodes/manager_test.go (1 hunks)
- pkg/orchestrator/nodes/mocks.go (1 hunks)
- pkg/orchestrator/nodes/types.go (1 hunks)
- pkg/orchestrator/selection/discovery/info_provider.go (1 hunks)
- pkg/orchestrator/selection/ranking/available_capacity_test.go (2 hunks)
- pkg/orchestrator/selection/ranking/features.go (2 hunks)
- pkg/orchestrator/selection/ranking/features_test.go (7 hunks)
- pkg/orchestrator/selection/ranking/max_usage.go (1 hunks)
- pkg/orchestrator/selection/ranking/max_usage_test.go (2 hunks)
- pkg/orchestrator/selection/ranking/over_subscription.go (1 hunks)
- pkg/orchestrator/selection/ranking/over_subscription_test.go (13 hunks)
- pkg/orchestrator/selection/selector/node_selector.go (3 hunks)
- pkg/orchestrator/watchers/bprotocol_dispatcher_test.go (3 hunks)
- pkg/orchestrator/watchers/ncl_message_creator_test.go (3 hunks)
- pkg/orchestrator/watchers/protocol_router.go (1 hunks)
- pkg/orchestrator/watchers/protocol_router_test.go (2 hunks)
- pkg/publicapi/apimodels/agent.go (1 hunks)
- pkg/publicapi/endpoint/agent/endpoint.go (2 hunks)
- pkg/publicapi/endpoint/orchestrator/endpoint.go (1 hunks)
- pkg/publicapi/endpoint/orchestrator/node.go (4 hunks)
- pkg/publicapi/endpoint/shared/endpoint.go (2 hunks)
- pkg/publicapi/test/agent_test.go (1 hunks)
- pkg/routing/kvstore/migration_test.go (0 hunks)
- pkg/routing/kvstore/migrations.go (0 hunks)
- pkg/routing/mocks.go (0 hunks)
- pkg/routing/node_info_provider.go (0 hunks)
- pkg/routing/tracing/metrics.go (0 hunks)
- pkg/routing/tracing/tracing.go (0 hunks)
- pkg/routing/types.go (0 hunks)
- pkg/test/compute/mocks.go (1 hunks)
- pkg/test/teststack/stack.go (1 hunks)
- pkg/transport/bprotocol/compute/heartbeat.go (1 hunks)
- pkg/transport/bprotocol/orchestrator/heartbeat_test.go (1 hunks)
- pkg/transport/bprotocol/orchestrator/server.go (1 hunks)
- pkg/transport/bprotocol/types.go (1 hunks)
- pkg/transport/dispatcher/dispatcher.go (0 hunks)
💤 Files with no reviewable changes (16)
- pkg/routing/tracing/metrics.go
- pkg/transport/dispatcher/dispatcher.go
- pkg/routing/types.go
- pkg/compute/types.go
- pkg/node/heartbeat/heartbeat_test.go
- pkg/orchestrator/interfaces.go
- pkg/node/heartbeat/types.go
- pkg/node/heartbeat/client.go
- pkg/routing/kvstore/migration_test.go
- pkg/compute/mocks.go
- pkg/routing/node_info_provider.go
- pkg/routing/tracing/tracing.go
- pkg/routing/kvstore/migrations.go
- pkg/node/heartbeat/server.go
- pkg/routing/mocks.go
- pkg/node/manager/node_manager.go
✅ Files skipped from review due to trivial changes (1)
- .cspell/custom-dictionary.txt
🔇 Additional comments (73)
pkg/orchestrator/nodes/types.go (2)
12-28: `Manager` interface is well-defined and comprehensive

The `Manager` interface appropriately extends `Lookup` and includes essential methods for node management, such as `Handshake`, `UpdateNodeInfo`, `Heartbeat`, and lifecycle control methods like `Start` and `Stop`.

42-50: `Store` interface design aligns with best practices

The `Store` interface correctly extends `Lookup` and adds the necessary methods `Put` and `Delete` for managing node state. This separation of concerns improves code maintainability and clarity.
pkg/transport/bprotocol/orchestrator/server.go (1)
43-51: Handle unexpected payload types more robustly

In the `HandleMessage` method, an unexpected payload type results in an error. Ensure that the error is appropriately handled by the caller to prevent silent failures: confirm that the caller of `HandleMessage` properly handles and logs the error returned when an unexpected payload type is encountered, as in the sketch below.
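A sketch of that call-site pattern; the handler interface and log fields are illustrative, while the `log.Ctx(ctx)` zerolog style matches the code quoted elsewhere in this review:

```go
package orchestrator

import (
	"context"
	"fmt"

	"github.com/rs/zerolog/log"
)

// messageHandler is a stand-in for the server's handler interface.
type messageHandler interface {
	HandleMessage(ctx context.Context, payload any) error
}

// dispatch surfaces HandleMessage failures instead of dropping them,
// logging the concrete payload type so unexpected-type errors are
// visible rather than silently swallowed.
func dispatch(ctx context.Context, h messageHandler, payload any) {
	if err := h.HandleMessage(ctx, payload); err != nil {
		log.Ctx(ctx).Error().
			Str("payload_type", fmt.Sprintf("%T", payload)).
			Err(err).
			Msg("failed to handle message")
	}
}
```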
pkg/node/node.go (2)
149-155: Initialization of `nodeInfoProvider` is correct and follows best practices

The `nodeInfoProvider` is properly initialized with the necessary parameters and label providers, ensuring that node information is accurately represented.

220-231: Self-registration logic is appropriate for requester-only nodes

The condition to register the requester node when orchestrator mode is enabled and compute mode is disabled is correctly implemented. The node state is set with appropriate membership and connection status.
pkg/orchestrator/selection/discovery/info_provider.go (1)
7-14: Updated `NewDebugInfoProvider` to use `nodes.Lookup`

The `NewDebugInfoProvider` function has been correctly updated to use the `nodes.Lookup` interface instead of the deprecated `orchestrator.NodeDiscoverer`. This aligns with the new node management architecture and ensures compatibility.
pkg/publicapi/apimodels/agent.go (1)
34-34: Verify API compatibility with NodeInfo transition.

The change from `NodeState` to `NodeInfo` aligns with the decoupling objective but may impact API consumers.

✅ Verification successful

The search shows that `NodeState` is being replaced with `NodeInfo` as part of a broader transition. Key findings:

- `GetAgentNodeResponse` has no direct usages found in the codebase.
- The change from `NodeState` to `NodeInfo` appears to be part of a controlled transition where `NodeInfo` is a subset of `NodeState` (as evidenced by the comment in `pkg/publicapi/test/agent_test.go`).
- The comment explicitly states: "we are only asserting NodeInfos are equal (which excludes approvals and liveness from NodeState)".

API change is safe and aligned with decoupling strategy

The transition from `NodeState` to `NodeInfo` is a deliberate reduction in scope, where `NodeInfo` represents a subset of `NodeState` excluding approvals and liveness information. This change aligns with the API's evolution towards more focused data models.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for direct NodeState usage that might need updates
rg "NodeState" --type go
# Search for GetAgentNodeResponse usage patterns
ast-grep --pattern 'GetAgentNodeResponse'
Length of output: 21705
pkg/transport/bprotocol/compute/heartbeat.go (1)
20-26: Clarify legacy topic handling in SendHeartbeat.

The comment mentions sending to "current and legacy topics", but the implementation only shows one publish call. Either update the comment or add the second publish; a sketch of the dual publish follows.
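If dual publishing is in fact intended, a sketch of the shape it could take; the publisher interface and subject names are placeholders, not the real `ncl.Publisher` API:

```go
package compute

import (
	"context"
	"fmt"
)

// publisher is a stand-in for the real publisher dependency.
type publisher interface {
	Publish(ctx context.Context, subject string, payload any) error
}

// sendHeartbeat publishes to the current subject and then to the
// legacy subject, so older orchestrators keep receiving heartbeats.
func sendHeartbeat(ctx context.Context, p publisher, hb any) error {
	if err := p.Publish(ctx, "node.heartbeat.v2", hb); err != nil {
		return fmt.Errorf("publish heartbeat (current topic): %w", err)
	}
	if err := p.Publish(ctx, "node.heartbeat.legacy", hb); err != nil {
		return fmt.Errorf("publish heartbeat (legacy topic): %w", err)
	}
	return nil
}
```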
pkg/orchestrator/nodes/errors.go (1)
Line range hint 1-35: LGTM on error handling structure
The error types are well-designed with:
- Clear separation of concerns
- Informative error messages
- Good UX consideration in truncating long lists
- Proper encapsulation of error details
pkg/models/messages/legacy/node.go (2)
9-17: Well-documented heartbeat implementation

Excellent documentation explaining the rationale behind using sequence numbers instead of timestamps. This helps prevent subtle distributed-systems issues related to clock synchronization.
1-43: LGTM on message structure design
The message types are well-structured with:
- Clear request/response pairs
- Consistent field naming
- Good type reuse (models.NodeInfo, models.Resources)
- Proper separation into legacy package
pkg/publicapi/endpoint/orchestrator/endpoint.go (2)
Line range hint 1-50: LGTM on endpoint structure

The endpoint changes maintain clean separation of concerns while implementing the new node management interface. Route handlers and middleware remain properly organized.
16-16: Verify NodeManager interface compatibility

The change from `manager.NodeManager` to `nodes.Manager` is aligned with the PR objective of decoupling from the transport layer.
Also applies to: 23-23
✅ Verification successful
Let me gather more information about the old interface to ensure compatibility.
Let me try a different approach to find the interface methods.
Interface change is compatible and correctly implemented
Based on the evidence from the codebase:

- The new `nodes.Manager` interface includes all the required methods (`GetByPrefix`, `List`, `ApproveNode`, `RejectNode`, `DeleteNode`) that are used in the endpoint implementation.
- The implementation in `nodesManager` correctly provides these methods with matching signatures.
- All callers in `pkg/publicapi/endpoint/orchestrator/` are using the methods consistently with the new interface.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that all required methods from the old interface are still available in the new one
# and that all callers have been updated.
# Check interface method signatures
ast-grep --pattern 'type Manager interface {
$$$
}'
# Find all calls to NodeManager methods
rg -A 2 '\bnodeManager\.' pkg/publicapi/endpoint/orchestrator/
Length of output: 3217
Script:
#!/bin/bash
# Check the old interface definition and its usage
ast-grep --pattern 'type NodeManager interface {
$$$
}'
# Find GetByPrefix and List methods in the new interface
ast-grep --pattern 'GetByPrefix(ctx context.Context, $$$) ($$$)'
ast-grep --pattern 'List(ctx context.Context) ($$$)'
Length of output: 181
Script:
#!/bin/bash
# Search for the old interface and its methods using ripgrep
rg -A 10 'type (Node)?Manager' pkg/orchestrator/
# Find the implementation of GetByPrefix and List methods
rg -A 2 'func.*GetByPrefix|func.*List' pkg/orchestrator/nodes/
Length of output: 11260
pkg/models/messages/node.go (2)
9-20: LGTM! Well-structured handshake protocol implementation.
The handshake protocol is well-designed with:
- Clear separation between request and response
- Sequence numbers for reliable delivery
- Proper error handling via acceptance and reason fields
34-40: LGTM! Clean separation of node info updates.
The update mechanism properly separates node info updates from resource updates and includes appropriate response handling.
pkg/models/node_state.go (3)
14-18: LGTM! Clean separation of connection state.
Good practice:
- Clear deprecation notice
- Separation of durable node info from connection state
- Maintaining backward compatibility
21-37: LGTM! Comprehensive connection tracking.
The ConnectionState structure provides excellent tracking with:
- Sequence numbers for reliable delivery
- Timestamps for connection events
- Error tracking for debugging
39-40: 🛠️ Refactor suggestion

Consider thread safety in IsConnected method.

The method accesses ConnectionState without synchronization. If the ConnectionState can be modified concurrently, this could lead to race conditions.

Consider adding synchronization:

```diff
+import "sync/atomic"
+
 type ConnectionState struct {
-	Status NodeConnectionState
+	status atomic.Value // holds NodeConnectionState
 	// ... other fields ...
 }

 func (s *NodeState) IsConnected() bool {
-	return s.ConnectionState.Status == NodeStates.CONNECTED
+	if status, ok := s.ConnectionState.status.Load().(NodeConnectionState); ok {
+		return status == NodeStates.CONNECTED
+	}
+	return false
 }
```
pkg/test/compute/mocks.go (1)
6-7: LGTM! Clean interface transition.
Good practices:
- Clear import of legacy types
- Compile-time interface verification
- Clean transition to new protocol package
Also applies to: 39-39
pkg/publicapi/test/agent_test.go (2)
35-35: LGTM! Test updates align with NodeState to NodeInfo transition
The changes correctly reflect the transition from NodeState to NodeInfo in the test assertions. The comment provides clear explanation about comparing only NodeInfo fields due to heartbeat behavior.
Also applies to: 43-43
51-51: LGTM! Consistent assertion update for compute node test
The assertion update maintains consistency with the NodeState to NodeInfo transition.
cmd/cli/agent/node.go (1)
59-59: LGTM! CLI output updated to use NodeInfo
The change correctly updates the CLI output to use NodeInfo instead of NodeState, maintaining consistency with the architectural changes.
pkg/models/node_info_provider.go (2)
7-31: LGTM! Well-structured provider implementation with clear parameter separation
The implementation provides a clean separation between parameters and provider state. The constructor properly initializes all fields including an empty decorator slice.
1-64: Verify usage of NodeInfoProvider interface
Let's verify that this new implementation is being used consistently across the codebase.
✅ Verification successful
Implementation of NodeInfoProvider is consistent across the codebase

The `BaseNodeInfoProvider` is being used correctly and consistently:

- It's instantiated in `pkg/node/node.go` with proper parameters including NodeID, BacalhauVersion, and SupportedProtocols
- The interface is properly used in endpoint packages (`pkg/publicapi/endpoint/shared` and `pkg/publicapi/endpoint/agent`)
- The implementation satisfies the interface contract (verified by compile-time check)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for NodeInfoProvider interface usage
rg "NodeInfoProvider" -A 3
# Search for BaseNodeInfoProvider instantiation
rg "NewBaseNodeInfoProvider" -A 3
Length of output: 5139
pkg/node/ncl.go (1)
9-9: LGTM! Verify consistent usage of legacy types

The transition to legacy message types is a good step in decoupling node management from the transport layer. This change maintains backward compatibility while allowing for future improvements. Let's verify consistent usage of legacy types across the codebase.

Also applies to: 16-16
✅ Verification successful
Let me check the content of the file that still uses the old heartbeat types to understand if it's an oversight or intentional.
No issues found - the code correctly uses legacy types

The file `pkg/transport/bprotocol/compute/heartbeat.go` is correctly importing and using the legacy types from `pkg/models/messages/legacy`. The initial grep result was a false positive, as it matched the struct initialization syntax `messages.Heartbeat{...}` where `messages` is actually aliased to the legacy package. This confirms consistent usage across the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining direct usage of non-legacy heartbeat types
# that might have been missed during the transition
echo "Checking for potential missed transitions to legacy types..."
# Check for old heartbeat message type usage
rg -l "heartbeat\.HeartbeatMessageType|messages\.Heartbeat\{" \
--type go \
--glob '!pkg/models/messages/legacy/**'
# Check for consistent usage of legacy types in message handling
ast-grep --pattern 'Register($_, messages.Heartbeat{})'
Length of output: 347
Script:
#!/bin/bash
# Check the content of the file using the old types
cat pkg/transport/bprotocol/compute/heartbeat.go
# Also check if there are any references to legacy types in this file
rg "legacy\." pkg/transport/bprotocol/compute/heartbeat.go
Length of output: 972
pkg/orchestrator/selection/ranking/max_usage.go (1)
Line range hint 36-43: Verify proper initialization of ComputeNodeInfo

The removal of the nil check assumes `ComputeNodeInfo` is always properly initialized. While this aligns with the transition to value types, we should verify that all code paths properly initialize this field to prevent potential panics. Consider adding validation in the NodeInfo constructor to ensure ComputeNodeInfo is properly initialized, making the assumption of non-nil explicit in the code; a sketch follows.
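A sketch of such constructor-level validation, using simplified stand-in types; the repository may construct `NodeInfo` literals directly, in which case the check would live wherever compute nodes are assembled:

```go
package models

import "errors"

// Resources and ComputeNodeInfo are simplified stand-ins here.
type Resources struct{ CPU float64 }

func (r Resources) IsZero() bool { return r == Resources{} }

type ComputeNodeInfo struct{ MaxCapacity Resources }

// NewComputeNodeInfo is a hypothetical constructor that turns the
// implicit "always initialized" assumption into an explicit check.
func NewComputeNodeInfo(maxCapacity Resources) (ComputeNodeInfo, error) {
	if maxCapacity.IsZero() {
		return ComputeNodeInfo{}, errors.New("compute node must declare a non-zero max capacity")
	}
	return ComputeNodeInfo{MaxCapacity: maxCapacity}, nil
}
```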
✅ Verification successful
Based on the comprehensive analysis of the codebase, I can now provide a final response regarding the safety of removing the nil check for ComputeNodeInfo.
ComputeNodeInfo is safely initialized in all code paths
The removal of the nil check is safe because:
- ComputeNodeInfo is properly initialized in the NodeInfoDecorator.DecorateNodeInfo method (pkg/compute/node_info_decorator.go), which is the central point for decorating compute node information.
- The field is always initialized as a value type (not a pointer) in the NodeInfo struct.
- The code base consistently checks IsComputeNode() before accessing ComputeNodeInfo, providing an additional safety layer.
- All test cases and actual usage patterns show proper initialization of the struct.
Key evidence:
- The NodeInfoDecorator always initializes all fields of ComputeNodeInfo when decorating node info
- The struct is used as a value type, not a pointer, making it impossible to be nil
- The type system ensures the struct is always initialized with zero values at minimum
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check initialization patterns of ComputeNodeInfo across the codebase
echo "Checking ComputeNodeInfo initialization patterns..."
# Find all struct initializations containing ComputeNodeInfo
ast-grep --pattern 'NodeInfo{
$$$
ComputeNodeInfo: $_,
$$$
}'
# Find potential direct field assignments
rg -A 2 "\.ComputeNodeInfo\s*=\s*" --type go
# Look for test cases to ensure proper initialization
rg -l "TestNode.*Info" --type go
Length of output: 512
Script:
#!/bin/bash
# Let's check for the ComputeNodeInfo type definition and its usage patterns
# Check the type definition and any constructor patterns
ast-grep --pattern 'type ComputeNodeInfo struct {
$$$
}'
# Look for any factory methods or constructors that might initialize NodeInfo
ast-grep --pattern 'func New$_NodeInfo($$$) $_ {
$$$
}'
# Check for any validation methods
ast-grep --pattern 'func ($_ NodeInfo) Validate() $_ {
$$$
}'
# Look for test files that might show initialization patterns
rg -A 5 "ComputeNodeInfo" --type go --glob "*_test.go"
Length of output: 17112
Script:
#!/bin/bash
# Let's check how ComputeNodeInfo is initialized in the node_info_decorator.go file
# and verify if there are any paths where it might not be initialized
# Check the node_info_decorator implementation
rg -A 10 "type NodeInfoDecorator" --type go
# Check where NodeInfo is created/modified
ast-grep --pattern 'func ($_ *NodeInfoDecorator) $_($$$) models.NodeInfo {
$$$
}'
# Look for any direct modifications to NodeInfo
rg -A 5 "models\.NodeInfo\{" --type go
# Check for any validation or initialization methods
rg -A 5 "func.*NodeInfo" --type go | grep -v "test"
Length of output: 55223
pkg/orchestrator/selection/ranking/features.go (2)
8-9: LGTM: Import reordering
The import reordering maintains clean code organization.
58-58: Verify safety of removed nil check for ComputeNodeInfo

The removal of the nil check for `node.ComputeNodeInfo` could lead to panics if the assumption that it's always present is violated. Let's verify this assumption across the codebase.
pkg/orchestrator/selection/selector/node_selector.go (1)
84-84: LGTM: Improved node state handling

The update to use `nodeState.ConnectionState.Status` provides a more structured approach to checking node connectivity.
pkg/models/node_info.go (2)
40-42: LGTM: Clean interface design

The new `NodeInfoProvider` interface follows Go's best practices with a single, focused method. This aligns well with the PR's objective of decoupling node management.
90-90: Verify the impact of making ComputeNodeInfo non-nullable

The change from pointer to value type means `ComputeNodeInfo` can no longer be nil. This could affect code that relies on nil checks.
✅ Verification successful
Making ComputeNodeInfo non-nullable is safe to proceed

The codebase shows that ComputeNodeInfo is primarily accessed through optional chaining in the frontend (`node.Info?.ComputeNodeInfo`) and there are no direct nil checks on the ComputeNodeInfo field itself. The field is:

- Used safely in the frontend with optional chaining (e.g., `node.Info?.ComputeNodeInfo?.AvailableCapacity`)
- Initialized with zero values in manager.go when needed (e.g., `state.Info.ComputeNodeInfo.AvailableCapacity = models.Resources{}`)
- Always populated for compute nodes through the node_info_decorator.go
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for potential nil checks on ComputeNodeInfo
rg -A 2 "ComputeNodeInfo.*== *nil|ComputeNodeInfo.*!= *nil"
# Search for direct field access that might need zero-value handling
ast-grep --pattern 'ComputeNodeInfo.$_'
Length of output: 103
Script:
#!/bin/bash
# Search for any usage of ComputeNodeInfo field
rg -A 2 "ComputeNodeInfo"
# Search for struct definition of ComputeNodeInfo
rg -A 5 "type ComputeNodeInfo struct"
# Search for any method or function that uses NodeInfo struct
ast-grep --pattern 'func $_($_ *NodeInfo) $_'
Length of output: 37323
pkg/publicapi/endpoint/shared/endpoint.go (2)
14-16: LGTM: Clean dependency injection

The `EndpointParams` struct cleanly injects the new `NodeInfoProvider`, maintaining good separation of concerns.
76-76: LGTM: Simplified node info retrieval

The endpoint now directly uses `GetNodeInfo`, making the code more straightforward while maintaining the same functionality.
pkg/nats/proxy/management_handler.go (2)
13-14: LGTM: Clean transition to legacy types

The imports cleanly separate legacy message types and the new transport protocol.

22-22: LGTM: Clean management handler structure

The handler's structure cleanly integrates with the new protocol while maintaining legacy support.

Also applies to: 25-25
pkg/test/teststack/stack.go (1)
79-79: LGTM: NodeDiscoverer to NodeInfoStore transition

The change from `NodeDiscoverer.List` to `NodeInfoStore.List` aligns with the PR's objective of decoupling the node manager. The test's functionality remains intact as it still properly verifies node discovery. Let's verify whether NodeInfoStore is consistently used across test files.
✅ Verification successful
The search results show that `NodeDiscoverer` is no longer used in test files, while `NodeInfoStore` is consistently used across the test suite, particularly in `agent_test.go`, and has a dedicated test suite in `kvstore_test.go`.

LGTM: Verified NodeDiscoverer to NodeInfoStore transition

The change from `NodeDiscoverer.List` to `NodeInfoStore.List` is correct and consistent with the codebase's architecture. The verification shows:

- `NodeDiscoverer` only exists in mocks, indicating it's being phased out
- `NodeInfoStore` is properly implemented and tested across the codebase
- The transition maintains the same functionality while aligning with the decoupling objective
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining NodeDiscoverer usage in test files
rg -g '**/*_test.go' 'NodeDiscoverer'
# Search for NodeInfoStore usage in test files to confirm adoption
rg -g '**/*_test.go' 'NodeInfoStore'
Length of output: 1846
Script:
#!/bin/bash
# Check if NodeDiscoverer still exists in the codebase
rg 'type.*NodeDiscoverer'
# Check the implementation of NodeInfoStore.List to ensure it provides equivalent functionality
ast-grep --pattern 'func $_($_ *$_) List($_ context.Context) ($_, error) {
$$$
}'
Length of output: 243
Script:
#!/bin/bash
# Check the content of mocks.go to understand the context of NodeDiscoverer mock
cat pkg/orchestrator/mocks.go
# Also check for any NodeInfoStore mocks
rg 'Mock.*NodeInfoStore'
Length of output: 15231
Script:
#!/bin/bash
# Check for the NodeInfoStore interface definition
rg -A 5 'type NodeInfoStore interface'
# Check for any remaining NodeState references in the codebase
rg 'NodeState' -g '!*_test.go' -g '!*.md'
Length of output: 18256
pkg/publicapi/endpoint/agent/endpoint.go (2)
19-19: LGTM: Consistent transition to NodeInfoProvider

The replacement of NodeStateProvider with NodeInfoProvider across structs and initialization is consistent with the architectural changes. The transition maintains clean dependency-injection patterns.

Also applies to: 26-26, 34-34
89-91: Verify API backward compatibility

The endpoint now returns NodeInfo instead of NodeState. While this aligns with the new architecture, we should ensure API clients are prepared for this change. Consider implementing API versioning or providing a transition period if this is a breaking change for API clients; a sketch of a staged transition follows.
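One hedged way to stage the transition: a payload that carries both shapes during a deprecation window. The types are simplified stand-ins; the real response lives in `pkg/publicapi/apimodels`:

```go
package apimodels

// Simplified stand-ins for the real models.
type NodeInfo struct{ NodeID string }
type NodeState struct{ Info NodeInfo }

// GetAgentNodeResponse sketches a transitional payload: new clients
// read NodeInfo, while old clients keep reading NodeState until the
// deprecated field is removed.
type GetAgentNodeResponse struct {
	NodeInfo NodeInfo `json:"NodeInfo"`
	// Deprecated: duplicated for backward compatibility only.
	NodeState *NodeState `json:"NodeState,omitempty"`
}
```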
pkg/orchestrator/selection/ranking/features_test.go (2)
26-26: LGTM: Improved struct initialization

The change from pointer to value type for ComputeNodeInfo initialization improves code clarity and reduces unnecessary indirection.

Also applies to: 30-30, 34-34, 38-38, 42-42
73-73: Verify ranking behavior change for unknown nodes

The ranking for unknown nodes has been changed from 0 to -1, which appears to be a more explicit way to indicate unsupported nodes. Let's verify this behavior is consistent across the codebase.

Also applies to: 84-84, 96-96, 110-110, 125-125
✅ Verification successful
Ranking behavior for unknown nodes is consistently set to -1

Looking at the test files, particularly `features_test.go` and `chain_test.go`, the ranking value of -1 for unknown or unsupported nodes is consistently applied across all test cases. This indicates a deliberate design choice to use -1 as a clear indicator for nodes that should not be selected, which is more explicit than the previous value of 0.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other ranking tests to ensure consistent behavior
ast-grep --pattern 'assertEquals($_, $_, "unknown", $_)'
# Search for ranking logic implementation
rg -A 5 'func.*RankNodes'
Length of output: 21645
pkg/nats/proxy/management_proxy.go (2)
50-52: LGTM: Method signatures consistently updated.

The method signatures have been consistently updated to use legacy types while maintaining the same behavior.

Also applies to: 68-70, 85-87
14-14: Verify the impact of transitioning to legacy types.

The transition from the `messages` to the `legacy` package suggests a compatibility layer. While the changes look correct, we should ensure all consumers are updated accordingly. Consider documenting the rationale for this transition and any migration guidelines for dependent code.

Also applies to: 22-22, 26-26
✅ Verification successful
All references to message types have been properly updated to use the legacy package

The codebase scan shows consistent usage of the `legacy` package across all relevant files. No remaining references to the old `messages` package were found, and all consumers (`management_proxy.go`, `management_handler.go`, `management_client.go`, etc.) are correctly using the legacy types. The transition appears to be complete and consistent.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for any remaining references to the old message types
rg "messages\.(Register|UpdateInfo|UpdateResources)(Request|Response)" --type go
# Look for any mixed usage of legacy and non-legacy types
rg "legacy\.(Register|UpdateInfo|UpdateResources)(Request|Response)" --type go
Length of output: 4827
pkg/orchestrator/nodes/inmemory/inmemory.go (2)
58-58: LGTM: Consistent error handling migration.

Error handling has been consistently migrated from the `routing` package to the `nodes` package, improving the separation of concerns.

Also applies to: 62-62, 98-98, 102-102
38-38: Verify the impact of the Add → Put method rename.

The method rename from `Add` to `Put` and the interface update to `nodes.Store` suggest an API change that could affect existing consumers.

Also applies to: 166-166
✅ Verification successful
No impact from Add → Put method rename
The method rename is part of a clean interface change. The Store interface in `pkg/orchestrator/nodes/types.go` defines the `Put` method, and all implementations (inmemory, kvstore, and mocks) consistently use `Put`. There are no references to the old `Add` method in the codebase, indicating this was renamed in a coordinated change across all implementations.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Find potential consumers of the old Add method
rg "\.Add\(\s*ctx\s*,\s*models\.NodeState\s*\)" --type go
# Verify all implementations of the new Store interface
ast-grep --pattern 'type $_ struct {
$$$
}
func (r *$_) Put(ctx context.Context, state models.NodeState) error {
$$$
}'
Length of output: 186
Script:
#!/bin/bash
# Search for any references to Add or Put methods with NodeState
rg "Add|Put.*NodeState" --type go -B 2 -A 2
# Find Store interface definition and implementations
ast-grep --pattern 'type Store interface {
$$$
}'
# Look for recent changes in the file
git log -p -- pkg/orchestrator/nodes/inmemory/inmemory.go | head -n 50
Length of output: 73451
Script:
#!/bin/bash
# Search for Store interface definition
rg "type Store interface" -A 10 --type go
# Search for any references to the old Add method in git history
git log -p | rg "Add.*NodeState"
# Look for Store implementations
rg "func.*Put.*NodeState" --type go -B 2 -A 2
Length of output: 2520
cmd/cli/node/columns.go (2)
122-124: LGTM: Helper function correctly updated.

The `ifComputeNode` helper has been correctly updated to handle value semantics, maintaining consistent behavior.
34-34: Verify the ConnectionState.Status string representation.

The change from `Connection.String()` to `ConnectionState.Status.String()` suggests a structural change in how node status is represented.
✅ Verification successful
The change to use `ConnectionState.Status.String()` is correct and aligns with the codebase structure:

- `ConnectionState` is the new structure containing the `Status` field of type `NodeConnectionState`
- The old `Connection` field in `NodeState` is marked as deprecated with a comment "Use ConnectionState.Status instead"
- The change is consistently applied across the codebase, as seen in multiple files (node.go, node_state.go, columns.go)
- The string representation is properly implemented in the `NodeConnectionState` type
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for any other usage of the old Connection.String() method
rg "\.Connection\.String\(\)" --type go
# Verify the new ConnectionState.Status type
ast-grep --pattern 'type $_ struct {
$$$
ConnectionState struct {
Status $_
$$$
}
$$$
}'
Length of output: 148
Script:
#!/bin/bash
# Let's try a broader search for ConnectionState and Status fields
rg "ConnectionState" --type go -A 5 -B 5
# Also search for any Status-related string conversions
rg "Status.*String\(\)" --type go -A 2 -B 2
# Look for type definitions containing Status
ast-grep --pattern 'type $_ struct {
Status $_
$$$
}'
# Search for any Connection-related types and methods
rg "type.*Connection" --type go -A 5
Length of output: 47570
pkg/orchestrator/nodes/kvstore/kvstore.go (3)
59-60: Method renamed from Add to Put for clarity

The renaming better reflects the semantics of the operation, following key-value store conventions.
79-79: Consistent error handling with new namespace

Error handling has been updated to use the new `nodes` package namespace, improving consistency. The addition of `NewErrMultipleNodesFound` provides better error handling for prefix searches.

Also applies to: 101-101, 112-112, 114-114
Line range hint 121-167: Verify filter compatibility after namespace change

The List method's filter type has changed from `routing.NodeStateFilter` to `nodes.NodeStateFilter`.
✅ Verification successful
Let me gather more information to verify the filter compatibility after the namespace change.
Filter type change is correctly implemented across the codebase

The verification shows that the change from `routing.NodeStateFilter` to `nodes.NodeStateFilter` is consistent throughout the codebase:

- The type is properly defined in `pkg/orchestrator/nodes/types.go`
- All implementations (kvstore, inmemory) and interfaces correctly use `nodes.NodeStateFilter`
- No references to the old `routing.NodeStateFilter` were found
- The filter type is consistently used with the same signature `func(models.NodeState) bool` (a composition sketch follows after the scripts below)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if the filter interface is used consistently across the codebase
ast-grep --pattern 'type NodeStateFilter func($_) bool'
Length of output: 141
Script:
#!/bin/bash
# Check for any other usages of NodeStateFilter in the codebase
rg "NodeStateFilter" -A 2
# Check for any imports of the old routing package that might still be using the old filter type
rg "routing.NodeStateFilter"
Length of output: 2589
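For illustration, filters of the confirmed `func(models.NodeState) bool` shape compose naturally before being passed to `List`; the combinator and stand-in state type below are hypothetical:

```go
package nodes

// nodeState is a stand-in for models.NodeState.
type nodeState struct{ connected bool }

// NodeStateFilter mirrors the confirmed signature shape.
type NodeStateFilter func(nodeState) bool

// and combines filters so List callers can express compound
// predicates with a single filter argument.
func and(filters ...NodeStateFilter) NodeStateFilter {
	return func(s nodeState) bool {
		for _, f := range filters {
			if !f(s) {
				return false
			}
		}
		return true
	}
}
```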
pkg/orchestrator/watchers/protocol_router_test.go (1)
22-22: Mock type updated to align with new package structure

The change from `routing.MockNodeInfoStore` to `nodes.MockLookup` aligns with the decoupling of node management from routing.

Also applies to: 33-33
pkg/orchestrator/selection/ranking/max_usage_test.go (1)
26-26: Consider performance impact of value type usage

The change from pointer to value type for `ComputeNodeInfo` could impact performance if the struct is large or frequently copied. Consider benchmarking to ensure this change doesn't introduce performance overhead.

Also applies to: 30-30, 34-34
pkg/orchestrator/nodes/kvstore/kvstore_test.go (2)
17-18: LGTM: Package restructuring aligns with decoupling objectives

The import path changes from the routing to the nodes package align with the PR objective of decoupling the node manager from the transport layer.
: Consistent method naming improves API clarity
The transition from Add
to Put
method provides better semantic clarity for the operation being performed. This change is consistently applied across all test cases.
Also applies to: 97-98, 127-128, 173-174, 194-194, 198-198
pkg/orchestrator/nodes/inmemory/inmemory_test.go (2)
14-15: LGTM: Consistent package restructuring

The import path changes maintain consistency with the kvstore implementation, demonstrating proper separation of concerns.
43-44: Method signature changes maintain consistency

The transition from `Add` to `Put` is consistently applied across all test cases, maintaining API consistency with the kvstore implementation.

Also applies to: 78-79, 114-115, 160-161, 179-179, 183-183, 202-202
pkg/publicapi/endpoint/orchestrator/node.go (1)
79-79: Verify ComputeNodeInfo initialization

The direct access to `ComputeNodeInfo.AvailableCapacity` assumes it's always initialized. Since the field is now a value type, a nil check is no longer possible; consider guarding with `IsComputeNode()` where requester-only nodes may be listed, as in the sketch below.
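A sketch of that guard, relying on the `IsComputeNode()` check the verification elsewhere in this review found in use; the helper function and its placement are illustrative:

```go
// assumes: import "github.com/bacalhau-project/bacalhau/pkg/models"

// availableCapacity reports capacity only for compute nodes; a
// requester-only node carries a zero-valued ComputeNodeInfo, so
// there is nothing meaningful to expose.
func availableCapacity(state models.NodeState) models.Resources {
	if !state.Info.IsComputeNode() {
		return models.Resources{}
	}
	return state.Info.ComputeNodeInfo.AvailableCapacity
}
```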
pkg/orchestrator/watchers/ncl_message_creator_test.go (1)
16-16: LGTM: Clean transition to the new node lookup interface.

The change from `routing.MockNodeInfoStore` to `nodes.MockLookup` aligns well with the PR's objective of decoupling node management from the transport layer. The mock initialization and usage remain consistent.

Also applies to: 24-24, 35-35
Also applies to: 24-24, 35-35
pkg/compute/management_client.go (1)
14-16: Verify the impact of legacy message types.

The transition to legacy message types suggests a backward-compatibility layer. Ensure this doesn't impact the long-term maintainability of the codebase.
✅ Verification successful
Legacy message types are part of a planned transition strategy
The usage of legacy message types appears to be part of an intentional backward compatibility layer with clear patterns across the codebase:
- The legacy types are consistently used in protocol-related components (bprotocol, NATS proxy)
- Comments indicate an ongoing transition strategy (e.g., "Default to legacy protocol", "TODO: Remove the legacy orchestrator ID")
- The implementation follows a clean pattern with proper interfaces and type safety
This suggests the codebase maintainability is being actively managed during the transition period rather than being compromised.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for any direct usage of non-legacy message types that might need migration
rg -t go "messages\.(Register|UpdateInfo|UpdateResources)Request" --glob '!pkg/models/messages/legacy/**'
Length of output: 107
Script:
#!/bin/bash
# Let's check for usage of legacy message types and their patterns
rg -t go "legacy\.(Register|UpdateInfo|UpdateResources)Request"
# Also check for any imports of the legacy package
rg -t go "messages/legacy"
# Look for any migration or compatibility related comments/docs
rg -t go "//.*legacy" -g "*.go"
Length of output: 8505
pkg/orchestrator/selection/ranking/available_capacity_test.go (1)
10-11: LGTM! Changes are consistent with the codebase-wide refactor.

The import reordering and transition to value types for `ComputeNodeInfo` maintain consistency with the architectural changes across the codebase.

Also applies to: 259-263
pkg/orchestrator/watchers/bprotocol_dispatcher_test.go (3)
17-17: LGTM: Import change aligns with the decoupling effort.

The addition of the nodes package import reflects the architectural change to decouple node management from the transport layer.

26-26: LGTM: Type change maintains test functionality.

The nodeStore field type change from `*routing.MockNodeInfoStore` to `*nodes.MockLookup` maintains the test's functionality while aligning with the new architecture.

41-41: LGTM: Mock initialization updated correctly.

The mock initialization is correctly updated to use the new `nodes.NewMockLookup` constructor.
pkg/nats/transport/nats.go (4)
16-17: LGTM: Import changes reflect architectural update.

The addition of the bprotocol package import and the removal of pubsub-related imports align with the decoupling of node management from the transport layer.

124-124: LGTM: Type change enhances separation of concerns.

The managementProxy field type change to `bprotocol.ManagementEndpoint` improves the separation between transport and node management concerns.

Line range hint 359-364: LGTM: Method signature updated consistently.

The RegisterManagementEndpoint method signature is correctly updated to use the new bprotocol.ManagementEndpoint type.

383-385: LGTM: Getter method updated consistently.

The ManagementProxy method return type is correctly updated to match the field type change.
pkg/orchestrator/mocks.go (2)
13-13: LGTM: Import change aligns with mock updates.

The replacement of the routing import with the nodes package import reflects the architectural change in the mocks.

Line range hint 291-300: LGTM: Mock method signature updated correctly.

The List method signature in MockNodeDiscoverer is correctly updated to use `nodes.NodeStateFilter`, maintaining consistency with the new architecture.
pkg/node/compute.go (3)
33-34: LGTM: Clean transport layer decoupling

The transition to the `bprotocol` packages and the simplified heartbeat client initialization successfully decouple node management from the transport layer. The removal of the direct NATS dependency in the heartbeat client creation improves modularity.

Also applies to: 222-222
Line range hint 1-402: LGTM: Well-structured changes

The changes are focused on the transport layer decoupling while maintaining the existing error handling and core functionality. The modifications are minimal and clean.
64-64: Verify management proxy implementations

The parameter type change from `compute.ManagementEndpoint` to `bprotocol.ManagementEndpoint` is aligned with the decoupling strategy.
✅ Verification successful
Based on the search results, I can now provide a final response:
Type change is properly implemented across the codebase
The change from compute.ManagementEndpoint
to bprotocol.ManagementEndpoint
is correctly implemented:
- The interface is properly defined in pkg/transport/bprotocol/types.go
- All implementations correctly use the new interface:
  - pkg/test/compute/mocks.go: Mock implementation with compile-time check
  - pkg/transport/bprotocol/orchestrator/server.go: Server implementation
  - pkg/nats/proxy/management_handler.go: NATS handler implementation
- All dependencies are updated to use the new type:
  - pkg/compute/management_client.go: Client implementation
  - pkg/nats/transport/nats.go: Transport layer
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining references to the old ManagementEndpoint type
# and verify all implementations of the new interface
# Search for old type references
rg "compute\.ManagementEndpoint"
# Search for implementations of the new interface
ast-grep --pattern 'type $_ struct {
$$$
}
func ($v $_) $m($_) $_ {
$$$
}'
Length of output: 116
Script:
#!/bin/bash
# Let's try a more targeted approach to find interface implementations and usage
# Search for ManagementEndpoint interface definition and usage
rg -A 5 "ManagementEndpoint"
# Search specifically in bprotocol package for the interface
fd -e go . | grep "bprotocol" | xargs cat
# Look for struct types that might implement the interface
rg -A 10 "type.*struct" | grep -A 10 "Management"
Length of output: 51054
pkg/orchestrator/nodes/mocks.go (2)
1-3
: LGTM: Generated mocks
This is an auto-generated mock file created by MockGen. No manual modifications needed.
16-402
: Verify complete interface coverage
The generated mocks appear to cover the core node management interfaces. Let's verify no interfaces are missing.
✅ Verification successful
All required interfaces are properly mocked
The mock implementations in mocks.go correctly cover all the interfaces defined in pkg/orchestrator/nodes/types.go:
- Manager interface, including all its methods (Handshake, UpdateNodeInfo, Heartbeat, etc.)
- Lookup interface, with its methods (Get, GetByPrefix, List)
- Store interface, with its methods (Put, Delete) and an embedded Lookup interface
The mock implementations match the interface definitions exactly, with no missing methods or interfaces.
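For reference, a hedged reconstruction of the interfaces these mocks cover, built only from the method names listed above; the signatures and placeholder types are assumptions:

package nodes

import "context"

// NodeState and NodeStateFilter are placeholders for the real model types.
type NodeState struct{}
type NodeStateFilter func(NodeState) bool

// Lookup is the read-only view over stored node state.
type Lookup interface {
    Get(ctx context.Context, nodeID string) (NodeState, error)
    GetByPrefix(ctx context.Context, prefix string) (NodeState, error)
    List(ctx context.Context, filters ...NodeStateFilter) ([]NodeState, error)
}

// Store adds write operations on top of the embedded Lookup.
type Store interface {
    Lookup
    Put(ctx context.Context, state NodeState) error
    Delete(ctx context.Context, nodeID string) error
}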
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify all node-related interfaces are mocked
# Search for interface definitions
ast-grep --pattern 'type $name interface {
$$$
}'
# Search for any unmocked interfaces
rg "type \w+Interface interface"
Length of output: 88
Script:
#!/bin/bash
# Let's search for interface definitions in the same directory
fd -e go . pkg/orchestrator/nodes/ --exec cat {} | grep -A 10 "^type.*interface"
# Also search for any Manager/Store/Lookup interface definitions
rg "type (Manager|Store|Lookup) interface" -A 5
Length of output: 3558
@@ -39,7 +39,7 @@ func (s *OverSubscriptionNodeRanker) RankNodes(
 	var rank int
 	var reason string

-	if node.ComputeNodeInfo == nil || node.ComputeNodeInfo.MaxCapacity.IsZero() {
+	if node.ComputeNodeInfo.MaxCapacity.IsZero() {
Potential nil pointer dereference on ComputeNodeInfo
Accessing node.ComputeNodeInfo.MaxCapacity without first checking whether ComputeNodeInfo is nil may lead to a runtime nil pointer dereference. It's important to ensure that node.ComputeNodeInfo is not nil before accessing its fields.
Consider adding a nil check to prevent potential crashes:
+ if node.ComputeNodeInfo == nil || node.ComputeNodeInfo.MaxCapacity.IsZero() {
rank = orchestrator.RankUnsuitable
reason = "node queue usage is unknown"
} else {
// existing logic
}
Committable suggestion skipped: line range outside the PR's diff.
no longer a pointer
@wdbaruni, thank you for pointing that out. Since ComputeNodeInfo
is no longer a pointer, the nil check is unnecessary. I apologize for the confusion.
(A curious rabbit tilts its head in understanding)
✏️ Learnings added
Learnt from: wdbaruni
PR: bacalhau-project/bacalhau#4728
File: pkg/orchestrator/selection/ranking/over_subscription.go:42-42
Timestamp: 2024-12-08T15:03:20.765Z
Learning: In the codebase, `ComputeNodeInfo` is now a value type within `NodeInfo`, and is not a pointer. Therefore, nil checks on `ComputeNodeInfo` are unnecessary.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
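For reference, a minimal illustration of the codified learning; the field chain comes from the diff above, while the helper itself and the models import are assumed:

// isCapacityKnown reports whether a node advertised its max capacity.
// With ComputeNodeInfo held by value inside models.NodeInfo, the zero
// value is always safe to read, so no nil check is needed.
func isCapacityKnown(node models.NodeInfo) bool {
    return !node.ComputeNodeInfo.MaxCapacity.IsZero()
}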
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (4)
webui/lib/api/schema/swagger.json (1)
Line range hint 1859-1884: Good deprecation handling in NodeState model
The changes to NodeState are well-structured:
- Properly marked old Connection field as deprecated
- Clear migration path to new ConnectionState
- Improved documentation for Info field
Consider adding a migration guide in the documentation to help users transition from the deprecated Connection field to the new ConnectionState.Status; one possible shape is sketched below.
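A hedged sketch of what such migration guidance could show, with NodeState field names assumed from the swagger models rather than copied from them:

// Minimal stand-ins for the swagger models; real field names may differ.
type ConnectionState struct {
    Status string
}

type NodeState struct {
    Connection      string // deprecated
    ConnectionState ConnectionState
}

// connectionStatus prefers the new ConnectionState.Status and falls back
// to the deprecated Connection field while clients migrate.
func connectionStatus(state NodeState) string {
    if state.ConnectionState.Status != "" {
        return state.ConnectionState.Status
    }
    return state.Connection // deprecated fallback
}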
cmd/cli/agent/node_test.go (1)
29-42: Consider expanding test coverage
While the basic ID verification is good, consider adding tests for:
- Validation of other NodeInfo fields
- Error cases (malformed output)
- Edge cases specific to the new NodeInfo structure
Would you like me to provide examples of additional test cases?
pkg/node/requester.go (2)
72-76: Consider adding connection timeout and error context
The NATS connection creation should include timeout and detailed error context for better operational debugging.
- natsConn, err := transportLayer.CreateClient(ctx)
- if err != nil {
-     return nil, err
- }
+ natsConn, err := transportLayer.CreateClient(ctx)
+ if err != nil {
+     return nil, pkgerrors.Wrap(err, "failed to create NATS connection")
+ }
+
+ // Ensure connection is valid
+ if !natsConn.IsConnected() {
+     return nil, fmt.Errorf("NATS connection is not established")
+ }
9-10: Consider using structured logging consistently
The code imports zerolog/log, but some error handling sections don't utilize structured logging. Replace direct error returns with structured logging where appropriate throughout the file.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
webui/lib/api/generated/types.gen.ts is excluded by !**/generated/**
📒 Files selected for processing (5)
- cmd/cli/agent/node_test.go (1 hunks)
- pkg/node/requester.go (14 hunks)
- pkg/swagger/docs.go (3 hunks)
- pkg/swagger/swagger.json (2 hunks)
- webui/lib/api/schema/swagger.json (2 hunks)
🔇 Additional comments (7)
webui/lib/api/schema/swagger.json (1)
1268-1302
: Well-structured ConnectionState model with comprehensive tracking fields
The new ConnectionState model is well-designed with (see the sketch after this list):
- Clear separation of connection tracking timestamps
- Message sequencing for reliable delivery
- Error tracking
- Proper status handling through NodeConnectionState
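Reading those bullets back as a struct gives roughly the following shape; the field names and types below are inferred from the bullet list, not copied from the generated schema:

import "time"

// ConnectionState as described above: status, connection timestamps,
// message sequencing, and error tracking. Exact names may differ in the
// generated model.
type ConnectionState struct {
    Status                 string    // backed by NodeConnectionState in the schema
    ConnectedSince         time.Time // connection tracking timestamps
    LastHeartbeat          time.Time
    DisconnectedSince      time.Time
    LastOrchestratorSeqNum uint64 // message sequencing for reliable delivery
    LastComputeSeqNum      uint64
    LastError              string // error tracking
}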
pkg/swagger/swagger.json (1)
Line range hint 1-2500
: Identical changes to webui/lib/api/schema/swagger.json
This file contains the same Swagger schema changes as previously reviewed.
pkg/swagger/docs.go (1)
1-1
: Properly marked as generated code
The file is correctly marked as generated with "DO NOT EDIT" comment.
cmd/cli/agent/node_test.go (2)
39-42
: LGTM: Consistent with JSON test implementation
The YAML test changes mirror the JSON test implementation, maintaining consistency in the testing approach.
29-32
: Verify unmarshalling compatibility with NodeInfo structure
The change from NodeState
to NodeInfo
looks good, but we should verify the JSON structure compatibility.
✅ Verification successful
Let me analyze the results. The search shows that NodeState
is still actively used in the codebase, particularly in the orchestrator and node management components. However, the change in the test file appears to be part of a planned transition where NodeInfo
is being used for specific scenarios while NodeState
continues to exist for its original purposes.
Let's verify the NodeInfo
structure and its relationship with NodeState
to ensure this change is consistent:
Based on the results, I can now generate the final response. The search shows that NodeInfo
is a valid structure that contains core node identification and capability information, while NodeState
(from previous search) includes additional runtime state information. The change in the test from NodeState
to NodeInfo
appears to be correct as it's using the appropriate structure for the JSON unmarshalling test case.
The change to NodeInfo is correct and consistent with the codebase structure
The transition from NodeState to NodeInfo in the test is appropriate as:
- NodeInfo contains the core node identification fields needed for the test
- The test is specifically verifying JSON output format, which aligns with NodeInfo's JSON-tagged fields
- The ID comparison test uses the ID() method, which is available on both types
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify NodeInfo structure usage and compatibility
# Check for any remaining NodeState references that might need updating
rg "NodeState" --type go
# Look for other tests that might be affected by this change
ast-grep --pattern 'func Test$_($_ *testing.T) {
$$$
NodeState
$$$
}'
Length of output: 21621
Script:
#!/bin/bash
# Check NodeInfo structure and its relationship with NodeState
rg "type NodeInfo" --type go -A 5
ast-grep --pattern 'type NodeInfo struct {
$$$
}'
Length of output: 2945
pkg/node/requester.go (2)
318-323
: LGTM: Proper cleanup of node manager
The addition of node manager cleanup in the cleanup function is correct and follows the proper order of operations.
394-398
: Verify node manager configuration values
Ensure that the configuration values for NodeDisconnectedAfter
and ManualApproval
are properly validated.
✅ Verification successful
Let me gather more information about the validation of these configuration values.
Let me check one more aspect to see if there's any validation in the node manager implementation.
Based on the code analysis, I can now provide the final response:
NodeDisconnectedAfter and ManualApproval values are properly handled
The node manager implementation includes proper handling of these configuration values:
- NodeDisconnectedAfter:
  - Used to calculate health check frequency with reasonable bounds (1s to 30s); see the sketch after this list
  - Cannot be zero, as it's marked as "Required" in the params struct
  - Used directly in health checks to determine node disconnection state
- ManualApproval:
  - Boolean flag that properly sets the node's initial approval state
  - When true, nodes start in PENDING state requiring manual approval
  - When false, nodes are automatically APPROVED
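A sketch of the frequency derivation summarized in the first bullet group; only the [1s, 30s] bounds come from the summary above, and the divisor is an assumption:

// healthCheckFrequency derives the disconnect-check interval from
// NodeDisconnectedAfter and clamps it to the [1s, 30s] bounds noted above.
func healthCheckFrequency(nodeDisconnectedAfter time.Duration) time.Duration {
    freq := nodeDisconnectedAfter / 3 // divisor is illustrative
    switch {
    case freq < time.Second:
        return time.Second
    case freq > 30*time.Second:
        return 30 * time.Second
    default:
        return freq
    }
}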
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for configuration validation in the codebase
rg -A 5 "NodeDisconnectedAfter|ManualApproval" --type go
# Search for any existing validation patterns
ast-grep --pattern 'if $config.$_ <= 0 {
$$$
}'
Length of output: 15160
Script:
#!/bin/bash
# Search for validation of NodeDisconnectedAfter in the node manager
ast-grep --pattern 'func NewNodeManager($params) {
$$$
}'
# Search for Duration validation patterns
rg -A 5 "DisconnectTimeout.*AsTimeDuration|NodeDisconnectedAfter.*AsTimeDuration" --type go
# Look for any validation in the config package
rg -A 5 "func.*Validate" --type go pkg/config/
Length of output: 4037
Script:
#!/bin/bash
# Search for the node manager implementation to check validation
cat pkg/orchestrator/nodes/manager.go
# Check for any duration validation in the config types
rg -A 5 "Duration.*Validate|Validate.*Duration" --type go
Length of output: 20574
heartbeatServer := bprotocolorchestrator.NewServer(nodesManager)
if err = transportLayer.RegisterManagementEndpoint(heartbeatServer); err != nil {
    return nil, err
}
🛠️ Refactor suggestion
Add error handling for heartbeat server registration failure
The heartbeat server registration should include proper cleanup of resources in case of failure.
heartbeatServer := bprotocolorchestrator.NewServer(nodesManager)
if err = transportLayer.RegisterManagementEndpoint(heartbeatServer); err != nil {
- return nil, err
+ if cleanupErr := nodesManager.Stop(ctx); cleanupErr != nil {
+ log.Error().Err(cleanupErr).Msg("failed to cleanup node manager after heartbeat server registration failure")
+ }
+ return nil, pkgerrors.Wrap(err, "failed to register heartbeat server")
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
 heartbeatServer := bprotocolorchestrator.NewServer(nodesManager)
 if err = transportLayer.RegisterManagementEndpoint(heartbeatServer); err != nil {
-    return nil, err
+    if cleanupErr := nodesManager.Stop(ctx); cleanupErr != nil {
+        log.Error().Err(cleanupErr).Msg("failed to cleanup node manager after heartbeat server registration failure")
+    }
+    return nil, pkgerrors.Wrap(err, "failed to register heartbeat server")
 }
pkg/node/requester.go
Outdated
 func createNodeManager(ctx context.Context, cfg NodeConfig, natsConn *nats.Conn) (
     nodes.Manager, nodes.Store, error) {
     nodeInfoStore, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
         BucketName: kvstore.BucketNameCurrent,
-        Client:     transportLayer.Client(),
+        Client:     natsConn,
     })
     if err != nil {
-        return nil, pkgerrors.Wrap(err, "failed to create node info store using NATS transport connection info")
+        return nil, nil, pkgerrors.Wrap(err, "failed to create node info store using NATS transport connection info")
     }

-    tracingInfoStore := tracing.NewNodeStore(nodeInfoStore)
+    nodeManager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
+        Store:                 nodeInfoStore,
+        NodeDisconnectedAfter: cfg.BacalhauConfig.Orchestrator.NodeManager.DisconnectTimeout.AsTimeDuration(),
+        ManualApproval:        cfg.BacalhauConfig.Orchestrator.NodeManager.ManualApproval,
+    })
+    if err != nil {
+        return nil, nil, pkgerrors.Wrap(err, "failed to create node manager")
+    }

-    // Once the KV store has been created, it can be offered to the transport layer to be used as a consumer
-    // of node info.
-    if err = transportLayer.RegisterNodeInfoConsumer(ctx, tracingInfoStore); err != nil {
-        return nil, pkgerrors.Wrap(err, "failed to register node info consumer with nats transport")
+    if err = nodeManager.Start(ctx); err != nil {
+        return nil, nil, pkgerrors.Wrap(err, "failed to start node manager")
     }
-    return tracingInfoStore, nil
+    return nodeManager, nodeInfoStore, nil
🛠️ Refactor suggestion
Consider adding retry mechanism for node manager initialization
The node manager creation and initialization is critical for the system's operation. Consider adding a retry mechanism for transient failures.
func createNodeManager(ctx context.Context, cfg NodeConfig, natsConn *nats.Conn) (
nodes.Manager, nodes.Store, error) {
+ var nodeInfoStore nodes.Store
+ var nodeManager nodes.Manager
+ var err error
+
+ // Retry configuration
+ maxRetries := 3
+ retryDelay := time.Second
+
+ for i := 0; i < maxRetries; i++ {
nodeInfoStore, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
BucketName: kvstore.BucketNameCurrent,
Client: natsConn,
})
if err != nil {
+ if i < maxRetries-1 {
+ log.Warn().Err(err).Msgf("Failed to create node store (attempt %d/%d), retrying...", i+1, maxRetries)
+ time.Sleep(retryDelay)
+ continue
+ }
return nil, nil, pkgerrors.Wrap(err, "failed to create node info store using NATS transport connection info")
}
nodeManager, err = nodes.NewNodeManager(nodes.NodeManagerParams{
Store: nodeInfoStore,
NodeDisconnectedAfter: cfg.BacalhauConfig.Orchestrator.NodeManager.DisconnectTimeout.AsTimeDuration(),
ManualApproval: cfg.BacalhauConfig.Orchestrator.NodeManager.ManualApproval,
})
if err != nil {
+ if i < maxRetries-1 {
+ log.Warn().Err(err).Msgf("Failed to create node manager (attempt %d/%d), retrying...", i+1, maxRetries)
+ time.Sleep(retryDelay)
+ continue
+ }
return nil, nil, pkgerrors.Wrap(err, "failed to create node manager")
}
if err = nodeManager.Start(ctx); err != nil {
+ if i < maxRetries-1 {
+ log.Warn().Err(err).Msgf("Failed to start node manager (attempt %d/%d), retrying...", i+1, maxRetries)
+ time.Sleep(retryDelay)
+ continue
+ }
return nil, nil, pkgerrors.Wrap(err, "failed to start node manager")
}
+ break
+ }
return nodeManager, nodeInfoStore, nil
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func createNodeManager(ctx context.Context, cfg NodeConfig, natsConn *nats.Conn) (
    nodes.Manager, nodes.Store, error) {
    var nodeInfoStore nodes.Store
    var nodeManager nodes.Manager
    var err error

    // Retry configuration
    maxRetries := 3
    retryDelay := time.Second

    for i := 0; i < maxRetries; i++ {
        // Assign with = rather than := so the outer nodeInfoStore is set
        // instead of a loop-local shadow being discarded.
        nodeInfoStore, err = kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
            BucketName: kvstore.BucketNameCurrent,
            Client:     natsConn,
        })
        if err != nil {
            if i < maxRetries-1 {
                log.Warn().Err(err).Msgf("Failed to create node store (attempt %d/%d), retrying...", i+1, maxRetries)
                time.Sleep(retryDelay)
                continue
            }
            return nil, nil, pkgerrors.Wrap(err, "failed to create node info store using NATS transport connection info")
        }
        nodeManager, err = nodes.NewNodeManager(nodes.NodeManagerParams{
            Store:                 nodeInfoStore,
            NodeDisconnectedAfter: cfg.BacalhauConfig.Orchestrator.NodeManager.DisconnectTimeout.AsTimeDuration(),
            ManualApproval:        cfg.BacalhauConfig.Orchestrator.NodeManager.ManualApproval,
        })
        if err != nil {
            if i < maxRetries-1 {
                log.Warn().Err(err).Msgf("Failed to create node manager (attempt %d/%d), retrying...", i+1, maxRetries)
                time.Sleep(retryDelay)
                continue
            }
            return nil, nil, pkgerrors.Wrap(err, "failed to create node manager")
        }
        if err = nodeManager.Start(ctx); err != nil {
            if i < maxRetries-1 {
                log.Warn().Err(err).Msgf("Failed to start node manager (attempt %d/%d), retrying...", i+1, maxRetries)
                time.Sleep(retryDelay)
                continue
            }
            return nil, nil, pkgerrors.Wrap(err, "failed to start node manager")
        }
        break
    }
    return nodeManager, nodeInfoStore, nil
}
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (5)
- pkg/node/requester.go (14 hunks)
- pkg/orchestrator/nodes/manager.go (1 hunks)
- pkg/orchestrator/nodes/manager_test.go (1 hunks)
- pkg/orchestrator/nodes/types.go (1 hunks)
- pkg/transport/bprotocol/orchestrator/heartbeat_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/transport/bprotocol/orchestrator/heartbeat_test.go
🔇 Additional comments (9)
pkg/node/requester.go (7)
268-271
: Add error handling for heartbeat server registration failure
This is a duplicate of a previous review comment. The suggestion to add proper cleanup of resources in case of heartbeat server registration failure is still valid.
318-323
: LGTM: Proper cleanup handling for node manager
The cleanup function now correctly includes stopping the node manager and handles errors appropriately.
384-408
: Consider adding retry mechanism for node manager initialization
This is a duplicate of a previous review comment. The suggestion to add a retry mechanism for transient failures during node manager creation and initialization is still valid.
7-7
: LGTM: Clean import organization
The new imports are properly organized and relevant to the changes made in the file.
Also applies to: 25-26, 41-41
125-125
: LGTM: Consistent usage of node manager
The node manager is consistently used across different components (selector, debug info provider, and endpoint parameters).
Also applies to: 217-217, 227-227
394-398
: Verify node manager configuration values
Please ensure that the configuration values for NodeDisconnectedAfter
and ManualApproval
are properly documented and their implications are clear.
✅ Verification successful
Configuration values are well documented
The configuration values for both NodeDisconnectedAfter and ManualApproval are properly documented in multiple locations:
- NodeDisconnectedAfter: "This is the time period after which a compute node is considered to be disconnected. If the compute node does not deliver a heartbeat every NodeDisconnectedAfter then it is considered disconnected."
- ManualApproval: "ManualApproval, if true, requires manual approval for new compute nodes joining the cluster."
The documentation is consistently present in:
- API schema (swagger.json)
- Configuration type definitions
- Generated documentation files
- Configuration descriptions
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for documentation of node manager configuration
rg -A 5 "NodeDisconnectedAfter|ManualApproval" --type md
# Search for usage of these configuration values
ast-grep --pattern 'NodeDisconnectedAfter: $_'
ast-grep --pattern 'ManualApproval: $_'
Length of output: 149
Script:
#!/bin/bash
# Let's try a broader search for configuration documentation
rg -i "node.*disconnect|manual.*approval" --type md
# Search for the configuration struct definition
rg -A 10 "type ManagerParams struct"
# Look for any configuration examples or defaults
fd config.yaml --exec cat {}
# Search for any comments or documentation near the configuration usage
rg -B 3 -A 3 "NodeDisconnectedAfter|ManualApproval"
Length of output: 29543
56-56: Ensure consistent usage of NodeInfoStore type
The NodeInfoStore field type has been updated to use nodes.Store. Verify that all interactions with this field are using the correct interface methods.
Also applies to: 339-339
✅ Verification successful
The previous ast-grep pattern didn't yield results, so I'm using ripgrep to find all occurrences of NodeInfoStore and examining the Store interface definition to verify compatibility.
I'm searching for the Store interface definition to verify the methods that need to be supported, and looking for actual usages of NodeInfoStore to verify they match the interface.
Based on the search results, I can now generate the final response. The code shows the nodes.Store
interface definition and all actual usages of NodeInfoStore
in the codebase.
NodeInfoStore interface usage is consistent
The nodes.Store interface requires implementation of Lookup (which includes Get and List methods) and Put/Delete operations. All observed usages in the codebase are consistent with this interface:
- List() used in test utilities
- Get() used in agent tests
- Put() used in node package
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for all usages of NodeInfoStore to verify interface compatibility
ast-grep --pattern '$_.NodeInfoStore'
Length of output: 39
Script:
#!/bin/bash
# Search for all usages of NodeInfoStore using ripgrep
rg "NodeInfoStore" -A 2 -B 2
# Also search for the Store interface definition in nodes package
fd -e go -x grep -l "type Store" {} | xargs cat
Length of output: 51835
Script:
#!/bin/bash
# Search for the Store interface definition in the nodes package
rg "type Store interface" -A 10 -B 2
# Search for usages of NodeInfoStore
rg "NodeInfoStore\." -A 2 -B 2
Length of output: 3092
pkg/orchestrator/nodes/types.go (1)
1-138
: Well-structured code with clear interfaces and thorough documentation
The interfaces and types for node lifecycle and health management are well-defined and clearly documented. This enhances maintainability and readability of the codebase.
pkg/orchestrator/nodes/manager_test.go (1)
384-466: Ensure nodes are approved when ManualApproval is enabled
In the TestConcurrentOperations test, the NodeManager is initialized with ManualApproval: true, but the nodes are not being approved after the handshake. This can cause the nodes to remain in a pending state, affecting the test outcome. Please approve the nodes after the handshake to accurately test node operations under manual approval settings.
Apply this diff to approve nodes after handshake:
resp, err := manager.Handshake(ctx, messages.HandshakeRequest{
NodeInfo: nodeInfo,
})
s.Require().NoError(err)
s.Require().True(resp.Accepted)
+// Approve the node since ManualApproval is enabled
+err = manager.ApproveNode(ctx, nodeID)
+s.Require().NoError(err)
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
cmd/cli/node/action.go (1)
9-9: LGTM: Improved error handling with better context
The change from direct fatal error to using bacerrors.Wrap improves error propagation and provides better context. This aligns well with the PR's objective of making the system more robust. Consider adding error codes to make error handling more systematic:
- return bacerrors.Wrap(err, "failed to %s node %s", n.action, nodeID)
+ return bacerrors.WithCode(err, bacerrors.NodeOperationError, "failed to %s node %s", n.action, nodeID)
Also applies to: 60-60
pkg/orchestrator/nodes/kvstore/kvstore_test.go (1)
221-223: Consider adding validation for ExecutionEngines
The ComputeNodeInfo structure might benefit from validation to ensure a non-empty engine list.
func generateNodeInfo(peerID string, engines ...string) models.NodeInfo {
+    if len(engines) == 0 {
+        engines = []string{models.EngineDocker} // or other default
+    }
    return models.NodeInfo{
        NodeID:   peerID,
        NodeType: models.NodeTypeCompute,
        ComputeNodeInfo: models.ComputeNodeInfo{
            ExecutionEngines: engines,
        },
    }
}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (9)
- cmd/cli/node/action.go (2 hunks)
- cmd/cli/node/action_test.go (2 hunks)
- pkg/orchestrator/nodes/errors.go (1 hunks)
- pkg/orchestrator/nodes/inmemory/inmemory.go (6 hunks)
- pkg/orchestrator/nodes/inmemory/inmemory_test.go (11 hunks)
- pkg/orchestrator/nodes/kvstore/kvstore.go (6 hunks)
- pkg/orchestrator/nodes/kvstore/kvstore_test.go (9 hunks)
- pkg/orchestrator/nodes/manager.go (1 hunks)
- pkg/orchestrator/nodes/manager_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- pkg/orchestrator/nodes/inmemory/inmemory.go
- pkg/orchestrator/nodes/inmemory/inmemory_test.go
- pkg/orchestrator/nodes/kvstore/kvstore.go
🔇 Additional comments (9)
pkg/orchestrator/nodes/kvstore/kvstore_test.go (4)
16-19
: LGTM: Clean package restructuring
The import changes properly reflect the decoupling of node management from the routing package, aligning with the PR's objective.
33-33
: LGTM: Improved interface abstraction
The change from routing.NodeInfoStore
to nodes.Store
provides better separation of concerns and a more generic interface.
81-81
: LGTM: Consistent error handling
The error handling has been standardized using bacerrors
package with proper error codes, making error checking more reliable.
Also applies to: 103-103, 110-110, 121-121
64-64
: Verify consistent usage of Put method across codebase
The method name change from Add
to Put
is more RESTful. Let's verify this change is consistent across the codebase.
Also applies to: 65-65, 87-87, 98-98, 99-99, 128-128, 129-129, 141-141, 142-142, 174-174, 175-175, 195-195, 199-199
✅ Verification successful
Based on the search results, I can see that there are no remaining instances of the Add method in the context of node store implementations. All the occurrences of .Add( in the codebase are related to different functionalities:
- Time operations: time.Add()
- Resource calculations: total.Add(), resources.Add()
- Metrics and counters: counter.Add(), gauge.Add()
- Collection operations: wg.Add(), chain.Add()
- HTTP headers: Header.Add()
Let's verify if there are any remaining references to the node store's Add method by searching for it specifically in the node store related files:
Let me try one final verification to ensure we catch any potential references by searching for both the old and new method names in the node store implementations:
Based on the complete source code of the node store implementations, I can now provide the final response:
Method name change from Add to Put is consistent across the codebase
The codebase consistently uses the Put method for storing node states in the key-value store. There are no remaining references to the Add method in the node store implementations. The interface and implementation are aligned with RESTful naming conventions.
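For completeness, a hypothetical caller showing the renamed method in context, reusing the placeholder Store and NodeState types sketched earlier in this thread:

// saveNodeState persists a node's state through the renamed Put method.
func saveNodeState(ctx context.Context, store Store, state NodeState) error {
    if err := store.Put(ctx, state); err != nil {
        return fmt.Errorf("storing node state: %w", err)
    }
    return nil
}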
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining usage of Add method in node store implementations
# Search for Add method definitions
echo "Checking for Add method definitions:"
ast-grep --pattern 'func $$$Add($$$) $$$'
# Search for Add method calls
echo "Checking for Add method calls:"
rg '\.Add\('
Length of output: 17663
Script:
#!/bin/bash
# Search for Add method specifically in node store related files
echo "Checking node store files for Add method:"
fd "store.*\.go" pkg/orchestrator/nodes/ --exec rg "Add\(" {}
Length of output: 158
Script:
#!/bin/bash
# Search for both Add and Put methods in node store related files
echo "Checking node store implementations:"
fd ".*store.*\.go" pkg/orchestrator/nodes/ --type f --exec cat {}
Length of output: 10971
pkg/orchestrator/nodes/errors.go (1)
39-45
: Verify the appropriateness of HTTP status code for handshake requirement
The NewErrHandshakeRequired function returns an HTTP status code of 428 Precondition Required. Ensure that this status code accurately reflects the client's error in the context of requiring a handshake. Alternatively, consider using 401 Unauthorized or 400 Bad Request if more appropriate for your API's error handling conventions.
Run the following script to check the usage of HTTP status codes in error handling:
✅ Verification successful
HTTP 428 Precondition Required is the correct status code for handshake requirement
The current implementation using http.StatusPreconditionRequired (428) is the most appropriate choice for the handshake requirement scenario because:
- According to RFC 6585, status code 428 is specifically designed for situations where the server requires the client to meet certain preconditions before processing the request, which perfectly matches the handshake requirement case.
- The code is consistently used in the node management context where a handshake is a prerequisite for node operations, as evidenced by its usage in:
  - Node info updates (UpdateNodeInfoResponse)
  - Heartbeat operations (HeartbeatResponse)
- Alternative status codes would be less accurate:
  - 401 Unauthorized is for authentication failures
  - 400 Bad Request implies invalid request syntax
  - 428 Precondition Required correctly indicates a valid request that cannot be processed until a precondition (handshake) is met
A generic sketch of this pattern follows.
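For illustration only; the real implementation uses the project's bacerrors package, whose API is not reproduced here, and the HTTPStatusCode hook is an assumption:

import (
    "fmt"
    "net/http"
)

// handshakeRequiredError carries a 428 status so the HTTP layer can report
// that a handshake must precede the requested operation.
type handshakeRequiredError struct{ nodeID string }

func (e *handshakeRequiredError) Error() string {
    return fmt.Sprintf("node %s must complete a handshake before this operation", e.nodeID)
}

// HTTPStatusCode is a hypothetical hook an HTTP layer could read.
func (e *handshakeRequiredError) HTTPStatusCode() int {
    return http.StatusPreconditionRequired // 428, per RFC 6585
}

func NewErrHandshakeRequired(nodeID string) error {
    return &handshakeRequiredError{nodeID: nodeID}
}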
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all instances where HTTP status codes are set in the codebase.
# Search for lines setting HTTP status codes in error responses.
rg --type go '\.WithHTTPStatusCode\(http\.Status[A-Za-z]+\)' -A 2
Length of output: 67
Script:
#!/bin/bash
# Let's try a broader search pattern to find HTTP status code usage
rg --type go "http\.Status[A-Za-z]+" -A 2
# Also search for the specific error type and its usage
rg --type go "HandshakeRequired" -A 2
# Look for similar error handling patterns
rg --type go "NewErr.*handshake" -i -A 2
Length of output: 41855
pkg/orchestrator/nodes/manager.go (1)
121-129: Prevent potential division by zero in heartbeatCheckFrequency calculation
In the NewManager function, if params.NodeDisconnectedAfter is zero or uninitialized, dividing by heartbeatCheckFrequencyFactor can cause a division by zero error. To prevent this, ensure that NodeDisconnectedAfter is validated to be greater than zero before performing the division.
Apply this diff to add validation:
 func NewManager(params ManagerParams) (Manager, error) {
+    if params.NodeDisconnectedAfter <= 0 {
+        return nil, fmt.Errorf("NodeDisconnectedAfter must be greater than zero")
+    }
     if params.Clock == nil {
         params.Clock = clock.New()
     }
     // Determine initial approval state based on configuration
pkg/orchestrator/nodes/manager_test.go (1)
413-419: Ensure nodes are approved when ManualApproval is enabled
In the TestConcurrentOperations test, the NodeManager is initialized with ManualApproval: true, but the nodes are not being approved after the handshake. This could cause the nodes to remain in a pending state, affecting the test outcome. Approve the nodes after the handshake to accurately test node operations under manual approval settings.
Apply this diff to approve nodes after the handshake:
resp, err := manager.Handshake(ctx, messages.HandshakeRequest{
NodeInfo: nodeInfo,
})
s.Require().NoError(err)
s.Require().True(resp.Accepted)
+// Approve the node since ManualApproval is enabled
+err = manager.ApproveNode(ctx, nodeID)
+s.Require().NoError(err)
cmd/cli/node/action_test.go (2)
44-51
: Handle error correctly when approving an already approved node
The test now correctly expects an error when attempting to approve an already approved node. This change ensures that the test accurately reflects the system's behavior and properly checks for the expected error message.
68-70
: Handle error correctly when rejecting an already rejected node
Similarly, the test correctly expects an error when trying to reject an already rejected node. This adjustment improves the test's robustness by verifying that the appropriate error is returned.
Refactor: New Node Manager Implementation
This PR introduces a new node manager implementation that decouples node lifecycle management from the transport layer. The existing implementation was tightly coupled with bprotocol specifics, making it difficult to evolve our networking stack. It also lacked graceful handling of node re-connection. This refactoring is a key step towards our new transport layer architecture.
Key Changes:
Visible Changes:
Summary by CodeRabbit
New Features
- HeartbeatClient for managing heartbeat messages.
- Server structure for orchestrating node management.
Documentation
Tests
Chores