better exchange of sequence numbers during handshakes #4765
Walkthrough
This pull request introduces a comprehensive set of changes focused on enhancing node shutdown notification mechanisms, improving error handling, and refining state management across various components of the distributed system. The modifications span multiple packages, introducing new constants, structs, and methods to facilitate graceful node disconnection, better sequence number tracking, and more robust recovery processes. Key areas of improvement include message handling for shutdown notices, state reset capabilities in dispatchers, and enhanced connection management.
Sequence Diagram
sequenceDiagram
participant Compute Node
participant Orchestrator
participant Health Tracker
Compute Node->>Orchestrator: ShutdownNoticeRequest
Orchestrator->>Health Tracker: Update Connection State
Orchestrator-->>Compute Node: ShutdownNoticeResponse
Compute Node->>Compute Node: Save Last Sequence Numbers
Actionable comments posted: 3
🔭 Outside diff range comments (1)
pkg/models/messages/control_plane.go (1)
Line range hint 19-19: Fix incorrect JSON tag and comment for StartingOrchestratorSeqNum
The field comment and JSON tag are inconsistent with the field name:
- The comment says "Seq to start sending to compute node", which describes a starting sequence number, while the serialized name uses "Last"
- JSON tag LastOrchestratorSeqNum doesn't match field name StartingOrchestratorSeqNum
Apply this diff to fix the inconsistency:
- StartingOrchestratorSeqNum uint64 `json:"LastOrchestratorSeqNum"` // Seq to start sending to compute node
+ StartingOrchestratorSeqNum uint64 `json:"StartingOrchestratorSeqNum"` // Starting seq to send to compute node
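To illustrate why the tag matters, here is a minimal sketch of how `encoding/json` names the field on the wire. The two struct types are hypothetical stand-ins that model only the field under discussion, not the real response type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mismatchedTag mirrors the field as currently declared: the Go name says
// "Starting" while the JSON key says "Last". Hypothetical type for illustration.
type mismatchedTag struct {
	StartingOrchestratorSeqNum uint64 `json:"LastOrchestratorSeqNum"`
}

// matchingTag mirrors the field after the suggested fix.
type matchingTag struct {
	StartingOrchestratorSeqNum uint64 `json:"StartingOrchestratorSeqNum"`
}

// encode marshals v and returns the JSON text, or the error text on failure.
func encode(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encode(mismatchedTag{StartingOrchestratorSeqNum: 7}))
	fmt.Println(encode(matchingTag{StartingOrchestratorSeqNum: 7}))
}
```

Because the tag determines the key on the wire, renaming it changes the serialized format, so peers running an older build would presumably need to be considered before applying the fix.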
🧹 Nitpick comments (11)
pkg/orchestrator/nodes/manager.go (1)
840-856: Consider adding validation to prevent sequence number regression
The updateSequenceNumbers method currently trusts the sequence numbers provided, which could allow them to move backward in certain failure scenarios. Implementing validation logic to ensure sequence numbers only advance forward would enhance robustness.
Consider adding checks to compare current and new sequence numbers, updating them only if the new values are greater than the existing ones. This prevents potential inconsistencies in message ordering.
pkg/transport/nclprotocol/dispatcher/state.go (2)
39-40: Add missing documentation for GetState method
The GetState method is missing its documentation comment.
Add a descriptive comment:
-// GetState returns
+// GetState returns a snapshot of the current dispatcher state including
+// sequence numbers and checkpoint information
Line range hint 51-55: Add sequence number validation in update methods
The sequence number update methods should validate inputs to prevent potential issues with invalid values.
Consider adding validation:
 func (s *dispatcherState) updateLastAcked(seqNum uint64) {
+	if seqNum == 0 {
+		log.Warn().Uint64("seqNum", seqNum).Msg("Ignoring invalid sequence number in updateLastAcked")
+		return
+	}
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.lastAckedSeqNum = max(seqNum, s.lastAckedSeqNum)
 }

 func (s *dispatcherState) updateLastObserved(seqNum uint64) {
+	if seqNum == 0 {
+		log.Warn().Uint64("seqNum", seqNum).Msg("Ignoring invalid sequence number in updateLastObserved")
+		return
+	}
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.lastObservedSeq = seqNum
 }
Also applies to: 57-61
pkg/transport/nclprotocol/compute/health_tracker.go (1)
76-81: Rename HandshakeRequired method for clarity
The method name could be more descriptive about its action.
Consider renaming to better indicate it's setting a state:
-// HandshakeRequired marks that a handshake is required
-func (ht *HealthTracker) HandshakeRequired() {
+// SetHandshakeRequired marks that a handshake is required
+func (ht *HealthTracker) SetHandshakeRequired() {
pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1)
198-222: Consider enhancing shutdown sequence verification
While the test effectively verifies the watcher stop sequence, it could be enhanced to verify all documented steps:
- Recovery stop
- StopCh close
- Watcher stop
- Goroutine wait
Consider adding expectations for recovery stop and verifying goroutine completion:
 gomock.InOrder(
+	suite.recovery.EXPECT().stop(),
 	suite.watcher.EXPECT().Stop(gomock.Any()),
 )
+
+// Verify goroutines complete
+done := make(chan struct{})
+go func() {
+	d.routinesWg.Wait()
+	close(done)
+}()
+
+select {
+case <-done:
+	// Success
+case <-time.After(100 * time.Millisecond):
+	suite.Fail("Goroutines did not complete")
+}
pkg/transport/nclprotocol/dispatcher/dispatcher.go (2)
129-134: Consider adding timeout handling for recovery stop
While the shutdown sequence is correct, the recovery stop operation could potentially block. Consider adding timeout handling for a more robust shutdown.
-// Signal recovery to stop
-d.recovery.stop()
+// Signal recovery to stop with timeout
+recoveryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+defer cancel()
+if err := d.recovery.stop(recoveryCtx); err != nil {
+	log.Warn().Err(err).Msg("Recovery stop timed out")
+}
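The suggested diff assumes a stop method that accepts a context and returns an error, which the reviewed code does not have. One way such a method might bound its wait on a sync.WaitGroup is sketched below; `waitWithContext` and `stopWithin` are hypothetical helpers, not part of the dispatcher.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithContext blocks until wg finishes or ctx is done, whichever comes
// first. It returns ctx.Err() on timeout or cancellation and nil otherwise.
// On timeout the helper goroutine stays alive until wg eventually finishes.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// stopWithin simulates a background loop that needs workMs milliseconds to
// exit and waits for it for at most limitMs milliseconds.
func stopWithin(workMs, limitMs int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(limitMs)*time.Millisecond)
	defer cancel()
	return waitWithContext(ctx, &wg)
}

func main() {
	fmt.Println(stopWithin(10, 1000)) // loop exits well within the limit
	fmt.Println(stopWithin(1000, 20)) // limit expires first
}
```

The trade-off is that a timed-out wait leaves the background goroutine running, so the caller still needs to decide whether to proceed with shutdown or surface the error.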
252-270: Consider adding metrics for checkpoint operations
The checkpoint implementation is solid, but adding metrics would help monitor the health and performance of the checkpointing system.
Consider adding metrics for:
- Checkpoint attempts
- Checkpoint successes/failures
- Checkpoint duration
- Time since last successful checkpoint
Example implementation:
 func (d *Dispatcher) doCheckpoint(ctx context.Context) {
+	metrics.CheckpointAttempts.Inc()
+	start := time.Now()
+	defer func() {
+		metrics.CheckpointDuration.Observe(time.Since(start).Seconds())
+	}()
+
 	checkpointTarget := d.state.getCheckpointSeqNum()
 	if checkpointTarget == 0 {
 		// Nothing new to checkpoint
 		return
 	}
 	// ... existing code ...
 	if err := d.watcher.Checkpoint(checkpointCtx, checkpointTarget); err != nil {
+		metrics.CheckpointFailures.Inc()
 		log.Error().Err(err).
 			Uint64("target", checkpointTarget).
 			Msg("Failed to checkpoint watcher")
 		return
 	}
+	metrics.CheckpointSuccesses.Inc()
+	metrics.LastSuccessfulCheckpoint.SetToCurrentTime()
 	d.state.updateLastCheckpoint(checkpointTarget)
 }
pkg/transport/nclprotocol/compute/dataplane.go (1)
177-194: Well-documented implementation with future extensibility
The implementation prioritizes reliability by always starting from the beginning, with clear documentation explaining the tradeoff and future optimization possibilities.
Consider adding metrics/logging to track:
- How often nodes restart with the same ID
- The gap between lastReceivedSeqNum and actual processed messages
This data would help evaluate when to implement the optimization of using lastReceivedSeqNum.
pkg/transport/nclprotocol/compute/dataplane_test.go (1)
249-299: LGTM: Comprehensive test coverage
The test effectively verifies that all messages are processed from the beginning, regardless of the LastReceivedSeqNum value.
Consider adding a test case where events are added after the dispatcher starts to verify continuous processing:
 	}
+	// Verify continuous processing
+	additionalEvents := []models.ExecutionUpsert{
+		{
+			Current: &models.Execution{
+				ID:     "test-job-3",
+				NodeID: "test-node",
+			},
+		},
+	}
+
+	for _, event := range additionalEvents {
+		err := s.config.EventStore.StoreEvent(s.ctx, watcher.StoreEventRequest{
+			Operation:  watcher.OperationCreate,
+			ObjectType: compute.EventObjectExecutionUpsert,
+			Object:     event,
+		})
+		s.Require().NoError(err)
+	}
+
+	// We should receive the additional message
+	select {
+	case msg := <-s.msgChan:
+		s.Require().Equal(messages.BidResultMessageType, msg.Metadata.Get(envelope.KeyMessageType))
+		receivedMessages++
+	case <-time.After(time.Second):
+		s.Require().Failf("Timeout waiting for additional message",
+			"Only received %d of %d expected messages",
+			receivedMessages, len(initialEvents)+len(additionalEvents))
+	}
pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1)
231-255: LGTM: Good coverage of checkpoint creation on stop
The test effectively verifies that the final checkpoint is created when the dispatcher stops.
Consider adding verification that no checkpoint exists before stopping (the added lines use a separate variable name so that the existing `err := d.Stop(s.ctx)` declaration still compiles):
 	// Wait for processing
 	s.Eventually(func() bool {
 		return d.State().LastAckedSeqNum == 5
 	}, time.Second, 10*time.Millisecond)
+
+	// Verify no checkpoint exists yet
+	_, checkpointErr := s.store.GetCheckpoint(s.ctx, "test-watcher")
+	s.Require().Error(checkpointErr)

 	// Stop dispatcher - should trigger final checkpoint
 	err := d.Stop(s.ctx)
pkg/transport/nclprotocol/orchestrator/manager.go (1)
287-311: Consider adding error logging
The implementation is solid and follows the established patterns. However, consider adding debug-level logging for the shutdown process to aid in troubleshooting.
 func (cm *ComputeManager) handleShutdownRequest(ctx context.Context, msg *envelope.Message) (*envelope.Message, error) {
 	notification := msg.Payload.(*messages.ShutdownNoticeRequest)
+	log.Debug().
+		Str("nodeID", notification.NodeID).
+		Msg("Received shutdown notice")

 	// Get data plane to access sequence numbers
 	dataPlane, exists := cm.getDataPlane(notification.NodeID)
 	if !exists {
+		log.Debug().
+			Str("nodeID", notification.NodeID).
+			Msg("No active data plane found during shutdown")
 		return nil, fmt.Errorf("no active data plane for node %s", notification.NodeID)
 	}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
pkg/models/messages/constants.go (1 hunks)
pkg/models/messages/control_plane.go (1 hunks)
pkg/node/requester.go (3 hunks)
pkg/orchestrator/nodes/manager.go (6 hunks)
pkg/orchestrator/nodes/types.go (2 hunks)
pkg/transport/nclprotocol/compute/controlplane.go (5 hunks)
pkg/transport/nclprotocol/compute/dataplane.go (2 hunks)
pkg/transport/nclprotocol/compute/dataplane_test.go (2 hunks)
pkg/transport/nclprotocol/compute/health_tracker.go (5 hunks)
pkg/transport/nclprotocol/compute/manager.go (8 hunks)
pkg/transport/nclprotocol/dispatcher/dispatcher.go (5 hunks)
pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1 hunks)
pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1 hunks)
pkg/transport/nclprotocol/dispatcher/recovery.go (6 hunks)
pkg/transport/nclprotocol/dispatcher/recovery_test.go (2 hunks)
pkg/transport/nclprotocol/dispatcher/state.go (2 hunks)
pkg/transport/nclprotocol/orchestrator/manager.go (4 hunks)
pkg/transport/nclprotocol/registry.go (1 hunks)
pkg/transport/nclprotocol/test/control_plane.go (6 hunks)
pkg/transport/nclprotocol/types.go (1 hunks)
🔇 Additional comments (50)
pkg/transport/nclprotocol/dispatcher/recovery.go (8)
29-30: Addition of stopCh and wg fields for managing recovery lifecycle
Introducing stopCh and wg to the recovery struct enhances control over the recovery process, enabling graceful stopping and proper synchronization of goroutines.
40-40: Initialize stopCh in newRecovery
Initializing stopCh in the constructor ensures that the stop channel is ready for use when the recovery process begins.
86-86: Add to WaitGroup before starting the recovery goroutine
Calling r.wg.Add(1) before launching the goroutine ensures accurate tracking of active goroutines, which is essential for synchronized shutdown.
92-92: Ensure wg.Done() is called when recovery loop exits
Using defer r.wg.Done() guarantees that the WaitGroup counter is decremented when the recovery loop exits, preventing potential hangs during shutdown.
103-114: Implement interruptible backoff in recoveryLoop
Introducing a timer with a select statement allows the backoff to be interrupted by stopCh or context cancellation, enabling responsive shutdowns.
122-129: Handle stop signals during watcher restart attempts
Adding stopCh and ctx.Done() checks during watcher restarts ensures that the recovery loop can exit promptly if a stop signal is received, enhancing responsiveness.
145-145: Reinitialize stopCh in reset method
Resetting stopCh prepares the recovery process for a new cycle by ensuring a fresh stop channel is in place.
156-169: Add stop method for graceful shutdown of recovery process
The stop method allows external control over the recovery process, enabling safe termination and cleanup of resources.
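The interruptible backoff described for recoveryLoop (a timer combined with a select on stopCh and the context) can be sketched as follows. This is a minimal illustration under assumed names, not the code from recovery.go; `sleepInterruptible` and `runBackoff` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepInterruptible waits for d unless stopCh is closed or ctx is cancelled
// first. It returns true only if the full backoff elapsed.
func sleepInterruptible(ctx context.Context, stopCh <-chan struct{}, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop() // release the timer if we return early

	select {
	case <-timer.C:
		return true
	case <-stopCh:
		return false
	case <-ctx.Done():
		return false
	}
}

// runBackoff starts a backoff of backoffMs milliseconds and, when
// stopAfterMs is non-negative, closes the stop channel after that delay.
func runBackoff(backoffMs, stopAfterMs int) bool {
	stopCh := make(chan struct{})
	if stopAfterMs >= 0 {
		time.AfterFunc(time.Duration(stopAfterMs)*time.Millisecond, func() { close(stopCh) })
	}
	return sleepInterruptible(context.Background(), stopCh, time.Duration(backoffMs)*time.Millisecond)
}

func main() {
	fmt.Println(runBackoff(10, -1))   // no stop signal: the backoff completes
	fmt.Println(runBackoff(2000, 10)) // stop signal arrives first: interrupted
}
```

Compared with a plain time.Sleep, this keeps shutdown latency bounded by the time it takes to observe the stop signal rather than by the remaining backoff.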
pkg/transport/nclprotocol/compute/controlplane.go (4)
6-6: Import strings package for error handling
The addition of the strings import is necessary for enhanced error parsing in the heartbeat process.
194-211: Implement sendShutdownNotification for graceful shutdown
Adding sendShutdownNotification allows the node to inform the orchestrator of a graceful shutdown, enhancing coordination and preventing message duplication.
238-249: Enhance Stop method to send shutdown notification
Modifying the Stop method to send a shutdown notice before closing improves shutdown procedures by notifying the orchestrator of the node's status.
259-261: Checkpoint progress before stopping control plane
Calling checkpointProgress prior to stopping ensures that the latest state is saved, facilitating accurate resumption upon restart.
pkg/transport/nclprotocol/dispatcher/recovery_test.go (7)
113-139: Refactor TestRecoveryLoopWithFailures to use handleError
Updating the test to initiate recovery via handleError improves test accuracy by aligning with actual recovery trigger mechanisms.
143-166: Refactor TestRecoveryLoopWithRunningWatcher for better alignment
Adjusting the test to use handleError enhances realism and ensures the test covers the intended recovery path.
170-199: Update TestRecoveryLoopWithContextCancellation to handle context cancellation
Enhancing the test to verify recovery behavior upon context cancellation strengthens coverage of edge cases related to context management.
220-262: Add TestStopDuringRecovery to verify mid-recovery stop behavior
Introducing this test ensures that the recovery process can be stopped gracefully while it is in progress, and that resources are properly released.
264-302: Add TestStopAfterRecoveryComplete to verify post-recovery stop
This test confirms that calling stop after recovery has completed does not cause issues and completes immediately as expected.
304-322: Introduce TestMultipleStopCalls to ensure idempotency of stop
Verifying that multiple calls to stop have no adverse effects ensures that the method is safe and robust against repeated invocations.
324-351: Add TestStopAndReset to validate recovery after stopping and resetting
This test confirms that the recovery process can be restarted successfully after being stopped and reset, ensuring proper cleanup and reinitialization.
pkg/transport/nclprotocol/compute/manager.go (6)
5-5: Import errors package for error aggregation
Including the errors package allows the use of errors.Join to combine multiple errors, improving error reporting.
Line range hint 158-190: Update cleanup method to return aggregated errors
Modifying cleanup to return an error provides better visibility into any issues that occur during resource cleanup.
208-212: Invoke cleanup before establishing a new connection
Calling cleanup prior to reconnecting ensures that previous connections are properly closed and resources are freed, preventing leaks.
216-218: Handle errors during cleanup in connect method
Properly handling and logging errors that occur during cleanup aids in debugging and maintaining resource integrity.
326-330: Update sequence tracker with orchestrator's starting sequence number
Accepting the orchestrator's sequence number during handshake ensures synchronization and correct message processing upon reconnection.
Line range hint 434-449: Enhance connection health checks to include handshake requirements
By checking if a handshake is required, the connection manager can more effectively detect unhealthy states and initiate reconnection procedures.
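The error aggregation mentioned for cleanup can be illustrated with a small sketch built on errors.Join (available since Go 1.20). The `closer` interface and `fakeComponent` type are hypothetical placeholders for whatever the real cleanup method releases.

```go
package main

import (
	"errors"
	"fmt"
)

// closer is a hypothetical minimal interface for components released
// during cleanup.
type closer interface {
	Close() error
}

// fakeComponent simulates a component whose Close may fail.
type fakeComponent struct {
	name string
	fail bool
}

func (c fakeComponent) Close() error {
	if c.fail {
		return fmt.Errorf("close %s: failed", c.name)
	}
	return nil
}

// cleanupAll closes every component even after a failure and returns all
// failures joined into one error, or nil if everything closed cleanly.
func cleanupAll(components ...closer) error {
	var errs []error
	for _, c := range components {
		if err := c.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	err := cleanupAll(
		fakeComponent{name: "control plane", fail: true},
		fakeComponent{name: "data plane"},
		fakeComponent{name: "connection", fail: true},
	)
	fmt.Println(err)
}
```

Continuing past the first failure means one stuck component does not prevent the others from being released, while the joined error still reports every failure to the caller.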
pkg/orchestrator/nodes/manager.go (6)
Line range hint 425-442: Handling existing nodes and reconnections appropriately
The Handshake method correctly checks for existing nodes and handles rejected nodes, reconnections, and new node registrations appropriately. This ensures proper management of node states during the handshake process.
467-470: Resolving starting sequence numbers during handshake
The addition of resolveStartingOrchestratorSeqNum enhances the handshake process by determining the appropriate starting sequence number for nodes based on their state. This prevents issues with nodes that may restart with a fresh state.
567-567: Updating sequence numbers in heartbeat
The call to n.updateSequenceNumbers correctly updates the last known sequence numbers for orchestrator and compute messages during the heartbeat process, ensuring proper message tracking.
585-640: Implementing ShutdownNotice for graceful node shutdown
The ShutdownNotice method appropriately handles shutdown notifications from nodes, updating their connection state to disconnected and preserving sequence numbers. This enables graceful disconnection and state preservation.
811-839: Defining sequence number resolution logic
The resolveStartingOrchestratorSeqNum method correctly determines the starting sequence number for new and reconnecting nodes, preventing them from being overwhelmed with historical events or missing necessary updates.
Line range hint 893-908: Enhancing self-registration with error handling
The selfRegister method now handles node retrieval errors appropriately and initializes node state when the node is not found. This ensures that the node registers itself correctly even if it's a new addition.
pkg/models/messages/constants.go (1)
16-16: Adding constants for shutdown notice messages
The addition of ShutdownNoticeRequestMessageType and ShutdownNoticeResponseType constants is appropriate for handling shutdown notices within the messaging system, facilitating proper message identification.
Also applies to: 21-21
pkg/transport/nclprotocol/registry.go (1)
26-26: Registering shutdown notice messages in the registry
The registration of ShutdownNoticeRequest and ShutdownNoticeResponse in the message registry ensures that shutdown notices are correctly handled by the messaging system.
Also applies to: 32-32
pkg/transport/nclprotocol/types.go (1)
53-53: Verify initialization of the HandshakeRequired field
The HandshakeRequired field has been added to the ConnectionHealth struct. Ensure that this field is properly initialized wherever ConnectionHealth instances are created to prevent potential issues due to uninitialized boolean fields defaulting to false.
pkg/models/messages/control_plane.go (1)
43-54: LGTM! Well-structured shutdown notice types
The new shutdown notice types properly handle sequence number exchange:
- ShutdownNoticeRequest includes LastOrchestratorSeqNum to track last received sequence from orchestrator
- ShutdownNoticeResponse includes LastComputeSeqNum to track last received sequence from compute node
- Pattern is consistent with existing handshake types
pkg/orchestrator/nodes/types.go (2)
144-150: LGTM! Structure follows established patterns
The ExtendedShutdownNoticeRequest struct follows the same pattern as ExtendedHeartbeatRequest, appropriately extending the base request with sequence tracking.
67-69: LGTM! Verify integration with existing shutdown flows
The new ShutdownNotice method signature and documentation are well-defined. The method appropriately handles graceful shutdown notifications and maintains sequence numbers.
Let's verify the integration with existing shutdown flows:
✅ Verification successful
Let me analyze the existing shutdown flows to verify the integration of the new ShutdownNotice method. I'll search for its implementation and usage.
Let me try a different approach to find the shutdown notice implementation and related code.
Based on the search results, I can now verify the integration. The ShutdownNotice method is properly implemented in pkg/orchestrator/nodes/manager.go and is well-integrated with the existing shutdown flows:
- The implementation in nodesManager correctly handles node state transitions and sequence numbers
- It's properly registered in the protocol registry (pkg/transport/nclprotocol/registry.go)
- It's integrated in both compute and orchestrator sides:
  - Compute side sends notifications (pkg/transport/nclprotocol/compute/controlplane.go)
  - Orchestrator side listens and processes them (pkg/transport/nclprotocol/orchestrator/manager.go)
- Test infrastructure is in place (pkg/transport/nclprotocol/test/control_plane.go)
LGTM! The ShutdownNotice method is properly integrated with existing shutdown flows
The implementation follows the established patterns for graceful shutdown, maintaining sequence numbers, and state transitions. The integration is complete across both compute and orchestrator components.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for existing shutdown-related code to ensure consistent handling
rg -A 5 "shutdown|terminate|clean.*disconnect"
Length of output: 29877
Script:
#!/bin/bash
# Search for ShutdownNotice implementation and related code
ast-grep --pattern 'func $$$ShutdownNotice(ctx context.Context, request ExtendedShutdownNoticeRequest) (messages.ShutdownNoticeResponse, error) {
$$$
}'
# Search for any test files related to shutdown notice
fd -e go -x rg -l "ShutdownNotice.*Test"
Length of output: 199
Script:
#!/bin/bash
# Search for ShutdownNotice implementation with more context
rg -A 10 "func.*ShutdownNotice"
# Search for ShutdownNoticeRequest usage
rg "ShutdownNoticeRequest" --type go
Length of output: 6015
pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1)
175-196: LGTM! Test covers important restart scenario
The test effectively verifies that the dispatcher's state is properly reset when restarted after stopping. The sequence of operations and expectations is well-structured.
pkg/transport/nclprotocol/dispatcher/dispatcher.go (2)
80-82: LGTM! Clean state accessor implementation
The State() method provides a clean interface to access the dispatcher's state.
96-98: LGTM! Proper state reset sequence
The state and recovery reset sequence is correctly placed before starting the dispatcher, ensuring a clean initial state.
pkg/transport/nclprotocol/compute/dataplane.go (1)
124-124: LGTM: Safer approach to message iteration
The change to use resolveStartingIterator instead of AfterSequenceNumberIterator ensures no messages are missed when a compute node restarts, prioritizing reliability over optimization.
Let's verify the impact on message processing:
✅ Verification successful
Based on the search results, I can now generate the final response:
LGTM: Change to resolveStartingIterator is well-implemented and isolated
The change from AfterSequenceNumberIterator to resolveStartingIterator is a defensive improvement that ensures no messages are missed during compute node restarts. The implementation is well-documented and its behavior is explicitly commented to always return TrimHorizonIterator regardless of the lastReceivedSeqNum parameter.
The codebase search confirms:
- The change is isolated to compute dataplane's message processing
- The orchestrator dataplane continues to use AfterSequenceNumberIterator
- The watcher library properly supports both iterator types
- No other components rely on the compute dataplane's specific iterator choice
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any custom sequence number handling in the codebase
# that might be affected by this change
# Look for sequence number related code
rg -A 3 "AfterSequenceNumberIterator|TrimHorizonIterator|lastReceivedSeqNum"
Length of output: 27891
pkg/transport/nclprotocol/compute/dataplane_test.go (1)
173-175: LGTM: Improved test readability
pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1)
257-293: LGTM: Thorough verification of state reset behavior
The test effectively verifies that the dispatcher properly resets its state on restart while maintaining the checkpoint, ensuring only new events are processed.
pkg/transport/nclprotocol/orchestrator/manager.go (2)
113-114: LGTM: Clean handler registration
The shutdown notice handler is properly registered alongside other control plane message handlers.
263-263: LGTM: Consistent metadata handling
The addition of message type metadata to responses follows the established pattern and improves message traceability.
Also applies to: 283-285
pkg/transport/nclprotocol/test/control_plane.go (3)
37-47: LGTM: Well-structured mock behavior extension
The shutdown response fields and callback follow the established pattern of other mock behaviors.
165-178: LGTM: Consistent mock handler implementation
The shutdown handler implementation follows the established pattern of other handlers in the mock responder.
211-218: LGTM: Thread-safe shutdown request tracking
The implementation properly handles concurrent access to shutdown records and follows the established pattern.
Also applies to: 247-253
pkg/node/requester.go (2)
330-330: LGTM: Simplified dependency injection
The direct use of nodesManager for NodeInfoStore simplifies the dependency chain while maintaining the correct interface.
55-55: Verify the interface reduction impact
The change from nodes.Store to nodes.Lookup suggests better interface segregation. Let's verify that all necessary methods are available in the new interface.
✅ Verification successful
Interface reduction is safe and improves design
The change from nodes.Store to nodes.Lookup is a positive refactoring that follows interface segregation principles. The evidence shows:
- nodes.Lookup is a focused interface providing only read operations (Get, GetByPrefix, List)
- nodes.Store embeds Lookup and adds mutation operations (Put, Delete)
- The requester.go file only shows NodeInfoStore being initialized and stored, with no direct method calls visible, suggesting minimal usage
This change reduces coupling by limiting the interface to only what's needed, making the code more maintainable and following the principle of least privilege.
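The relationship between the two interfaces can be sketched as below. The method sets are deliberately simplified assumptions (the real nodes.Lookup and nodes.Store presumably take contexts and return richer types, and GetByPrefix is omitted); `memStore` and `describe` are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// Lookup is a simplified read-only view (assumed signatures).
type Lookup interface {
	Get(id string) (string, bool)
	List() []string
}

// Store embeds Lookup and adds mutation, mirroring the structure described
// in the review.
type Store interface {
	Lookup
	Put(id, state string)
	Delete(id string)
}

// memStore is a hypothetical in-memory implementation of Store.
type memStore struct {
	nodes map[string]string
}

// Compile-time check that memStore satisfies Store (and therefore Lookup).
var _ Store = (*memStore)(nil)

func newMemStore() *memStore { return &memStore{nodes: map[string]string{}} }

func (m *memStore) Get(id string) (string, bool) {
	state, ok := m.nodes[id]
	return state, ok
}

func (m *memStore) List() []string {
	ids := make([]string, 0, len(m.nodes))
	for id := range m.nodes {
		ids = append(ids, id)
	}
	sort.Strings(ids) // map iteration order is random, so sort for stability
	return ids
}

func (m *memStore) Put(id, state string) { m.nodes[id] = state }
func (m *memStore) Delete(id string)     { delete(m.nodes, id) }

// describe depends only on Lookup, so it cannot mutate the store.
func describe(l Lookup, id string) string {
	state, ok := l.Get(id)
	if !ok {
		return id + ": not found"
	}
	return id + ": " + state
}

func main() {
	s := newMemStore()
	s.Put("node-1", "connected")
	fmt.Println(describe(s, "node-1"))
	fmt.Println(describe(s, "node-2"))
}
```

Any value that satisfies Store can be passed where Lookup is expected, which is why narrowing the field type requires no change at the call site that supplies it.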
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for NodeInfoStore usage to verify interface compatibility
ast-grep --pattern 'r.NodeInfoStore.$_($_)'
Length of output: 45
Script:
#!/bin/bash
# Search for NodeInfoStore usage in the file
rg -A 2 "NodeInfoStore\." pkg/node/requester.go
# Search for Lookup interface definition
ast-grep --pattern 'type Lookup interface {
$$$
}'
# Search for Store interface definition for comparison
ast-grep --pattern 'type Store interface {
$$$
}'
Length of output: 5929
Script:
#!/bin/bash
# Search for actual usage of NodeInfoStore in requester.go to verify methods being called
rg -B 2 -A 2 "NodeInfoStore" pkg/node/requester.go
Length of output: 432
if strings.Contains(err.Error(), "handshake required") {
	cp.healthTracker.HandshakeRequired()
	return
}
🛠️ Refactor suggestion
Avoid fragile string matching in error handling
Using strings.Contains(err.Error(), "handshake required") to detect handshake errors can be unreliable if error messages change.
Consider defining a specific error type or constant to identify handshake requirements more reliably.
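A sentinel error checked with errors.Is is one way to follow this suggestion. The sketch below assumes a hypothetical `ErrHandshakeRequired` value and simulated `heartbeat` function; neither is defined in the reviewed code.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrHandshakeRequired is a hypothetical sentinel error used for illustration.
var ErrHandshakeRequired = errors.New("handshake required")

// heartbeat simulates a heartbeat call that is rejected for unknown nodes.
// The sentinel is wrapped with %w so callers can still identify it.
func heartbeat(nodeID string, known bool) error {
	if !known {
		return fmt.Errorf("heartbeat from %s rejected: %w", nodeID, ErrHandshakeRequired)
	}
	return nil
}

// lookalike returns a distinct error that happens to carry the same text.
func lookalike() error {
	return errors.New("handshake required")
}

// needsHandshake classifies an error by identity rather than message text.
func needsHandshake(err error) bool {
	return errors.Is(err, ErrHandshakeRequired)
}

func main() {
	fmt.Println(needsHandshake(heartbeat("node-1", false))) // wrapped sentinel
	fmt.Println(needsHandshake(heartbeat("node-1", true)))  // nil error
	fmt.Println(needsHandshake(lookalike()))                // same text, different error
}
```

This only works while the error value stays in-process. If the error reaches the compute node as text over the transport, identity is lost, and an explicit error code in the response would likely be needed instead.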
@@ -36,6 +36,7 @@ func (ht *HealthTracker) MarkConnected() {
 	ht.health.LastSuccessfulHeartbeat = ht.clock.Now()
 	ht.health.ConsecutiveFailures = 0
 	ht.health.LastError = nil
+	ht.health.HandshakeRequired = false
💡 Codebase verification
HandshakeRequired field lacks test coverage
The test suite for HealthTracker does not include any assertions for the HandshakeRequired field state changes. While the connection state transitions are well tested, the HandshakeRequired field modifications in Connected, Disconnected, and Connecting states are not verified.
- Add test assertions for HandshakeRequired field in TestInitialState
- Add verification of HandshakeRequired state in TestConnectionStateTransitions
- Consider adding specific test cases for HandshakeRequired state transitions
🔗 Analysis chain
Verify test coverage for HandshakeRequired reset
The HandshakeRequired field is reset in multiple places. Let's verify test coverage exists.
Also applies to: 50-50, 59-59
Let me gather more information about test coverage.
Let me check the actual test implementations to verify coverage of HandshakeRequired state changes.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for test cases covering HandshakeRequired reset
rg -l "TestHealthTracker.*(?:Connected|Disconnected|Connecting)" --type go
# Search for HandshakeRequired field usage in tests
rg "HandshakeRequired\s*=\s*(true|false)" --type go
Length of output: 485
Script:
#!/bin/bash
# Search for test files related to health tracker
fd "health.*test.*\.go$"
# Look for test cases that might be testing the HandshakeRequired behavior
rg -A 5 "func Test.*Health" --type go
# Look for mock or test setup of health tracker
rg "NewHealthTracker|health.*test" --type go
Length of output: 1605
Script:
#!/bin/bash
# Look at the health tracker test implementation
cat pkg/transport/nclprotocol/compute/health_tracker_test.go
Length of output: 3357
@@ -294,7 +324,7 @@ func (cm *ComputeManager) handleConnectionStateChange(event nodes.NodeConnection
 	if event.Current == models.NodeStates.DISCONNECTED {
 		if dataPlane, ok := cm.dataPlanes.LoadAndDelete(event.NodeID); ok {
 			if dp, ok := dataPlane.(*DataPlane); ok {
-				if err := dp.Stop(context.Background()); err != nil {
+				if err := dp.Stop(context.TODO()); err != nil {
💡 Codebase verification
Context should be propagated from the event handler
The use of context.TODO() in the handleConnectionStateChange method is not ideal as this is an event handler that should propagate context from its caller. Looking at the implementation:
- The Stop method across the codebase consistently accepts and uses context for timeout/cancellation control
- The event handler is dealing with node disconnection, which should respect shutdown timeouts
- Other similar operations in the codebase properly propagate context
The method signature should be updated to:
func (cm *ComputeManager) handleConnectionStateChange(ctx context.Context, event nodes.NodeConnectionEvent)
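As a rough sketch of what propagating the context buys, the handler below receives the caller's context, derives a bounded one, and passes it to Stop. `slowPlane`, `handleDisconnect`, and `stopOutcome` are hypothetical stand-ins, not the ComputeManager code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowPlane is a hypothetical stand-in for a data plane whose Stop takes
// stopTime to complete unless the context ends first.
type slowPlane struct {
	stopTime time.Duration
}

func (p slowPlane) Stop(ctx context.Context) error {
	select {
	case <-time.After(p.stopTime):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// handleDisconnect receives the caller's context instead of creating its
// own, derives a bounded context from it, and passes that to Stop.
func handleDisconnect(ctx context.Context, p slowPlane, limit time.Duration) error {
	stopCtx, cancel := context.WithTimeout(ctx, limit)
	defer cancel()
	return p.Stop(stopCtx)
}

// stopOutcome runs the handler with millisecond inputs and names the result.
func stopOutcome(stopMs, limitMs int) string {
	p := slowPlane{stopTime: time.Duration(stopMs) * time.Millisecond}
	err := handleDisconnect(context.Background(), p, time.Duration(limitMs)*time.Millisecond)
	switch {
	case err == nil:
		return "stopped"
	case errors.Is(err, context.DeadlineExceeded):
		return "timed out"
	default:
		return err.Error()
	}
}

func main() {
	fmt.Println(stopOutcome(5, 500)) // Stop finishes within the limit
	fmt.Println(stopOutcome(500, 5)) // the limit expires first
}
```

With context.TODO() or context.Background(), by contrast, a Stop call that hangs has nothing to cancel it.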
🔗 Analysis chain
Verify context handling strategy
The change from context.Background() to context.TODO() suggests that proper context handling needs to be implemented. Consider whether a context should be passed from the caller instead.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for similar patterns of context handling in connection state changes
rg -A 5 'context\.(Background|TODO)\(\).*Stop'
Length of output: 48
Script:
#!/bin/bash
# Let's first find the context of this Stop method usage
ast-grep --pattern 'func handleConnectionStateChange($$$) {
$$$
Stop($$$)
$$$
}'
# Also search for Stop method definition and its usage patterns
ast-grep --pattern 'Stop($$$)'
# Let's also check for any context parameter in the function signature
ast-grep --pattern 'func ($_ $_) Stop(ctx context.Context) $_'
Length of output: 19802
Script:
#!/bin/bash
# Let's find the handleConnectionStateChange function to understand the context
ast-grep --pattern 'func ($_ $_) handleConnectionStateChange($$$) {
$$$
}'
Length of output: 1803