Skip to content

Commit

Permalink
Cloud resources refresh status
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanan-ravi committed Jun 24, 2024
1 parent aed8456 commit 791a6b7
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 96 deletions.
6 changes: 3 additions & 3 deletions internal/deepfence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ func NewClient(config util.Config) (*Client, error) {
}

func (c *Client) RegisterCloudAccount(monitoredOrganizationAccounts []util.MonitoredAccount) error {
nodeId := util.GetNodeId(c.config.CloudProvider, c.config.AccountID)
nodeId := util.GetNodeID(c.config.CloudProvider, c.config.AccountID)

req := c.client.Client().CloudNodesAPI.RegisterCloudNodeAccount(context.Background())
if c.config.IsOrganizationDeployment {
monitoredAccounts := make([]client.ModelCloudNodeMonitoredAccount, len(monitoredOrganizationAccounts))
for i := range monitoredOrganizationAccounts {
monitoredAccounts[i] = client.ModelCloudNodeMonitoredAccount{
AccountId: monitoredOrganizationAccounts[i].AccountId,
AccountId: monitoredOrganizationAccounts[i].AccountID,
AccountName: &monitoredOrganizationAccounts[i].AccountName,
NodeId: monitoredOrganizationAccounts[i].NodeId,
NodeId: monitoredOrganizationAccounts[i].NodeID,
}
}

Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func main() {
config.AWSCredentialSource = ""
}

config.NodeID = util.GetNodeId(config.CloudProvider, config.AccountID)
config.NodeID = util.GetNodeID(config.CloudProvider, config.AccountID)
config.Version = Version

configJson, err := json.MarshalIndent(config, "", "\t")
Expand Down
25 changes: 23 additions & 2 deletions output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

var (
scanStatusFilename = os.Getenv("DF_INSTALL_DIR") + "/var/log/fenced/cloud-scanner-log/cloud_scanner_status.log"
ScanFilename = os.Getenv("DF_INSTALL_DIR") + "/var/log/fenced/cloud-scanner/cloud_scanner.log"
scanStatusFilename = os.Getenv("DF_INSTALL_DIR") + "/var/log/fenced/cloud-scanner-log/cloud_scanner_status.log"
cloudResourceRefreshStatusFilename = os.Getenv("DF_INSTALL_DIR") + "/var/log/fenced/cloud-resource-refresh-log/cloud_resource_refresh_status.log"
ScanFilename = os.Getenv("DF_INSTALL_DIR") + "/var/log/fenced/cloud-scanner/cloud_scanner.log"
)

func WriteScanStatus(status, scanID, scanMessage string) {
Expand All @@ -35,6 +36,26 @@ func WriteScanStatus(status, scanID, scanMessage string) {
}
}

func WriteCloudResourceRefreshStatus(nodeID, refreshStatus, refreshMessage string) {
var scanLogDoc = make(map[string]interface{})
scanLogDoc["cloud_node_id"] = nodeID
scanLogDoc["refresh_status"] = refreshStatus
scanLogDoc["refresh_message"] = refreshMessage

byteJSON, err := json.Marshal(scanLogDoc)
if err != nil {
log.Error().Msgf("Error marshalling json for status: %s", err)
return
}

log.Debug().Msgf("Writing status: %s, %s", refreshStatus, refreshMessage)
err = writeToFile(byteJSON, cloudResourceRefreshStatusFilename)
if err != nil {
log.Error().Msgf("Error writing status data to %s, Error: %s", cloudResourceRefreshStatusFilename, err)
return
}
}

func writeToFile(data []byte, fileName string) error {
jsonString := string(data)
if err := os.MkdirAll(filepath.Dir(fileName), 0755); err != nil {
Expand Down
88 changes: 37 additions & 51 deletions query_resource/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"

"github.com/deepfence/ThreatMapper/deepfence_utils/log"
"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
"github.com/deepfence/cloud-scanner/output"
"github.com/deepfence/cloud-scanner/util"
_ "github.com/lib/pq"
)
Expand Down Expand Up @@ -73,37 +75,56 @@ func clearPostgresqlCache() error {
return nil
}

func QueryAndRegisterResources(config util.Config, organizationAccountIDs []string) []error {
err := clearPostgresqlCache()
if err != nil {
log.Warn().Msgf("failed to clear postgresql cache: " + err.Error())
}
log.Info().Msg("QueryAndRegisterResources after clearPostgresqlCache")
var accountsToScan []string
if len(organizationAccountIDs) > 0 {
accountsToScan = organizationAccountIDs
}
if !util.InSlice(config.CloudMetadata.ID, accountsToScan) {
accountsToScan = append(accountsToScan, config.CloudMetadata.ID)
func QueryAndRegisterResources(config util.Config, accountsToRefresh []util.AccountsToRefresh, completeRefresh bool) []error {
if completeRefresh {
err := clearPostgresqlCache()
if err != nil {
log.Warn().Msgf("failed to clear postgresql cache: " + err.Error())
}
}
log.Info().Msgf("Started querying resources for %s: %v", config.CloudProvider, accountsToScan)

log.Debug().Msgf("Started querying resources for %v", accountsToRefresh)

cloudResourcesFile, err := os.OpenFile(CloudResourcesFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return []error{err}
}
defer cloudResourcesFile.Close()

for _, account := range accountsToRefresh {
output.WriteCloudResourceRefreshStatus(account.NodeID, utils.ScanStatusStarting, "")
}

count := 0
var errs = make([]error, 0)
for _, accountId := range accountsToScan {
for _, account := range accountsToRefresh {
log.Debug().Msgf("Started querying resources for %v", account)
output.WriteCloudResourceRefreshStatus(account.NodeID, utils.ScanStatusInProgress, "")

for _, cloudResourceInfo := range cloudProviderToResourceMap[config.CloudProvider] {
ingestedCount, err := queryResources(accountId, cloudResourceInfo, config, cloudResourcesFile)
// If ResourceTypes is empty, refresh all resource types. Otherwise, only specified ones
if len(account.ResourceTypes) > 0 {
if !util.InSlice(cloudResourceInfo.Table, account.ResourceTypes) {
continue
}
err = clearPostgresqlCacheRows(config.CloudProvider + "_" + account.AccountID + "." + cloudResourceInfo.Table)
if err != nil {
errs = append(errs, err)
continue
}
}

ingestedCount, err := queryResources(account.AccountID, cloudResourceInfo, config, cloudResourcesFile)
if err != nil {
errs = append(errs, err)
}

log.Debug().Msgf("Cloud resources ingested in account %s, resource type %s: %d", account.AccountID, cloudResourceInfo.Table, ingestedCount)
count += ingestedCount
}

log.Debug().Msgf("Querying resources complete for %v", account)
output.WriteCloudResourceRefreshStatus(account.NodeID, utils.ScanStatusSuccess, "")
}
log.Info().Msgf("Cloud resources ingested: %d", count)
return errs
Expand All @@ -120,41 +141,6 @@ func clearPostgresqlCacheRows(keyPrefix string) error {
return nil
}

func QueryAndUpdateResources(config util.Config, cloudResourceTypesToRefresh map[string][]string) []error {
log.Info().Msgf("Started querying updated resources for %s", config.CloudProvider)

cloudResourcesFile, err := os.OpenFile(CloudResourcesFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return []error{err}
}
defer cloudResourcesFile.Close()

count := 0
var errs = make([]error, 0)
for accountID, resourceTypesToRefresh := range cloudResourceTypesToRefresh {
accountIDPrefix := config.CloudProvider + "_" + accountID + "."

for _, cloudResourceInfo := range cloudProviderToResourceMap[config.CloudProvider] {
if !util.InSlice(cloudResourceInfo.Table, resourceTypesToRefresh) {
continue
}
err = clearPostgresqlCacheRows(accountIDPrefix + cloudResourceInfo.Table)
if err != nil {
errs = append(errs, err)
continue
}
ingestedCount, err := queryResources(accountID, cloudResourceInfo, config, cloudResourcesFile)
if err != nil {
errs = append(errs, err)
}
count += ingestedCount
}
}

log.Info().Msgf("Cloud resources updated and ingested: %d", count)
return errs
}

func queryResources(accountId string, cloudResourceInfo CloudResourceInfo, config util.Config, cloudResourcesFile *os.File) (int, error) {
log.Debug().Msgf("Querying resources for %s", cloudResourceInfo.Table)

Expand Down Expand Up @@ -190,7 +176,7 @@ func queryResources(accountId string, cloudResourceInfo CloudResourceInfo, confi

var private_dns_name string
for _, obj := range objMap {
obj["account_id"] = util.GetNodeId(config.CloudProvider, accountId)
obj["account_id"] = util.GetNodeID(config.CloudProvider, accountId)
obj["cloud_provider"] = config.CloudProvider
if _, ok := obj["title"]; ok {
obj["name"] = fmt.Sprint(obj["title"])
Expand Down
2 changes: 1 addition & 1 deletion scanner/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *CloudComplianceScan) parseControlResult(complianceDocs *[]util.Complian
}

nodeName := fmt.Sprintf("%s/%s", c.CloudProvider, accountId)
nodeId := util.GetNodeId(c.CloudProvider, accountId)
nodeId := util.GetNodeID(c.CloudProvider, accountId)

complianceDoc := util.ComplianceDoc{
Timestamp: util.GetDatetimeNow(),
Expand Down
2 changes: 1 addition & 1 deletion scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *CloudComplianceScan) ScanControl(scan *ctl.CloudComplianceScanDetails)
}

log.Info().Msgf("compliance scan started: %s", scan.ScanId)
extrasForInProgress["node_id"] = util.GetNodeId(c.CloudProvider, scan.AccountId)
extrasForInProgress["node_id"] = util.GetNodeID(c.CloudProvider, scan.AccountId)

stopped := false
wg := sync.WaitGroup{}
Expand Down
39 changes: 39 additions & 0 deletions service/query_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package service

import (
"github.com/deepfence/ThreatMapper/deepfence_utils/log"
"github.com/deepfence/cloud-scanner/query_resource"
"github.com/deepfence/cloud-scanner/util"
)

// FetchCloudResources Fetch cloud resources from all accounts
func (c *ComplianceScanService) FetchCloudResources() {
log.Info().Msg("Querying cloud resources")

var accountsToRefresh []util.AccountsToRefresh
if c.config.IsOrganizationDeployment {
for _, monitoredAccount := range c.GetOrganizationAccounts() {
accountsToRefresh = append(accountsToRefresh, util.AccountsToRefresh{
AccountID: monitoredAccount.AccountID,
NodeID: monitoredAccount.NodeID,
})
}
} else {
accountsToRefresh = []util.AccountsToRefresh{
{
AccountID: c.config.AccountID,
NodeID: c.config.NodeID,
},
}
}
c.FetchCloudAccountResources(accountsToRefresh, true)
log.Info().Msg("Querying cloud resources complete")
}

// FetchCloudAccountResources Fetch cloud resources from selected accounts and resource types
func (c *ComplianceScanService) FetchCloudAccountResources(accountsToRefresh []util.AccountsToRefresh, completeRefresh bool) {
errorsCollected := query_resource.QueryAndRegisterResources(c.config, accountsToRefresh, completeRefresh)
if len(errorsCollected) > 0 {
log.Error().Msgf("Error in sending resources, errors: %+v", errorsCollected)
}
}
Loading

0 comments on commit 791a6b7

Please sign in to comment.