Skip to content

Commit

Permalink
fix bug where the SparkApplication status does not change when job submission fails (
Browse files Browse the repository at this point in the history
  • Loading branch information
wackxu authored and liyinan926 committed Dec 16, 2019
1 parent c8e269b commit b7055ea
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 116 deletions.
17 changes: 11 additions & 6 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,21 @@ func newSparkApplicationController(
controller.applicationLister = crdInformer.Lister()

podsInformer := informerFactory.Core().V1().Pods()
sparkPodEventHandler := newSparkPodEventHandler(controller.queue.AddRateLimited, controller.applicationLister)
sparkObjectEventHandler := newSparkObjectEventHandler(controller.queue.AddRateLimited, controller.applicationLister)
podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sparkPodEventHandler.onPodAdded,
UpdateFunc: sparkPodEventHandler.onPodUpdated,
DeleteFunc: sparkPodEventHandler.onPodDeleted,
AddFunc: sparkObjectEventHandler.onObjectAdded,
UpdateFunc: sparkObjectEventHandler.onObjectUpdated,
DeleteFunc: sparkObjectEventHandler.onObjectDeleted,
})
controller.podLister = podsInformer.Lister()

jobLister := informerFactory.Batch().V1().Jobs().Lister()
controller.subJobManager = &realSubmissionJobManager{kubeClient: kubeClient, jobLister: jobLister}
jobInformer := informerFactory.Batch().V1().Jobs()
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sparkObjectEventHandler.onObjectAdded,
UpdateFunc: sparkObjectEventHandler.onObjectUpdated,
DeleteFunc: sparkObjectEventHandler.onObjectDeleted,
})
controller.subJobManager = &realSubmissionJobManager{kubeClient: kubeClient, jobLister: jobInformer.Lister()}

controller.cacheSynced = func() bool {
return crdInformer.Informer().HasSynced() && podsInformer.Informer().HasSynced()
Expand Down
97 changes: 97 additions & 0 deletions pkg/controller/sparkapplication/spark_object_eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2018 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sparkapplication

import (
"github.com/golang/glog"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

crdlisters "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/listers/sparkoperator.k8s.io/v1beta2"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
)

// sparkObjectEventHandler watches Kubernetes objects owned by Spark applications
// (pods and submission jobs are wired to it by the controller) and enqueues the
// owning SparkApplication key so it is reconciled when those objects change.
type sparkObjectEventHandler struct {
	// applicationLister reads SparkApplication objects from the informer cache;
	// used to check that an event's submission ID matches the current submission.
	applicationLister crdlisters.SparkApplicationLister
	// call-back function to enqueue SparkApp key for processing.
	enqueueFunc func(appKey interface{})
}

// newSparkObjectEventHandler creates a new sparkObjectEventHandler that forwards
// relevant object events to enqueueFunc, consulting lister to filter out events
// belonging to stale submissions.
func newSparkObjectEventHandler(enqueueFunc func(appKey interface{}), lister crdlisters.SparkApplicationLister) *sparkObjectEventHandler {
	return &sparkObjectEventHandler{
		applicationLister: lister,
		enqueueFunc:       enqueueFunc,
	}
}

// onObjectAdded handles the informer add event for a watched object by
// enqueuing its owning SparkApplication.
func (s *sparkObjectEventHandler) onObjectAdded(obj interface{}) {
	added := obj.(metav1.Object)
	glog.V(2).Infof("Object %s added in namespace %s.", added.GetName(), added.GetNamespace())
	s.enqueueSparkAppForUpdate(added)
}

// onObjectUpdated handles the informer update event; it is a no-op when the
// resource version has not changed.
func (s *sparkObjectEventHandler) onObjectUpdated(old, updated interface{}) {
	before := old.(metav1.Object)
	after := updated.(metav1.Object)
	if before.GetResourceVersion() == after.GetResourceVersion() {
		return
	}

	glog.V(2).Infof("Object %s updated in namespace %s.", after.GetName(), after.GetNamespace())
	s.enqueueSparkAppForUpdate(after)
}

// onObjectDeleted handles the informer delete event. The object may arrive
// wrapped in cache.DeletedFinalStateUnknown when the watch missed the actual
// delete; unwrap it before enqueuing the owning SparkApplication.
func (s *sparkObjectEventHandler) onObjectDeleted(obj interface{}) {
	var deletedObject metav1.Object

	// Bind the value in the type switch instead of re-asserting, and use a
	// checked assertion on the tombstone payload: the original unchecked
	// deletedObj.(metav1.Object) would panic on a malformed tombstone.
	switch o := obj.(type) {
	case metav1.Object:
		deletedObject = o
	case cache.DeletedFinalStateUnknown:
		deletedObject, _ = o.Obj.(metav1.Object)
	}

	if deletedObject == nil {
		return
	}
	glog.V(2).Infof("Object %s deleted in namespace %s.", deletedObject.GetName(), deletedObject.GetNamespace())
	s.enqueueSparkAppForUpdate(deletedObject)
}

// enqueueSparkAppForUpdate resolves the SparkApplication that owns object and
// enqueues its namespace/name key, unless the object carries a submission ID
// label that does not match the application's current submission.
func (s *sparkObjectEventHandler) enqueueSparkAppForUpdate(object metav1.Object) {
	appName, found := getAppName(object)
	if !found {
		return
	}

	namespace := object.GetNamespace()
	if submissionID, labeled := object.GetLabels()[config.SubmissionIDLabel]; labeled {
		app, err := s.applicationLister.SparkApplications(namespace).Get(appName)
		if err != nil {
			return
		}
		// Drop events from objects created by a previous submission attempt.
		if app.Status.SubmissionID != submissionID {
			return
		}
	}

	appKey := createMetaNamespaceKey(namespace, appName)
	glog.V(2).Infof("Enqueuing SparkApplication %s for app update processing.", appKey)
	s.enqueueFunc(appKey)
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestOnPodAdded(t *testing.T) {
Phase: apiv1.PodPending,
},
}
go monitor.onPodAdded(driverPod)
go monitor.onObjectAdded(driverPod)

key, _ := queue.Get()
actualNamespace, actualAppName, err := cache.SplitMetaNamespaceKey(key.(string))
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestOnPodAdded(t *testing.T) {
Phase: apiv1.PodRunning,
},
}
go monitor.onPodAdded(executorPod)
go monitor.onObjectAdded(executorPod)

