Skip to content

Commit

Permalink
Add cloud account refresh status (#2212)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanan-ravi authored Jun 24, 2024
1 parent 45195f9 commit ccd78b7
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 22 deletions.
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/YaraHunter
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@ routes:
- local_path: var/log/fenced/cloud-scanner-log/cloud_scanner_status.log
remote_path: /deepfence/ingest/cloud-compliance-status

- local_path: var/log/fenced/cloud-resource-refresh-log/cloud_resource_refresh_status.log
remote_path: /deepfence/ingest/cloud-resource-refresh-status

- local_path: var/log/fenced/cloud-resources/cloud_resources.log
remote_path: /deepfence/ingest/cloud-resources
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/yara-rules
Submodule yara-rules updated 1 files
+1 −1 build-timestamp
5 changes: 5 additions & 0 deletions deepfence_server/handler/scan_reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ func (h *Handler) IngestCloudComplianceScanStatusReportHandler(w http.ResponseWr
ingestScanReportKafka(w, r, ingester, h.IngestChan)
}

func (h *Handler) IngestCloudResourceRefreshStatusReportHandler(w http.ResponseWriter, r *http.Request) {
ingester := ingesters.NewCloudResourceRefreshStatusIngester()
ingestScanReportKafka(w, r, ingester, h.IngestChan)
}

func ingestScanReportKafka[T any](
respWrite http.ResponseWriter,
req *http.Request,
Expand Down
38 changes: 38 additions & 0 deletions deepfence_server/ingesters/cloud_resource_ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,41 @@ func (tc *CloudResourceIngester) Ingest(

return nil
}

type CloudResourceRefreshStatusIngester struct{}

func NewCloudResourceRefreshStatusIngester() KafkaIngester[[]ingestersUtil.CloudResourceRefreshStatus] {
return &CloudResourceRefreshStatusIngester{}
}

func (tc *CloudResourceRefreshStatusIngester) Ingest(
ctx context.Context,
cs []ingestersUtil.CloudResourceRefreshStatus,
ingestC chan *kgo.Record,
) error {

tenantID, err := directory.ExtractNamespace(ctx)
if err != nil {
return err
}

rh := []kgo.RecordHeader{
{Key: "namespace", Value: []byte(tenantID)},
}

for _, c := range cs {
cb, err := json.Marshal(c)
if err != nil {
log.Error().Msg(err.Error())
} else {
ingestC <- &kgo.Record{
Topic: utils.CloudResourceRefreshStatus,
Value: cb,
Headers: rh,
}
}
}

return nil

}
2 changes: 2 additions & 0 deletions deepfence_server/model/cloud_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type CloudNodeAccountInfo struct {
Active bool `json:"active"`
LastScanID string `json:"last_scan_id"`
LastScanStatus string `json:"last_scan_status"`
RefreshMessage string `json:"refresh_message"`
RefreshStatus string `json:"refresh_status"`
ScanStatusMap map[string]int64 `json:"scan_status_map"`
Version string `json:"version"`
HostNodeID string `json:"host_node_id"`
Expand Down
8 changes: 4 additions & 4 deletions deepfence_server/reporters/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func searchGenericDirectNodeReport[T reporters.Cypherable](ctx context.Context,
}

var (
searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"}
searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "refresh_status", "refresh_message", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"}
)

func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWindow) ([]model.CloudNodeAccountInfo, error) {
Expand Down Expand Up @@ -388,11 +388,11 @@ func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWin
}
CALL {
WITH x MATCH (n:` + dummy.NodeType() + `{node_id: x})
RETURN n.node_name as node_name, n.account_name as account_name, n.active as active, n.version as version
RETURN n.node_name as node_name, n.account_name as account_name, n.refresh_status as refresh_status, n.refresh_message as refresh_message, n.active as active, n.version as version
}
WITH x, node_name, account_name, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` +
WITH x, node_name, account_name, refresh_status, refresh_message, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` +
reporters.ParseFieldFilters2CypherWhereConditions("", mo.Some(scanFilter), true) +
`RETURN x as node_id, node_name, account_name, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
`RETURN x as node_id, node_name, account_name, refresh_status, refresh_message, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
reporters.OrderFilter2CypherCondition("", orderFilters, nil) + fw.FetchWindow2CypherQuery()

log.Debug().Msgf("search cloud node query: %v", query)
Expand Down
1 change: 1 addition & 0 deletions deepfence_server/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func SetupRoutes(r *chi.Mux, serverPort string, serveOpenapiDocs bool, ingestC c
r.Post("/malware-scan-logs", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestMalwareScanStatusHandler))
r.Post("/cloud-compliance", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudComplianceReportHandler))
r.Post("/cloud-compliance-status", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudComplianceScanStatusReportHandler))
r.Post("/cloud-resource-refresh-status", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudResourceRefreshStatusReportHandler))
})

