Skip to content

Commit

Permalink
Add Cloud Nodes Register and List API #838 (#839)
Browse files Browse the repository at this point in the history
* Add APIs to register and list cloud nodes

* Add debug logs and total count to cloud node account list

* Remove commented code, cleanup unused code, move cloud provider constant
  • Loading branch information
jatin-baweja authored Jan 23, 2023
1 parent fc91256 commit 40ca5c3
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 0 deletions.
1 change: 1 addition & 0 deletions deepfence_server/apiDocs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
tagCompliance = "Compliance"
tagCloudCompliance = "Cloud Compliance"
tagCloudResources = "Cloud Resources"
tagCloudNodes = "Cloud Nodes"
tagTopology = "Topology"
tagLookup = "Lookup"
tagThreat = "Threat"
Expand Down
9 changes: 9 additions & 0 deletions deepfence_server/apiDocs/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ func (d *OpenApiDocs) AddControlsOperations() {
http.StatusOK, []string{tagControls}, bearerToken, new(model.AgentId), new(controls.AgentControls))
}

func (d *OpenApiDocs) AddCloudNodeOperations() {
d.AddOperation("registerCloudNodeAccount", http.MethodPost, "/deepfence/cloud-node/account",
"Register Cloud Node Account", "Register Cloud Node Account and return any pending compliance scans from console",
http.StatusOK, []string{tagCloudNodes}, bearerToken, new(model.CloudNodeAccountRegisterReq), new(model.CloudNodeAccountRegisterResp))
d.AddOperation("listCloudNodeAccount", http.MethodPost, "/deepfence/cloud-node/accounts/list",
"List Cloud Node Accounts", "List Cloud Node Accounts registered with the console",
http.StatusOK, []string{tagCloudNodes}, bearerToken, new(model.CloudNodeAccountsListReq), new(model.CloudNodeAccountsListResp))
}

