Skip to content

Commit

Permalink
Cloud scanner resources refresh progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanan-ravi committed Sep 25, 2024
1 parent 1adaa41 commit 39f3337
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
[submodule "golang_deepfence_sdk"]
path = golang_deepfence_sdk
url = https://github.com/deepfence/golang_deepfence_sdk.git
branch = main
branch = cloud-scanner-refresh
[submodule "deepfence_agent/plugins/YaraHunter"]
path = deepfence_agent/plugins/YaraHunter
url = https://github.com/deepfence/YaraHunter.git
Expand All @@ -29,4 +29,4 @@
[submodule "deepfence_agent/plugins/cloud-scanner"]
path = deepfence_agent/plugins/cloud-scanner
url = https://github.com/deepfence/cloud-scanner
branch = main
branch = cloud-scanner-refresh
4 changes: 3 additions & 1 deletion deepfence_agent/Dockerfile.cloud-agent
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ ENV CHECKPOINT_DISABLE=true \
DEEPFENCE_KEY="" \
DF_ENABLE_CLOUD_NODE="true" \
HOME_DIR="/home/deepfence" \
COMPLIANCE_MOD_PATH="/home/deepfence/steampipe"
STEAMPIPE_INSTALL_DIR="/home/deepfence/.steampipe" \
COMPLIANCE_MOD_PATH="/home/deepfence/steampipe" \
VERSION=$VERSION

RUN apt-get update \
&& apt-get install -y --no-install-recommends bash ca-certificates nano logrotate sudo supervisor \
Expand Down
4 changes: 2 additions & 2 deletions deepfence_agent/plugins/etc/run_shipper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
export BATCH_SIZE=${1:-100}
export TRUNCATE_SIZE=${2:-10}

if [[ $DF_INSTALL_DIR == "/home/deepfence" ]]; then
exec /home/deepfence/bin/shipper --base-path="${DF_INSTALL_DIR:-/}" --truncate-size=$TRUNCATE_SIZE --routes=/home/deepfence/routes.yaml --batch-size=$BATCH_SIZE
if [[ "${DF_INSTALL_DIR,,}" == *"/home/deepfence" ]]; then
exec $DF_INSTALL_DIR/bin/shipper --base-path="${DF_INSTALL_DIR:-/}" --truncate-size=$TRUNCATE_SIZE --routes=$DF_INSTALL_DIR/routes.yaml --batch-size=$BATCH_SIZE
else
exec $DF_INSTALL_DIR/home/deepfence/bin/shipper --base-path="${DF_INSTALL_DIR:-/}" --truncate-size=$TRUNCATE_SIZE --routes=$DF_INSTALL_DIR/home/deepfence/routes.yaml --batch-size=$BATCH_SIZE
fi
41 changes: 41 additions & 0 deletions deepfence_agent/start_cloud_agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,48 @@ launch_deepfenced() {
tail -f $DF_INSTALL_DIR/var/log/supervisor/deepfenced*
}

