diff --git a/deepfence_agent/Dockerfile.cloud-agent b/deepfence_agent/Dockerfile.cloud-agent index 7ee0397a3e..22baac87ad 100644 --- a/deepfence_agent/Dockerfile.cloud-agent +++ b/deepfence_agent/Dockerfile.cloud-agent @@ -29,7 +29,11 @@ WORKDIR /opt/steampipe USER deepfence -ENV DF_INSTALL_DIR=/home/deepfence +ENV DF_INSTALL_DIR=/home/deepfence \ + STEAMPIPE_AWS_PLUGIN_VERSION=0.118.1 \ + STEAMPIPE_GCP_PLUGIN_VERSION=0.43.0 \ + STEAMPIPE_AZURE_PLUGIN_VERSION=0.49.0 \ + STEAMPIPE_AZURE_AD_PLUGIN_VERSION=0.12.0 COPY supervisord-cloud.conf /home/deepfence/supervisord.conf COPY --from=steampipe /usr/local/bin/steampipe /usr/local/bin/steampipe @@ -37,7 +41,7 @@ COPY --from=steampipe /usr/local/bin/steampipe /usr/local/bin/steampipe RUN steampipe service start \ && steampipe plugin install steampipe \ # plugin version should be in sync with Deepfence fork https://github.com/deepfence/steampipe-plugin-aws - && steampipe plugin install aws@0.118.1 gcp@0.43.0 azure@0.49.0 azuread@0.12.0 \ + && steampipe plugin install aws@${STEAMPIPE_AWS_PLUGIN_VERSION} gcp@${STEAMPIPE_GCP_PLUGIN_VERSION} azure@${STEAMPIPE_AZURE_PLUGIN_VERSION} azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION} \ && git clone https://github.com/turbot/steampipe-mod-aws-compliance.git --branch v0.79 --depth 1 \ && git clone https://github.com/turbot/steampipe-mod-gcp-compliance.git --branch v0.21 --depth 1 \ && git clone https://github.com/turbot/steampipe-mod-azure-compliance.git --branch v0.35 --depth 1 \ @@ -56,10 +60,10 @@ ENV PUBLISH_CLOUD_RESOURCES_INTERVAL_MINUTES=5 \ EXPOSE 8080 -COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@latest/steampipe-plugin-aws.plugin -COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@latest/steampipe-plugin-gcp.plugin -COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@latest/steampipe-plugin-azure.plugin -COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@latest/steampipe-plugin-azuread.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@${STEAMPIPE_AWS_PLUGIN_VERSION}/steampipe-plugin-aws.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@${STEAMPIPE_GCP_PLUGIN_VERSION}/steampipe-plugin-gcp.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@${STEAMPIPE_AZURE_PLUGIN_VERSION}/steampipe-plugin-azure.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION}/steampipe-plugin-azuread.plugin COPY plugins/cloud-scanner/cloud_scanner /home/deepfence/bin/cloud_scanner diff --git a/deepfence_agent/plugins/YaraHunter b/deepfence_agent/plugins/YaraHunter index 8e4d161ce8..e13c0e5e7a 160000 --- a/deepfence_agent/plugins/YaraHunter +++ b/deepfence_agent/plugins/YaraHunter @@ -1 +1 @@ -Subproject commit 8e4d161ce812f31a0afd6d1a0d4f8c5440845f47 +Subproject commit e13c0e5e7a5df52cff2b01c1bd847fa5597402d5 diff --git a/deepfence_agent/plugins/cloud-scanner b/deepfence_agent/plugins/cloud-scanner index c25aedcfdb..5930094155 160000 --- a/deepfence_agent/plugins/cloud-scanner +++ b/deepfence_agent/plugins/cloud-scanner @@ -1 +1 @@ -Subproject commit c25aedcfdbc09a9deae9f40cdea9367a6b6ca658 +Subproject commit 59300941551a60ba55d98911484f9c3b76683a02 diff --git a/deepfence_agent/plugins/yara-rules b/deepfence_agent/plugins/yara-rules index 8217b51893..a926dfa18c 160000 --- a/deepfence_agent/plugins/yara-rules +++ b/deepfence_agent/plugins/yara-rules @@ -1 +1 @@ -Subproject commit 8217b518934b556ee7f56e6e5fc3e05be8c8d9fa +Subproject commit a926dfa18c3af763dbebcbfc679d6a055d6efce8 diff --git a/deepfence_bootstrapper/router/cloud_scanner.go b/deepfence_bootstrapper/router/cloud_scanner.go index a7166d927c..f0455c1000 100644 --- a/deepfence_bootstrapper/router/cloud_scanner.go +++ b/deepfence_bootstrapper/router/cloud_scanner.go @@ -89,7 +89,7 @@ func RefreshResources(req ctl.RefreshResourcesRequest) error { return nil } -func GetCloudScannerJobCount() int32 { +func GetCloudScannerJobCount(action ctl.ActionID) int32 { conn, err := net.Dial("unix", CloudScannerSocketPath) if err != nil { log.Error().Err(err).Msgf("GetCloudScannerJobCount: error in creating cloud compliance scanner client with socket %s", CloudScannerSocketPath) @@ -98,7 +98,7 @@ func GetCloudScannerJobCount() int32 { defer conn.Close() jobCountReq := map[string]interface{}{ - "action": ctl.CloudScannerJobCount, + "action": action, } jobCountReqBytes, err := json.Marshal(jobCountReq) if err != nil { @@ -128,49 +128,3 @@ func GetCloudScannerJobCount() int32 { return jobCount } } - -func GetCloudNodeID() (string, error) { - cloudNodeID := "" - conn, err := net.Dial("unix", CloudScannerSocketPath) - if err != nil { - log.Error().Err(err).Msgf("Error creating cloud scanner client with socket %s", CloudScannerSocketPath) - return cloudNodeID, err - } - defer conn.Close() - reqMap := make(map[string]interface{}) - reqMap["GetCloudNodeID"] = true - cloudNodeIDReq := map[string]interface{}{ - "args": reqMap, - } - - cloudNodeIDReqBytes, err := json.Marshal(cloudNodeIDReq) - if err != nil { - log.Error().Err(err).Msg("Error in converting request into valid json") - return cloudNodeID, err - } - - _, err = conn.Write(cloudNodeIDReqBytes) - if err != nil { - log.Error().Err(err).Msgf("Error in writing data to unix socket %s", CloudScannerSocketPath) - return cloudNodeID, err - } - - responseTimeout := 10 * time.Second - deadline := time.Now().Add(responseTimeout) - buf := make([]byte, 1024) - for { - conn.SetReadDeadline(deadline) - n, err := conn.Read(buf[:]) - if err != nil { - log.Error().Err(err).Msg("Error in read") - return cloudNodeID, err - } - - count, err := fmt.Sscan(string(buf[0:n]), &cloudNodeID) - if err != nil || count != 1 { - return cloudNodeID, err - } - break - } - return cloudNodeID, err -} diff --git a/deepfence_bootstrapper/router/openapi_client_controls.go b/deepfence_bootstrapper/router/openapi_client_controls.go index 9832d50f9f..235074e4af 100644 --- a/deepfence_bootstrapper/router/openapi_client_controls.go +++ b/deepfence_bootstrapper/router/openapi_client_controls.go @@ -119,25 +119,29 @@ func (ct *OpenapiClient) StartControlsWatching(nodeID string, const ( MaxAgentWorkload = 2 - MaxCloudAgentWorkload = 1 + MaxCloudAgentWorkload = 2 ) func GetScannersWorkloads(nodeType string) int32 { - res := int32(0) - var secret, malware, vuln, cloud int32 if nodeType == ctl.CLOUD_AGENT { - cloud = GetCloudScannerJobCount() + var cloudPostureScan, cloudResourceRefreshCount int32 + + cloudPostureScan = GetCloudScannerJobCount(ctl.CloudScannerJobCount) + cloudResourceRefreshCount = GetCloudScannerJobCount(ctl.CloudScannerResourceRefreshCount) + + log.Info().Msgf("workloads = cloud posture: %d, cloud resource refresh: %d", cloudPostureScan, cloudResourceRefreshCount) + return cloudPostureScan + cloudResourceRefreshCount } else { + var secret, malware, vuln int32 + secret = GetSecretScannerJobCount() malware = GetMalwareScannerJobCount() vuln = GetPackageScannerJobCount() - } - //TODO: Add more scanners workload - log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d, cloud: %d", - vuln, secret, malware, cloud) - res = secret + malware + vuln + cloud - return res + //TODO: Add more scanners workload + log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d", vuln, secret, malware) + return secret + malware + vuln + } } var upgrade atomic.Bool diff --git a/deepfence_server/controls/agent.go b/deepfence_server/controls/agent.go index e7e76706f9..3163bf54ed 100644 --- a/deepfence_server/controls/agent.go +++ b/deepfence_server/controls/agent.go @@ -69,7 +69,7 @@ func GetAgentActions(ctx context.Context, agentID model.AgentID, consoleURL stri actions = append(actions, upgradeActions...) } - scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, workNumToExtract) + scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, agentType, workNumToExtract) workNumToExtract -= len(scanActions) if scanErr == nil { actions = append(actions, scanActions...) @@ -312,7 +312,7 @@ func hasPendingAgentScans(ctx context.Context, client neo4j.DriverWithContext, n return len(records) != 0, err } -func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int) ([]controls.Action, error) { +func ExtractStartingAgentScans(ctx context.Context, nodeID string, agentType string, maxWork int) ([]controls.Action, error) { ctx, span := telemetry.NewSpan(ctx, "control", "extract-starting-agent-scans") defer span.End() @@ -340,14 +340,28 @@ func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int) } defer tx.Close(ctx) - r, err := tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id}) + var r neo4j.ResultWithContext + if agentType == controls.CLOUD_AGENT { + r, err = tx.Run(ctx, `MATCH (c:CloudNode) <- [:SCANNED] - (s) -[:SCHEDULED]-> (n:Node{node_id:$id}) WHERE s.status = '`+utils.ScanStatusStarting+`' + AND c.refresh_status = 'COMPLETE' + AND c.active = true AND s.retries < 3 WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP() WITH s RETURN s.trigger_action`, - map[string]interface{}{"id": nodeID, "max_work": maxWork}) + map[string]interface{}{"id": nodeID, "max_work": maxWork}) + } else { + r, err = tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id}) + WHERE s.status = '`+utils.ScanStatusStarting+`' + AND s.retries < 3 + WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work + SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP() + WITH s + RETURN s.trigger_action`, + map[string]interface{}{"id": nodeID, "max_work": maxWork}) + } if err != nil { return res, err @@ -629,8 +643,7 @@ func ExtractPendingAgentUpgrade(ctx context.Context, nodeID string, maxWork int, } -func ExtractRefreshResourceAction(ctx context.Context, nodeID string, - maxWork int) ([]controls.Action, error) { +func ExtractRefreshResourceAction(ctx context.Context, nodeID string, maxWork int) ([]controls.Action, error) { ctx, span := telemetry.NewSpan(ctx, "control", "extract-pending-refresh-resources") defer span.End() @@ -659,15 +672,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string, defer tx.Close(ctx) r, err := tx.Run(ctx, ` - MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (c:CloudNode) - MATCH(r:CloudNodeRefresh{node_id:c.node_id}) - WHERE r.refresh=true - WITH HEAD(collect(r)) AS rnode - WHERE rnode IS NOT NULL - WITH rnode, rnode.node_id AS node_id - DETACH DELETE rnode - RETURN node_id`, - map[string]interface{}{"id": nodeID}) + MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (r:CloudNode) + WHERE r.refresh_status = '`+utils.ScanStatusStarting+`' + AND NOT COALESCE(r.cloud_compliance_scan_status, '') IN ['`+utils.ScanStatusStarting+`', '`+utils.ScanStatusInProgress+`'] + WITH r LIMIT $max_work + SET r.refresh_status = '`+utils.ScanStatusInProgress+`', r.refresh_message = '' + WITH r + RETURN r.node_id, r.node_name AS account_id`, + map[string]interface{}{"id": nodeID, "max_work": maxWork}) if err != nil { return res, err @@ -683,13 +695,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string, for _, record := range records { var action controls.Action - if record.Values[0] == nil { + if record.Values[0] == nil || record.Values[1] == nil { log.Error().Msgf("Invalid CloudNode ID, skipping") continue } req := controls.RefreshResourcesRequest{} req.NodeId = record.Values[0].(string) + req.AccountID = record.Values[1].(string) req.NodeType = controls.CloudAccount reqBytes, err := json.Marshal(req) diff --git a/deepfence_server/handler/cloud_node.go b/deepfence_server/handler/cloud_node.go index 6e4e5585fd..0c764bb429 100644 --- a/deepfence_server/handler/cloud_node.go +++ b/deepfence_server/handler/cloud_node.go @@ -8,6 +8,7 @@ import ( "net/http" "github.com/deepfence/ThreatMapper/deepfence_server/model" + "github.com/deepfence/ThreatMapper/deepfence_server/reporters" reporters_scan "github.com/deepfence/ThreatMapper/deepfence_server/reporters/scan" ctl "github.com/deepfence/ThreatMapper/deepfence_utils/controls" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -19,6 +20,9 @@ import ( var ( cloudAccountNodeType = ctl.ResourceTypeToString(ctl.CloudAccount) + refreshAccountFilter = reporters.FieldsFilters{ + ContainsFilter: reporters.ContainsFilter{FieldsValues: map[string][]interface{}{"refresh_status": {"COMPLETE", "ERROR"}}}, + } ) func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http.Request) { @@ -124,13 +128,19 @@ func (h *Handler) RefreshCloudAccountHandler(w http.ResponseWriter, r *http.Requ nodeIdentifiers[i] = model.NodeIdentifier{NodeID: id, NodeType: cloudAccountNodeType} } - cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers) + cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers, &refreshAccountFilter) if err != nil { log.Error().Msgf(err.Error()) h.respondError(&BadDecoding{err}, w) return } + if len(cloudNodeIds) == 0 { + // Refresh already in progress for all requested cloud accounts + w.WriteHeader(http.StatusNoContent) + return + } + resolvedRequest := model.CloudAccountRefreshReq{NodeIDs: make([]string, len(cloudNodeIds))} for i, id := range cloudNodeIds { resolvedRequest.NodeIDs[i] = id.NodeID diff --git a/deepfence_server/handler/scan_reports.go b/deepfence_server/handler/scan_reports.go index d2b1a66a7f..e8e17683c4 100644 --- a/deepfence_server/handler/scan_reports.go +++ b/deepfence_server/handler/scan_reports.go @@ -418,7 +418,7 @@ func (h *Handler) StartComplianceScanHandler(w http.ResponseWriter, r *http.Requ regular, k8s, _, _ := extractBulksNodes(reqs.NodeIDs) - cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular) + cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular, nil) if err != nil { h.respondError(err, w) return diff --git a/deepfence_server/ingesters/scan_status.go b/deepfence_server/ingesters/scan_status.go index 068b8fb63b..d462bf6049 100644 --- a/deepfence_server/ingesters/scan_status.go +++ b/deepfence_server/ingesters/scan_status.go @@ -325,16 +325,16 @@ func AddNewCloudComplianceScan( } } } - nt := controls.KubernetesCluster - if nodeType == controls.ResourceTypeToString(controls.Host) { - nt = controls.Host - } var action []byte var hostNodeID, hostNeo4jNodeType string if nodeType == controls.ResourceTypeToString(controls.KubernetesCluster) || nodeType == controls.ResourceTypeToString(controls.Host) { hostNodeID = nodeID hostNeo4jNodeType = neo4jNodeType + nt := controls.KubernetesCluster + if nodeType == controls.ResourceTypeToString(controls.Host) { + nt = controls.Host + } internalReq, _ := json.Marshal(controls.StartComplianceScanRequest{ NodeID: nodeID, NodeType: nt, @@ -374,7 +374,7 @@ func AddNewCloudComplianceScan( internalReq, _ := json.Marshal(controls.StartCloudComplianceScanRequest{ NodeID: nodeID, - NodeType: nt, + NodeType: controls.CloudAccount, BinArgs: map[string]string{"scan_id": scanID, "benchmark_types": strings.Join(benchmarkTypes, ",")}, ScanDetails: controls.CloudComplianceScanDetails{ ScanId: scanID, diff --git a/deepfence_server/model/cloud_node.go b/deepfence_server/model/cloud_node.go index 8a74aff137..b5e02e62a0 100644 --- a/deepfence_server/model/cloud_node.go +++ b/deepfence_server/model/cloud_node.go @@ -84,7 +84,7 @@ type CloudNodeAccountInfo struct { LastScanID string `json:"last_scan_id"` LastScanStatus string `json:"last_scan_status"` RefreshMessage string `json:"refresh_message"` - RefreshStatus string `json:"refresh_status" enum:"STARTING,IN_PROGRESS,ERROR,COMPLETE"` + RefreshStatus string `json:"refresh_status" enum:"QUEUED,STARTING,IN_PROGRESS,ERROR,COMPLETE"` ScanStatusMap map[string]int64 `json:"scan_status_map"` Version string `json:"version"` HostNodeID string `json:"host_node_id"` @@ -230,7 +230,7 @@ func UpsertCloudComplianceNode(ctx context.Context, nodeDetails map[string]inter MERGE (r:Node{node_id:$host_node_id, node_type: "cloud_agent"}) WITH $param as row, r MERGE (n:CloudNode{node_id:row.node_id}) - ON CREATE SET n.refresh_status = 'STARTING', n.refresh_message = '' + ON CREATE SET n.refresh_status = '`+utils.ScanStatusStarting+`', n.refresh_message = '' MERGE (r) -[:HOSTS]-> (n) SET n+= row, n.active = true, n.updated_at = TIMESTAMP(), n.version = row.version, r.node_name=$host_node_id, r.active = true, r.agent_running=true, r.updated_at = TIMESTAMP()`, @@ -246,7 +246,7 @@ func UpsertCloudComplianceNode(ctx context.Context, nodeDetails map[string]inter MERGE (m:CloudNode{node_id: $parent_node_id}) WITH $param as row, r, m MERGE (n:CloudNode{node_id:row.node_id}) - ON CREATE SET n.refresh_status = 'STARTING', n.refresh_message = '' + ON CREATE SET n.refresh_status = '`+utils.ScanStatusStarting+`', n.refresh_message = '' MERGE (m) -[:IS_CHILD]-> (n) MERGE (r) -[:HOSTS]-> (n) SET n+= row, n.active = true, n.updated_at = TIMESTAMP(), n.version = row.version, @@ -532,8 +532,8 @@ func (c *CloudAccountRefreshReq) SetCloudAccountRefresh(ctx context.Context) err if _, err = tx.Run(ctx, ` UNWIND $batch as cloudNode - MERGE (n:CloudNodeRefresh{node_id: cloudNode}) - SET n.refresh = true, n.updated_at = TIMESTAMP()`, + MATCH (m:CloudNode{node_id: cloudNode}) + SET m.refresh_status = '`+utils.ScanStatusStarting+`', m.refresh_message = ''`, map[string]interface{}{ "batch": c.NodeIDs, }); err != nil { @@ -542,50 +542,6 @@ func (c *CloudAccountRefreshReq) SetCloudAccountRefresh(ctx context.Context) err return tx.Commit(ctx) } -func (c *CloudAccountRefreshReq) GetCloudAccountRefresh(ctx context.Context) ([]string, error) { - - ctx, span := telemetry.NewSpan(ctx, "model", "get-cloud-account-refresh") - defer span.End() - - var updatedNodeIDs []string - driver, err := directory.Neo4jClient(ctx) - if err != nil { - return updatedNodeIDs, err - } - - session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) - defer session.Close(ctx) - - tx, err := session.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second)) - if err != nil { - return updatedNodeIDs, err - } - defer tx.Close(ctx) - - res, err := tx.Run(ctx, ` - UNWIND $batch as cloudNode - MATCH (n:CloudNodeRefresh{node_id: cloudNode}) - WHERE n.refresh=true - WITH n, n.node_id as deletedNodeID - DELETE n - RETURN deletedNodeID`, - map[string]interface{}{ - "batch": c.NodeIDs, - }) - if err != nil { - return updatedNodeIDs, err - } - recs, err := res.Collect(ctx) - if err != nil { - return updatedNodeIDs, err - } - - for _, rec := range recs { - updatedNodeIDs = append(updatedNodeIDs, rec.Values[0].(string)) - } - return updatedNodeIDs, tx.Commit(ctx) -} - type CloudAccountDeleteReq struct { NodeIDs []string `json:"node_ids" validate:"required,gt=0" required:"true"` } diff --git a/deepfence_server/reporters/scan/scan_reporters.go b/deepfence_server/reporters/scan/scan_reporters.go index c2a759b303..d923d28174 100644 --- a/deepfence_server/reporters/scan/scan_reporters.go +++ b/deepfence_server/reporters/scan/scan_reporters.go @@ -398,7 +398,7 @@ func GetPodContainerIDs(ctx context.Context, podIds []model.NodeIdentifier) ([]m return res, nil } -func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdentifier) ([]model.NodeIdentifier, error) { +func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdentifier, filters *reporters.FieldsFilters) ([]model.NodeIdentifier, error) { ctx, span := telemetry.NewSpan(ctx, "scan-reports", "get-cloud-account-ids") defer span.End() @@ -418,9 +418,15 @@ func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdenti } defer tx.Close(ctx) + filterClauses := mo.None[reporters.FieldsFilters]() + if filters != nil { + filterClauses = mo.Some(*filters) + } + nres, err := tx.Run(ctx, ` MATCH (n:CloudNode) WHERE n.node_id IN $node_ids + `+reporters.ParseFieldFilters2CypherWhereConditions(`n`, filterClauses, false)+` RETURN n.node_id, n.cloud_provider`, map[string]interface{}{"node_ids": NodeIdentifierToIDList(cloudProviderIds)}) if err != nil { @@ -432,6 +438,7 @@ func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdenti return res, err } orgNodeIds := []string{} + childNodeIDs := []string{} for _, rec := range recs { cloudProvider := rec.Values[1].(string) if cloudProvider == model.PostureProviderAWSOrg || cloudProvider == model.PostureProviderGCPOrg || cloudProvider == model.PostureProviderAzureOrg { @@ -442,13 +449,16 @@ func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdenti NodeID: rec.Values[0].(string), NodeType: controls.ResourceTypeToString(controls.CloudAccount), }) + childNodeIDs = append(childNodeIDs, rec.Values[0].(string)) } if len(orgNodeIds) > 0 { nres, err = tx.Run(ctx, ` MATCH (n:CloudNode) -[:IS_CHILD] -> (m) - WHERE n.node_id IN $node_ids + WHERE n.node_id IN $node_ids + AND NOT m.node_id IN $child_node_ids + `+reporters.ParseFieldFilters2CypherWhereConditions(`m`, filterClauses, false)+` RETURN m.node_id`, - map[string]interface{}{"node_ids": orgNodeIds}) + map[string]interface{}{"node_ids": orgNodeIds, "child_node_ids": childNodeIDs}) if err != nil { return res, err } diff --git a/deepfence_utils/controls/agent.go b/deepfence_utils/controls/agent.go index 49858bb795..7d74eabe7a 100644 --- a/deepfence_utils/controls/agent.go +++ b/deepfence_utils/controls/agent.go @@ -27,6 +27,7 @@ const ( StartCloudComplianceScan StopCloudComplianceScan CloudScannerJobCount + CloudScannerResourceRefreshCount ) type ScanResource int @@ -160,8 +161,9 @@ type CloudComplianceScanBenchmark struct { } type RefreshResourcesRequest struct { - NodeId string `json:"node_id" required:"true"` - NodeType ScanResource `json:"node_type" required:"true"` + NodeId string `json:"node_id" required:"true"` + AccountID string `json:"account_id" required:"true"` + NodeType ScanResource `json:"node_type" required:"true"` } type StopSecretScanRequest StartSecretScanRequest diff --git a/deepfence_utils/utils/ingesters/cloud_resource.go b/deepfence_utils/utils/ingesters/cloud_resource.go index a76792c436..f2a65fc499 100644 --- a/deepfence_utils/utils/ingesters/cloud_resource.go +++ b/deepfence_utils/utils/ingesters/cloud_resource.go @@ -192,6 +192,7 @@ type CloudResourceRefreshStatus struct { CloudNodeID string `json:"cloud_node_id"` RefreshMessage string `json:"refresh_message"` RefreshStatus string `json:"refresh_status"` + UpdatedAt int64 `json:"updated_at"` } func (c *CloudResourceRefreshStatus) ToMap() map[string]interface{} { diff --git a/deepfence_worker/ingesters/cloud_resource.go b/deepfence_worker/ingesters/cloud_resource.go index 035d58fbee..e9137d2ffb 100644 --- a/deepfence_worker/ingesters/cloud_resource.go +++ b/deepfence_worker/ingesters/cloud_resource.go @@ -4,12 +4,12 @@ import ( "context" "encoding/json" "fmt" + "sort" "time" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/telemetry" - "github.com/deepfence/ThreatMapper/deepfence_utils/utils" ingestersUtil "github.com/deepfence/ThreatMapper/deepfence_utils/utils/ingesters" "github.com/neo4j/neo4j-go-driver/v5/neo4j" ) @@ -276,42 +276,14 @@ func CommitFuncCloudResourceRefreshStatus(ctx context.Context, ns string, cs []i } func ResourceRefreshStatusToMaps(data []ingestersUtil.CloudResourceRefreshStatus) []map[string]interface{} { - statusBuff := map[string]map[string]interface{}{} - for _, i := range data { - statusMap := i.ToMap() + statuses := make([]map[string]interface{}, len(data)) - cloudNodeId, ok := statusMap["cloud_node_id"].(string) - if !ok { - log.Error().Msgf("failed to convert cloud_node_id to string, data: %v", statusMap) - continue - } - - newStatus, ok := statusMap["refresh_status"].(string) - if !ok { - log.Error().Msgf("failed to convert refresh_status to string, data: %v", statusMap) - continue - } + sort.Slice(data, func(i, j int) bool { + return data[i].UpdatedAt < data[j].UpdatedAt + }) - old, found := statusBuff[cloudNodeId] - if !found { - statusBuff[cloudNodeId] = statusMap - } else { - oldStatus, ok := old["refresh_status"].(string) - if !ok { - log.Error().Msgf("failed to convert refresh_status to string, data: %v", old) - continue - } - if newStatus != oldStatus { - if newStatus == utils.ScanStatusSuccess || - newStatus == utils.ScanStatusFailed || newStatus == utils.ScanStatusCancelled { - statusBuff[cloudNodeId] = statusMap - } - } - } - } - statuses := []map[string]interface{}{} - for _, v := range statusBuff { - statuses = append(statuses, v) + for i, d := range data { + statuses[i] = d.ToMap() } return statuses } diff --git a/golang_deepfence_sdk b/golang_deepfence_sdk index 226fd1e1e6..e4ec9311fd 160000 --- a/golang_deepfence_sdk +++ b/golang_deepfence_sdk @@ -1 +1 @@ -Subproject commit 226fd1e1e6bb7b9f4e179d6f738bb7cd9c55dd72 +Subproject commit e4ec9311fdf96022b9a0fad59e63e530ba50587f