From 3f7f0dd0d411872ceef38c19de092df8ec77a3d2 Mon Sep 17 00:00:00 2001 From: Thomas Legris Date: Tue, 12 Sep 2023 21:37:55 +0900 Subject: [PATCH] Fix deadlock on pod scan status updates --- deepfence_worker/ingesters/common.go | 42 +++++++++++++--------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/deepfence_worker/ingesters/common.go b/deepfence_worker/ingesters/common.go index cb4ff5e441..36f546393e 100644 --- a/deepfence_worker/ingesters/common.go +++ b/deepfence_worker/ingesters/common.go @@ -2,7 +2,6 @@ package ingesters import ( "encoding/json" - "fmt" "time" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -70,15 +69,18 @@ func CommitFuncStatus[Status any](ts utils.Neo4jScanType) func(ns string, data [ return err } + if ts != utils.NEO4J_CLOUD_COMPLIANCE_SCAN && ts != utils.NEO4J_COMPLIANCE_SCAN { + err = updatePodScanStatus(ts, recordMap, session) + if err != nil { + return err + } + } + err = tx.Commit() if err != nil { return err } - if ts != utils.NEO4J_CLOUD_COMPLIANCE_SCAN && ts != utils.NEO4J_COMPLIANCE_SCAN { - updatePodScanStatus(ts, recordMap, session) - } - return nil } } @@ -86,36 +88,30 @@ func CommitFuncStatus[Status any](ts utils.Neo4jScanType) func(ns string, data [ func updatePodScanStatus(ts utils.Neo4jScanType, recordMap []map[string]interface{}, session neo4j.Session) error { - query := `UNWIND $batch as row - MATCH (n:Pod) - CALL { - WITH row - MATCH (p)-[r:SCANNED]->(c:Container) - WHERE p.node_id = row.scan_id AND c.pod_name IS NOT NULL - RETURN c.pod_name AS pod_name - } - WITH n, row - WHERE n.pod_name = pod_name - SET n.%s = row.scan_status` + query := ` + UNWIND $batch as row + MATCH (s:` + string(ts) + `{node_id: row.scan_id})-[:SCANNED]->(c:Container) + WHERE c.pod_id IS NOT NULL + MATCH (n:Pod{node_id: c.pod_id}) + SET n.` + ingestersUtil.ScanStatusField[ts] + `=row.scan_status` tx, err := session.BeginTransaction(neo4j.WithTxTimeout(30 * time.Second)) if err != nil { return err } - _, err = tx.Run(fmt.Sprintf(query, ingestersUtil.ScanStatusField[ts]), - map[string]interface{}{"batch": recordMap}) + log.Debug().Msgf("query: %v", query) + _, err = tx.Run(query, + map[string]interface{}{ + "batch": recordMap, + }, + ) if err != nil { log.Error().Msgf("Error in pod status update query: %+v", err) return err } - err = tx.Commit() - if err != nil { - log.Info().Msgf("Error in commit for pod status update query: %v", err) - return err - } return nil }