diff --git a/deepfence_agent/plugins/YaraHunter b/deepfence_agent/plugins/YaraHunter index 5a4252b7cf..3fea260d8d 160000 --- a/deepfence_agent/plugins/YaraHunter +++ b/deepfence_agent/plugins/YaraHunter @@ -1 +1 @@ -Subproject commit 5a4252b7cfb3154a17d0925477a20e07f53fef23 +Subproject commit 3fea260d8df7176deef162e8abc5c1d57d2e1c7b diff --git a/deepfence_agent/plugins/cloud-scanner b/deepfence_agent/plugins/cloud-scanner index aed8456bdf..a67667f0f6 160000 --- a/deepfence_agent/plugins/cloud-scanner +++ b/deepfence_agent/plugins/cloud-scanner @@ -1 +1 @@ -Subproject commit aed8456bdfc6d37b9da1d3ed338e42c109e74c46 +Subproject commit a67667f0f66fd513f66e076f74e4400bc97f0901 diff --git a/deepfence_agent/plugins/deepfence_shipper/routes_cloudscanner.yaml b/deepfence_agent/plugins/deepfence_shipper/routes_cloudscanner.yaml index f9d17297fc..a6dd874fd7 100644 --- a/deepfence_agent/plugins/deepfence_shipper/routes_cloudscanner.yaml +++ b/deepfence_agent/plugins/deepfence_shipper/routes_cloudscanner.yaml @@ -8,5 +8,8 @@ routes: - local_path: var/log/fenced/cloud-scanner-log/cloud_scanner_status.log remote_path: /deepfence/ingest/cloud-compliance-status + - local_path: var/log/fenced/cloud-resource-refresh-log/cloud_resource_refresh_status.log + remote_path: /deepfence/ingest/cloud-resource-refresh-status + - local_path: var/log/fenced/cloud-resources/cloud_resources.log remote_path: /deepfence/ingest/cloud-resources diff --git a/deepfence_agent/plugins/yara-rules b/deepfence_agent/plugins/yara-rules index c34eb6901c..8b173557d8 160000 --- a/deepfence_agent/plugins/yara-rules +++ b/deepfence_agent/plugins/yara-rules @@ -1 +1 @@ -Subproject commit c34eb6901c2d8f5fffdcacf4a6d2900e03ae01ae +Subproject commit 8b173557d85c282bef4f7d8201bc0b2e7612c329 diff --git a/deepfence_server/handler/scan_reports.go b/deepfence_server/handler/scan_reports.go index ce38594ec4..01b7857014 100644 --- a/deepfence_server/handler/scan_reports.go +++ b/deepfence_server/handler/scan_reports.go @@ -754,6 +754,11 @@ func (h *Handler) IngestCloudComplianceScanStatusReportHandler(w http.ResponseWr ingestScanReportKafka(w, r, ingester, h.IngestChan) } +func (h *Handler) IngestCloudResourceRefreshStatusReportHandler(w http.ResponseWriter, r *http.Request) { + ingester := ingesters.NewCloudResourceRefreshStatusIngester() + ingestScanReportKafka(w, r, ingester, h.IngestChan) +} + func ingestScanReportKafka[T any]( respWrite http.ResponseWriter, req *http.Request, diff --git a/deepfence_server/ingesters/cloud_resource_ingester.go b/deepfence_server/ingesters/cloud_resource_ingester.go index c67ccd0eff..9872a686f5 100644 --- a/deepfence_server/ingesters/cloud_resource_ingester.go +++ b/deepfence_server/ingesters/cloud_resource_ingester.go @@ -47,3 +47,41 @@ func (tc *CloudResourceIngester) Ingest( return nil } + +type CloudResourceRefreshStatusIngester struct{} + +func NewCloudResourceRefreshStatusIngester() KafkaIngester[[]ingestersUtil.CloudResourceRefreshStatus] { + return &CloudResourceRefreshStatusIngester{} +} + +func (tc *CloudResourceRefreshStatusIngester) Ingest( + ctx context.Context, + cs []ingestersUtil.CloudResourceRefreshStatus, + ingestC chan *kgo.Record, +) error { + + tenantID, err := directory.ExtractNamespace(ctx) + if err != nil { + return err + } + + rh := []kgo.RecordHeader{ + {Key: "namespace", Value: []byte(tenantID)}, + } + + for _, c := range cs { + cb, err := json.Marshal(c) + if err != nil { + log.Error().Msg(err.Error()) + } else { + ingestC <- &kgo.Record{ + Topic: utils.CloudResourceRefreshStatus, + Value: cb, + Headers: rh, + } + } + } + + return nil + +} diff --git a/deepfence_server/model/cloud_node.go b/deepfence_server/model/cloud_node.go index d49ca8381a..3de7dc845a 100644 --- a/deepfence_server/model/cloud_node.go +++ b/deepfence_server/model/cloud_node.go @@ -83,6 +83,8 @@ type CloudNodeAccountInfo struct { Active bool `json:"active"` LastScanID string `json:"last_scan_id"` LastScanStatus string `json:"last_scan_status"` + RefreshMessage string `json:"refresh_message"` + RefreshStatus string `json:"refresh_status"` ScanStatusMap map[string]int64 `json:"scan_status_map"` Version string `json:"version"` HostNodeID string `json:"host_node_id"` diff --git a/deepfence_server/reporters/search/search.go b/deepfence_server/reporters/search/search.go index d68dd0a2c3..e51e3d0b03 100644 --- a/deepfence_server/reporters/search/search.go +++ b/deepfence_server/reporters/search/search.go @@ -329,7 +329,7 @@ func searchGenericDirectNodeReport[T reporters.Cypherable](ctx context.Context, } var ( - searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"} + searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "refresh_status", "refresh_message", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"} ) func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWindow) ([]model.CloudNodeAccountInfo, error) { @@ -388,11 +388,11 @@ func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWin } CALL { WITH x MATCH (n:` + dummy.NodeType() + `{node_id: x}) - RETURN n.node_name as node_name, n.account_name as account_name, n.active as active, n.version as version + RETURN n.node_name as node_name, n.account_name as account_name, n.refresh_status as refresh_status, n.refresh_message as refresh_message, n.active as active, n.version as version } - WITH x, node_name, account_name, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + + WITH x, node_name, account_name, refresh_status, refresh_message, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.ParseFieldFilters2CypherWhereConditions("", mo.Some(scanFilter), true) + - `RETURN x as node_id, node_name, account_name, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) + + `RETURN x as node_id, node_name, account_name, refresh_status, refresh_message, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) + reporters.OrderFilter2CypherCondition("", orderFilters, nil) + fw.FetchWindow2CypherQuery() log.Debug().Msgf("search cloud node query: %v", query) diff --git a/deepfence_server/router/router.go b/deepfence_server/router/router.go index 8d91765987..81ac0438a7 100644 --- a/deepfence_server/router/router.go +++ b/deepfence_server/router/router.go @@ -356,6 +356,7 @@ func SetupRoutes(r *chi.Mux, serverPort string, serveOpenapiDocs bool, ingestC c r.Post("/malware-scan-logs", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestMalwareScanStatusHandler)) r.Post("/cloud-compliance", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudComplianceReportHandler)) r.Post("/cloud-compliance-status", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudComplianceScanStatusReportHandler)) + r.Post("/cloud-resource-refresh-status", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudResourceRefreshStatusReportHandler)) }) r.Route("/cloud-node", func(r chi.Router) { diff --git a/deepfence_utils/utils/constants.go b/deepfence_utils/utils/constants.go index f95fe5b918..e08546f6d3 100644 --- a/deepfence_utils/utils/constants.go +++ b/deepfence_utils/utils/constants.go @@ -11,21 +11,22 @@ const ( // kafka topics const ( - AuditLogs = "audit-logs" - VulnerabilityScan = "vulnerability-scan" - VulnerabilityScanStatus = "vulnerability-scan-status" - SecretScan = "secret-scan" - SecretScanStatus = "secret-scan-status" - MalwareScan = "malware-scan" - MalwareScanStatus = "malware-scan-status" - SbomArtifacts = "sbom-artifact" - SbomCVEScan = "sbom-cve-scan" - CloudComplianceScan = "cloud-compliance-scan" - CloudComplianceScanStatus = "cloud-compliance-scan-status" - ComplianceScan = "compliance-scan" - ComplianceScanStatus = "compliance-scan-status" - CloudTrailAlerts = "cloudtrail-alert" - CloudResource = "cloud-resource" + AuditLogs = "audit-logs" + VulnerabilityScan = "vulnerability-scan" + VulnerabilityScanStatus = "vulnerability-scan-status" + SecretScan = "secret-scan" + SecretScanStatus = "secret-scan-status" + MalwareScan = "malware-scan" + MalwareScanStatus = "malware-scan-status" + SbomArtifacts = "sbom-artifact" + SbomCVEScan = "sbom-cve-scan" + CloudComplianceScan = "cloud-compliance-scan" + CloudComplianceScanStatus = "cloud-compliance-scan-status" + CloudResourceRefreshStatus = "cloud-resource-refresh-status" + ComplianceScan = "compliance-scan" + ComplianceScanStatus = "compliance-scan-status" + CloudTrailAlerts = "cloudtrail-alert" + CloudResource = "cloud-resource" ) // task names @@ -202,6 +203,7 @@ var Topics = []string{ MalwareScan, MalwareScanStatus, SbomArtifacts, SbomCVEScan, CloudComplianceScan, CloudComplianceScanStatus, + CloudResourceRefreshStatus, ComplianceScan, ComplianceScanStatus, CloudTrailAlerts, AuditLogs, diff --git a/deepfence_utils/utils/ingesters/cloud_resource.go b/deepfence_utils/utils/ingesters/cloud_resource.go index c64d30a15b..a76792c436 100644 --- a/deepfence_utils/utils/ingesters/cloud_resource.go +++ b/deepfence_utils/utils/ingesters/cloud_resource.go @@ -187,3 +187,17 @@ func convertStructFieldToJSONString(bb map[string]interface{}, key string) map[s } return bb } + +type CloudResourceRefreshStatus struct { + CloudNodeID string `json:"cloud_node_id"` + RefreshMessage string `json:"refresh_message"` + RefreshStatus string `json:"refresh_status"` +} + +func (c *CloudResourceRefreshStatus) ToMap() map[string]interface{} { + return map[string]interface{}{ + "cloud_node_id": c.CloudNodeID, + "refresh_message": c.RefreshMessage, + "refresh_status": c.RefreshStatus, + } +} diff --git a/deepfence_worker/ingesters/cloud_resource.go b/deepfence_worker/ingesters/cloud_resource.go index 6078d670bb..035d58fbee 100644 --- a/deepfence_worker/ingesters/cloud_resource.go +++ b/deepfence_worker/ingesters/cloud_resource.go @@ -9,6 +9,7 @@ import ( "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/telemetry" + "github.com/deepfence/ThreatMapper/deepfence_utils/utils" ingestersUtil "github.com/deepfence/ThreatMapper/deepfence_utils/utils/ingesters" "github.com/neo4j/neo4j-go-driver/v5/neo4j" ) @@ -239,3 +240,78 @@ func LinkNodesWithCloudResources(ctx context.Context) error { return tx.Commit(ctx) } + +func CommitFuncCloudResourceRefreshStatus(ctx context.Context, ns string, cs []ingestersUtil.CloudResourceRefreshStatus) error { + + ctx = directory.ContextWithNameSpace(ctx, directory.NamespaceID(ns)) + + ctx, span := telemetry.NewSpan(ctx, "ingesters", "commit-func-cloud-resource") + defer span.End() + + driver, err := directory.Neo4jClient(ctx) + if err != nil { + return err + } + + session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + defer session.Close(ctx) + + tx, err := session.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second)) + if err != nil { + return err + } + defer tx.Close(ctx) + + _, err = tx.Run(ctx, ` + UNWIND $batch as row + MATCH (n:CloudNode{node_id: row.cloud_node_id}) + SET n.refresh_status = row.refresh_status, + n.refresh_message = row.refresh_message`, + map[string]interface{}{ + "batch": ResourceRefreshStatusToMaps(cs), + }, + ) + + return tx.Commit(ctx) +} + +func ResourceRefreshStatusToMaps(data []ingestersUtil.CloudResourceRefreshStatus) []map[string]interface{} { + statusBuff := map[string]map[string]interface{}{} + for _, i := range data { + statusMap := i.ToMap() + + cloudNodeId, ok := statusMap["cloud_node_id"].(string) + if !ok { + log.Error().Msgf("failed to convert cloud_node_id to string, data: %v", statusMap) + continue + } + + newStatus, ok := statusMap["refresh_status"].(string) + if !ok { + log.Error().Msgf("failed to convert refresh_status to string, data: %v", statusMap) + continue + } + + old, found := statusBuff[cloudNodeId] + if !found { + statusBuff[cloudNodeId] = statusMap + } else { + oldStatus, ok := old["refresh_status"].(string) + if !ok { + log.Error().Msgf("failed to convert refresh_status to string, data: %v", old) + continue + } + if newStatus != oldStatus { + if newStatus == utils.ScanStatusSuccess || + newStatus == utils.ScanStatusFailed || newStatus == utils.ScanStatusCancelled { + statusBuff[cloudNodeId] = statusMap + } + } + } + } + statuses := []map[string]interface{}{} + for _, v := range statusBuff { + statuses = append(statuses, v) + } + return statuses +} diff --git a/deepfence_worker/processors/common.go b/deepfence_worker/processors/common.go index fb25fbbcd0..49f84b0624 100644 --- a/deepfence_worker/processors/common.go +++ b/deepfence_worker/processors/common.go @@ -111,6 +111,12 @@ func NewKafkaProcessors(namespace string) map[string]*BulkProcessor { desWrapper(ingesters.CommitFuncStatus[ingestersUtil.CloudComplianceScanStatus](utils.NEO4JCloudComplianceScan))), ) + processors[utils.TopicWithNamespace(utils.CloudResourceRefreshStatus, namespace)] = NewBulkProcessor( + utils.CloudResourceRefreshStatus, namespace, + telemetryWrapper(utils.CloudResourceRefreshStatus, + desWrapper(ingesters.CommitFuncCloudResourceRefreshStatus)), + ) + processors[utils.TopicWithNamespace(utils.CloudResource, namespace)] = NewBulkProcessorWithSize( utils.CloudResource, namespace, telemetryWrapper(utils.CloudResource,