r.Route("/cloud-node", func(r chi.Router) {
Expand Down
32 changes: 17 additions & 15 deletions deepfence_utils/utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,22 @@ const (

// kafka topics
const (
AuditLogs = "audit-logs"
VulnerabilityScan = "vulnerability-scan"
VulnerabilityScanStatus = "vulnerability-scan-status"
SecretScan = "secret-scan"
SecretScanStatus = "secret-scan-status"
MalwareScan = "malware-scan"
MalwareScanStatus = "malware-scan-status"
SbomArtifacts = "sbom-artifact"
SbomCVEScan = "sbom-cve-scan"
CloudComplianceScan = "cloud-compliance-scan"
CloudComplianceScanStatus = "cloud-compliance-scan-status"
ComplianceScan = "compliance-scan"
ComplianceScanStatus = "compliance-scan-status"
CloudTrailAlerts = "cloudtrail-alert"
CloudResource = "cloud-resource"
AuditLogs = "audit-logs"
VulnerabilityScan = "vulnerability-scan"
VulnerabilityScanStatus = "vulnerability-scan-status"
SecretScan = "secret-scan"
SecretScanStatus = "secret-scan-status"
MalwareScan = "malware-scan"
MalwareScanStatus = "malware-scan-status"
SbomArtifacts = "sbom-artifact"
SbomCVEScan = "sbom-cve-scan"
CloudComplianceScan = "cloud-compliance-scan"
CloudComplianceScanStatus = "cloud-compliance-scan-status"
CloudResourceRefreshStatus = "cloud-resource-refresh-status"
ComplianceScan = "compliance-scan"
ComplianceScanStatus = "compliance-scan-status"
CloudTrailAlerts = "cloudtrail-alert"
CloudResource = "cloud-resource"
)

// task names
Expand Down Expand Up @@ -202,6 +203,7 @@ var Topics = []string{
MalwareScan, MalwareScanStatus,
SbomArtifacts, SbomCVEScan,
CloudComplianceScan, CloudComplianceScanStatus,
CloudResourceRefreshStatus,
ComplianceScan, ComplianceScanStatus,
CloudTrailAlerts,
AuditLogs,
Expand Down
14 changes: 14 additions & 0 deletions deepfence_utils/utils/ingesters/cloud_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,17 @@ func convertStructFieldToJSONString(bb map[string]interface{}, key string) map[s
}
return bb
}

type CloudResourceRefreshStatus struct {
CloudNodeID string `json:"cloud_node_id"`
RefreshMessage string `json:"refresh_message"`
RefreshStatus string `json:"refresh_status"`
}

func (c *CloudResourceRefreshStatus) ToMap() map[string]interface{} {
return map[string]interface{}{
"cloud_node_id": c.CloudNodeID,
"refresh_message": c.RefreshMessage,
"refresh_status": c.RefreshStatus,
}
}
76 changes: 76 additions & 0 deletions deepfence_worker/ingesters/cloud_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_utils/log"
"github.com/deepfence/ThreatMapper/deepfence_utils/telemetry"
"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
ingestersUtil "github.com/deepfence/ThreatMapper/deepfence_utils/utils/ingesters"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
Expand Down Expand Up @@ -239,3 +240,78 @@ func LinkNodesWithCloudResources(ctx context.Context) error {

return tx.Commit(ctx)
}

func CommitFuncCloudResourceRefreshStatus(ctx context.Context, ns string, cs []ingestersUtil.CloudResourceRefreshStatus) error {

ctx = directory.ContextWithNameSpace(ctx, directory.NamespaceID(ns))

ctx, span := telemetry.NewSpan(ctx, "ingesters", "commit-func-cloud-resource")
defer span.End()

driver, err := directory.Neo4jClient(ctx)
if err != nil {
return err
}

session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close(ctx)

tx, err := session.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second))
if err != nil {
return err
}
defer tx.Close(ctx)

_, err = tx.Run(ctx, `
UNWIND $batch as row
MATCH (n:CloudNode{node_id: row.cloud_node_id})
SET n.refresh_status = row.refresh_status,
n.refresh_message = row.refresh_message`,
map[string]interface{}{
"batch": ResourceRefreshStatusToMaps(cs),
},
)

return tx.Commit(ctx)
}

func ResourceRefreshStatusToMaps(data []ingestersUtil.CloudResourceRefreshStatus) []map[string]interface{} {
statusBuff := map[string]map[string]interface{}{}
for _, i := range data {
statusMap := i.ToMap()

cloudNodeId, ok := statusMap["cloud_node_id"].(string)
if !ok {
log.Error().Msgf("failed to convert cloud_node_id to string, data: %v", statusMap)
continue
}

newStatus, ok := statusMap["refresh_status"].(string)
if !ok {
log.Error().Msgf("failed to convert refresh_status to string, data: %v", statusMap)
continue
}

old, found := statusBuff[cloudNodeId]
if !found {
statusBuff[cloudNodeId] = statusMap
} else {
oldStatus, ok := old["refresh_status"].(string)
if !ok {
log.Error().Msgf("failed to convert refresh_status to string, data: %v", old)
continue
}
if newStatus != oldStatus {
if newStatus == utils.ScanStatusSuccess ||
newStatus == utils.ScanStatusFailed || newStatus == utils.ScanStatusCancelled {
statusBuff[cloudNodeId] = statusMap
}
}
}
}
statuses := []map[string]interface{}{}
for _, v := range statusBuff {
statuses = append(statuses, v)
}
return statuses
}
6 changes: 6 additions & 0 deletions deepfence_worker/processors/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func NewKafkaProcessors(namespace string) map[string]*BulkProcessor {
desWrapper(ingesters.CommitFuncStatus[ingestersUtil.CloudComplianceScanStatus](utils.NEO4JCloudComplianceScan))),
)

processors[utils.TopicWithNamespace(utils.CloudResourceRefreshStatus, namespace)] = NewBulkProcessor(
utils.CloudResourceRefreshStatus, namespace,
telemetryWrapper(utils.CloudResourceRefreshStatus,
desWrapper(ingesters.CommitFuncCloudResourceRefreshStatus)),
)

processors[utils.TopicWithNamespace(utils.CloudResource, namespace)] = NewBulkProcessorWithSize(
utils.CloudResource, namespace,
telemetryWrapper(utils.CloudResource,
Expand Down

0 comments on commit ccd78b7

Please sign in to comment.