diff --git a/deepfence_worker/cronjobs/notification.go b/deepfence_worker/cronjobs/notification.go index ec07742d83..c2961b90b7 100644 --- a/deepfence_worker/cronjobs/notification.go +++ b/deepfence_worker/cronjobs/notification.go @@ -107,7 +107,7 @@ func processIntegration[T any](msg *message.Message, integrationRow postgresql_d }, ) filters.FieldsFilters.ContainsFilter = reporters.ContainsFilter{ - FieldsValues: map[string][]interface{}{"status": {"COMPLETE"}}, + FieldsValues: map[string][]interface{}{"status": {utils.SCAN_STATUS_SUCCESS}}, } list, err := reporters_scan.GetScansList(ctx, utils.DetectedNodeScanType[integrationRow.Resource], filters.NodeIds, filters.FieldsFilters, model.FetchWindow{}) diff --git a/deepfence_worker/worker.go b/deepfence_worker/worker.go index 4695b4bee6..54c7e6ccae 100644 --- a/deepfence_worker/worker.go +++ b/deepfence_worker/worker.go @@ -285,55 +285,58 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error { worker := NewWorker(wml, cfg, mux) // sbom - worker.AddNoPublisherHandler(utils.ScanSBOMTask, sbom.NewSBOMScanner(ingestC).ScanSBOM, false) - worker.AddHandler(utils.GenerateSBOMTask, sbom.NewSbomGenerator(ingestC).GenerateSbom, + worker.AddNoPublisherHandler(utils.ScanSBOMTask, LogErrorWrapper(sbom.NewSBOMScanner(ingestC).ScanSBOM), false) + + worker.AddHandler(utils.GenerateSBOMTask, LogErrorsWrapper(sbom.NewSbomGenerator(ingestC).GenerateSbom), utils.ScanSBOMTask, publisher) - worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, cronjobs.ApplyGraphDBStartup, false) + worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, LogErrorWrapper(cronjobs.ApplyGraphDBStartup), false) - worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, cronjobs.CleanUpDB, true) + worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, LogErrorWrapper(cronjobs.CleanUpDB), true) - worker.AddNoPublisherHandler(utils.ComputeThreatTask, cronjobs.ComputeThreat, true) + worker.AddNoPublisherHandler(utils.ComputeThreatTask, LogErrorWrapper(cronjobs.ComputeThreat), true) - worker.AddNoPublisherHandler(utils.RetryFailedScansTask, cronjobs.RetryScansDB, true) + worker.AddNoPublisherHandler(utils.RetryFailedScansTask, LogErrorWrapper(cronjobs.RetryScansDB), true) - worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, cronjobs.RetryUpgradeAgent, false) + worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, LogErrorWrapper(cronjobs.RetryUpgradeAgent), false) - worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, cronjobs.CleanUpPostgresDB, true) + worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, LogErrorWrapper(cronjobs.CleanUpPostgresDB), true) - worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, cronjobs.CleanUpDiagnosisLogs, false) + worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, LogErrorWrapper(cronjobs.CleanUpDiagnosisLogs), false) - worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, cronjobs.CheckAgentUpgrade, true) + worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, LogErrorWrapper(cronjobs.CheckAgentUpgrade), true) - worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, cronjobs.TriggerConsoleControls, true) + worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, LogErrorWrapper(cronjobs.TriggerConsoleControls), true) - worker.AddNoPublisherHandler(utils.ScheduledTasks, cronjobs.RunScheduledTasks, false) + worker.AddNoPublisherHandler(utils.ScheduledTasks, LogErrorWrapper(cronjobs.RunScheduledTasks), false) - worker.AddNoPublisherHandler(utils.SyncRegistryTask, cronjobs.SyncRegistry, false) + worker.AddNoPublisherHandler(utils.SyncRegistryTask, LogErrorWrapper(cronjobs.SyncRegistry), false) worker.AddNoPublisherHandler(utils.SecretScanTask, - secretscan.NewSecretScanner(ingestC).StartSecretScan, false) + LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StartSecretScan), false) + worker.AddNoPublisherHandler(utils.StopSecretScanTask, - secretscan.NewSecretScanner(ingestC).StopSecretScan, false) + LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StopSecretScan), false) worker.AddNoPublisherHandler(utils.MalwareScanTask, - malwarescan.NewMalwareScanner(ingestC).StartMalwareScan, false) + LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StartMalwareScan), false) + worker.AddNoPublisherHandler(utils.StopMalwareScanTask, - malwarescan.NewMalwareScanner(ingestC).StopMalwareScan, false) + LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StopMalwareScan), false) - worker.AddNoPublisherHandler(utils.CloudComplianceTask, cronjobs.AddCloudControls, true) + worker.AddNoPublisherHandler(utils.CloudComplianceTask, LogErrorWrapper(cronjobs.AddCloudControls), true) - worker.AddNoPublisherHandler(utils.CachePostureProviders, cronjobs.CachePostureProviders, true) + worker.AddNoPublisherHandler(utils.CachePostureProviders, LogErrorWrapper(cronjobs.CachePostureProviders), true) - worker.AddNoPublisherHandler(utils.SendNotificationTask, cronjobs.SendNotifications, true) + worker.AddNoPublisherHandler(utils.SendNotificationTask, LogErrorWrapper(cronjobs.SendNotifications), true) - worker.AddNoPublisherHandler(utils.ReportGeneratorTask, reports.GenerateReport, false) + worker.AddNoPublisherHandler(utils.ReportGeneratorTask, LogErrorWrapper(reports.GenerateReport), false) - worker.AddNoPublisherHandler(utils.ReportCleanUpTask, cronjobs.CleanUpReports, true) + worker.AddNoPublisherHandler(utils.ReportCleanUpTask, LogErrorWrapper(cronjobs.CleanUpReports), true) - worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, cronjobs.LinkCloudResources, true) + worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, LogErrorWrapper(cronjobs.LinkCloudResources), true) - worker.AddNoPublisherHandler(utils.LinkNodesTask, cronjobs.LinkNodes, true) + worker.AddNoPublisherHandler(utils.LinkNodesTask, LogErrorWrapper(cronjobs.LinkNodes), true) go worker.pollHandlers() @@ -345,3 +348,24 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error { cancel() return nil } + +func LogErrorWrapper(wrapped func(*message.Message) error) func(*message.Message) error { + return func(msg *message.Message) error { + err := wrapped(msg) + if err != nil { + log.Error().Msgf("Cron job err: %v", err) + } + return nil + } +} + +func LogErrorsWrapper(wrapped func(*message.Message) ([]*message.Message, error)) func(*message.Message) ([]*message.Message, error) { + return func(msg *message.Message) ([]*message.Message, error) { + msgs, err := wrapped(msg) + if err != nil { + log.Error().Msgf("Cron job err: %v", err) + return nil, nil + } + return msgs, nil + } +}