Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Minor fixes #115

Merged
merged 1 commit into from
Dec 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions pkg/controller.v1beta1/pytorch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,11 @@ func (pc *PyTorchController) Run(threadiness int, stopCh <-chan struct{}) error

// Wait for the caches to be synced before starting workers.
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced); !ok {
return fmt.Errorf("failed to wait for job caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.PodInformerSynced); !ok {
return fmt.Errorf("failed to wait for pod caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for service caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced,
pc.PodInformerSynced, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Infof("Starting %v workers", threadiness)
// Launch workers to process PyTorchJob resources.
for i := 0; i < threadiness; i++ {
Expand All @@ -213,15 +206,26 @@ func (pc *PyTorchController) runWorker() {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (pc *PyTorchController) processNextWorkItem() bool {
key, quit := pc.WorkQueue.Get()
obj, quit := pc.WorkQueue.Get()
if quit {
return false
}
defer pc.WorkQueue.Done(key)
defer pc.WorkQueue.Done(obj)

var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
pc.WorkQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return true
}

logger := pylogger.LoggerForKey(key.(string))
logger := pylogger.LoggerForKey(key)

pytorchJob, err := pc.getPyTorchJobFromKey(key.(string))
pytorchJob, err := pc.getPyTorchJobFromKey(key)
if err != nil {
if err == errNotExists {
logger.Infof("PyTorchJob has been deleted: %v", key)
Expand All @@ -240,7 +244,7 @@ func (pc *PyTorchController) processNextWorkItem() bool {
}

// Sync PyTorchJob to mapch the actual state to this desired state.
forget, err := pc.syncHandler(key.(string))
forget, err := pc.syncHandler(key)
if err == nil {
if forget {
pc.WorkQueue.Forget(key)
Expand Down
6 changes: 1 addition & 5 deletions pkg/controller.v1beta1/pytorch/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func (pc *PyTorchController) getPyTorchJobFromKey(key string) (*v1beta1.PyTorchJ
return nil, errNotExists
}

job, err := jobFromUnstructured(obj)
if err != nil {
return nil, err
}
return job, nil
return jobFromUnstructured(obj)
}

func jobFromUnstructured(obj interface{}) (*v1beta1.PyTorchJob, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1beta1/pytorch/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

const (
failedMarshalPyTorchJobReason = "FailedInvalidPyTorchJobSpec"
failedMarshalPyTorchJobReason = "InvalidPyTorchJobSpec"
)

// When a pod is added, set the defaults and enqueue the current pytorchjob.
Expand Down
35 changes: 19 additions & 16 deletions pkg/controller.v2/pytorch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,11 @@ func (pc *PyTorchController) Run(threadiness int, stopCh <-chan struct{}) error

// Wait for the caches to be synced before starting workers.
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced); !ok {
return fmt.Errorf("failed to wait for job caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.PodInformerSynced); !ok {
return fmt.Errorf("failed to wait for pod caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for service caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced,
pc.PodInformerSynced, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Infof("Starting %v workers", threadiness)
// Launch workers to process PyTorchJob resources.
for i := 0; i < threadiness; i++ {
Expand All @@ -213,15 +206,25 @@ func (pc *PyTorchController) runWorker() {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (pc *PyTorchController) processNextWorkItem() bool {
key, quit := pc.WorkQueue.Get()
obj, quit := pc.WorkQueue.Get()
if quit {
return false
}
defer pc.WorkQueue.Done(key)

logger := pylogger.LoggerForKey(key.(string))
defer pc.WorkQueue.Done(obj)

var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
pc.WorkQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return true
}
logger := pylogger.LoggerForKey(key)

pytorchJob, err := pc.getPyTorchJobFromKey(key.(string))
pytorchJob, err := pc.getPyTorchJobFromKey(key)
if err != nil {
if err == errNotExists {
logger.Infof("PyTorchJob has been deleted: %v", key)
Expand All @@ -240,7 +243,7 @@ func (pc *PyTorchController) processNextWorkItem() bool {
}

// Sync PyTorchJob to mapch the actual state to this desired state.
forget, err := pc.syncHandler(key.(string))
forget, err := pc.syncHandler(key)
if err == nil {
if forget {
pc.WorkQueue.Forget(key)
Expand Down
6 changes: 1 addition & 5 deletions pkg/controller.v2/pytorch/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func (pc *PyTorchController) getPyTorchJobFromKey(key string) (*v1alpha2.PyTorch
return nil, errNotExists
}

job, err := jobFromUnstructured(obj)
if err != nil {
return nil, err
}
return job, nil
return jobFromUnstructured(obj)
}

func jobFromUnstructured(obj interface{}) (*v1alpha2.PyTorchJob, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v2/pytorch/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

const (
failedMarshalPyTorchJobReason = "FailedInvalidPyTorchJobSpec"
failedMarshalPyTorchJobReason = "InvalidPyTorchJobSpec"
)

// When a pod is added, set the defaults and enqueue the current pytorchjob.
Expand Down