Fix watermill middlewares not acking messages after processing #1533

Merged 2 commits on Sep 8, 2023
4 changes: 2 additions & 2 deletions deepfence_worker/go.mod
@@ -25,6 +25,7 @@ require (
github.com/ThreeDotsLabs/watermill-kafka/v2 v2.4.0
github.com/anchore/syft v0.87.0
github.com/aws/aws-sdk-go v1.44.325
+github.com/cenkalti/backoff/v3 v3.2.2
github.com/deepfence/SecretScanner v0.0.0-00010101000000-000000000000
github.com/deepfence/ThreatMapper/deepfence_server v0.0.0-00010101000000-000000000000
github.com/deepfence/ThreatMapper/deepfence_utils v0.0.0-00010101000000-000000000000
@@ -34,6 +35,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/minio/minio-go/v7 v7.0.58
github.com/neo4j/neo4j-go-driver/v4 v4.4.7
+github.com/pkg/errors v0.9.1
github.com/pressly/goose/v3 v3.15.0
github.com/prometheus/client_golang v1.14.0
github.com/robfig/cron/v3 v3.0.1
@@ -70,7 +72,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.6.0 // indirect
github.com/casbin/casbin/v2 v2.75.0 // indirect
-github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/containerd/containerd v1.7.2 // indirect
@@ -192,7 +193,6 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
-github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
135 changes: 135 additions & 0 deletions deepfence_worker/utils/watermill.go
@@ -0,0 +1,135 @@
package utils

import (
    "context"
    "fmt"
    "runtime/debug"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/cenkalti/backoff/v3"
    "github.com/pkg/errors"
)

// RecoveredPanicError holds the recovered panic's error along with the stacktrace.
type RecoveredPanicError struct {
    V          interface{}
    Stacktrace string
}

func (p RecoveredPanicError) Error() string {
    return fmt.Sprintf("panic occurred: %#v, stacktrace: \n%s", p.V, p.Stacktrace)
}

// Recoverer recovers from any panic in the handler, returning a RecoveredPanicError
// with the stacktrace as the handler's error. It also acks the message, since a
// message that caused a panic should not be executed again.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
    return func(event *message.Message) (events []*message.Message, err error) {
        defer func() {
            if r := recover(); r != nil {
                err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
                // Ack the message: we don't want to execute a panicking message again.
                event.Ack()
            }
        }()

        events, err = h(event)
        return events, err
    }
}

// Retry provides a middleware that retries the handler if errors are returned.
// The retry behaviour is configurable, with exponential backoff and maximum elapsed time.
// Once MaxRetries is exceeded, the message is acked so it is not redelivered.
type Retry struct {
    // MaxRetries is the maximum number of times a retry will be attempted.
    MaxRetries int

    // InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
    InitialInterval time.Duration
    // MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
    MaxInterval time.Duration
    // Multiplier is the factor by which the waiting interval will be multiplied between retries.
    Multiplier float64
    // MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
    MaxElapsedTime time.Duration
    // RandomizationFactor randomizes the spread of the backoff times within the interval of:
    // [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
    RandomizationFactor float64

    // OnRetryHook is an optional function that will be executed on each retry attempt.
    // The number of the current retry is passed as retryNum, and the wait before it as delay.
    OnRetryHook func(retryNum int, delay time.Duration)

    Logger watermill.LoggerAdapter
}

// Middleware returns the Retry middleware.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(msg)
        if err == nil {
            return producedMessages, nil
        }

        expBackoff := backoff.NewExponentialBackOff()
        expBackoff.InitialInterval = r.InitialInterval
        expBackoff.MaxInterval = r.MaxInterval
        expBackoff.Multiplier = r.Multiplier
        expBackoff.MaxElapsedTime = r.MaxElapsedTime
        expBackoff.RandomizationFactor = r.RandomizationFactor

        ctx := msg.Context()
        if r.MaxElapsedTime > 0 {
            var cancel func()
            ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
            defer cancel()
        }

        retryNum := 1
        expBackoff.Reset()
    retryLoop:
        for {
            waitTime := expBackoff.NextBackOff()
            select {
            case <-ctx.Done():
                return producedMessages, err
            case <-time.After(waitTime):
                // go on
            }

            producedMessages, err = h(msg)
            if err == nil {
                return producedMessages, nil
            }

            if r.Logger != nil {
                r.Logger.Error("Error occurred, retrying", err, watermill.LogFields{
                    "retry_no":     retryNum,
                    "max_retries":  r.MaxRetries,
                    "wait_time":    waitTime,
                    "elapsed_time": expBackoff.GetElapsedTime(),
                })
            }
            if r.OnRetryHook != nil {
                r.OnRetryHook(retryNum, waitTime)
            }

            retryNum++
            if retryNum > r.MaxRetries {
                if r.Logger != nil {
                    r.Logger.Error("Max retries reached", err, watermill.LogFields{"msg_uuid": msg.UUID})
                }
                // Ack the message: we don't want to execute an already-retried message again.
                msg.Ack()
                break retryLoop
            }
        }

        return nil, err
    }
}
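
For intuition (not part of the diff): a minimal sketch of the wait schedule this middleware produces with the values configured in worker.go below. RandomizationFactor is forced to 0 here so the output is deterministic; the real configuration uses 0.25, which spreads each wait by roughly ±25%.

package main

import (
    "fmt"
    "time"

    "github.com/cenkalti/backoff/v3"
)

func main() {
    // Mirror the Retry values configured in worker.go.
    b := backoff.NewExponentialBackOff()
    b.InitialInterval = 5 * time.Second
    b.MaxInterval = 60 * time.Second
    b.Multiplier = 1.5
    b.RandomizationFactor = 0 // worker.go uses 0.25
    b.Reset()

    for i := 1; i <= 3; i++ {
        // Prints roughly 5s, 7.5s, 11.25s: each wait is the previous one
        // times Multiplier, capped at MaxInterval.
        fmt.Printf("retry %d: wait %s\n", i, b.NextBackOff())
    }
}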
100 changes: 40 additions & 60 deletions deepfence_worker/worker.go
@@ -20,6 +20,7 @@ import (
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/reports"
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/sbom"
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks/secretscan"
workerUtils "github.com/deepfence/ThreatMapper/deepfence_worker/utils"
"github.com/twmb/franz-go/pkg/kgo"
)

@@ -258,24 +259,24 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {

mux.AddPlugin(plugin.SignalsHandler)

-// Retried disabled in favor of neo4j scheduling
-//retryMiddleware := middleware.Retry{
-// MaxRetries: 3,
-// InitialInterval: time.Second * 10,
-// MaxInterval: time.Second * 120,
-// Multiplier: 1.5,
-// MaxElapsedTime: 0,
-// RandomizationFactor: 0.5,
-// OnRetryHook: func(retryNum int, delay time.Duration) {
-// log.Info().Msgf("retry=%d delay=%s", retryNum, delay)
-// },
-// Logger: wml,
-//}
+retry := workerUtils.Retry{
+MaxRetries: 3,
+InitialInterval: time.Second * 5,
+MaxInterval: time.Second * 60,
+Multiplier: 1.5,
+MaxElapsedTime: 0,
+RandomizationFactor: 0.25,
+OnRetryHook: func(retryNum int, delay time.Duration) {
+log.Info().Msgf("retry=%d delay=%s", retryNum, delay)
+},
+Logger: wml,
+}

mux.AddMiddleware(
middleware.CorrelationID,
-middleware.NewThrottle(10, time.Second).Middleware,
-middleware.Recoverer,
+middleware.NewThrottle(20, time.Second).Middleware,
+retry.Middleware,
+workerUtils.Recoverer,
)

HandlerMap = make(map[string]*NoPublisherTask)
@@ -285,58 +286,58 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
worker := NewWorker(wml, cfg, mux)

// sbom
-worker.AddNoPublisherHandler(utils.ScanSBOMTask, LogErrorWrapper(sbom.NewSBOMScanner(ingestC).ScanSBOM), false)
+worker.AddNoPublisherHandler(utils.ScanSBOMTask, sbom.NewSBOMScanner(ingestC).ScanSBOM, false)

-worker.AddHandler(utils.GenerateSBOMTask, LogErrorsWrapper(sbom.NewSbomGenerator(ingestC).GenerateSbom),
+worker.AddHandler(utils.GenerateSBOMTask, sbom.NewSbomGenerator(ingestC).GenerateSbom,
utils.ScanSBOMTask, publisher)

-worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, LogErrorWrapper(cronjobs.ApplyGraphDBStartup), false)
+worker.AddNoPublisherHandler(utils.SetUpGraphDBTask, cronjobs.ApplyGraphDBStartup, false)

-worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, LogErrorWrapper(cronjobs.CleanUpDB), true)
+worker.AddNoPublisherHandler(utils.CleanUpGraphDBTask, cronjobs.CleanUpDB, true)

-worker.AddNoPublisherHandler(utils.ComputeThreatTask, LogErrorWrapper(cronjobs.ComputeThreat), true)
+worker.AddNoPublisherHandler(utils.ComputeThreatTask, cronjobs.ComputeThreat, true)

-worker.AddNoPublisherHandler(utils.RetryFailedScansTask, LogErrorWrapper(cronjobs.RetryScansDB), true)
+worker.AddNoPublisherHandler(utils.RetryFailedScansTask, cronjobs.RetryScansDB, true)

-worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, LogErrorWrapper(cronjobs.RetryUpgradeAgent), false)
+worker.AddNoPublisherHandler(utils.RetryFailedUpgradesTask, cronjobs.RetryUpgradeAgent, false)

-worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, LogErrorWrapper(cronjobs.CleanUpPostgresDB), true)
+worker.AddNoPublisherHandler(utils.CleanUpPostgresqlTask, cronjobs.CleanUpPostgresDB, true)

-worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, LogErrorWrapper(cronjobs.CleanUpDiagnosisLogs), false)
+worker.AddNoPublisherHandler(utils.CleanupDiagnosisLogs, cronjobs.CleanUpDiagnosisLogs, false)

-worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, LogErrorWrapper(cronjobs.CheckAgentUpgrade), true)
+worker.AddNoPublisherHandler(utils.CheckAgentUpgradeTask, cronjobs.CheckAgentUpgrade, true)

-worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, LogErrorWrapper(cronjobs.TriggerConsoleControls), true)
+worker.AddNoPublisherHandler(utils.TriggerConsoleActionsTask, cronjobs.TriggerConsoleControls, true)

-worker.AddNoPublisherHandler(utils.ScheduledTasks, LogErrorWrapper(cronjobs.RunScheduledTasks), false)
+worker.AddNoPublisherHandler(utils.ScheduledTasks, cronjobs.RunScheduledTasks, false)

-worker.AddNoPublisherHandler(utils.SyncRegistryTask, LogErrorWrapper(cronjobs.SyncRegistry), false)
+worker.AddNoPublisherHandler(utils.SyncRegistryTask, cronjobs.SyncRegistry, false)

worker.AddNoPublisherHandler(utils.SecretScanTask,
-LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StartSecretScan), false)
+secretscan.NewSecretScanner(ingestC).StartSecretScan, false)

worker.AddNoPublisherHandler(utils.StopSecretScanTask,
-LogErrorWrapper(secretscan.NewSecretScanner(ingestC).StopSecretScan), false)
+secretscan.NewSecretScanner(ingestC).StopSecretScan, false)

worker.AddNoPublisherHandler(utils.MalwareScanTask,
-LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StartMalwareScan), false)
+malwarescan.NewMalwareScanner(ingestC).StartMalwareScan, false)

worker.AddNoPublisherHandler(utils.StopMalwareScanTask,
-LogErrorWrapper(malwarescan.NewMalwareScanner(ingestC).StopMalwareScan), false)
+malwarescan.NewMalwareScanner(ingestC).StopMalwareScan, false)

-worker.AddNoPublisherHandler(utils.CloudComplianceTask, LogErrorWrapper(cronjobs.AddCloudControls), true)
+worker.AddNoPublisherHandler(utils.CloudComplianceTask, cronjobs.AddCloudControls, true)

-worker.AddNoPublisherHandler(utils.CachePostureProviders, LogErrorWrapper(cronjobs.CachePostureProviders), true)
+worker.AddNoPublisherHandler(utils.CachePostureProviders, cronjobs.CachePostureProviders, true)

-worker.AddNoPublisherHandler(utils.SendNotificationTask, LogErrorWrapper(cronjobs.SendNotifications), true)
+worker.AddNoPublisherHandler(utils.SendNotificationTask, cronjobs.SendNotifications, true)

-worker.AddNoPublisherHandler(utils.ReportGeneratorTask, LogErrorWrapper(reports.GenerateReport), false)
+worker.AddNoPublisherHandler(utils.ReportGeneratorTask, reports.GenerateReport, false)

-worker.AddNoPublisherHandler(utils.ReportCleanUpTask, LogErrorWrapper(cronjobs.CleanUpReports), true)
+worker.AddNoPublisherHandler(utils.ReportCleanUpTask, cronjobs.CleanUpReports, true)

-worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, LogErrorWrapper(cronjobs.LinkCloudResources), true)
+worker.AddNoPublisherHandler(utils.LinkCloudResourceTask, cronjobs.LinkCloudResources, true)

-worker.AddNoPublisherHandler(utils.LinkNodesTask, LogErrorWrapper(cronjobs.LinkNodes), true)
+worker.AddNoPublisherHandler(utils.LinkNodesTask, cronjobs.LinkNodes, true)

go worker.pollHandlers()

@@ -348,24 +349,3 @@ func startWorker(wml watermill.LoggerAdapter, cfg config) error {
cancel()
return nil
}

-func LogErrorWrapper(wrapped func(*message.Message) error) func(*message.Message) error {
-return func(msg *message.Message) error {
-err := wrapped(msg)
-if err != nil {
-log.Error().Msgf("Cron job err: %v", err)
-}
-return nil
-}
-}
-
-func LogErrorsWrapper(wrapped func(*message.Message) ([]*message.Message, error)) func(*message.Message) ([]*message.Message, error) {
-return func(msg *message.Message) ([]*message.Message, error) {
-msgs, err := wrapped(msg)
-if err != nil {
-log.Error().Msgf("Cron job err: %v", err)
-return nil, nil
-}
-return msgs, nil
-}
-}
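
Context for the removal above (not part of the diff): LogErrorWrapper returned nil even when a job failed, so the router acked every message and no middleware ever saw the error. A minimal sketch of the handler contract after this change, with hypothetical names (doCleanup stands in for a real job body):

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
)

// doCleanup is a hypothetical stand-in for a real cron-job body.
func doCleanup(ctx context.Context) error { return errors.New("transient failure") }

// cleanUpDB follows the contract after this PR: return the error instead of
// logging and swallowing it. The Retry middleware then retries with backoff
// and acks once MaxRetries is exhausted, so the message is not redelivered forever.
func cleanUpDB(msg *message.Message) error {
    return doCleanup(msg.Context())
}

func main() {
    msg := message.NewMessage(watermill.NewUUID(), nil)
    fmt.Println(cleanUpDB(msg)) // transient failure
}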