diff --git a/pkg/admission/admission_controller.go b/pkg/admission/admission_controller.go index 7d502d2e77..bafca92277 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/admission_controller.go @@ -115,32 +115,43 @@ func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path) exitCodes := map[int32]struct{}{} for _, policy := range policies { - if policy.Event != "" && policy.ExitCode != nil { + if (policy.Event != "" || len(policy.Events) != 0) && policy.ExitCode != nil { err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously")) break } - if policy.Event == "" && policy.ExitCode == nil { + if policy.Event == "" && len(policy.Events) == 0 && policy.ExitCode == nil { err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified")) break } - if policy.Event != "" { - if allow, ok := policyEventMap[policy.Event]; !ok || !allow { - err = multierror.Append(err, field.Invalid(fldPath, policy.Event, fmt.Sprintf("invalid policy event"))) - break + if len(policy.Event) != 0 || len(policy.Events) != 0 { + bFlag := false + policyEventsList := getEventlist(policy) + for _, event := range policyEventsList { + if allow, ok := policyEventMap[event]; !ok || !allow { + err = multierror.Append(err, field.Invalid(fldPath, event, fmt.Sprintf("invalid policy event"))) + bFlag = true + break + } + + if allow, ok := policyActionMap[policy.Action]; !ok || !allow { + err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action"))) + bFlag = true + break + } + if _, found := policyEvents[event]; found { + err = multierror.Append(err, fmt.Errorf("duplicate event %v across different policy", event)) + bFlag = true + break + } else { + policyEvents[event] = struct{}{} + } } - - if allow, ok := policyActionMap[policy.Action]; !ok || !allow { - err = multierror.Append(err, field.Invalid(fldPath, policy.Action, fmt.Sprintf("invalid policy action"))) + if bFlag == true { break } - if _, found := policyEvents[policy.Event]; found { - err = multierror.Append(err, fmt.Errorf("duplicate event %v", policy.Event)) - break - } else { - policyEvents[policy.Event] = struct{}{} - } + } else { if *policy.ExitCode == 0 { err = multierror.Append(err, fmt.Errorf("0 is not a valid error code")) @@ -162,6 +173,27 @@ func validatePolicies(policies []v1alpha1.LifecyclePolicy, fldPath *field.Path) return err } +func getEventlist(policy v1alpha1.LifecyclePolicy) []v1alpha1.Event { + policyEventsList := policy.Events + if len(policy.Event) > 0 { + policyEventsList = append(policyEventsList, policy.Event) + } + uniquePolicyEventlist := removeDuplicates(policyEventsList) + return uniquePolicyEventlist +} + +func removeDuplicates(EventList []v1alpha1.Event) []v1alpha1.Event { + keys := make(map[v1alpha1.Event]bool) + list := []v1alpha1.Event{} + for _, val := range EventList { + if _, value := keys[val]; !value { + keys[val] = true + list = append(list, val) + } + } + return list +} + func getValidEvents() []v1alpha1.Event { var events []v1alpha1.Event for e, allow := range policyEventMap { diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index 888d87e026..5895b1830c 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -180,6 +180,11 @@ type LifecyclePolicy struct { // +optional Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"` + // The Events recorded by scheduler; the controller takes actions + // according to this Events. + // +optional + Events []Event `json:"events,omitempty" protobuf:"bytes,3,opt,name=events"` + // The exit code of the pod container, controller will take action // according to this code. // Note: only one of `Event` or `ExitCode` can be specified. @@ -189,7 +194,7 @@ type LifecyclePolicy struct { // Timeout is the grace period for controller to take actions. // Default to nil (take action immediately). // +optional - Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"` + Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,4,opt,name=timeout"` } // TaskSpec specifies the task specification of Job diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index 7fd00a4301..8e17aa0bfd 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -188,6 +188,11 @@ func (in *JobStatus) DeepCopy() *JobStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LifecyclePolicy) DeepCopyInto(out *LifecyclePolicy) { *out = *in + if in.Events != nil { + in, out := &in.Events, &out.Events + *out = make([]Event, len(*in)) + copy(*out, *in) + } if in.ExitCode != nil { in, out := &in.ExitCode, &out.ExitCode *out = new(int32) diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 0dee416156..530e344fbd 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kbapi "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/controllers/apis" @@ -143,8 +144,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { for _, task := range job.Spec.Tasks { if task.Name == req.TaskName { for _, policy := range task.Policies { - if len(policy.Event) > 0 && len(req.Event) > 0 { - if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { + policyEvents := getEventlist(policy) + + if len(policyEvents) > 0 && len(req.Event) > 0 { + if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, vkv1.AnyEvent) { return policy.Action } } @@ -161,8 +164,10 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { // Parse Job level policies for _, policy := range job.Spec.Policies { - if len(policy.Event) > 0 && len(req.Event) > 0 { - if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { + policyEvents := getEventlist(policy) + + if len(policyEvents) > 0 && len(req.Event) > 0 { + if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, vkv1.AnyEvent) { return policy.Action } } @@ -176,8 +181,27 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { return vkv1.SyncJobAction } +func getEventlist(policy v1alpha1.LifecyclePolicy) []v1alpha1.Event { + policyEventsList := policy.Events + if len(policy.Event) > 0 { + policyEventsList = append(policyEventsList, policy.Event) + } + return policyEventsList +} + +func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool { + for _, event := range policyEvents { + if event == reqEvent { + return true + } + } + return false + +} + func addResourceList(list, req, limit v1.ResourceList) { for name, quantity := range req { + if value, ok := list[name]; !ok { list[name] = *quantity.Copy() } else {