From a473c71aa991bab3f7faa85c9499dced17f56790 Mon Sep 17 00:00:00 2001 From: ramanan-ravi Date: Mon, 1 Jul 2024 16:02:54 +0530 Subject: [PATCH] Clouds scanner - start scan after cloud resources refresh is complete --- deepfence_agent/Dockerfile.cloud-agent | 16 +++--- .../router/cloud_scanner.go | 50 +------------------ .../router/openapi_client_controls.go | 24 +++++---- deepfence_server/controls/agent.go | 41 +++++++++------ deepfence_server/handler/cloud_node.go | 12 ++++- deepfence_server/handler/scan_reports.go | 2 +- deepfence_server/model/cloud_node.go | 50 ++----------------- .../reporters/scan/scan_reporters.go | 16 ++++-- deepfence_utils/controls/agent.go | 6 ++- deepfence_utils/utils/constants.go | 1 + .../utils/ingesters/cloud_resource.go | 1 + deepfence_worker/ingesters/cloud_resource.go | 42 +++------------- 12 files changed, 94 insertions(+), 167 deletions(-) diff --git a/deepfence_agent/Dockerfile.cloud-agent b/deepfence_agent/Dockerfile.cloud-agent index 7ee0397a3e..22baac87ad 100644 --- a/deepfence_agent/Dockerfile.cloud-agent +++ b/deepfence_agent/Dockerfile.cloud-agent @@ -29,7 +29,11 @@ WORKDIR /opt/steampipe USER deepfence -ENV DF_INSTALL_DIR=/home/deepfence +ENV DF_INSTALL_DIR=/home/deepfence \ + STEAMPIPE_AWS_PLUGIN_VERSION=0.118.1 \ + STEAMPIPE_GCP_PLUGIN_VERSION=0.43.0 \ + STEAMPIPE_AZURE_PLUGIN_VERSION=0.49.0 \ + STEAMPIPE_AZURE_AD_PLUGIN_VERSION=0.12.0 COPY supervisord-cloud.conf /home/deepfence/supervisord.conf COPY --from=steampipe /usr/local/bin/steampipe /usr/local/bin/steampipe @@ -37,7 +41,7 @@ COPY --from=steampipe /usr/local/bin/steampipe /usr/local/bin/steampipe RUN steampipe service start \ && steampipe plugin install steampipe \ # plugin version should be in sync with Deepfence fork https://github.com/deepfence/steampipe-plugin-aws - && steampipe plugin install aws@0.118.1 gcp@0.43.0 azure@0.49.0 azuread@0.12.0 \ + && steampipe plugin install aws@${STEAMPIPE_AWS_PLUGIN_VERSION} gcp@${STEAMPIPE_GCP_PLUGIN_VERSION} azure@${STEAMPIPE_AZURE_PLUGIN_VERSION} azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION} \ && git clone https://github.com/turbot/steampipe-mod-aws-compliance.git --branch v0.79 --depth 1 \ && git clone https://github.com/turbot/steampipe-mod-gcp-compliance.git --branch v0.21 --depth 1 \ && git clone https://github.com/turbot/steampipe-mod-azure-compliance.git --branch v0.35 --depth 1 \ @@ -56,10 +60,10 @@ ENV PUBLISH_CLOUD_RESOURCES_INTERVAL_MINUTES=5 \ EXPOSE 8080 -COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@latest/steampipe-plugin-aws.plugin -COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@latest/steampipe-plugin-gcp.plugin -COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@latest/steampipe-plugin-azure.plugin -COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@latest/steampipe-plugin-azuread.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@${STEAMPIPE_AWS_PLUGIN_VERSION}/steampipe-plugin-aws.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@${STEAMPIPE_GCP_PLUGIN_VERSION}/steampipe-plugin-gcp.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@${STEAMPIPE_AZURE_PLUGIN_VERSION}/steampipe-plugin-azure.plugin +COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION}/steampipe-plugin-azuread.plugin COPY plugins/cloud-scanner/cloud_scanner /home/deepfence/bin/cloud_scanner diff --git a/deepfence_bootstrapper/router/cloud_scanner.go b/deepfence_bootstrapper/router/cloud_scanner.go index a7166d927c..f0455c1000 100644 --- a/deepfence_bootstrapper/router/cloud_scanner.go +++ b/deepfence_bootstrapper/router/cloud_scanner.go @@ -89,7 +89,7 @@ func RefreshResources(req ctl.RefreshResourcesRequest) error { return nil } -func GetCloudScannerJobCount() int32 { +func GetCloudScannerJobCount(action ctl.ActionID) int32 { conn, err := net.Dial("unix", CloudScannerSocketPath) if err != nil { log.Error().Err(err).Msgf("GetCloudScannerJobCount: error in creating cloud compliance scanner client with socket %s", CloudScannerSocketPath) @@ -98,7 +98,7 @@ func GetCloudScannerJobCount() int32 { defer conn.Close() jobCountReq := map[string]interface{}{ - "action": ctl.CloudScannerJobCount, + "action": action, } jobCountReqBytes, err := json.Marshal(jobCountReq) if err != nil { @@ -128,49 +128,3 @@ func GetCloudScannerJobCount() int32 { return jobCount } } - -func GetCloudNodeID() (string, error) { - cloudNodeID := "" - conn, err := net.Dial("unix", CloudScannerSocketPath) - if err != nil { - log.Error().Err(err).Msgf("Error creating cloud scanner client with socket %s", CloudScannerSocketPath) - return cloudNodeID, err - } - defer conn.Close() - reqMap := make(map[string]interface{}) - reqMap["GetCloudNodeID"] = true - cloudNodeIDReq := map[string]interface{}{ - "args": reqMap, - } - - cloudNodeIDReqBytes, err := json.Marshal(cloudNodeIDReq) - if err != nil { - log.Error().Err(err).Msg("Error in converting request into valid json") - return cloudNodeID, err - } - - _, err = conn.Write(cloudNodeIDReqBytes) - if err != nil { - log.Error().Err(err).Msgf("Error in writing data to unix socket %s", CloudScannerSocketPath) - return cloudNodeID, err - } - - responseTimeout := 10 * time.Second - deadline := time.Now().Add(responseTimeout) - buf := make([]byte, 1024) - for { - conn.SetReadDeadline(deadline) - n, err := conn.Read(buf[:]) - if err != nil { - log.Error().Err(err).Msg("Error in read") - return cloudNodeID, err - } - - count, err := fmt.Sscan(string(buf[0:n]), &cloudNodeID) - if err != nil || count != 1 { - return cloudNodeID, err - } - break - } - return cloudNodeID, err -} diff --git a/deepfence_bootstrapper/router/openapi_client_controls.go b/deepfence_bootstrapper/router/openapi_client_controls.go index 9832d50f9f..235074e4af 100644 --- a/deepfence_bootstrapper/router/openapi_client_controls.go +++ b/deepfence_bootstrapper/router/openapi_client_controls.go @@ -119,25 +119,29 @@ func (ct *OpenapiClient) StartControlsWatching(nodeID string, const ( MaxAgentWorkload = 2 - MaxCloudAgentWorkload = 1 + MaxCloudAgentWorkload = 2 ) func GetScannersWorkloads(nodeType string) int32 { - res := int32(0) - var secret, malware, vuln, cloud int32 if nodeType == ctl.CLOUD_AGENT { - cloud = GetCloudScannerJobCount() + var cloudPostureScan, cloudResourceRefreshCount int32 + + cloudPostureScan = GetCloudScannerJobCount(ctl.CloudScannerJobCount) + cloudResourceRefreshCount = GetCloudScannerJobCount(ctl.CloudScannerResourceRefreshCount) + + log.Info().Msgf("workloads = cloud posture: %d, cloud resource refresh: %d", cloudPostureScan, cloudResourceRefreshCount) + return cloudPostureScan + cloudResourceRefreshCount } else { + var secret, malware, vuln int32 + secret = GetSecretScannerJobCount() malware = GetMalwareScannerJobCount() vuln = GetPackageScannerJobCount() - } - //TODO: Add more scanners workload - log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d, cloud: %d", - vuln, secret, malware, cloud) - res = secret + malware + vuln + cloud - return res + //TODO: Add more scanners workload + log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d", vuln, secret, malware) + return secret + malware + vuln + } } var upgrade atomic.Bool diff --git a/deepfence_server/controls/agent.go b/deepfence_server/controls/agent.go index e7e76706f9..d74a33aa17 100644 --- a/deepfence_server/controls/agent.go +++ b/deepfence_server/controls/agent.go @@ -69,7 +69,7 @@ func GetAgentActions(ctx context.Context, agentID model.AgentID, consoleURL stri actions = append(actions, upgradeActions...) } - scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, workNumToExtract) + scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, agentType, workNumToExtract) workNumToExtract -= len(scanActions) if scanErr == nil { actions = append(actions, scanActions...) @@ -312,7 +312,7 @@ func hasPendingAgentScans(ctx context.Context, client neo4j.DriverWithContext, n return len(records) != 0, err } -func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int) ([]controls.Action, error) { +func ExtractStartingAgentScans(ctx context.Context, nodeID string, agentType string, maxWork int) ([]controls.Action, error) { ctx, span := telemetry.NewSpan(ctx, "control", "extract-starting-agent-scans") defer span.End() @@ -340,14 +340,27 @@ func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int) } defer tx.Close(ctx) - r, err := tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id}) + var r neo4j.ResultWithContext + if agentType == controls.CLOUD_AGENT { + r, err = tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id}) -[:HOSTS]-> (c:CloudNode) WHERE s.status = '`+utils.ScanStatusStarting+`' + AND c.refresh_status = 'COMPLETE' AND s.retries < 3 WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP() WITH s RETURN s.trigger_action`, - map[string]interface{}{"id": nodeID, "max_work": maxWork}) + map[string]interface{}{"id": nodeID, "max_work": maxWork}) + } else { + r, err = tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id}) + WHERE s.status = '`+utils.ScanStatusStarting+`' + AND s.retries < 3 + WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work + SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP() + WITH s + RETURN s.trigger_action`, + map[string]interface{}{"id": nodeID, "max_work": maxWork}) + } if err != nil { return res, err @@ -659,15 +672,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string, defer tx.Close(ctx) r, err := tx.Run(ctx, ` - MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (c:CloudNode) - MATCH(r:CloudNodeRefresh{node_id:c.node_id}) - WHERE r.refresh=true - WITH HEAD(collect(r)) AS rnode - WHERE rnode IS NOT NULL - WITH rnode, rnode.node_id AS node_id - DETACH DELETE rnode - RETURN node_id`, - map[string]interface{}{"id": nodeID}) + MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (r:CloudNode) + WHERE r.refresh_status = '`+utils.ScanStatusQueued+`' + AND NOT COALESCE(r.cloud_compliance_scan_status, '') IN ['`+utils.ScanStatusStarting+`', '`+utils.ScanStatusInProgress+`'] + WITH r LIMIT $max_work + SET r.refresh_status = '`+utils.ScanStatusStarting+`', r.refresh_message = '' + WITH r + RETURN r.node_id, r.node_name AS account_id`, + map[string]interface{}{"id": nodeID, "max_work": 1}) // Maximum one account can be refreshed at a time if err != nil { return res, err @@ -683,13 +695,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string, for _, record := range records { var action controls.Action - if record.Values[0] == nil { + if record.Values[0] == nil || record.Values[1] == nil { log.Error().Msgf("Invalid CloudNode ID, skipping") continue } req := controls.RefreshResourcesRequest{} req.NodeId = record.Values[0].(string) + req.AccountID = record.Values[1].(string) req.NodeType = controls.CloudAccount reqBytes, err := json.Marshal(req) diff --git a/deepfence_server/handler/cloud_node.go b/deepfence_server/handler/cloud_node.go index 6e4e5585fd..0c764bb429 100644 --- a/deepfence_server/handler/cloud_node.go +++ b/deepfence_server/handler/cloud_node.go @@ -8,6 +8,7 @@ import ( "net/http" "github.com/deepfence/ThreatMapper/deepfence_server/model" + "github.com/deepfence/ThreatMapper/deepfence_server/reporters" reporters_scan "github.com/deepfence/ThreatMapper/deepfence_server/reporters/scan" ctl "github.com/deepfence/ThreatMapper/deepfence_utils/controls" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -19,6 +20,9 @@ import ( var ( cloudAccountNodeType = ctl.ResourceTypeToString(ctl.CloudAccount) + refreshAccountFilter = reporters.FieldsFilters{ + ContainsFilter: reporters.ContainsFilter{FieldsValues: map[string][]interface{}{"refresh_status": {"COMPLETE", "ERROR"}}}, + } ) func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http.Request) { @@ -124,13 +128,19 @@ func (h *Handler) RefreshCloudAccountHandler(w http.ResponseWriter, r *http.Requ nodeIdentifiers[i] = model.NodeIdentifier{NodeID: id, NodeType: cloudAccountNodeType} } - cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers) + cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers, &refreshAccountFilter) if err != nil { log.Error().Msgf(err.Error()) h.respondError(&BadDecoding{err}, w) return } + if len(cloudNodeIds) == 0 { + // Refresh already in progress for all requested cloud accounts + w.WriteHeader(http.StatusNoContent) + return + } + resolvedRequest := model.CloudAccountRefreshReq{NodeIDs: make([]string, len(cloudNodeIds))} for i, id := range cloudNodeIds { resolvedRequest.NodeIDs[i] = id.NodeID diff --git a/deepfence_server/handler/scan_reports.go b/deepfence_server/handler/scan_reports.go index d2b1a66a7f..e8e17683c4 100644 --- a/deepfence_server/handler/scan_reports.go +++ b/deepfence_server/handler/scan_reports.go @@ -418,7 +418,7 @@ func (h *Handler) StartComplianceScanHandler(w http.ResponseWriter, r *http.Requ regular, k8s, _, _ := extractBulksNodes(reqs.NodeIDs) - cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular) + cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular, nil) if err != nil { h.respondError(err, w) return diff --git a/deepfence_server/model/cloud_node.go b/deepfence_server/model/cloud_node.go index 8a74aff137..acaa69c3f8 100644 --- a/deepfence_server/model/cloud_node.go +++ b/deepfence_server/model/cloud_node.go @@ -84,7 +84,7 @@ type CloudNodeAccountInfo struct { LastScanID string `json:"last_scan_id"` LastScanStatus string `json:"last_scan_status"` RefreshMessage string `json:"refresh_message"` - RefreshStatus string `json:"refresh_status" enum:"STARTING,IN_PROGRESS,ERROR,COMPLETE"` + RefreshStatus string `json:"refresh_status" enum:"QUEUED,STARTING,IN_PROGRESS,ERROR,COMPLETE"` ScanStatusMap map[string]int64 `json:"scan_status_map"` Version string `json:"version"` HostNodeID string `json:"host_node_id"` @@ -532,8 +532,8 @@ func (c *CloudAccountRefreshReq) SetCloudAccountRefresh(ctx context.Context) err if _, err = tx.Run(ctx, ` UNWIND $batch as cloudNode - MERGE (n:CloudNodeRefresh{node_id: cloudNode}) - SET n.refresh = true, n.updated_at = TIMESTAMP()`, + MATCH (m:CloudNode{node_id: cloudNode}) + SET m.refresh_status = '`+utils.ScanStatusQueued+`', m.refresh_message = ''`, map[string]interface{}{ "batch": c.NodeIDs, }); err != nil { @@ -542,50 +542,6 @@ func (c *CloudAccountRefreshReq) SetCloudAccountRefresh(ctx context.Context) err return tx.Commit(ctx) } -func (c *CloudAccountRefreshReq) GetCloudAccountRefresh(ctx context.Context) ([]string, error) { - - ctx, span := telemetry.NewSpan(ctx, "model", "get-cloud-account-refresh") - defer span.End() - - var updatedNodeIDs []string - driver, err := directory.Neo4jClient(ctx) - if err != nil { - return updatedNodeIDs, err - } - - session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) - defer session.Close(ctx) - - tx, err := session.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second)) - if err != nil { - return updatedNodeIDs, err - } - defer tx.Close(ctx) - - res, err := tx.Run(ctx, ` - UNWIND $batch as cloudNode - MATCH (n:CloudNodeRefresh{node_id: cloudNode}) - WHERE n.refresh=true - WITH n, n.node_id as deletedNodeID - DELETE n - RETURN deletedNodeID`, - map[string]interface{}{ - "batch": c.NodeIDs, - }) - if err != nil { - return updatedNodeIDs, err - } - recs, err := res.Collect(ctx) - if err != nil { - return updatedNodeIDs, err - } - - for _, rec := range recs { - updatedNodeIDs = append(updatedNodeIDs, rec.Values[0].(string)) - } - return updatedNodeIDs, tx.Commit(ctx) -} - type CloudAccountDeleteReq struct { NodeIDs []string `json:"node_ids" validate:"required,gt=0" required:"true"` } diff --git a/deepfence_server/reporters/scan/scan_reporters.go b/deepfence_server/reporters/scan/scan_reporters.go index c2a759b303..d923d28174 100644 --- a/deepfence_server/reporters/scan/scan_reporters.go +++ b/deepfence_server/reporters/scan/scan_reporters.go @@ -398,7 +398,7 @@ func GetPodContainerIDs(ctx context.Context, podIds []model.NodeIdentifier) ([]m return res, nil } -func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdentifier) ([]model.NodeIdentifier, error) { +func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdentifier, filters *reporters.FieldsFilters) ([]model.NodeIdentifier, error) { ctx, span := telemetry.NewSpan(ctx, "scan-reports", "get-cloud-account-ids") defer span.End() @@ -418,9 +418,15 @@ func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdenti } defer tx.Close(ctx) + filterClauses := mo.None[reporters.FieldsFilters]() + if filters != nil { + filterClauses = mo.Some(*filters) + } + nres, err := tx.Run(ctx, ` MATCH (n:CloudNode) WHERE n.node_id IN $node_ids + `+reporters.ParseFieldFilters2CypherWhereConditions(`n`, filterClauses, false)+` RETURN n.node_id, n.cloud_provider`, map[string]interface{}{"node_ids": NodeIdentifierToIDList(cloudProviderIds)}) if err != nil { @@ -432,6 +438,7 @@ func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdenti return res, err } orgNodeIds := []string{} + childNodeIDs := []string{} for _, rec := range recs { cloudProvider := rec.Values[1].(string) if cloudProvider == model.PostureProviderAWSOrg || cloudProvider == model.PostureProviderGCPOrg || cloudProvider == model.PostureProviderAzureOrg { @@ -442,13 +449,16 @@ func GetCloudAccountIDs(ctx context.Context, cloudProviderIds []model.NodeIdenti NodeID: rec.Values[0].(string), NodeType: controls.ResourceTypeToString(controls.CloudAccount), }) + childNodeIDs = append(childNodeIDs, rec.Values[0].(string)) } if len(orgNodeIds) > 0 { nres, err = tx.Run(ctx, ` MATCH (n:CloudNode) -[:IS_CHILD] -> (m) - WHERE n.node_id IN $node_ids + WHERE n.node_id IN $node_ids + AND NOT m.node_id IN $child_node_ids + `+reporters.ParseFieldFilters2CypherWhereConditions(`m`, filterClauses, false)+` RETURN m.node_id`, - map[string]interface{}{"node_ids": orgNodeIds}) + map[string]interface{}{"node_ids": orgNodeIds, "child_node_ids": childNodeIDs}) if err != nil { return res, err } diff --git a/deepfence_utils/controls/agent.go b/deepfence_utils/controls/agent.go index 49858bb795..7d74eabe7a 100644 --- a/deepfence_utils/controls/agent.go +++ b/deepfence_utils/controls/agent.go @@ -27,6 +27,7 @@ const ( StartCloudComplianceScan StopCloudComplianceScan CloudScannerJobCount + CloudScannerResourceRefreshCount ) type ScanResource int @@ -160,8 +161,9 @@ type CloudComplianceScanBenchmark struct { } type RefreshResourcesRequest struct { - NodeId string `json:"node_id" required:"true"` - NodeType ScanResource `json:"node_type" required:"true"` + NodeId string `json:"node_id" required:"true"` + AccountID string `json:"account_id" required:"true"` + NodeType ScanResource `json:"node_type" required:"true"` } type StopSecretScanRequest StartSecretScanRequest diff --git a/deepfence_utils/utils/constants.go b/deepfence_utils/utils/constants.go index f109c5f875..f018b9337f 100644 --- a/deepfence_utils/utils/constants.go +++ b/deepfence_utils/utils/constants.go @@ -81,6 +81,7 @@ const ( const ( ScanStatusSuccess = "COMPLETE" ScanStatusStarting = "STARTING" + ScanStatusQueued = "QUEUED" ScanStatusInProgress = "IN_PROGRESS" ScanStatusFailed = "ERROR" ScanStatusCancelPending = "CANCEL_PENDING" diff --git a/deepfence_utils/utils/ingesters/cloud_resource.go b/deepfence_utils/utils/ingesters/cloud_resource.go index a76792c436..f2a65fc499 100644 --- a/deepfence_utils/utils/ingesters/cloud_resource.go +++ b/deepfence_utils/utils/ingesters/cloud_resource.go @@ -192,6 +192,7 @@ type CloudResourceRefreshStatus struct { CloudNodeID string `json:"cloud_node_id"` RefreshMessage string `json:"refresh_message"` RefreshStatus string `json:"refresh_status"` + UpdatedAt int64 `json:"updated_at"` } func (c *CloudResourceRefreshStatus) ToMap() map[string]interface{} { diff --git a/deepfence_worker/ingesters/cloud_resource.go b/deepfence_worker/ingesters/cloud_resource.go index 035d58fbee..e9137d2ffb 100644 --- a/deepfence_worker/ingesters/cloud_resource.go +++ b/deepfence_worker/ingesters/cloud_resource.go @@ -4,12 +4,12 @@ import ( "context" "encoding/json" "fmt" + "sort" "time" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/telemetry" - "github.com/deepfence/ThreatMapper/deepfence_utils/utils" ingestersUtil "github.com/deepfence/ThreatMapper/deepfence_utils/utils/ingesters" "github.com/neo4j/neo4j-go-driver/v5/neo4j" ) @@ -276,42 +276,14 @@ func CommitFuncCloudResourceRefreshStatus(ctx context.Context, ns string, cs []i } func ResourceRefreshStatusToMaps(data []ingestersUtil.CloudResourceRefreshStatus) []map[string]interface{} { - statusBuff := map[string]map[string]interface{}{} - for _, i := range data { - statusMap := i.ToMap() + statuses := make([]map[string]interface{}, len(data)) - cloudNodeId, ok := statusMap["cloud_node_id"].(string) - if !ok { - log.Error().Msgf("failed to convert cloud_node_id to string, data: %v", statusMap) - continue - } - - newStatus, ok := statusMap["refresh_status"].(string) - if !ok { - log.Error().Msgf("failed to convert refresh_status to string, data: %v", statusMap) - continue - } + sort.Slice(data, func(i, j int) bool { + return data[i].UpdatedAt < data[j].UpdatedAt + }) - old, found := statusBuff[cloudNodeId] - if !found { - statusBuff[cloudNodeId] = statusMap - } else { - oldStatus, ok := old["refresh_status"].(string) - if !ok { - log.Error().Msgf("failed to convert refresh_status to string, data: %v", old) - continue - } - if newStatus != oldStatus { - if newStatus == utils.ScanStatusSuccess || - newStatus == utils.ScanStatusFailed || newStatus == utils.ScanStatusCancelled { - statusBuff[cloudNodeId] = statusMap - } - } - } - } - statuses := []map[string]interface{}{} - for _, v := range statusBuff { - statuses = append(statuses, v) + for i, d := range data { + statuses[i] = d.ToMap() } return statuses }