fix watermill middlewares not acking messages after processing
gnmahanth authored and noboruma committed Sep 8, 2023
1 parent 291e887 commit 6bc58f4
Showing 3 changed files with 199 additions and 61 deletions.
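
For context, the fix hinges on watermill's ack/nack contract: a message that is never acked, or is nacked because its handler returned an error or panicked, is typically redelivered by the subscriber. The middlewares added in this commit therefore ack explicitly once a message should not be processed again. A minimal, illustrative sketch of that contract (not part of the commit; the topic name and process function are placeholders):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

// process is a stand-in for real message handling.
func process(msg *message.Message) error {
	fmt.Println("processing", msg.UUID)
	return nil
}

func main() {
	logger := watermill.NewStdLogger(false, false)
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	msgs, err := pubSub.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go func() {
		for msg := range msgs {
			if err := process(msg); err != nil {
				// A nacked (or never acked) message is typically redelivered;
				// that is why the middlewares in this commit ack explicitly
				// after a panic or once retries are exhausted.
				msg.Nack()
				continue
			}
			msg.Ack()
		}
	}()

	if err := pubSub.Publish("example.topic",
		message.NewMessage(watermill.NewUUID(), []byte("payload"))); err != nil {
		panic(err)
	}

	// Crude wait so the consumer goroutine gets to run in this sketch.
	time.Sleep(time.Second)
}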
4 changes: 2 additions & 2 deletions deepfence_worker/go.mod
@@ -25,6 +25,7 @@ require (
github.com/ThreeDotsLabs/watermill-kafka/v2 v2.4.0
github.com/anchore/syft v0.87.0
github.com/aws/aws-sdk-go v1.44.325
github.com/cenkalti/backoff/v3 v3.2.2
github.com/deepfence/SecretScanner v0.0.0-00010101000000-000000000000
github.com/deepfence/ThreatMapper/deepfence_server v0.0.0-00010101000000-000000000000
github.com/deepfence/ThreatMapper/deepfence_utils v0.0.0-00010101000000-000000000000
@@ -34,6 +35,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/minio/minio-go/v7 v7.0.58
github.com/neo4j/neo4j-go-driver/v4 v4.4.7
github.com/pkg/errors v0.9.1
github.com/pressly/goose/v3 v3.15.0
github.com/prometheus/client_golang v1.14.0
github.com/robfig/cron/v3 v3.0.1
@@ -70,7 +72,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.6.0 // indirect
github.com/casbin/casbin/v2 v2.75.0 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/containerd/containerd v1.7.2 // indirect
@@ -192,7 +193,6 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
137 changes: 137 additions & 0 deletions deepfence_worker/utils/watermill.go
@@ -0,0 +1,137 @@
package utils

import (
"fmt"
"runtime/debug"

"context"
"time"

"github.com/pkg/errors"

"github.com/cenkalti/backoff/v3"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)

// RecoveredPanicError holds the recovered panic's error along with the stacktrace.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}

func (p RecoveredPanicError) Error() string {
return fmt.Sprintf("panic occurred: %#v, stacktrace: \n%s", p.V, p.Stacktrace)
}

// Recoverer recovers from any panic in the handler, acks the message so it is not
// redelivered, and returns a RecoveredPanicError carrying the stacktrace.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true

defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
// ack the message as we don't want to process a panicking message again
event.Ack()
}
}()

events, err = h(event)
panicked = false
return events, err
}
}

// Retry provides a middleware that retries the handler if errors are returned.
// The retry behaviour is configurable, with exponential backoff and maximum elapsed time.
// Once MaxRetries is exceeded, the message is acked so it is not redelivered.
type Retry struct {
// MaxRetries is the maximum number of times a retry will be attempted.
MaxRetries int

// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
InitialInterval time.Duration
// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
MaxInterval time.Duration
// Multiplier is the factor by which the waiting interval will be multiplied between retries.
Multiplier float64
// MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
MaxElapsedTime time.Duration
// RandomizationFactor randomizes the spread of the backoff times within the interval of:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64

// OnRetryHook is an optional function that will be executed on each retry attempt.
// The number of the current retry is passed as retryNum.
OnRetryHook func(retryNum int, delay time.Duration)

Logger watermill.LoggerAdapter
}

// Middleware returns the Retry middleware.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}

expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor

ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}

retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case <-ctx.Done():
return producedMessages, err
case <-time.After(waitTime):
// go on
}

producedMessages, err = h(msg)
if err == nil {
return producedMessages, nil
}

if r.Logger != nil {
r.Logger.Error("Error occurred, retrying", err, watermill.LogFields{
"retry_no": retryNum,
"max_retries": r.MaxRetries,
"wait_time": waitTime,
"elapsed_time": expBackoff.GetElapsedTime(),
})
}
if r.OnRetryHook != nil {
r.OnRetryHook(retryNum, waitTime)
}

retryNum++
if retryNum > r.MaxRetries {
if r.Logger != nil {
r.Logger.Error("Error Max retries reached", err, watermill.LogFields{"msg_uuid": msg.UUID})
}
// ack the message as we don't want to process an already-retried message again
msg.Ack()
break retryLoop
}
}

return nil, err
}
}
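
A minimal sketch of wiring the new Retry and Recoverer middlewares into a watermill router, mirroring the worker.go changes below; the gochannel Pub/Sub, topic, and handler are illustrative placeholders:

package main

import (
	"context"
	"errors"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"

	workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils"
)

func main() {
	logger := watermill.NewStdLogger(false, false)
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	retry := workerUtils.Retry{
		MaxRetries:          3,
		InitialInterval:     time.Second * 5,
		MaxInterval:         time.Second * 60,
		Multiplier:          1.5,
		RandomizationFactor: 0.25,
		OnRetryHook: func(retryNum int, delay time.Duration) {
			logger.Info("retrying handler", watermill.LogFields{"retry": retryNum, "delay": delay})
		},
		Logger: logger,
	}

	// Retry re-runs a failing handler with exponential backoff and acks the message
	// once MaxRetries is exceeded; Recoverer turns a panic into an error and also acks,
	// so neither case leaves the message to be redelivered forever.
	router.AddMiddleware(retry.Middleware, workerUtils.Recoverer)

	router.AddNoPublisherHandler("flaky-handler", "example.topic", pubSub,
		func(msg *message.Message) error {
			// Always failing: retried with backoff, then acked after MaxRetries.
			return errors.New("transient failure")
		})

	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}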
119 changes: 60 additions & 59 deletions deepfence_worker/worker.go
@@ -20,6 +20,7 @@ import (
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/reports"
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/sbom"
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/secretscan"
workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils"
"github.com/twmb/franz-go/pkg/kgo"
)

@@ -258,24 +259,24 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {

mux.AddPlugin(plugin.SignalsHandler)

// Retried disabled in favor of neo4j scheduling
//retryMiddleware := middleware.Retry{
// MaxRetries: 3,
// InitialInterval: time.Second * 10,
// MaxInterval: time.Second * 120,
// Multiplier: 1.5,
// MaxElapsedTime: 0,
// RandomizationFactor: 0.5,
// OnRetryHook: func(retryNum int, delay time.Duration) {
// log.Info().Msgf("retry=%d delay=%s", retryNum, delay)
// },
// Logger: wml,
//}
retry := workerUtils.Retry{
MaxRetries: 3,
InitialInterval: time.Second * 5,
MaxInterval: time.Second * 60,
Multiplier: 1.5,
MaxElapsedTime: 0,
RandomizationFactor: 0.25,
OnRetryHook: func(retryNum int, delay time.Duration) {
log.Info().Msgf("retry=%d delay=%s", retryNum, delay)
},
Logger: wml,
}

mux.AddMiddleware(
middleware.CorrelationID,
middleware.NewThrottle(10, time.Second).Middleware,
middleware.Recoverer,
middleware.NewThrottle(20, time.Second).Middleware,
retry.Middleware,
workerUtils.Recoverer,
)

HandlerMap = make(map[string]*NoPublisherTask)
@@ -285,58 +286,58 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
worker := NewWorker(wml, cfg, mux)

// sbom
worker.AddNoPublisherHandler(utils.ScanSBOMTask, LogErrorWrapper(sbom.NewSBOMScanner(ingestC).ScanSBOM), false)
worker.AddNoPublisherHandler(utils.ScanSBOMTask, sbom.NewSBOMScanner(ingestC).ScanSBOM, false)

worker.AddHandler(utils.GenerateSBOMTask, LogErrorsWrapper(sbom.NewSbomGenerator(ingestC).GenerateSbom),
worker.AddHandler(utils.GenerateSBOMTask, sbom.NewSbomGenerator(ingestC).GenerateSbom,
utils.ScanSBOMTask, publisher)

worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, LogErrorWrapper(cronjobs.ApplyGraphDBStartup), false)
worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, cronjobs.ApplyGraphDBStartup, false)

worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, LogErrorWrapper(cronjobs.CleanUpDB), true)
worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, cronjobs.CleanUpDB, true)

worker.AddNoPublisherHandler(utils.ComputeThreatTask, LogErrorWrapper(cronjobs.ComputeThreat), true)
worker.AddNoPublisherHandler(utils.ComputeThreatTask, cronjobs.ComputeThreat, true)

worker.AddNoPublisherHandler(utils.RetryFailedScansTask, LogErrorWrapper(cronjobs.RetryScansDB), true)
worker.AddNoPublisherHandler(utils.RetryFailedScansTask, cronjobs.RetryScansDB, true)

worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, LogErrorWrapper(cronjobs.RetryUpgradeAgent), false)
worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, cronjobs.RetryUpgradeAgent, false)

worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, LogErrorWrapper(cronjobs.CleanUpPostgresDB), true)
worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, cronjobs.CleanUpPostgresDB, true)

worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, LogErrorWrapper(cronjobs.CleanUpDiagnosisLogs), false)
worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, cronjobs.CleanUpDiagnosisLogs, false)

worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, LogErrorWrapper(cronjobs.CheckAgentUpgrade), true)
worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, cronjobs.CheckAgentUpgrade, true)

worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, LogErrorWrapper(cronjobs.TriggerConsoleControls), true)
worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, cronjobs.TriggerConsoleControls, true)

worker.AddNoPublisherHandler(utils.ScheduledTasks, LogErrorWrapper(cronjobs.RunScheduledTasks), false)
worker.AddNoPublisherHandler(utils.ScheduledTasks, cronjobs.RunScheduledTasks, false)

worker.AddNoPublisherHandler(utils.SyncRegistryTask, LogErrorWrapper(cronjobs.SyncRegistry), false)
worker.AddNoPublisherHandler(utils.SyncRegistryTask, cronjobs.SyncRegistry, false)

worker.AddNoPublisherHandler(utils.SecretScanTask,
LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StartSecretScan), false)
secretscan.NewSecretScanner(ingestC).StartSecretScan, false)

worker.AddNoPublisherHandler(utils.StopSecretScanTask,
LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StopSecretScan), false)
secretscan.NewSecretScanner(ingestC).StopSecretScan, false)

worker.AddNoPublisherHandler(utils.MalwareScanTask,
LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StartMalwareScan), false)
malwarescan.NewMalwareScanner(ingestC).StartMalwareScan, false)

worker.AddNoPublisherHandler(utils.StopMalwareScanTask,
LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StopMalwareScan), false)
malwarescan.NewMalwareScanner(ingestC).StopMalwareScan, false)

worker.AddNoPublisherHandler(utils.CloudComplianceTask, LogErrorWrapper(cronjobs.AddCloudControls), true)
worker.AddNoPublisherHandler(utils.CloudComplianceTask, cronjobs.AddCloudControls, true)

worker.AddNoPublisherHandler(utils.CachePostureProviders, LogErrorWrapper(cronjobs.CachePostureProviders), true)
worker.AddNoPublisherHandler(utils.CachePostureProviders, cronjobs.CachePostureProviders, true)

worker.AddNoPublisherHandler(utils.SendNotificationTask, LogErrorWrapper(cronjobs.SendNotifications), true)
worker.AddNoPublisherHandler(utils.SendNotificationTask, cronjobs.SendNotifications, true)

worker.AddNoPublisherHandler(utils.ReportGeneratorTask, LogErrorWrapper(reports.GenerateReport), false)
worker.AddNoPublisherHandler(utils.ReportGeneratorTask, reports.GenerateReport, false)

worker.AddNoPublisherHandler(utils.ReportCleanUpTask, LogErrorWrapper(cronjobs.CleanUpReports), true)
worker.AddNoPublisherHandler(utils.ReportCleanUpTask, cronjobs.CleanUpReports, true)

worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, LogErrorWrapper(cronjobs.LinkCloudResources), true)
worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, cronjobs.LinkCloudResources, true)

worker.AddNoPublisherHandler(utils.LinkNodesTask, LogErrorWrapper(cronjobs.LinkNodes), true)
worker.AddNoPublisherHandler(utils.LinkNodesTask, cronjobs.LinkNodes, true)

go worker.pollHandlers()

@@ -349,23 +350,23 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
return nil
}

func LogErrorWrapper(wrapped func(*message.Message) error) func(*message.Message) error {
return func(msg *message.Message) error {
err := wrapped(msg)
if err != nil {
log.Error().Msgf("Cron job err: %v", err)
}
return nil
}
}

func LogErrorsWrapper(wrapped func(*message.Message) ([]*message.Message, error)) func(*message.Message) ([]*message.Message, error) {
return func(msg *message.Message) ([]*message.Message, error) {
msgs, err := wrapped(msg)
if err != nil {
log.Error().Msgf("Cron job err: %v", err)
return nil, nil
}
return msgs, nil
}
}
// func LogErrorWrapper(wrapped func(*message.Message) error) func(*message.Message) error {
// return func(msg *message.Message) error {
// err := wrapped(msg)
// if err != nil {
// log.Error().Msgf("Cron job err: %v", err)
// }
// return nil
// }
// }

// func LogErrorsWrapper(wrapped func(*message.Message) ([]*message.Message, error)) func(*message.Message) ([]*message.Message, error) {
// return func(msg *message.Message) ([]*message.Message, error) {
// msgs, err := wrapped(msg)
// if err != nil {
// log.Error().Msgf("Cron job err: %v", err)
// return nil, nil
// }
// return msgs, nil
// }
// }
