Skip to content

Commit

Permalink
Add auto-scaling calculation based by CPU load (#1722)
Browse files Browse the repository at this point in the history
* add cpu metrics func
  • Loading branch information
Yisaer authored Feb 19, 2020
1 parent bf112e2 commit 3d54d47
Show file tree
Hide file tree
Showing 9 changed files with 371 additions and 15 deletions.
109 changes: 109 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/calculate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"encoding/json"
"fmt"
"math"
"net/http"
"strconv"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
promClient "github.com/prometheus/client_golang/api"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/util/sets"
)

const (
TikvSumCpuMetricsPattern = `sum(increase(tikv_thread_cpu_seconds_total{cluster="%s"}[%s])) by (instance)`
TidbSumCpuMetricsPattern = `sum(increase(process_cpu_seconds_total{cluster="%s",job="tidb"}[%s])) by (instance)`
InvalidTacMetricConfigureMsg = "tac[%s/%s] metric configuration invalid"
queryPath = "/api/v1/query"

float64EqualityThreshold = 1e-9
)

type SingleQuery struct {
Timestamp int64
Quary string
Instances []string
Metric autoscalingv2beta2.MetricSpec
}

func queryMetricsFromPrometheus(tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client, sq *SingleQuery, resp *Response) error {
query := sq.Quary
timestamp := sq.Timestamp
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", *tac.Spec.MetricsUrl, queryPath), nil)
if err != nil {
return err
}
q := req.URL.Query()
q.Add("query", query)
q.Add("time", fmt.Sprintf("%d", timestamp))
req.URL.RawQuery = q.Encode()
r, body, err := client.Do(req.Context(), req)
if err != nil {
return err
}
if r.StatusCode != http.StatusOK {
return fmt.Errorf("tac[%s/%s] query error, status code:%d", tac.Namespace, tac.Name, r.StatusCode)
}
err = json.Unmarshal(body, resp)
if err != nil {
return err
}
if resp.Status != statusSuccess {
return fmt.Errorf("tac[%s/%s] query error, response status: %v", tac.Namespace, tac.Name, resp.Status)
}
return nil
}

// sumForEachInstance sum the value in Response of each instance from Prometheus
func sumForEachInstance(instances []string, resp *Response) (float64, error) {
if resp == nil {
return 0, fmt.Errorf("metrics response from Promethus can't be empty")
}
s := sets.String{}
for _, instance := range instances {
s.Insert(instance)
}
sum := 0.0
if len(resp.Data.Result) < 1 {
return 0, fmt.Errorf("metrics Response return zero info")
}
for _, r := range resp.Data.Result {
if s.Has(r.Metric.Instance) {
v, err := strconv.ParseFloat(r.Value[1].(string), 64)
if err != nil {
return 0.0, err
}
sum = sum + v
}
}
return sum, nil
}

// calculate func calculate the recommended replicas by given usageRatio and currentReplicas
func calculate(currentValue float64, targetValue float64, currentReplicas int32) (int32, error) {
if almostEqual(targetValue, 0.0) {
return -1, fmt.Errorf("targetValue in calculate func can't be zero")
}
usageRatio := currentValue / targetValue
return int32(math.Ceil(usageRatio * float64(currentReplicas))), nil
}

func almostEqual(a, b float64) bool {
return math.Abs(a-b) <= float64EqualityThreshold
}
76 changes: 76 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"fmt"
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
promClient "github.com/prometheus/client_golang/api"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

const (
CpuSumMetricsErrorMsg = "tac[%s/%s] cpu sum metrics error, can't calculate the past %s cpu metrics, may caused by prometheus restart while data persistence not enabled"
)

//TODO: create issue to explain how auto-scaling algorithm based on cpu metrics work
func CalculateRecomendedReplicasByCpuCosts(tac *v1alpha1.TidbClusterAutoScaler, sq *SingleQuery, sts *appsv1.StatefulSet,
client promClient.Client, memberType v1alpha1.MemberType, duration time.Duration) (int32, error) {
metric := sq.Metric
instances := sq.Instances

if metric.Resource == nil || metric.Resource.Target.AverageUtilization == nil {
return -1, fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}
currentReplicas := len(instances)
c, err := filterContainer(tac, sts, memberType.String())
if err != nil {
return -1, err
}
cpuRequestsRatio, err := extractCpuRequestsRatio(c)
if err != nil {
return -1, err
}
r := &Response{}
err = queryMetricsFromPrometheus(tac, client, sq, r)
if err != nil {
return -1, err
}
sum, err := sumForEachInstance(instances, r)
if err != nil {
return -1, err
}
if sum < 0 {
return -1, fmt.Errorf(CpuSumMetricsErrorMsg, tac.Namespace, tac.Name, duration.String())
}
cpuSecsTotal := sum
durationSeconds := duration.Seconds()
utilizationRatio := float64(*metric.Resource.Target.AverageUtilization) / 100.0
expectedCpuSecsTotal := cpuRequestsRatio * durationSeconds * float64(currentReplicas) * utilizationRatio
rc, err := calculate(cpuSecsTotal, expectedCpuSecsTotal, int32(currentReplicas))
if err != nil {
return -1, err
}
return rc, nil
}

func extractCpuRequestsRatio(c *corev1.Container) (float64, error) {
if c.Resources.Requests.Cpu() == nil || c.Resources.Requests.Cpu().MilliValue() < 1 {
return 0, fmt.Errorf("container[%s] cpu requests is empty", c.Name)
}
return float64(c.Resources.Requests.Cpu().MilliValue()) / 1000.0, nil
}
89 changes: 89 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
)

