diff --git a/deepfence_worker/go.mod b/deepfence_worker/go.mod
index f6a7bbbe9b..e4778e66fe 100644
--- a/deepfence_worker/go.mod
+++ b/deepfence_worker/go.mod
@@ -25,6 +25,7 @@ require (
 	github.com/ThreeDotsLabs/watermill-kafka/v2 v2.4.0
 	github.com/anchore/syft v0.87.0
 	github.com/aws/aws-sdk-go v1.44.325
+	github.com/cenkalti/backoff/v3 v3.2.2
 	github.com/deepfence/SecretScanner v0.0.0-00010101000000-000000000000
 	github.com/deepfence/ThreatMapper/deepfence_server v0.0.0-00010101000000-000000000000
 	github.com/deepfence/ThreatMapper/deepfence_utils v0.0.0-00010101000000-000000000000
@@ -34,6 +35,7 @@ require (
 	github.com/kelseyhightower/envconfig v1.4.0
 	github.com/minio/minio-go/v7 v7.0.58
 	github.com/neo4j/neo4j-go-driver/v4 v4.4.7
+	github.com/pkg/errors v0.9.1
 	github.com/pressly/goose/v3 v3.15.0
 	github.com/prometheus/client_golang v1.14.0
 	github.com/robfig/cron/v3 v3.0.1
@@ -70,7 +72,6 @@ require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bmatcuk/doublestar/v4 v4.6.0 // indirect
 	github.com/casbin/casbin/v2 v2.75.0 // indirect
-	github.com/cenkalti/backoff/v3 v3.2.2 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect
 	github.com/containerd/containerd v1.7.2 // indirect
@@ -192,7 +193,6 @@ require (
 	github.com/opentracing/opentracing-go v1.2.0 // indirect
 	github.com/pelletier/go-toml v1.9.5 // indirect
 	github.com/pierrec/lz4/v4 v4.1.18 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_model v0.3.0 // indirect
 	github.com/prometheus/common v0.39.0 // indirect
 	github.com/prometheus/procfs v0.11.0 // indirect
diff --git a/deepfence_worker/utils/watermill.go b/deepfence_worker/utils/watermill.go
new file mode 100644
index 0000000000..2f614bb84b
--- /dev/null
+++ b/deepfence_worker/utils/watermill.go
@@ -0,0 +1,137 @@
+package utils
+
+import (
+	"fmt"
+	"runtime/debug"
+
+	"context"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/cenkalti/backoff/v3"
+
+	"github.com/ThreeDotsLabs/watermill"
+	"github.com/ThreeDotsLabs/watermill/message"
+)
+
+// RecoveredPanicError holds the recovered panic's error along with the stacktrace.
+type RecoveredPanicError struct {
+	V          interface{}
+	Stacktrace string
+}
+
+func (p RecoveredPanicError) Error() string {
+	return fmt.Sprintf("panic occurred: %#v, stacktrace: \n%s", p.V, p.Stacktrace)
+}
+
+// Recoverer recovers from any panic in the handler and appends RecoveredPanicError with the stacktrace
+// to any error returned from the handler.
+func Recoverer(h message.HandlerFunc) message.HandlerFunc {
+	return func(event *message.Message) (events []*message.Message, err error) {
+		panicked := true
+
+		defer func() {
+			if r := recover(); r != nil || panicked {
+				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
+				// ack message as we don't want to execute panic message again
+				event.Ack()
+			}
+		}()
+
+		events, err = h(event)
+		panicked = false
+		return events, err
+	}
+}
+
+// Retry provides a middleware that retries the handler if errors are returned.
+// The retry behaviour is configurable, with exponential backoff and maximum elapsed time.
+type Retry struct {
+	// MaxRetries is maximum number of times a retry will be attempted.
+	MaxRetries int
+
+	// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
+	InitialInterval time.Duration
+	// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
+	MaxInterval time.Duration
+	// Multiplier is the factor by which the waiting interval will be multiplied between retries.
+	Multiplier float64
+	// MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
+	MaxElapsedTime time.Duration
+	// RandomizationFactor randomizes the spread of the backoff times within the interval of:
+	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
+	RandomizationFactor float64
+
+	// OnRetryHook is an optional function that will be executed on each retry attempt.
+	// The number of the current retry is passed as retryNum,
+	OnRetryHook func(retryNum int, delay time.Duration)
+
+	Logger watermill.LoggerAdapter
+}
+
+// Middleware returns the Retry middleware.
+func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
+	return func(msg *message.Message) ([]*message.Message, error) {
+		producedMessages, err := h(msg)
+		if err == nil {
+			return producedMessages, nil
+		}
+
+		expBackoff := backoff.NewExponentialBackOff()
+		expBackoff.InitialInterval = r.InitialInterval
+		expBackoff.MaxInterval = r.MaxInterval
+		expBackoff.Multiplier = r.Multiplier
+		expBackoff.MaxElapsedTime = r.MaxElapsedTime
+		expBackoff.RandomizationFactor = r.RandomizationFactor
+
+		ctx := msg.Context()
+		if r.MaxElapsedTime > 0 {
+			var cancel func()
+			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
+			defer cancel()
+		}
+
+		retryNum := 1
+		expBackoff.Reset()
+	retryLoop:
+		for {
+			waitTime := expBackoff.NextBackOff()
+			select {
+			case <-ctx.Done():
+				return producedMessages, err
+			case <-time.After(waitTime):
+				// go on
+			}
+
+			producedMessages, err = h(msg)
+			if err == nil {
+				return producedMessages, nil
+			}
+
+			if r.Logger != nil {
+				r.Logger.Error("Error occurred, retrying", err, watermill.LogFields{
+					"retry_no":     retryNum,
+					"max_retries":  r.MaxRetries,
+					"wait_time":    waitTime,
+					"elapsed_time": expBackoff.GetElapsedTime(),
+				})
+			}
+			if r.OnRetryHook != nil {
+				r.OnRetryHook(retryNum, waitTime)
+			}
+
+			retryNum++
+			if retryNum > r.MaxRetries {
+				if r.Logger != nil {
+					r.Logger.Error("Error Max retries reached", err, watermill.LogFields{"msg_uuid": msg.UUID})
+				}
+				// ack the message don't want to execute already retried message
+				msg.Ack()
+				break retryLoop
+			}
+		}
+
+		return nil, err
+	}
+}
diff --git a/deepfence_worker/worker.go b/deepfence_worker/worker.go
index ab5f3c2512..a035736d00 100644
--- a/deepfence_worker/worker.go
+++ b/deepfence_worker/worker.go
@@ -20,6 +20,7 @@ import (
 	"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/reports"
 	"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/sbom"
 	"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/secretscan"
+	workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils"
 	"github.com/twmb/franz-go/pkg/kgo"
 )
 
@@ -258,24 +259,24 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
 
 	mux.AddPlugin(plugin.SignalsHandler)
 
-	// Retried disabled in favor of neo4j scheduling
-	//retryMiddleware := middleware.Retry{
-	//	MaxRetries:          3,
-	//	InitialInterval:     time.Second * 10,
-	//	MaxInterval:         time.Second * 120,
-	//	Multiplier:          1.5,
-	//	MaxElapsedTime:      0,
-	//	RandomizationFactor: 0.5,
-	//	OnRetryHook: func(retryNum int, delay time.Duration) {
-	//		log.Info().Msgf("retry=%d delay=%s", retryNum, delay)
-	//	},
-	//	Logger: wml,
-	//}
+	retry := workerUtils.Retry{
+		MaxRetries:          3,
+		InitialInterval:     time.Second * 5,
+		MaxInterval:         time.Second * 60,
+		Multiplier:          1.5,
+		MaxElapsedTime:      0,
+		RandomizationFactor: 0.25,
+		OnRetryHook: func(retryNum int, delay time.Duration) {
+			log.Info().Msgf("retry=%d delay=%s", retryNum, delay)
+		},
+		Logger: wml,
+	}
 
 	mux.AddMiddleware(
 		middleware.CorrelationID,
-		middleware.NewThrottle(10, time.Second).Middleware,
-		middleware.Recoverer,
+		middleware.NewThrottle(20, time.Second).Middleware,
+		retry.Middleware,
+		workerUtils.Recoverer,
 	)
 
 	HandlerMap = make(map[string]*NoPublisherTask)
@@ -285,58 +286,58 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
 	worker := NewWorker(wml, cfg, mux)
 
 	// sbom
-	worker.AddNoPublisherHandler(utils.ScanSBOMTask, LogErrorWrapper(sbom.NewSBOMScanner(ingestC).ScanSBOM), false)
+	worker.AddNoPublisherHandler(utils.ScanSBOMTask, sbom.NewSBOMScanner(ingestC).ScanSBOM, false)
 
-	worker.AddHandler(utils.GenerateSBOMTask, LogErrorsWrapper(sbom.NewSbomGenerator(ingestC).GenerateSbom),
+	worker.AddHandler(utils.GenerateSBOMTask, sbom.NewSbomGenerator(ingestC).GenerateSbom,
 		utils.ScanSBOMTask, publisher)
 
-	worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, LogErrorWrapper(cronjobs.ApplyGraphDBStartup), false)
+	worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, cronjobs.ApplyGraphDBStartup, false)
 
-	worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, LogErrorWrapper(cronjobs.CleanUpDB), true)
+	worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, cronjobs.CleanUpDB, true)
 
