Skip to content

Commit

Permalink
Make sure cron jobs are never returning errors
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Sep 5, 2023
1 parent 5a0c33f commit fd450a7
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
2 changes: 1 addition & 1 deletion deepfence_worker/cronjobs/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func processIntegration[T any](msg *message.Message, integrationRow postgresql_d
},
)
filters.FieldsFilters.ContainsFilter = reporters.ContainsFilter{
FieldsValues: map[string][]interface{}{"status": {"COMPLETE"}},
FieldsValues: map[string][]interface{}{"status": {utils.SCAN_STATUS_SUCCESS}},
}
list, err := reporters_scan.GetScansList(ctx, utils.DetectedNodeScanType[integrationRow.Resource],
filters.NodeIds, filters.FieldsFilters, model.FetchWindow{})
Expand Down
72 changes: 48 additions & 24 deletions deepfence_worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,55 +285,58 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
worker := NewWorker(wml, cfg, mux)

// sbom
worker.AddNoPublisherHandler(utils.ScanSBOMTask, sbom.NewSBOMScanner(ingestC).ScanSBOM, false)
worker.AddHandler(utils.GenerateSBOMTask, sbom.NewSbomGenerator(ingestC).GenerateSbom,
worker.AddNoPublisherHandler(utils.ScanSBOMTask, LogErrorWrapper(sbom.NewSBOMScanner(ingestC).ScanSBOM), false)

worker.AddHandler(utils.GenerateSBOMTask, LogErrorsWrapper(sbom.NewSbomGenerator(ingestC).GenerateSbom),
utils.ScanSBOMTask, publisher)

worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, cronjobs.ApplyGraphDBStartup, false)
worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, LogErrorWrapper(cronjobs.ApplyGraphDBStartup), false)

worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, cronjobs.CleanUpDB, true)
worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, LogErrorWrapper(cronjobs.CleanUpDB), true)

worker.AddNoPublisherHandler(utils.ComputeThreatTask, cronjobs.ComputeThreat, true)
worker.AddNoPublisherHandler(utils.ComputeThreatTask, LogErrorWrapper(cronjobs.ComputeThreat), true)

worker.AddNoPublisherHandler(utils.RetryFailedScansTask, cronjobs.RetryScansDB, true)
worker.AddNoPublisherHandler(utils.RetryFailedScansTask, LogErrorWrapper(cronjobs.RetryScansDB), true)

worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, cronjobs.RetryUpgradeAgent, false)
worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, LogErrorWrapper(cronjobs.RetryUpgradeAgent), false)

worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, cronjobs.CleanUpPostgresDB, true)
worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, LogErrorWrapper(cronjobs.CleanUpPostgresDB), true)

worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, cronjobs.CleanUpDiagnosisLogs, false)
worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, LogErrorWrapper(cronjobs.CleanUpDiagnosisLogs), false)

worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, cronjobs.CheckAgentUpgrade, true)
worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, LogErrorWrapper(cronjobs.CheckAgentUpgrade), true)

worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, cronjobs.TriggerConsoleControls, true)
worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, LogErrorWrapper(cronjobs.TriggerConsoleControls), true)

worker.AddNoPublisherHandler(utils.ScheduledTasks, cronjobs.RunScheduledTasks, false)
worker.AddNoPublisherHandler(utils.ScheduledTasks, LogErrorWrapper(cronjobs.RunScheduledTasks), false)

worker.AddNoPublisherHandler(utils.SyncRegistryTask, cronjobs.SyncRegistry, false)
worker.AddNoPublisherHandler(utils.SyncRegistryTask, LogErrorWrapper(cronjobs.SyncRegistry), false)

worker.AddNoPublisherHandler(utils.SecretScanTask,
secretscan.NewSecretScanner(ingestC).StartSecretScan, false)
LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StartSecretScan), false)

worker.AddNoPublisherHandler(utils.StopSecretScanTask,
secretscan.NewSecretScanner(ingestC).StopSecretScan, false)
LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StopSecretScan), false)

worker.AddNoPublisherHandler(utils.MalwareScanTask,
malwarescan.NewMalwareScanner(ingestC).StartMalwareScan, false)
LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StartMalwareScan), false)

worker.AddNoPublisherHandler(utils.StopMalwareScanTask,
malwarescan.NewMalwareScanner(ingestC).StopMalwareScan, false)
LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StopMalwareScan), false)

worker.AddNoPublisherHandler(utils.CloudComplianceTask, cronjobs.AddCloudControls, true)
worker.AddNoPublisherHandler(utils.CloudComplianceTask, LogErrorWrapper(cronjobs.AddCloudControls), true)

worker.AddNoPublisherHandler(utils.CachePostureProviders, cronjobs.CachePostureProviders, true)
worker.AddNoPublisherHandler(utils.CachePostureProviders, LogErrorWrapper(cronjobs.CachePostureProviders), true)

worker.AddNoPublisherHandler(utils.SendNotificationTask, cronjobs.SendNotifications, true)
worker.AddNoPublisherHandler(utils.SendNotificationTask, LogErrorWrapper(cronjobs.SendNotifications), true)

worker.AddNoPublisherHandler(utils.ReportGeneratorTask, reports.GenerateReport, false)
worker.AddNoPublisherHandler(utils.ReportGeneratorTask, LogErrorWrapper(reports.GenerateReport), false)

worker.AddNoPublisherHandler(utils.ReportCleanUpTask, cronjobs.CleanUpReports, true)
worker.AddNoPublisherHandler(utils.ReportCleanUpTask, LogErrorWrapper(cronjobs.CleanUpReports), true)

worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, cronjobs.LinkCloudResources, true)
worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, LogErrorWrapper(cronjobs.LinkCloudResources), true)

worker.AddNoPublisherHandler(utils.LinkNodesTask, cronjobs.LinkNodes, true)
worker.AddNoPublisherHandler(utils.LinkNodesTask, LogErrorWrapper(cronjobs.LinkNodes), true)

go worker.pollHandlers()

Expand All @@ -345,3 +348,24 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
cancel()
return nil
}

func LogErrorWrapper(wrapped func(*message.Message) error) func(*message.Message) error {
return func(msg *message.Message) error {
err := wrapped(msg)
if err != nil {
log.Error().Msgf("Cron job err: %v", err)
}
return nil
}
}

func LogErrorsWrapper(wrapped func(*message.Message) ([]*message.Message, error)) func(*message.Message) ([]*message.Message, error) {
return func(msg *message.Message) ([]*message.Message, error) {
msgs, err := wrapped(msg)
if err != nil {
log.Error().Msgf("Cron job err: %v", err)
return nil, nil
}
return msgs, nil
}
}

0 comments on commit fd450a7

Please sign in to comment.