Skip to content

Commit

Permalink
new node manager decoupled from transport layer
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Dec 8, 2024
1 parent 46ea6e5 commit 74dcfa1
Show file tree
Hide file tree
Showing 69 changed files with 2,870 additions and 2,067 deletions.
2 changes: 2 additions & 0 deletions .cspell/custom-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,5 @@ IMDS
tlsca
Lenf
traefik
bprotocolcompute
bprotocolorchestrator
2 changes: 1 addition & 1 deletion cmd/cli/agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (o *NodeOptions) runNode(cmd *cobra.Command, api client.API) error {
return fmt.Errorf("could not get server node: %w", err)
}

writeErr := output.OutputOneNonTabular(cmd, o.OutputOpts, response.NodeState)
writeErr := output.OutputOneNonTabular(cmd, o.OutputOpts, response.NodeInfo)
if writeErr != nil {
return fmt.Errorf("failed to write node: %w", writeErr)
}
Expand Down
20 changes: 10 additions & 10 deletions cmd/cli/node/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var alwaysColumns = []output.TableColumn[*models.NodeState]{
{
ColumnConfig: table.ColumnConfig{Name: "status"},
Value: func(ni *models.NodeState) string {
return ni.Connection.String()
return ni.ConnectionState.Status.String()
},
},
}
Expand Down Expand Up @@ -70,45 +70,45 @@ var toggleColumns = map[string][]output.TableColumn[*models.NodeState]{
"features": {
{
ColumnConfig: table.ColumnConfig{Name: "engines", WidthMax: maxLen(models.EngineNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.ExecutionEngines, " ")
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "inputs from", WidthMax: maxLen(models.StoragesNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.StorageSources, " ")
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "outputs", WidthMax: maxLen(models.PublisherNames), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return strings.Join(cni.Publishers, " ")
}),
},
},
"capacity": {
{
ColumnConfig: table.ColumnConfig{Name: "cpu", WidthMax: len("1.0 / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%.1f / %.1f", cni.AvailableCapacity.CPU, cni.MaxCapacity.CPU)
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "memory", WidthMax: len("10.0 GB / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%s / %s", datasize.ByteSize(cni.AvailableCapacity.Memory).HR(), datasize.ByteSize(cni.MaxCapacity.Memory).HR())
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "disk", WidthMax: len("100.0 GB / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%s / %s", datasize.ByteSize(cni.AvailableCapacity.Disk).HR(), datasize.ByteSize(cni.MaxCapacity.Disk).HR())
}),
},
{
ColumnConfig: table.ColumnConfig{Name: "gpu", WidthMax: len("1 / "), WidthMaxEnforcer: text.WrapSoft},
Value: ifComputeNode(func(cni *models.ComputeNodeInfo) string {
Value: ifComputeNode(func(cni models.ComputeNodeInfo) string {
return fmt.Sprintf("%d / %d", cni.AvailableCapacity.GPU, cni.MaxCapacity.GPU)
}),
},
Expand All @@ -119,9 +119,9 @@ func maxLen(val []string) int {
return lo.Max(lo.Map[string, int](val, func(item string, index int) int { return len(item) })) + 1
}

func ifComputeNode(getFromCNInfo func(*models.ComputeNodeInfo) string) func(state *models.NodeState) string {
func ifComputeNode(getFromCNInfo func(models.ComputeNodeInfo) string) func(state *models.NodeState) string {
return func(ni *models.NodeState) string {
if ni.Info.ComputeNodeInfo == nil {
if !ni.Info.IsComputeNode() {
return ""
}
return getFromCNInfo(ni.Info.ComputeNodeInfo)
Expand Down
19 changes: 10 additions & 9 deletions pkg/compute/management_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/node/heartbeat"
"github.com/bacalhau-project/bacalhau/pkg/models/messages/legacy"
"github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol"
bprotocolcompute "github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol/compute"
"github.com/bacalhau-project/bacalhau/pkg/version"
)

type ManagementClientParams struct {
NodeID string
LabelsProvider models.LabelsProvider
ManagementProxy ManagementEndpoint
ManagementProxy bprotocol.ManagementEndpoint
NodeInfoDecorator models.NodeInfoDecorator
AvailableCapacityTracker capacity.Tracker
QueueUsageTracker capacity.UsageTracker
RegistrationFilePath string
HeartbeatClient heartbeat.Client
HeartbeatClient *bprotocolcompute.HeartbeatClient
HeartbeatConfig types.Heartbeat
}

Expand All @@ -35,13 +36,13 @@ type ManagementClientParams struct {
type ManagementClient struct {
done chan struct{}
labelsProvider models.LabelsProvider
managementProxy ManagementEndpoint
managementProxy bprotocol.ManagementEndpoint
nodeID string
nodeInfoDecorator models.NodeInfoDecorator
availableCapacityTracker capacity.Tracker
queueUsageTracker capacity.UsageTracker
registrationFile *RegistrationFile
heartbeatClient heartbeat.Client
heartbeatClient *bprotocolcompute.HeartbeatClient
heartbeatConfig types.Heartbeat
}

Expand Down Expand Up @@ -86,7 +87,7 @@ func (m *ManagementClient) RegisterNode(ctx context.Context) error {
}

nodeInfo := m.getNodeInfo(ctx)
response, err := m.managementProxy.Register(ctx, messages.RegisterRequest{
response, err := m.managementProxy.Register(ctx, legacy.RegisterRequest{
Info: nodeInfo,
})
if err != nil {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (m *ManagementClient) deliverInfo(ctx context.Context) {
// registered.

nodeInfo := m.getNodeInfo(ctx)
response, err := m.managementProxy.UpdateInfo(ctx, messages.UpdateInfoRequest{
response, err := m.managementProxy.UpdateInfo(ctx, legacy.UpdateInfoRequest{
Info: nodeInfo,
})
if err != nil {
Expand All @@ -130,7 +131,7 @@ func (m *ManagementClient) deliverInfo(ctx context.Context) {
}

func (m *ManagementClient) updateResources(ctx context.Context) {
request := messages.UpdateResourcesRequest{
request := legacy.UpdateResourcesRequest{
NodeID: m.nodeID,
AvailableCapacity: m.availableCapacityTracker.GetAvailableCapacity(ctx),
QueueUsedCapacity: m.queueUsageTracker.GetUsedCapacity(ctx),
Expand Down
69 changes: 0 additions & 69 deletions pkg/compute/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/compute/node_info_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (n *NodeInfoDecorator) DecorateNodeInfo(ctx context.Context, nodeInfo model
// TODO(forrest): this method takes 10 seconds to run: https://github.com/bacalhau-project/bacalhau/issues/4153
// because the Keys() methods are slow when s3 is considered since we need to check for credentials.
nodeInfo.NodeType = models.NodeTypeCompute
nodeInfo.ComputeNodeInfo = &models.ComputeNodeInfo{
nodeInfo.ComputeNodeInfo = models.ComputeNodeInfo{
ExecutionEngines: n.executors.Keys(ctx),
Publishers: n.publishers.Keys(ctx),
StorageSources: n.storages.Keys(ctx),
Expand Down
12 changes: 0 additions & 12 deletions pkg/compute/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/models/messages/legacy"
)

Expand Down Expand Up @@ -39,14 +38,3 @@ type Callback interface {
OnRunComplete(ctx context.Context, result legacy.RunResult)
OnComputeFailure(ctx context.Context, err legacy.ComputeError)
}

// ManagementEndpoint is the transport-based interface for compute nodes to
// register with the requester node, update their node information and report
// their resource usage.
//
// Every method takes a context and a request message and returns the matching
// response message together with an error.
type ManagementEndpoint interface {
// Register registers a compute node with the requester node.
Register(context.Context, messages.RegisterRequest) (*messages.RegisterResponse, error)
// UpdateInfo sends an update of node info to the requester node.
UpdateInfo(context.Context, messages.UpdateInfoRequest) (*messages.UpdateInfoResponse, error)
// UpdateResources updates the resources currently in use by a specific node.
UpdateResources(context.Context, messages.UpdateResourcesRequest) (*messages.UpdateResourcesResponse, error)
}
43 changes: 43 additions & 0 deletions pkg/models/messages/legacy/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package legacy

import "github.com/bacalhau-project/bacalhau/pkg/models"

// Message type names for the legacy node messages declared in this package.
const (
// HeartbeatMessageType is the type name "heartbeat".
// NOTE(review): presumably used to tag Heartbeat messages on the wire —
// confirm against the transport code that publishes them.
HeartbeatMessageType = "heartbeat"
)

// Heartbeat represents a heartbeat message from a specific node.
// It contains the node ID and the sequence number of the heartbeat
// which is monotonically increasing (reboots aside). We do not
// use timestamps on the client, we rely solely on the server-side
// time to avoid clock drift issues.
type Heartbeat struct {
// NodeID is the ID of the node that sent the heartbeat.
NodeID string
// Sequence is the sequence number of this heartbeat. It increases
// monotonically for a node, except that it may restart after a reboot.
Sequence uint64
}

// RegisterRequest is the message a compute node passes to the management
// endpoint's Register call in order to register itself.
type RegisterRequest struct {
// Info is the node info of the node being registered.
Info models.NodeInfo
}

// RegisterResponse is the reply message paired with RegisterRequest.
type RegisterResponse struct {
// Accepted reports whether the registration was accepted.
Accepted bool
// Reason is a free-form explanation accompanying the decision.
// NOTE(review): presumably only populated when Accepted is false — confirm
// against the code that builds this response.
Reason string
}

// UpdateInfoRequest is the message a compute node passes to the management
// endpoint's UpdateInfo call to send its current node info.
type UpdateInfoRequest struct {
// Info is the up-to-date node info of the sending node.
Info models.NodeInfo
}

// UpdateInfoResponse is the reply message paired with UpdateInfoRequest.
type UpdateInfoResponse struct {
// Accepted reports whether the node info update was accepted.
Accepted bool
// Reason is a free-form explanation accompanying the decision.
// NOTE(review): presumably only populated when Accepted is false — confirm
// against the code that builds this response.
Reason string
}

// UpdateResourcesRequest is the message a compute node passes to the
// management endpoint's UpdateResources call to report its resource state.
type UpdateResourcesRequest struct {
// NodeID is the ID of the node the reported resources belong to.
NodeID string
// AvailableCapacity is the capacity the node currently has available.
AvailableCapacity models.Resources
// QueueUsedCapacity is the capacity used by the node's queue.
// NOTE(review): presumably the resources claimed by queued, not yet
// running, work — confirm against the queue usage tracker.
QueueUsedCapacity models.Resources
}

// UpdateResourcesResponse is the reply message paired with
// UpdateResourcesRequest. It carries no fields.
type UpdateResourcesResponse struct{}
54 changes: 28 additions & 26 deletions pkg/models/messages/node.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
package messages

import "github.com/bacalhau-project/bacalhau/pkg/models"
import (
"time"

// Heartbeat represents a heartbeat message from a specific node.
// It contains the node ID and the sequence number of the heartbeat
// which is monotonically increasing (reboots aside). We do not
// use timestamps on the client, we rely solely on the server-side
// time to avoid clock drift issues.
type Heartbeat struct {
NodeID string
Sequence uint64
}
"github.com/bacalhau-project/bacalhau/pkg/models"
)

type RegisterRequest struct {
Info models.NodeInfo
// HandshakeRequest is exchanged during initial connection
type HandshakeRequest struct {
NodeInfo models.NodeInfo `json:"NodeInfo"`
StartTime time.Time `json:"StartTime"`
LastOrchestratorSeqNum uint64 `json:"LastOrchestratorSeqNum"` // Last seq received from orchestrator
}

type RegisterResponse struct {
Accepted bool
Reason string
// HandshakeResponse is sent in response to handshake requests
type HandshakeResponse struct {
Accepted bool `json:"accepted"`
Reason string `json:"reason,omitempty"`
LastComputeSeqNum uint64 `json:"LastComputeSeqNum"` // Last seq received from compute node
}

type UpdateInfoRequest struct {
Info models.NodeInfo
type HeartbeatRequest struct {
NodeID string `json:"NodeID"`
AvailableCapacity models.Resources `json:"AvailableCapacity"`
QueueUsedCapacity models.Resources `json:"QueueUsedCapacity"`
LastOrchestratorSeqNum uint64 `json:"LastOrchestratorSeqNum"` // Last seq received from orchestrator
}

type UpdateInfoResponse struct {
Accepted bool
Reason string
type HeartbeatResponse struct {
LastComputeSeqNum uint64 `json:"LastComputeSeqNum"` // Last seq received from compute node
}

type UpdateResourcesRequest struct {
NodeID string
AvailableCapacity models.Resources
QueueUsedCapacity models.Resources
// UpdateNodeInfoRequest is used to update the node info
type UpdateNodeInfoRequest struct {
NodeInfo models.NodeInfo `json:"NodeInfo"`
}
type UpdateNodeInfoResponse struct {
Accepted bool `json:"accepted"`
Reason string `json:"reason,omitempty"`
}

type UpdateResourcesResponse struct{}
Loading

0 comments on commit 74dcfa1

Please sign in to comment.