-	worker.AddNoPublisherHandler(utils.ComputeThreatTask, LogErrorWrapper(cronjobs.ComputeThreat), true)
+	worker.AddNoPublisherHandler(utils.ComputeThreatTask, cronjobs.ComputeThreat, true)
 
-	worker.AddNoPublisherHandler(utils.RetryFailedScansTask, LogErrorWrapper(cronjobs.RetryScansDB), true)
+	worker.AddNoPublisherHandler(utils.RetryFailedScansTask, cronjobs.RetryScansDB, true)
 
-	worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, LogErrorWrapper(cronjobs.RetryUpgradeAgent), false)
+	worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, cronjobs.RetryUpgradeAgent, false)
 
-	worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, LogErrorWrapper(cronjobs.CleanUpPostgresDB), true)
+	worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, cronjobs.CleanUpPostgresDB, true)
 
-	worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, LogErrorWrapper(cronjobs.CleanUpDiagnosisLogs), false)
+	worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, cronjobs.CleanUpDiagnosisLogs, false)
 
-	worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, LogErrorWrapper(cronjobs.CheckAgentUpgrade), true)
+	worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, cronjobs.CheckAgentUpgrade, true)
 
-	worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, LogErrorWrapper(cronjobs.TriggerConsoleControls), true)
+	worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, cronjobs.TriggerConsoleControls, true)
 
