From dcc63aaf70d2b20bbd2d66b14609710baa25e01c Mon Sep 17 00:00:00 2001 From: chenk Date: Wed, 23 Nov 2022 16:16:48 +0200 Subject: [PATCH] refactor: separate scan job with different reconciler (#727) * refactor: separate scan job with different reconciler Signed-off-by: chenk * refactor: separate scan job with different reconciler Signed-off-by: chenk * refactor: separate scan job with different reconciler Signed-off-by: chenk * refactor: separate scan job with different reconciler Signed-off-by: chenk * refactor: separate scan job with different reconciler Signed-off-by: chenk Signed-off-by: chenk --- pkg/operator/operator.go | 18 +- pkg/trivyoperator/config.go | 8 +- pkg/vulnerabilityreport/controller/helper.go | 72 +++++ pkg/vulnerabilityreport/controller/scanjob.go | 269 +++++++++++++++ .../{controller.go => controller/workload.go} | 306 +----------------- pkg/vulnerabilityreport/suite_test.go | 6 +- 6 files changed, 374 insertions(+), 305 deletions(-) create mode 100644 pkg/vulnerabilityreport/controller/helper.go create mode 100644 pkg/vulnerabilityreport/controller/scanjob.go rename pkg/vulnerabilityreport/{controller.go => controller/workload.go} (51%) diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 4b5ca86c1..188f6d290 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -19,6 +19,7 @@ import ( "github.com/aquasecurity/trivy-operator/pkg/rbacassessment" "github.com/aquasecurity/trivy-operator/pkg/trivyoperator" "github.com/aquasecurity/trivy-operator/pkg/vulnerabilityreport" + vcontroller "github.com/aquasecurity/trivy-operator/pkg/vulnerabilityreport/controller" "github.com/aquasecurity/trivy-operator/pkg/webhook" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" @@ -163,14 +164,13 @@ func Start(ctx context.Context, buildInfo trivyoperator.BuildInfo, operatorConfi return fmt.Errorf("initializing %s plugin: %w", pluginContext.GetName(), err) } - if err = (&vulnerabilityreport.WorkloadController{ + if err = 
(&vcontroller.WorkloadController{ Logger: ctrl.Log.WithName("reconciler").WithName("vulnerabilityreport"), Config: operatorConfig, ConfigData: trivyOperatorConfig, Client: mgr.GetClient(), ObjectResolver: objectResolver, LimitChecker: limitChecker, - LogsReader: logsReader, SecretsReader: secretsReader, Plugin: plugin, PluginContext: pluginContext, @@ -180,6 +180,20 @@ func Start(ctx context.Context, buildInfo trivyoperator.BuildInfo, operatorConfi return fmt.Errorf("unable to setup vulnerabilityreport reconciler: %w", err) } + if err = (&vcontroller.ScanJobController{ + Logger: ctrl.Log.WithName("reconciler").WithName("scan job"), + Config: operatorConfig, + ConfigData: trivyOperatorConfig, + ObjectResolver: objectResolver, + LogsReader: logsReader, + Plugin: plugin, + PluginContext: pluginContext, + VulnerabilityReadWriter: vulnerabilityreport.NewReadWriter(&objectResolver), + ExposedSecretReadWriter: exposedsecretreport.NewReadWriter(&objectResolver), + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("unable to setup scan job reconciler: %w", err) + } + if operatorConfig.ScannerReportTTL != nil { if err = (&TTLReportReconciler{ Logger: ctrl.Log.WithName("reconciler").WithName("ttlreport"), diff --git a/pkg/trivyoperator/config.go b/pkg/trivyoperator/config.go index 33c72b42b..bb3ac4135 100644 --- a/pkg/trivyoperator/config.go +++ b/pkg/trivyoperator/config.go @@ -87,10 +87,10 @@ type ConfigManager interface { // GetDefaultConfig returns the default configuration settings. 
func GetDefaultConfig() ConfigData { return map[string]string{ - keyVulnerabilityReportsScanner: "Trivy", - keyConfigAuditReportsScanner: "Trivy", - KeyScanJobcompressLogs: "true", - "compliance.failEntriesLimit": "10", + keyVulnerabilityReportsScanner: "Trivy", + keyConfigAuditReportsScanner: "Trivy", + KeyScanJobcompressLogs: "true", + "compliance.failEntriesLimit": "10", KeyReportRecordFailedChecksOnly: "true", } } diff --git a/pkg/vulnerabilityreport/controller/helper.go b/pkg/vulnerabilityreport/controller/helper.go new file mode 100644 index 000000000..9110ce5a1 --- /dev/null +++ b/pkg/vulnerabilityreport/controller/helper.go @@ -0,0 +1,72 @@ +package controller + +import ( + "context" + "reflect" + + "github.com/aquasecurity/trivy-operator/pkg/exposedsecretreport" + "github.com/aquasecurity/trivy-operator/pkg/kube" + "github.com/aquasecurity/trivy-operator/pkg/trivyoperator" + "github.com/aquasecurity/trivy-operator/pkg/vulnerabilityreport" +) + +func hasReports(ctx context.Context, esReadWriter exposedsecretreport.ReadWriter, vulnReadWriter vulnerabilityreport.ReadWriter, owner kube.ObjectRef, hash string, images kube.ContainerImages) (bool, error) { + hasVulnerabilityReports, err := hasVulnerabilityReports(ctx, vulnReadWriter, owner, hash, images) + if err != nil { + return false, err + } + + hasSecretReports, err := hasSecretReports(ctx, esReadWriter, owner, hash, images) + if err != nil { + return false, err + } + + return hasVulnerabilityReports && hasSecretReports, nil +} + +func hasVulnerabilityReports(ctx context.Context, vulnReadWriter vulnerabilityreport.ReadWriter, owner kube.ObjectRef, hash string, images kube.ContainerImages) (bool, error) { + // TODO FindByOwner should accept optional label selector to further narrow down search results + list, err := vulnReadWriter.FindByOwner(ctx, owner) + if err != nil { + return false, err + } + + actual := map[string]bool{} + for _, report := range list { + if containerName, ok := 
report.Labels[trivyoperator.LabelContainerName]; ok { + if hash == report.Labels[trivyoperator.LabelResourceSpecHash] { + actual[containerName] = true + } + } + } + + return compareReports(actual, images), nil +} + +func hasSecretReports(ctx context.Context, esReadWriter exposedsecretreport.ReadWriter, owner kube.ObjectRef, hash string, images kube.ContainerImages) (bool, error) { + // TODO FindByOwner should accept optional label selector to further narrow down search results + list, err := esReadWriter.FindByOwner(ctx, owner) + if err != nil { + return false, err + } + + actual := map[string]bool{} + for _, report := range list { + if containerName, ok := report.Labels[trivyoperator.LabelContainerName]; ok { + if hash == report.Labels[trivyoperator.LabelResourceSpecHash] { + actual[containerName] = true + } + } + } + + return compareReports(actual, images), nil +} + +func compareReports(actual map[string]bool, images kube.ContainerImages) bool { + expected := map[string]bool{} + for containerName := range images { + expected[containerName] = true + } + + return reflect.DeepEqual(actual, expected) +} diff --git a/pkg/vulnerabilityreport/controller/scanjob.go b/pkg/vulnerabilityreport/controller/scanjob.go new file mode 100644 index 000000000..69eab5a34 --- /dev/null +++ b/pkg/vulnerabilityreport/controller/scanjob.go @@ -0,0 +1,269 @@ +package controller + +import ( + "context" + "fmt" + + "github.com/aquasecurity/trivy-operator/pkg/apis/aquasecurity/v1alpha1" + "github.com/aquasecurity/trivy-operator/pkg/exposedsecretreport" + "github.com/aquasecurity/trivy-operator/pkg/kube" + "github.com/aquasecurity/trivy-operator/pkg/operator/etc" + . 
"github.com/aquasecurity/trivy-operator/pkg/operator/predicate" + "github.com/aquasecurity/trivy-operator/pkg/trivyoperator" + "github.com/aquasecurity/trivy-operator/pkg/vulnerabilityreport" + "github.com/go-logr/logr" + "go.uber.org/multierr" + batchv1 "k8s.io/api/batch/v1" + k8sapierror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// ScanJobController watches vulnerability scan Jobs managed by the operator, +// parses the logs of completed Jobs into v1alpha1.VulnerabilityReport and +// v1alpha1.ExposedSecretReport instances via the Plugin, and deletes finished Jobs. +type ScanJobController struct { + logr.Logger + etc.Config + kube.ObjectResolver + kube.LogsReader + vulnerabilityreport.Plugin + trivyoperator.PluginContext + trivyoperator.ConfigData + VulnerabilityReadWriter vulnerabilityreport.ReadWriter + ExposedSecretReadWriter exposedsecretreport.ReadWriter +} + +// Manage scan jobs with image pull secrets +// kubebuilder:rbac:groups="",resources=secrets,verbs=create;update +//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete + +func (r *ScanJobController) SetupWithManager(mgr ctrl.Manager) error { + var predicates []predicate.Predicate + if !r.ConfigData.VulnerabilityScanJobsInSameNamespace() { + predicates = append(predicates, InNamespace(r.Config.Namespace)) + } + predicates = append(predicates, ManagedByTrivyOperator, IsVulnerabilityReportScan, JobHasAnyCondition) + return ctrl.NewControllerManagedBy(mgr). + For(&batchv1.Job{}, builder.WithPredicates(predicates...)). 
+ Complete(r.reconcileJobs()) +} + +func (r *ScanJobController) reconcileJobs() reconcile.Func { + return func(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Logger.WithValues("job", req.NamespacedName) + + job := &batchv1.Job{} + err := r.Client.Get(ctx, req.NamespacedName, job) + if err != nil { + if k8sapierror.IsNotFound(err) { + log.V(1).Info("Ignoring cached job that must have been deleted") + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("getting job from cache: %w", err) + } + + if len(job.Status.Conditions) == 0 { + log.V(1).Info("Ignoring Job without conditions") + return ctrl.Result{}, nil + } + + switch jobCondition := job.Status.Conditions[0].Type; jobCondition { + case batchv1.JobComplete: + err = r.processCompleteScanJob(ctx, job) + case batchv1.JobFailed: + err = r.processFailedScanJob(ctx, job) + default: + err = fmt.Errorf("unrecognized scan job condition: %v", jobCondition) + } + + return ctrl.Result{}, err + } + +} + +func (r *ScanJobController) processCompleteScanJob(ctx context.Context, job *batchv1.Job) error { + log := r.Logger.WithValues("job", fmt.Sprintf("%s/%s", job.Namespace, job.Name)) + + ownerRef, err := kube.ObjectRefFromObjectMeta(job.ObjectMeta) + if err != nil { + return fmt.Errorf("getting owner ref from scan job metadata: %w", err) + } + + owner, err := r.ObjectFromObjectRef(ctx, ownerRef) + if err != nil { + if k8sapierror.IsNotFound(err) { + log.V(1).Info("Report owner must have been deleted", "owner", owner) + return r.deleteJob(ctx, job) + } + return fmt.Errorf("getting object from object ref: %w", err) + } + + containerImages, err := kube.GetContainerImagesFromJob(job) + if err != nil { + return fmt.Errorf("getting container images: %w", err) + } + + podSpecHash, ok := job.Labels[trivyoperator.LabelResourceSpecHash] + if !ok { + return fmt.Errorf("expected label %s not set", trivyoperator.LabelResourceSpecHash) + } + + hasReports, err := hasReports(ctx, 
r.ExposedSecretReadWriter, r.VulnerabilityReadWriter, ownerRef, podSpecHash, containerImages) + if err != nil { + return err + } + + if hasReports { + log.V(1).Info("VulnerabilityReports already exist", "owner", owner) + log.V(1).Info("Deleting complete scan job", "owner", owner) + return r.deleteJob(ctx, job) + } + + var vulnerabilityReports []v1alpha1.VulnerabilityReport + var secretReports []v1alpha1.ExposedSecretReport + + var merr error + for containerName, containerImage := range containerImages { + vulnReports, secReports, err := r.processScanJobResults(ctx, job, containerName, containerImage, owner) + if err != nil { + merr = multierr.Append(merr, err) + } + vulnerabilityReports = append(vulnerabilityReports, vulnReports...) + secretReports = append(secretReports, secReports...) + } + if merr != nil { + return merr + } + + if r.Config.VulnerabilityScannerEnabled { + err = r.VulnerabilityReadWriter.Write(ctx, vulnerabilityReports) + if err != nil { + return err + } + } + + if r.Config.ExposedSecretScannerEnabled { + err = r.ExposedSecretReadWriter.Write(ctx, secretReports) + if err != nil { + return err + } + } + + log.V(1).Info("Deleting complete scan job", "owner", owner) + return r.deleteJob(ctx, job) +} + +func (r *ScanJobController) processScanJobResults(ctx context.Context, job *batchv1.Job, containerName, containerImage string, owner client.Object) ([]v1alpha1.VulnerabilityReport, []v1alpha1.ExposedSecretReport, error) { + log := r.Logger.WithValues("job-results-processor", fmt.Sprintf("%s/%s", job.Namespace, job.Name)) + + var vulnerabilityReports []v1alpha1.VulnerabilityReport + var secretReports []v1alpha1.ExposedSecretReport + + podSpecHash, ok := job.Labels[trivyoperator.LabelResourceSpecHash] + if !ok { + return nil, nil, fmt.Errorf("expected label %s not set", trivyoperator.LabelResourceSpecHash) + } + + logsStream, err := r.LogsReader.GetLogsByJobAndContainerName(ctx, job, containerName) + if err != nil { + if k8sapierror.IsNotFound(err) { + 
log.V(1).Info("Cached job must have been deleted") + return nil, nil, nil + } + if kube.IsPodControlledByJobNotFound(err) { + log.V(1).Info("Pod must have been deleted") + return nil, nil, r.deleteJob(ctx, job) + } + return nil, nil, fmt.Errorf("getting logs for pod %q: %w", job.Namespace+"/"+job.Name, err) + } + + defer func() { + err := logsStream.Close() + if err != nil { + log.V(1).Error(err, "could not close log stream") + } + }() + + vulnReportData, secretReportData, err := r.Plugin.ParseReportData(r.PluginContext, containerImage, logsStream) + if err != nil { + return nil, nil, err + } + + resourceLabelsToInclude := r.GetReportResourceLabels() + + reportBuilder := vulnerabilityreport.NewReportBuilder(r.Client.Scheme()). + Controller(owner). + Container(containerName). + Data(vulnReportData). + PodSpecHash(podSpecHash). + ResourceLabelsToInclude(resourceLabelsToInclude) + + if r.Config.ScannerReportTTL != nil { + reportBuilder.ReportTTL(r.Config.ScannerReportTTL) + } + + report, err := reportBuilder.Get() + if err != nil { + return nil, nil, err + } + + secretReportBuilder := exposedsecretreport.NewReportBuilder(r.Client.Scheme()). + Controller(owner). + Container(containerName). + Data(secretReportData). + PodSpecHash(podSpecHash). 
+ ResourceLabelsToInclude(resourceLabelsToInclude) + if r.Config.ScannerReportTTL != nil { + secretReportBuilder.ReportTTL(r.Config.ScannerReportTTL) + } + secretReport, err := secretReportBuilder.Get() + if err != nil { + return nil, nil, err + } + + vulnerabilityReports = append(vulnerabilityReports, report) + secretReports = append(secretReports, secretReport) + + return vulnerabilityReports, secretReports, nil +} + +func (r *ScanJobController) processFailedScanJob(ctx context.Context, scanJob *batchv1.Job) error { + log := r.Logger.WithValues("job", fmt.Sprintf("%s/%s", scanJob.Namespace, scanJob.Name)) + + statuses, err := r.GetTerminatedContainersStatusesByJob(ctx, scanJob) + if err != nil { + if k8sapierror.IsNotFound(err) { + log.V(1).Info("Cached job must have been deleted") + return nil + } + if kube.IsPodControlledByJobNotFound(err) { + log.V(1).Info("Pod must have been deleted") + return r.deleteJob(ctx, scanJob) + } + return err + } + for container, status := range statuses { + if status.ExitCode == 0 { + continue + } + log.Error(nil, "Scan job container", "container", container, "status.reason", status.Reason, "status.message", status.Message) + } + log.V(1).Info("Deleting failed scan job") + return r.deleteJob(ctx, scanJob) +} + +func (r *ScanJobController) deleteJob(ctx context.Context, job *batchv1.Job) error { + err := r.Client.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)) + if err != nil { + if k8sapierror.IsNotFound(err) { + return nil + } + return fmt.Errorf("deleting job: %w", err) + } + return nil +} diff --git a/pkg/vulnerabilityreport/controller.go b/pkg/vulnerabilityreport/controller/workload.go similarity index 51% rename from pkg/vulnerabilityreport/controller.go rename to pkg/vulnerabilityreport/controller/workload.go index 702b0d35b..c081dd0ca 100644 --- a/pkg/vulnerabilityreport/controller.go +++ b/pkg/vulnerabilityreport/controller/workload.go @@ -1,11 +1,10 @@ -package vulnerabilityreport +package 
controller import ( "context" "errors" "fmt" "net/http" - "reflect" "strings" "github.com/aquasecurity/trivy-operator/pkg/apis/aquasecurity/v1alpha1" @@ -17,16 +16,14 @@ import ( . "github.com/aquasecurity/trivy-operator/pkg/operator/predicate" "github.com/aquasecurity/trivy-operator/pkg/operator/workload" "github.com/aquasecurity/trivy-operator/pkg/trivyoperator" + "github.com/aquasecurity/trivy-operator/pkg/vulnerabilityreport" "github.com/go-logr/logr" - "go.uber.org/multierr" batchv1 "k8s.io/api/batch/v1" k8sapierror "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -36,15 +33,14 @@ import ( type WorkloadController struct { logr.Logger etc.Config - client.Client kube.ObjectResolver + client.Client jobs.LimitChecker - kube.LogsReader - kube.SecretsReader - Plugin + vulnerabilityreport.Plugin trivyoperator.PluginContext + kube.SecretsReader trivyoperator.ConfigData - VulnerabilityReadWriter ReadWriter + VulnerabilityReadWriter vulnerabilityreport.ReadWriter ExposedSecretReadWriter exposedsecretreport.ReadWriter } @@ -60,7 +56,6 @@ type WorkloadController struct { // Manage scan jobs with image pull secrets // kubebuilder:rbac:groups="",resources=secrets,verbs=create;update -//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete func (r *WorkloadController) SetupWithManager(mgr ctrl.Manager) error { installModePredicate, err := InstallModePredicate(r.Config) @@ -95,14 +90,7 @@ func (r *WorkloadController) SetupWithManager(mgr ctrl.Manager) error { return err } } - var predicates []predicate.Predicate - if !r.ConfigData.VulnerabilityScanJobsInSameNamespace() { - predicates = append(predicates, 
InNamespace(r.Config.Namespace)) - } - predicates = append(predicates, ManagedByTrivyOperator, IsVulnerabilityReportScan, JobHasAnyCondition) - return ctrl.NewControllerManagedBy(mgr). - For(&batchv1.Job{}, builder.WithPredicates(predicates...)). - Complete(r.reconcileJobs()) + return nil } func (r *WorkloadController) reconcileWorkload(workloadKind kube.Kind) reconcile.Func { @@ -145,7 +133,7 @@ func (r *WorkloadController) reconcileWorkload(workloadKind kube.Kind) reconcile log = log.WithValues("podSpecHash", hash) // Check if containers of the Pod have corresponding VulnerabilityReports. - hasReports, err := r.hasReports(ctx, workloadRef, hash, containerImages) + hasReports, err := hasReports(ctx, r.ExposedSecretReadWriter, r.VulnerabilityReadWriter, workloadRef, hash, containerImages) if err != nil { return ctrl.Result{}, fmt.Errorf("getting vulnerability reports: %w", err) } @@ -192,67 +180,6 @@ func (r *WorkloadController) trivyServerAvaliable(serverURL string) (bool, error return false, nil } -func (r *WorkloadController) hasReports(ctx context.Context, owner kube.ObjectRef, hash string, images kube.ContainerImages) (bool, error) { - hasVulnerabilityReports, err := r.hasVulnerabilityReports(ctx, owner, hash, images) - if err != nil { - return false, err - } - - hasSecretReports, err := r.hasSecretReports(ctx, owner, hash, images) - if err != nil { - return false, err - } - - return hasVulnerabilityReports && hasSecretReports, nil -} - -func (r *WorkloadController) hasVulnerabilityReports(ctx context.Context, owner kube.ObjectRef, hash string, images kube.ContainerImages) (bool, error) { - // TODO FindByOwner should accept optional label selector to further narrow down search results - list, err := r.VulnerabilityReadWriter.FindByOwner(ctx, owner) - if err != nil { - return false, err - } - - actual := map[string]bool{} - for _, report := range list { - if containerName, ok := report.Labels[trivyoperator.LabelContainerName]; ok { - if hash == 
report.Labels[trivyoperator.LabelResourceSpecHash] { - actual[containerName] = true - } - } - } - - return compareReports(actual, images), nil -} - -func (r *WorkloadController) hasSecretReports(ctx context.Context, owner kube.ObjectRef, hash string, images kube.ContainerImages) (bool, error) { - // TODO FindByOwner should accept optional label selector to further narrow down search results - list, err := r.ExposedSecretReadWriter.FindByOwner(ctx, owner) - if err != nil { - return false, err - } - - actual := map[string]bool{} - for _, report := range list { - if containerName, ok := report.Labels[trivyoperator.LabelContainerName]; ok { - if hash == report.Labels[trivyoperator.LabelResourceSpecHash] { - actual[containerName] = true - } - } - } - - return compareReports(actual, images), nil -} - -func compareReports(actual map[string]bool, images kube.ContainerImages) bool { - expected := map[string]bool{} - for containerName := range images { - expected[containerName] = true - } - - return reflect.DeepEqual(actual, expected) -} - func (r *WorkloadController) hasActiveScanJob(ctx context.Context, owner kube.ObjectRef, hash string) (bool, *batchv1.Job, error) { jobName := fmt.Sprintf("scan-vulnerabilityreport-%s", kube.ComputeHash(owner)) job := &batchv1.Job{} @@ -316,7 +243,7 @@ func (r *WorkloadController) submitScanJob(ctx context.Context, owner client.Obj return fmt.Errorf("getting scan job template labels: %w", err) } - scanJob, secrets, err := NewScanJobBuilder(). + scanJob, secrets, err := vulnerabilityreport.NewScanJobBuilder(). WithPlugin(r.Plugin). WithPluginContext(r.PluginContext). WithTimeout(r.Config.ScanJobTimeout). 
@@ -371,218 +298,3 @@ func (r *WorkloadController) submitScanJob(ctx context.Context, owner client.Obj return nil } - -func (r *WorkloadController) reconcileJobs() reconcile.Func { - return func(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Logger.WithValues("job", req.NamespacedName) - - job := &batchv1.Job{} - err := r.Client.Get(ctx, req.NamespacedName, job) - if err != nil { - if k8sapierror.IsNotFound(err) { - log.V(1).Info("Ignoring cached job that must have been deleted") - return ctrl.Result{}, nil - } - return ctrl.Result{}, fmt.Errorf("getting job from cache: %w", err) - } - - if len(job.Status.Conditions) == 0 { - log.V(1).Info("Ignoring Job without conditions") - return ctrl.Result{}, nil - } - - switch jobCondition := job.Status.Conditions[0].Type; jobCondition { - case batchv1.JobComplete: - err = r.processCompleteScanJob(ctx, job) - case batchv1.JobFailed: - err = r.processFailedScanJob(ctx, job) - default: - err = fmt.Errorf("unrecognized scan job condition: %v", jobCondition) - } - - return ctrl.Result{}, err - } - -} - -func (r *WorkloadController) processCompleteScanJob(ctx context.Context, job *batchv1.Job) error { - log := r.Logger.WithValues("job", fmt.Sprintf("%s/%s", job.Namespace, job.Name)) - - ownerRef, err := kube.ObjectRefFromObjectMeta(job.ObjectMeta) - if err != nil { - return fmt.Errorf("getting owner ref from scan job metadata: %w", err) - } - - owner, err := r.ObjectFromObjectRef(ctx, ownerRef) - if err != nil { - if k8sapierror.IsNotFound(err) { - log.V(1).Info("Report owner must have been deleted", "owner", owner) - return r.deleteJob(ctx, job) - } - return fmt.Errorf("getting object from object ref: %w", err) - } - - containerImages, err := kube.GetContainerImagesFromJob(job) - if err != nil { - return fmt.Errorf("getting container images: %w", err) - } - - podSpecHash, ok := job.Labels[trivyoperator.LabelResourceSpecHash] - if !ok { - return fmt.Errorf("expected label %s not set", 
trivyoperator.LabelResourceSpecHash) - } - - hasReports, err := r.hasReports(ctx, ownerRef, podSpecHash, containerImages) - if err != nil { - return err - } - - if hasReports { - log.V(1).Info("VulnerabilityReports already exist", "owner", owner) - log.V(1).Info("Deleting complete scan job", "owner", owner) - return r.deleteJob(ctx, job) - } - - var vulnerabilityReports []v1alpha1.VulnerabilityReport - var secretReports []v1alpha1.ExposedSecretReport - - var merr error - for containerName, containerImage := range containerImages { - vulnReports, secReports, err := r.processScanJobResults(ctx, job, containerName, containerImage, owner) - if err != nil { - merr = multierr.Append(merr, err) - } - vulnerabilityReports = append(vulnerabilityReports, vulnReports...) - secretReports = append(secretReports, secReports...) - } - if merr != nil { - return merr - } - - if r.Config.VulnerabilityScannerEnabled { - err = r.VulnerabilityReadWriter.Write(ctx, vulnerabilityReports) - if err != nil { - return err - } - } - - if r.Config.ExposedSecretScannerEnabled { - err = r.ExposedSecretReadWriter.Write(ctx, secretReports) - if err != nil { - return err - } - } - - log.V(1).Info("Deleting complete scan job", "owner", owner) - return r.deleteJob(ctx, job) -} - -func (r *WorkloadController) processScanJobResults(ctx context.Context, job *batchv1.Job, containerName, containerImage string, owner client.Object) ([]v1alpha1.VulnerabilityReport, []v1alpha1.ExposedSecretReport, error) { - log := r.Logger.WithValues("job-results-processor", fmt.Sprintf("%s/%s", job.Namespace, job.Name)) - - var vulnerabilityReports []v1alpha1.VulnerabilityReport - var secretReports []v1alpha1.ExposedSecretReport - - podSpecHash, ok := job.Labels[trivyoperator.LabelResourceSpecHash] - if !ok { - return nil, nil, fmt.Errorf("expected label %s not set", trivyoperator.LabelResourceSpecHash) - } - - logsStream, err := r.LogsReader.GetLogsByJobAndContainerName(ctx, job, containerName) - if err != nil { - if 
k8sapierror.IsNotFound(err) { - log.V(1).Info("Cached job must have been deleted") - return nil, nil, nil - } - if kube.IsPodControlledByJobNotFound(err) { - log.V(1).Info("Pod must have been deleted") - return nil, nil, r.deleteJob(ctx, job) - } - return nil, nil, fmt.Errorf("getting logs for pod %q: %w", job.Namespace+"/"+job.Name, err) - } - - defer func() { - err := logsStream.Close() - if err != nil { - log.V(1).Error(err, "could not close log stream") - } - }() - - vulnReportData, secretReportData, err := r.Plugin.ParseReportData(r.PluginContext, containerImage, logsStream) - if err != nil { - return nil, nil, err - } - - resourceLabelsToInclude := r.GetReportResourceLabels() - - reportBuilder := NewReportBuilder(r.Client.Scheme()). - Controller(owner). - Container(containerName). - Data(vulnReportData). - PodSpecHash(podSpecHash). - ResourceLabelsToInclude(resourceLabelsToInclude) - - if r.Config.ScannerReportTTL != nil { - reportBuilder.ReportTTL(r.Config.ScannerReportTTL) - } - - report, err := reportBuilder.Get() - if err != nil { - return nil, nil, err - } - - secretReportBuilder := exposedsecretreport.NewReportBuilder(r.Client.Scheme()). - Controller(owner). - Container(containerName). - Data(secretReportData). - PodSpecHash(podSpecHash). 
- ResourceLabelsToInclude(resourceLabelsToInclude) - if r.Config.ScannerReportTTL != nil { - secretReportBuilder.ReportTTL(r.Config.ScannerReportTTL) - } - secretReport, err := secretReportBuilder.Get() - if err != nil { - return nil, nil, err - } - - vulnerabilityReports = append(vulnerabilityReports, report) - secretReports = append(secretReports, secretReport) - - return vulnerabilityReports, secretReports, nil -} - -func (r *WorkloadController) processFailedScanJob(ctx context.Context, scanJob *batchv1.Job) error { - log := r.Logger.WithValues("job", fmt.Sprintf("%s/%s", scanJob.Namespace, scanJob.Name)) - - statuses, err := r.GetTerminatedContainersStatusesByJob(ctx, scanJob) - if err != nil { - if k8sapierror.IsNotFound(err) { - log.V(1).Info("Cached job must have been deleted") - return nil - } - if kube.IsPodControlledByJobNotFound(err) { - log.V(1).Info("Pod must have been deleted") - return r.deleteJob(ctx, scanJob) - } - return err - } - for container, status := range statuses { - if status.ExitCode == 0 { - continue - } - log.Error(nil, "Scan job container", "container", container, "status.reason", status.Reason, "status.message", status.Message) - } - log.V(1).Info("Deleting failed scan job") - return r.deleteJob(ctx, scanJob) -} - -func (r *WorkloadController) deleteJob(ctx context.Context, job *batchv1.Job) error { - err := r.Client.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)) - if err != nil { - if k8sapierror.IsNotFound(err) { - return nil - } - return fmt.Errorf("deleting job: %w", err) - } - return nil -} diff --git a/pkg/vulnerabilityreport/suite_test.go b/pkg/vulnerabilityreport/suite_test.go index e041ea778..f0f7ea0e6 100644 --- a/pkg/vulnerabilityreport/suite_test.go +++ b/pkg/vulnerabilityreport/suite_test.go @@ -1,9 +1,10 @@ package vulnerabilityreport_test import ( + "os" + . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - "os" "context" "path/filepath" @@ -17,6 +18,7 @@ import ( "github.com/aquasecurity/trivy-operator/pkg/plugins" "github.com/aquasecurity/trivy-operator/pkg/trivyoperator" "github.com/aquasecurity/trivy-operator/pkg/vulnerabilityreport" + "github.com/aquasecurity/trivy-operator/pkg/vulnerabilityreport/controller" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -105,7 +107,7 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) - err = (&vulnerabilityreport.WorkloadController{ + err = (&controller.WorkloadController{ Logger: ctrl.Log.WithName("reconciler").WithName("vulnerabilityreport"), Config: config, Client: managerClient,