From a8ba9059ea46396735b92bf0edf78002dd4e35c8 Mon Sep 17 00:00:00 2001 From: Thomas Legris Date: Fri, 23 Dec 2022 11:30:45 +0900 Subject: [PATCH] Add periodic scan retry #728 --- deepfence_ingester/ingesters/secrets.go | 2 +- deepfence_server/ingesters/scan_status.go | 4 +-- deepfence_server/main.go | 17 +++------- deepfence_utils/directory/worker.go | 37 ++++++++++++++++++++- deepfence_utils/utils/constants.go | 1 + deepfence_worker/cronjobs/neo4j.go | 30 +++++++++++++++++ deepfence_worker/handler/neo4j.go | 4 +++ deepfence_worker/router/router.go | 4 ++- deepfence_worker/tasks/neo4j.go | 39 +++++++++++------------ 9 files changed, 101 insertions(+), 37 deletions(-) diff --git a/deepfence_ingester/ingesters/secrets.go b/deepfence_ingester/ingesters/secrets.go index 73e677dbf9..7eb79b627f 100644 --- a/deepfence_ingester/ingesters/secrets.go +++ b/deepfence_ingester/ingesters/secrets.go @@ -103,7 +103,7 @@ func CommitFuncSecretScanStatus(ns string, data []SecretScanStatus) error { } defer tx.Close() - if _, err = tx.Run("UNWIND $batch as row MERGE (n:SecretScan{node_id: row.scan_id}) SET n.status = row.scan_status", + if _, err = tx.Run("UNWIND $batch as row MERGE (n:SecretScan{node_id: row.scan_id}) SET n.status = row.scan_status, updated_at = TIMESTAMP()", map[string]interface{}{"batch": statusesToMaps(data)}); err != nil { return err } diff --git a/deepfence_server/ingesters/scan_status.go b/deepfence_server/ingesters/scan_status.go index d7044f7e32..0146d7dceb 100644 --- a/deepfence_server/ingesters/scan_status.go +++ b/deepfence_server/ingesters/scan_status.go @@ -64,7 +64,7 @@ func AddNewScan(ctx context.Context, scan_type utils.Neo4jScanType, scan_id stri return err } - if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id, status: $status, retries: 0, trigger_action: $action}) MERGE (m:Node{node_id:$node_id}) MERGE (n)-[:SCANNED]->(m)", scan_type), + if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id, status: $status, retries: 0, trigger_action: $action, updated_at: TIMESTAMP()}) MERGE (m:Node{node_id:$node_id}) MERGE (n)-[:SCANNED]->(m)", scan_type), map[string]interface{}{"scan_id": scan_id, "status": utils.SCAN_STATUS_STARTING, "node_id": node_id, "action": string(b)}); err != nil { return err } @@ -92,7 +92,7 @@ func UpdateScanStatus(ctx context.Context, scan_type string, scan_id string, sta } defer tx.Close() - if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id}) SET n.status = $status", scan_type), + if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id}) SET n.status = $status, updated_at = TIMESTAMP()", scan_type), map[string]interface{}{"scan_id": scan_id, "status": status}); err != nil { return err } diff --git a/deepfence_server/main.go b/deepfence_server/main.go index 5a3cb6f4b0..34225e4c9c 100644 --- a/deepfence_server/main.go +++ b/deepfence_server/main.go @@ -12,7 +12,6 @@ import ( "strings" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" - "github.com/deepfence/ThreatMapper/deepfence_worker/tasks" "github.com/twmb/franz-go/pkg/kgo" "github.com/deepfence/ThreatMapper/deepfence_server/model" @@ -135,21 +134,15 @@ func main() { } func initializeCronJobs() { - //TODO local namespace - ctx := directory.NewContextWithNameSpace(directory.NonSaaSDirKey) - ns, err := directory.ExtractNamespace(ctx) + err := directory.PeriodicWorkerEnqueue(context.Background(), directory.CleanUpGraphDBTaskID, "@every 120s") if err != nil { - log.Fatal().Msgf("could not get namespace: %v", err) + log.Fatal().Msgf("Could not enqueue graph clean up task: %v", err) } - task, err := tasks.NewCleanUpGraphDBTask(ns) - if err != nil { - log.Fatal().Msgf("could not create task: %v", err) - } - err = directory.PeriodicWorkerEnqueue(ctx, task, "@every 120s") + + err = directory.PeriodicWorkerEnqueue(context.Background(), directory.ScanRetryGraphDBTaskID, "@every 120s") if err != nil { - log.Fatal().Msgf("could not enqueue task: %v", err) + log.Fatal().Msgf("Could not enqueue scans retry task: %v", err) } - log.Info().Msgf("DB clean cron started") } func initialize() (*Config, error) { diff --git a/deepfence_utils/directory/worker.go b/deepfence_utils/directory/worker.go index d35fb6dfdb..8c9928ad00 100644 --- a/deepfence_utils/directory/worker.go +++ b/deepfence_utils/directory/worker.go @@ -2,7 +2,10 @@ package directory import ( "context" + "encoding/json" "errors" + "fmt" + "time" "github.com/hibiken/asynq" ) @@ -11,6 +14,33 @@ const ( max_size = 500 * 1024 * 1024 // 500 MB ) +type TaskID string + +const ( + CleanUpGraphDBTaskID TaskID = "CleanUpGraphDB" + ScanRetryGraphDBTaskID TaskID = "ScanRetryGraphDB" +) + +type GraphDBContext struct { + Namespace NamespaceID `json:"namespace"` +} + +func PayloadToContext(b []byte) (context.Context, error) { + var p GraphDBContext + if err := json.Unmarshal(b, &p); err != nil { + return nil, fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + return NewContextWithNameSpace(p.Namespace), nil +} + +func newUniquePeriodicGraphDBTask(id TaskID, ns NamespaceID) (*asynq.Task, error) { + payload, err := json.Marshal(GraphDBContext{Namespace: ns}) + if err != nil { + return nil, err + } + return asynq.NewTask(string(id), payload, asynq.Unique(time.Minute*30)), nil +} + var ErrExhaustedResources = errors.New("Exhausted worker resources") type async_clients struct { @@ -71,7 +101,7 @@ func WorkerEnqueue(ctx context.Context, task *asynq.Task) error { return nil } -func PeriodicWorkerEnqueue(ctx context.Context, task *asynq.Task, cronEntry string) error { +func PeriodicWorkerEnqueue(ctx context.Context, taskid TaskID, cronEntry string) error { clients, err := getClient(ctx, worker_clients_pool, new_asynq_client) if err != nil { @@ -80,6 +110,11 @@ func PeriodicWorkerEnqueue(ctx context.Context, task *asynq.Task, cronEntry stri scheduler := clients.scheduler + task, err := newUniquePeriodicGraphDBTask(taskid, NonSaaSDirKey) + if err != nil { + return err + } + scheduler.Register(cronEntry, task) return nil diff --git a/deepfence_utils/utils/constants.go b/deepfence_utils/utils/constants.go index 575ada02fc..c626469083 100644 --- a/deepfence_utils/utils/constants.go +++ b/deepfence_utils/utils/constants.go @@ -21,6 +21,7 @@ const ( SCAN_STATUS_SUCCESS = "COMPLETE" SCAN_STATUS_STARTING = "STARTING" SCAN_STATUS_INPROGRESS = "IN_PROGRESS" + SCAN_STATUS_FAILED = "FAILED" ) type Neo4jScanType string diff --git a/deepfence_worker/cronjobs/neo4j.go b/deepfence_worker/cronjobs/neo4j.go index 41161d0d98..44bce5bda6 100644 --- a/deepfence_worker/cronjobs/neo4j.go +++ b/deepfence_worker/cronjobs/neo4j.go @@ -5,11 +5,13 @@ import ( "time" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" + "github.com/deepfence/ThreatMapper/deepfence_utils/utils" "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) const ( db_clean_up_timeout = time.Minute * 2 + db_scan_timeout = time.Minute * 2 ) func CleanUpDB(ctx context.Context) error { @@ -45,5 +47,33 @@ func CleanUpDB(ctx context.Context) error { return err } + if _, err = tx.Run("MATCH (n) -[:SCANNED]-> (:Node) WHERE n.updated_at < TIMESTAMP()-$time_ms AND n.retries >= 3 SET n.status = '$status'", map[string]interface{}{"time_ms": db_scan_timeout.Milliseconds(), "status": utils.SCAN_STATUS_FAILED}); err != nil { + return err + } + + return tx.Commit() +} + +func RetryScansDB(ctx context.Context) error { + nc, err := directory.Neo4jClient(ctx) + if err != nil { + return err + } + session, err := nc.Session(neo4j.AccessModeWrite) + if err != nil { + return err + } + defer session.Close() + + tx, err := session.BeginTransaction() + if err != nil { + return err + } + defer tx.Close() + + if _, err = tx.Run("MATCH (s) -[:SCANNED]-> (:Node) WHERE s.status = '$old_status' AND n.updated_at < TIMESTAMP()-$time_ms AND s.retries < 3 SET n.retries = n.retries + 1, n.status='$new_status'", map[string]interface{}{"time_ms": db_scan_timeout.Milliseconds(), "old_status": utils.SCAN_STATUS_INPROGRESS, "new_status": utils.SCAN_STATUS_STARTING}); err != nil { + return err + } + return tx.Commit() } diff --git a/deepfence_worker/handler/neo4j.go b/deepfence_worker/handler/neo4j.go index 2504bdd063..05deaed3a3 100644 --- a/deepfence_worker/handler/neo4j.go +++ b/deepfence_worker/handler/neo4j.go @@ -10,3 +10,7 @@ import ( func CleanUpGraphDB(ctx context.Context, t *asynq.Task) error { return tasks.HandleCleanUpGraphDBTask(ctx, t) } + +func RetryScansGraphDB(ctx context.Context, t *asynq.Task) error { + return tasks.HandlScanRetryTask(ctx, t) +} diff --git a/deepfence_worker/router/router.go b/deepfence_worker/router/router.go index af430e8895..701e2f2a9e 100644 --- a/deepfence_worker/router/router.go +++ b/deepfence_worker/router/router.go @@ -1,6 +1,7 @@ package router import ( + "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_worker/handler" "github.com/deepfence/ThreatMapper/deepfence_worker/tasks" "github.com/hibiken/asynq" @@ -8,5 +9,6 @@ import ( func SetupRoutes(r *asynq.ServeMux) { r.HandleFunc(tasks.PingTaskID, tasks.HandlePingTask) - r.HandleFunc(tasks.CleanUpGraphDBTaskID, handler.CleanUpGraphDB) + r.HandleFunc(string(directory.CleanUpGraphDBTaskID), handler.CleanUpGraphDB) + r.HandleFunc(string(directory.ScanRetryGraphDBTaskID), handler.RetryScansGraphDB) } diff --git a/deepfence_worker/tasks/neo4j.go b/deepfence_worker/tasks/neo4j.go index f3c5d86856..f6cb09ca8e 100644 --- a/deepfence_worker/tasks/neo4j.go +++ b/deepfence_worker/tasks/neo4j.go @@ -2,8 +2,6 @@ package tasks import ( "context" - "encoding/json" - "fmt" "time" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -12,32 +10,33 @@ import ( "github.com/hibiken/asynq" ) -const ( - CleanUpGraphDBTaskID = "CleanUpGraphDB" -) +func HandleCleanUpGraphDBTask(_ context.Context, t *asynq.Task) error { + ctx, err := directory.PayloadToContext(t.Payload()) + if err != nil { + return err + } -type CleanUpGraphDBContext struct { - Namespace directory.NamespaceID `json:"namespace"` -} + start := time.Now() + err = cronjobs.CleanUpDB(ctx) + log.Info().Msgf("DB clean: %v", time.Since(start)) -func NewCleanUpGraphDBTask(ns directory.NamespaceID) (*asynq.Task, error) { - payload, err := json.Marshal(CleanUpGraphDBContext{Namespace: ns}) if err != nil { - return nil, err + log.Error().Msgf("Clean neo4j err: %v", err) } - return asynq.NewTask(CleanUpGraphDBTaskID, payload, asynq.Unique(time.Minute*30)), nil + return err } -func HandleCleanUpGraphDBTask(_ context.Context, t *asynq.Task) error { - var p CleanUpGraphDBContext - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) +func HandlScanRetryTask(_ context.Context, t *asynq.Task) error { + ctx, err := directory.PayloadToContext(t.Payload()) + if err != nil { + return err } - start := time.Now() - err := cronjobs.CleanUpDB(directory.NewContextWithNameSpace(p.Namespace)) - log.Info().Msgf("DB clean: %v", time.Since(start)) + + err = cronjobs.RetryScansDB(ctx) + if err != nil { - log.Error().Msgf("clean neo4j err: %v", err) + log.Error().Msgf("Retry scan in Neo4j err: %v", err) } + return err }