-	worker.AddNoPublisherHandler(utils.ScheduledTasks, LogErrorWrapper(cronjobs.RunScheduledTasks), false)
+	worker.AddNoPublisherHandler(utils.ScheduledTasks, cronjobs.RunScheduledTasks, false)
 
-	worker.AddNoPublisherHandler(utils.SyncRegistryTask, LogErrorWrapper(cronjobs.SyncRegistry), false)
+	worker.AddNoPublisherHandler(utils.SyncRegistryTask, cronjobs.SyncRegistry, false)
 
 	worker.AddNoPublisherHandler(utils.SecretScanTask,
-		LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StartSecretScan), false)
+		secretscan.NewSecretScanner(ingestC).StartSecretScan, false)
 
 	worker.AddNoPublisherHandler(utils.StopSecretScanTask,
-		LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StopSecretScan), false)
+		secretscan.NewSecretScanner(ingestC).StopSecretScan, false)
 
 	worker.AddNoPublisherHandler(utils.MalwareScanTask,
-		LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StartMalwareScan), false)
+		malwarescan.NewMalwareScanner(ingestC).StartMalwareScan, false)
 
 	worker.AddNoPublisherHandler(utils.StopMalwareScanTask,
-		LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StopMalwareScan), false)
+		malwarescan.NewMalwareScanner(ingestC).StopMalwareScan, false)
 
