Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move messaging models from pkg/compute to pkg/models/messages #4680

Merged
merged 5 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions pkg/compute/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"

"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
"github.com/bacalhau-project/bacalhau/pkg/compute/store"
Expand Down Expand Up @@ -59,12 +60,12 @@ func (b Bidder) ReturnBidResult(
if response.ShouldWait {
return
}
result := BidResult{
RoutingMetadata: RoutingMetadata{
result := messages.BidResult{
RoutingMetadata: messages.RoutingMetadata{
SourcePeerID: b.nodeID,
TargetPeerID: execution.Job.Meta[models.MetaRequesterID],
},
ExecutionMetadata: NewExecutionMetadata(execution),
ExecutionMetadata: messages.NewExecutionMetadata(execution),
Accepted: response.ShouldBid,
Event: RespondedToBidEvent(response),
}
Expand All @@ -87,13 +88,13 @@ type BidderRequest struct {
// TODO: evaluate the need for async bidding and marking bids as waiting
// https://github.com/bacalhau-project/bacalhau/issues/3732
func (b Bidder) RunBidding(ctx context.Context, bidRequest *BidderRequest) {
routingMetadata := RoutingMetadata{
routingMetadata := messages.RoutingMetadata{
// the source of this response is the bidders nodeID.
SourcePeerID: b.nodeID,
// the target of this response is the source of the request.
TargetPeerID: bidRequest.SourcePeerID,
}
executionMetadata := ExecutionMetadata{
executionMetadata := messages.ExecutionMetadata{
ExecutionID: bidRequest.Execution.ID,
JobID: bidRequest.Execution.JobID,
}
Expand All @@ -102,7 +103,7 @@ func (b Bidder) RunBidding(ctx context.Context, bidRequest *BidderRequest) {

bidResult, err := b.doBidding(ctx, job, bidRequest.ResourceUsage)
if err != nil {
b.callback.OnComputeFailure(ctx, ComputeError{
b.callback.OnComputeFailure(ctx, messages.ComputeError{
RoutingMetadata: routingMetadata,
ExecutionMetadata: executionMetadata,
Event: models.EventFromError(EventTopicExecutionScanning, err),
Expand All @@ -127,13 +128,13 @@ func (b Bidder) handleBidResult(
execution *models.Execution,
) {
var (
routingMetadata = RoutingMetadata{
routingMetadata = messages.RoutingMetadata{
// the source of this response is the bidders nodeID.
SourcePeerID: b.nodeID,
// the target of this response is the source of the request.
TargetPeerID: targetPeer,
}
executionMetadata = ExecutionMetadata{
executionMetadata = messages.ExecutionMetadata{
ExecutionID: execution.ID,
JobID: execution.JobID,
}
Expand All @@ -142,14 +143,14 @@ func (b Bidder) handleBidResult(
if err == nil {
err = errors.New(reason)
}
b.callback.OnComputeFailure(ctx, ComputeError{
b.callback.OnComputeFailure(ctx, messages.ComputeError{
RoutingMetadata: routingMetadata,
ExecutionMetadata: executionMetadata,
Event: models.EventFromError(EventTopicExecutionScanning, err),
})
}
handleBidComplete = func(ctx context.Context, result *bidStrategyResponse) {
b.callback.OnBidComplete(ctx, BidResult{
b.callback.OnBidComplete(ctx, messages.BidResult{
RoutingMetadata: routingMetadata,
ExecutionMetadata: executionMetadata,
Accepted: result.bid,
Expand Down
7 changes: 4 additions & 3 deletions pkg/compute/bidder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/store"
"github.com/bacalhau-project/bacalhau/pkg/compute/store/boltdb"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/test/mock"

"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (s *BidderSuite) TestRunBidding_WithPendingApproval() {
ctx := context.Background()
job := mock.Job()
execution := mock.ExecutionForJob(job)
askForBidRequest := compute.AskForBidRequest{
askForBidRequest := messages.AskForBidRequest{
Execution: execution,
WaitForApproval: true,
}
Expand Down Expand Up @@ -169,7 +170,7 @@ func (s *BidderSuite) TestRunBidding_WithPendingApproval() {

func (s *BidderSuite) TestRunBidding_WithoutPendingApproval() {
ctx := context.Background()
askForBidRequest := compute.AskForBidRequest{
askForBidRequest := messages.AskForBidRequest{
Execution: mock.ExecutionForJob(mock.Job()),
WaitForApproval: false,
}
Expand Down Expand Up @@ -281,7 +282,7 @@ func NewBidResponseMatcher(accepted bool) *BidResponseMatcher {
}

func (m *BidResponseMatcher) Matches(x interface{}) bool {
req, ok := x.(compute.BidResult)
req, ok := x.(messages.BidResult)
if !ok {
return false
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/compute/callback_chain.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package compute

import "context"
import (
"context"

"github.com/bacalhau-project/bacalhau/pkg/models/messages"
)

type ChainedCallbackParams struct {
Callbacks []Callback
Expand All @@ -17,25 +21,25 @@ func NewChainedCallback(params ChainedCallbackParams) *ChainedCallback {
}
}

func (c ChainedCallback) OnBidComplete(ctx context.Context, result BidResult) {
func (c ChainedCallback) OnBidComplete(ctx context.Context, result messages.BidResult) {
for _, callback := range c.callbacks {
callback.OnBidComplete(ctx, result)
}
}

func (c ChainedCallback) OnRunComplete(ctx context.Context, result RunResult) {
func (c ChainedCallback) OnRunComplete(ctx context.Context, result messages.RunResult) {
for _, callback := range c.callbacks {
callback.OnRunComplete(ctx, result)
}
}

func (c ChainedCallback) OnCancelComplete(ctx context.Context, result CancelResult) {
func (c ChainedCallback) OnCancelComplete(ctx context.Context, result messages.CancelResult) {
for _, callback := range c.callbacks {
callback.OnCancelComplete(ctx, result)
}
}

func (c ChainedCallback) OnComputeFailure(ctx context.Context, err ComputeError) {
func (c ChainedCallback) OnComputeFailure(ctx context.Context, err messages.ComputeError) {
for _, callback := range c.callbacks {
callback.OnComputeFailure(ctx, err)
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/compute/callback_mock.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
package compute

import "context"
import (
"context"

"github.com/bacalhau-project/bacalhau/pkg/models/messages"
)

type CallbackMock struct {
OnBidCompleteHandler func(ctx context.Context, result BidResult)
OnCancelCompleteHandler func(ctx context.Context, result CancelResult)
OnComputeFailureHandler func(ctx context.Context, err ComputeError)
OnRunCompleteHandler func(ctx context.Context, result RunResult)
OnBidCompleteHandler func(ctx context.Context, result messages.BidResult)
OnCancelCompleteHandler func(ctx context.Context, result messages.CancelResult)
OnComputeFailureHandler func(ctx context.Context, err messages.ComputeError)
OnRunCompleteHandler func(ctx context.Context, result messages.RunResult)
}

// OnBidComplete implements Callback
func (c CallbackMock) OnBidComplete(ctx context.Context, result BidResult) {
func (c CallbackMock) OnBidComplete(ctx context.Context, result messages.BidResult) {
if c.OnBidCompleteHandler != nil {
c.OnBidCompleteHandler(ctx, result)
}
}

// OnCancelComplete implements Callback
func (c CallbackMock) OnCancelComplete(ctx context.Context, result CancelResult) {
func (c CallbackMock) OnCancelComplete(ctx context.Context, result messages.CancelResult) {
if c.OnCancelCompleteHandler != nil {
c.OnCancelCompleteHandler(ctx, result)
}
}

// OnComputeFailure implements Callback
func (c CallbackMock) OnComputeFailure(ctx context.Context, err ComputeError) {
func (c CallbackMock) OnComputeFailure(ctx context.Context, err messages.ComputeError) {
if c.OnComputeFailureHandler != nil {
c.OnComputeFailureHandler(ctx, err)
}
}

// OnRunComplete implements Callback
func (c CallbackMock) OnRunComplete(ctx context.Context, result RunResult) {
func (c CallbackMock) OnRunComplete(ctx context.Context, result messages.RunResult) {
if c.OnRunCompleteHandler != nil {
c.OnRunCompleteHandler(ctx, result)
}
Expand Down
49 changes: 27 additions & 22 deletions pkg/compute/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/executor"
"github.com/bacalhau-project/bacalhau/pkg/lib/concurrency"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"

"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
Expand Down Expand Up @@ -51,7 +52,8 @@ func (s BaseEndpoint) GetNodeID() string {
return s.id
}

func (s BaseEndpoint) AskForBid(ctx context.Context, request AskForBidRequest) (AskForBidResponse, error) {
func (s BaseEndpoint) AskForBid(
ctx context.Context, request messages.AskForBidRequest) (messages.AskForBidResponse, error) {
ctx, span := telemetry.NewSpan(
ctx,
telemetry.GetTracer(),
Expand All @@ -66,7 +68,7 @@ func (s BaseEndpoint) AskForBid(ctx context.Context, request AskForBidRequest) (
parsedUsage, err := request.Execution.Job.Task().ResourcesConfig.ToResources()
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Error parsing job resource config")
return AskForBidResponse{ExecutionMetadata: ExecutionMetadata{
return messages.AskForBidResponse{ExecutionMetadata: messages.ExecutionMetadata{
ExecutionID: request.Execution.ID,
JobID: request.Execution.JobID,
}}, err
Expand All @@ -80,13 +82,14 @@ func (s BaseEndpoint) AskForBid(ctx context.Context, request AskForBidRequest) (
ResourceUsage: parsedUsage,
})

return AskForBidResponse{ExecutionMetadata: ExecutionMetadata{
return messages.AskForBidResponse{ExecutionMetadata: messages.ExecutionMetadata{
ExecutionID: request.Execution.ID,
JobID: request.Execution.JobID,
}}, nil
}

func (s BaseEndpoint) BidAccepted(ctx context.Context, request BidAcceptedRequest) (BidAcceptedResponse, error) {
func (s BaseEndpoint) BidAccepted(
ctx context.Context, request messages.BidAcceptedRequest) (messages.BidAcceptedResponse, error) {
log.Ctx(ctx).Debug().Msgf("bid accepted: %s", request.ExecutionID)
err := s.executionStore.UpdateExecutionState(ctx, store.UpdateExecutionRequest{
ExecutionID: request.ExecutionID,
Expand All @@ -98,27 +101,28 @@ func (s BaseEndpoint) BidAccepted(ctx context.Context, request BidAcceptedReques
},
})
if err != nil {
return BidAcceptedResponse{}, err
return messages.BidAcceptedResponse{}, err
}

execution, err := s.executionStore.GetExecution(ctx, request.ExecutionID)
if err != nil {
return BidAcceptedResponse{}, err
return messages.BidAcceptedResponse{}, err
}

// Increment the number of jobs accepted by this compute node:
jobsAccepted.Add(ctx, 1)

err = s.executor.Run(ctx, execution)
if err != nil {
return BidAcceptedResponse{}, err
return messages.BidAcceptedResponse{}, err
}
return BidAcceptedResponse{
ExecutionMetadata: NewExecutionMetadata(execution),
return messages.BidAcceptedResponse{
ExecutionMetadata: messages.NewExecutionMetadata(execution),
}, nil
}

func (s BaseEndpoint) BidRejected(ctx context.Context, request BidRejectedRequest) (BidRejectedResponse, error) {
func (s BaseEndpoint) BidRejected(
ctx context.Context, request messages.BidRejectedRequest) (messages.BidRejectedResponse, error) {
log.Ctx(ctx).Debug().Msgf("bid rejected: %s", request.ExecutionID)
err := s.executionStore.UpdateExecutionState(ctx, store.UpdateExecutionRequest{
ExecutionID: request.ExecutionID,
Expand All @@ -130,25 +134,26 @@ func (s BaseEndpoint) BidRejected(ctx context.Context, request BidRejectedReques
},
})
if err != nil {
return BidRejectedResponse{}, err
return messages.BidRejectedResponse{}, err
}
execution, err := s.executionStore.GetExecution(ctx, request.ExecutionID)
if err != nil {
return BidRejectedResponse{}, err
return messages.BidRejectedResponse{}, err
}
return BidRejectedResponse{
ExecutionMetadata: NewExecutionMetadata(execution),
return messages.BidRejectedResponse{
ExecutionMetadata: messages.NewExecutionMetadata(execution),
}, nil
}

func (s BaseEndpoint) CancelExecution(ctx context.Context, request CancelExecutionRequest) (CancelExecutionResponse, error) {
func (s BaseEndpoint) CancelExecution(
ctx context.Context, request messages.CancelExecutionRequest) (messages.CancelExecutionResponse, error) {
log.Ctx(ctx).Debug().Msgf("canceling execution %s due to %s", request.ExecutionID, request.Justification)
execution, err := s.executionStore.GetExecution(ctx, request.ExecutionID)
if err != nil {
return CancelExecutionResponse{}, err
return messages.CancelExecutionResponse{}, err
}
if execution.IsTerminalComputeState() {
return CancelExecutionResponse{}, fmt.Errorf("cannot cancel execution %s in state %s",
return messages.CancelExecutionResponse{}, fmt.Errorf("cannot cancel execution %s in state %s",
execution.ID, execution.ComputeState.StateType)
}

Expand All @@ -159,21 +164,21 @@ func (s BaseEndpoint) CancelExecution(ctx context.Context, request CancelExecuti
},
})
if err != nil {
return CancelExecutionResponse{}, err
return messages.CancelExecutionResponse{}, err
}

if execution.ComputeState.StateType.IsExecuting() {
err = s.executor.Cancel(ctx, execution)
if err != nil {
return CancelExecutionResponse{}, err
return messages.CancelExecutionResponse{}, err
}
}
return CancelExecutionResponse{
ExecutionMetadata: NewExecutionMetadata(execution),
return messages.CancelExecutionResponse{
ExecutionMetadata: messages.NewExecutionMetadata(execution),
}, nil
}

func (s BaseEndpoint) ExecutionLogs(ctx context.Context, request ExecutionLogsRequest) (
func (s BaseEndpoint) ExecutionLogs(ctx context.Context, request messages.ExecutionLogsRequest) (
<-chan *concurrency.AsyncResult[models.ExecutionLog], error,
) {
return s.logServer.GetLogStream(ctx, executor.LogStreamRequest{
Expand Down
19 changes: 10 additions & 9 deletions pkg/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"

"github.com/bacalhau-project/bacalhau/pkg/compute/store"
Expand Down Expand Up @@ -422,9 +423,9 @@ func (e *BaseExecutor) Run(ctx context.Context, execution *models.Execution) (er
}

// notify requester
e.callback.OnRunComplete(ctx, RunResult{
ExecutionMetadata: NewExecutionMetadata(execution),
RoutingMetadata: RoutingMetadata{
e.callback.OnRunComplete(ctx, messages.RunResult{
ExecutionMetadata: messages.NewExecutionMetadata(execution),
RoutingMetadata: messages.RoutingMetadata{
SourcePeerID: e.ID,
TargetPeerID: execution.Job.Meta[models.MetaRequesterID],
},
Expand Down Expand Up @@ -466,9 +467,9 @@ func (e *BaseExecutor) Cancel(ctx context.Context, execution *models.Execution)
return err
}

e.callback.OnCancelComplete(ctx, CancelResult{
ExecutionMetadata: NewExecutionMetadata(execution),
RoutingMetadata: RoutingMetadata{
e.callback.OnCancelComplete(ctx, messages.CancelResult{
ExecutionMetadata: messages.NewExecutionMetadata(execution),
RoutingMetadata: messages.RoutingMetadata{
SourcePeerID: e.ID,
TargetPeerID: execution.Job.Meta[models.MetaRequesterID],
},
Expand All @@ -490,9 +491,9 @@ func (e *BaseExecutor) handleFailure(ctx context.Context, execution *models.Exec
if updateError != nil {
log.Ctx(ctx).Error().Err(updateError).Msgf("Failed to update execution (%s) state to failed: %s", execution.ID, updateError)
} else {
e.callback.OnComputeFailure(ctx, ComputeError{
ExecutionMetadata: NewExecutionMetadata(execution),
RoutingMetadata: RoutingMetadata{
e.callback.OnComputeFailure(ctx, messages.ComputeError{
ExecutionMetadata: messages.NewExecutionMetadata(execution),
RoutingMetadata: messages.RoutingMetadata{
SourcePeerID: e.ID,
TargetPeerID: execution.Job.Meta[models.MetaRequesterID],
},
Expand Down
Loading
Loading