func (d *OpenApiDocs) AddIngestersOperations() {
d.AddOperation("ingestAgentReport", http.MethodPost, "/deepfence/ingest/report",
"Ingest Topology Data", "Ingest data reported by one Agent",
Expand Down
5 changes: 5 additions & 0 deletions deepfence_server/auth/policy.csv
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ p, standard-user, agent-report, ingest
p, admin, cloud-report, ingest
p, standard-user, cloud-report, ingest

p, admin, cloud-node, register
p, admin, cloud-node, read
p, standard-user, cloud-node, register
p, standard-user, cloud-node, read

p, admin, diagnosis, generate
p, standard-user, diagnosis, generate
p, admin, diagnosis, read
Expand Down
167 changes: 167 additions & 0 deletions deepfence_server/handler/cloud_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package handler

import (
"fmt"
"net/http"

"github.com/deepfence/ThreatMapper/deepfence_server/model"
"github.com/deepfence/ThreatMapper/deepfence_server/reporters"
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_utils/log"
"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
httpext "github.com/go-playground/pkg/v5/net/http"
"github.com/sirupsen/logrus"
)

func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http.Request) {
req, err := extractCloudNodeDetails(w, r)
if err != nil {
return
}

logrus.Debugf("Register Cloud Node Account Request: %+v", req)

monitoredAccountIds := req.MonitoredAccountIds
orgAccountId := req.OrgAccountId
scanList := make(map[string]model.CloudComplianceScanDetails)
cloudtrailTrails := make([]model.CloudNodeCloudtrailTrail, 10)
nodeId := req.NodeId

ctx := directory.NewContextWithNameSpace(directory.NonSaaSDirKey)

doRefresh := "false"

logrus.Debugf("Monitored account ids count: %d", len(monitoredAccountIds))
if len(monitoredAccountIds) != 0 {
logrus.Debugf("More than 1 account to be monitored: %+v", monitoredAccountIds)
if orgAccountId != "" {
complianceError(w, "Org account id is needed for multi account setup")
return
}
monitoredAccountIds[req.CloudAccount] = nodeId
node := map[string]interface{}{
"node_id": fmt.Sprintf("%s-%s-cloud-org", req.CloudProvider, orgAccountId),
"cloud_provider": req.CloudProvider,
"node_name": orgAccountId,
}
err = model.UpsertCloudComplianceNode(ctx, node)
if err != nil {
complianceError(w, err.Error())
}
for monitoredAccountId, monitoredNodeId := range monitoredAccountIds {
var monitoredNode map[string]interface{}
monitoredNode = map[string]interface{}{
"node_id": monitoredNodeId,
"cloud_provider": req.CloudProvider,
"node_name": monitoredAccountId,
}
err = model.UpsertCloudComplianceNode(ctx, monitoredNode)
if err != nil {
complianceError(w, err.Error())
}
pendingScansList, err := reporters.GetPendingScansList(ctx, utils.CLOUD_COMPLIANCE_SCAN, monitoredNodeId)
if err != nil {
continue
}
for _, scan := range pendingScansList.ScansInfo {
scanDetail := model.CloudComplianceScanDetails{
ScanId: scan.ScanId,
ScanType: "cis",
AccountId: monitoredNodeId,
}
scanList[scan.ScanId] = scanDetail
}
}
} else {
logrus.Debugf("Single account monitoring for node: %s", nodeId)
node := map[string]interface{}{
"node_id": nodeId,
"cloud_provider": req.CloudProvider,
"node_name": req.CloudAccount,
}
logrus.Debugf("Node for upsert: %+v", node)
err = model.UpsertCloudComplianceNode(ctx, node)
if err != nil {
logrus.Infof("Error while upserting node: %+v", err)
complianceError(w, err.Error())
}
pendingScansList, err := reporters.GetPendingScansList(ctx, utils.CLOUD_COMPLIANCE_SCAN, nodeId)
if err != nil || len(pendingScansList.ScansInfo) == 0 {
logrus.Debugf("No pending scans found for node id: %s", nodeId)
httpext.JSON(w, http.StatusOK,
model.CloudNodeAccountRegisterResp{Data: model.CloudNodeAccountRegisterRespData{Scans: scanList,
CloudtrailTrails: cloudtrailTrails, Refresh: doRefresh}})
return
}
for _, scan := range pendingScansList.ScansInfo {
scanDetail := model.CloudComplianceScanDetails{
ScanId: scan.ScanId,
ScanType: utils.CLOUD_COMPLIANCE_SCAN,
AccountId: nodeId,
}
scanList[scan.ScanId] = scanDetail
}
logrus.Debugf("Pending scans for node: %+v", scanList)
}
logrus.Debugf("Returning response: Scan List %+v cloudtrailTrails %+v Refresh %s", scanList, cloudtrailTrails, doRefresh)
httpext.JSON(w, http.StatusOK,
model.CloudNodeAccountRegisterResp{Data: model.CloudNodeAccountRegisterRespData{Scans: scanList,
CloudtrailTrails: cloudtrailTrails, Refresh: doRefresh}})
return
}

func (h *Handler) ListCloudNodeAccountHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var req model.CloudNodeAccountsListReq
err := httpext.DecodeJSON(r, httpext.NoQueryParams, MaxPostRequestSize, &req)
if err != nil {
log.Error().Msgf("%v", err)
httpext.JSON(w, http.StatusBadRequest, model.Response{Success: false})
return
}

if utils.StringToCloudProvider(req.CloudProvider) == -1 {
err = fmt.Errorf("unknown CloudProvider: %s", req.CloudProvider)
log.Error().Msgf("%v", err)
httpext.JSON(w, http.StatusBadRequest, model.Response{Success: false, Data: err.Error()})
}

infos, err := model.GetCloudComplianceNodesList(r.Context(), req.CloudProvider, req.Window)
if err != nil {
log.Error().Msgf("%v, req=%v", err, req)
httpext.JSON(w, http.StatusInternalServerError, model.Response{Success: false})
return
}

httpext.JSON(w, http.StatusOK, infos)
}

func complianceError(w http.ResponseWriter, errorString string) {
err := httpext.JSON(w, http.StatusInternalServerError, model.Response{Success: false,
Data: errorString})
if err != nil {
log.Error().Msgf("%v", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(errorString))
}
}

