From 1580060f086bc6586483162114593ec8e59fc8e9 Mon Sep 17 00:00:00 2001 From: ramanan-ravi Date: Wed, 3 Jul 2024 22:45:54 +0530 Subject: [PATCH] Skip resource refresh from cloudtrail if full refresh is already scheduled --- .../cloud_resource_changes_aws/cloudtrail.go | 2 +- .../cloud_resource_changes_aws/util.go | 11 +- output/output.go | 8 +- query_resource/query.go | 23 ++- query_resource/query_service.go | 157 ++++++++++++++++++ service/query_service.go | 67 -------- service/service.go | 124 +++++--------- 7 files changed, 220 insertions(+), 172 deletions(-) create mode 100644 query_resource/query_service.go delete mode 100644 service/query_service.go diff --git a/cloud_resource_changes/cloud_resource_changes_aws/cloudtrail.go b/cloud_resource_changes/cloud_resource_changes_aws/cloudtrail.go index 41cd793..b5a7e32 100644 --- a/cloud_resource_changes/cloud_resource_changes_aws/cloudtrail.go +++ b/cloud_resource_changes/cloud_resource_changes_aws/cloudtrail.go @@ -55,7 +55,7 @@ func (c *CloudResourceChangesAWS) Initialize() error { return ErrNoCloudTrailsFound } c.cloudTrailTrails = trails - log.Info().Msgf("Following CloudTrail Trails are monitored for events every hour to update the cloud resources in the management console") + log.Info().Msgf("Following CloudTrail Trails are monitored for events every 30 minutes to update the cloud resources in the management console") for i, trail := range c.cloudTrailTrails { log.Info().Msgf("%d. %s (Region: %s)", i+1, trail.Arn, trail.Region) } diff --git a/cloud_resource_changes/cloud_resource_changes_aws/util.go b/cloud_resource_changes/cloud_resource_changes_aws/util.go index 16946b8..6da6cda 100644 --- a/cloud_resource_changes/cloud_resource_changes_aws/util.go +++ b/cloud_resource_changes/cloud_resource_changes_aws/util.go @@ -18,10 +18,14 @@ func GetSupportedAwsRegions() []string { func getCloudTrailTrails(config util.Config) []CloudTrailTrail { var query string + var isOrganizationTrail string + if config.IsOrganizationDeployment { + isOrganizationTrail = "and is_organization_trail = true" + } if len(config.CloudAuditLogsIDs) == 0 { - query = "steampipe query --output json \"select * from aws_" + config.AccountID + ".aws_cloudtrail_trail where is_organization_trail = true and is_multi_region_trail = true\"" + query = "steampipe query --output json \"select * from aws_" + config.AccountID + ".aws_cloudtrail_trail where is_multi_region_trail = true " + isOrganizationTrail + "\"" } else { - query = "steampipe query --output json \"select * from aws_all.aws_cloudtrail_trail where is_organization_trail = true and is_multi_region_trail = true and arn in ('" + strings.Join(config.CloudAuditLogsIDs, "', '") + "')\"" + query = "steampipe query --output json \"select * from aws_all.aws_cloudtrail_trail where is_multi_region_trail = true " + isOrganizationTrail + " and arn in ('" + strings.Join(config.CloudAuditLogsIDs, "', '") + "')\"" } cmd := exec.Command("bash", "-c", query) stdOut, stdErr := cmd.CombinedOutput() @@ -48,6 +52,9 @@ func getCloudTrailTrails(config util.Config) []CloudTrailTrail { selectedTrailList = append(selectedTrailList, trail) selectedARNs[trail.Arn] = true } + if len(selectedTrailList) == 0 { + log.Error().Msg("cloudtrail trail arn provided does not exist or is not a multi-region trail") + } return selectedTrailList } diff --git a/output/output.go b/output/output.go index 92e6602..4df249b 100644 --- a/output/output.go +++ b/output/output.go @@ -29,10 +29,10 @@ func WriteScanStatus(status, scanID, scanMessage string) { return } - log.Debug().Msgf("Writing status: %s", status) + log.Debug().Msgf("Writing scan status: %s", status) err = writeToFile(byteJSON, scanStatusFilename) if err != nil { - log.Error().Msgf("Error writing status data to %s, Error: %s", scanStatusFilename, err) + log.Error().Msgf("Error writing scan status data to %s, Error: %s", scanStatusFilename, err) return } } @@ -50,10 +50,10 @@ func WriteCloudResourceRefreshStatus(nodeID, refreshStatus, refreshMessage strin return } - log.Debug().Msgf("Writing status: %s, %s", refreshStatus, refreshMessage) + log.Debug().Msgf("Writing refresh status: %s, %s", refreshStatus, refreshMessage) err = writeToFile(byteJSON, cloudResourceRefreshStatusFilename) if err != nil { - log.Error().Msgf("Error writing status data to %s, Error: %s", cloudResourceRefreshStatusFilename, err) + log.Error().Msgf("Error writing refresh status data to %s, Error: %s", cloudResourceRefreshStatusFilename, err) return } } diff --git a/query_resource/query.go b/query_resource/query.go index 2bd9b43..1d12f91 100644 --- a/query_resource/query.go +++ b/query_resource/query.go @@ -12,7 +12,6 @@ import ( "github.com/deepfence/ThreatMapper/deepfence_utils/log" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" - "github.com/deepfence/cloud-scanner/output" "github.com/deepfence/cloud-scanner/util" _ "github.com/lib/pq" ) @@ -75,7 +74,7 @@ func clearPostgresqlCache() error { return nil } -func QueryAndRegisterResources(config util.Config, accountsToRefresh []util.AccountsToRefresh, completeRefresh bool) []error { +func (r *ResourceRefreshService) QueryAndRegisterResources(accountsToRefresh []util.AccountsToRefresh, completeRefresh bool) []error { if completeRefresh { err := clearPostgresqlCache() if err != nil { @@ -92,29 +91,29 @@ func QueryAndRegisterResources(config util.Config, accountsToRefresh []util.Acco defer cloudResourcesFile.Close() for _, account := range accountsToRefresh { - output.WriteCloudResourceRefreshStatus(account.NodeID, utils.ScanStatusStarting, "") + r.SetResourceRefreshStatus(account, utils.ScanStatusStarting) } count := 0 var errs = make([]error, 0) for _, account := range accountsToRefresh { log.Debug().Msgf("Started querying resources for %v", account) - output.WriteCloudResourceRefreshStatus(account.NodeID, utils.ScanStatusInProgress, "") + r.SetResourceRefreshStatus(account, utils.ScanStatusInProgress) - for _, cloudResourceInfo := range cloudProviderToResourceMap[config.CloudProvider] { + for _, cloudResourceInfo := range cloudProviderToResourceMap[r.config.CloudProvider] { // If ResourceTypes is empty, refresh all resource types. Otherwise, only specified ones if len(account.ResourceTypes) > 0 { if !util.InSlice(cloudResourceInfo.Table, account.ResourceTypes) { continue } - err = clearPostgresqlCacheRows(config.CloudProvider + "_" + account.AccountID + "." + cloudResourceInfo.Table) + err = clearPostgresqlCacheRows(r.config.CloudProvider + "_" + account.AccountID + "." + cloudResourceInfo.Table) if err != nil { errs = append(errs, err) continue } } - ingestedCount, err := queryResources(account.AccountID, cloudResourceInfo, config, cloudResourcesFile) + ingestedCount, err := r.queryResources(account.AccountID, cloudResourceInfo, cloudResourcesFile) if err != nil { errs = append(errs, err) } @@ -124,7 +123,7 @@ func QueryAndRegisterResources(config util.Config, accountsToRefresh []util.Acco } log.Debug().Msgf("Querying resources complete for %v", account) - output.WriteCloudResourceRefreshStatus(account.NodeID, utils.ScanStatusSuccess, "") + r.SetResourceRefreshStatus(account, utils.ScanStatusSuccess) } log.Info().Msgf("Cloud resources ingested: %d", count) return errs @@ -141,10 +140,10 @@ func clearPostgresqlCacheRows(keyPrefix string) error { return nil } -func queryResources(accountId string, cloudResourceInfo CloudResourceInfo, config util.Config, cloudResourcesFile *os.File) (int, error) { +func (r *ResourceRefreshService) queryResources(accountId string, cloudResourceInfo CloudResourceInfo, cloudResourcesFile *os.File) (int, error) { log.Debug().Msgf("Querying resources for %s", cloudResourceInfo.Table) - query := "steampipe query --output json \"select \\\"" + strings.Join(cloudResourceInfo.Columns[:], "\\\" , \\\"") + "\\\" from " + config.CloudProvider + "_" + strings.Replace(accountId, "-", "", -1) + "." + cloudResourceInfo.Table + " \"" + query := "steampipe query --output json \"select \\\"" + strings.Join(cloudResourceInfo.Columns[:], "\\\" , \\\"") + "\\\" from " + r.config.CloudProvider + "_" + strings.Replace(accountId, "-", "", -1) + "." + cloudResourceInfo.Table + " \"" var stdOut []byte var stdErr error for i := 0; i <= 3; i++ { @@ -176,8 +175,8 @@ func queryResources(accountId string, cloudResourceInfo CloudResourceInfo, confi var private_dns_name string for _, obj := range objMap { - obj["account_id"] = util.GetNodeID(config.CloudProvider, accountId) - obj["cloud_provider"] = config.CloudProvider + obj["account_id"] = util.GetNodeID(r.config.CloudProvider, accountId) + obj["cloud_provider"] = r.config.CloudProvider if _, ok := obj["title"]; ok { obj["name"] = fmt.Sprint(obj["title"]) delete(obj, "title") diff --git a/query_resource/query_service.go b/query_resource/query_service.go new file mode 100644 index 0000000..3155434 --- /dev/null +++ b/query_resource/query_service.go @@ -0,0 +1,157 @@ +package query_resource + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/deepfence/ThreatMapper/deepfence_utils/log" + "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + "github.com/deepfence/cloud-scanner/cloud_resource_changes" + "github.com/deepfence/cloud-scanner/output" + "github.com/deepfence/cloud-scanner/util" +) + +type ResourceRefreshService struct { + config util.Config + resourceRefreshCount atomic.Int32 + resourceRefreshStatus sync.Map + CloudResourceChanges cloud_resource_changes.CloudResourceChanges + mutex sync.Mutex +} + +func NewResourceRefreshService(config util.Config) (*ResourceRefreshService, error) { + cloudResourceChanges, err := cloud_resource_changes.NewCloudResourceChanges(config) + if err != nil { + return nil, err + } + + return &ResourceRefreshService{ + config: config, + resourceRefreshCount: atomic.Int32{}, + resourceRefreshStatus: sync.Map{}, + CloudResourceChanges: cloudResourceChanges, + }, nil +} + +func (r *ResourceRefreshService) Initialize() { + log.Info().Msgf("CloudResourceChanges Initialization started") + err := r.CloudResourceChanges.Initialize() + if err != nil { + log.Warn().Msgf("%+v", err) + } + log.Info().Msgf("CloudResourceChanges Initialization completed") + + go r.refreshResourcesFromTrailPeriodically() +} + +func (r *ResourceRefreshService) Lock() { + r.resourceRefreshCount.Add(1) + log.Debug().Msgf("Resource refresh count: %d", r.resourceRefreshCount.Load()) + r.mutex.Lock() +} + +func (r *ResourceRefreshService) Unlock() { + r.resourceRefreshCount.Add(-1) + log.Debug().Msgf("Resource refresh count: %d", r.resourceRefreshCount.Load()) + r.mutex.Unlock() +} + +func (r *ResourceRefreshService) SetResourceRefreshStatus(account util.AccountsToRefresh, refreshStatus string) { + r.resourceRefreshStatus.Store(account.AccountID, refreshStatus) + output.WriteCloudResourceRefreshStatus(account.NodeID, refreshStatus, "") +} + +// SkipCloudAuditLogUpdate Weather to skip cloud audit log based resource updates +func (r *ResourceRefreshService) SkipCloudAuditLogUpdate(accountID string) bool { + var refreshStatus any + var ok bool + if refreshStatus, ok = r.resourceRefreshStatus.Load(accountID); !ok { + // Skip the resources update + return true + } + refreshStatusString := refreshStatus.(string) + if refreshStatusString == utils.ScanStatusSuccess || refreshStatusString == utils.ScanStatusFailed { + // Proceed with the resources update + return false + } + // Skip the resources update + return true +} + +func (r *ResourceRefreshService) refreshResourcesFromTrailPeriodically() { + refreshTicker := time.NewTicker(2 * time.Minute) // temporarily set to 2 min for testing + for { + select { + case <-refreshTicker.C: + go func() { + r.refreshResourcesFromTrail() + }() + } + } +} + +func (r *ResourceRefreshService) refreshResourcesFromTrail() { + log.Info().Msg("Started updating cloud resources") + cloudResourceTypesToRefresh, _ := r.CloudResourceChanges.GetResourceTypesToRefresh() + if len(cloudResourceTypesToRefresh) == 0 { + return + } + var accountsToRefresh []util.AccountsToRefresh + for accountID, resourceTypes := range cloudResourceTypesToRefresh { + if r.SkipCloudAuditLogUpdate(accountID) { + log.Debug().Msgf("Skipping resource refresh updation for account %s, account wide refresh already scheduled", accountID) + continue + } + + log.Debug().Msgf("Resource refresh updation for account %s, resource types: %v", accountID, resourceTypes) + accountsToRefresh = append(accountsToRefresh, util.AccountsToRefresh{ + AccountID: accountID, + NodeID: util.GetNodeID(r.config.CloudProvider, accountID), + ResourceTypes: resourceTypes, + }) + } + + r.FetchCloudAccountResources(accountsToRefresh, false) + log.Info().Msg("Updating cloud resources complete") +} + +func (r *ResourceRefreshService) GetRefreshCount() int32 { + return r.resourceRefreshCount.Load() +} + +// FetchCloudResources Fetch cloud resources from all accounts +func (r *ResourceRefreshService) FetchCloudResources(organizationAccounts []util.MonitoredAccount) { + log.Info().Msg("Querying cloud resources") + + var accountsToRefresh []util.AccountsToRefresh + if r.config.IsOrganizationDeployment { + for _, monitoredAccount := range organizationAccounts { + accountsToRefresh = append(accountsToRefresh, util.AccountsToRefresh{ + AccountID: monitoredAccount.AccountID, + NodeID: monitoredAccount.NodeID, + }) + } + } else { + accountsToRefresh = []util.AccountsToRefresh{ + { + AccountID: r.config.AccountID, + NodeID: r.config.NodeID, + }, + } + } + r.FetchCloudAccountResources(accountsToRefresh, true) + log.Info().Msg("Querying cloud resources complete") +} + +// FetchCloudAccountResources Fetch cloud resources from selected accounts and resource types +func (r *ResourceRefreshService) FetchCloudAccountResources(accountsToRefresh []util.AccountsToRefresh, completeRefresh bool) { + // Only one cloud account's resources are refreshed at a time + r.Lock() + defer r.Unlock() + + errorsCollected := r.QueryAndRegisterResources(accountsToRefresh, completeRefresh) + if len(errorsCollected) > 0 { + log.Error().Msgf("Error in sending resources, errors: %+v", errorsCollected) + } +} diff --git a/service/query_service.go b/service/query_service.go deleted file mode 100644 index abd8d6a..0000000 --- a/service/query_service.go +++ /dev/null @@ -1,67 +0,0 @@ -package service - -import ( - "sync" - "sync/atomic" - - "github.com/deepfence/ThreatMapper/deepfence_utils/log" - "github.com/deepfence/cloud-scanner/query_resource" - "github.com/deepfence/cloud-scanner/util" -) - -type ResourceRefreshLock struct { - resourceRefreshCount atomic.Int32 - mutex sync.Mutex -} - -func NewResourceRefreshLock() *ResourceRefreshLock { - return &ResourceRefreshLock{} -} - -func (r *ResourceRefreshLock) Lock() { - r.resourceRefreshCount.Add(1) - log.Debug().Msgf("Resource refresh count: %d", r.resourceRefreshCount.Load()) - r.mutex.Lock() -} - -func (r *ResourceRefreshLock) Unlock() { - r.resourceRefreshCount.Add(-1) - log.Debug().Msgf("Resource refresh count: %d", r.resourceRefreshCount.Load()) - r.mutex.Unlock() -} - -func (r *ResourceRefreshLock) GetRefreshCount() int32 { - return r.resourceRefreshCount.Load() -} - -// FetchCloudResources Fetch cloud resources from all accounts -func (c *ComplianceScanService) FetchCloudResources() { - log.Info().Msg("Querying cloud resources") - - var accountsToRefresh []util.AccountsToRefresh - if c.config.IsOrganizationDeployment { - for _, monitoredAccount := range c.GetOrganizationAccounts() { - accountsToRefresh = append(accountsToRefresh, util.AccountsToRefresh{ - AccountID: monitoredAccount.AccountID, - NodeID: monitoredAccount.NodeID, - }) - } - } else { - accountsToRefresh = []util.AccountsToRefresh{ - { - AccountID: c.config.AccountID, - NodeID: c.config.NodeID, - }, - } - } - c.FetchCloudAccountResources(accountsToRefresh, true) - log.Info().Msg("Querying cloud resources complete") -} - -// FetchCloudAccountResources Fetch cloud resources from selected accounts and resource types -func (c *ComplianceScanService) FetchCloudAccountResources(accountsToRefresh []util.AccountsToRefresh, completeRefresh bool) { - errorsCollected := query_resource.QueryAndRegisterResources(c.config, accountsToRefresh, completeRefresh) - if len(errorsCollected) > 0 { - log.Error().Msgf("Error in sending resources, errors: %+v", errorsCollected) - } -} diff --git a/service/service.go b/service/service.go index 90fda7e..dbb9e06 100644 --- a/service/service.go +++ b/service/service.go @@ -25,8 +25,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sts" ctl "github.com/deepfence/ThreatMapper/deepfence_utils/controls" "github.com/deepfence/ThreatMapper/deepfence_utils/log" - "github.com/deepfence/cloud-scanner/cloud_resource_changes" "github.com/deepfence/cloud-scanner/internal/deepfence" + "github.com/deepfence/cloud-scanner/query_resource" "github.com/deepfence/cloud-scanner/scanner" "github.com/deepfence/cloud-scanner/util" "google.golang.org/api/cloudresourcemanager/v1" @@ -53,8 +53,7 @@ type ComplianceScanService struct { SocketPath *string organizationAccountIDs []util.MonitoredAccount organizationAccountIDsLock sync.RWMutex - CloudResourceChanges cloud_resource_changes.CloudResourceChanges - ResourceRefreshLock *ResourceRefreshLock + ResourceRefreshService *query_resource.ResourceRefreshService } func NewComplianceScanService(config util.Config, socketPath *string) (*ComplianceScanService, error) { @@ -69,7 +68,7 @@ func NewComplianceScanService(config util.Config, socketPath *string) (*Complian log.Error().Msgf("deepfence.NewClient(config) error: %s", err.Error()) return nil, err } - cloudResourceChanges, err := cloud_resource_changes.NewCloudResourceChanges(config) + resourceRefreshService, err := query_resource.NewResourceRefreshService(config) if err != nil { return nil, err } @@ -84,8 +83,7 @@ func NewComplianceScanService(config util.Config, socketPath *string) (*Complian CloudTrails: make([]util.CloudTrailDetails, 0), SocketPath: socketPath, organizationAccountIDs: []util.MonitoredAccount{}, - CloudResourceChanges: cloudResourceChanges, - ResourceRefreshLock: NewResourceRefreshLock(), + ResourceRefreshService: resourceRefreshService, }, err } @@ -349,17 +347,35 @@ func (c *ComplianceScanService) RunRegisterServices() error { processAwsCredentials(c) case util.CloudProviderGCP: if c.config.IsOrganizationDeployment { - _, err = c.fetchGCPOrganizationProjects() - if err != nil { - log.Warn().Msg(err.Error()) - } - } else { - projects, err := c.fetchGCPProjects() - if err != nil { - log.Warn().Msg(err.Error()) - } else { - if len(projects) == 1 { - c.config.AccountName = projects[0].AccountName + projects, err := c.fetchGCPOrganizationProjects() + if err != nil || len(projects) == 0 { + if err != nil { + log.Error().Msg(err.Error()) + } + + fetchGCPOrganizationAccounts := func() error { + var fetchErr error + refreshTicker := time.NewTicker(2 * time.Minute) + defer refreshTicker.Stop() + stopTicker := time.NewTicker(10 * time.Minute) + defer stopTicker.Stop() + for { + select { + case <-refreshTicker.C: + projects, fetchErr = c.fetchGCPOrganizationProjects() + if fetchErr != nil { + log.Error().Msg(fetchErr.Error()) + } else if len(projects) > 0 { + return nil + } + case <-stopTicker.C: + return fetchErr + } + } + } + err = fetchGCPOrganizationAccounts() + if err != nil { + return err } } } @@ -377,13 +393,6 @@ func (c *ComplianceScanService) RunRegisterServices() error { log.Info().Msgf("Restarting the steampipe service") util.RestartSteampipeService() - log.Info().Msgf("CloudResourceChanges Initialization started") - err = c.CloudResourceChanges.Initialize() - if err != nil { - log.Warn().Msgf("%+v", err) - } - log.Info().Msgf("CloudResourceChanges Initialization completed") - // Registration should be done first before starting other services err = c.dfClient.RegisterCloudAccount(c.GetOrganizationAccounts()) if err != nil { @@ -422,8 +431,7 @@ func (c *ComplianceScanService) RunRegisterServices() error { go c.refreshOrganizationAccountIDs() } - //go c.queryAndRegisterCloudResourcesPeriodically() - go c.refreshResourcesFromTrailPeriodically() + c.ResourceRefreshService.Initialize() go c.listenForScans() done := make(chan os.Signal, 1) @@ -567,12 +575,6 @@ func (c *ComplianceScanService) refreshOrganizationAccountIDs() { log.Info().Msgf("Restarting the steampipe service") util.RestartSteampipeService() - - go func() { - c.ResourceRefreshLock.Lock() - defer c.ResourceRefreshLock.Unlock() - c.FetchCloudAccountResources(newAccounts, false) - }() } } } @@ -592,54 +594,6 @@ func (c *ComplianceScanService) loopRegister() { } } -//func (c *ComplianceScanService) queryAndRegisterCloudResourcesPeriodically() { -// refreshTicker := time.NewTicker(12 * time.Hour) -// for { -// go func() { -// c.ResourceRefreshLock.Lock() -// defer c.ResourceRefreshLock.Unlock() -// c.FetchCloudResources() -// }() -// -// <-refreshTicker.C -// } -//} - -func (c *ComplianceScanService) refreshResourcesFromTrailPeriodically() { - refreshTicker := time.NewTicker(1 * time.Hour) - for { - select { - case <-refreshTicker.C: - go func() { - c.refreshResourcesFromTrail() - }() - } - } -} - -func (c *ComplianceScanService) refreshResourcesFromTrail() { - log.Info().Msg("Started updating cloud resources") - cloudResourceTypesToRefresh, _ := c.CloudResourceChanges.GetResourceTypesToRefresh() - if len(cloudResourceTypesToRefresh) == 0 { - return - } - accountsToRefresh := make([]util.AccountsToRefresh, len(cloudResourceTypesToRefresh)) - index := 0 - for accountID, resourceTypes := range cloudResourceTypesToRefresh { - accountsToRefresh[index] = util.AccountsToRefresh{ - AccountID: accountID, - NodeID: util.GetNodeID(c.config.CloudProvider, accountID), - ResourceTypes: resourceTypes, - } - index += 1 - } - - c.ResourceRefreshLock.Lock() - defer c.ResourceRefreshLock.Unlock() - c.FetchCloudAccountResources(accountsToRefresh, false) - log.Info().Msg("Updating cloud resources complete") -} - func (c *ComplianceScanService) executeScans(scan ctl.CloudComplianceScanDetails) interface{} { log.Debug().Msgf("s.RemainingScansMap: %+v", &c.RemainingScansMap) log.Info().Msgf("executeScans called for: %s", scan) @@ -704,9 +658,9 @@ type ScanDetails struct { } func (c *ComplianceScanService) handleRequest(conn net.Conn) { - log.Debug().Msg("New client connected.") + log.Trace().Msg("New client connected.") defer func() { - log.Debug().Msgf("Connection closed") + log.Trace().Msgf("Connection closed") conn.Close() }() @@ -788,9 +742,7 @@ func (c *ComplianceScanService) handleRequest(conn net.Conn) { } log.Info().Msgf("Refreshing resources for account: %s", args.AccountID) go func() { - c.ResourceRefreshLock.Lock() - defer c.ResourceRefreshLock.Unlock() - c.FetchCloudAccountResources([]util.AccountsToRefresh{ + c.ResourceRefreshService.FetchCloudAccountResources([]util.AccountsToRefresh{ { AccountID: args.AccountID, NodeID: args.NodeId, @@ -806,7 +758,7 @@ func (c *ComplianceScanService) handleRequest(conn net.Conn) { log.Error().Msgf("Error writing job count to unix connection: %+v", err) } case ctl.CloudScannerResourceRefreshCount: - count := int(c.ResourceRefreshLock.GetRefreshCount()) + count := int(c.ResourceRefreshService.GetRefreshCount()) log.Debug().Msgf("Cloud scanner refresh count: %d", count) data := strconv.Itoa(count) _, err = conn.Write([]byte(data))