
better exchange of sequence numbers during handshakes #4765

Closed
wants to merge 1 commit into from

Conversation


@wdbaruni wdbaruni commented Dec 15, 2024

Summary by CodeRabbit

  • New Features

    • Added support for shutdown notifications with new message types and handling methods.
    • Introduced enhanced error handling and state management for various components.
    • Implemented new test cases to validate shutdown and recovery functionalities.
  • Bug Fixes

    • Improved error reporting and handling during connection management and recovery processes.
  • Documentation

    • Updated comments and documentation for clarity on new features and changes.
  • Tests

    • Added comprehensive tests for shutdown notifications and recovery scenarios, ensuring robust coverage.


coderabbitai bot commented Dec 15, 2024

Walkthrough

This pull request introduces a comprehensive set of changes focused on enhancing node shutdown notification mechanisms, improving error handling, and refining state management across various components of the distributed system. The modifications span multiple packages, introducing new constants, structs, and methods to facilitate graceful node disconnection, better sequence number tracking, and more robust recovery processes. Key areas of improvement include message handling for shutdown notices, state reset capabilities in dispatchers, and enhanced connection management.

Changes

File | Change Summary
pkg/models/messages/constants.go | Added ShutdownNoticeRequestMessageType and ShutdownNoticeResponseType constants
pkg/models/messages/control_plane.go | Introduced ShutdownNoticeRequest and ShutdownNoticeResponse structs
pkg/node/requester.go | Updated NodeInfoStore type from nodes.Store to nodes.Lookup
pkg/orchestrator/nodes/manager.go | Added ShutdownNotice method, modified Handshake and Heartbeat methods
pkg/transport/nclprotocol/compute/* | Enhanced health tracking, connection management, and shutdown notification handling
pkg/transport/nclprotocol/dispatcher/* | Improved state management, recovery process, and checkpointing

Sequence Diagram

sequenceDiagram
    participant Compute Node
    participant Orchestrator
    participant Health Tracker
    
    Compute Node->>Orchestrator: ShutdownNoticeRequest
    Orchestrator->>Health Tracker: Update Connection State
    Orchestrator-->>Compute Node: ShutdownNoticeResponse
    Compute Node->>Compute Node: Save Last Sequence Numbers


Suggested reviewers

  • frrist
  • jamlo

Poem

🐰 Nodes dance their farewell song,
Graceful shutdown, sequence numbers strong,
Messages whisper, "Time to rest",
CodeRabbit's system passes the test!
Hop, hop, connection's last embrace! 🌈



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🔭 Outside diff range comments (1)
pkg/models/messages/control_plane.go (1)

Line range hint 19-19: Fix incorrect JSON tag and comment for StartingOrchestratorSeqNum

The field comment and JSON tag are inconsistent with the field name:

  • The comment describes a starting sequence ("Seq to start sending to compute node"), yet the serialized name still says "Last"
  • The JSON tag LastOrchestratorSeqNum doesn't match the field name StartingOrchestratorSeqNum

Apply this diff to fix the inconsistency:

-	StartingOrchestratorSeqNum uint64 `json:"LastOrchestratorSeqNum"` // Seq to start sending to compute node
+	StartingOrchestratorSeqNum uint64 `json:"StartingOrchestratorSeqNum"` // Starting seq to send to compute node
🧹 Nitpick comments (11)
pkg/orchestrator/nodes/manager.go (1)

840-856: Consider adding validation to prevent sequence number regression

The updateSequenceNumbers method currently trusts the sequence numbers provided, which could allow them to move backward in certain failure scenarios. Implementing validation logic to ensure sequence numbers only advance forward would enhance robustness.

Consider adding checks to compare current and new sequence numbers, updating them only if the new values are greater than the existing ones. This prevents potential inconsistencies in message ordering.
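
As a rough illustration of that guard, here is a self-contained sketch; nodeSeqState and its fields are stand-ins for the manager's real per-node state, not the actual types:

package main

import "fmt"

// nodeSeqState is an illustrative stand-in for the per-node state tracked by the manager.
type nodeSeqState struct {
	lastOrchestratorSeqNum uint64
	lastComputeSeqNum      uint64
}

// updateSequenceNumbers only lets sequence numbers advance; stale values are ignored.
func (s *nodeSeqState) updateSequenceNumbers(orchestratorSeq, computeSeq uint64) {
	if orchestratorSeq > s.lastOrchestratorSeqNum {
		s.lastOrchestratorSeqNum = orchestratorSeq
	}
	if computeSeq > s.lastComputeSeqNum {
		s.lastComputeSeqNum = computeSeq
	}
}

func main() {
	s := &nodeSeqState{lastOrchestratorSeqNum: 10, lastComputeSeqNum: 7}
	s.updateSequenceNumbers(8, 9) // stale orchestrator value is dropped, compute value advances
	fmt.Println(s.lastOrchestratorSeqNum, s.lastComputeSeqNum) // prints: 10 9
}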

pkg/transport/nclprotocol/dispatcher/state.go (2)

39-40: Add missing documentation for GetState method

The GetState method is missing its documentation comment.

Add a descriptive comment:

-// GetState returns
+// GetState returns a snapshot of the current dispatcher state including
+// sequence numbers and checkpoint information

Line range hint 51-55: Add sequence number validation in update methods

The sequence number update methods should validate inputs to prevent potential issues with invalid values.

Consider adding validation:

 func (s *dispatcherState) updateLastAcked(seqNum uint64) {
+	if seqNum == 0 {
+		log.Warn().Uint64("seqNum", seqNum).Msg("Ignoring invalid sequence number in updateLastAcked")
+		return
+	}
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.lastAckedSeqNum = max(seqNum, s.lastAckedSeqNum)
 }

 func (s *dispatcherState) updateLastObserved(seqNum uint64) {
+	if seqNum == 0 {
+		log.Warn().Uint64("seqNum", seqNum).Msg("Ignoring invalid sequence number in updateLastObserved")
+		return
+	}
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.lastObservedSeq = seqNum
 }

Also applies to: 57-61

pkg/transport/nclprotocol/compute/health_tracker.go (1)

76-81: Rename HandshakeRequired method for clarity

The method name could be more descriptive about its action.

Consider renaming to better indicate it's setting a state:

-// HandshakeRequired marks that a handshake is required
-func (ht *HealthTracker) HandshakeRequired() {
+// SetHandshakeRequired marks that a handshake is required
+func (ht *HealthTracker) SetHandshakeRequired() {
pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1)

198-222: Consider enhancing shutdown sequence verification

While the test effectively verifies the watcher stop sequence, it could be enhanced to verify all documented steps:

  1. Recovery stop
  2. StopCh close
  3. Watcher stop
  4. Goroutine wait

Consider adding expectations for recovery stop and verifying goroutine completion:

 gomock.InOrder(
+    suite.recovery.EXPECT().stop(),
     suite.watcher.EXPECT().Stop(gomock.Any()),
 )
+
+// Verify goroutines complete
+done := make(chan struct{})
+go func() {
+    d.routinesWg.Wait()
+    close(done)
+}()
+
+select {
+case <-done:
+    // Success
+case <-time.After(100 * time.Millisecond):
+    suite.Fail("Goroutines did not complete")
+}
pkg/transport/nclprotocol/dispatcher/dispatcher.go (2)

129-134: Consider adding timeout handling for recovery stop

While the shutdown sequence is correct, the recovery stop operation could potentially block. Consider adding timeout handling for a more robust shutdown.

-// Signal recovery to stop
-d.recovery.stop()
+// Signal recovery to stop with timeout
+recoveryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+defer cancel()
+if err := d.recovery.stop(recoveryCtx); err != nil {
+    log.Warn().Err(err).Msg("Recovery stop timed out")
+}

252-270: Consider adding metrics for checkpoint operations

The checkpoint implementation is solid, but adding metrics would help monitor the health and performance of the checkpointing system.

Consider adding metrics for:

  • Checkpoint attempts
  • Checkpoint successes/failures
  • Checkpoint duration
  • Time since last successful checkpoint

Example implementation:

 func (d *Dispatcher) doCheckpoint(ctx context.Context) {
+    metrics.CheckpointAttempts.Inc()
+    start := time.Now()
+    defer func() {
+        metrics.CheckpointDuration.Observe(time.Since(start).Seconds())
+    }()
+
     checkpointTarget := d.state.getCheckpointSeqNum()
     if checkpointTarget == 0 { // Nothing new to checkpoint
         return
     }
     // ... existing code ...
     if err := d.watcher.Checkpoint(checkpointCtx, checkpointTarget); err != nil {
+        metrics.CheckpointFailures.Inc()
         log.Error().Err(err).
             Uint64("target", checkpointTarget).
             Msg("Failed to checkpoint watcher")
         return
     }
+    metrics.CheckpointSuccesses.Inc()
+    metrics.LastSuccessfulCheckpoint.SetToCurrentTime()
     d.state.updateLastCheckpoint(checkpointTarget)
 }
pkg/transport/nclprotocol/compute/dataplane.go (1)

177-194: Well-documented implementation with future extensibility

The implementation prioritizes reliability by always starting from the beginning, with clear documentation explaining the tradeoff and future optimization possibilities.

Consider adding metrics/logging to track:

  1. How often nodes restart with the same ID
  2. The gap between lastReceivedSeqNum and the messages actually processed

This data would help evaluate when to implement the optimization of using lastReceivedSeqNum.
pkg/transport/nclprotocol/compute/dataplane_test.go (1)

249-299: LGTM: Comprehensive test coverage

The test effectively verifies that all messages are processed from the beginning, regardless of the LastReceivedSeqNum value.

Consider adding a test case where events are added after the dispatcher starts to verify continuous processing:

 	}
+	// Verify continuous processing
+	additionalEvents := []models.ExecutionUpsert{
+		{
+			Current: &models.Execution{
+				ID:     "test-job-3",
+				NodeID: "test-node",
+			},
+		},
+	}
+	
+	for _, event := range additionalEvents {
+		err := s.config.EventStore.StoreEvent(s.ctx, watcher.StoreEventRequest{
+			Operation:  watcher.OperationCreate,
+			ObjectType: compute.EventObjectExecutionUpsert,
+			Object:     event,
+		})
+		s.Require().NoError(err)
+	}
+	
+	// We should receive the additional message
+	select {
+	case msg := <-s.msgChan:
+		s.Require().Equal(messages.BidResultMessageType, msg.Metadata.Get(envelope.KeyMessageType))
+		receivedMessages++
+	case <-time.After(time.Second):
+		s.Require().Failf("Timeout waiting for additional message", 
+			"Only received %d of %d expected messages",
+			receivedMessages, len(initialEvents)+len(additionalEvents))
+	}
pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1)

231-255: LGTM: Good coverage of checkpoint creation on stop

The test effectively verifies that the final checkpoint is created when the dispatcher stops.

Consider adding verification that no checkpoint exists before stopping:

 	// Wait for processing
 	s.Eventually(func() bool {
 		return d.State().LastAckedSeqNum == 5
 	}, time.Second, 10*time.Millisecond)
+
+	// Verify no checkpoint exists yet
+	_, err := s.store.GetCheckpoint(s.ctx, "test-watcher")
+	s.Require().Error(err)
 
 	// Stop dispatcher - should trigger final checkpoint
 	err := d.Stop(s.ctx)
pkg/transport/nclprotocol/orchestrator/manager.go (1)

287-311: Consider adding error logging

The implementation is solid and follows the established patterns. However, consider adding debug-level logging for the shutdown process to aid in troubleshooting.

 func (cm *ComputeManager) handleShutdownRequest(ctx context.Context, msg *envelope.Message) (*envelope.Message, error) {
 	notification := msg.Payload.(*messages.ShutdownNoticeRequest)
+	log.Debug().
+		Str("nodeID", notification.NodeID).
+		Msg("Received shutdown notice")
 
 	// Get data plane to access sequence numbers
 	dataPlane, exists := cm.getDataPlane(notification.NodeID)
 	if !exists {
+		log.Debug().
+			Str("nodeID", notification.NodeID).
+			Msg("No active data plane found during shutdown")
 		return nil, fmt.Errorf("no active data plane for node %s", notification.NodeID)
 	}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 247a620 and 3b017ae.

📒 Files selected for processing (20)
  • pkg/models/messages/constants.go (1 hunks)
  • pkg/models/messages/control_plane.go (1 hunks)
  • pkg/node/requester.go (3 hunks)
  • pkg/orchestrator/nodes/manager.go (6 hunks)
  • pkg/orchestrator/nodes/types.go (2 hunks)
  • pkg/transport/nclprotocol/compute/controlplane.go (5 hunks)
  • pkg/transport/nclprotocol/compute/dataplane.go (2 hunks)
  • pkg/transport/nclprotocol/compute/dataplane_test.go (2 hunks)
  • pkg/transport/nclprotocol/compute/health_tracker.go (5 hunks)
  • pkg/transport/nclprotocol/compute/manager.go (8 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher.go (5 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/recovery.go (6 hunks)
  • pkg/transport/nclprotocol/dispatcher/recovery_test.go (2 hunks)
  • pkg/transport/nclprotocol/dispatcher/state.go (2 hunks)
  • pkg/transport/nclprotocol/orchestrator/manager.go (4 hunks)
  • pkg/transport/nclprotocol/registry.go (1 hunks)
  • pkg/transport/nclprotocol/test/control_plane.go (6 hunks)
  • pkg/transport/nclprotocol/types.go (1 hunks)
🔇 Additional comments (50)
pkg/transport/nclprotocol/dispatcher/recovery.go (8)

29-30: Addition of stopCh and wg fields for managing recovery lifecycle

Introducing stopCh and wg to the recovery struct enhances control over the recovery process, enabling graceful stopping and proper synchronization of goroutines.


40-40: Initialize stopCh in newRecovery

Initializing stopCh in the constructor ensures that the stop channel is ready for use when the recovery process begins.


86-86: Add to WaitGroup before starting the recovery goroutine

Calling r.wg.Add(1) before launching the goroutine ensures accurate tracking of active goroutines, which is essential for synchronized shutdown.


92-92: Ensure wg.Done() is called when recovery loop exits

Using defer r.wg.Done() guarantees that the WaitGroup counter is decremented when the recovery loop exits, preventing potential hangs during shutdown.


103-114: Implement interruptible backoff in recoveryLoop

Introducing a timer with a select statement allows the backoff to be interrupted by stopCh or context cancellation, enabling responsive shutdowns.
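
For reference, a minimal sketch of this interruptible-backoff pattern; waitBackoff and its signature are illustrative, not the actual recovery code:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitBackoff sleeps for the backoff duration but returns early if the stop
// channel is closed or the context is cancelled. It reports whether the
// caller should keep retrying.
func waitBackoff(ctx context.Context, stopCh <-chan struct{}, backoff time.Duration) bool {
	timer := time.NewTimer(backoff)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true // backoff elapsed, retry
	case <-stopCh:
		return false // recovery was stopped
	case <-ctx.Done():
		return false // caller cancelled
	}
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stopCh) // simulate a shutdown arriving mid-backoff
	}()

	start := time.Now()
	cont := waitBackoff(context.Background(), stopCh, 5*time.Second)
	fmt.Printf("continue=%v after %v\n", cont, time.Since(start).Round(10*time.Millisecond))
}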


122-129: Handle stop signals during watcher restart attempts

Adding stopCh and ctx.Done() checks during watcher restarts ensures that the recovery loop can exit promptly if a stop signal is received, enhancing responsiveness.


145-145: Reinitialize stopCh in reset method

Resetting stopCh prepares the recovery process for a new cycle by ensuring a fresh stop channel is in place.


156-169: Add stop method for graceful shutdown of recovery process

The stop method allows external control over the recovery process, enabling safe termination and cleanup of resources.

pkg/transport/nclprotocol/compute/controlplane.go (4)

6-6: Import strings package for error handling

The addition of the strings import is necessary for enhanced error parsing in the heartbeat process.


194-211: Implement sendShutdownNotification for graceful shutdown

Adding sendShutdownNotification allows the node to inform the orchestrator of a graceful shutdown, enhancing coordination and preventing message duplication.
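
To make the flow concrete, here is a hedged sketch of such a round-trip; the requester interface, the 5-second timeout, and the exact field sets are assumptions rather than the PR's actual controlplane code:

package sketch

import (
	"context"
	"fmt"
	"time"
)

// ShutdownNoticeRequest / ShutdownNoticeResponse mirror the message shapes
// described in this PR; the field lists here are illustrative.
type ShutdownNoticeRequest struct {
	NodeID                 string
	LastOrchestratorSeqNum uint64
}

type ShutdownNoticeResponse struct {
	LastComputeSeqNum uint64
}

// requester is a hypothetical stand-in for the control-plane client that
// performs request/response messaging with the orchestrator.
type requester interface {
	ShutdownNotice(ctx context.Context, req ShutdownNoticeRequest) (ShutdownNoticeResponse, error)
}

// sendShutdownNotification tells the orchestrator the node is going away and
// reports the last orchestrator sequence number it has seen.
func sendShutdownNotification(ctx context.Context, r requester, nodeID string, lastOrchestratorSeq uint64) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	resp, err := r.ShutdownNotice(ctx, ShutdownNoticeRequest{
		NodeID:                 nodeID,
		LastOrchestratorSeqNum: lastOrchestratorSeq,
	})
	if err != nil {
		return fmt.Errorf("shutdown notice failed: %w", err)
	}
	// The orchestrator's ack of the last compute sequence number can be
	// persisted so the node resumes cleanly after restart.
	fmt.Printf("orchestrator acked compute messages up to seq %d\n", resp.LastComputeSeqNum)
	return nil
}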


238-249: Enhance Stop method to send shutdown notification

Modifying the Stop method to send a shutdown notice before closing improves shutdown procedures by notifying the orchestrator of the node's status.


259-261: Checkpoint progress before stopping control plane

Calling checkpointProgress prior to stopping ensures that the latest state is saved, facilitating accurate resumption upon restart.

pkg/transport/nclprotocol/dispatcher/recovery_test.go (7)

113-139: Refactor TestRecoveryLoopWithFailures to use handleError

Updating the test to initiate recovery via handleError improves test accuracy by aligning with actual recovery trigger mechanisms.


143-166: Refactor TestRecoveryLoopWithRunningWatcher for better alignment

Adjusting the test to use handleError enhances realism and ensures the test covers the intended recovery path.


170-199: Update TestRecoveryLoopWithContextCancellation to handle context cancellation

Enhancing the test to verify recovery behavior upon context cancellation strengthens coverage of edge cases related to context management.


220-262: Add TestStopDuringRecovery to verify mid-recovery stop behavior

Introducing this test ensures that the recovery process can be stopped gracefully while it is in progress, and that resources are properly released.


264-302: Add TestStopAfterRecoveryComplete to verify post-recovery stop

This test confirms that calling stop after recovery has completed does not cause issues and completes immediately as expected.


304-322: Introduce TestMultipleStopCalls to ensure idempotency of stop

Verifying that multiple calls to stop have no adverse effects ensures that the method is safe and robust against repeated invocations.


324-351: Add TestStopAndReset to validate recovery after stopping and resetting

This test confirms that the recovery process can be restarted successfully after being stopped and reset, ensuring proper cleanup and reinitialization.

pkg/transport/nclprotocol/compute/manager.go (6)

5-5: Import errors package for error aggregation

Including the errors package allows the use of errors.Join to combine multiple errors, improving error reporting.


Line range hint 158-190: Update cleanup method to return aggregated errors

Modifying cleanup to return an error provides better visibility into any issues that occur during resource cleanup.
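
A tiny sketch of the aggregation pattern with the standard library's errors.Join (Go 1.20+); the resource names and closer type are placeholders, not the manager's actual fields:

package main

import (
	"errors"
	"fmt"
)

type closer func() error

// cleanup attempts to close every resource and reports all failures together
// instead of stopping at the first one.
func cleanup(resources map[string]closer) error {
	var errs error
	for name, closeFn := range resources {
		if err := closeFn(); err != nil {
			errs = errors.Join(errs, fmt.Errorf("closing %s: %w", name, err))
		}
	}
	return errs
}

func main() {
	err := cleanup(map[string]closer{
		"data plane":    func() error { return nil },
		"control plane": func() error { return errors.New("already closed") },
	})
	fmt.Println(err) // reports only the control plane failure
}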


208-212: Invoke cleanup before establishing a new connection

Calling cleanup prior to reconnecting ensures that previous connections are properly closed and resources are freed, preventing leaks.


216-218: Handle errors during cleanup in connect method

Properly handling and logging errors that occur during cleanup aids in debugging and maintaining resource integrity.


326-330: Update sequence tracker with orchestrator's starting sequence number

Accepting the orchestrator's sequence number during handshake ensures synchronization and correct message processing upon reconnection.


Line range hint 434-449: Enhance connection health checks to include handshake requirements

By checking if a handshake is required, the connection manager can more effectively detect unhealthy states and initiate reconnection procedures.

pkg/orchestrator/nodes/manager.go (6)

Line range hint 425-442: Handling existing nodes and reconnections appropriately

The Handshake method correctly checks for existing nodes and handles rejected nodes, reconnections, and new node registrations appropriately. This ensures proper management of node states during the handshake process.


467-470: Resolving starting sequence numbers during handshake

The addition of resolveStartingOrchestratorSeqNum enhances the handshake process by determining the appropriate starting sequence number for nodes based on their state. This prevents issues with nodes that may restart with a fresh state.


567-567: Updating sequence numbers in heartbeat

The call to n.updateSequenceNumbers correctly updates the last known sequence numbers for orchestrator and compute messages during the heartbeat process, ensuring proper message tracking.


585-640: Implementing ShutdownNotice for graceful node shutdown

The ShutdownNotice method appropriately handles shutdown notifications from nodes, updating their connection state to disconnected and preserving sequence numbers. This enables graceful disconnection and state preservation.


811-839: Defining sequence number resolution logic

The resolveStartingOrchestratorSeqNum method correctly determines the starting sequence number for new and reconnecting nodes, preventing them from being overwhelmed with historical events or missing necessary updates.
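
The actual policy lives in manager.go; as a rough, simplified sketch of the idea (assuming new or fresh-state nodes start from the latest event while known reconnecting nodes resume from their recorded position):

package sketch

// nodeSeqInfo is an illustrative subset of the per-node state the manager keeps.
type nodeSeqInfo struct {
	known                  bool   // has the orchestrator seen this node before?
	lastOrchestratorSeqNum uint64 // last sequence the node is known to have received
}

// resolveStartingOrchestratorSeqNum picks where to start sending events to a
// node after a handshake. This is a simplified sketch, not the real logic.
func resolveStartingOrchestratorSeqNum(info nodeSeqInfo, latestSeqNum uint64) uint64 {
	if !info.known {
		// A brand-new (or fresh-state) node starts at the tip so it is not
		// flooded with historical events it never needed.
		return latestSeqNum
	}
	// A reconnecting node resumes from the last sequence recorded for it,
	// so no updates are silently skipped.
	return info.lastOrchestratorSeqNum
}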


Line range hint 893-908: Enhancing self-registration with error handling

The selfRegister method now handles node retrieval errors appropriately and initializes node state when the node is not found. This ensures that the node registers itself correctly even if it's a new addition.

pkg/models/messages/constants.go (1)

16-16: Adding constants for shutdown notice messages

The addition of ShutdownNoticeRequestMessageType and ShutdownNoticeResponseType constants is appropriate for handling shutdown notices within the messaging system, facilitating proper message identification.

Also applies to: 21-21

pkg/transport/nclprotocol/registry.go (1)

26-26: Registering shutdown notice messages in the registry

The registration of ShutdownNoticeRequest and ShutdownNoticeResponse in the message registry ensures that shutdown notices are correctly handled by the messaging system.

Also applies to: 32-32

pkg/transport/nclprotocol/types.go (1)

53-53: Verify initialization of the HandshakeRequired field

The HandshakeRequired field has been added to the ConnectionHealth struct. Ensure that this field is properly initialized wherever ConnectionHealth instances are created to prevent potential issues due to uninitialized boolean fields defaulting to false.

pkg/models/messages/control_plane.go (1)

43-54: LGTM! Well-structured shutdown notice types

The new shutdown notice types properly handle sequence number exchange:

  • ShutdownNoticeRequest includes LastOrchestratorSeqNum to track last received sequence from orchestrator
  • ShutdownNoticeResponse includes LastComputeSeqNum to track last received sequence from compute node
  • Pattern is consistent with existing handshake types
pkg/orchestrator/nodes/types.go (2)

144-150: LGTM! Structure follows established patterns

The ExtendedShutdownNoticeRequest struct follows the same pattern as ExtendedHeartbeatRequest, appropriately extending the base request with sequence tracking.


67-69: LGTM! Verify integration with existing shutdown flows

The new ShutdownNotice method signature and documentation are well-defined. The method appropriately handles graceful shutdown notifications and maintains sequence numbers.

Let's verify the integration with existing shutdown flows:

✅ Verification successful

Let me analyze the existing shutdown flows to verify the integration of the new ShutdownNotice method. I'll search for its implementation and usage.


Let me try a different approach to find the shutdown notice implementation and related code.


Based on the search results, I can now verify the integration. The ShutdownNotice method is properly implemented in pkg/orchestrator/nodes/manager.go and is well-integrated with the existing shutdown flows:

  1. The implementation in nodesManager correctly handles node state transitions and sequence numbers
  2. It's properly registered in the protocol registry (pkg/transport/nclprotocol/registry.go)
  3. It's integrated in both compute and orchestrator sides:
    • Compute side sends notifications (pkg/transport/nclprotocol/compute/controlplane.go)
    • Orchestrator side listens and processes them (pkg/transport/nclprotocol/orchestrator/manager.go)
  4. Test infrastructure is in place (pkg/transport/nclprotocol/test/control_plane.go)

LGTM! The ShutdownNotice method is properly integrated with existing shutdown flows

The implementation follows the established patterns for graceful shutdown, maintaining sequence numbers, and state transitions. The integration is complete across both compute and orchestrator components.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing shutdown-related code to ensure consistent handling
rg -A 5 "shutdown|terminate|clean.*disconnect" 

Length of output: 29877


Script:

#!/bin/bash
# Search for ShutdownNotice implementation and related code
ast-grep --pattern 'func $$$ShutdownNotice(ctx context.Context, request ExtendedShutdownNoticeRequest) (messages.ShutdownNoticeResponse, error) {
  $$$
}'

# Search for any test files related to shutdown notice
fd -e go -x rg -l "ShutdownNotice.*Test"

Length of output: 199


Script:

#!/bin/bash
# Search for ShutdownNotice implementation with more context
rg -A 10 "func.*ShutdownNotice"

# Search for ShutdownNoticeRequest usage
rg "ShutdownNoticeRequest" --type go

Length of output: 6015

pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1)

175-196: LGTM! Test covers important restart scenario

The test effectively verifies that the dispatcher's state is properly reset when restarted after stopping. The sequence of operations and expectations is well-structured.

pkg/transport/nclprotocol/dispatcher/dispatcher.go (2)

80-82: LGTM! Clean state accessor implementation

The State() method provides a clean interface to access the dispatcher's state.


96-98: LGTM! Proper state reset sequence

The state and recovery reset sequence is correctly placed before starting the dispatcher, ensuring a clean initial state.

pkg/transport/nclprotocol/compute/dataplane.go (1)

124-124: LGTM: Safer approach to message iteration

The change to use resolveStartingIterator instead of AfterSequenceNumberIterator ensures no messages are missed when a compute node restarts, prioritizing reliability over optimization.

Let's verify the impact on message processing:

✅ Verification successful

Based on the search results, I can now generate the final response:

LGTM: Change to resolveStartingIterator is well-implemented and isolated

The change from AfterSequenceNumberIterator to resolveStartingIterator is a defensive improvement that ensures no messages are missed during compute node restarts. The implementation is well-documented and its behavior is explicitly commented to always return TrimHorizonIterator regardless of the lastReceivedSeqNum parameter.
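
As a minimal sketch of that behavior, using a locally defined iterator type instead of the real watcher package:

package sketch

// iterator is a local stand-in for the watcher library's iterator type.
type iterator struct {
	kind     string
	startSeq uint64
}

func trimHorizonIterator() iterator { return iterator{kind: "trim-horizon"} }

func afterSequenceNumberIterator(seq uint64) iterator {
	return iterator{kind: "after-sequence-number", startSeq: seq}
}

// resolveStartingIterator deliberately ignores lastReceivedSeqNum for now and
// replays the event log from the beginning: duplicate deliveries are tolerated,
// missed messages are not. Switching to afterSequenceNumberIterator is the
// future optimization discussed in the comments.
func resolveStartingIterator(lastReceivedSeqNum uint64) iterator {
	_ = lastReceivedSeqNum // kept for the future optimization
	return trimHorizonIterator()
}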

The codebase search confirms:

  • The change is isolated to compute dataplane's message processing
  • The orchestrator dataplane continues to use AfterSequenceNumberIterator
  • The watcher library properly supports both iterator types
  • No other components rely on the compute dataplane's specific iterator choice
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any custom sequence number handling in the codebase
# that might be affected by this change

# Look for sequence number related code
rg -A 3 "AfterSequenceNumberIterator|TrimHorizonIterator|lastReceivedSeqNum"

Length of output: 27891

pkg/transport/nclprotocol/compute/dataplane_test.go (1)

173-175: LGTM: Improved test readability

pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1)

257-293: LGTM: Thorough verification of state reset behavior

The test effectively verifies that the dispatcher properly resets its state on restart while maintaining the checkpoint, ensuring only new events are processed.

pkg/transport/nclprotocol/orchestrator/manager.go (2)

113-114: LGTM: Clean handler registration

The shutdown notice handler is properly registered alongside other control plane message handlers.


263-263: LGTM: Consistent metadata handling

The addition of message type metadata to responses follows the established pattern and improves message traceability.

Also applies to: 283-285

pkg/transport/nclprotocol/test/control_plane.go (3)

37-47: LGTM: Well-structured mock behavior extension

The shutdown response fields and callback follow the established pattern of other mock behaviors.


165-178: LGTM: Consistent mock handler implementation

The shutdown handler implementation follows the established pattern of other handlers in the mock responder.


211-218: LGTM: Thread-safe shutdown request tracking

The implementation properly handles concurrent access to shutdown records and follows the established pattern.

Also applies to: 247-253

pkg/node/requester.go (2)

330-330: LGTM: Simplified dependency injection

The direct use of nodesManager for NodeInfoStore simplifies the dependency chain while maintaining the correct interface.


55-55: Verify the interface reduction impact

The change from nodes.Store to nodes.Lookup suggests better interface segregation. Let's verify that all necessary methods are available in the new interface.

✅ Verification successful

Interface reduction is safe and improves design

The change from nodes.Store to nodes.Lookup is a positive refactoring that follows interface segregation principles. The evidence shows:

  1. nodes.Lookup is a focused interface providing only read operations (Get, GetByPrefix, List)
  2. nodes.Store embeds Lookup and adds mutation operations (Put, Delete)
  3. The requester.go file only shows NodeInfoStore being initialized and stored, with no direct method calls visible, suggesting minimal usage

This change reduces coupling by limiting the interface to only what's needed, making the code more maintainable and following the principle of least privilege.
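
An illustrative shape of that split (method sets taken from the verification above, signatures simplified; the real definitions live in pkg/orchestrator/nodes):

package sketch

import "context"

// NodeState is a placeholder for the real node state model.
type NodeState struct{}

// Lookup is the read-only view: everything a consumer like requester.go needs.
type Lookup interface {
	Get(ctx context.Context, nodeID string) (NodeState, error)
	GetByPrefix(ctx context.Context, prefix string) (NodeState, error)
	List(ctx context.Context) ([]NodeState, error)
}

// Store embeds Lookup and adds mutation; read-only consumers should not
// depend on it.
type Store interface {
	Lookup
	Put(ctx context.Context, state NodeState) error
	Delete(ctx context.Context, nodeID string) error
}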

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for NodeInfoStore usage to verify interface compatibility
ast-grep --pattern 'r.NodeInfoStore.$_($_)'

Length of output: 45


Script:

#!/bin/bash
# Search for NodeInfoStore usage in the file
rg -A 2 "NodeInfoStore\." pkg/node/requester.go

# Search for Lookup interface definition
ast-grep --pattern 'type Lookup interface {
  $$$
}'

# Search for Store interface definition for comparison
ast-grep --pattern 'type Store interface {
  $$$
}'

Length of output: 5929


Script:

#!/bin/bash
# Search for actual usage of NodeInfoStore in requester.go to verify methods being called
rg -B 2 -A 2 "NodeInfoStore" pkg/node/requester.go

Length of output: 432

Comment on lines +108 to +111
if strings.Contains(err.Error(), "handshake required") {
	cp.healthTracker.HandshakeRequired()
	return
}

🛠️ Refactor suggestion

Avoid fragile string matching in error handling

Using strings.Contains(err.Error(), "handshake required") to detect handshake errors can be unreliable if error messages change.

Consider defining a specific error type or constant to identify handshake requirements more reliably.
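
One possible shape, sketched with a sentinel error and errors.Is; if the error crosses the NCL request/response boundary rather than staying in-process, a dedicated error code on the response would be the sturdier choice:

package main

import (
	"errors"
	"fmt"
)

// ErrHandshakeRequired is a sentinel the orchestrator-side code can wrap and
// the compute-side code can test for, instead of matching message text.
var ErrHandshakeRequired = errors.New("handshake required")

func heartbeat(nodeKnown bool) error {
	if !nodeKnown {
		return fmt.Errorf("rejecting heartbeat: %w", ErrHandshakeRequired)
	}
	return nil
}

func main() {
	if err := heartbeat(false); errors.Is(err, ErrHandshakeRequired) {
		fmt.Println("marking handshake required and reconnecting")
		return
	}
	fmt.Println("heartbeat ok")
}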

@@ -36,6 +36,7 @@ func (ht *HealthTracker) MarkConnected() {
 	ht.health.LastSuccessfulHeartbeat = ht.clock.Now()
 	ht.health.ConsecutiveFailures = 0
 	ht.health.LastError = nil
+	ht.health.HandshakeRequired = false

💡 Codebase verification

HandshakeRequired field lacks test coverage

The test suite for HealthTracker does not include any assertions for the HandshakeRequired field state changes. While the connection state transitions are well tested, the HandshakeRequired field modifications in Connected, Disconnected, and Connecting states are not verified.

  • Add test assertions for HandshakeRequired field in TestInitialState
  • Add verification of HandshakeRequired state in TestConnectionStateTransitions
  • Consider adding specific test cases for HandshakeRequired state transitions
🔗 Analysis chain

Verify test coverage for HandshakeRequired reset

The HandshakeRequired field is reset in multiple places. Let's verify test coverage exists.

Also applies to: 50-50, 59-59


Let me gather more information about test coverage.


Let me check the actual test implementations to verify coverage of HandshakeRequired state changes.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for test cases covering HandshakeRequired reset
rg -l "TestHealthTracker.*(?:Connected|Disconnected|Connecting)" --type go

# Search for HandshakeRequired field usage in tests
rg "HandshakeRequired\s*=\s*(true|false)" --type go

Length of output: 485


Script:

#!/bin/bash
# Search for test files related to health tracker
fd "health.*test.*\.go$"

# Look for test cases that might be testing the HandshakeRequired behavior
rg -A 5 "func Test.*Health" --type go

# Look for mock or test setup of health tracker
rg "NewHealthTracker|health.*test" --type go

Length of output: 1605


Script:

#!/bin/bash
# Look at the health tracker test implementation
cat pkg/transport/nclprotocol/compute/health_tracker_test.go

Length of output: 3357

@@ -294,7 +324,7 @@ func (cm *ComputeManager) handleConnectionStateChange(event nodes.NodeConnectionEvent) {
 	if event.Current == models.NodeStates.DISCONNECTED {
 		if dataPlane, ok := cm.dataPlanes.LoadAndDelete(event.NodeID); ok {
 			if dp, ok := dataPlane.(*DataPlane); ok {
-				if err := dp.Stop(context.Background()); err != nil {
+				if err := dp.Stop(context.TODO()); err != nil {

💡 Codebase verification

Context should be propagated from the event handler

The use of context.TODO() in the handleConnectionStateChange method is not ideal as this is an event handler that should propagate context from its caller. Looking at the implementation:

  • The Stop method across the codebase consistently accepts and uses context for timeout/cancellation control
  • The event handler is dealing with node disconnection, which should respect shutdown timeouts
  • Other similar operations in the codebase properly propagate context

The method signature should be updated to:

func (cm *ComputeManager) handleConnectionStateChange(ctx context.Context, event nodes.NodeConnectionEvent)
🔗 Analysis chain

Verify context handling strategy

The change from context.Background() to context.TODO() suggests that proper context handling needs to be implemented. Consider whether a context should be passed from the caller instead.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for similar patterns of context handling in connection state changes
rg -A 5 'context\.(Background|TODO)\(\).*Stop'

Length of output: 48


Script:

#!/bin/bash
# Let's first find the context of this Stop method usage
ast-grep --pattern 'func handleConnectionStateChange($$$) {
  $$$
  Stop($$$)
  $$$
}'

# Also search for Stop method definition and its usage patterns
ast-grep --pattern 'Stop($$$)'

# Let's also check for any context parameter in the function signature
ast-grep --pattern 'func ($_ $_) Stop(ctx context.Context) $_'

Length of output: 19802


Script:

#!/bin/bash
# Let's find the handleConnectionStateChange function to understand the context
ast-grep --pattern 'func ($_ $_) handleConnectionStateChange($$$) {
  $$$
}'

Length of output: 1803

@wdbaruni wdbaruni closed this Dec 16, 2024