From 22956a264f0c91625c31bcc84b1d989c0d599d60 Mon Sep 17 00:00:00 2001 From: chenyanying3 Date: Mon, 30 Sep 2024 18:50:15 +0800 Subject: [PATCH] Don't scrape pods via HTTP if the activator in path --- pkg/autoscaler/metrics/collector.go | 47 +++++++++++++++++++++++++++- pkg/autoscaler/scaling/autoscaler.go | 2 ++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/pkg/autoscaler/metrics/collector.go b/pkg/autoscaler/metrics/collector.go index 9719fac388eb..a349242c84a9 100644 --- a/pkg/autoscaler/metrics/collector.go +++ b/pkg/autoscaler/metrics/collector.go @@ -80,6 +80,12 @@ type MetricClient interface { // StableAndPanicRPS returns both the stable and the panic RPS // for the given replica as of the given time. StableAndPanicRPS(key types.NamespacedName, now time.Time) (float64, float64, error) + + // Pause pauses the pod scrapper of the collection with specified Key. + Pause(key types.NamespacedName) + + // Resume pauses the pod scrapper of the collection with specified Key. + Resume(key types.NamespacedName) } // MetricCollector manages collection of metrics for many entities. @@ -149,6 +155,26 @@ func (c *MetricCollector) Delete(namespace, name string) { } } +// Pause pauses the pod scrapper of the collection with specified Key. +func (c *MetricCollector) Pause(key types.NamespacedName) { + c.collectionsMutex.RLock() + defer c.collectionsMutex.RUnlock() + + if collection, exists := c.collections[key]; exists { + collection.pause() + } +} + +// Resume resume the pod scrapper of the collection with specified Key. +func (c *MetricCollector) Resume(key types.NamespacedName) { + c.collectionsMutex.RLock() + defer c.collectionsMutex.RUnlock() + + if collection, exists := c.collections[key]; exists { + collection.resume() + } +} + // Record records a stat that's been generated outside of the metric collector. func (c *MetricCollector) Record(key types.NamespacedName, now time.Time, stat Stat) { c.collectionsMutex.RLock() @@ -245,6 +271,8 @@ type ( lastErr error grp sync.WaitGroup stopCh chan struct{} + // Pause scrape + pauseCh chan bool } ) @@ -288,7 +316,8 @@ func newCollection(metric *autoscalingv1alpha1.Metric, scraper StatsScraper, clo metric.Spec.PanicWindow, config.BucketSize), scraper: scraper, - stopCh: make(chan struct{}), + stopCh: make(chan struct{}), + pauseCh: make(chan bool), } key := types.NamespacedName{Namespace: metric.Namespace, Name: metric.Name} @@ -299,12 +328,18 @@ func newCollection(metric *autoscalingv1alpha1.Metric, scraper StatsScraper, clo defer c.grp.Done() scrapeTicker := clock.NewTicker(scrapeTickInterval) + var pause bool defer scrapeTicker.Stop() for { select { case <-c.stopCh: return + case pause = <-c.pauseCh: case <-scrapeTicker.C(): + if pause { + continue + } + scraper := c.getScraper() if scraper == nil { // Don't scrape empty target service. @@ -414,3 +449,13 @@ func (dst *Stat) average(sample, total float64) { dst.RequestCount = dst.RequestCount / sample * total dst.ProxiedRequestCount = dst.ProxiedRequestCount / sample * total } + +// pause pauses the pod scraper of the current collection. +func (c *collection) pause() { + c.pauseCh <- true +} + +// resume resumes the pod scraper of the current collection. +func (c *collection) resume() { + c.pauseCh <- false +} diff --git a/pkg/autoscaler/scaling/autoscaler.go b/pkg/autoscaler/scaling/autoscaler.go index 01f815f5d942..25cae432a046 100644 --- a/pkg/autoscaler/scaling/autoscaler.go +++ b/pkg/autoscaler/scaling/autoscaler.go @@ -280,6 +280,8 @@ func (a *autoscaler) Scale(logger *zap.SugaredLogger, now time.Time) ScaleResult case spec.TargetBurstCapacity > 0: totCap := float64(originalReadyPodsCount) * spec.TotalValue excessBCF = math.Floor(totCap - spec.TargetBurstCapacity - observedPanicValue) + case spec.TargetBurstCapacity == -1: + a.metricClient.Pause(metricKey) } if debugEnabled {