Skip to content

Commit

Permalink
new node manager decoupled from transport layer (#4728)
Browse files Browse the repository at this point in the history
Refactor: New Node Manager Implementation

This PR introduces a new node manager implementation that decouples node
lifecycle management from the transport layer. The existing
implementation was tightly coupled with bprotocol specifics, making it
difficult to evolve our networking stack. It also lacked graceful
handling of node re-connection. This refactoring is a key step towards
our new transport layer architecture.

Key Changes:
- Separates node management logic into a clean, transport-agnostic
interface
- Implements reliable connection state tracking with configurable
heartbeat monitoring
- Introduces sequence number tracking for both compute and orchestrator
messages to support ordered delivery
- Maintains fast in-memory state with periodic persistence for
durability
- Provides event notifications for node connection state changes
- Implements proper thread safety for concurrent operations

Visible Changes:
- Node state APIs now include additional fields for connection state and
sequence tracking


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Introduced a custom dictionary for spell checking with new entries.
	- Added a `HeartbeatClient` for managing heartbeat messages.
- Implemented a new `Server` structure for orchestrating node
management.
	- Added new models for connection state management in the Swagger API.

- **Bug Fixes**
	- Updated logic for filtering nodes based on connection status.
	- Enhanced error handling in command execution.

- **Documentation**
- Enhanced Swagger API documentation with new models and updated
descriptions.

- **Tests**
- Added comprehensive test suites for heartbeat functionality and node
management.
- Updated existing tests to reflect changes in data structures and
types, particularly for node approval and rejection scenarios.

- **Chores**
- Removed obsolete files related to previous node management
implementations.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
wdbaruni authored Dec 8, 2024
1 parent 46ea6e5 commit e80e1a5
Show file tree
Hide file tree
Showing 76 changed files with 3,343 additions and 2,127 deletions.
2 changes: 2 additions & 0 deletions .cspell/custom-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,5 @@ IMDS
tlsca
Lenf
traefik
bprotocolcompute
bprotocolorchestrator
2 changes: 1 addition & 1 deletion cmd/cli/agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (o *NodeOptions) runNode(cmd *cobra.Command, api client.API) error {
return fmt.Errorf("could not get server node: %w", err)
}

writeErr := output.OutputOneNonTabular(cmd, o.OutputOpts, response.NodeState)
writeErr := output.OutputOneNonTabular(cmd, o.OutputOpts, response.NodeInfo)
if writeErr != nil {
return fmt.Errorf("failed to write node: %w", writeErr)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/cli/agent/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ func (s *NodeSuite) TestNodeJSONOutput() {
_, out, err := s.ExecuteTestCobraCommand("agent", "node", "--output", string(output.JSONFormat))
s.Require().NoError(err, "Could not request node with json output.")

nodeInfo := &models.NodeState{}
nodeInfo := &models.NodeInfo{}
err = marshaller.JSONUnmarshalWithMax([]byte(out), &nodeInfo)
s.Require().NoError(err, "Could not unmarshal the output into json - %+v", out)
s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in json.")
s.Require().Equal(s.Node.ID, nodeInfo.ID(), "Node ID does not match in json.")
}

func (s *NodeSuite) TestNodeYAMLOutput() {
_, out, err := s.ExecuteTestCobraCommand("agent", "node")
s.Require().NoError(err, "Could not request node with yaml output.")

nodeInfo := &models.NodeState{}
nodeInfo := &models.NodeInfo{}
err = marshaller.YAMLUnmarshalWithMax([]byte(out), &nodeInfo)
s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out)
s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in yaml.")
s.Require().Equal(s.Node.ID, nodeInfo.ID(), "Node ID does not match in yaml.")
}
3 changes: 2 additions & 1 deletion cmd/cli/node/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/cobra"

"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func (n *NodeActionCmd) run(cmd *cobra.Command, args []string, api client.API) e
Message: n.message,
})
if err != nil {
util.Fatal(cmd, fmt.Errorf("could not %s node %s: %w", n.action, nodeID, err), 1)
return bacerrors.Wrap(err, "failed to %s node %s", n.action, nodeID)
}

