Skip to content

Commit

Permalink
fix: deleting cronjobs from old plugins
Browse files Browse the repository at this point in the history
* update ClusterScan SyncStatus function to always update the fields

* check if exists plugins status entry for a plugin name from an issue

* fix clusterscan controller to deal with old plugins
  • Loading branch information
matheusfm authored Mar 17, 2023
1 parent de4b3ea commit c538bed
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 155 deletions.
63 changes: 35 additions & 28 deletions apis/zora/v1alpha1/clusterscan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"sort"
"strings"

"github.com/undistro/zora/pkg/apis"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/undistro/zora/pkg/apis"
)

// ClusterScanSpec defines the desired state of ClusterScan
Expand Down Expand Up @@ -127,48 +128,54 @@ func (in *ClusterScanStatus) GetPluginStatus(name string) *PluginScanStatus {
return in.Plugins[name]
}

// SyncStatus fills ClusterScan status and time fields based on PluginStatus
// SyncStatus updates ClusterScan status and time fields based on PluginStatus
func (in *ClusterScanStatus) SyncStatus() {
var names []string
var failed, active, complete int
in.NextScheduleTime = nil
for n, p := range in.Plugins {
names = append(names, n)
if in.LastScheduleTime == nil || in.LastScheduleTime.Before(p.LastScheduleTime) {
in.LastScheduleTime = p.LastScheduleTime
var names, failed, active, complete []string
var sechedule, finishedTime, successful, next *metav1.Time
for name, plugin := range in.Plugins {
names = append(names, name)
if sechedule == nil || sechedule.Before(plugin.LastScheduleTime) {
sechedule = plugin.LastScheduleTime
}
if in.LastFinishedTime == nil || in.LastFinishedTime.Before(p.LastFinishedTime) {
in.LastFinishedTime = p.LastFinishedTime
if finishedTime == nil || finishedTime.Before(plugin.LastFinishedTime) {
finishedTime = plugin.LastFinishedTime
}
if in.LastSuccessfulTime == nil || in.LastSuccessfulTime.Before(p.LastSuccessfulTime) {
in.LastSuccessfulTime = p.LastSuccessfulTime
if successful == nil || successful.Before(plugin.LastSuccessfulTime) {
successful = plugin.LastSuccessfulTime
}
if in.NextScheduleTime == nil || p.NextScheduleTime.Before(in.NextScheduleTime) {
in.NextScheduleTime = p.NextScheduleTime
if next == nil || plugin.NextScheduleTime.Before(next) {
next = plugin.NextScheduleTime
}
if p.LastStatus == "Active" {
active++
if plugin.LastStatus == "Active" {
active = append(active, name)
}
switch p.LastFinishedStatus {
switch plugin.LastFinishedStatus {
case string(batchv1.JobFailed):
failed++
failed = append(failed, name)
case string(batchv1.JobComplete):
complete++
complete = append(complete, name)
}
}
var finishedStatus, status string

if failed > 0 {
in.LastFinishedStatus = string(batchv1.JobFailed)
in.LastStatus = string(batchv1.JobFailed)
if len(failed) > 0 {
finishedStatus = string(batchv1.JobFailed)
status = string(batchv1.JobFailed)
}
if failed == 0 && complete > 0 {
in.LastFinishedStatus = string(batchv1.JobComplete)
in.LastStatus = string(batchv1.JobComplete)
if len(failed) == 0 && len(complete) > 0 {
finishedStatus = string(batchv1.JobComplete)
status = string(batchv1.JobComplete)
}
if active > 0 {
in.LastStatus = "Active"
if len(active) > 0 {
status = "Active"
}

in.LastScheduleTime = sechedule
in.LastFinishedTime = finishedTime
in.LastSuccessfulTime = successful
in.NextScheduleTime = next
in.LastFinishedStatus = finishedStatus
in.LastStatus = status
sort.Strings(names)
in.PluginNames = strings.Join(names, ",")
}
Expand Down
53 changes: 53 additions & 0 deletions apis/zora/v1alpha1/clusterscan_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,59 @@ func TestSyncStatus(t *testing.T) {
PluginNames: "brutus",
},
},
{
name: "active + failed",
plugins: map[string]*PluginScanStatus{
"popeye": {
LastScheduleTime: mustParseTime("2022-08-08T21:00:00Z"),
LastStatus: "Active",
NextScheduleTime: mustParseTime("2022-08-08T22:00:00Z"),
LastScanID: "9da315be-b5a1-4f1a-952b-915cc19fe446",
},
"brutus": {
LastScheduleTime: mustParseTime("2022-08-08T21:00:00Z"),
LastFinishedTime: mustParseTime("2022-08-08T21:00:03Z"),
NextScheduleTime: mustParseTime("2022-08-08T22:00:00Z"),
LastScanID: "ce34e6fc-768d-49d0-91b5-65df89ed147d",
LastStatus: string(batchv1.JobFailed),
LastFinishedStatus: string(batchv1.JobFailed),
},
},
want: &ClusterScanStatus{
LastScheduleTime: mustParseTime("2022-08-08T21:00:00Z"),
LastFinishedTime: mustParseTime("2022-08-08T21:00:03Z"),
NextScheduleTime: mustParseTime("2022-08-08T22:00:00Z"),
LastStatus: "Active",
LastFinishedStatus: string(batchv1.JobFailed),
PluginNames: "brutus,popeye",
},
},
{
name: "single plugin has been replaced (popeye to brutus)",
currentStatus: &ClusterScanStatus{
LastScheduleTime: mustParseTime("2022-08-08T21:00:00Z"),
LastFinishedTime: mustParseTime("2022-08-08T21:00:03Z"),
LastSuccessfulTime: mustParseTime("2022-08-08T21:00:03Z"),
NextScheduleTime: mustParseTime("2022-08-08T22:00:00Z"),
LastStatus: string(batchv1.JobComplete),
LastFinishedStatus: string(batchv1.JobComplete),
PluginNames: "popeye",
},
plugins: map[string]*PluginScanStatus{
"brutus": {
NextScheduleTime: mustParseTime("2022-08-12T14:00:00Z"),
},
},
want: &ClusterScanStatus{
LastScheduleTime: nil,
LastFinishedTime: nil,
LastSuccessfulTime: nil,
NextScheduleTime: mustParseTime("2022-08-12T14:00:00Z"),
LastFinishedStatus: "",
LastStatus: "",
PluginNames: "brutus",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
111 changes: 57 additions & 54 deletions controllers/zora/clusterscan_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/robfig/cron/v3"
"github.com/undistro/zora/pkg/saas"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand All @@ -40,6 +39,8 @@ import (
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/undistro/zora/pkg/saas"

"github.com/undistro/zora/apis/zora/v1alpha1"
"github.com/undistro/zora/pkg/kubeconfig"
"github.com/undistro/zora/pkg/plugins/cronjobs"
Expand Down Expand Up @@ -147,7 +148,6 @@ func (r *ClusterScanReconciler) Reconcile(ctx context.Context, req ctrl.Request)

func (r *ClusterScanReconciler) reconcile(ctx context.Context, clusterscan *v1alpha1.ClusterScan) error {
var notReadyErr error
var cronJob *batchv1.CronJob
log := ctrllog.FromContext(ctx)

cluster := &v1alpha1.Cluster{}
Expand Down Expand Up @@ -185,14 +185,6 @@ func (r *ClusterScanReconciler) reconcile(ctx context.Context, clusterscan *v1al
pluginRefs = clusterscan.Spec.Plugins
}

cjlist := &batchv1.CronJobList{}
if err := r.List(ctx, cjlist, client.MatchingLabels{
cronjobs.LabelClusterScan: clusterscan.Name,
}); err != nil {
return err
}
cjmap := mapCjSlice(cjlist.Items)

for _, ref := range pluginRefs {
pluginKey := ref.PluginKey(r.DefaultPluginsNamespace)
plugin := &v1alpha1.Plugin{}
Expand All @@ -201,20 +193,7 @@ func (r *ClusterScanReconciler) reconcile(ctx context.Context, clusterscan *v1al
clusterscan.SetReadyStatus(false, "PluginFetchError", err.Error())
return err
}
if len(cjmap) != 0 {
cj, ok := cjmap[plugin.Name]
if !ok {
return fmt.Errorf("No <CronJob> for plugin <%s>", plugin.Name)
}
delete(cjmap, plugin.Name)
cronJob = cj
} else {
cronJob = cronjobs.New(
fmt.Sprintf("%s-%s", clusterscan.Name, plugin.Name),
kubeconfigSecret.Namespace,
)
}

cronJob := cronjobs.New(fmt.Sprintf("%s-%s", clusterscan.Name, plugin.Name), kubeconfigSecret.Namespace)
cronJobMutator := &cronjobs.Mutator{
Scheme: r.Scheme,
Existing: cronJob,
Expand All @@ -227,7 +206,7 @@ func (r *ClusterScanReconciler) reconcile(ctx context.Context, clusterscan *v1al
Suspend: notReadyErr != nil,
}

result, err := ctrl.CreateOrUpdate(ctx, r.Client, cronJob, cronJobMutator.Mutate())
result, err := ctrl.CreateOrUpdate(ctx, r.Client, cronJob, cronJobMutator.Mutate)
if err != nil {
log.Error(err, fmt.Sprintf("failed to apply CronJob %s", cronJob.Name))
clusterscan.SetReadyStatus(false, "CronJobApplyError", err.Error())
Expand Down Expand Up @@ -277,26 +256,15 @@ func (r *ClusterScanReconciler) reconcile(ctx context.Context, clusterscan *v1al
}
}
}
if len(cjmap) != 0 {
r.deleteCjs(ctx, cjmap)
}

if issues, err := r.getClusterIssues(ctx, clusterscan.Status.LastScanIDs(true)...); err != nil {
r.deleteOldPlugins(ctx, clusterscan, pluginRefs)

issues, err := r.getClusterIssues(ctx, clusterscan.Status.LastScanIDs(true)...)
if err != nil {
clusterscan.SetReadyStatus(false, "ClusterIssueListError", err.Error())
return err
} else if issues != nil {
issc := map[string]int{}
for _, i := range issues {
issc[i.Labels[v1alpha1.LabelPlugin]]++
}
for p, c := range issc {
if clusterscan.Status.Plugins[p].IssueCount == nil {
clusterscan.Status.Plugins[p].IssueCount = new(int)
}
*clusterscan.Status.Plugins[p].IssueCount = c
}
clusterscan.Status.TotalIssues = pointer.Int(len(issues))
}
r.countIssues(issues, clusterscan)

clusterscan.Status.SyncStatus()
clusterscan.Status.Suspend = notReadyErr != nil
Expand All @@ -308,25 +276,60 @@ func (r *ClusterScanReconciler) reconcile(ctx context.Context, clusterscan *v1al
return notReadyErr
}

// Transforms the slice of <Cronjobs> into a map in the form:
// <plugin_name>: <cronjob_pointer>
func mapCjSlice(cjs []batchv1.CronJob) map[string]*batchv1.CronJob {
cjmap := map[string]*batchv1.CronJob{}
for c := 0; c < len(cjs); c++ {
cjmap[cjs[c].Labels[cronjobs.LabelPlugin]] = &cjs[c]
// countIssues update the fields IssueCount (for each plugin) and TotalIssues from ClusterScan status based on the given issues
func (r *ClusterScanReconciler) countIssues(issues []v1alpha1.ClusterIssue, clusterscan *v1alpha1.ClusterScan) {
totalIssuesByPlugin := map[string]int{}
var totalIssues *int
for _, i := range issues {
totalIssuesByPlugin[i.Labels[v1alpha1.LabelPlugin]]++
if totalIssues == nil {
totalIssues = new(int)
}
*totalIssues++
}
return cjmap
for p, ps := range clusterscan.Status.Plugins {
if t, ok := totalIssuesByPlugin[p]; ok {
ps.IssueCount = &t
} else {
ps.IssueCount = nil
}
}
clusterscan.Status.TotalIssues = totalIssues
}

// Deletes <Cronjobs> in the map parameter. If the deletion fails, the error
// will be logged.
func (r *ClusterScanReconciler) deleteCjs(ctx context.Context, cjmap map[string]*batchv1.CronJob) {
l := ctrllog.FromContext(ctx)
for _, cj := range cjmap {
// deleteOldPlugins deletes the old plugins from ClusterScan Status and their CronJobs
func (r *ClusterScanReconciler) deleteOldPlugins(ctx context.Context, clusterscan *v1alpha1.ClusterScan, pluginRefs []v1alpha1.PluginReference) {
log := ctrllog.FromContext(ctx)
oldPlugins := r.getOldPlugins(clusterscan, pluginRefs)
for _, plugin := range oldPlugins {
cj := &batchv1.CronJob{ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", clusterscan.Name, plugin),
Namespace: clusterscan.Namespace,
}}
if err := r.Delete(ctx, cj); err != nil {
l.Error(err, fmt.Sprintf("Failed to delete dangling <CronJob> <%s/%s>", cj.Namespace, cj.Name))
log.Error(err, "failed to delete CronJob", "cronJobName", cj.Name)
} else {
delete(clusterscan.Status.Plugins, plugin)
}
}
}

// getOldPlugins returns the names of the plugins that are in the ClusterScan Status but are not declared (ClusterScan Spec or default)
func (r *ClusterScanReconciler) getOldPlugins(clusterscan *v1alpha1.ClusterScan, pluginRefs []v1alpha1.PluginReference) []string {
var oldPlugins []string
for statusPlugin := range clusterscan.Status.Plugins {
outdated := false
for _, specPlugin := range pluginRefs {
if statusPlugin == specPlugin.Name {
outdated = true
continue
}
}
if !outdated {
oldPlugins = append(oldPlugins, statusPlugin)
}
}
return oldPlugins
}

// Extracts error messages emitted by plugins when their execution fails.
Expand Down
9 changes: 5 additions & 4 deletions pkg/payloads/v1alpha1/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ func NewScanStatusWithIssues(scans []v1alpha1.ClusterScan, issues []v1alpha1.Clu
return nil
}
for _, i := range issues {
pluginStatus[i.Labels[v1alpha1.LabelPlugin]].Issues = append(
pluginStatus[i.Labels[v1alpha1.LabelPlugin]].Issues,
NewResourcedIssue(i),
)
plugin := i.Labels[v1alpha1.LabelPlugin]
if _, ok := pluginStatus[plugin]; ok {
pluginStatus[plugin].Issues = append(pluginStatus[plugin].Issues, NewResourcedIssue(i))
}

}
return pluginStatus
}
Expand Down
Loading

0 comments on commit c538bed

Please sign in to comment.