diff --git a/deepfence_ingester/ingesters/malware.go b/deepfence_ingester/ingesters/malware.go new file mode 100644 index 0000000000..d58fec2659 --- /dev/null +++ b/deepfence_ingester/ingesters/malware.go @@ -0,0 +1,96 @@ +package ingesters + +import ( + "encoding/json" + "time" + + "github.com/deepfence/ThreatMapper/deepfence_utils/directory" + "github.com/deepfence/ThreatMapper/deepfence_utils/log" + "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + "github.com/neo4j/neo4j-go-driver/v4/neo4j" +) + +type MalwareScanStatus struct { + Timestamp time.Time `json:"@timestamp"` + ContainerName string `json:"container_name"` + HostName string `json:"host_name"` + KubernetesClusterName string `json:"kubernetes_cluster_name"` + Masked string `json:"masked"` + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + NodeType string `json:"node_type"` + ScanID string `json:"scan_id"` + ScanStatus string `json:"scan_status"` +} + +type Malware struct { + // TODO: add malware struct +} + +func CommitFuncMalware(ns string, data []Malware) error { + ctx := directory.NewContextWithNameSpace(directory.NamespaceID(ns)) + driver, err := directory.Neo4jClient(ctx) + if err != nil { + return err + } + + session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + if err != nil { + return err + } + defer session.Close() + + tx, err := session.BeginTransaction() + if err != nil { + return err + } + defer tx.Close() + + // // TODO: add query to commit for malware results + log.Error().Msg("Not implemented") + + return tx.Commit() +} + +func CommitFuncMalwareScanStatus(ns string, data []MalwareScanStatus) error { + ctx := directory.NewContextWithNameSpace(directory.NamespaceID(ns)) + driver, err := directory.Neo4jClient(ctx) + if err != nil { + return err + } + + session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + if err != nil { + return err + } + defer session.Close() + + tx, err := session.BeginTransaction() + if err != nil { + return err + } + defer tx.Close() + + // TODO: add query to commit for scan status + log.Error().Msg("Not implemented") + + return tx.Commit() +} + +func MalwareToMaps(ms []Malware) []map[string]interface{} { + res := []map[string]interface{}{} + for _, v := range ms { + res = append(res, utils.ToMap(v)) + } + return res +} + +func (c Malware) ToMap() map[string]interface{} { + out, err := json.Marshal(c) + if err != nil { + return nil + } + bb := map[string]interface{}{} + _ = json.Unmarshal(out, &bb) + return bb +} diff --git a/deepfence_ingester/processors/common.go b/deepfence_ingester/processors/common.go index 526e5af78d..294bf03ce8 100644 --- a/deepfence_ingester/processors/common.go +++ b/deepfence_ingester/processors/common.go @@ -74,6 +74,14 @@ func StartKafkaProcessors(ctx context.Context) { utils.CLOUD_COMPLIANCE_SCAN_STATUS, desWrapper(ingesters.CommitFuncCloudComplianceScanStatus), ) + processors[utils.MALWARE_SCAN] = NewBulkProcessor( + utils.MALWARE_SCAN, + desWrapper(ingesters.CommitFuncMalware), + ) + processors[utils.MALWARE_SCAN_STATUS] = NewBulkProcessor( + utils.MALWARE_SCAN_STATUS, + desWrapper(ingesters.CommitFuncMalwareScanStatus), + ) for i := range processors { processors[i].Start(ctx)