Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support stable scheduling for TiDB #477

Merged
merged 8 commits into from
May 17, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions charts/tidb-operator/templates/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ spec:
- /usr/local/bin/tidb-scheduler
- -v={{ .Values.scheduler.logLevel }}
- -port=10262
{{- if .Values.scheduler.features }}
- -features={{ join "," .Values.scheduler.features }}
{{- end }}
- name: kube-scheduler
image: {{ required "scheduler.kubeSchedulerImageName is required" .Values.scheduler.kubeSchedulerImageName }}:{{ .Values.scheduler.kubeSchedulerImageTag | default (split "-" .Capabilities.KubeVersion.GitVersion)._0 }}
imagePullPolicy: {{ .Values.imagePullPolicy | default "IfNotPresent" }}
Expand Down
2 changes: 2 additions & 0 deletions charts/tidb-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ scheduler:
logLevel: 2
replicas: 1
schedulerName: tidb-scheduler
# features:
# - StableScheduling
resources:
limits:
cpu: 250m
Expand Down
2 changes: 2 additions & 0 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/features"
"github.com/pingcap/tidb-operator/pkg/scheduler/server"
"github.com/pingcap/tidb-operator/version"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -39,6 +40,7 @@ func init() {
flag.BoolVar(&printVersion, "V", false, "Show version and quit")
flag.BoolVar(&printVersion, "version", false, "Show version and quit")
flag.IntVar(&port, "port", 10262, "The port that the tidb scheduler's http service runs on (default 10262)")
features.DefaultFeatureGate.AddFlag(flag.CommandLine)
flag.Parse()
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/pingcap.com/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ type TiDBMember struct {
Health bool `json:"health"`
// Last time the health transitioned from one to another.
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// Node hosting pod of this TiDB member.
NodeName string `json:"node,omitempty"`
}

// TiDBFailureMember is the tidb failure member information
Expand Down
61 changes: 61 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package features

import (
"flag"
"fmt"
"strings"

utilflags "github.com/pingcap/tidb-operator/pkg/util/flags"
"k8s.io/apimachinery/pkg/util/sets"
)

var (
	// allFeatures is the set of every feature gate name known to this
	// package; it is used to build the help text of the -features flag.
	allFeatures = sets.NewString(StableScheduling)
	// DefaultFeatureGate is a shared global FeatureGate.
	DefaultFeatureGate FeatureGate = NewFeatureGate()
)

// Feature gate names. Every name declared here must also be listed in
// allFeatures so it appears in the -features flag help text.
const (
	// StableScheduling controls stable scheduling of TiDB members.
	StableScheduling string = "StableScheduling"
)

// FeatureGate exposes the set of optional operator features that can be
// toggled on from the command line.
type FeatureGate interface {
	// AddFlag adds a flag for setting global feature gates to the specified FlagSet.
	AddFlag(flagset *flag.FlagSet)
	// Enabled returns true if the key is enabled.
	Enabled(key string) bool
}

// featureGate is the default FeatureGate implementation, backed by string sets.
type featureGate struct {
	// defaultFeatures holds the features enabled before any flag is parsed.
	defaultFeatures sets.String
	// enabledFeatures is the effective set consulted by Enabled; presumably
	// populated from the -features flag via the value registered in AddFlag
	// (see utilflags.NewStringSetValue) — confirm against that helper.
	enabledFeatures sets.String
}

// AddFlag registers the -features flag on the given FlagSet. The flag takes
// a comma-separated list of feature names, seeded from f.defaultFeatures and
// written into f.enabledFeatures.
func (f *featureGate) AddFlag(flagset *flag.FlagSet) {
	// Register on the caller-supplied FlagSet; the previous code called the
	// package-level flag.Var, which always targets flag.CommandLine and
	// silently ignored the flagset parameter.
	flagset.Var(
		utilflags.NewStringSetValue(f.defaultFeatures, &f.enabledFeatures),
		"features",
		fmt.Sprintf("features to enable, comma-separated list of string, available: %s", strings.Join(allFeatures.List(), ",")),
	)
}

// Enabled reports whether the feature named by key has been switched on.
func (f *featureGate) Enabled(key string) bool {
	isOn := f.enabledFeatures.Has(key)
	return isOn
}

// NewFeatureGate returns a FeatureGate with no features enabled.
func NewFeatureGate() FeatureGate {
	fg := &featureGate{
		defaultFeatures: sets.NewString(),
		enabledFeatures: sets.NewString(),
	}
	return fg
}
10 changes: 10 additions & 0 deletions pkg/manager/member/tidb_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apps "k8s.io/api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/listers/apps/v1beta1"
Expand Down Expand Up @@ -395,6 +396,15 @@ func (tmm *tidbMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, se
if !exist || oldTidbMember.Health != newTidbMember.Health {
newTidbMember.LastTransitionTime = metav1.Now()
}
pod, err := tmm.podLister.Pods(tc.GetNamespace()).Get(name)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if pod != nil && pod.Spec.NodeName != "" {
// Update assigned node
newTidbMember.NodeName = pod.Spec.NodeName
}
// Ignore if pod does not exist or not scheduled
tidbStatus[name] = newTidbMember
}
tc.Status.TiDB.Members = tidbStatus
Expand Down
16 changes: 4 additions & 12 deletions pkg/scheduler/predicates/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
kubescheme "k8s.io/client-go/kubernetes/scheme"
eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)

