Skip to content

Commit

Permalink
remove safeToStartCycle check
Browse files Browse the repository at this point in the history
  • Loading branch information
Minyi Zhong committed Jan 20, 2025
1 parent bba30c9 commit c95603b
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 152 deletions.
123 changes: 0 additions & 123 deletions pkg/observer/controller.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
package observer

import (
"context"
"errors"
"net/http"
"sort"
"strconv"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/prometheus/client_golang/api"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
"github.com/atlassian-labs/cyclops/pkg/generation"
"github.com/atlassian-labs/cyclops/pkg/k8s"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

var apiVersion = "undefined" //nolint:golint,varcheck,deadcode,unused
Expand All @@ -44,34 +35,6 @@ type timedKey struct {
key string
}

// runMetricsHandler creates the metrics struct for the controller and starts the handler and server
func runMetricsHandler(stopCh <-chan struct{}, addr string) *metrics {
// setup metrics and http handler
metrics := newMetrics()
collectMetricsStruct(metrics)

mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
server := http.Server{Addr: addr, Handler: mux}

// listen and serve on new thread until closed
go func() {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
klog.Fatalln("metrics server failed:", err)
}
}()

// wait on stopCh to send the shutdown signal to the server
go func() {
<-stopCh
if err := server.Shutdown(context.Background()); err != nil {
klog.Fatalln("failed to shutdown metrics server:", err)
}
}()

return metrics
}

// NewController creates an implementation of a controller for observing changes and returns the public Controller interface
func NewController(client client.Client, stopCh <-chan struct{}, options Options, nodeLister k8s.NodeLister, observers map[string]Observer, metricsAddr string) Controller {
// the initial order doesn't matter, just setup the keys
Expand All @@ -89,7 +52,6 @@ func NewController(client client.Client, stopCh <-chan struct{}, options Options
optimisedOrder: initialOrder,
stopCh: stopCh,

metrics: runMetricsHandler(stopCh, metricsAddr),
Options: options,
}
}
Expand Down Expand Up @@ -277,66 +239,6 @@ func (c *controller) dropInProgressNodeGroups(nodeGroups v1.NodeGroupList, cnrs
return restingNodeGroups
}

// get the cluster-autoscaler last scaleUp activity time
func stringToTime(s string) (time.Time, error) {
sec, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(sec, 0), nil
}

// query cluster-autoscaler metrics to figure out if it's safe to start a new CNR
func (c *controller) safeToStartCycle() bool {
client, err := api.NewClient(api.Config{
Address: c.PrometheusAddress,
})
if err != nil {
// Prometheus might not be installed in the cluster. return true if it can't connect
klog.Errorln("Error creating client:", err)
return true
}

v1api := promv1.NewAPI(client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// scaleDown metric is updated every cycle cluster-autoscaler is checking if the cluster should scaleDown
// scaleDown does not get checked and therefore not updated when the cluster is scaling up since no check for scaleDown is needed
result, warnings, err := v1api.Query(ctx, "cluster_autoscaler_last_activity{activity='scaleDown'}", time.Now())
if err != nil {
// cluster-autoscaler might not be installed in the cluster. return true if it can't find the metrics of run the query
klog.Errorln("Error querying Prometheus:", err)
return true
}
if len(warnings) > 0 {
klog.Errorln("Warnings:", warnings)
}

v := result.(model.Vector)
// cluster-autoscaler should always gives a response if it's active
if v.Len() == 0 {
klog.Errorln("Empty response from prometheus")
return true
}

scaleUpTime := v[v.Len()-1].Value.String()
t, err := stringToTime(scaleUpTime)
if err != nil {
klog.Errorln("Error converting the time:", err)
return false
}

// cluster_autoscaler_last_activity values will update every PrometheusScrapeInterval in non-scaling scenario
lastScaleEvent := time.Since(t)
if lastScaleEvent > c.PrometheusScrapeInterval {
klog.Infoln("Scale up event recently happened")
return false
}
klog.V(3).Infoln("No scale up event")

return true
}

// createCNRs generates and applies CNRs from the changedNodeGroups
func (c *controller) createCNRs(changedNodeGroups []*ListedNodeGroups) {
klog.V(3).Infoln("applying")
Expand Down Expand Up @@ -371,26 +273,6 @@ func (c *controller) nextRunTime() time.Time {
return time.Now().UTC().Add(c.CheckInterval)
}

func (c *controller) checkIfSafeToStartCycle() bool {
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 120 * time.Second

err := backoff.Retry(func() error {
if !c.safeToStartCycle() {
klog.Error("Cluster autoscaler scaleUp event in progress. Retry...")
return errors.New("cluster-autoscaler event in progress")
}
return nil
}, b)

if err != nil {
klog.Errorln("there are still cluster-autoscaler scaleUp events")
return false
}

return true
}

// Run runs the controller loops once. detecting lock, changes, and applying CNRs
// implements cron.Job interface
func (c *controller) Run() {
Expand Down Expand Up @@ -420,11 +302,6 @@ func (c *controller) Run() {
}
}

// query cluster-autoscaler to check if it's safe to start a new CNR
if !c.checkIfSafeToStartCycle() {
return
}

// wait for the desired amount to allow any in progress changes to batch up
klog.V(3).Infof("waiting for %v to allow changes to settle", c.WaitInterval)
select {
Expand Down
29 changes: 0 additions & 29 deletions pkg/observer/metrics.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package observer

import (
"reflect"

"github.com/prometheus/client_golang/prometheus"

"k8s.io/klog/v2"
)

const metricsNamespace = "cyclops_observer"
Expand Down Expand Up @@ -55,28 +51,3 @@ func newMetrics() *metrics {
),
}
}

// collectMetricsStruct uses magic (reflection) to automatically fill prometheus with the Metrics from a struct
func collectMetricsStruct(v interface{}) {
if c, ok := v.(prometheus.Collector); ok {
prometheus.MustRegister(c)
}

val := reflect.ValueOf(v).Elem()

for i := 0; i < val.NumField(); i++ {
field := val.Field(i)

if !field.CanInterface() {
continue
}

collector, ok := field.Interface().(prometheus.Collector)
if !ok {
continue
}

klog.V(5).Infoln("registering collector", val.Type().Field(i).Name, "as metric")
prometheus.MustRegister(collector)
}
}

0 comments on commit c95603b

Please sign in to comment.