Skip to content

Commit

Permalink
Add periodic scan retry #728
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Dec 23, 2022
1 parent 84ae783 commit a8ba905
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 37 deletions.
2 changes: 1 addition & 1 deletion deepfence_ingester/ingesters/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func CommitFuncSecretScanStatus(ns string, data []SecretScanStatus) error {
}
defer tx.Close()

if _, err = tx.Run("UNWIND $batch as row MERGE (n:SecretScan{node_id: row.scan_id}) SET n.status = row.scan_status",
if _, err = tx.Run("UNWIND $batch as row MERGE (n:SecretScan{node_id: row.scan_id}) SET n.status = row.scan_status, updated_at = TIMESTAMP()",
map[string]interface{}{"batch": statusesToMaps(data)}); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions deepfence_server/ingesters/scan_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func AddNewScan(ctx context.Context, scan_type utils.Neo4jScanType, scan_id stri
return err
}

if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id, status: $status, retries: 0, trigger_action: $action}) MERGE (m:Node{node_id:$node_id}) MERGE (n)-[:SCANNED]->(m)", scan_type),
if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id, status: $status, retries: 0, trigger_action: $action, updated_at: TIMESTAMP()}) MERGE (m:Node{node_id:$node_id}) MERGE (n)-[:SCANNED]->(m)", scan_type),
map[string]interface{}{"scan_id": scan_id, "status": utils.SCAN_STATUS_STARTING, "node_id": node_id, "action": string(b)}); err != nil {
return err
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func UpdateScanStatus(ctx context.Context, scan_type string, scan_id string, sta
}
defer tx.Close()

if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id}) SET n.status = $status", scan_type),
if _, err = tx.Run(fmt.Sprintf("MERGE (n:%s{node_id: $scan_id}) SET n.status = $status, updated_at = TIMESTAMP()", scan_type),
map[string]interface{}{"scan_id": scan_id, "status": status}); err != nil {
return err
}
Expand Down
17 changes: 5 additions & 12 deletions deepfence_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"

"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/deepfence/ThreatMapper/deepfence_server/model"
Expand Down Expand Up @@ -135,21 +134,15 @@ func main() {
}

func initializeCronJobs() {
//TODO local namespace
ctx := directory.NewContextWithNameSpace(directory.NonSaaSDirKey)
ns, err := directory.ExtractNamespace(ctx)
err := directory.PeriodicWorkerEnqueue(context.Background(), directory.CleanUpGraphDBTaskID, "@every 120s")
if err != nil {
log.Fatal().Msgf("could not get namespace: %v", err)
log.Fatal().Msgf("Could not enqueue graph clean up task: %v", err)
}
task, err := tasks.NewCleanUpGraphDBTask(ns)
if err != nil {
log.Fatal().Msgf("could not create task: %v", err)
}
err = directory.PeriodicWorkerEnqueue(ctx, task, "@every 120s")

err = directory.PeriodicWorkerEnqueue(context.Background(), directory.ScanRetryGraphDBTaskID, "@every 120s")
if err != nil {
log.Fatal().Msgf("could not enqueue task: %v", err)
log.Fatal().Msgf("Could not enqueue scans retry task: %v", err)
}
log.Info().Msgf("DB clean cron started")
}

func initialize() (*Config, error) {
Expand Down
37 changes: 36 additions & 1 deletion deepfence_utils/directory/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package directory

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/hibiken/asynq"
)
Expand All @@ -11,6 +14,33 @@ const (
max_size = 500 * 1024 * 1024 // 500 MB
)

// TaskID names a periodic background task; its string value is used both as
// the asynq task type (see newUniquePeriodicGraphDBTask) and as the handler
// key registered in the worker router.
type TaskID string

const (
	// CleanUpGraphDBTaskID identifies the periodic graph DB clean-up task.
	CleanUpGraphDBTaskID TaskID = "CleanUpGraphDB"
	// ScanRetryGraphDBTaskID identifies the periodic stalled-scan retry task.
	ScanRetryGraphDBTaskID TaskID = "ScanRetryGraphDB"
)

// GraphDBContext is the JSON payload carried by the periodic graph DB tasks;
// it records the namespace the task should operate in.
type GraphDBContext struct {
	Namespace NamespaceID `json:"namespace"`
}