key, _ = queue.Get()

Expand Down Expand Up @@ -133,7 +133,7 @@ func TestOnPodUpdated(t *testing.T) {
newDriverPod := oldDriverPod.DeepCopy()
newDriverPod.ResourceVersion = "2"
newDriverPod.Status.Phase = apiv1.PodSucceeded
go monitor.onPodUpdated(oldDriverPod, newDriverPod)
go monitor.onObjectUpdated(oldDriverPod, newDriverPod)

key, _ := queue.Get()

Expand Down Expand Up @@ -176,7 +176,7 @@ func TestOnPodUpdated(t *testing.T) {
newExecutorPod := oldExecutorPod.DeepCopy()
newExecutorPod.ResourceVersion = "2"
newExecutorPod.Status.Phase = apiv1.PodFailed
go monitor.onPodUpdated(oldExecutorPod, newExecutorPod)
go monitor.onObjectUpdated(oldExecutorPod, newExecutorPod)

key, _ = queue.Get()

Expand Down Expand Up @@ -219,7 +219,7 @@ func TestOnPodDeleted(t *testing.T) {
Phase: apiv1.PodRunning,
},
}
go monitor.onPodDeleted(driverPod)
go monitor.onObjectDeleted(driverPod)

key, _ := queue.Get()
actualNamespace, actualAppName, err := cache.SplitMetaNamespaceKey(key.(string))
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestOnPodDeleted(t *testing.T) {
Phase: apiv1.PodSucceeded,
},
}
go monitor.onPodDeleted(executorPod)
go monitor.onObjectDeleted(executorPod)

key, _ = queue.Get()
actualNamespace, actualAppName, err = cache.SplitMetaNamespaceKey(key.(string))
Expand All @@ -280,9 +280,9 @@ func TestOnPodDeleted(t *testing.T) {
actualNamespace)
}

func newMonitor() (*sparkPodEventHandler, workqueue.RateLimitingInterface) {
func newMonitor() (*sparkObjectEventHandler, workqueue.RateLimitingInterface) {
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"spark-application-controller-test")
monitor := newSparkPodEventHandler(queue.AddRateLimited, nil)
monitor := newSparkObjectEventHandler(queue.AddRateLimited, nil)
return monitor, queue
}
97 changes: 0 additions & 97 deletions pkg/controller/sparkapplication/spark_pod_eventhandler.go

This file was deleted.

10 changes: 5 additions & 5 deletions pkg/controller/sparkapplication/sparkapp_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ package sparkapplication
import (
"fmt"

v1 "k8s.io/api/core/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/policy"

"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
apiv1 "k8s.io/api/core/v1"
)

// createMetaNamespaceKey builds the canonical "<namespace>/<name>" key used to
// identify a SparkApplication in the work queue.
func createMetaNamespaceKey(namespace, name string) string {
	return fmt.Sprint(namespace, "/", name)
}

func getAppName(pod *apiv1.Pod) (string, bool) {
appName, ok := pod.Labels[config.SparkAppNameLabel]
func getAppName(object metav1.Object) (string, bool) {
appName, ok := object.GetLabels()[config.SparkAppNameLabel]
return appName, ok
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func driverStateToApplicationState(podStatus apiv1.PodStatus) v1beta2.Applicatio
}
}

func getVolumeFSType(v v1.Volume) (policy.FSType, error) {
func getVolumeFSType(v apiv1.Volume) (policy.FSType, error) {
switch {
case v.HostPath != nil:
return policy.HostPath, nil
Expand Down

0 comments on commit b7055ea

Please sign in to comment.