-	worker.AddNoPublisherHandler(utils.CloudComplianceTask, LogErrorWrapper(cronjobs.AddCloudControls), true)
+	worker.AddNoPublisherHandler(utils.CloudComplianceTask, cronjobs.AddCloudControls, true)
 
-	worker.AddNoPublisherHandler(utils.CachePostureProviders, LogErrorWrapper(cronjobs.CachePostureProviders), true)
+	worker.AddNoPublisherHandler(utils.CachePostureProviders, cronjobs.CachePostureProviders, true)
 
-	worker.AddNoPublisherHandler(utils.SendNotificationTask, LogErrorWrapper(cronjobs.SendNotifications), true)
+	worker.AddNoPublisherHandler(utils.SendNotificationTask, cronjobs.SendNotifications, true)
 
-	worker.AddNoPublisherHandler(utils.ReportGeneratorTask, LogErrorWrapper(reports.GenerateReport), false)
+	worker.AddNoPublisherHandler(utils.ReportGeneratorTask, reports.GenerateReport, false)
 
-	worker.AddNoPublisherHandler(utils.ReportCleanUpTask, LogErrorWrapper(cronjobs.CleanUpReports), true)
+	worker.AddNoPublisherHandler(utils.ReportCleanUpTask, cronjobs.CleanUpReports, true)
 
-	worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, LogErrorWrapper(cronjobs.LinkCloudResources), true)
+	worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, cronjobs.LinkCloudResources, true)
 
-	worker.AddNoPublisherHandler(utils.LinkNodesTask, LogErrorWrapper(cronjobs.LinkNodes), true)
+	worker.AddNoPublisherHandler(utils.LinkNodesTask, cronjobs.LinkNodes, true)
 
 	go worker.pollHandlers()
 
@@ -349,23 +350,23 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
 	return nil
 }
 
-func LogErrorWrapper(wrapped func(*message.Message) error) func(*message.Message) error {
-	return func(msg *message.Message) error {
-		err := wrapped(msg)
-		if err != nil {
-			log.Error().Msgf("Cron job err: %v", err)
-		}
-		return nil
-	}
-}
-
-func LogErrorsWrapper(wrapped func(*message.Message) ([]*message.Message, error)) func(*message.Message) ([]*message.Message, error) {
-	return func(msg *message.Message) ([]*message.Message, error) {
-		msgs, err := wrapped(msg)
-		if err != nil {
-			log.Error().Msgf("Cron job err: %v", err)
-			return nil, nil
-		}
-		return msgs, nil
-	}
-}
+// func LogErrorWrapper(wrapped func(*message.Message) error) func(*message.Message) error {
+// 	return func(msg *message.Message) error {
+// 		err := wrapped(msg)
+// 		if err != nil {
+// 			log.Error().Msgf("Cron job err: %v", err)
+// 		}
+// 		return nil
+// 	}
+// }
+
+// func LogErrorsWrapper(wrapped func(*message.Message) ([]*message.Message, error)) func(*message.Message) ([]*message.Message, error) {
+// 	return func(msg *message.Message) ([]*message.Message, error) {
+// 		msgs, err := wrapped(msg)
+// 		if err != nil {
+// 			log.Error().Msgf("Cron job err: %v", err)
+// 			return nil, nil
+// 		}
+// 		return msgs, nil
+// 	}
+// }