Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cloud account refresh status #2212

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/YaraHunter
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@ routes:
- local_path: var/log/fenced/cloud-scanner-log/cloud_scanner_status.log
remote_path: /deepfence/ingest/cloud-compliance-status

- local_path: var/log/fenced/cloud-resource-refresh-log/cloud_resource_refresh_status.log
remote_path: /deepfence/ingest/cloud-resource-refresh-status

- local_path: var/log/fenced/cloud-resources/cloud_resources.log
remote_path: /deepfence/ingest/cloud-resources
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/yara-rules
Submodule yara-rules updated 1 files
+1 −1 build-timestamp
5 changes: 5 additions & 0 deletions deepfence_server/handler/scan_reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ func (h *Handler) IngestCloudComplianceScanStatusReportHandler(w http.ResponseWr
ingestScanReportKafka(w, r, ingester, h.IngestChan)
}

func (h *Handler) IngestCloudResourceRefreshStatusReportHandler(w http.ResponseWriter, r *http.Request) {
ingester := ingesters.NewCloudResourceRefreshStatusIngester()
ingestScanReportKafka(w, r, ingester, h.IngestChan)
}

func ingestScanReportKafka[T any](
respWrite http.ResponseWriter,
req *http.Request,
Expand Down
38 changes: 38 additions & 0 deletions deepfence_server/ingesters/cloud_resource_ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,41 @@ func (tc *CloudResourceIngester) Ingest(

return nil
}

type CloudResourceRefreshStatusIngester struct{}

func NewCloudResourceRefreshStatusIngester() KafkaIngester[[]ingestersUtil.CloudResourceRefreshStatus] {
return &CloudResourceRefreshStatusIngester{}
}

func (tc *CloudResourceRefreshStatusIngester) Ingest(
ctx context.Context,
cs []ingestersUtil.CloudResourceRefreshStatus,
ingestC chan *kgo.Record,
) error {

tenantID, err := directory.ExtractNamespace(ctx)
if err != nil {
return err
}

rh := []kgo.RecordHeader{
{Key: "namespace", Value: []byte(tenantID)},
}

for _, c := range cs {
cb, err := json.Marshal(c)
if err != nil {
log.Error().Msg(err.Error())
} else {
ingestC <- &kgo.Record{
Topic: utils.CloudResourceRefreshStatus,
ramanan-ravi marked this conversation as resolved.
Show resolved Hide resolved
Value: cb,
Headers: rh,
}
}
}

return nil

}
2 changes: 2 additions & 0 deletions deepfence_server/model/cloud_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type CloudNodeAccountInfo struct {
Active bool `json:"active"`
LastScanID string `json:"last_scan_id"`
LastScanStatus string `json:"last_scan_status"`
RefreshMessage string `json:"refresh_message"`
RefreshStatus string `json:"refresh_status"`
ScanStatusMap map[string]int64 `json:"scan_status_map"`
Version string `json:"version"`
HostNodeID string `json:"host_node_id"`
Expand Down
8 changes: 4 additions & 4 deletions deepfence_server/reporters/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func searchGenericDirectNodeReport[T reporters.Cypherable](ctx context.Context,
}

var (
searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"}
searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "refresh_status", "refresh_message", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"}
)

func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWindow) ([]model.CloudNodeAccountInfo, error) {
Expand Down Expand Up @@ -388,11 +388,11 @@ func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWin
}
CALL {
WITH x MATCH (n:` + dummy.NodeType() + `{node_id: x})
RETURN n.node_name as node_name, n.account_name as account_name, n.active as active, n.version as version
RETURN n.node_name as node_name, n.account_name as account_name, n.refresh_status as refresh_status, n.refresh_message as refresh_message, n.active as active, n.version as version
}
WITH x, node_name, account_name, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` +
WITH x, node_name, account_name, refresh_status, refresh_message, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` +
reporters.ParseFieldFilters2CypherWhereConditions("", mo.Some(scanFilter), true) +
`RETURN x as node_id, node_name, account_name, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
`RETURN x as node_id, node_name, account_name, refresh_status, refresh_message, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
reporters.OrderFilter2CypherCondition("", orderFilters, nil) + fw.FetchWindow2CypherQuery()

log.Debug().Msgf("search cloud node query: %v", query)
Expand Down
1 change: 1 addition & 0 deletions deepfence_server/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func SetupRoutes(r *chi.Mux, serverPort string, serveOpenapiDocs bool, ingestC c
r.Post("/malware-scan-logs", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestMalwareScanStatusHandler))
r.Post("/cloud-compliance", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudComplianceReportHandler))
r.Post("/cloud-compliance-status", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudComplianceScanStatusReportHandler))
r.Post("/cloud-resource-refresh-status", dfHandler.AuthHandler(ResourceScanReport, PermissionIngest, dfHandler.IngestCloudResourceRefreshStatusReportHandler))
})