check_persistent_volume() {
if [[ $DF_INSTALL_DIR != "/home/deepfence" ]]; then
echo "Copying postgres data to $DF_INSTALL_DIR"

clean_df_install_dir="false"
if [ -e "$DF_INSTALL_DIR"/.cloud_scanner_version ]; then
# Clean $DF_INSTALL_DIR if cloud scanner version is different
if [[ "$(cat "$DF_INSTALL_DIR"/.cloud_scanner_version)" != "$VERSION" ]]; then
clean_df_install_dir="true"
fi
else
clean_df_install_dir="true"
fi

if [[ $clean_df_install_dir == "true" ]]; then
rm -rf "${DF_INSTALL_DIR:?}"/* "${DF_INSTALL_DIR:?}"/.*
cp -R /home/deepfence/var "$DF_INSTALL_DIR"

mkdir -p "$STEAMPIPE_INSTALL_DIR"
cp -R /home/deepfence/.steampipe/config "$STEAMPIPE_INSTALL_DIR"/
cp -R /home/deepfence/.steampipe/plugins "$STEAMPIPE_INSTALL_DIR"/
cp -R /home/deepfence/.steampipe/db "$STEAMPIPE_INSTALL_DIR"/

# sudo -E -u deepfence /bin/bash -c "export HOME=/home/deepfence \
# && steampipe service start \
# && steampipe plugin install --skip-config \
# && steampipe service stop --force"
fi

cp -R /home/deepfence/bin /home/deepfence/routes.yaml /home/deepfence/run_shipper.sh /home/deepfence/supervisord.conf "$DF_INSTALL_DIR"
echo "$VERSION" > "$DF_INSTALL_DIR"/.cloud_scanner_version
chown -R deepfence: "$DF_INSTALL_DIR" "$STEAMPIPE_INSTALL_DIR"
fi

if [ ! -e "$DF_INSTALL_DIR"/.install_id ]; then
cat /proc/sys/kernel/random/uuid > "$DF_INSTALL_DIR"/.install_id
chown -R deepfence: "$DF_INSTALL_DIR"/.install_id
fi
}

main() {
check_persistent_volume
launch_deepfenced
}

Expand Down
50 changes: 28 additions & 22 deletions deepfence_server/handler/cloud_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http
}
orgNodeID := fmt.Sprintf("%s-%s-cloud-org", req.CloudProvider, orgAccountID)
orgAccountNode := map[string]interface{}{
"node_id": orgNodeID,
"cloud_provider": model.PostureProviderOrgMap[req.CloudProvider],
"node_name": orgAccountID,
"account_name": req.AccountName,
"version": req.Version,
"node_type": req.CloudProvider,
"node_id": orgNodeID,
"cloud_provider": model.PostureProviderOrgMap[req.CloudProvider],
"node_name": orgAccountID,
"account_name": req.AccountName,
"version": req.Version,
"node_type": req.CloudProvider,
"installation_id": req.InstallationID,
"persistent_volume": req.PersistentVolumeSupported,
}
err = model.UpsertCloudAccount(ctx, orgAccountNode, req.IsOrganizationDeployment, req.HostNodeID)
err = model.UpsertCloudAccount(ctx, orgAccountNode, req.IsOrganizationDeployment, req.HostNodeID, req.InitialRequest)
if err != nil {
h.complianceError(w, err.Error())
return
Expand All @@ -77,16 +79,18 @@ func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http
return
}
monitoredNodes[i] = map[string]interface{}{
"node_id": monitoredAccount.NodeID,
"cloud_provider": req.CloudProvider,
"node_name": monitoredAccount.AccountID,
"account_name": monitoredAccount.AccountName,
"organization_id": orgNodeID,
"version": req.Version,
"node_type": req.CloudProvider,
"node_id": monitoredAccount.NodeID,
"cloud_provider": req.CloudProvider,
"node_name": monitoredAccount.AccountID,
"account_name": monitoredAccount.AccountName,
"organization_id": orgNodeID,
"version": req.Version,
"node_type": req.CloudProvider,
"installation_id": req.InstallationID,
"persistent_volume": req.PersistentVolumeSupported,
}
}
err = model.UpsertChildCloudAccounts(ctx, monitoredNodes, orgNodeID, req.HostNodeID)
err = model.UpsertChildCloudAccounts(ctx, monitoredNodes, orgNodeID, req.InstallationID, req.HostNodeID, req.InitialRequest)
if err != nil {
log.Error().Msgf("Error while upserting node: %+v", err)
h.complianceError(w, err.Error())
Expand All @@ -95,15 +99,17 @@ func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http
} else {
log.Debug().Msgf("Single account monitoring for node: %s", nodeID)
node := map[string]interface{}{
"node_id": nodeID,
"cloud_provider": req.CloudProvider,
"node_name": req.AccountID,
"account_name": req.AccountName,
"version": req.Version,
"node_type": req.CloudProvider,
"node_id": nodeID,
"cloud_provider": req.CloudProvider,
"node_name": req.AccountID,
"account_name": req.AccountName,
"version": req.Version,
"node_type": req.CloudProvider,
"installation_id": req.InstallationID,
"persistent_volume": req.PersistentVolumeSupported,
}
log.Debug().Msgf("Node for upsert: %+v", node)
err = model.UpsertCloudAccount(ctx, node, req.IsOrganizationDeployment, req.HostNodeID)
err = model.UpsertCloudAccount(ctx, node, req.IsOrganizationDeployment, req.HostNodeID, req.InitialRequest)
if err != nil {
log.Error().Msgf("Error while upserting node: %+v", err)
h.complianceError(w, err.Error())
Expand Down
94 changes: 76 additions & 18 deletions deepfence_server/model/cloud_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ type CloudNodeMonitoredAccount struct {
}

type CloudNodeAccountRegisterReq struct {
NodeID string `json:"node_id" validate:"required" required:"true"`
AccountName string `json:"account_name"`
HostNodeID string `json:"host_node_id" validate:"required" required:"true"`
AccountID string `json:"account_id" validate:"required" required:"true"`
CloudProvider string `json:"cloud_provider" validate:"required,oneof=aws gcp azure" enum:"aws,gcp,azure" required:"true"`
IsOrganizationDeployment bool `json:"is_organization_deployment"`
MonitoredAccounts []CloudNodeMonitoredAccount `json:"monitored_accounts"`
OrganizationAccountID string `json:"organization_account_id"`
Version string `json:"version" validate:"required" required:"true"`
NodeID string `json:"node_id" validate:"required" required:"true"`
AccountName string `json:"account_name"`
HostNodeID string `json:"host_node_id" validate:"required" required:"true"`
AccountID string `json:"account_id" validate:"required" required:"true"`
CloudProvider string `json:"cloud_provider" validate:"required,oneof=aws gcp azure" enum:"aws,gcp,azure" required:"true"`
IsOrganizationDeployment bool `json:"is_organization_deployment"`
MonitoredAccounts []CloudNodeMonitoredAccount `json:"monitored_accounts"`
OrganizationAccountID string `json:"organization_account_id"`
Version string `json:"version" validate:"required" required:"true"`
PersistentVolumeSupported bool `json:"persistent_volume_supported"`
InstallationID string `json:"installation_id" validate:"required" required:"true"`
InitialRequest bool `json:"initial_request"`
}

type CloudNodeAccountsListReq struct {
Expand Down Expand Up @@ -84,6 +87,7 @@ type CloudNodeAccountInfo struct {
LastScanID string `json:"last_scan_id"`
LastScanStatus string `json:"last_scan_status"`
RefreshMessage string `json:"refresh_message"`
RefreshMetadata string `json:"refresh_metadata"`
RefreshStatus string `json:"refresh_status"`
RefreshStatusMap map[string]int64 `json:"refresh_status_map"`
ScanStatusMap map[string]int64 `json:"scan_status_map"`
Expand Down Expand Up @@ -207,7 +211,7 @@ type PostureProvider struct {
ResourceCount int64 `json:"resource_count"`
}

func UpsertCloudAccount(ctx context.Context, nodeDetails map[string]interface{}, isOrganizationDeployment bool, hostNodeID string) error {
func UpsertCloudAccount(ctx context.Context, nodeDetails map[string]interface{}, isOrganizationDeployment bool, hostNodeID string, initialRequest bool) error {

ctx, span := telemetry.NewSpan(ctx, "model", "upsert-cloud-compliance-node")
defer span.End()
Expand All @@ -227,9 +231,37 @@ func UpsertCloudAccount(ctx context.Context, nodeDetails map[string]interface{},
defer tx.Close(ctx)

var setRefreshStatusQuery string
var scheduleRefreshStatusQuery string
// Organization account node does not have refresh status. It is rather an aggregation of all child account statuses.
if !isOrganizationDeployment {
setRefreshStatusQuery = `ON CREATE SET n.refresh_status = '` + utils.ScanStatusStarting + `', n.refresh_message = ''`
setRefreshStatusQuery = `ON CREATE SET n.refresh_status = '` + utils.ScanStatusStarting + `', n.refresh_message = '', n.refresh_metadata = ''`

if initialRequest {
session2 := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
defer session2.Close(ctx)

tx2, err := session2.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second))
if err != nil {
return err
}
defer tx2.Close(ctx)

res, err := tx2.Run(ctx,
`MATCH (n:CloudNode{node_id:$node_id}) RETURN coalesce(n.installation_id,''), coalesce(n.refresh_status,'')`,
map[string]interface{}{"node_id": nodeDetails["node_id"]})
if err != nil {
return err
}
rec, err := res.Single(ctx)
// ON CREATE will handle if Node does not exist
if err == nil {
if rec.Values[0].(string) != nodeDetails["installation_id"] {
scheduleRefreshStatusQuery = `, n.refresh_status = '` + utils.ScanStatusStarting + `', n.refresh_message = '', n.refresh_metadata = ''`
} else if rec.Values[1].(string) == utils.ScanStatusInProgress {
scheduleRefreshStatusQuery = `, n.refresh_status = '` + utils.ScanStatusStarting + `'`
}
}
}
}

if _, err = tx.Run(ctx, `
Expand All @@ -238,8 +270,8 @@ func UpsertCloudAccount(ctx context.Context, nodeDetails map[string]interface{},
MERGE (n:CloudNode{node_id:row.node_id})
`+setRefreshStatusQuery+`
MERGE (r) -[:HOSTS]-> (n)
SET n+= row, n.active = true, n.updated_at = TIMESTAMP(), n.version = row.version,
r.node_name=$host_node_id, r.active = true, r.agent_running=true, r.updated_at = TIMESTAMP()`,
SET n+= row, n.active = true, n.updated_at = TIMESTAMP(), n.version = row.version, r.node_name = $host_node_id,
r.active = true, r.agent_running = true, r.updated_at = TIMESTAMP()`+scheduleRefreshStatusQuery,
map[string]interface{}{
"param": nodeDetails,
"host_node_id": hostNodeID,
Expand All @@ -250,7 +282,7 @@ func UpsertCloudAccount(ctx context.Context, nodeDetails map[string]interface{},
return tx.Commit(ctx)
}

func UpsertChildCloudAccounts(ctx context.Context, nodeDetails []map[string]interface{}, parentNodeID string, hostNodeID string) error {
func UpsertChildCloudAccounts(ctx context.Context, nodeDetails []map[string]interface{}, parentNodeID string, installationID string, hostNodeID string, initialRequest bool) error {
ctx, span := telemetry.NewSpan(ctx, "model", "upsert-cloud-compliance-node")
defer span.End()

Expand All @@ -268,17 +300,43 @@ func UpsertChildCloudAccounts(ctx context.Context, nodeDetails []map[string]inte
}
defer tx.Close(ctx)

var scheduleRefreshStatusQuery string
if initialRequest {
session2 := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
defer session2.Close(ctx)

tx2, err := session2.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second))
if err != nil {
return err
}
defer tx2.Close(ctx)

res, err := tx2.Run(ctx,
`MATCH (n:CloudNode{node_id:$node_id}) RETURN coalesce(n.installation_id,'')`,
map[string]interface{}{"node_id": parentNodeID})
if err != nil {
return err
}
rec, err := res.Single(ctx)
// ON CREATE will handle if Node does not exist
if err == nil {
if rec.Values[0].(string) != installationID {
scheduleRefreshStatusQuery = `, n.refresh_status = '` + utils.ScanStatusStarting + `', n.refresh_message = '', n.refresh_metadata = ''`
}
}
}

if _, err = tx.Run(ctx, `
MERGE (r:Node{node_id:$host_node_id, node_type: "cloud_agent"})
MERGE (m:CloudNode{node_id: $parent_node_id})
WITH r, m
UNWIND $param as row
MERGE (n:CloudNode{node_id:row.node_id})
ON CREATE SET n.refresh_status = '`+utils.ScanStatusStarting+`', n.refresh_message = ''
ON CREATE SET n.refresh_status = '`+utils.ScanStatusStarting+`', n.refresh_message = '', n.refresh_metadata = ''
MERGE (m) -[:IS_CHILD]-> (n)
MERGE (r) -[:HOSTS]-> (n)
SET n+= row, n.active = true, n.updated_at = TIMESTAMP(), n.version = row.version,
r.active = true, r.agent_running=true, r.updated_at = TIMESTAMP()`,
SET n+= row, n.active = true, n.updated_at = TIMESTAMP(), n.version = row.version, r.active = true,
r.agent_running=true, r.updated_at = TIMESTAMP()`+scheduleRefreshStatusQuery,
map[string]interface{}{
"param": nodeDetails,
"parent_node_id": parentNodeID,
Expand Down Expand Up @@ -560,7 +618,7 @@ func (c *CloudAccountRefreshReq) SetCloudAccountRefresh(ctx context.Context) err
if _, err = tx.Run(ctx, `
UNWIND $batch as cloudNode
MATCH (m:CloudNode{node_id: cloudNode})
SET m.refresh_status = '`+utils.ScanStatusStarting+`', m.refresh_message = ''`,
SET m.refresh_status = '`+utils.ScanStatusStarting+`', m.refresh_message = '', m.refresh_metadata = ''`,
map[string]interface{}{
"batch": c.NodeIDs,
}); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions deepfence_server/reporters/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func searchGenericDirectNodeReport[T reporters.Cypherable](ctx context.Context,
}

var (
searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "refresh_status", "refresh_message", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"}
searchCloudNodeFields = []string{"node_id", "node_name", "account_name", "refresh_status", "refresh_metadata", "refresh_message", "version", "compliance_percentage", "last_scan_id", "last_scan_status", "active"}
)

func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWindow) ([]model.CloudNodeAccountInfo, error) {
Expand Down Expand Up @@ -392,7 +392,7 @@ func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWin
}
WITH x, node_name, account_name, refresh_status, refresh_message, version, compliance_percentage, active ` +
reporters.ParseFieldFilters2CypherWhereConditions("", mo.Some(scanFilter), true) +
`RETURN x as node_id, node_name, account_name, refresh_status, refresh_message, COALESCE(version, 'unknown') as version, compliance_percentage, '' as last_scan_id, '' as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
`RETURN x as node_id, node_name, account_name, refresh_status, '' as refresh_metadata, refresh_message, COALESCE(version, 'unknown') as version, compliance_percentage, '' as last_scan_id, '' as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
reporters.OrderFilter2CypherCondition("", orderFilters, nil) + fw.FetchWindow2CypherQuery()
} else {
query = `
Expand All @@ -413,11 +413,11 @@ func searchCloudNode(ctx context.Context, filter SearchFilter, fw model.FetchWin
}
CALL {
WITH x MATCH (n:` + dummy.NodeType() + `{node_id: x})
RETURN n.node_name as node_name, n.account_name as account_name, n.refresh_status as refresh_status, n.refresh_message as refresh_message, n.active as active, n.version as version
RETURN n.node_name as node_name, n.account_name as account_name, n.refresh_status as refresh_status, n.refresh_metadata as refresh_metadata, n.refresh_message as refresh_message, n.active as active, n.version as version
}
WITH x, node_name, account_name, refresh_status, refresh_message, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` +
WITH x, node_name, account_name, refresh_status, refresh_metadata, refresh_message, version, compliance_percentage, last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` +
reporters.ParseFieldFilters2CypherWhereConditions("", mo.Some(scanFilter), true) +
`RETURN x as node_id, node_name, account_name, refresh_status, refresh_message, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
`RETURN x as node_id, node_name, account_name, refresh_status, refresh_metadata, refresh_message, COALESCE(version, 'unknown') as version, compliance_percentage, COALESCE(last_scan_id, '') as last_scan_id, COALESCE(last_scan_status, '') as last_scan_status, active ` + reporters.FieldFilterCypher("", filter.InFieldFilter) +
reporters.OrderFilter2CypherCondition("", orderFilters, nil) + fw.FetchWindow2CypherQuery()
}
log.Debug().Msgf("search cloud node query: %v", query)
Expand Down
18 changes: 10 additions & 8 deletions deepfence_utils/utils/ingesters/cloud_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,19 @@ func convertStructFieldToJSONString(bb map[string]interface{}, key string) map[s
}

type CloudResourceRefreshStatus struct {
CloudNodeID string `json:"cloud_node_id"`
RefreshMessage string `json:"refresh_message"`
RefreshStatus string `json:"refresh_status"`
UpdatedAt int64 `json:"updated_at"`
CloudNodeID string `json:"cloud_node_id"`
RefreshMessage string `json:"refresh_message"`
RefreshStatus string `json:"refresh_status"`
RefreshMetadata string `json:"refresh_metadata"`
UpdatedAt int64 `json:"updated_at"`
}

func (c *CloudResourceRefreshStatus) ToMap() map[string]interface{} {
return map[string]interface{}{
"cloud_node_id": c.CloudNodeID,
"refresh_message": c.RefreshMessage,
"refresh_status": c.RefreshStatus,
"updated_at": c.UpdatedAt,
"cloud_node_id": c.CloudNodeID,
"refresh_message": c.RefreshMessage,
"refresh_status": c.RefreshStatus,
"refresh_metadata": c.RefreshMetadata,
"updated_at": c.UpdatedAt,
}
}
Loading

0 comments on commit 39f3337

Please sign in to comment.