better exchange of starting seqNum during handshakes #4766
Conversation
Walkthrough

This pull request introduces several modifications across multiple packages related to node management, message dispatching, and connection handling. The changes primarily focus on improving sequence number tracking, checkpointing mechanisms, and state management in the compute and orchestrator components. Key updates include refactoring node information storage, enhancing message starting positions, adding state retrieval methods, and improving metadata handling in message responses.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Compute
    participant Orchestrator
    participant Dispatcher
    Compute->>Orchestrator: Handshake Request
    Orchestrator-->>Compute: Handshake Response (with StartingSeqNum)
    Compute->>Dispatcher: Start with Resolved Iterator
    Dispatcher->>Dispatcher: Checkpoint Progress
    Compute->>Orchestrator: Heartbeat
    Orchestrator-->>Compute: Heartbeat Response (with Metadata)
```
Actionable comments posted: 1
🔭 Outside diff range comments (1)
pkg/orchestrator/nodes/manager.go (1)
Line range hint 836-852: Handle potential errors in `selfRegister` method

In the `selfRegister` method, when calling `n.Get(ctx, nodeInfo.ID())`, the error handling only checks for `NotFoundError`. Consider handling other potential errors as well. Apply this diff to enhance error handling:

```diff
 state, err := n.Get(ctx, nodeInfo.ID())
 if err != nil {
 	if !bacerrors.IsErrorWithCode(err, bacerrors.NotFoundError) {
-		return bacerrors.New("failed to self-register node: %v", err).
+		return fmt.Errorf("failed to get node state during self-registration: %w", err)
 	}
 	state = models.NodeState{
 		Info:       nodeInfo,
 		Membership: models.NodeMembership.APPROVED,
 		ConnectionState: models.ConnectionState{
 			ConnectedSince: n.clock.Now().UTC(),
 		},
 	}
 }
```
🧹 Nitpick comments (13)
pkg/transport/nclprotocol/dispatcher/state.go (1)
39-48: Add documentation for the GetState method.

The method implementation is correct with proper thread-safety, but it's missing documentation that describes its purpose and return value.

```diff
-// GetState returns
+// GetState returns a snapshot of the current dispatcher state including
+// the last acknowledged sequence number, last observed sequence number,
+// and last checkpoint position.
 func (s *dispatcherState) GetState() State {
```

pkg/transport/nclprotocol/dispatcher/dispatcher.go (1)
241-259: Consider returning the error from doCheckpoint.

The implementation is well-structured with proper timeout and error handling. However, consider returning the error to allow callers to handle checkpoint failures, especially during shutdown.

```diff
-func (d *Dispatcher) doCheckpoint(ctx context.Context) {
+func (d *Dispatcher) doCheckpoint(ctx context.Context) error {
 	checkpointTarget := d.state.getCheckpointSeqNum()
 	if checkpointTarget == 0 {
 		// Nothing new to checkpoint
-		return
+		return nil
 	}
 	checkpointCtx, cancel := context.WithTimeout(ctx, d.config.CheckpointTimeout)
 	defer cancel()
 	if err := d.watcher.Checkpoint(checkpointCtx, checkpointTarget); err != nil {
 		log.Error().Err(err).
 			Uint64("target", checkpointTarget).
 			Msg("Failed to checkpoint watcher")
-		return
+		return fmt.Errorf("failed to checkpoint watcher: %w", err)
 	}
 	d.state.updateLastCheckpoint(checkpointTarget)
+	return nil
 }
```

pkg/orchestrator/nodes/manager.go (4)
465-466: Remove redundant empty lines

There are consecutive empty lines at lines 465-466. Removing unnecessary empty lines improves code readability. Apply this diff:

```diff
-	}
-
+	}
```
467-470: Handle potential error when resolving starting sequence number

In the `Handshake` method, when calling `n.resolveStartingOrchestratorSeqNum`, ensure that the error is checked correctly. This is already being done, but consider providing more context in the error message. Apply this diff to enhance the error message:

```diff
 state.ConnectionState.LastOrchestratorSeqNum, err = n.resolveStartingOrchestratorSeqNum(ctx, isReconnect, existing)
 if err != nil {
-	return messages.HandshakeResponse{}, fmt.Errorf("failed to resolve starting sequence number: %w", err)
+	return messages.HandshakeResponse{}, fmt.Errorf("failed to resolve starting orchestrator sequence number for node %s: %w", request.NodeInfo.ID(), err)
 }
```
754-781: Improve documentation and error messages in `resolveStartingOrchestratorSeqNum`

The comments in this function are helpful, but consider making them more concise and clear. Additionally, when returning errors, provide more context. Apply this diff:

```diff
 // resolveStartingOrchestratorSeqNum determines where a node should start receiving messages from.
-//
-// For reconnecting nodes, we trust the sequence numbers from our store rather than what the
-// compute node reports. This prevents issues with compute nodes restarting with same ID but
-// fresh state, where they would ask to start from 0.
+// For reconnecting nodes, use the stored sequence number to prevent issues with nodes restarting with the same ID but fresh state.
 // For new nodes, we start them from the latest sequence number to avoid overwhelming them
 // with historical events.
-
-// TODO: Add support for snapshots to allow nodes to catch up on missed state without
-// replaying all historical events. For now, we always start from latest to avoid
-// overwhelming nodes that have been down for a long time.
 func (n *nodesManager) resolveStartingOrchestratorSeqNum(
 	ctx context.Context, isReconnect bool, existing models.NodeState) (uint64, error) {
 	if isReconnect {
 		// For reconnecting nodes, trust our stored sequence number
 		return existing.ConnectionState.LastOrchestratorSeqNum, nil
 	}

 	// For new nodes, start from latest sequence number
 	latestSeq, err := n.eventstore.GetLatestEventNum(ctx)
 	if err != nil {
-		return 0, fmt.Errorf("failed to get latest event number: %w", err)
+		return 0, fmt.Errorf("failed to get latest event number from event store: %w", err)
 	}
 	return latestSeq, nil
 }
```
795-798: Add TODO for sequence number advancement logic

The current implementation allows sequence numbers to move backwards. A TODO comment exists, but consider creating a GitHub issue to track this enhancement.
Would you like me to open a GitHub issue to track the implementation of proper sequence number comparison logic to ensure sequence numbers only advance forward?
pkg/transport/nclprotocol/compute/dataplane.go (1)
177-194: Clarify the logic in `resolveStartingIterator` method

The method `resolveStartingIterator` currently ignores the `lastReceivedSeqNum`. If this is intentional, consider documenting the reasoning more clearly and possibly logging a warning if `lastReceivedSeqNum` is non-zero. Enhance the method as follows:

```go
func (dp *DataPlane) resolveStartingIterator(lastReceivedSeqNum uint64) watcher.EventIterator {
	if lastReceivedSeqNum != 0 {
		log.Warn().Uint64("lastReceivedSeqNum", lastReceivedSeqNum).Msg("Ignoring non-zero lastReceivedSeqNum; starting from trim horizon")
	}
	return watcher.TrimHorizonIterator()
}
```

This makes it explicit that `lastReceivedSeqNum` is currently not used and informs if it's non-zero.

pkg/transport/nclprotocol/compute/controlplane.go (1)
227-229: Handle checkpointing error when stopping

In the `Stop` method, if `checkpointProgress` fails, the error is logged but not returned. Consider returning the error to inform the caller. Modify the code to return the error:

```diff
 if err := cp.checkpointProgress(ctx); err != nil {
 	log.Error().Err(err).Msg("Failed to checkpoint progress before stopping")
+	return err
 }
 return nil
```
This way, the caller is aware of the failure during the shutdown process.
pkg/transport/nclprotocol/compute/dataplane_test.go (2)
173-175: Simplify context cancellation handling

In the `TestStartupFailureCleanup` test case, the select statement waiting for context cancellation can be simplified, as the context will be cancelled immediately. Simplify the code:

```go
s.Require().True(errors.Is(s.ctx.Err(), context.Canceled), "Context should be canceled")
```

This directly checks that the context is indeed canceled.
248-299: Improve test coverage with edge cases

In the `TestStartingPosition` method, consider adding edge cases where `LastReceivedSeqNum` is zero and when there are no initial events, to ensure the data plane behaves correctly in all scenarios. Add additional sub-tests within `TestStartingPosition` for these cases, for example along the lines of the sketch below.
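A possible shape for such sub-tests, as a rough sketch only: the helper names `publishInitialEvents`, `startDataPlaneFrom`, and `receivedMessageCount` are hypothetical stand-ins for whatever the existing test harness provides, and the usual `time` import is assumed.

```go
// Sketch only: helper names below are hypothetical, not part of the current suite.
func (s *DataPlaneTestSuite) TestStartingPositionEdgeCases() {
	testCases := []struct {
		name               string
		lastReceivedSeqNum uint64
		initialEvents      int
	}{
		{name: "zero seqNum with prior events", lastReceivedSeqNum: 0, initialEvents: 5},
		{name: "zero seqNum with empty event store", lastReceivedSeqNum: 0, initialEvents: 0},
	}

	for _, tc := range testCases {
		s.Run(tc.name, func() {
			// Seed the event store before the data plane starts.
			s.publishInitialEvents(tc.initialEvents)

			// Start the data plane as if the handshake reported tc.lastReceivedSeqNum.
			dp := s.startDataPlaneFrom(tc.lastReceivedSeqNum)
			defer dp.Stop(s.ctx)

			// With a trim-horizon iterator, everything already in the store should be
			// replayed, and nothing should arrive when the store is empty.
			s.Require().Eventually(func() bool {
				return s.receivedMessageCount() == tc.initialEvents
			}, time.Second, 10*time.Millisecond)
		})
	}
}
```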
pkg/node/requester.go (1)
55-55: LGTM: Type change aligns with "trust your own state" approach

Changing `NodeInfoStore` from `nodes.Store` to `nodes.Lookup` enforces read-only access to node information, which supports the PR's objective of having nodes trust their own state rather than allowing external modifications.

pkg/transport/nclprotocol/compute/manager_test.go (2)
118-123: Document the purpose of different sequence numbers.

While the test correctly sets up different sequence numbers for the compute node (124) and orchestrator (100), it would be helpful to add a comment explaining why these specific numbers were chosen and what scenario they're testing.

```diff
 // Configure handshake response to return a different sequence number
+// Using a lower sequence number (100) than the compute node's checkpoint (124)
+// to verify that the compute node trusts its own state over orchestrator's suggestion
 handshakeSeqNum := uint64(100)
```
Line range hint 114-182: Review sequence number synchronization strategy.

There appears to be a fundamental disconnect between the PR objectives and the test implementation:
- PR Objectives state: "trust your own state" approach where compute node relies on its local state
- Test Implementation shows: compute node adopts orchestrator's sequence number (100) despite having its own state (124)
This discrepancy needs to be resolved to ensure the implementation aligns with the intended design.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- pkg/node/requester.go (3 hunks)
- pkg/orchestrator/nodes/manager.go (5 hunks)
- pkg/transport/nclprotocol/compute/controlplane.go (1 hunks)
- pkg/transport/nclprotocol/compute/dataplane.go (2 hunks)
- pkg/transport/nclprotocol/compute/dataplane_test.go (2 hunks)
- pkg/transport/nclprotocol/compute/manager.go (1 hunks)
- pkg/transport/nclprotocol/compute/manager_test.go (4 hunks)
- pkg/transport/nclprotocol/dispatcher/dispatcher.go (2 hunks)
- pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1 hunks)
- pkg/transport/nclprotocol/dispatcher/state.go (2 hunks)
- pkg/transport/nclprotocol/orchestrator/manager.go (3 hunks)
🔇 Additional comments (14)
pkg/transport/nclprotocol/dispatcher/state.go (1)
14-18: LGTM! Well-designed state encapsulation.

The `State` struct effectively encapsulates the sequence numbers and checkpoint state with clear, self-descriptive field names.
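For orientation, a plausible shape of such a struct, inferred from the doc comment suggested earlier in this review; the field names are assumptions, not a copy of the actual source:

```go
// Sketch only: the real State struct in pkg/transport/nclprotocol/dispatcher
// may use different field names.
type State struct {
	LastAckedSeqNum      uint64 // highest sequence number acknowledged by the orchestrator
	LastObservedSeqNum   uint64 // highest sequence number the dispatcher has seen
	LastCheckpointSeqNum uint64 // last sequence number persisted as a checkpoint
}
```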
pkg/transport/nclprotocol/dispatcher/dispatcher.go (2)
80-82: LGTM! Clean state access method.
The method provides a clean interface to access dispatcher state information.
233-233: LGTM! Improved checkpoint consistency.
The changes ensure consistent checkpoint handling for both periodic checkpoints and shutdown scenarios.
Also applies to: 236-236
pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1)
231-255: LGTM! Well-structured test for shutdown checkpoint behavior.
The test effectively verifies the final checkpoint behavior during shutdown by:
- Isolating the shutdown checkpoint from periodic checkpoints
- Ensuring state synchronization before shutdown
- Verifying the final checkpoint value
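In outline, that pattern looks roughly like the following sketch; `startDispatcherWithCheckpointInterval`, `publishAndAck`, and `readCheckpoint` are hypothetical helper names standing in for the actual test harness, and the `time` package is assumed to be imported:

```go
// Sketch of the shutdown-checkpoint assertion; helper names are hypothetical.
func (s *DispatcherE2ETestSuite) TestShutdownCheckpointSketch() {
	// A long checkpoint interval keeps periodic checkpoints out of the picture.
	d := s.startDispatcherWithCheckpointInterval(time.Hour)

	// Publish messages and wait for acknowledgements so the dispatcher state
	// is ahead of any persisted checkpoint.
	lastAcked := s.publishAndAck(5)

	// Stopping the dispatcher should trigger one final checkpoint.
	s.Require().NoError(d.Stop(s.ctx))

	// The persisted checkpoint should match the last acknowledged sequence number.
	s.Require().Equal(lastAcked, s.readCheckpoint())
}
```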
pkg/orchestrator/nodes/manager.go (1)
425-425: Check for error handling when retrieving existing node state

In the `Handshake` method, the variable `err` is not declared before being used in line 425. Ensure that the error returned by `n.Get(ctx, request.NodeInfo.ID())` is properly handled. Apply this diff to fix the issue:

```diff
- existing, err := n.Get(ctx, request.NodeInfo.ID())
+ existing, err := n.Get(ctx, request.NodeInfo.ID())
+ if err != nil {
+     // handle error appropriately
+ }
```
Likely invalid or redundant comment.
pkg/transport/nclprotocol/compute/dataplane.go (1)
124-124: Ensure correct context is passed to watcher.New

When creating the dispatcher watcher with `watcher.New()`, ensure that the context passed is managed appropriately to prevent leaks or premature cancellations. Please confirm that the context `ctx` is suitable for the lifetime of the watcher. If `ctx` can be cancelled before the watcher is done, consider using a separate context.
pkg/transport/nclprotocol/orchestrator/manager.go (1)
261-261: LGTM: Added message type metadata to responses
Good practice to include message type metadata in responses. This improves message handling and type safety in the transport layer.
Also applies to: 281-282
pkg/transport/nclprotocol/compute/manager.go (2)
316-320: LGTM: Core implementation of "trust your own state" approach
The implementation correctly updates the sequence tracker with the orchestrator's number after handshake. The comment clearly explains the rationale that the orchestrator may have reset or chosen a different starting point.
This change is central to the PR's objective of improving sequence number synchronization during handshakes.
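The shape of that update is roughly the following; this is a sketch rather than the exact code in manager.go. The identifiers `incomingSeqTracker`, `UpdateLastSeqNum`, `messages.HandshakeResponse`, and `StartingOrchestratorSeqNum` are taken from elsewhere in this review, while the surrounding method and receiver are illustrative:

```go
// Sketch: after a successful handshake, adopt the orchestrator's starting point.
func (c *ConnectionManager) afterHandshake(response messages.HandshakeResponse) {
	// Trust the orchestrator's chosen position: it may have reset its event
	// store, or deliberately started a brand-new node from the latest event.
	c.incomingSeqTracker.UpdateLastSeqNum(response.StartingOrchestratorSeqNum)
}
```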
316-320: Verify sequence number handling consistency
Let's verify that sequence number handling is consistent across the codebase.
✅ Verification successful
Based on the results, I can now generate the final response:
Sequence number handling is thread-safe and consistent
The sequence number handling implementation is correct and safe because:
- All sequence operations use atomic operations via `SequenceTracker`
- Updates are centralized in the `SequenceTracker` type with atomic Store/Load operations
- The handshake's sequence number update follows the same pattern as other usages in the codebase

Key findings:
- `UpdateLastSeqNum` is implemented using atomic operations (`Store`)
- `GetLastSeqNum` is implemented using atomic operations (`Load`)
- All sequence number updates across the codebase consistently use these atomic methods
- No direct access to the underlying sequence number field was found
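A minimal sketch of what such an atomic tracker looks like; the real `SequenceTracker` in the nclprotocol package may name its field differently and carry additional methods:

```go
package nclprotocol

import "sync/atomic"

// SequenceTracker records the last processed sequence number using atomic
// operations, so concurrent readers and writers never race.
type SequenceTracker struct {
	lastSeqNum atomic.Uint64
}

// UpdateLastSeqNum stores the most recently processed sequence number.
func (t *SequenceTracker) UpdateLastSeqNum(seq uint64) {
	t.lastSeqNum.Store(seq)
}

// GetLastSeqNum loads the most recently recorded sequence number.
func (t *SequenceTracker) GetLastSeqNum() uint64 {
	return t.lastSeqNum.Load()
}
```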
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for consistent sequence number handling
# Look for other places where sequence numbers are updated
rg -A 3 "UpdateLastSeqNum|GetLastSeqNum"
# Check for potential race conditions in sequence number handling
ast-grep --pattern 'func $_(ctx context.Context, $$$) {
$$$
$x.UpdateLastSeqNum($_)
$$$
}'
Length of output: 3641
pkg/node/requester.go (1)
80-83: LGTM: Improved encapsulation of node state management

The removal of `nodeStore` from `createNodeManager`'s return values and using `nodesManager` directly in the `Requester` struct initialization improves encapsulation of node state management. This change:
- Centralizes node state management within the manager
- Reduces the risk of inconsistent state by eliminating direct store access
- Supports the PR's goal of reliable sequence number synchronization
Let's verify that the node manager is the single source of truth for node state:
Also applies to: 330-334
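To illustrate the read-only split behind this (the `nodes.Lookup` vs `nodes.Store` change noted earlier), here is a hypothetical shape of the two interfaces; the actual method sets in pkg/orchestrator/nodes may differ:

```go
// Sketch only: real interface definitions may differ.
type Lookup interface {
	// Get returns the state of a single node by ID.
	Get(ctx context.Context, nodeID string) (models.NodeState, error)
	// List returns the state of all known nodes.
	List(ctx context.Context) ([]models.NodeState, error)
}

type Store interface {
	Lookup
	// Put creates or replaces a node's state.
	Put(ctx context.Context, state models.NodeState) error
	// Delete removes a node's state.
	Delete(ctx context.Context, nodeID string) error
}
```

Handing the `Requester` only the `Lookup` half turns "read but do not mutate" into a compile-time guarantee rather than a convention.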
pkg/transport/nclprotocol/compute/manager_test.go (4)
114-117: LGTM! Initial checkpoint setup aligns with PR objectives.
The setup correctly simulates a compute node with existing local state (sequence number 124), which aligns with the PR's "trust your own state" approach.
132-136: LGTM! Handshake verification ensures compute node communicates its state.
The test correctly verifies that the compute node includes its local sequence number in the handshake, which is crucial for the new "trust your own state" approach.
155-163: Verify if heartbeat sequence number behavior aligns with PR objectives.
The test expects the compute node to use the orchestrator's sequence number (100) in heartbeats, but this seems to contradict the PR's "trust your own state" approach. According to the PR objectives, shouldn't the compute node continue using its own sequence number (124)?
182-182: Verify sequence number behavior in subsequent heartbeats.
The test comment suggests that using the orchestrator's sequence number in subsequent heartbeats is intentional. This needs verification against the PR objectives which emphasize "trust your own state".
✅ Verification successful
Let me gather more context about the sequence number implementation to verify the behavior.
Let me search for more specific information about sequence tracking implementation.
Based on the gathered evidence, I can now provide a final response:
Sequence number handling in heartbeats is correctly implemented
The implementation correctly maintains sequence tracking through the `SequenceTracker` component, which is consistently used across handshakes and heartbeats. The code shows that:
- The sequence number is properly initialized from the handshake response (`StartingOrchestratorSeqNum`)
- The tracker is continuously updated and maintains the last processed sequence
- Both control plane and connection manager use the same sequence tracker instance
- The sequence number is checkpointed for persistence
The comment "Should continue using sequence number from handshake" aligns with the implementation and does not conflict with "trust your own state" as the sequence tracker maintains the state internally.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for documentation or comments about sequence number handling
rg -A 5 "sequence number" --type md
Length of output: 2139
Script:
#!/bin/bash
# Search for sequence number handling in compute manager and related files
rg -A 10 "LastOrchestratorSeqNum" pkg/transport/nclprotocol/compute/
Length of output: 5971
Script:
#!/bin/bash
# Look for heartbeat sequence number handling
ast-grep --pattern 'func $$$Heartbeat$$$($$$) {
$$$
}'
Length of output: 58
Script:
#!/bin/bash
# Search for sequence number validation or verification logic
rg -A 5 "validateSeq|verifySeq|checkSeq" pkg/transport/nclprotocol/
Length of output: 69
Script:
#!/bin/bash
# Look for sequence tracker implementation
rg -A 10 "incomingSeqTracker" pkg/transport/nclprotocol/
# Also search for sequence tracking related types/interfaces
ast-grep --pattern 'type $$$SeqTracker$$$ interface {
$$$
}'
Length of output: 9310
Script:
#!/bin/bash
# Search for sequence number handling in messages
rg -A 5 "SeqNum" pkg/transport/nclprotocol/messages/
Length of output: 128
```go
if request.LastComputeSeqNum > 0 {
	updated.LastComputeSeqNum = request.LastComputeSeqNum
}
n.updateSequenceNumbers(&updated, request.LastOrchestratorSeqNum, request.LastComputeSeqNum)
```
🛠️ Refactor suggestion
Consider sequence number validation before updating
In the `Heartbeat` method, sequence numbers are updated without validation. It's advisable to ensure that sequence numbers only advance forward to prevent potential rollback due to race conditions. Implement validation logic to ensure `LastOrchestratorSeqNum` and `LastComputeSeqNum` do not decrease:
func (n *nodesManager) updateSequenceNumbers(state *models.ConnectionState, orchestratorSeq, computeSeq uint64) {
if orchestratorSeq > state.LastOrchestratorSeqNum {
state.LastOrchestratorSeqNum = orchestratorSeq
}
if computeSeq > state.LastComputeSeqNum {
state.LastComputeSeqNum = computeSeq
}
}
Problem
When nodes restart or lose state, the current sequence number synchronization can lead to message gaps or duplicates. This occurs because nodes unconditionally trust each other's sequence numbers during handshake, without considering local state recovery scenarios.
Solution
This PR implements a "trust your own state" approach for sequence number synchronization during handshakes. Each node relies on its local state to determine its starting point, while using the handshake to inform the other party of its position.
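Sketched as message shapes, the exchange looks roughly like this. The field names `LastOrchestratorSeqNum`, `LastComputeSeqNum`, and `StartingOrchestratorSeqNum` come from the review above, while the exact struct layouts are assumptions for illustration:

```go
// Hypothetical shapes of the handshake messages; the real structs carry more fields.
type HandshakeRequest struct {
	NodeInfo models.NodeInfo
	// What this compute node has processed locally, read from its own checkpoint.
	LastOrchestratorSeqNum uint64
	// What this compute node has published locally.
	LastComputeSeqNum uint64
}

type HandshakeResponse struct {
	// Where the orchestrator will start streaming from, resolved from its own store:
	// the stored position for a reconnecting node, the latest event for a new node.
	StartingOrchestratorSeqNum uint64
}
```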
Changes in Handshake Flow
Orchestrator Behavior
Compute Node Behavior