Expand All @@ -49,10 +47,11 @@ type ha struct {
}

// NewHA returns a Predicate
func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate {
func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface, recorder record.EventRecorder) Predicate {
h := &ha{
kubeCli: kubeCli,
cli: cli,
kubeCli: kubeCli,
cli: cli,
recorder: recorder,
}
h.podListFn = h.realPodListFn
h.podGetFn = h.realPodGetFn
Expand All @@ -61,13 +60,6 @@ func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate {
h.pvcListFn = h.realPVCListFn
h.updatePVCFn = h.realUpdatePVCFn
h.acquireLockFn = h.realAcquireLock

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(h.kubeCli.CoreV1().RESTClient()).Events("")})
h.recorder = eventBroadcaster.NewRecorder(kubescheme.Scheme, apiv1.EventSource{Component: "tidb-scheduler"})

return h
}

Expand Down
112 changes: 112 additions & 0 deletions pkg/scheduler/predicates/stable_scheduling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package predicates

import (
"fmt"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/label"
apiv1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
)

const (
	// UnableToRunOnPreviousNodeReason is the reason attached to the warning
	// event recorded on a pod when the new pod of a component member cannot
	// be scheduled to its previous node.
	UnableToRunOnPreviousNodeReason = "UnableToRunOnPreviousNode"
)

var (
	// supportedComponents holds the components stable scheduling applies to;
	// currently only TiDB.
	supportedComponents = sets.NewString(label.TiDBLabelVal)
)

// stableScheduling is a scheduler Predicate that tries to keep the new pod
// of a TiDB member on the node its predecessor ran on, as recorded in the
// TidbCluster status.
type stableScheduling struct {
	// kubeCli is not referenced in this file's visible code; presumably kept
	// for parity with the other predicates — confirm before removing.
	kubeCli kubernetes.Interface
	// cli fetches TidbCluster objects to look up previous node assignments.
	cli versioned.Interface
	// recorder emits warning events on pods that cannot be rescheduled to
	// their previous node.
	recorder record.EventRecorder
}

// NewStableScheduling returns a Predicate that prefers each TiDB member's
// previously assigned node.
func NewStableScheduling(kubeCli kubernetes.Interface, cli versioned.Interface, recorder record.EventRecorder) Predicate {
	return &stableScheduling{
		kubeCli:  kubeCli,
		cli:      cli,
		recorder: recorder,
	}
}

// Name returns the name of this predicate, "StableScheduling".
func (p *stableScheduling) Name() string {
	const predicateName = "StableScheduling"
	return predicateName
}

// findPreviousNodeInTC returns the node the pod's TiDB member was previously
// scheduled to, as recorded in the TidbCluster status, or "" when no record
// exists.
func (p *stableScheduling) findPreviousNodeInTC(tc *v1alpha1.TidbCluster, pod *apiv1.Pod) string {
	// Indexing a nil map is safe in Go and yields the zero value, so the
	// explicit nil check the original carried is unnecessary.
	member, ok := tc.Status.TiDB.Members[pod.Name]
	if !ok {
		return ""
	}
	return member.NodeName
}

// Filter implements the Predicate interface. For supported components
// (currently TiDB only) it narrows the candidate nodes to the single node
// the member previously ran on, as recorded in the TidbCluster status. When
// the previous node is not among the candidates, it records a warning event
// on the pod and returns all candidates unchanged. Pods of unsupported
// components, members without a recorded node, and pods whose TidbCluster
// no longer exists all pass through untouched.
func (p *stableScheduling) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]apiv1.Node, error) {
	ns := pod.GetNamespace()
	podName := pod.GetName()
	component := pod.Labels[label.ComponentLabelKey]

	// Stable scheduling only applies to some components; let every other
	// pod pass through unchanged.
	if !supportedComponents.Has(component) {
		return nodes, nil
	}

	tcName := getTCNameFromPod(pod, component)
	tc, err := p.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			// tidb-operator deletes pods once their TiDB cluster is gone,
			// so failing the pod here would do no harm — but it's best not
			// to make assumptions, so let it schedule normally.
			return nodes, nil
		}
		return nil, err
	}

	nodeName := p.findPreviousNodeInTC(tc, pod)
	if nodeName == "" {
		glog.V(2).Infof("no previous node exists for pod %q in TiDB cluster %s/%q", podName, ns, tcName)
		return nodes, nil
	}

	glog.V(2).Infof("found previous node %q for pod %q in TiDB cluster %q", nodeName, podName, tcName)
	for _, node := range nodes {
		if node.Name == nodeName {
			// Log-message typo fixed: "candicates" -> "candidates".
			glog.V(2).Infof("previous node %q for pod %q in TiDB cluster %q exists in candidates, filter out other nodes", nodeName, podName, tcName)
			return []apiv1.Node{node}, nil
		}
	}

	// The previous node is not among the candidates; surface that to the
	// user via an event and fall back to the full candidate list.
	msg := fmt.Sprintf("cannot run on its previous node %q", nodeName)
	p.recorder.Event(pod, apiv1.EventTypeWarning, UnableToRunOnPreviousNodeReason, msg)
	return nodes, nil
}
Loading