Skip to content

Commit

Permalink
test fix
Browse files Browse the repository at this point in the history
  • Loading branch information
walkah committed Dec 11, 2024
1 parent 0406e23 commit ee15bfb
Showing 1 changed file with 30 additions and 38 deletions.
68 changes: 30 additions & 38 deletions pkg/executor/bacalhau/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func newBacalhauClient(apiHost string) (*BacalhauClient, error) {
}

func (c *BacalhauClient) getID() (string, error) {

getNodeRequest := apimodels.GetAgentNodeRequest{}
response, err := c.api.Agent().Node(context.Background(), &getNodeRequest)
if err != nil {
Expand All @@ -41,7 +41,7 @@ func (c *BacalhauClient) alive() (bool, error) {
if err != nil {
return false, err
}

return response.IsReady(), nil
}

Expand All @@ -55,22 +55,22 @@ func (c *BacalhauClient) getVersion() (string, error) {
}

func (c *BacalhauClient) postJob(job bacalhau.Job) (*apimodels.PutJobResponse, error) {

translatedJob := translateJob(job)

putJobRequest := apimodels.PutJobRequest{
Job: translatedJob,
}

return c.api.Jobs().Put(context.Background(), &putJobRequest)
}

func (c *BacalhauClient) getJob(jobID string) (*apimodels.GetJobResponse, error) {
getJobRequest := apimodels.GetJobRequest{
JobID: jobID,
JobID: jobID,
Include: "executions",
}

response, err := c.api.Jobs().Get(context.Background(), &getJobRequest)
if err != nil {
return nil, err
Expand All @@ -79,7 +79,7 @@ func (c *BacalhauClient) getJob(jobID string) (*apimodels.GetJobResponse, error)
return response, nil
}

func (c *BacalhauClient) getJobResult(jobId string ) (string, error) {
func (c *BacalhauClient) getJobResult(jobId string) (string, error) {
getJobResultsRequest := apimodels.ListJobResultsRequest{
JobID: jobId,
}
Expand All @@ -95,10 +95,8 @@ func (c *BacalhauClient) getJobResult(jobId string ) (string, error) {
return response.Items[0].Params["URL"].(string), nil
}



func (c *BacalhauClient) getNodes() ([]*models.NodeState, error) {

getNodesRequest := apimodels.ListNodesRequest{}
response, err := c.api.Nodes().List(context.Background(), &getNodesRequest)
if err != nil {
Expand Down Expand Up @@ -132,33 +130,31 @@ func (c *BacalhauClient) getMachineSpecs() ([]data.MachineSpec, error) {
return specs, nil
}



func translateJob(job bacalhau.Job) *models.Job {
return &models.Job{
ID: job.Metadata.ID,
Name: job.Metadata.ID,
Namespace: "default",
Type: models.JobTypeBatch,
Priority: 0,
Count: 1,
Meta: make(map[string]string),
Labels: make(map[string]string),
ID: job.Metadata.ID,
Name: job.Metadata.ID,
Namespace: "default",
Type: models.JobTypeBatch,
Priority: 0,
Count: 1,
Meta: make(map[string]string),
Labels: make(map[string]string),
Tasks: []*models.Task{
{
Name: "main",
Engine: &models.SpecConfig{
Type: "docker",
Type: "docker",
Params: map[string]interface{}{
"Image": job.Spec.Docker.Image,
"Entrypoint": job.Spec.Docker.Entrypoint,
"Image": job.Spec.Docker.Image,
"Entrypoint": job.Spec.Docker.Entrypoint,
"EnvironmentVariables": job.Spec.Docker.EnvironmentVariables,
},
},
Publisher: &models.SpecConfig{
Type: "local",
},

ResourcesConfig: &models.ResourcesConfig{
CPU: job.Spec.Resources.CPU,
Memory: job.Spec.Resources.Memory,
Expand All @@ -182,7 +178,6 @@ func translateJob(job bacalhau.Job) *models.Job {
}
}


func translateEnvironmentVariables(envVars []string) map[string]string {
result := make(map[string]string)
for _, env := range envVars {
Expand All @@ -194,7 +189,6 @@ func translateEnvironmentVariables(envVars []string) map[string]string {
return result
}


func translateConstraints(constraints []bacalhau.LabelSelectorRequirement) []*models.LabelSelectorRequirement {
if len(constraints) == 0 {
return nil
Expand All @@ -209,8 +203,7 @@ func translateConstraints(constraints []bacalhau.LabelSelectorRequirement) []*mo
}
}
return translated
}

}

func translateInputSources(sources []bacalhau.StorageSpec) []*models.InputSource {
if len(sources) == 0 {
Expand All @@ -221,7 +214,7 @@ func translateInputSources(sources []bacalhau.StorageSpec) []*models.InputSource
for i, source := range sources {
translated[i] = &models.InputSource{
Source: &models.SpecConfig{
Type: string(source.StorageSource),
Type: source.StorageSource.String(),
},
Target: source.Path,
}
Expand All @@ -231,16 +224,16 @@ func translateInputSources(sources []bacalhau.StorageSpec) []*models.InputSource
}
}
if source.StorageSource == bacalhau.StorageSourceLocalDirectory {
translated[i].Source.Params = map[string]interface{}{
"SourcePath": source.SourcePath,
"ReadWrite": source.ReadWrite,
}
translated[i].Source.Params = map[string]interface{}{
"SourcePath": source.SourcePath,
"ReadWrite": source.ReadWrite,
}
}
if source.StorageSource == bacalhau.StorageSourceS3 {
translated[i].Source.Params = map[string]interface{}{
"Bucket": source.S3.Bucket,
"Key": source.S3.Key,
"Endpoint": source.S3.Endpoint,
"Bucket": source.S3.Bucket,
"Key": source.S3.Key,
"Endpoint": source.S3.Endpoint,
"ChecksumSHA256": source.S3.ChecksumSHA256,
}
}
Expand All @@ -267,4 +260,3 @@ func translateOutputSources(sources []bacalhau.StorageSpec) []*models.ResultPath
}
return translated
}

0 comments on commit ee15bfb

Please sign in to comment.