// MetricType describe the current Supported Metric Type to calculate the recommended Replicas
type MetricType string

const (
MetricTypeCPU MetricType = "cpu"
//metricTypeQPS MetricType = "qps"
)

// currently, we only choose one metrics to be computed.
// If there exists several metrics, we tend to choose ResourceMetricSourceType metric
func FilterMetrics(metrics []autoscalingv2beta2.MetricSpec) autoscalingv2beta2.MetricSpec {
for _, m := range metrics {
if m.Type == autoscalingv2beta2.ResourceMetricSourceType && m.Resource != nil {
return m
}
}
return metrics[0]
}

// genMetricType return the supported MetricType in Operator by kubernetes auto-scaling MetricType
func GenMetricType(tac *v1alpha1.TidbClusterAutoScaler, metric autoscalingv2beta2.MetricSpec) (MetricType, error) {
if metric.Type == autoscalingv2beta2.ResourceMetricSourceType && metric.Resource != nil && metric.Resource.Name == corev1.ResourceCPU {
return MetricTypeCPU, nil
}
return "", fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}

// filterContainer is to filter the specific container from the given statefulset(tidb/tikv)
func filterContainer(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, containerName string) (*corev1.Container, error) {
for _, c := range sts.Spec.Template.Spec.Containers {
if c.Name == containerName {
return &c, nil
}
}
return nil, fmt.Errorf("tac[%s/%s]'s Target have not %s container", tac.Namespace, tac.Name, containerName)
}

const (
statusSuccess = "success"
)

// Response is used to marshal the data queried from Prometheus
type Response struct {
Status string `json:"status"`
Data Data `json:"data"`
}

type Data struct {
ResultType string `json:"resultType"`
Result []Result `json:"result"`
}

type Result struct {
Metric Metric `json:"metric"`
Value []interface{} `json:"value"`
}

type Metric struct {
Cluster string `json:"cluster,omitempty"`
Instance string `json:"instance"`
Job string `json:"job,omitempty"`
KubernetesNamespace string `json:"kubernetes_namespace,omitempty"`
KubernetesNode string `json:"kubernetes_node,omitempty"`
KubernetesPodIp string `json:"kubernetes_pod_ip,omitempty"`
}
45 changes: 44 additions & 1 deletion pkg/autoscaler/autoscaler/tidb_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
package autoscaler

import (
"fmt"
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
"github.com/pingcap/tidb-operator/pkg/label"
operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
promClient "github.com/prometheus/client_golang/api"
appsv1 "k8s.io/api/apps/v1"
)

func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client) error {
Expand All @@ -36,7 +39,11 @@ func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
return nil
}
currentReplicas := tc.Spec.TiDB.Replicas
targetReplicas := calculateRecommendedReplicas(tac, v1alpha1.TiDBMemberType, client)
instances := filterTidbInstances(tc)
targetReplicas, err := calculateTidbMetrics(tac, sts, client, instances)
if err != nil {
return err
}
targetReplicas = limitTargetReplicas(targetReplicas, tac, v1alpha1.TiDBMemberType)
if targetReplicas == tc.Spec.TiDB.Replicas {
emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
Expand Down Expand Up @@ -80,3 +87,39 @@ func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = time.Now().String()
emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
}

func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
metric := calculate.FilterMetrics(tac.Spec.TiDB.Metrics)
mType, err := calculate.GenMetricType(tac, metric)
if err != nil {
return -1, err
}
duration, err := time.ParseDuration(*tac.Spec.TiDB.MetricsTimeDuration)
if err != nil {
return -1, err
}
sq := &calculate.SingleQuery{
Timestamp: time.Now().Unix(),
Instances: instances,
Metric: metric,
Quary: fmt.Sprintf(calculate.TidbSumCpuMetricsPattern, tac.Spec.Cluster.Name, *tac.Spec.TiDB.MetricsTimeDuration),
}

switch mType {
case calculate.MetricTypeCPU:
return calculate.CalculateRecomendedReplicasByCpuCosts(tac, sq, sts, client, v1alpha1.TiDBMemberType, duration)
default:
return -1, fmt.Errorf(calculate.InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}
}

func filterTidbInstances(tc *v1alpha1.TidbCluster) []string {
var instances []string
for i := 0; int32(i) < tc.Status.TiDB.StatefulSet.Replicas; i++ {
podName := operatorUtils.GetPodName(tc, v1alpha1.TiDBMemberType, int32(i))
if _, existed := tc.Status.TiDB.FailureMembers[podName]; !existed {
instances = append(instances, podName)
}
}
return instances
}
1 change: 1 addition & 0 deletions pkg/autoscaler/autoscaler/tidb_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestSyncTiDBAfterCalculated(t *testing.T) {
tc.Spec.TiDB.Replicas = test.currentReplicas
tac.Annotations[label.AnnTiDBConsecutiveScaleInCount] = fmt.Sprintf("%d", test.currentScaleInCount)
tac.Annotations[label.AnnTiDBConsecutiveScaleOutCount] = fmt.Sprintf("%d", test.currentScaleOutCount)
tac.Spec.TiKV = nil

err := syncTiDBAfterCalculated(tc, tac, test.currentReplicas, test.recommendedReplicas)
g.Expect(err).ShouldNot(HaveOccurred())
Expand Down
Loading

0 comments on commit 3d54d47

Please sign in to comment.