diff --git a/deepfence_worker/cronjobs/cloud_compliance.go b/deepfence_worker/cronjobs/cloud_compliance.go index bc6a69acf5..3268c99736 100644 --- a/deepfence_worker/cronjobs/cloud_compliance.go +++ b/deepfence_worker/cronjobs/cloud_compliance.go @@ -185,6 +185,7 @@ func AddCloudControls(ctx context.Context, task *asynq.Task) error { func CachePostureProviders(ctx context.Context, task *asynq.Task) error { log.Info().Msgf("Caching Posture Providers") + defer log.Info().Msgf("Caching Posture Providers - Done") driver, err := directory.Neo4jClient(ctx) if err != nil { return err @@ -237,9 +238,9 @@ func CachePostureProviders(ctx context.Context, task *asynq.Task) error { scan_count_query = ` MATCH (n:` + string(neo4jNodeType) + `) - WHERE n.pseudo=false and n.active=true and n.agent_running=true + WHERE n.pseudo=false and n.agent_running=true MATCH (n) <-[:SCANNED]- (m:` + string(utils.NEO4J_COMPLIANCE_SCAN) + `) - RETURN count(distinct m)` + RETURN count(distinct n)` success_count_query = ` MATCH (n:` + string(neo4jNodeType) + `) @@ -274,10 +275,9 @@ func CachePostureProviders(ctx context.Context, task *asynq.Task) error { scan_count_query = ` MATCH (o:` + string(neo4jNodeType) + `{cloud_provider:$cloud_provider+'_org'}) -[:IS_CHILD]-> (m:` + string(neo4jNodeType) + `) - WHERE o.active=true AND m.organization_id IS NOT NULL MATCH (n:` + string(utils.NEO4J_CLOUD_COMPLIANCE_SCAN) + `)-[:SCANNED]->(m) - RETURN count(distinct n)` + RETURN count(distinct m)` success_count_query = ` MATCH (o:` + string(neo4jNodeType) + `{cloud_provider:$cloud_provider+'_org'}) -[:IS_CHILD]-> (m:` + string(neo4jNodeType) + `) @@ -307,9 +307,8 @@ func CachePostureProviders(ctx context.Context, task *asynq.Task) error { scan_count_query = ` MATCH (m:` + string(neo4jNodeType) + `{cloud_provider: $cloud_provider}) - WHERE m.active=true MATCH (n:` + string(utils.NEO4J_CLOUD_COMPLIANCE_SCAN) + `)-[:SCANNED]->(m) - RETURN count(distinct n)` + RETURN count(distinct m)` success_count_query = ` MATCH (m:` + string(neo4jNodeType) + `{cloud_provider: $cloud_provider}) diff --git a/deepfence_worker/ingesters/common.go b/deepfence_worker/ingesters/common.go index f665eb495c..7c54e5526c 100644 --- a/deepfence_worker/ingesters/common.go +++ b/deepfence_worker/ingesters/common.go @@ -2,6 +2,7 @@ package ingesters import ( "encoding/json" + "strconv" "time" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -79,12 +80,12 @@ func CommitFuncStatus[Status any](ts utils.Neo4jScanType) func(ns string, data [ return err } - if ts != utils.NEO4J_COMPLIANCE_SCAN { - worker, err := directory.Worker(ctx) - if err != nil { - return err - } + worker, err := directory.Worker(ctx) + if err != nil { + return err + } + if ts != utils.NEO4J_COMPLIANCE_SCAN { event := scans.UpdateScanEvent{ ScanType: ts, RecordMap: recordMap, @@ -97,7 +98,17 @@ func CommitFuncStatus[Status any](ts utils.Neo4jScanType) func(ns string, data [ if ts == utils.NEO4J_CLOUD_COMPLIANCE_SCAN { task = utils.UpdateCloudResourceScanStatusTask } - err = worker.Enqueue(task, b) + if err := worker.Enqueue(task, b); err != nil { + log.Error().Err(err).Msgf("failed to enqueue %s", task) + } + } + + if (ts == utils.NEO4J_COMPLIANCE_SCAN || ts == utils.NEO4J_CLOUD_COMPLIANCE_SCAN) && anyCompleted(others) { + err := worker.Enqueue(utils.CachePostureProviders, + []byte(strconv.FormatInt(utils.GetTimestamp(), 10))) + if err != nil { + log.Error().Err(err).Msgf("failed to enqueue %s", utils.CachePostureProviders) + } } return err @@ -109,14 +120,28 @@ func statusesToMaps[T any](data []T) []map[string]interface{} { statusBuff := map[string]map[string]interface{}{} for _, i := range data { new := ToMap(i) - scan_id := new["scan_id"].(string) - new_status := new["scan_status"].(string) + + scan_id, ok := new["scan_id"].(string) + if !ok { + log.Error().Msgf("failed to convert scan_id to string, data: %v", new) + continue + } + + new_status, ok := new["scan_status"].(string) + if !ok { + log.Error().Msgf("failed to convert scan_status to string, data: %v", new) + continue + } old, found := statusBuff[scan_id] if !found { statusBuff[scan_id] = new } else { - old_status := old["scan_status"].(string) + old_status, ok := old["scan_status"].(string) + if !ok { + log.Error().Msgf("failed to convert scan_status to string, data: %v", old) + continue + } if new_status != old_status { if new_status == utils.SCAN_STATUS_SUCCESS || new_status == utils.SCAN_STATUS_FAILED || new_status == utils.SCAN_STATUS_CANCELLED { @@ -138,7 +163,13 @@ func splitInprogressStatus(data []map[string]interface{}) ([]map[string]interfac others := []map[string]interface{}{} for i := range data { - if data[i]["scan_status"].(string) == utils.SCAN_STATUS_INPROGRESS { + status, ok := data[i]["scan_status"].(string) + if !ok { + log.Error().Msgf("failed to convert scan_status to string, data: %v", data[i]) + continue + } + + if status == utils.SCAN_STATUS_INPROGRESS { in_progress = append(in_progress, data[i]) } else { others = append(others, data[i]) @@ -156,3 +187,23 @@ func ToMap[T any](data T) map[string]interface{} { _ = json.Unmarshal(out, &bb) return bb } + +func anyCompleted(data []map[string]interface{}) bool { + + complete := false + + for i := range data { + status, ok := data[i]["scan_status"].(string) + if !ok { + log.Error().Msgf("failed to convert scan_status to string, data: %v", data[i]) + continue + } + + if status == utils.SCAN_STATUS_SUCCESS { + complete = true + break + } + } + + return complete +}