Skip to content

Commit

Permalink
Clouds scanner - start scan after cloud resources refresh is complete (
Browse files Browse the repository at this point in the history
…#2231)

Cloud refresh count is handled separately from scan count (#15)

Scan and resource refresh can happen at the same time for different accounts
  • Loading branch information
ramanan-ravi authored Jul 2, 2024
1 parent 6f871a8 commit eb56e49
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 180 deletions.
16 changes: 10 additions & 6 deletions deepfence_agent/Dockerfile.cloud-agent
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ WORKDIR /opt/steampipe

USER deepfence

ENV DF_INSTALL_DIR=/home/deepfence
ENV DF_INSTALL_DIR=/home/deepfence \
STEAMPIPE_AWS_PLUGIN_VERSION=0.118.1 \
STEAMPIPE_GCP_PLUGIN_VERSION=0.43.0 \
STEAMPIPE_AZURE_PLUGIN_VERSION=0.49.0 \
STEAMPIPE_AZURE_AD_PLUGIN_VERSION=0.12.0

COPY supervisord-cloud.conf /home/deepfence/supervisord.conf
COPY --from=steampipe /usr/local/bin/steampipe /usr/local/bin/steampipe

RUN steampipe service start \
&& steampipe plugin install steampipe \
# plugin version should be in sync with Deepfence fork https://github.com/deepfence/steampipe-plugin-aws
&& steampipe plugin install aws@0.118.1 gcp@0.43.0 azure@0.49.0 azuread@0.12.0 \
&& steampipe plugin install aws@${STEAMPIPE_AWS_PLUGIN_VERSION} gcp@${STEAMPIPE_GCP_PLUGIN_VERSION} azure@${STEAMPIPE_AZURE_PLUGIN_VERSION} azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION} \
&& git clone https://github.com/turbot/steampipe-mod-aws-compliance.git --branch v0.79 --depth 1 \
&& git clone https://github.com/turbot/steampipe-mod-gcp-compliance.git --branch v0.21 --depth 1 \
&& git clone https://github.com/turbot/steampipe-mod-azure-compliance.git --branch v0.35 --depth 1 \
Expand All @@ -56,10 +60,10 @@ ENV PUBLISH_CLOUD_RESOURCES_INTERVAL_MINUTES=5 \

EXPOSE 8080

COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@latest/steampipe-plugin-aws.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@latest/steampipe-plugin-gcp.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@latest/steampipe-plugin-azure.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@latest/steampipe-plugin-azuread.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@${STEAMPIPE_AWS_PLUGIN_VERSION}/steampipe-plugin-aws.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@${STEAMPIPE_GCP_PLUGIN_VERSION}/steampipe-plugin-gcp.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@${STEAMPIPE_AZURE_PLUGIN_VERSION}/steampipe-plugin-azure.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION}/steampipe-plugin-azuread.plugin

COPY plugins/cloud-scanner/cloud_scanner /home/deepfence/bin/cloud_scanner

