Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new node manager decoupled from transport layer #4728

Merged
merged 6 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cspell/custom-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,5 @@ IMDS
tlsca
Lenf
traefik
bprotocolcompute
bprotocolorchestrator
2 changes: 1 addition & 1 deletion cmd/cli/agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (o *NodeOptions) runNode(cmd *cobra.Command, api client.API) error {
return fmt.Errorf("could not get server node: %w", err)
}

writeErr := output.OutputOneNonTabular(cmd, o.OutputOpts, response.NodeState)
writeErr := output.OutputOneNonTabular(cmd, o.OutputOpts, response.NodeInfo)
if writeErr != nil {
return fmt.Errorf("failed to write node: %w", writeErr)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/cli/agent/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ func (s *NodeSuite) TestNodeJSONOutput() {
_, out, err := s.ExecuteTestCobraCommand("agent", "node", "--output", string(output.JSONFormat))
s.Require().NoError(err, "Could not request node with json output.")

nodeInfo := &models.NodeState{}
nodeInfo := &models.NodeInfo{}
err = marshaller.JSONUnmarshalWithMax([]byte(out), &nodeInfo)
s.Require().NoError(err, "Could not unmarshal the output into json - %+v", out)
s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in json.")
s.Require().Equal(s.Node.ID, nodeInfo.ID(), "Node ID does not match in json.")
}

func (s *NodeSuite) TestNodeYAMLOutput() {
_, out, err := s.ExecuteTestCobraCommand("agent", "node")
s.Require().NoError(err, "Could not request node with yaml output.")

nodeInfo := &models.NodeState{}
nodeInfo := &models.NodeInfo{}
err = marshaller.YAMLUnmarshalWithMax([]byte(out), &nodeInfo)
s.Require().NoError(err, "Could not unmarshal the output into yaml - %+v", out)
s.Require().Equal(s.Node.ID, nodeInfo.Info.ID(), "Node ID does not match in yaml.")
s.Require().Equal(s.Node.ID, nodeInfo.ID(), "Node ID does not match in yaml.")
}
20 changes: 10 additions & 10 deletions cmd/cli/node/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var alwaysColumns = []output.TableColumn[*models.NodeState]{
{
ColumnConfig: table.ColumnConfig{Name: "status"},
Value: func(ni *models.NodeState) string {
return ni.Connection.String()
return ni.ConnectionState.Status.String()
},
},
}
Expand Down Expand Up @@ -70,45 +70,45 @@ var toggleColumns = map[string][]output.TableColumn[*models.NodeState]{
"features": {
{
ColumnConfig: table.ColumnConfig{Name: "engines", WidthMax: maxLen(models.EngineNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.ExecutionEngines, " ")
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "inputs from", WidthMax: maxLen(models.StoragesNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.StorageSources, " ")
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "outputs", WidthMax: maxLen(models.PublisherNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.Publishers, " ")
}),
},
},
"capacity": {
{
ColumnConfig: table.ColumnConfig{Name: "cpu", WidthMax: len("1.0 / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%.1f / %.1f", cni.AvailableCapacity.CPU, cni.MaxCapacity.CPU)
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "memory", WidthMax: len("10.0 GB / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%s / %s", datasize.ByteSize(cni.AvailableCapacity.Memory).HR(), datasize.ByteSize(cni.MaxCapacity.Memory).HR())
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "disk", WidthMax: len("100.0 GB / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%s / %s", datasize.ByteSize(cni.AvailableCapacity.Disk).HR(), datasize.ByteSize(cni.MaxCapacity.Disk).HR())
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "gpu", WidthMax: len("1 / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%d / %d", cni.AvailableCapacity.GPU, cni.MaxCapacity.GPU)
}),
},
Expand All @@ -119,9 +119,9 @@ func maxLen(val []string) int {
return lo.Max(lo.Map[string, int](val, func(item string, index int) int { return len(item) })) + 1
}

func ifComputeNode(getFromCNInfo func(*models.ComputeNodeInfo) string) func(state *models.NodeState) string {
func ifComputeNode(getFromCNInfo func(models.ComputeNodeInfo) string) func(state *models.NodeState) string {
return func(ni *models.NodeState) string {
if ni.Info.ComputeNodeInfo == nil {
if !ni.Info.IsComputeNode() {
return ""
}
return getFromCNInfo(ni.Info.ComputeNodeInfo)
Expand Down
19 changes: 10 additions & 9 deletions pkg/compute/management_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/node/heartbeat"
"github.com/bacalhau-project/bacalhau/pkg/models/messages/legacy"
"github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol"
bprotocolcompute "github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol/compute"
"github.com/bacalhau-project/bacalhau/pkg/version"
)

type ManagementClientParams struct {
NodeID string
LabelsProvider models.LabelsProvider
ManagementProxy ManagementEndpoint
ManagementProxy bprotocol.ManagementEndpoint
NodeInfoDecorator models.NodeInfoDecorator
AvailableCapacityTracker capacity.Tracker
QueueUsageTracker capacity.UsageTracker
RegistrationFilePath string
HeartbeatClient heartbeat.Client
HeartbeatClient *bprotocolcompute.HeartbeatClient
HeartbeatConfig types.Heartbeat
}

Expand All @@ -35,13 +36,13 @@ type ManagementClientParams struct {
type ManagementClient struct {
done chan struct{}
labelsProvider models.LabelsProvider
managementProxy ManagementEndpoint
managementProxy bprotocol.ManagementEndpoint
nodeID string
nodeInfoDecorator models.NodeInfoDecorator
availableCapacityTracker capacity.Tracker
queueUsageTracker capacity.UsageTracker
registrationFile *RegistrationFile
heartbeatClient heartbeat.Client
heartbeatClient *bprotocolcompute.HeartbeatClient
heartbeatConfig types.Heartbeat
}

Expand Down Expand Up @@ -86,7 +87,7 @@ func (m *ManagementClient) RegisterNode(ctx context.Context) error {
}

nodeInfo := m.getNodeInfo(ctx)
response, err := m.managementProxy.Register(ctx, messages.RegisterRequest{
response, err := m.managementProxy.Register(ctx, legacy.RegisterRequest{
Info: nodeInfo,
})
if err != nil {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (m *ManagementClient) deliverInfo(ctx context.Context) {
// registered.

nodeInfo := m.getNodeInfo(ctx)
response, err := m.managementProxy.UpdateInfo(ctx, messages.UpdateInfoRequest{
response, err := m.managementProxy.UpdateInfo(ctx, legacy.UpdateInfoRequest{
Info: nodeInfo,
})
if err != nil {
Expand All @@ -130,7 +131,7 @@ func (m *ManagementClient) deliverInfo(ctx context.Context) {
}

func (m *ManagementClient) updateResources(ctx context.Context) {
request := messages.UpdateResourcesRequest{
request := legacy.UpdateResourcesRequest{
NodeID: m.nodeID,
AvailableCapacity: m.availableCapacityTracker.GetAvailableCapacity(ctx),
QueueUsedCapacity: m.queueUsageTracker.GetUsedCapacity(ctx),
Expand Down
69 changes: 0 additions & 69 deletions pkg/compute/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/compute/node_info_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (n *NodeInfoDecorator) DecorateNodeInfo(ctx context.Context, nodeInfo model
// TODO(forrest): this method takes 10 seconds to run: https://github.com/bacalhau-project/bacalhau/issues/4153
// because the Keys() methods are slow when s3 is considered since we need to check for credentials.
nodeInfo.NodeType = models.NodeTypeCompute
nodeInfo.ComputeNodeInfo = &models.ComputeNodeInfo{
nodeInfo.ComputeNodeInfo = models.ComputeNodeInfo{
ExecutionEngines: n.executors.Keys(ctx),
Publishers: n.publishers.Keys(ctx),
StorageSources: n.storages.Keys(ctx),
Expand Down
12 changes: 0 additions & 12 deletions pkg/compute/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/models/messages/legacy"
)

Expand Down Expand Up @@ -39,14 +38,3 @@ type Callback interface {
OnRunComplete(ctx context.Context, result legacy.RunResult)
OnComputeFailure(ctx context.Context, err legacy.ComputeError)
}

// ManagementEndpoint is the transport-based interface for compute nodes to
// register with the requester node, update information and perform heartbeats.
type ManagementEndpoint interface {
// Register registers a compute node with the requester node.
Register(context.Context, messages.RegisterRequest) (*messages.RegisterResponse, error)
// UpdateInfo sends an update of node info to the requester node
UpdateInfo(context.Context, messages.UpdateInfoRequest) (*messages.UpdateInfoResponse, error)
// UpdateResources updates the resources currently in use by a specific node
UpdateResources(context.Context, messages.UpdateResourcesRequest) (*messages.UpdateResourcesResponse, error)
}
43 changes: 43 additions & 0 deletions pkg/models/messages/legacy/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package legacy

import "github.com/bacalhau-project/bacalhau/pkg/models"

const (
HeartbeatMessageType = "heartbeat"
)

// Heartbeat represents a heartbeat message from a specific node.
// It contains the node ID and the sequence number of the heartbeat
// which is monotonically increasing (reboots aside). We do not
// use timestamps on the client, we rely solely on the server-side
// time to avoid clock drift issues.
type Heartbeat struct {
NodeID string
Sequence uint64
}

type RegisterRequest struct {
Info models.NodeInfo
}

type RegisterResponse struct {
Accepted bool
Reason string
}

type UpdateInfoRequest struct {
Info models.NodeInfo
}

type UpdateInfoResponse struct {
Accepted bool
Reason string
}

type UpdateResourcesRequest struct {
NodeID string
AvailableCapacity models.Resources
QueueUsedCapacity models.Resources
}

type UpdateResourcesResponse struct{}
Loading
Loading