Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Dec 8, 2024
1 parent 894687e commit d35d183
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pkg/node/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func createNodeManager(ctx context.Context, cfg NodeConfig, natsConn *nats.Conn)
return nil, nil, pkgerrors.Wrap(err, "failed to create node info store using NATS transport connection info")
}

nodeManager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
nodeManager, err := nodes.NewManager(nodes.ManagerParams{
Store: nodeInfoStore,
NodeDisconnectedAfter: cfg.BacalhauConfig.Orchestrator.NodeManager.DisconnectTimeout.AsTimeDuration(),
ManualApproval: cfg.BacalhauConfig.Orchestrator.NodeManager.ManualApproval,
Expand Down
160 changes: 135 additions & 25 deletions pkg/orchestrator/nodes/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,17 @@ const (
)

// nodesManager handles node lifecycle, health checking, and state management.
// It tracks node connection states, handles handshakes and heartbeats, and
// maintains node membership status.
// It maintains both in-memory state for fast access and persistent storage
// for durability. The manager provides:
// - Node registration and handshake handling
// - Health monitoring via heartbeats
// - Connection state tracking with notifications
// - Resource capacity monitoring
// - Background state persistence
//
// Thread safety is ensured through sync.Map for live state and proper mutex
// usage for control operations. Background tasks handle health checks and
// state persistence with configurable intervals.
type nodesManager struct {
// Core dependencies
store Store // Persistent storage for node states
Expand Down Expand Up @@ -61,28 +70,44 @@ type nodesManager struct {
}
}

// NodeManagerParams holds the configuration for creating a new nodesManager
type NodeManagerParams struct {
Store Store // Required: persistent storage
Clock clock.Clock // Optional: defaults to real clock
NodeDisconnectedAfter time.Duration // Required: timeout for node health
HealthCheckFrequency time.Duration // Optional: defaults to 1/3 of disconnectedAfter
ManualApproval bool // Whether nodes need manual approval
PersistInterval time.Duration // Interval for state persistence
PersistTimeout time.Duration // Timeout for state persistence
ShutdownTimeout time.Duration // Timeout for graceful shutdown
// ManagerParams holds configuration for creating a new node manager.
type ManagerParams struct {
// Store provides persistent storage for node states.
// NOTE(review): the previous revision of this struct documented Store as
// required — confirm NewManager still rejects a nil Store.
Store Store

// Clock is the time source. NewManager replaces a nil Clock with clock.New().
Clock clock.Clock

// NodeDisconnectedAfter is how long to wait before marking nodes as disconnected.
// NOTE(review): documented as required in the previous revision — confirm.
NodeDisconnectedAfter time.Duration

// HealthCheckFrequency is how often to check node health (optional).
// NOTE(review): the previous revision documented the default as 1/3 of
// NodeDisconnectedAfter — confirm that default still applies.
HealthCheckFrequency time.Duration

// ManualApproval determines if nodes require manual approval.
ManualApproval bool

// PersistInterval is how often to persist state changes (optional).
PersistInterval time.Duration

// PersistTimeout is the timeout for persistence operations (optional).
PersistTimeout time.Duration

// ShutdownTimeout is the timeout for graceful shutdown (optional).
ShutdownTimeout time.Duration
}

// trackedLiveState holds the current resource state for a node
// trackedLiveState holds the runtime state for an active node.
// This includes current connection status and resource utilization.
type trackedLiveState struct {
// connectionState is the node's connection record; its Status and
// LastHeartbeat fields are what the disconnect check inspects.
connectionState models.ConnectionState
// availableCapacity is the node's currently available resources.
// NOTE(review): presumably refreshed from heartbeats — confirm against Heartbeat.
availableCapacity models.Resources
// queueUsedCapacity is the resource usage attributed to the node's queue.
// NOTE(review): presumably refreshed from heartbeats — confirm against Heartbeat.
queueUsedCapacity models.Resources
}

// NewNodeManager creates a new nodesManager with the given configuration.
// NewManager creates a new nodesManager with the given configuration.
// It initializes the manager but does not start background tasks - call Start() for that.
func NewNodeManager(params NodeManagerParams) (Manager, error) {
func NewManager(params ManagerParams) (Manager, error) {
if params.Clock == nil {
params.Clock = clock.New()
}
Expand Down Expand Up @@ -128,9 +153,12 @@ func NewNodeManager(params NodeManagerParams) (Manager, error) {
}, nil
}

// Start initializes the nodesManager and begins background tasks.
// It loads existing node states from storage and starts health checking.
// Returns an error if the manager is already running or if state loading fails.
// Start initializes the manager and begins background tasks.
// It launches health checking and state persistence routines.
// The manager will monitor the provided context and initiate
// shutdown if it is cancelled.
//
// Returns error if already running or fails to initialize.
func (n *nodesManager) Start(ctx context.Context) error {
n.mu.Lock()
defer n.mu.Unlock()
Expand Down Expand Up @@ -168,9 +196,12 @@ func (n *nodesManager) Start(ctx context.Context) error {
return nil
}

// Stop gracefully shuts down the nodesManager and its background tasks.
// It waits for tasks to complete or until the context is cancelled.
// Returns nil if already stopped or successfully stopped, context error if timed out.
// Stop gracefully shuts down the manager and its background tasks.
// It ensures final state persistence and waits for tasks to complete
// up to the configured shutdown timeout.
//
// Returns nil if successfully stopped or already stopped,
// or a context error if shutdown times out.
func (n *nodesManager) Stop(ctx context.Context) error {
n.mu.Lock()
if !n.running {
Expand All @@ -196,7 +227,7 @@ func (n *nodesManager) Stop(ctx context.Context) error {
}
}

// Running returns true if the nodesManager is currently running.
// Running returns whether the manager is currently active.
func (n *nodesManager) Running() bool {
n.mu.RLock()
defer n.mu.RUnlock()
Expand All @@ -214,7 +245,9 @@ func (n *nodesManager) startBackgroundTask(name string, fn func()) {
}

// healthCheckLoop runs periodic health checks on all nodes.
// It marks nodes as disconnected if they haven't sent a heartbeat within the timeout period.
// It runs on the configured check frequency and marks nodes
// as disconnected if they haven't sent a heartbeat within
// the disconnect timeout period.
func (n *nodesManager) healthCheckLoop() {
ticker := n.clock.Ticker(n.heartbeatCheckFrequency)
defer ticker.Stop()
Expand Down Expand Up @@ -285,6 +318,9 @@ func (n *nodesManager) checkNodeHealth() {
}
}

// persistenceLoop periodically persists live state changes to storage.
// It runs on the configured persist interval and ensures durability
// of connection state and resource tracking.
func (n *nodesManager) persistenceLoop() {
ticker := n.clock.Ticker(n.persistInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -335,6 +371,19 @@ func (n *nodesManager) persistLiveState() {
})
}

// Handshake handles initial node registration or reconnection.
// For new nodes, it:
// - Validates the node type
// - Creates initial node state
// - Assigns default approval status
//
// For existing nodes, it:
// - Verifies the node isn't rejected
// - Restores previous membership status
// - Updates connection state
//
// Returns HandshakeResponse with acceptance status and reason.
// The LastComputeSeqNum is included for message ordering.
func (n *nodesManager) Handshake(
ctx context.Context, request messages.HandshakeRequest) (messages.HandshakeResponse, error) {
log.Debug().Msgf("handshake request received with info %+v", request.NodeInfo)
Expand Down Expand Up @@ -418,6 +467,13 @@ func (n *nodesManager) Handshake(
}, nil
}

// UpdateNodeInfo updates a node's information and capabilities.
// The node must:
// - Be already registered (handshake completed)
// - Not be in rejected state
//
// Returns UpdateNodeInfoResponse with acceptance status and reason.
func (n *nodesManager) UpdateNodeInfo(
ctx context.Context, request messages.UpdateNodeInfoRequest) (messages.UpdateNodeInfoResponse, error) {
existing, err := n.Get(ctx, request.NodeInfo.ID())
Expand Down Expand Up @@ -447,6 +503,14 @@ func (n *nodesManager) UpdateNodeInfo(
}, nil
}

// Heartbeat processes a node's heartbeat message and updates its state.
// It updates:
// - Last heartbeat timestamp
// - Message sequence numbers
// - Resource capacities
//
// The update is retried up to 3 times on concurrent modification.
// Returns HeartbeatResponse with the last known compute sequence number.
func (n *nodesManager) Heartbeat(
ctx context.Context, request ExtendedHeartbeatRequest) (messages.HeartbeatResponse, error) {
// Retry loop for concurrent updates, such as handshakes or health checks
Expand Down Expand Up @@ -489,6 +553,14 @@ func (n *nodesManager) Heartbeat(
return messages.HeartbeatResponse{}, errors.New("concurrent update conflict")
}

// ApproveNode approves a node for cluster participation.
// The node must be in PENDING state. The operation updates
// both persistent and live state.
//
// Returns error if:
// - Node not found
// - Already approved
// - Storage update fails
func (n *nodesManager) ApproveNode(ctx context.Context, nodeID string) error {
state, err := n.GetByPrefix(ctx, nodeID)
if err != nil {
Expand All @@ -503,6 +575,17 @@ func (n *nodesManager) ApproveNode(ctx context.Context, nodeID string) error {
return n.store.Put(ctx, state)
}

// RejectNode rejects a node from cluster participation.
// The operation:
// - Updates node to REJECTED state
// - Marks node as disconnected
// - Removes live state tracking
// - Triggers connection state change notification
//
// Returns error if:
// - Node not found
// - Already rejected
// - Storage update fails
func (n *nodesManager) RejectNode(ctx context.Context, nodeID string) error {
state, err := n.GetByPrefix(ctx, nodeID)
if err != nil {
Expand Down Expand Up @@ -538,6 +621,15 @@ func (n *nodesManager) RejectNode(ctx context.Context, nodeID string) error {
return nil
}

// DeleteNode removes a node from the cluster.
// The operation:
// - Removes node from persistent storage
// - Removes live state tracking
// - Triggers connection state change notification if was connected
//
// Returns error if:
// - Node not found
// - Storage deletion fails
func (n *nodesManager) DeleteNode(ctx context.Context, nodeID string) error {
state, err := n.store.GetByPrefix(ctx, nodeID)
if err != nil {
Expand All @@ -564,6 +656,14 @@ func (n *nodesManager) DeleteNode(ctx context.Context, nodeID string) error {
return nil
}

// OnConnectionStateChange registers a handler for node connection state changes.
// Handlers are called synchronously when node state transitions between:
// - CONNECTED <-> DISCONNECTED
//
// Events include:
// - Previous and current state
// - Timestamp of change
// - Node identifier
func (n *nodesManager) OnConnectionStateChange(handler ConnectionStateChangeHandler) {
n.handlers.Lock()
defer n.handlers.Unlock()
Expand All @@ -580,7 +680,8 @@ func (n *nodesManager) notifyConnectionStateChange(event NodeConnectionEvent) {
}
}

// isNodeDisconnected checks if a node is considered disconnected based on its connection state
// isNodeDisconnected reports whether a node whose status is still CONNECTED
// has gone longer than the configured disconnect timeout since its last heartbeat.
func (n *nodesManager) isNodeDisconnected(connState models.ConnectionState) bool {
return connState.Status == models.NodeStates.CONNECTED &&
n.clock.Since(connState.LastHeartbeat) > n.disconnectedAfter
Expand Down Expand Up @@ -621,7 +722,16 @@ func (n *nodesManager) List(ctx context.Context, filters ...NodeStateFilter) ([]
return states, nil
}

// enrichState adds both connection state and resources to the node state
// enrichState adds live tracking data to a node state.
// For connected nodes, it adds:
// - Current connection state
// - Available resource capacity
// - Queue resource usage
//
// For disconnected nodes:
// - Marks as disconnected
// - Clears resource tracking
// - Preserves disconnect timestamp
func (n *nodesManager) enrichState(state *models.NodeState) {
// If we have live state, use it
if entry, ok := n.liveState.Load(state.Info.ID()); ok {
Expand Down
18 changes: 9 additions & 9 deletions pkg/orchestrator/nodes/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *NodeManagerTestSuite) SetupTest() {
TTL: time.Hour,
})

manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand Down Expand Up @@ -392,7 +392,7 @@ func (s *NodeManagerTestSuite) TestConcurrentOperations() {
var wg sync.WaitGroup
wg.Add(numNodes)

manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: clock.New(), // Use real clock for this test
NodeDisconnectedAfter: s.disconnected,
Expand Down Expand Up @@ -551,7 +551,7 @@ func (s *NodeManagerTestSuite) TestConnectionStateChangeEvents() {

func (s *NodeManagerTestSuite) TestStartStop() {
// Create a new manager without starting it
manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand All @@ -577,7 +577,7 @@ func (s *NodeManagerTestSuite) TestStartStop() {
}
func (s *NodeManagerTestSuite) TestStartAlreadyStarted() {
// Create and start a manager
manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand All @@ -602,7 +602,7 @@ func (s *NodeManagerTestSuite) TestStartAlreadyStarted() {
}

func (s *NodeManagerTestSuite) TestStartContextCancellation() {
manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand All @@ -629,7 +629,7 @@ func (s *NodeManagerTestSuite) TestStartContextCancellation() {

func (s *NodeManagerTestSuite) TestStopAlreadyStopped() {
// Create and start a manager
manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand All @@ -656,7 +656,7 @@ func (s *NodeManagerTestSuite) TestStopAlreadyStopped() {
func (s *NodeManagerTestSuite) TestPeriodicStatePersistence() {
// Create manager with short persist interval
persistInterval := 100 * time.Millisecond
manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand Down Expand Up @@ -719,7 +719,7 @@ func (s *NodeManagerTestSuite) TestPeriodicStatePersistence() {

func (s *NodeManagerTestSuite) TestStatePersistenceOnStop() {
// Create manager
manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand Down Expand Up @@ -763,7 +763,7 @@ func (s *NodeManagerTestSuite) TestStatePersistenceOnStop() {

func (s *NodeManagerTestSuite) TestPersistenceWithContextCancellation() {
// Create manager with short persist interval
manager, err := nodes.NewNodeManager(nodes.NodeManagerParams{
manager, err := nodes.NewManager(nodes.ManagerParams{
Store: s.store,
Clock: s.clock,
NodeDisconnectedAfter: s.disconnected,
Expand Down
Loading

0 comments on commit d35d183

Please sign in to comment.