Expand Down
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/YaraHunter
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/cloud-scanner
Submodule cloud-scanner updated 41 files
+0 −73 Dockerfile
+3 −10 Makefile
+10 −0 README.md
+11 −28 cloud_resource_changes/cloud_resource_changes_aws/cloudtrail.go
+3 −11 cloud_resource_changes/cloud_resource_changes_aws/util.go
+0 −85 cloudformation/deepfence-cloud-scanner-members.template
+6 −4 ...nization-deployment/automated-deployment/deepfence-cloud-scanner-automated-organization-deployment.template
+2 −2 ...ed/organization-deployment/manual-deployment/deepfence-managed-cloud-scanner-organization-iam-role.template
+5 −5 ...ed/organization-deployment/manual-deployment/deepfence-managed-cloud-scanner-organization-stackset.template
+35 −0 ...rmation/deepfence-managed/single-account-deployment/deepfence-managed-cloud-scanner-single-account.template
+14 −3 cloudformation/self-hosted/organization-deployment/deepfence-cloud-scanner-member-roles.template
+58 −109 cloudformation/self-hosted/organization-deployment/deepfence-cloud-scanner-org-common.template
+45 −38 cloudformation/self-hosted/organization-deployment/deepfence-cloud-scanner-org-ecs.template
+84 −82 cloudformation/self-hosted/single-account-deployment/deepfence-cloud-scanner.template
+0 −11 entrypoint.sh
+52 −8 go.mod
+184 −17 go.sum
+1 −1 golang_deepfence_sdk
+2 −0 helm-chart/.gitignore
+23 −0 helm-chart/deepfence-cloud-scanner/.helmignore
+24 −0 helm-chart/deepfence-cloud-scanner/Chart.yaml
+3 −0 helm-chart/deepfence-cloud-scanner/templates/NOTES.txt
+62 −0 helm-chart/deepfence-cloud-scanner/templates/_helpers.tpl
+94 −0 helm-chart/deepfence-cloud-scanner/templates/deployment.yaml
+11 −0 helm-chart/deepfence-cloud-scanner/templates/secret.yaml
+13 −0 helm-chart/deepfence-cloud-scanner/templates/serviceaccount.yaml
+105 −0 helm-chart/deepfence-cloud-scanner/values.yaml
+14 −0 helm-chart/index.yaml
+34 −156 internal/deepfence/client.go
+0 −59 internal/deepfence/diagnosis.go
+0 −27 internal/deepfence/util.go
+55 −88 main.go
+1 −1 output/file_output.go
+27 −41 output/output.go
+68 −75 query_resource/query.go
+9 −9 scanner/parser.go
+28 −123 scanner/scanner.go
+67 −0 service/query_service.go
+585 −231 service/service.go
+72 −75 util/type.go
+8 −17 util/util.go
2 changes: 1 addition & 1 deletion deepfence_agent/plugins/yara-rules
Submodule yara-rules updated 2 files
+1 −1 build-timestamp
+29,518 −115,984 malware.yar
50 changes: 2 additions & 48 deletions deepfence_bootstrapper/router/cloud_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func RefreshResources(req ctl.RefreshResourcesRequest) error {
return nil
}