if response.Success {
Expand Down
13 changes: 7 additions & 6 deletions cmd/cli/node/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func (s *NodeActionSuite) TestListNodes() {
nodeID := cells[0]

// Try to approve, expect failure
_, out, err = s.ExecuteTestCobraCommand(
_, _, err = s.ExecuteTestCobraCommand(
"node",
"approve",
nodeID,
)
s.Require().NoError(err)
s.Require().Contains(out, "node already approved")
s.Require().Contains(out, nodeID)
s.Require().Error(err)
s.Require().ErrorContains(err, "already approved")
s.Require().ErrorContains(err, nodeID)

// Now reject the node
_, out, err = s.ExecuteTestCobraCommand(
Expand All @@ -65,8 +65,9 @@ func (s *NodeActionSuite) TestListNodes() {
"reject",
nodeID,
)
s.Require().NoError(err)
s.Require().Contains(out, "node already rejected")
s.Require().Error(err)
s.Require().ErrorContains(err, "already rejected")
s.Require().ErrorContains(err, nodeID)

// Set it to approve again
_, out, err = s.ExecuteTestCobraCommand(
Expand Down
20 changes: 10 additions & 10 deletions cmd/cli/node/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var alwaysColumns = []output.TableColumn[*models.NodeState]{
{
ColumnConfig: table.ColumnConfig{Name: "status"},
Value: func(ni *models.NodeState) string {
return ni.Connection.String()
return ni.ConnectionState.Status.String()
},
},
}
Expand Down Expand Up @@ -70,45 +70,45 @@ var toggleColumns = map[string][]output.TableColumn[*models.NodeState]{
"features": {
{
ColumnConfig: table.ColumnConfig{Name: "engines", WidthMax: maxLen(models.EngineNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.ExecutionEngines, " ")
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "inputs from", WidthMax: maxLen(models.StoragesNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.StorageSources, " ")
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "outputs", WidthMax: maxLen(models.PublisherNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.Publishers, " ")
}),
},
},
"capacity": {
{
ColumnConfig: table.ColumnConfig{Name: "cpu", WidthMax: len("1.0 / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%.1f / %.1f", cni.AvailableCapacity.CPU, cni.MaxCapacity.CPU)
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "memory", WidthMax: len("10.0 GB / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%s / %s", datasize.ByteSize(cni.AvailableCapacity.Memory).HR(), datasize.ByteSize(cni.MaxCapacity.Memory).HR())
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "disk", WidthMax: len("100.0 GB / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%s / %s", datasize.ByteSize(cni.AvailableCapacity.Disk).HR(), datasize.ByteSize(cni.MaxCapacity.Disk).HR())
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "gpu", WidthMax: len("1 / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%d / %d", cni.AvailableCapacity.GPU, cni.MaxCapacity.GPU)
}),
},
Expand All @@ -119,9 +119,9 @@ func maxLen(val []string) int {
return lo.Max(lo.Map[string, int](val, func(item string, index int) int { return len(item) })) + 1
}

func ifComputeNode(getFromCNInfo func(*models.ComputeNodeInfo) string) func(state *models.NodeState) string {
func ifComputeNode(getFromCNInfo func(models.ComputeNodeInfo) string) func(state *models.NodeState) string {
return func(ni *models.NodeState) string {
if ni.Info.ComputeNodeInfo == nil {
if !ni.Info.IsComputeNode() {
return ""
}
return getFromCNInfo(ni.Info.ComputeNodeInfo)
Expand Down
19 changes: 10 additions & 9 deletions pkg/compute/management_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/node/heartbeat"
"github.com/bacalhau-project/bacalhau/pkg/models/messages/legacy"
"github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol"
bprotocolcompute "github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol/compute"
"github.com/bacalhau-project/bacalhau/pkg/version"
)

type ManagementClientParams struct {
NodeID string
LabelsProvider models.LabelsProvider
ManagementProxy ManagementEndpoint
ManagementProxy bprotocol.ManagementEndpoint
NodeInfoDecorator models.NodeInfoDecorator
AvailableCapacityTracker capacity.Tracker
QueueUsageTracker capacity.UsageTracker
RegistrationFilePath string
HeartbeatClient heartbeat.Client
HeartbeatClient *bprotocolcompute.HeartbeatClient
HeartbeatConfig types.Heartbeat
}

Expand All @@ -35,13 +36,13 @@ type ManagementClientParams struct {
type ManagementClient struct {
done chan struct{}
labelsProvider models.LabelsProvider
managementProxy ManagementEndpoint
managementProxy bprotocol.ManagementEndpoint
nodeID string
nodeInfoDecorator models.NodeInfoDecorator
availableCapacityTracker capacity.Tracker
queueUsageTracker capacity.UsageTracker
registrationFile *RegistrationFile
heartbeatClient heartbeat.Client
heartbeatClient *bprotocolcompute.HeartbeatClient
heartbeatConfig types.Heartbeat
}

Expand Down Expand Up @@ -86,7 +87,7 @@ func (m *ManagementClient) RegisterNode(ctx context.Context) error {
}

nodeInfo := m.getNodeInfo(ctx)
response, err := m.managementProxy.Register(ctx, messages.RegisterRequest{
response, err := m.managementProxy.Register(ctx, legacy.RegisterRequest{
Info: nodeInfo,
})
if err != nil {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (m *ManagementClient) deliverInfo(ctx context.Context) {
// registered.

nodeInfo := m.getNodeInfo(ctx)
response, err := m.managementProxy.UpdateInfo(ctx, messages.UpdateInfoRequest{
response, err := m.managementProxy.UpdateInfo(ctx, legacy.UpdateInfoRequest{
Info: nodeInfo,
})
if err != nil {
Expand All @@ -130,7 +131,7 @@ func (m *ManagementClient) deliverInfo(ctx context.Context) {
}

func (m *ManagementClient) updateResources(ctx context.Context) {
request := messages.UpdateResourcesRequest{
request := legacy.UpdateResourcesRequest{
NodeID: m.nodeID,
AvailableCapacity: m.availableCapacityTracker.GetAvailableCapacity(ctx),
QueueUsedCapacity: m.queueUsageTracker.GetUsedCapacity(ctx),
Expand Down
69 changes: 0 additions & 69 deletions pkg/compute/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/compute/node_info_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (n *NodeInfoDecorator) DecorateNodeInfo(ctx context.Context, nodeInfo model
// TODO(forrest): this method takes 10 seconds to run: https://github.com/bacalhau-project/bacalhau/issues/4153
// because the Keys() methods are slow when s3 is considered since we need to check for credentials.
nodeInfo.NodeType = models.NodeTypeCompute
nodeInfo.ComputeNodeInfo = &models.ComputeNodeInfo{
nodeInfo.ComputeNodeInfo = models.ComputeNodeInfo{
ExecutionEngines: n.executors.Keys(ctx),
Publishers: n.publishers.Keys(ctx),
StorageSources: n.storages.Keys(ctx),
Expand Down
12 changes: 0 additions & 12 deletions pkg/compute/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/models/messages/legacy"
)

Expand Down Expand Up @@ -39,14 +38,3 @@ type Callback interface {
OnRunComplete(ctx context.Context, result legacy.RunResult)
OnComputeFailure(ctx context.Context, err legacy.ComputeError)
}

// ManagementEndpoint is the transport-based interface for compute nodes to
// register with the requester node, update information and perform heartbeats.
type ManagementEndpoint interface {
// Register registers a compute node with the requester node.
Register(context.Context, messages.RegisterRequest) (*messages.RegisterResponse, error)
// UpdateInfo sends an update of node info to the requester node
UpdateInfo(context.Context, messages.UpdateInfoRequest) (*messages.UpdateInfoResponse, error)
// UpdateResources updates the resources currently in use by a specific node
UpdateResources(context.Context, messages.UpdateResourcesRequest) (*messages.UpdateResourcesResponse, error)
}
43 changes: 43 additions & 0 deletions pkg/models/messages/legacy/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package legacy

import "github.com/bacalhau-project/bacalhau/pkg/models"

const (
HeartbeatMessageType = "heartbeat"
)

// Heartbeat represents a heartbeat message from a specific node.
// It contains the node ID and the sequence number of the heartbeat
// which is monotonically increasing (reboots aside). We do not
// use timestamps on the client, we rely solely on the server-side
// time to avoid clock drift issues.
type Heartbeat struct {
NodeID string
Sequence uint64
}

type RegisterRequest struct {
Info models.NodeInfo
}

type RegisterResponse struct {
Accepted bool
Reason string
}

type UpdateInfoRequest struct {
Info models.NodeInfo
}

type UpdateInfoResponse struct {
Accepted bool
Reason string
}

type UpdateResourcesRequest struct {
NodeID string
AvailableCapacity models.Resources
QueueUsedCapacity models.Resources
}

type UpdateResourcesResponse struct{}
Loading

0 comments on commit e80e1a5

Please sign in to comment.