-
Notifications
You must be signed in to change notification settings - Fork 4k
/
Copy pathaggregate_container_state.go
357 lines (318 loc) · 14.8 KB
/
aggregate_container_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// VPA collects CPU and memory usage measurements from all containers running in
// the cluster and aggregates them in memory in structures called
// AggregateContainerState.
// During aggregation the usage samples are grouped together by the key called
// AggregateStateKey and stored in structures such as histograms of CPU and
// memory usage, that are parts of the AggregateContainerState.
//
// The AggregateStateKey consists of the container name, the namespace and the
// set of labels on the pod the container belongs to. In other words, whenever
// two samples come from containers with the same name, in the same namespace
// and with the same pod labels, they end up in the same histogram.
//
// Recall that VPA produces one recommendation for all containers with a given
// name and namespace, having pod labels that match a given selector. Therefore
// for each VPA object and container name the recommender has to take all
// matching AggregateContainerStates and further aggregate them together, in
// order to obtain the final aggregation that is the input to the recommender
// function.
package model
import (
"fmt"
"math"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util"
)
// ContainerNameToAggregateStateMap maps a container name to the
// AggregateContainerState that aggregates the state of containers with that
// name. It is the result type of AggregateStateByContainerName().
type ContainerNameToAggregateStateMap map[string]*AggregateContainerState
// SupportedCheckpointVersion is the tag of the supported version of serialized checkpoints.
// The version id should be incremented on every incompatible change, i.e. whenever the new
// version of the recommender binary can't initialize from the old checkpoint format or the
// previous version of the recommender binary can't initialize from the new checkpoint format.
const SupportedCheckpointVersion = "v3"
// DefaultControlledResources is the default value of
// Spec.ResourcePolicy.ContainerPolicies[].ControlledResources: both CPU and
// memory are controlled.
var DefaultControlledResources = []ResourceName{
	ResourceCPU,
	ResourceMemory,
}
// ContainerStateAggregator is an interface for objects that consume and
// aggregate container usage samples. In this file it is implemented by
// AggregateContainerState and by ContainerStateAggregatorProxy.
type ContainerStateAggregator interface {
// AddSample aggregates a single usage sample.
AddSample(sample *ContainerUsageSample)
// SubtractSample removes a single usage sample. The subtracted sample
// should be equal to some sample that was aggregated with AddSample()
// in the past.
SubtractSample(sample *ContainerUsageSample)
// GetLastRecommendation returns the last recommendation calculated for this
// aggregator.
GetLastRecommendation() corev1.ResourceList
// NeedsRecommendation returns true if this aggregator should have
// a recommendation calculated.
NeedsRecommendation() bool
// GetUpdateMode returns the update mode of the VPA controlling this aggregator,
// nil if the aggregator is not autoscaled.
GetUpdateMode() *vpa_types.UpdateMode
}
// AggregateContainerState holds input signals aggregated from a set of containers.
// It can be used as an input to compute the recommendation.
// The CPU and memory distributions use decaying histograms by default
// (see NewAggregateContainerState()).
// Implements ContainerStateAggregator interface.
type AggregateContainerState struct {
// AggregateCPUUsage is a distribution of all CPU samples.
AggregateCPUUsage util.Histogram
// AggregateMemoryPeaks is a distribution of memory peaks from all containers:
// each container should add one peak per memory aggregation interval (e.g. once every 24h).
AggregateMemoryPeaks util.Histogram
// Note: first/last sample timestamps as well as the sample count are based only on CPU samples.
// They are updated by addCPUSample(), MergeContainerState() and LoadFromCheckpoint().
FirstSampleStart time.Time
LastSampleStart time.Time
TotalSamplesCount int
// CreationTime is set to the current time by NewAggregateContainerState().
// isExpired() uses it as the reference time for a state that has no samples.
CreationTime time.Time
// Following fields are needed to correctly report quality metrics
// for VPA. When we record a new sample in an AggregateContainerState
// we want to know if it needs recommendation, if the recommendation
// is present and if the automatic updates are on (are we able to
// apply the recommendation to the pods).
// All of them are reset by MarkNotAutoscaled().
LastRecommendation corev1.ResourceList
// IsUnderVPA must be true for NeedsRecommendation() to return true.
IsUnderVPA bool
UpdateMode *vpa_types.UpdateMode
// ScalingMode and ControlledResources are populated by UpdateFromPolicy().
// A nil ControlledResources makes GetControlledResources() return
// DefaultControlledResources.
ScalingMode *vpa_types.ContainerScalingMode
ControlledResources *[]ResourceName
}
// GetLastRecommendation returns the last recorded recommendation, i.e. the
// current value of the LastRecommendation field. The result is nil after
// MarkNotAutoscaled() has been called.
func (a *AggregateContainerState) GetLastRecommendation() corev1.ResourceList {
return a.LastRecommendation
}
// NeedsRecommendation reports whether a recommendation should be calculated
// for this state. That is the case only when the state is under a VPA, a
// scaling mode is set, and that mode is not ContainerScalingModeOff.
func (a *AggregateContainerState) NeedsRecommendation() bool {
	if !a.IsUnderVPA || a.ScalingMode == nil {
		return false
	}
	return *a.ScalingMode != vpa_types.ContainerScalingModeOff
}
// GetUpdateMode returns the update mode of the VPA controlling this aggregator
// (the UpdateMode field), nil if the aggregator is not autoscaled.
func (a *AggregateContainerState) GetUpdateMode() *vpa_types.UpdateMode {
return a.UpdateMode
}
// GetScalingMode returns the container scaling mode of the container
// represented by this aggregator, nil if the aggregator is not autoscaled.
func (a *AggregateContainerState) GetScalingMode() *vpa_types.ContainerScalingMode {
return a.ScalingMode
}
// GetControlledResources returns the list of resources controlled by the VPA
// controlling this aggregator. When no list has been set it falls back to
// DefaultControlledResources.
func (a *AggregateContainerState) GetControlledResources() []ResourceName {
	if a.ControlledResources == nil {
		return DefaultControlledResources
	}
	return *a.ControlledResources
}
// MarkNotAutoscaled registers that this container state is not controlled by
// a VPA object: it drops the last recommendation, the update and scaling
// modes and the controlled-resources list, and clears the IsUnderVPA flag.
func (a *AggregateContainerState) MarkNotAutoscaled() {
	a.LastRecommendation, a.UpdateMode = nil, nil
	a.ScalingMode, a.ControlledResources = nil, nil
	a.IsUnderVPA = false
}
// MergeContainerState merges the state of other into a. Both histograms of
// other are merged into the corresponding histograms of a, the first/last
// sample timestamps of a are widened to also cover those of other, and the
// sample counts are added up.
func (a *AggregateContainerState) MergeContainerState(other *AggregateContainerState) {
	a.AggregateCPUUsage.Merge(other.AggregateCPUUsage)
	a.AggregateMemoryPeaks.Merge(other.AggregateMemoryPeaks)

	// A zero timestamp means "no sample yet", so it must never win over a real one.
	otherStartsEarlier := !other.FirstSampleStart.IsZero() &&
		other.FirstSampleStart.Before(a.FirstSampleStart)
	if a.FirstSampleStart.IsZero() || otherStartsEarlier {
		a.FirstSampleStart = other.FirstSampleStart
	}

	if other.LastSampleStart.After(a.LastSampleStart) {
		a.LastSampleStart = other.LastSampleStart
	}

	a.TotalSamplesCount += other.TotalSamplesCount
}
// NewAggregateContainerState returns a new, empty AggregateContainerState.
// Its CPU and memory distributions are decaying histograms configured from
// GetAggregationsConfig(), and its CreationTime is the current time.
func NewAggregateContainerState() *AggregateContainerState {
	cfg := GetAggregationsConfig()
	cpuUsage := util.NewDecayingHistogram(cfg.CPUHistogramOptions, cfg.CPUHistogramDecayHalfLife)
	memoryPeaks := util.NewDecayingHistogram(cfg.MemoryHistogramOptions, cfg.MemoryHistogramDecayHalfLife)

	state := &AggregateContainerState{CreationTime: time.Now()}
	state.AggregateCPUUsage = cpuUsage
	state.AggregateMemoryPeaks = memoryPeaks
	return state
}
// AddSample aggregates a single usage sample.
// CPU samples are handled by addCPUSample(). Memory samples are added to the
// memory peaks histogram with weight 1.0. Any other resource causes a panic.
func (a *AggregateContainerState) AddSample(sample *ContainerUsageSample) {
	if sample.Resource == ResourceCPU {
		a.addCPUSample(sample)
		return
	}
	if sample.Resource == ResourceMemory {
		a.AggregateMemoryPeaks.AddSample(BytesFromMemoryAmount(sample.Usage), 1.0, sample.MeasureStart)
		return
	}
	panic(fmt.Sprintf("AddSample doesn't support resource '%s'", sample.Resource))
}
// SubtractSample removes a single usage sample from an aggregation.
// The subtracted sample should be equal to some sample that was aggregated
// with AddSample() in the past.
// Only memory samples can be subtracted at the moment; any other resource
// causes a panic. Support for CPU could be added if necessary.
func (a *AggregateContainerState) SubtractSample(sample *ContainerUsageSample) {
	if sample.Resource != ResourceMemory {
		panic(fmt.Sprintf("SubtractSample doesn't support resource '%s'", sample.Resource))
	}
	a.AggregateMemoryPeaks.SubtractSample(BytesFromMemoryAmount(sample.Usage), 1.0, sample.MeasureStart)
}
// addCPUSample adds a CPU usage sample to the CPU histogram and updates the
// first/last sample timestamps and the total sample count.
func (a *AggregateContainerState) addCPUSample(sample *ContainerUsageSample) {
	usageCores := CoresFromCPUAmount(sample.Usage)
	requestCores := CoresFromCPUAmount(sample.Request)
	// Samples are added with the weight equal to the current request (but never
	// less than minSampleWeight). This means that whenever the request is
	// increased, the history accumulated so far effectively decays, which helps
	// react quickly to CPU starvation.
	weight := math.Max(requestCores, minSampleWeight)
	start := sample.MeasureStart
	a.AggregateCPUUsage.AddSample(usageCores, weight, start)

	if a.FirstSampleStart.IsZero() || start.Before(a.FirstSampleStart) {
		a.FirstSampleStart = start
	}
	if start.After(a.LastSampleStart) {
		a.LastSampleStart = start
	}
	a.TotalSamplesCount++
}
// SaveToCheckpoint serializes AggregateContainerState as VerticalPodAutoscalerCheckpointStatus.
// The serialization may result in loss of precision of the histograms.
// The memory histogram is serialized first, then the CPU histogram; the first
// error encountered is returned together with a nil status.
func (a *AggregateContainerState) SaveToCheckpoint() (*vpa_types.VerticalPodAutoscalerCheckpointStatus, error) {
	memoryHistogram, err := a.AggregateMemoryPeaks.SaveToChekpoint()
	if err != nil {
		return nil, err
	}
	cpuHistogram, err := a.AggregateCPUUsage.SaveToChekpoint()
	if err != nil {
		return nil, err
	}

	status := &vpa_types.VerticalPodAutoscalerCheckpointStatus{
		Version:           SupportedCheckpointVersion,
		LastUpdateTime:    metav1.NewTime(time.Now()),
		FirstSampleStart:  metav1.NewTime(a.FirstSampleStart),
		LastSampleStart:   metav1.NewTime(a.LastSampleStart),
		TotalSamplesCount: a.TotalSamplesCount,
		CPUHistogram:      *cpuHistogram,
		MemoryHistogram:   *memoryHistogram,
	}
	return status, nil
}
// LoadFromCheckpoint deserializes data from VerticalPodAutoscalerCheckpointStatus
// into the AggregateContainerState.
// It returns an error when the checkpoint version differs from
// SupportedCheckpointVersion or when either histogram fails to load. The
// sample count and timestamps are copied before the histograms are loaded, so
// they are already overwritten if a histogram load fails.
func (a *AggregateContainerState) LoadFromCheckpoint(checkpoint *vpa_types.VerticalPodAutoscalerCheckpointStatus) error {
	if version := checkpoint.Version; version != SupportedCheckpointVersion {
		return fmt.Errorf("unsupported checkpoint version %s", version)
	}

	a.TotalSamplesCount = checkpoint.TotalSamplesCount
	a.FirstSampleStart = checkpoint.FirstSampleStart.Time
	a.LastSampleStart = checkpoint.LastSampleStart.Time

	if err := a.AggregateMemoryPeaks.LoadFromCheckpoint(&checkpoint.MemoryHistogram); err != nil {
		return err
	}
	return a.AggregateCPUUsage.LoadFromCheckpoint(&checkpoint.CPUHistogram)
}
// isExpired reports whether, at time now, at least one memory aggregation
// window length has passed since the reference time of this state. The
// reference time is LastSampleStart, or CreationTime when the state is empty.
func (a *AggregateContainerState) isExpired(now time.Time) bool {
	reference := a.LastSampleStart
	if a.isEmpty() {
		reference = a.CreationTime
	}
	window := GetAggregationsConfig().GetMemoryAggregationWindowLength()
	return now.Sub(reference) >= window
}
// isEmpty reports whether the state has no samples counted, i.e.
// TotalSamplesCount is zero. Note that AddSample() increments the count only
// for CPU samples, so adding memory samples alone does not make the state
// non-empty.
func (a *AggregateContainerState) isEmpty() bool {
return a.TotalSamplesCount == 0
}
// UpdateFromPolicy updates container state scaling mode and controlled resources based on resource
// policy of the VPA object. Values missing from the policy (or a nil policy) fall back to the
// defaults: ContainerScalingModeAuto and DefaultControlledResources.
func (a *AggregateContainerState) UpdateFromPolicy(resourcePolicy *vpa_types.ContainerResourcePolicy) {
	// Start from the defaults; ContainerScalingModeAuto is the default scaling mode.
	defaultMode := vpa_types.ContainerScalingModeAuto
	a.ScalingMode = &defaultMode
	a.ControlledResources = &DefaultControlledResources

	if resourcePolicy == nil {
		return
	}
	// Override with whatever the policy specifies explicitly.
	if resourcePolicy.Mode != nil {
		a.ScalingMode = resourcePolicy.Mode
	}
	if resourcePolicy.ControlledResources != nil {
		a.ControlledResources = ResourceNamesApiToModel(*resourcePolicy.ControlledResources)
	}
}
// AggregateStateByContainerName takes a set of AggregateContainerStates and merges them,
// grouping by the container name. The result is a map from the container name to a newly
// created aggregation of all input states with the given name.
func AggregateStateByContainerName(aggregateContainerStateMap aggregateContainerStatesMap) ContainerNameToAggregateStateMap {
	byName := make(ContainerNameToAggregateStateMap)
	for key, state := range aggregateContainerStateMap {
		name := key.ContainerName()
		merged, found := byName[name]
		if !found {
			// First state seen for this container name: start a fresh aggregation.
			merged = NewAggregateContainerState()
			byName[name] = merged
		}
		merged.MergeContainerState(state)
	}
	return byName
}
// ContainerStateAggregatorProxy is a wrapper for ContainerStateAggregator
// that creates ContainerStateAggregator for container if it is no longer
// present in the cluster state. Every method resolves the underlying
// aggregator through cluster.findOrCreateAggregateContainerState(containerID)
// at call time instead of holding on to it.
type ContainerStateAggregatorProxy struct {
// containerID identifies the container whose aggregator is resolved on each call.
containerID ContainerID
// cluster is the cluster state in which the aggregator is looked up.
cluster *ClusterState
}
// NewContainerStateAggregatorProxy creates a ContainerStateAggregatorProxy
// for the given container, pointing to the given cluster state.
func NewContainerStateAggregatorProxy(cluster *ClusterState, containerID ContainerID) ContainerStateAggregator {
	proxy := &ContainerStateAggregatorProxy{
		containerID: containerID,
		cluster:     cluster,
	}
	return proxy
}
// AddSample adds a container sample to the aggregator that the cluster state
// returns for this proxy's container.
func (p *ContainerStateAggregatorProxy) AddSample(sample *ContainerUsageSample) {
	p.cluster.findOrCreateAggregateContainerState(p.containerID).AddSample(sample)
}
// SubtractSample subtracts a container sample from the aggregator that the
// cluster state returns for this proxy's container.
func (p *ContainerStateAggregatorProxy) SubtractSample(sample *ContainerUsageSample) {
	p.cluster.findOrCreateAggregateContainerState(p.containerID).SubtractSample(sample)
}
// GetLastRecommendation returns the last recorded recommendation of the
// aggregator that the cluster state returns for this proxy's container.
func (p *ContainerStateAggregatorProxy) GetLastRecommendation() corev1.ResourceList {
	return p.cluster.findOrCreateAggregateContainerState(p.containerID).GetLastRecommendation()
}
// NeedsRecommendation returns true if the aggregator that the cluster state
// returns for this proxy's container should have a recommendation calculated.
func (p *ContainerStateAggregatorProxy) NeedsRecommendation() bool {
	return p.cluster.findOrCreateAggregateContainerState(p.containerID).NeedsRecommendation()
}
// GetUpdateMode returns the update mode of the VPA controlling the aggregator
// that the cluster state returns for this proxy's container.
func (p *ContainerStateAggregatorProxy) GetUpdateMode() *vpa_types.UpdateMode {
	return p.cluster.findOrCreateAggregateContainerState(p.containerID).GetUpdateMode()
}
// GetScalingMode returns the scaling mode of the container represented by the
// aggregator that the cluster state returns for this proxy's container.
func (p *ContainerStateAggregatorProxy) GetScalingMode() *vpa_types.ContainerScalingMode {
	return p.cluster.findOrCreateAggregateContainerState(p.containerID).GetScalingMode()
}