func GetCloudScannerJobCount() int32 {
func GetCloudScannerJobCount(action ctl.ActionID) int32 {
conn, err := net.Dial("unix", CloudScannerSocketPath)
if err != nil {
log.Error().Err(err).Msgf("GetCloudScannerJobCount: error in creating cloud compliance scanner client with socket %s", CloudScannerSocketPath)
Expand All @@ -98,7 +98,7 @@ func GetCloudScannerJobCount() int32 {
defer conn.Close()

jobCountReq := map[string]interface{}{
"action": ctl.CloudScannerJobCount,
"action": action,
}
jobCountReqBytes, err := json.Marshal(jobCountReq)
if err != nil {
Expand Down Expand Up @@ -128,49 +128,3 @@ func GetCloudScannerJobCount() int32 {
return jobCount

Check failure on line 128 in deepfence_bootstrapper/router/cloud_scanner.go

View workflow job for this annotation

GitHub Actions / lint-bootstrapper

SA4004: the surrounding loop is unconditionally terminated (staticcheck)
}
}

func GetCloudNodeID() (string, error) {
cloudNodeID := ""
conn, err := net.Dial("unix", CloudScannerSocketPath)
if err != nil {
log.Error().Err(err).Msgf("Error creating cloud scanner client with socket %s", CloudScannerSocketPath)
return cloudNodeID, err
}
defer conn.Close()
reqMap := make(map[string]interface{})
reqMap["GetCloudNodeID"] = true
cloudNodeIDReq := map[string]interface{}{
"args": reqMap,
}

cloudNodeIDReqBytes, err := json.Marshal(cloudNodeIDReq)
if err != nil {
log.Error().Err(err).Msg("Error in converting request into valid json")
return cloudNodeID, err
}

_, err = conn.Write(cloudNodeIDReqBytes)
if err != nil {
log.Error().Err(err).Msgf("Error in writing data to unix socket %s", CloudScannerSocketPath)
return cloudNodeID, err
}

responseTimeout := 10 * time.Second
deadline := time.Now().Add(responseTimeout)
buf := make([]byte, 1024)
for {
conn.SetReadDeadline(deadline)
n, err := conn.Read(buf[:])
if err != nil {
log.Error().Err(err).Msg("Error in read")
return cloudNodeID, err
}

count, err := fmt.Sscan(string(buf[0:n]), &cloudNodeID)
if err != nil || count != 1 {
return cloudNodeID, err
}
break
}
return cloudNodeID, err
}
24 changes: 14 additions & 10 deletions deepfence_bootstrapper/router/openapi_client_controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,25 +119,29 @@ func (ct *OpenapiClient) StartControlsWatching(nodeID string,

const (
MaxAgentWorkload = 2
MaxCloudAgentWorkload = 1
MaxCloudAgentWorkload = 2
)

func GetScannersWorkloads(nodeType string) int32 {
res := int32(0)
var secret, malware, vuln, cloud int32
if nodeType == ctl.CLOUD_AGENT {
cloud = GetCloudScannerJobCount()
var cloudPostureScan, cloudResourceRefreshCount int32

cloudPostureScan = GetCloudScannerJobCount(ctl.CloudScannerJobCount)
cloudResourceRefreshCount = GetCloudScannerJobCount(ctl.CloudScannerResourceRefreshCount)

log.Info().Msgf("workloads = cloud posture: %d, cloud resource refresh: %d", cloudPostureScan, cloudResourceRefreshCount)
return cloudPostureScan + cloudResourceRefreshCount
} else {
var secret, malware, vuln int32

secret = GetSecretScannerJobCount()
malware = GetMalwareScannerJobCount()
vuln = GetPackageScannerJobCount()
}

//TODO: Add more scanners workload
log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d, cloud: %d",
vuln, secret, malware, cloud)
res = secret + malware + vuln + cloud
return res
//TODO: Add more scanners workload
log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d", vuln, secret, malware)
return secret + malware + vuln
}
}

var upgrade atomic.Bool
Expand Down
45 changes: 29 additions & 16 deletions deepfence_server/controls/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func GetAgentActions(ctx context.Context, agentID model.AgentID, consoleURL stri
actions = append(actions, upgradeActions...)
}

scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, workNumToExtract)
scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, agentType, workNumToExtract)
workNumToExtract -= len(scanActions)
if scanErr == nil {
actions = append(actions, scanActions...)
Expand Down Expand Up @@ -312,7 +312,7 @@ func hasPendingAgentScans(ctx context.Context, client neo4j.DriverWithContext, n
return len(records) != 0, err
}

func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int) ([]controls.Action, error) {
func ExtractStartingAgentScans(ctx context.Context, nodeID string, agentType string, maxWork int) ([]controls.Action, error) {

ctx, span := telemetry.NewSpan(ctx, "control", "extract-starting-agent-scans")
defer span.End()
Expand Down Expand Up @@ -340,14 +340,28 @@ func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int)
}
defer tx.Close(ctx)

r, err := tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id})
var r neo4j.ResultWithContext
if agentType == controls.CLOUD_AGENT {
r, err = tx.Run(ctx, `MATCH (c:CloudNode) <- [:SCANNED] - (s) -[:SCHEDULED]-> (n:Node{node_id:$id})
WHERE s.status = '`+utils.ScanStatusStarting+`'
AND c.refresh_status = 'COMPLETE'
AND c.active = true
AND s.retries < 3
WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work
SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP()
WITH s
RETURN s.trigger_action`,
map[string]interface{}{"id": nodeID, "max_work": maxWork})
map[string]interface{}{"id": nodeID, "max_work": maxWork})
} else {
r, err = tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id})
WHERE s.status = '`+utils.ScanStatusStarting+`'
AND s.retries < 3
WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work
SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP()
WITH s
RETURN s.trigger_action`,
map[string]interface{}{"id": nodeID, "max_work": maxWork})
}

if err != nil {
return res, err
Expand Down Expand Up @@ -629,8 +643,7 @@ func ExtractPendingAgentUpgrade(ctx context.Context, nodeID string, maxWork int,

}

func ExtractRefreshResourceAction(ctx context.Context, nodeID string,
maxWork int) ([]controls.Action, error) {
func ExtractRefreshResourceAction(ctx context.Context, nodeID string, maxWork int) ([]controls.Action, error) {

ctx, span := telemetry.NewSpan(ctx, "control", "extract-pending-refresh-resources")
defer span.End()
Expand Down Expand Up @@ -659,15 +672,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string,
defer tx.Close(ctx)

r, err := tx.Run(ctx, `
MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (c:CloudNode)
MATCH(r:CloudNodeRefresh{node_id:c.node_id})
WHERE r.refresh=true
WITH HEAD(collect(r)) AS rnode
WHERE rnode IS NOT NULL
WITH rnode, rnode.node_id AS node_id
DETACH DELETE rnode
RETURN node_id`,
map[string]interface{}{"id": nodeID})
MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (r:CloudNode)
WHERE r.refresh_status = '`+utils.ScanStatusStarting+`'
AND NOT COALESCE(r.cloud_compliance_scan_status, '') IN ['`+utils.ScanStatusStarting+`', '`+utils.ScanStatusInProgress+`']
WITH r LIMIT $max_work
SET r.refresh_status = '`+utils.ScanStatusInProgress+`', r.refresh_message = ''
WITH r
RETURN r.node_id, r.node_name AS account_id`,
map[string]interface{}{"id": nodeID, "max_work": maxWork})