r.Route("/cloud-node", func(r chi.Router) {
Expand Down
32 changes: 17 additions & 15 deletions deepfence_utils/utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,22 @@ const (

// kafka topics
const (
AuditLogs = "audit-logs"
VulnerabilityScan = "vulnerability-scan"
VulnerabilityScanStatus = "vulnerability-scan-status"
SecretScan = "secret-scan"
SecretScanStatus = "secret-scan-status"
MalwareScan = "malware-scan"
MalwareScanStatus = "malware-scan-status"
SbomArtifacts = "sbom-artifact"
SbomCVEScan = "sbom-cve-scan"
CloudComplianceScan = "cloud-compliance-scan"
CloudComplianceScanStatus = "cloud-compliance-scan-status"
ComplianceScan = "compliance-scan"
ComplianceScanStatus = "compliance-scan-status"
CloudTrailAlerts = "cloudtrail-alert"
CloudResource = "cloud-resource"
AuditLogs = "audit-logs"
VulnerabilityScan = "vulnerability-scan"
VulnerabilityScanStatus = "vulnerability-scan-status"
SecretScan = "secret-scan"
SecretScanStatus = "secret-scan-status"
MalwareScan = "malware-scan"
MalwareScanStatus = "malware-scan-status"
SbomArtifacts = "sbom-artifact"
SbomCVEScan = "sbom-cve-scan"
CloudComplianceScan = "cloud-compliance-scan"
CloudComplianceScanStatus = "cloud-compliance-scan-status"
CloudResourceRefreshStatus = "cloud-resource-refresh-status"
ComplianceScan = "compliance-scan"
ComplianceScanStatus = "compliance-scan-status"
CloudTrailAlerts = "cloudtrail-alert"
CloudResource = "cloud-resource"
)

// task names
Expand Down Expand Up @@ -202,6 +203,7 @@ var Topics = []string{
MalwareScan, MalwareScanStatus,
SbomArtifacts, SbomCVEScan,
CloudComplianceScan, CloudComplianceScanStatus,
CloudResourceRefreshStatus,
ComplianceScan, ComplianceScanStatus,
CloudTrailAlerts,
AuditLogs,
Expand Down
14 changes: 14 additions & 0 deletions deepfence_utils/utils/ingesters/cloud_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,17 @@ func convertStructFieldToJSONString(bb map[string]interface{}, key string) map[s
}
return bb
}

type CloudResourceRefreshStatus struct {
CloudNodeID string `json:"cloud_node_id"`
RefreshMessage string `json:"refresh_message"`
RefreshStatus string `json:"refresh_status"`
}

func (c *CloudResourceRefreshStatus) ToMap() map[string]interface{} {
return map[string]interface{}{
"cloud_node_id": c.CloudNodeID,
"refresh_message": c.RefreshMessage,
"refresh_status": c.RefreshStatus,
}
}
76 changes: 76 additions & 0 deletions deepfence_worker/ingesters/cloud_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_utils/log"
"github.com/deepfence/ThreatMapper/deepfence_utils/telemetry"
"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
ingestersUtil "github.com/deepfence/ThreatMapper/deepfence_utils/utils/ingesters"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
Expand Down Expand Up @@ -239,3 +240,78 @@ func LinkNodesWithCloudResources(ctx context.Context) error {

return tx.Commit(ctx)
}

func CommitFuncCloudResourceRefreshStatus(ctx context.Context, ns string, cs []ingestersUtil.CloudResourceRefreshStatus) error {

ctx = directory.ContextWithNameSpace(ctx, directory.NamespaceID(ns))

ctx, span := telemetry.NewSpan(ctx, "ingesters", "commit-func-cloud-resource")
defer span.End()

driver, err := directory.Neo4jClient(ctx)
if err != nil {
return err
}

session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close(ctx)

tx, err := session.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second))
if err != nil {
return err
}
defer tx.Close(ctx)

_, err = tx.Run(ctx, `
UNWIND $batch as row
MATCH (n:CloudNode{node_id: row.cloud_node_id})
SET n.refresh_status = row.refresh_status,
n.refresh_message = row.refresh_message`,
map[string]interface{}{
"batch": ResourceRefreshStatusToMaps(cs),
},
)

return tx.Commit(ctx)
}

func ResourceRefreshStatusToMaps(data []ingestersUtil.CloudResourceRefreshStatus) []map[string]interface{} {
statusBuff := map[string]map[string]interface{}{}
for _, i := range data {
statusMap := i.ToMap()

cloudNodeId, ok := statusMap["cloud_node_id"].(string)
if !ok {
log.Error().Msgf("failed to convert cloud_node_id to string, data: %v", statusMap)
continue
}

newStatus, ok := statusMap["refresh_status"].(string)
if !ok {
log.Error().Msgf("failed to convert refresh_status to string, data: %v", statusMap)
continue
}

old, found := statusBuff[cloudNodeId]
if !found {
statusBuff[cloudNodeId] = statusMap
} else {
oldStatus, ok := old["refresh_status"].(string)
if !ok {
log.Error().Msgf("failed to convert refresh_status to string, data: %v", old)
continue
}
if newStatus != oldStatus {
if newStatus == utils.ScanStatusSuccess ||
newStatus == utils.ScanStatusFailed || newStatus == utils.ScanStatusCancelled {
statusBuff[cloudNodeId] = statusMap
}
}
}
}
statuses := []map[string]interface{}{}
for _, v := range statusBuff {
statuses = append(statuses, v)
}
return statuses
}
6 changes: 6 additions & 0 deletions deepfence_worker/processors/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func NewKafkaProcessors(namespace string) map[string]*BulkProcessor {
desWrapper(ingesters.CommitFuncStatus[ingestersUtil.CloudComplianceScanStatus](utils.NEO4JCloudComplianceScan))),
)

processors[utils.TopicWithNamespace(utils.CloudResourceRefreshStatus, namespace)] = NewBulkProcessor(
utils.CloudResourceRefreshStatus, namespace,
telemetryWrapper(utils.CloudResourceRefreshStatus,
desWrapper(ingesters.CommitFuncCloudResourceRefreshStatus)),
)

processors[utils.TopicWithNamespace(utils.CloudResource, namespace)] = NewBulkProcessorWithSize(
utils.CloudResource, namespace,
telemetryWrapper(utils.CloudResource,
Expand Down
Loading