func extractCloudNodeDetails(w http.ResponseWriter, r *http.Request) (model.CloudNodeAccountRegisterReq, error) {
defer r.Body.Close()
var req model.CloudNodeAccountRegisterReq
err := httpext.DecodeJSON(r, httpext.NoQueryParams, MaxPostRequestSize, &req)

if err != nil {
log.Error().Msgf("%v", err)
httpext.JSON(w, http.StatusBadRequest, model.Response{Success: false})
return req, err
}

if utils.StringToCloudProvider(req.CloudProvider) == -1 {
err = fmt.Errorf("unknown CloudProvider: %s", req.CloudProvider)
log.Error().Msgf("%v", err)
httpext.JSON(w, http.StatusBadRequest, model.Response{Success: false, Data: err.Error()})
}

return req, err
}
1 change: 1 addition & 0 deletions deepfence_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func initializeOpenApiDocs(openApiDocs *apiDocs.OpenApiDocs) {
openApiDocs.AddIngestersOperations()
openApiDocs.AddScansOperations()
openApiDocs.AddDiagnosisOperations()
openApiDocs.AddCloudNodeOperations()
}

func initializeKafka() error {
Expand Down
138 changes: 138 additions & 0 deletions deepfence_server/model/cloud_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package model

import (
"context"
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/neo4j/neo4j-go-driver/v4/neo4j"
)

type CloudNodeAccountRegisterReq struct {
NodeId string `json:"node_id" required:"true"`
CloudAccount string `json:"cloud_account" required:"true"`
CloudProvider string `json:"cloud_provider" required:"true" enum:"aws,gcp,azure"`
MonitoredAccountIds map[string]string `json:"monitored_account_ids"`
OrgAccountId string `json:"org_acc_id"`
}

type CloudNodeAccountRegisterResp struct {
Data CloudNodeAccountRegisterRespData `json:"data"`
}

type CloudNodeAccountRegisterRespData struct {
Scans map[string]CloudComplianceScanDetails `json:"scans"`
CloudtrailTrails []CloudNodeCloudtrailTrail `json:"cloudtrail_trails"`
Refresh string `json:"refresh"`
}

type CloudNodeAccountsListReq struct {
CloudProvider string `json:"cloud_provider"`
Window FetchWindow `json:"window" required:"true"`
}

type CloudNodeAccountsListResp struct {
CloudNodeAccountInfo []CloudNodeAccountInfo `json:"cloud_node_accounts_info" required:"true"`
Total int `json:"total" required:"true"`
}

type CloudNodeAccountInfo struct {
NodeId string `json:"node_id"`
NodeName string `json:"node_name"`
CloudProvider string `json:"cloud_provider"`
CompliancePercentage string `json:"compliance_percentage"`
Active string `json:"active"`
}

type CloudComplianceScanDetails struct {
ScanId string `json:"scan_id"`
ScanType string `json:"scan_type"`
AccountId string `json:"account_id"`
}

type CloudNodeCloudtrailTrail struct {
AccountId string `json:"account_id"`
TrailName string `json:"trail_name"`
}

type PendingCloudComplianceScan struct {
ScanId string `json:"scan_id"`
ScanType string `json:"scan_type"`
Controls []string `json:"controls"`
AccountId string `json:"account_id"`
}

func UpsertCloudComplianceNode(ctx context.Context, nodeDetails map[string]interface{}) error {
driver, err := directory.Neo4jClient(ctx)
if err != nil {
return err
}

session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()

tx, err := session.BeginTransaction()
if err != nil {
return err
}
defer tx.Close()

if _, err := tx.Run("WITH $param as row MERGE (n:Node{node_id:row.node_id}) SET n+= row, n.updated_at = TIMESTAMP()", map[string]interface{}{"param": nodeDetails}); err != nil {
return err
}

return tx.Commit()
}

func GetCloudComplianceNodesList(ctx context.Context, cloudProvider string, fw FetchWindow) (CloudNodeAccountsListResp, error) {
driver, err := directory.Neo4jClient(ctx)
if err != nil {
return CloudNodeAccountsListResp{Total: 0}, err
}

session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
if err != nil {
return CloudNodeAccountsListResp{Total: 0}, err
}
defer session.Close()

tx, err := session.BeginTransaction()
if err != nil {
return CloudNodeAccountsListResp{Total: 0}, err
}
defer tx.Close()

res, err := tx.Run(`MATCH (n:Node{cloud_provider: $cloud_provider}) RETURN n.node_id, n.node_name, n.cloud_provider ORDER BY n.updated_at SKIP $skip LIMIT $limit`,
map[string]interface{}{"cloud_provider": cloudProvider, "skip": fw.Offset, "limit": fw.Size})
if err != nil {
return CloudNodeAccountsListResp{Total: 0}, err
}

recs, err := res.Collect()
if err != nil {
return CloudNodeAccountsListResp{Total: 0}, err
}

cloud_node_accounts_info := []CloudNodeAccountInfo{}
for _, rec := range recs {
tmp := CloudNodeAccountInfo{
NodeId: rec.Values[0].(string),
NodeName: rec.Values[1].(string),
CloudProvider: rec.Values[2].(string),
CompliancePercentage: "0.00",
Active: "true",
}
cloud_node_accounts_info = append(cloud_node_accounts_info, tmp)
}

total := fw.Offset + len(cloud_node_accounts_info)
countRes, err := tx.Run(`MATCH (n:Node {cloud_provider: $cloud_provider}) RETURN COUNT(*)`,
map[string]interface{}{"cloud_provider": cloudProvider})

countRec, err := countRes.Single()
if err != nil {
return CloudNodeAccountsListResp{CloudNodeAccountInfo: cloud_node_accounts_info, Total: total}, nil
}

total = int(countRec.Values[0].(int64))

return CloudNodeAccountsListResp{CloudNodeAccountInfo: cloud_node_accounts_info, Total: total}, nil
}
42 changes: 42 additions & 0 deletions deepfence_server/reporters/scan_reporters.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,48 @@ func GetScansList(ctx context.Context, scan_type utils.Neo4jScanType, node_id st
return model.ScanListResp{ScansInfo: scans_info}, nil
}

func GetPendingScansList(ctx context.Context, scan_type utils.Neo4jScanType, node_id string) (model.ScanListResp, error) {
driver, err := directory.Neo4jClient(ctx)
if err != nil {
return model.ScanListResp{}, err
}

session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
if err != nil {
return model.ScanListResp{}, err
}
defer session.Close()

tx, err := session.BeginTransaction()
if err != nil {
return model.ScanListResp{}, err
}
defer tx.Close()

res, err := tx.Run(`MATCH (m:`+string(scan_type)+`) -[:SCANNED]-> (:Node{node_id: $node_id}) WHERE NOT m.status = $complete AND NOT m.status = $failed AND NOT m.status = $in_progress RETURN m.node_id, m.status, m.updated_at ORDER BY m.updated_at`,
map[string]interface{}{"node_id": node_id, "complete": utils.SCAN_STATUS_SUCCESS, "failed": utils.SCAN_STATUS_FAILED, "in_progress": utils.SCAN_STATUS_INPROGRESS})
if err != nil {
return model.ScanListResp{}, err
}

recs, err := res.Collect()
if err != nil {
return model.ScanListResp{}, err
}

scans_info := []model.ScanInfo{}
for _, rec := range recs {
tmp := model.ScanInfo{
ScanId: rec.Values[0].(string),
Status: rec.Values[1].(string),
UpdatedAt: rec.Values[2].(int64),
}
scans_info = append(scans_info, tmp)
}

return model.ScanListResp{ScansInfo: scans_info}, nil
}

func GetScanResults(ctx context.Context, scan_type utils.Neo4jScanType, scan_id string, fw model.FetchWindow) (model.ScanResultsResp, error) {
driver, err := directory.Neo4jClient(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 40ca5c3

Please sign in to comment.