if err != nil {
return res, err
Expand All @@ -683,13 +695,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string,

for _, record := range records {
var action controls.Action
if record.Values[0] == nil {
if record.Values[0] == nil || record.Values[1] == nil {
log.Error().Msgf("Invalid CloudNode ID, skipping")
continue
}

req := controls.RefreshResourcesRequest{}
req.NodeId = record.Values[0].(string)
req.AccountID = record.Values[1].(string)
req.NodeType = controls.CloudAccount

reqBytes, err := json.Marshal(req)
Expand Down
12 changes: 11 additions & 1 deletion deepfence_server/handler/cloud_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"

"github.com/deepfence/ThreatMapper/deepfence_server/model"
"github.com/deepfence/ThreatMapper/deepfence_server/reporters"
reporters_scan "github.com/deepfence/ThreatMapper/deepfence_server/reporters/scan"
ctl "github.com/deepfence/ThreatMapper/deepfence_utils/controls"
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
Expand All @@ -19,6 +20,9 @@ import (

var (
cloudAccountNodeType = ctl.ResourceTypeToString(ctl.CloudAccount)
refreshAccountFilter = reporters.FieldsFilters{
ContainsFilter: reporters.ContainsFilter{FieldsValues: map[string][]interface{}{"refresh_status": {"COMPLETE", "ERROR"}}},
}
)

func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -124,13 +128,19 @@ func (h *Handler) RefreshCloudAccountHandler(w http.ResponseWriter, r *http.Requ
nodeIdentifiers[i] = model.NodeIdentifier{NodeID: id, NodeType: cloudAccountNodeType}
}

cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers)
cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers, &refreshAccountFilter)
if err != nil {
log.Error().Msgf(err.Error())
h.respondError(&BadDecoding{err}, w)
return
}

if len(cloudNodeIds) == 0 {
// Refresh already in progress for all requested cloud accounts
w.WriteHeader(http.StatusNoContent)
return
}

resolvedRequest := model.CloudAccountRefreshReq{NodeIDs: make([]string, len(cloudNodeIds))}
for i, id := range cloudNodeIds {
resolvedRequest.NodeIDs[i] = id.NodeID
Expand Down
2 changes: 1 addition & 1 deletion deepfence_server/handler/scan_reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (h *Handler) StartComplianceScanHandler(w http.ResponseWriter, r *http.Requ

regular, k8s, _, _ := extractBulksNodes(reqs.NodeIDs)

cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular)
cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular, nil)
if err != nil {
h.respondError(err, w)
return
Expand Down
10 changes: 5 additions & 5 deletions deepfence_server/ingesters/scan_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,16 @@ func AddNewCloudComplianceScan(
}
}
}
nt := controls.KubernetesCluster
if nodeType == controls.ResourceTypeToString(controls.Host) {
nt = controls.Host
}
var action []byte
var hostNodeID, hostNeo4jNodeType string

if nodeType == controls.ResourceTypeToString(controls.KubernetesCluster) || nodeType == controls.ResourceTypeToString(controls.Host) {
hostNodeID = nodeID
hostNeo4jNodeType = neo4jNodeType
nt := controls.KubernetesCluster
if nodeType == controls.ResourceTypeToString(controls.Host) {
nt = controls.Host
}
internalReq, _ := json.Marshal(controls.StartComplianceScanRequest{
NodeID: nodeID,
NodeType: nt,
Expand Down Expand Up @@ -374,7 +374,7 @@ func AddNewCloudComplianceScan(

internalReq, _ := json.Marshal(controls.StartCloudComplianceScanRequest{
NodeID: nodeID,
NodeType: nt,
NodeType: controls.CloudAccount,
BinArgs: map[string]string{"scan_id": scanID, "benchmark_types": strings.Join(benchmarkTypes, ",")},
ScanDetails: controls.CloudComplianceScanDetails{
ScanId: scanID,
Expand Down
Loading

0 comments on commit eb56e49

Please sign in to comment.