From 9767a3de6726f2fa6d8ab912e9dc390537f8da72 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Sun, 7 Jul 2024 09:05:33 -0700 Subject: [PATCH] agent/core: Implement LFC-aware scaling Part of #872. This builds on the metrics that will be exposed by neondatabase/neon#8298. For now, we only look at the working set size metrics over various time windows. The algorithm is somewhat straightforward to implement (see wss.go), but unfortunately seems to be difficult to understand *why* it's expected to work. See also: https://www.notion.so/neondatabase/874ef1cc942a4e6592434dbe9e609350 --- deploy/agent/config_map.yaml | 5 ++- pkg/agent/core/dumpstate.go | 15 ++++--- pkg/agent/core/metrics.go | 81 ++++++++++++++++++++++++++++++++- pkg/agent/core/state.go | 71 ++++++++++++++++++++++++++--- pkg/agent/core/state_test.go | 18 +++++--- pkg/agent/core/wss.go | 87 ++++++++++++++++++++++++++++++++++++ pkg/agent/core/wss_test.go | 56 +++++++++++++++++++++++ pkg/api/vminfo.go | 27 +++++++++++ 8 files changed, 339 insertions(+), 21 deletions(-) create mode 100644 pkg/agent/core/wss.go create mode 100644 pkg/agent/core/wss_test.go diff --git a/deploy/agent/config_map.yaml b/deploy/agent/config_map.yaml index c793ba6c7..2d9756c99 100644 --- a/deploy/agent/config_map.yaml +++ b/deploy/agent/config_map.yaml @@ -12,7 +12,10 @@ data: "defaultConfig": { "loadAverageFractionTarget": 0.9, "memoryUsageFractionTarget": 0.75, - "enableLFCMetrics": false + "enableLFCMetrics": false, + "lfcSizePerCU": 0.75, + "lfcWindowSizeMinutes": 5, + "lfcMinWaitBeforeDownscaleMinutes": 15 } }, "billing": { diff --git a/pkg/agent/core/dumpstate.go b/pkg/agent/core/dumpstate.go index 63a8d0ce3..182ea5ab3 100644 --- a/pkg/agent/core/dumpstate.go +++ b/pkg/agent/core/dumpstate.go @@ -33,13 +33,14 @@ func (d StateDump) MarshalJSON() ([]byte, error) { func (s *State) Dump() StateDump { return StateDump{ internal: state{ - Debug: s.internal.Debug, - Config: s.internal.Config, - VM: s.internal.VM, - Plugin: s.internal.Plugin.deepCopy(), - Monitor: s.internal.Monitor.deepCopy(), - NeonVM: s.internal.NeonVM.deepCopy(), - Metrics: shallowCopy[SystemMetrics](s.internal.Metrics), + Debug: s.internal.Debug, + Config: s.internal.Config, + VM: s.internal.VM, + Plugin: s.internal.Plugin.deepCopy(), + Monitor: s.internal.Monitor.deepCopy(), + NeonVM: s.internal.NeonVM.deepCopy(), + Metrics: shallowCopy[SystemMetrics](s.internal.Metrics), + LFCMetrics: shallowCopy[LFCMetrics](s.internal.LFCMetrics), }, } } diff --git a/pkg/agent/core/metrics.go b/pkg/agent/core/metrics.go index fb9c1b2ad..d45cc6d88 100644 --- a/pkg/agent/core/metrics.go +++ b/pkg/agent/core/metrics.go @@ -3,8 +3,12 @@ package core // Definition of the Metrics type, plus reading it from vector.dev's prometheus format host metrics import ( + "cmp" "fmt" "io" + "slices" + "strconv" + "time" promtypes "github.com/prometheus/client_model/go" promfmt "github.com/prometheus/common/expfmt" @@ -31,7 +35,8 @@ type LFCMetrics struct { CacheMissesTotal float64 CacheWritesTotal float64 - ApproximateWorkingSetSizeTotal float64 // approximate_working_set_size + // lfc_approximate_working_set_size_seconds + ApproximateworkingSetSizeBuckets []float64 } // FromPrometheus represents metric types that can be parsed from prometheus output. 
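For reference, the parser added in the next hunk expects the compute to expose one gauge series per window length, keyed by a duration_minutes label. An illustrative scrape (the specific values here are hypothetical; only the metric name and label come from this patch) would look like:

lfc_approximate_working_set_size_seconds{duration_minutes="1"} 103000
lfc_approximate_working_set_size_seconds{duration_minutes="2"} 151000
lfc_approximate_working_set_size_seconds{duration_minutes="3"} 174000

After parsing and sorting, ApproximateworkingSetSizeBuckets[i] holds the estimate for the (i+1)-minute window, measured in 8KiB pages (the unit assumed by the CU conversion in state.go below).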
@@ -118,12 +123,15 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) erro
 		}
 	}
 
+	wssBuckets, err := extractWorkingSetSizeBuckets(mfs)
+	ec.Add(err)
+
 	tmp := LFCMetrics{
 		CacheHitsTotal:   getFloat("lfc_hits"),
 		CacheMissesTotal: getFloat("lfc_misses"),
 		CacheWritesTotal: getFloat("lfc_writes"),
 
-		ApproximateWorkingSetSizeTotal: getFloat("lfc_approximate_working_set_size"),
+		ApproximateworkingSetSizeBuckets: wssBuckets,
 	}
 
 	if err := ec.Resolve(); err != nil {
@@ -133,3 +141,72 @@ func (m *LFCMetrics) fromPrometheus(mfs map[string]*promtypes.MetricFamily) erro
 	*m = tmp
 	return nil
 }
+
+func extractWorkingSetSizeBuckets(mfs map[string]*promtypes.MetricFamily) ([]float64, error) {
+	metricName := "lfc_approximate_working_set_size_seconds"
+	mf := mfs[metricName]
+	if mf == nil {
+		return nil, missingMetric(metricName)
+	}
+
+	if mf.GetType() != promtypes.MetricType_GAUGE {
+		return nil, fmt.Errorf("wrong metric type: expected %s, but got %s", promtypes.MetricType_GAUGE, mf.GetType())
+	} else if len(mf.Metric) < 1 {
+		return nil, fmt.Errorf("expected >= 1 metric, found %d", len(mf.Metric))
+	}
+
+	type pair struct {
+		duration time.Duration
+		value    float64
+	}
+
+	var pairs []pair
+	for _, m := range mf.Metric {
+		// Find the duration label
+		durationLabel := "duration_minutes"
+		durationIndex := slices.IndexFunc(m.Label, func(l *promtypes.LabelPair) bool {
+			return l.GetName() == durationLabel
+		})
+		if durationIndex == -1 {
+			return nil, fmt.Errorf("metric missing label %q", durationLabel)
+		}
+
+		durationMinutes, err := strconv.Atoi(m.Label[durationIndex].GetValue())
+		if err != nil {
+			return nil, fmt.Errorf("couldn't parse metric's %q label as int: %w", durationLabel, err)
+		}
+
+		pairs = append(pairs, pair{
+			duration: time.Minute * time.Duration(durationMinutes),
+			value:    m.GetGauge().GetValue(),
+		})
+	}
+
+	slices.SortFunc(pairs, func(x, y pair) int {
+		return cmp.Compare(x.duration, y.duration)
+	})
+
+	// Check that the values are as expected: they should all be 1 minute apart, starting
+	// at 1 minute.
+	// NOTE: this assumption is relied on elsewhere for scaling on ApproximateworkingSetSizeBuckets.
+	// Please search for usages before changing this behavior.
+	if pairs[0].duration != time.Minute {
+		return nil, fmt.Errorf("expected smallest duration to be %v, got %v", time.Minute, pairs[0].duration)
+	}
+	for i := range pairs {
+		expected := time.Minute * time.Duration(i+1)
+		if pairs[i].duration != expected {
+			return nil, fmt.Errorf(
+				"expected duration values to be exactly 1m apart, got unexpected value %v instead of %v",
+				pairs[i].duration,
+				expected,
+			)
+		}
+	}
+
+	var values []float64
+	for _, p := range pairs {
+		values = append(values, p.value)
+	}
+	return values, nil
+}
diff --git a/pkg/agent/core/state.go b/pkg/agent/core/state.go
index 5a3d27d73..3c3f7e1c5 100644
--- a/pkg/agent/core/state.go
+++ b/pkg/agent/core/state.go
@@ -29,6 +29,7 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 
 	"github.com/neondatabase/autoscaling/pkg/api"
 	"github.com/neondatabase/autoscaling/pkg/util"
@@ -114,6 +115,8 @@ type state struct {
 	NeonVM neonvmState
 
 	Metrics *SystemMetrics
+
+	LFCMetrics *LFCMetrics
 }
 
 type pluginState struct {
@@ -221,7 +224,8 @@ func NewState(vm api.VmInfo, config Config) *State {
 				OngoingRequested: nil,
 				RequestFailedAt:  nil,
 			},
-			Metrics: nil,
+			Metrics:    nil,
+			LFCMetrics: nil,
 		},
 	}
 }
@@ -667,8 +671,11 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
 	// 2.
Cap the goal CU by min/max, etc // 3. that's it! + hasMetrics := s.Metrics != nil && (!*s.scalingConfig().EnableLFCMetrics || s.LFCMetrics != nil) + var goalCU uint32 - if s.Metrics != nil { + var lfcLogFields func(zapcore.ObjectEncoder) error + if hasMetrics { // For CPU: // Goal compute unit is at the point where (CPUs) × (LoadAverageFractionTarget) == (load // average), @@ -686,6 +693,44 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) ( memGoalCU := uint32(memGoalBytes / s.Config.ComputeUnit.Mem) goalCU = util.Max(cpuGoalCU, memGoalCU) + + // For LFC metrics, if enabled: + if *s.scalingConfig().EnableLFCMetrics { + cfg := s.scalingConfig() + wssValues := s.LFCMetrics.ApproximateworkingSetSizeBuckets + // At this point, we can assume that the values are equally spaced at 1 minute apart, + // starting at 1 minute. + offsetIndex := *cfg.LFCMinWaitBeforeDownscaleMinutes - 1 // -1 because values start at 1m + windowSize := *cfg.LFCWindowSizeMinutes + // Handle invalid metrics: + if len(wssValues) < offsetIndex+windowSize { + s.warn("not enough working set size values to make scaling determination") + } else { + estimateWss := EstimateTrueWorkingSetSize(wssValues, WssEstimatorConfig{ + MaxAllowedIncreaseFactor: 4.0, // hard-code this for now. + InitialOffset: offsetIndex, + WindowSize: windowSize, + }) + projectLen := 1 // hard-code this for now. + predictedHighestNextMinute := ProjectNextHighest(wssValues, projectLen, estimateWss) + + // predictedHighestNextMinute is still in units of 8KiB pages. Let's convert that + // into GiB, then convert that into CU, and then invert the discount from only some + // of the memory going towards LFC to get the actual CU required to fit the + // predicted working set size. + requiredCU := predictedHighestNextMinute * 8192 / s.Config.ComputeUnit.Mem.AsFloat64() / *cfg.LFCSizePerCU + lfcGoalCU := uint32(math.Ceil(requiredCU)) + goalCU = util.Max(goalCU, lfcGoalCU) + + lfcLogFields = func(obj zapcore.ObjectEncoder) error { + obj.AddFloat64("estimateWssPages", estimateWss) + obj.AddFloat64("predictedNextWssPages", predictedHighestNextMinute) + obj.AddFloat64("requiredCU", requiredCU) + return nil + } + } + } + } // Copy the initial value of the goal CU so that we can accurately track whether either @@ -727,7 +772,7 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) ( // If there's no constraints and s.metrics is nil, then we'll end up with goalCU = 0. // But if we have no metrics, we'd prefer to keep things as-is, rather than scaling down. - if s.Metrics == nil && goalCU == 0 { + if !hasMetrics && goalCU == 0 { goalResources = s.VM.Using() } else { goalResources = s.Config.ComputeUnit.Mul(uint16(goalCU)) @@ -792,7 +837,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) ( } } - s.info("Calculated desired resources", zap.Object("current", s.VM.Using()), zap.Object("target", result)) + logFields := []zap.Field{ + zap.Object("current", s.VM.Using()), + zap.Object("target", result), + } + if lfcLogFields != nil { + logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFields))) + } + s.info("Calculated desired resources", logFields...) 
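To make the CU conversion in the hunk above concrete with hypothetical numbers: if predictedHighestNextMinute is 262144 pages (262144 × 8192 bytes = 2 GiB of working set), the compute unit has 4 GiB of memory, and lfcSizePerCU is the default 0.75, then requiredCU = 2 GiB / 4 GiB / 0.75 ≈ 0.67, so lfcGoalCU = ceil(0.67) = 1. The 4 GiB-per-CU figure is only an assumption for this example; the real value comes from s.Config.ComputeUnit.Mem.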
 	return result, calculateWaitTime
 }
@@ -922,6 +974,8 @@ func (s *State) Debug(enabled bool) {
 }
 
 func (s *State) UpdatedVM(vm api.VmInfo) {
+	hadLFCMetrics := *s.internal.scalingConfig().EnableLFCMetrics
+
 	// FIXME: overriding this is required right now because we trust that a successful request to
 	// NeonVM means the VM was already updated, which... isn't true, and otherwise we could run into
 	// sync issues.
@@ -932,6 +986,13 @@ func (s *State) UpdatedVM(vm api.VmInfo) {
 	//   - https://github.com/neondatabase/autoscaling/issues/462
 	vm.SetUsing(s.internal.VM.Using())
 	s.internal.VM = vm
+
+	// If the VM *only now* has LFC-aware scaling enabled, then remove any existing LFC metrics we
+	// might have. If we didn't do this, then disabling & re-enabling LFC scaling could cause us
+	// to make scaling decisions based on stale data.
+	if !hadLFCMetrics && *s.internal.scalingConfig().EnableLFCMetrics {
+		s.internal.LFCMetrics = nil
+	}
 }
 
 func (s *State) UpdateSystemMetrics(metrics SystemMetrics) {
@@ -939,7 +1000,7 @@
 }
 
 func (s *State) UpdateLFCMetrics(metrics LFCMetrics) {
-	// stub implementation, intentionally does nothing yet.
+	s.internal.LFCMetrics = &metrics
 }
 
 // PluginHandle provides write access to the scheduler plugin pieces of an UpdateState
diff --git a/pkg/agent/core/state_test.go b/pkg/agent/core/state_test.go
index 7684b1add..d8d1d45f2 100644
--- a/pkg/agent/core/state_test.go
+++ b/pkg/agent/core/state_test.go
@@ -108,9 +108,12 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
 			core.Config{
 				ComputeUnit: api.Resources{VCPU: 250, Mem: 1 * slotSize},
 				DefaultScalingConfig: api.ScalingConfig{
-					LoadAverageFractionTarget: lo.ToPtr(0.5),
-					MemoryUsageFractionTarget: lo.ToPtr(0.5),
-					EnableLFCMetrics:          nil,
+					LoadAverageFractionTarget:        lo.ToPtr(0.5),
+					MemoryUsageFractionTarget:        lo.ToPtr(0.5),
+					EnableLFCMetrics:                 nil,
+					LFCSizePerCU:                     lo.ToPtr(0.75),
+					LFCWindowSizeMinutes:             lo.ToPtr(5),
+					LFCMinWaitBeforeDownscaleMinutes: lo.ToPtr(15),
 				},
 				// these don't really matter, because we're not using (*State).NextActions()
 				NeonVMRetryWait:   time.Second,
@@ -179,9 +182,12 @@ var DefaultInitialStateConfig = helpers.InitialStateConfig{
 	Core: core.Config{
 		ComputeUnit: DefaultComputeUnit,
 		DefaultScalingConfig: api.ScalingConfig{
-			LoadAverageFractionTarget: lo.ToPtr(0.5),
-			MemoryUsageFractionTarget: lo.ToPtr(0.5),
-			EnableLFCMetrics:          nil,
+			LoadAverageFractionTarget:        lo.ToPtr(0.5),
+			MemoryUsageFractionTarget:        lo.ToPtr(0.5),
+			EnableLFCMetrics:                 nil,
+			LFCSizePerCU:                     lo.ToPtr(0.75),
+			LFCWindowSizeMinutes:             lo.ToPtr(5),
+			LFCMinWaitBeforeDownscaleMinutes: lo.ToPtr(15),
 		},
 		NeonVMRetryWait:   5 * time.Second,
 		PluginRequestTick: 5 * time.Second,
diff --git a/pkg/agent/core/wss.go b/pkg/agent/core/wss.go
new file mode 100644
index 000000000..447078ffb
--- /dev/null
+++ b/pkg/agent/core/wss.go
@@ -0,0 +1,87 @@
+package core
+
+// Working set size estimation
+// For more, see: https://www.notion.so/neondatabase/874ef1cc942a4e6592434dbe9e609350
+
+import (
+	"fmt"
+)
+
+type WssEstimatorConfig struct {
+	// MaxAllowedIncreaseFactor is the maximum tolerable increase in slope between windows.
+	// If the slope increases by more than this factor, we will cut off the working set size at the
+	// border between the two windows.
+	MaxAllowedIncreaseFactor float64
+	// InitialOffset is the index of the minimum working set size we must consider.
+ // + // In practice, this is taken from the scaling config's LFCMinWaitBeforeDownscaleMinutes, with + // the expectation that datapoints are all one minute apart, starting at 1m. So a value of 15m + // translates to an InitialOffset of 14 (-1 because indexes start at zero, but the first + // datapoint is 1m). + InitialOffset int + // WindowSize sets the number of datapoints averaged out in a slope calculation. + // This value must be >= 2. + // + // In practice, this value is taken from the scaling config's LFCWindowSizeMinutes, with the + // expectation that datapoints are all one minute apart. So, a value of 5 minutes translates to + // a WindowSize of 5. + WindowSize int +} + +// EstimateTrueWorkingSetSize returns an estimate of the "true" current working set size, given a +// series of datapoints for the observed working set size over increasing time intervals. +// +// In practice, the 'series' is e.g., values of 'neon.lfc_approximate_working_set_size_seconds(d)' +// for equidistant values of 'd' from 1 minute to 60 minutes. +func EstimateTrueWorkingSetSize( + series []float64, + cfg WssEstimatorConfig, +) float64 { + if cfg.WindowSize < 2 { + panic(fmt.Errorf("cfg.WindowSize must be >= 2 (got %v)", cfg.WindowSize)) + } + + w := cfg.WindowSize - 1 + + for t := cfg.InitialOffset; t < len(series)-w; t += 1 { + + d0 := max(0.0, series[t]-series[t-w]) + d1 := max(0.0, series[t+w]-series[t]) + + if d1 > d0*cfg.MaxAllowedIncreaseFactor { + return series[t] + } + } + + return series[len(series)-1] +} + +// ProjectNextHighest looks at the rate of change between points in 'series' (so long as they're +// less than or equal to 'ceil'), returning the maximum value if any of these slopes were to +// continue for 'projectLen' additional datapoints. +// +// For example, given the series '0, 1, 3, 4, 5, 7', projectLen of 3, and ceil equal to 6, +// ProjectNextHighest will return 9 (because 1 → 3 would reach 9 if it continued for another 3 +// datapoints (→ 5 → 7 → 9). The 5 → 7 transition is excluded because 7 is not ≤ 6. +// +// Internally, ProjectNextHighest is used to allow preemptive scale-up when we can see that the +// observed working set size is increasing, but we don't know how big it'll get. +// In short, this function helps answer: "How much should we scale-up to accommodate expected +// increases in demand?". 
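+//
+// As an additional worked example with hypothetical numbers (and ceil larger than every value in
+// the series, so nothing is excluded): for series = [1, 2, 4] and projectLen = 2, the 2 → 4 step
+// projects to 4 + (4-2)*2 = 8, which is the highest projection, so ProjectNextHighest returns 8.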
+func ProjectNextHighest(series []float64, projectLen int, ceil float64) float64 { + if len(series) < 2 { + panic(fmt.Errorf("Cannot ProjectNextHighest with series of length %d (must be >= 2)", len(series))) + } else if series[1] > ceil { + panic(fmt.Errorf("Cannot ProjectNextHighest because series[1] > ceil (%v > %v)", series[1], ceil)) + } + + highest := series[0] + for i := 1; i < len(series); i += 1 { + x0 := series[i-1] + x1 := max(x0, series[i]) // ignore decreases + predicted := x1 + (x1-x0)*float64(projectLen) + highest = max(highest, predicted) + } + + return highest +} diff --git a/pkg/agent/core/wss_test.go b/pkg/agent/core/wss_test.go new file mode 100644 index 000000000..248ccaa9d --- /dev/null +++ b/pkg/agent/core/wss_test.go @@ -0,0 +1,56 @@ +package core_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/neondatabase/autoscaling/pkg/agent/core" +) + +func TestEstimateTrueWorkingSetSize(t *testing.T) { + cases := []struct { + name string + cfg core.WssEstimatorConfig + series []float64 + expected float64 + }{ + { + name: "basic-plateau", + cfg: core.WssEstimatorConfig{ + MaxAllowedIncreaseFactor: 2.0, + InitialOffset: 9, + WindowSize: 5, + }, + series: []float64{ + 0.1, 0.2, 0.3, 0.4, 0.5, + 0.6, 0.7, 0.8, 0.9, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, + 1.1, 1.2, 1.3, 1.4, 1.5, + 1.6, 1.7, 1.8, 1.9, 2.0, + }, + expected: 1.0, + }, + { + name: "plateau-before-init", + cfg: core.WssEstimatorConfig{ + MaxAllowedIncreaseFactor: 2.0, + InitialOffset: 9, + WindowSize: 5, + }, + series: []float64{ + 0.1, 0.2, 0.3, 0.3, 0.3, + 0.3, 0.3, 0.4, 0.5, 0.6, + 0.7, 0.8, 0.9, 1.0, 1.1, + }, + expected: 1.1, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + actual := core.EstimateTrueWorkingSetSize(c.series, c.cfg) + assert.Equal(t, c.expected, actual) + }) + } +} diff --git a/pkg/api/vminfo.go b/pkg/api/vminfo.go index d5dc8fdf6..be1148049 100644 --- a/pkg/api/vminfo.go +++ b/pkg/api/vminfo.go @@ -335,6 +335,21 @@ type ScalingConfig struct { // For an individual VM, if this field is left out the settings will fall back on the global // default. EnableLFCMetrics *bool `json:"enableLFCMetrics,omitempty"` + + // LFCSizePerCU dictates the amount of memory in any given Compute Unit that will be allocated + // to the LFC. For example, if the LFC is sized at 75% of memory, then this value would be 0.75. + LFCSizePerCU *float64 `json:"lfcSizePerCU,omitempty"` + + // LFCMinWaitBeforeDownscaleMinutes dictates the minimum duration we must wait before lowering + // the goal CU based on LFC working set size. + // For example, a value of 15 means we will not allow downscaling below the working set size + // over the past 15 minutes. This allows us to accommodate spiky workloads without flushing the + // cache every time. + LFCMinWaitBeforeDownscaleMinutes *int `json:"lfcMinWaitBeforeDownscaleMinutes,omitempty"` + + // LFCWindowSizeMinutes dictates the minimum duration we must use during internal calculations + // of the rate of increase in LFC working set size. 
+ LFCWindowSizeMinutes *int `json:"lfcWindowSizeMinutes,omitempty"` } // WithOverrides returns a new copy of defaults, where fields set in overrides replace the ones in @@ -355,6 +370,15 @@ func (defaults ScalingConfig) WithOverrides(overrides *ScalingConfig) ScalingCon if overrides.EnableLFCMetrics != nil { defaults.EnableLFCMetrics = &[]bool{*overrides.EnableLFCMetrics}[0] } + if overrides.LFCSizePerCU != nil { + defaults.LFCSizePerCU = lo.ToPtr(*overrides.LFCSizePerCU) + } + if overrides.LFCWindowSizeMinutes != nil { + defaults.LFCWindowSizeMinutes = lo.ToPtr(*overrides.LFCWindowSizeMinutes) + } + if overrides.LFCMinWaitBeforeDownscaleMinutes != nil { + defaults.LFCMinWaitBeforeDownscaleMinutes = lo.ToPtr(*overrides.LFCMinWaitBeforeDownscaleMinutes) + } return defaults } @@ -398,6 +422,9 @@ func (c *ScalingConfig) validate(requireAll bool) error { if requireAll { erc.Whenf(ec, c.EnableLFCMetrics == nil, "%s is a required field", ".enableLFCMetrics") + erc.Whenf(ec, c.LFCSizePerCU == nil, "%s is a required field", ".lfcSizePerCU") + erc.Whenf(ec, c.LFCWindowSizeMinutes == nil, "%s is a required field", ".lfcWindowSizeMinutes") + erc.Whenf(ec, c.LFCMinWaitBeforeDownscaleMinutes == nil, "%s is a required field", ".lfcMinWaitBeforeDownscaleMinutes") } // heads-up! some functions elsewhere depend on the concrete return type of this function.
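As a closing reference for reviewers, the sketch below (not part of the diff; the synthetic series and the 4 GiB-per-CU figure are assumptions) strings the new pieces together the same way desiredResourcesFromMetricsOrRequestedUpscaling does: estimate the working set size from the per-window buckets, project one minute ahead, and convert the result into a goal CU using lfcSizePerCU.

package main

import (
	"fmt"
	"math"

	"github.com/neondatabase/autoscaling/pkg/agent/core"
)

func main() {
	// Hypothetical working-set-size buckets in 8KiB pages, for windows of 1m, 2m, ..., 20m.
	// The series rises quickly and then plateaus around 250k pages (~1.9 GiB).
	wssValues := make([]float64, 20)
	for i := range wssValues {
		wssValues[i] = 250_000 * (1 - math.Exp(-float64(i+1)/3))
	}

	// Defaults from config_map.yaml: 5-minute window, 15-minute wait before downscaling.
	offsetIndex := 15 - 1 // -1 because the first bucket is the 1-minute window
	windowSize := 5

	estimateWss := core.EstimateTrueWorkingSetSize(wssValues, core.WssEstimatorConfig{
		MaxAllowedIncreaseFactor: 4.0,
		InitialOffset:            offsetIndex,
		WindowSize:               windowSize,
	})
	predicted := core.ProjectNextHighest(wssValues, 1, estimateWss)

	// Pages -> bytes -> CU, then undo the LFC memory fraction (lfcSizePerCU = 0.75).
	// 4 GiB per CU is an assumption for this example; the agent reads it from Config.ComputeUnit.Mem.
	computeUnitMemBytes := 4.0 * 1024 * 1024 * 1024
	requiredCU := predicted * 8192 / computeUnitMemBytes / 0.75
	fmt.Printf("estimated wss: %.0f pages, predicted next minute: %.0f pages, LFC goal: %d CU\n",
		estimateWss, predicted, int(math.Ceil(requiredCU)))
}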