// PayloadToContext decodes a GraphDBContext JSON payload and returns a
// context carrying the namespace it names. A payload that fails to
// unmarshal is wrapped with asynq.SkipRetry so the task is dropped rather
// than retried.
func PayloadToContext(b []byte) (context.Context, error) {
	var payload GraphDBContext
	err := json.Unmarshal(b, &payload)
	if err != nil {
		return nil, fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	return NewContextWithNameSpace(payload.Namespace), nil
}

// newUniquePeriodicGraphDBTask builds an asynq task of type id whose payload
// is a GraphDBContext for namespace ns. The task is marked unique for 30
// minutes so overlapping enqueues of the same task are de-duplicated.
func newUniquePeriodicGraphDBTask(id TaskID, ns NamespaceID) (*asynq.Task, error) {
	b, err := json.Marshal(GraphDBContext{Namespace: ns})
	if err != nil {
		return nil, err
	}
	const uniqueWindow = 30 * time.Minute
	return asynq.NewTask(string(id), b, asynq.Unique(uniqueWindow)), nil
}

var ErrExhaustedResources = errors.New("Exhausted worker resources")

type async_clients struct {
Expand Down Expand Up @@ -71,7 +101,7 @@ func WorkerEnqueue(ctx context.Context, task *asynq.Task) error {
return nil
}

func PeriodicWorkerEnqueue(ctx context.Context, task *asynq.Task, cronEntry string) error {
func PeriodicWorkerEnqueue(ctx context.Context, taskid TaskID, cronEntry string) error {

clients, err := getClient(ctx, worker_clients_pool, new_asynq_client)
if err != nil {
Expand All @@ -80,6 +110,11 @@ func PeriodicWorkerEnqueue(ctx context.Context, task *asynq.Task, cronEntry stri

scheduler := clients.scheduler

task, err := newUniquePeriodicGraphDBTask(taskid, NonSaaSDirKey)
if err != nil {
return err
}

scheduler.Register(cronEntry, task)

return nil
Expand Down
1 change: 1 addition & 0 deletions deepfence_utils/utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
SCAN_STATUS_SUCCESS = "COMPLETE"
SCAN_STATUS_STARTING = "STARTING"
SCAN_STATUS_INPROGRESS = "IN_PROGRESS"
SCAN_STATUS_FAILED = "FAILED"
)

type Neo4jScanType string
Expand Down
30 changes: 30 additions & 0 deletions deepfence_worker/cronjobs/neo4j.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"time"

"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
"github.com/neo4j/neo4j-go-driver/v4/neo4j"
)

const (
db_clean_up_timeout = time.Minute * 2
db_scan_timeout = time.Minute * 2
)

func CleanUpDB(ctx context.Context) error {
Expand Down Expand Up @@ -45,5 +47,33 @@ func CleanUpDB(ctx context.Context) error {
return err
}

if _, err = tx.Run("MATCH (n) -[:SCANNED]-> (:Node) WHERE n.updated_at < TIMESTAMP()-$time_ms AND n.retries >= 3 SET n.status = '$status'", map[string]interface{}{"time_ms": db_scan_timeout.Milliseconds(), "status": utils.SCAN_STATUS_FAILED}); err != nil {
return err
}

return tx.Commit()
}

// RetryScansDB re-queues stalled scans in Neo4j: any scan node attached to a
// Node via :SCANNED whose status has been IN_PROGRESS for longer than
// db_scan_timeout, and which has fewer than 3 retries, has its retry counter
// incremented and its status reset to STARTING so it is picked up again.
func RetryScansDB(ctx context.Context) error {
	nc, err := directory.Neo4jClient(ctx)
	if err != nil {
		return err
	}
	session, err := nc.Session(neo4j.AccessModeWrite)
	if err != nil {
		return err
	}
	defer session.Close()

	tx, err := session.BeginTransaction()
	if err != nil {
		return err
	}
	defer tx.Close()

	// Fixes vs. the original query:
	//  - the MATCH binds the scan node as `s`, but WHERE/SET referenced an
	//    undefined variable `n` (n.updated_at, n.retries, n.status);
	//  - $old_status/$new_status were wrapped in single quotes, which makes
	//    Cypher compare against the literal strings "$old_status" /
	//    "$new_status" instead of substituting the bound parameters, so the
	//    query could never match nor set the intended status.
	if _, err = tx.Run("MATCH (s) -[:SCANNED]-> (:Node) WHERE s.status = $old_status AND s.updated_at < TIMESTAMP()-$time_ms AND s.retries < 3 SET s.retries = s.retries + 1, s.status = $new_status",
		map[string]interface{}{
			"time_ms":    db_scan_timeout.Milliseconds(),
			"old_status": utils.SCAN_STATUS_INPROGRESS,
			"new_status": utils.SCAN_STATUS_STARTING,
		}); err != nil {
		return err
	}

	return tx.Commit()
}
4 changes: 4 additions & 0 deletions deepfence_worker/handler/neo4j.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ import (
// CleanUpGraphDB is the asynq handler for the CleanUpGraphDB task; it
// delegates to tasks.HandleCleanUpGraphDBTask, which derives the namespace
// context from the task payload and runs the graph DB clean-up.
func CleanUpGraphDB(ctx context.Context, t *asynq.Task) error {
	return tasks.HandleCleanUpGraphDBTask(ctx, t)
}

// RetryScansGraphDB is the asynq handler for the ScanRetryGraphDB task; it
// delegates to tasks.HandlScanRetryTask, which derives the namespace context
// from the task payload and retries stalled scans.
// NOTE(review): "HandlScanRetryTask" looks like a typo for
// "HandleScanRetryTask" — renaming it must be done at its definition and
// here together.
func RetryScansGraphDB(ctx context.Context, t *asynq.Task) error {
	return tasks.HandlScanRetryTask(ctx, t)
}
4 changes: 3 additions & 1 deletion deepfence_worker/router/router.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package router

import (
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_worker/handler"
"github.com/deepfence/ThreatMapper/deepfence_worker/tasks"
"github.com/hibiken/asynq"
)

func SetupRoutes(r *asynq.ServeMux) {
r.HandleFunc(tasks.PingTaskID, tasks.HandlePingTask)
r.HandleFunc(tasks.CleanUpGraphDBTaskID, handler.CleanUpGraphDB)
r.HandleFunc(string(directory.CleanUpGraphDBTaskID), handler.CleanUpGraphDB)
r.HandleFunc(string(directory.ScanRetryGraphDBTaskID), handler.RetryScansGraphDB)
}
39 changes: 19 additions & 20 deletions deepfence_worker/tasks/neo4j.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package tasks

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
Expand All @@ -12,32 +10,33 @@ import (
"github.com/hibiken/asynq"
)

const (
CleanUpGraphDBTaskID = "CleanUpGraphDB"
)
func HandleCleanUpGraphDBTask(_ context.Context, t *asynq.Task) error {
ctx, err := directory.PayloadToContext(t.Payload())
if err != nil {
return err
}

type CleanUpGraphDBContext struct {
Namespace directory.NamespaceID `json:"namespace"`
}
start := time.Now()
err = cronjobs.CleanUpDB(ctx)
log.Info().Msgf("DB clean: %v", time.Since(start))

func NewCleanUpGraphDBTask(ns directory.NamespaceID) (*asynq.Task, error) {
payload, err := json.Marshal(CleanUpGraphDBContext{Namespace: ns})
if err != nil {
return nil, err
log.Error().Msgf("Clean neo4j err: %v", err)
}
return asynq.NewTask(CleanUpGraphDBTaskID, payload, asynq.Unique(time.Minute*30)), nil
return err
}

func HandleCleanUpGraphDBTask(_ context.Context, t *asynq.Task) error {
var p CleanUpGraphDBContext
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
func HandlScanRetryTask(_ context.Context, t *asynq.Task) error {
ctx, err := directory.PayloadToContext(t.Payload())
if err != nil {
return err
}
start := time.Now()
err := cronjobs.CleanUpDB(directory.NewContextWithNameSpace(p.Namespace))
log.Info().Msgf("DB clean: %v", time.Since(start))

err = cronjobs.RetryScansDB(ctx)

if err != nil {
log.Error().Msgf("clean neo4j err: %v", err)
log.Error().Msgf("Retry scan in Neo4j err: %v", err)
}

return err
}

0 comments on commit a8ba905

Please sign in to comment.