diff --git a/pkg/providers/aws/aws.go b/pkg/providers/aws/aws.go index b4678ff..bd0ba05 100644 --- a/pkg/providers/aws/aws.go +++ b/pkg/providers/aws/aws.go @@ -9,6 +9,7 @@ import ( "net/url" "time" + "github.com/patrickmn/go-cache" "github.com/re-cinq/cloud-carbon/pkg/config" "github.com/aws/aws-sdk-go-v2/aws" @@ -22,11 +23,16 @@ var ( ) // Client contains the AWS config and service clients -// and is used to call the API +// and is used to access the API type Client struct { - cfg *aws.Config + // AWS specific config for auth and client creation + cfg *aws.Config + + // service APIs ec2Client *ec2Client cloudWatchClient *cloudWatchClient + + cache *cache.Cache } // NewClient creates a struct with the AWS config, EC2 Client, and CloudWatch Client @@ -59,6 +65,8 @@ func NewClient(ctx context.Context, currentConfig *config.Account, customTranspo cfg: &cfg, ec2Client: ec2Client, cloudWatchClient: cloudWatchClient, + // TODO: configure expiry and deletion + cache: cache.New(12*time.Hour, 36*time.Minute), }, nil } diff --git a/pkg/providers/aws/cache.go b/pkg/providers/aws/cache.go deleted file mode 100644 index 4c62cf9..0000000 --- a/pkg/providers/aws/cache.go +++ /dev/null @@ -1,181 +0,0 @@ -// Caches in memory the resources present in a specific AWS region -package amazon - -import ( - "sync" - "time" -) - -// AWS Region -type awsRegion = string - -// The name of the AWS service -type awsService = string - -// Resource id -type awsResourceID = string - -// The list of resources for a specific service and region -type awsResources = map[awsResourceID]awsResource - -// The AWS Resource representation -type awsResource struct { - // The AWS resource id - id awsResourceID - - // The region where the resource is located - region awsRegion - - // The service the resource belongs to - service awsService - - // For example spot, reserved - lifecycle string - - // Amount of cores - coreCount int - - // The instance kind for example - kind string - - // The name - name string - - // When was the last time it was updated - lastUpdated time.Time -} - -// / Helper which creates a new awsResource -func newAWSResource(region awsRegion, service awsService, id awsResourceID, - kind, lifecycle, name string, coreCount int) *awsResource { - return &awsResource{ - id: id, - service: service, - region: region, - lifecycle: lifecycle, - coreCount: coreCount, - kind: kind, - name: name, - lastUpdated: time.Now().UTC(), - } -} - -// The cache for the specific service -type serviceCache = map[awsService]awsResources - -// We do not need to expose the cache outside of this modules -// since its main purpose is to optimize the AWS API queries -// and reduce them overall -type awsCache struct { - // Sync for concurrent mutations of the cache - lock sync.RWMutex - - // cache itseld - entries map[awsRegion]serviceCache -} - -// Initialize the empty cache -func newAWSCache() *awsCache { - return &awsCache{ - lock: sync.RWMutex{}, - entries: make(map[awsRegion]map[awsService]map[awsResourceID]awsResource), - } -} - -// Add an entry to the cache -func (cache *awsCache) Add(entry *awsResource) { - // Lock the map cause we are writing - cache.lock.Lock() - - // Add it - serviceCache, exists := cache.entries[entry.region] - // If the region does not exist, init the cache for that region - if !exists { - // Init the cache - serviceCache = make(map[awsService]map[awsResourceID]awsResource) - cache.entries[entry.region] = serviceCache - } - - serviceEntries, exists := serviceCache[entry.service] - // if the entries for the service are missing, initialize them - if !exists { - serviceEntries = make(map[awsResourceID]awsResource) - serviceCache[entry.service] = serviceEntries - } - - // Set the entry - serviceEntries[entry.id] = *entry - - // Unlock it - cache.lock.Unlock() -} - -// Deletes an entry from the cache, in case it was removed from AWS -func (cache *awsCache) Delete(region awsRegion, service awsService, id awsResourceID) { - // Lock the map cause we are writing - cache.lock.Lock() - - // Add it - serviceCache, exists := cache.entries[region] - // If the region does not exist, init the cache for that region - if !exists { - // well...nothing to do here - return - } - - serviceEntries, exists := serviceCache[service] - // if the entries for the service are missing, Initialize them - if !exists { - // well...nothing to do here - return - } - - // Set the entry - delete(serviceEntries, id) - - // Now check if the serviceEntries is empty so that we can remove the whole cache - if len(serviceEntries) == 0 { - delete(serviceCache, service) - } - - // Now check if the serviceCache is empty so that we can remove the whole cache - if len(serviceCache) == 0 { - delete(cache.entries, region) - } - - // Unlock it - cache.lock.Unlock() -} - -// Check if an entry exists -func (cache *awsCache) Exists(region awsRegion, service awsService, id awsResourceID) bool { - // Check if the entry exists - return cache.Get(region, service, id) != nil -} - -// Get a specific resource -func (cache *awsCache) Get(region awsRegion, service awsService, id awsResourceID) *awsResource { - // Read lock the cache - cache.lock.RLock() - defer cache.lock.RUnlock() - - // Get the region specific cache - regionCache, exists := cache.entries[region] - if !exists { - return nil - } - - // Get the resources in the specific region - resources, exists := regionCache[service] - if !exists { - return nil - } - - // Check if the resource is present - entry, exists := resources[id] - if !exists { - return nil - } - - return &entry -} diff --git a/pkg/providers/aws/cache_test.go b/pkg/providers/aws/cache_test.go deleted file mode 100644 index 0baaa25..0000000 --- a/pkg/providers/aws/cache_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package amazon - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestAwsCache(t *testing.T) { - region := "my-region" - service := "my-service" - id := "my-id" - - resource := newAWSResource(region, service, id, "t2.micro", "spot", "test-instance", 2) - assert.Equal(t, region, resource.region) - assert.Equal(t, service, resource.service) - assert.Equal(t, id, resource.id) - - // Init the cache - cache := newAWSCache() - - // Check the exists fails - assert.False(t, cache.Exists(region, service, id)) - - // Add a new resource - cache.Add(resource) - - // Check the exists succeeds - assert.True(t, cache.Exists(region, service, id)) - - // Delete the resource - cache.Delete(region, service, id) - - // Check the exists fails - assert.False(t, cache.Exists(region, service, id)) - - // Make sure we basically have an empty map - _, exists := cache.entries[region] - assert.False(t, exists) -} diff --git a/pkg/providers/aws/cloudwatch.go b/pkg/providers/aws/cloudwatch.go index d441dd4..f46a11a 100644 --- a/pkg/providers/aws/cloudwatch.go +++ b/pkg/providers/aws/cloudwatch.go @@ -9,6 +9,8 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/patrickmn/go-cache" + "github.com/re-cinq/cloud-carbon/pkg/providers/util" v1 "github.com/re-cinq/cloud-carbon/pkg/types/v1" "k8s.io/klog/v2" ) @@ -38,54 +40,66 @@ func NewCloudWatchClient(cfg *aws.Config) *cloudWatchClient { } // Get the resource consumption of an ec2 instance -func (e *cloudWatchClient) GetEC2Metrics(region awsRegion, interval time.Duration, cache *awsCache) (map[string]v1.Instance, error) { - serviceMetrics := make(map[string]v1.Instance) +func (e *cloudWatchClient) GetEC2Metrics(ca *cache.Cache, region string, interval time.Duration) ([]v1.Instance, error) { + instances := []v1.Instance{} + local := make(map[string]*v1.Instance) end := time.Now().UTC() start := end.Add(-interval) // Get the cpu consumption for all the instances in the region cpuMetrics, err := e.getEC2CPU(region, start, end, interval) - if err != nil || len(cpuMetrics) < 1 { - return serviceMetrics, err + if err != nil { + return instances, err + } + + if len(cpuMetrics) == 0 { + return instances, fmt.Errorf("no cpu metrics collected from CloudWatch") } - for _, cpuMetric := range cpuMetrics { - // load the instance metadata from the cache, because the query does not give us - instanceMetadata := cache.Get(region, ec2Service, cpuMetric.instanceID) - if instanceMetadata == nil { - klog.Warningf("instance id %s is not present in the metadata, temporarily skipping collecting metrics", cpuMetric.instanceID) + + // TODO: Will need to iterate cpuMetrics and memMetrics + for i := range cpuMetrics { + // to avoid Implicit memory aliasing in for loop + metric := cpuMetrics[i] + + instanceID, ok := metric.Labels().Get("instanceID") + if !ok { + klog.Errorf("error metric doesn't have an instanceID: %+v", metric) + continue + } + + // load the instance metadata from the cache, because the query does not give us instance info + cachedInstance, exists := ca.Get(util.CacheKey(region, ec2Service, instanceID)) + if cachedInstance == nil || !exists { + klog.Warningf("instance id %s is not present in the metadata, temporarily skipping collecting metrics", instanceID) continue } - // if we got here it means that we do have the instance metadata - instanceService, exists := serviceMetrics[cpuMetric.instanceID] + meta := cachedInstance.(*resource) + + // update local instance metadata map + s, exists := local[instanceID] if !exists { - // Then create a new one - s := v1.NewInstance(cpuMetric.instanceID, provider) + // Then create a new local instance from cached + s = v1.NewInstance(instanceID, provider) s.SetService("EC2") - s.SetKind(instanceMetadata.kind).SetRegion(region) - s.AddLabel("Name", instanceMetadata.name) - serviceMetrics[cpuMetric.instanceID] = *s - - // Makes it easier to use it - instanceService = serviceMetrics[cpuMetric.instanceID] - instanceService.SetKind(instanceMetadata.kind).SetRegion(region) + s.SetKind(meta.kind) + s.SetRegion(region) } - // Build the resource - cpu := v1.NewMetric(v1.CPU.String()).SetResourceUnit(cpuMetric.unit).SetUnitAmount(float64(instanceMetadata.coreCount)) - cpu.SetUsage(cpuMetric.value).SetType(cpuMetric.kind) + s.AddLabel("Name", meta.name) + metric.SetUnitAmount(float64(meta.coreCount)) + s.Metrics().Upsert(&metric) - // Update the CPU information now - instanceService.Metrics().Upsert(cpu) + local[instanceID] = s + instances = append(instances, *s) } - // Return the collected metrics - return serviceMetrics, nil + return instances, nil } // Get the CPU resource consumption of an ec2 instance -func (e *cloudWatchClient) getEC2CPU(region awsRegion, start, end time.Time, interval time.Duration) ([]awsMetric, error) { +func (e *cloudWatchClient) getEC2CPU(region string, start, end time.Time, interval time.Duration) ([]v1.Metric, error) { // Override the region withRegion := func(o *cloudwatch.Options) { o.Region = region @@ -114,24 +128,22 @@ func (e *cloudWatchClient) getEC2CPU(region awsRegion, start, end time.Time, int } // Collector - var cpuMetrics []awsMetric + var cpuMetrics []v1.Metric // Loop through the result and build the intermediate awsMetric model for _, metric := range output.MetricDataResults { + instanceID := aws.ToString(metric.Label) + if instanceID == "Other" { + return nil, errors.New("error bad query passed to GetMetricData - instanceID not found in label") + } + if len(metric.Values) > 0 { - cpuMetric := awsMetric{ - value: metric.Values[0], - instanceID: aws.ToString(metric.Label), - kind: v1.CPU, - unit: v1.Core, - name: v1.CPU.String(), - } - - if cpuMetric.instanceID == "Other" { - return nil, errors.New("error bad query passed to GetMetricData - instanceID not found in label") - } - - cpuMetrics = append(cpuMetrics, cpuMetric) + cpu := v1.NewMetric(v1.CPU.String()) + cpu.SetResourceUnit(v1.Core).SetUsage(metric.Values[0]).SetType(v1.CPU) + cpu.SetLabels(map[string]string{ + "instanceID": instanceID, + }) + cpuMetrics = append(cpuMetrics, *cpu) } } diff --git a/pkg/providers/aws/cloudwatch_test.go b/pkg/providers/aws/cloudwatch_test.go index 14c5b2b..fa46d33 100644 --- a/pkg/providers/aws/cloudwatch_test.go +++ b/pkg/providers/aws/cloudwatch_test.go @@ -41,7 +41,7 @@ func TestGetMetricsData(t *testing.T) { { Id: aws.String("testID"), Label: aws.String("i-00123456789"), - Values: []float64{1.00}, + Values: []float64{.0000123}, }, }, }, @@ -50,17 +50,25 @@ func TestGetMetricsData(t *testing.T) { res, err := client.getEC2CPU(region, start, end, interval) testtools.ExitTest(stubber, t) - expRes := []awsMetric{ - { - name: "cpu", - kind: "cpu", - unit: "core", - value: 1, - instanceID: "i-00123456789", - }, - } + expRes := v1.NewMetric("cpu").SetUsage(float64(.0000123)).SetResourceUnit("core").SetType(v1.CPU) + expRes.SetLabels(map[string]string{ + "instanceID": "i-00123456789", + }) + + // Comparing the metric structs fails because the time.Time timeStamp field will + // not be equal, and since the fields are not exported it cannot be modified or + // excluded in the compare. + // So instead compare each field value individually - assert.Equalf(t, expRes, res, "Result should be: %v, got: %v", expRes, res) + // v1.Metric.String() creates a string of type, name, amount, and usage + assert.Equalf(t, expRes.String(), res[0].String(), "Result should be: %v, got: %v", expRes, res) + // compare labels + assert.Equalf(t, expRes.Labels(), res[0].Labels(), "Result should be: %v, got: %v", expRes, res) + // compare Resource Unit + assert.Equalf(t, expRes.Unit(), res[0].Unit(), "Result should be: %v, got: %v", expRes, res) + // emissions should not yet be calculated at this point + assert.Equal(t, res[0].Emissions(), &v1.ResourceEmissions{}) + // check no error assert.Nil(t, err) }) diff --git a/pkg/providers/aws/ec2.go b/pkg/providers/aws/ec2.go index 07e1712..41c7595 100644 --- a/pkg/providers/aws/ec2.go +++ b/pkg/providers/aws/ec2.go @@ -4,17 +4,19 @@ package amazon import ( "context" "fmt" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/patrickmn/go-cache" + "github.com/re-cinq/cloud-carbon/pkg/providers/util" "k8s.io/klog/v2" ) // Helper service to get EC2 data type ec2Client struct { client *ec2.Client - cache *awsCache } // New instance @@ -33,16 +35,11 @@ func NewEC2Client(cfg *aws.Config) *ec2Client { // Return the ec2 service return &ec2Client{ client: client, - cache: newAWSCache(), } } -func (e *ec2Client) Cache() *awsCache { - return e.cache -} - // refresh stores all the instances for a specific region in cache -func (e *ec2Client) Refresh(ctx context.Context, region awsRegion) error { +func (e *ec2Client) Refresh(ctx context.Context, ca *cache.Cache, region string) error { // Override the region withRegion := func(o *ec2.Options) { o.Region = region @@ -70,15 +67,20 @@ func (e *ec2Client) Refresh(ctx context.Context, region awsRegion) error { for index := range reservation.Instances { instance := reservation.Instances[index] - e.cache.Add(newAWSResource( - region, - ec2Service, - aws.ToString(instance.InstanceId), - string(instance.InstanceType), - string(instance.InstanceLifecycle), - getInstanceTag(instance.Tags, "Name"), - int(aws.ToInt32(instance.CpuOptions.CoreCount)), - )) + id := aws.ToString(instance.InstanceId) + ca.Set(util.CacheKey(region, ec2Service, id), + &resource{ + id: id, + service: ec2Service, + region: region, + kind: string(instance.InstanceType), + lifecycle: string(instance.InstanceLifecycle), + coreCount: int(aws.ToInt32(instance.CpuOptions.CoreCount)), + name: getInstanceTag(instance.Tags, "Name"), + lastUpdated: time.Now().UTC(), + }, + cache.DefaultExpiration, + ) } } return nil diff --git a/pkg/providers/aws/scheduler.go b/pkg/providers/aws/scheduler.go index e2d9158..c916da3 100644 --- a/pkg/providers/aws/scheduler.go +++ b/pkg/providers/aws/scheduler.go @@ -57,7 +57,7 @@ func NewScheduler(ctx context.Context, eventBus bus.Bus) []v1.Scheduler { // Build the initial cache of instances for _, region := range regions { - err := client.ec2Client.Refresh(ctx, region) + err := client.ec2Client.Refresh(ctx, client.cache, region) if err != nil { klog.Errorf("error refreshing EC2 cache at region: %s: %s", region, err) continue @@ -90,15 +90,15 @@ func (s *scheduler) process(ctx context.Context) { for _, region := range s.regions { // refresh instance cache - if err := s.client.ec2Client.Refresh(ctx, region); err != nil { + if err := s.client.ec2Client.Refresh(ctx, s.client.cache, region); err != nil { klog.Errorf("error refreshing EC2 instances: %s", err) return } instances, err := s.client.cloudWatchClient.GetEC2Metrics( + s.client.cache, region, interval, - s.client.ec2Client.Cache(), ) if err != nil { klog.Errorf("error getting EC2 Metrics with cloudwatch: %s", err) diff --git a/pkg/providers/aws/types.go b/pkg/providers/aws/types.go index f5c72cc..2c76305 100644 --- a/pkg/providers/aws/types.go +++ b/pkg/providers/aws/types.go @@ -1,20 +1,32 @@ package amazon -import v1 "github.com/re-cinq/cloud-carbon/pkg/types/v1" +import ( + "time" +) -type awsMetric struct { - // metric name - name string +// The AWS Resource representation +type resource struct { + // The AWS resource id + id string + + // The region where the resource is located + region string + + // The service the resource belongs to + service string - // metric kind - kind v1.ResourceType + // For example spot, reserved + lifecycle string - // metric unit - unit v1.ResourceUnit + // Amount of cores + coreCount int - // value - value float64 + // The instance kind for example + kind string + + // The name + name string - // instance id - instanceID string + // When was the last time it was updated + lastUpdated time.Time } diff --git a/pkg/providers/gcp/gcp.go b/pkg/providers/gcp/gcp.go index 07b0a9f..1cd7f3f 100644 --- a/pkg/providers/gcp/gcp.go +++ b/pkg/providers/gcp/gcp.go @@ -14,6 +14,7 @@ import ( monitoring "cloud.google.com/go/monitoring/apiv3/v2" cache "github.com/patrickmn/go-cache" "github.com/re-cinq/cloud-carbon/pkg/config" + "github.com/re-cinq/cloud-carbon/pkg/providers/util" v1 "github.com/re-cinq/cloud-carbon/pkg/types/v1" "google.golang.org/api/iterator" "google.golang.org/api/option" @@ -158,7 +159,7 @@ func (g *GCP) GetMetricsForInstances( // Load the cache // TODO make this more explicit, im not sure why this // is needed as we dont use the cache anywhere - cachedInstance, ok := g.cache.Get(cacheKey(meta.zone, service, meta.name)) + cachedInstance, ok := g.cache.Get(util.CacheKey(meta.zone, service, meta.name)) if cachedInstance == nil && !ok { continue } @@ -255,14 +256,14 @@ func (g *GCP) Refresh(ctx context.Context, project string) { if instance.GetStatus() == "TERMINATED" { // delete the entry from the cache - g.cache.Delete(cacheKey(zone, service, name)) + g.cache.Delete(util.CacheKey(zone, service, name)) continue } if instance.GetStatus() == "RUNNING" { // TODO potentially we do not need a custom resource to cache, maybe // just cache the instance - g.cache.Set(cacheKey(zone, service, name), resource{ + g.cache.Set(util.CacheKey(zone, service, name), resource{ region: zone, service: service, id: instanceID, @@ -277,10 +278,6 @@ func (g *GCP) Refresh(ctx context.Context, project string) { } } -func cacheKey(z, s, n string) string { - return fmt.Sprintf("%s-%s-%s", z, s, n) -} - // getValueFromURL returns the last element in the url Path // example: // input: https://www.googleapis.com/.../machineTypes/e2-micro diff --git a/pkg/providers/util/cache.go b/pkg/providers/util/cache.go new file mode 100644 index 0000000..5c885f2 --- /dev/null +++ b/pkg/providers/util/cache.go @@ -0,0 +1,7 @@ +package util + +import "fmt" + +func CacheKey(z, s, n string) string { + return fmt.Sprintf("%s-%s-%s", z, s, n) +}