Skip to content

Commit

Permalink
add controller for tidb initializer (#1403)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielZhangQD authored and sre-bot committed Dec 26, 2019
1 parent d45617d commit eb67f58
Show file tree
Hide file tree
Showing 14 changed files with 879 additions and 27 deletions.
3 changes: 3 additions & 0 deletions cmd/controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller/backupschedule"
"github.com/pingcap/tidb-operator/pkg/controller/restore"
"github.com/pingcap/tidb-operator/pkg/controller/tidbcluster"
"github.com/pingcap/tidb-operator/pkg/controller/tidbinitializer"
"github.com/pingcap/tidb-operator/pkg/features"
"github.com/pingcap/tidb-operator/pkg/scheme"
"github.com/pingcap/tidb-operator/pkg/version"
Expand Down Expand Up @@ -168,6 +169,7 @@ func main() {
backupController := backup.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
restoreController := restore.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
bsController := backupschedule.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
tidbInitController := tidbinitializer.NewController(kubeCli, cli, genericCli, informerFactory, kubeInformerFactory)

// Start informer factories after all controller are initialized.
informerFactory.Start(ctx.Done())
Expand All @@ -189,6 +191,7 @@ func main() {
go wait.Forever(func() { backupController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { restoreController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { bsController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { tidbInitController.Run(workers, ctx.Done()) }, waitDuration)
wait.Forever(func() { tcController.Run(workers, ctx.Done()) }, waitDuration)
}
onStopped := func() {
Expand Down
21 changes: 7 additions & 14 deletions manifests/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3388,21 +3388,11 @@ spec:
is bootstrapped.
type: string
initSqlConfigMap:
description: LocalObjectReference contains enough information to let
you locate the referenced object inside the same namespace.
properties:
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
type: object
description: InitSqlConfigMapName reference a configmap that provide
init-sql, take high precedence than initSql if set
type: string
passwordSecret:
description: LocalObjectReference contains enough information to let
you locate the referenced object inside the same namespace.
properties:
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
type: object
type: string
permitHost:
description: permitHost is the host which will only be allowed to connect
to the TiDB.
Expand All @@ -3421,6 +3411,9 @@ spec:
value. More info: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/'
type: object
type: object
timezone:
description: Time zone of TiDB initializer Pods
type: string
required:
- image
- cluster
Expand Down
15 changes: 12 additions & 3 deletions pkg/apis/pingcap/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/apis/pingcap/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&RestoreList{},
&DataResource{},
&DataResourceList{},
&TidbInitializer{},
&TidbInitializerList{},
)

metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/pingcap/v1alpha1/tidbinitializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha1

// GetPermitHost retrieves the permit host from TidbInitializer
func (ti *TidbInitializer) GetPermitHost() string {
var permitHost string
if ti.Spec.PermitHost == nil {
permitHost = `%`
} else {
permitHost = *ti.Spec.PermitHost
}
return permitHost
}
8 changes: 6 additions & 2 deletions pkg/apis/pingcap/v1alpha1/tidbinitializer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,17 @@ type TidbInitializerSpec struct {

// InitSqlConfigMapName reference a configmap that provide init-sql, take high precedence than initSql if set
// +optional
InitSqlConfigMap *corev1.LocalObjectReference `json:"initSqlConfigMap,omitempty"`
InitSqlConfigMap *string `json:"initSqlConfigMap,omitempty"`

// +optional
PasswordSecret *corev1.LocalObjectReference `json:"passwordSecret,omitempty"`
PasswordSecret *string `json:"passwordSecret,omitempty"`

// +optional
Resources *corev1.ResourceRequirements `json:"resources,omitempty"`

// Time zone of TiDB initializer Pods
// +optional
Timezone string `json:"timezone,omitempty"`
}

// +k8s:openapi-gen=true
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/scheme"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -249,6 +250,11 @@ func PumpMemberName(clusterName string) string {
return fmt.Sprintf("%s-pump", clusterName)
}

// TiDBInitializerMemberName returns TiDBInitializer member name
func TiDBInitializerMemberName(clusterName string) string {
return fmt.Sprintf("%s-tidb-initializer", clusterName)
}

// For backward compatibility, pump peer member name do not has -peer suffix
// PumpPeerMemberName returns pump peer service name
func PumpPeerMemberName(clusterName string) string {
Expand Down Expand Up @@ -412,7 +418,12 @@ func WatchForController(informer cache.SharedIndexInformer, q workqueue.Interfac
// Ensure the ref is exactly the controller we listed
if ref.Kind == controllerObj.GetObjectKind().GroupVersionKind().Kind &&
refGV.Group == controllerObj.GetObjectKind().GroupVersionKind().Group {
q.Add(controllerObj)
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(controllerObj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Cound't get key for object %+v: %v", controllerObj, err))
return
}
q.Add(key)
}
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -470,9 +481,13 @@ func GuaranteedUpdate(cli client.Client, obj runtime.Object, updateFunc func() e
if err := cli.Get(context.TODO(), key, obj); err != nil {
return err
}
beforeMutation := obj.DeepCopyObject()
if err := updateFunc(); err != nil {
return err
}
if apiequality.Semantic.DeepEqual(obj, beforeMutation) {
return nil
}
return cli.Update(context.TODO(), obj)
})
}
71 changes: 70 additions & 1 deletion pkg/controller/generic_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type TypedControlInterface interface {
UpdateStatus(newStatus runtime.Object) error
// Delete delete the given object from the cluster
Delete(controller, obj runtime.Object) error
// Create create the given object for the controller
Create(controller, obj runtime.Object) error
// Exist check whether object exists
Exist(key client.ObjectKey, obj runtime.Object) (bool, error)
}

type typedWrapper struct {
Expand Down Expand Up @@ -196,10 +200,19 @@ func (w *typedWrapper) CreateOrUpdateService(controller runtime.Object, svc *cor
return result.(*corev1.Service), nil
}

func (w *typedWrapper) Create(controller, obj runtime.Object) error {
return w.GenericControlInterface.Create(controller, obj)
}
func (w *typedWrapper) Exist(key client.ObjectKey, obj runtime.Object) (bool, error) {
return w.GenericControlInterface.Exist(key, obj)
}

// GenericControlInterface manages generic object that managed by an arbitrary controller
type GenericControlInterface interface {
CreateOrUpdate(controller, obj runtime.Object, mergeFn MergeFn) (runtime.Object, error)
Create(controller, obj runtime.Object) error
UpdateStatus(obj runtime.Object) error
Exist(key client.ObjectKey, obj runtime.Object) (bool, error)
Delete(controller, obj runtime.Object) error
}

Expand Down Expand Up @@ -234,6 +247,18 @@ func (c *realGenericControlInterface) UpdateStatus(obj runtime.Object) error {
return c.client.Status().Update(context.TODO(), obj)
}

// Exist checks whether object exists
func (c *realGenericControlInterface) Exist(key client.ObjectKey, obj runtime.Object) (bool, error) {
err := c.client.Get(context.TODO(), key, obj)
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return true, err
}
return true, nil
}

// CreateOrUpdate create an object to the Kubernetes cluster for controller, if the object to create is existed,
// call mergeFn to merge the change in new object to the existing object, then update the existing object.
// The object will also be adopted by the given controller.
Expand Down Expand Up @@ -288,7 +313,22 @@ func (c *realGenericControlInterface) CreateOrUpdate(controller, obj runtime.Obj

// object do not exist, return the creation result
c.RecordControllerEvent("create", controller, desired, err)
return desired, nil
return desired, err
}

// Create create an object to the Kubernetes cluster for controller
func (c *realGenericControlInterface) Create(controller, obj runtime.Object) error {
// controller-runtime/client will mutate the object pointer in-place,
// to be consistent with other methods in our controller, we copy the object
// to avoid the in-place mutation here and hereafter.
desired := obj.DeepCopyObject()
if err := setControllerReference(controller, desired); err != nil {
return err
}

err := c.client.Create(context.TODO(), desired)
c.RecordControllerEvent("create", controller, desired, err)
return err
}

func (c *realGenericControlInterface) Delete(controller, obj runtime.Object) error {
Expand Down Expand Up @@ -352,6 +392,8 @@ type FakeGenericControl struct {
createOrUpdateTracker RequestTracker
deleteTracker RequestTracker
updateStatusTracker RequestTracker
createTracker RequestTracker
existTracker RequestTracker
}

// NewFakeGenericControl returns a FakeGenericControl
Expand All @@ -364,9 +406,36 @@ func NewFakeGenericControl(initObjects ...runtime.Object) *FakeGenericControl {
RequestTracker{},
RequestTracker{},
RequestTracker{},
RequestTracker{},
RequestTracker{},
}
}
func (gc *FakeGenericControl) Create(controller, obj runtime.Object) error {
defer gc.createTracker.Inc()
if gc.createTracker.ErrorReady() {
defer gc.createTracker.Reset()
return gc.createTracker.GetError()
}

return gc.control.Create(controller, obj)
}

func (gc *FakeGenericControl) Exist(key client.ObjectKey, obj runtime.Object) (bool, error) {
defer gc.existTracker.Inc()
if gc.existTracker.ErrorReady() {
defer gc.existTracker.Reset()
return true, gc.existTracker.GetError()
}

return gc.control.Exist(key, obj)
}

func (gc *FakeGenericControl) SetCreateError(err error, after int) {
gc.createTracker.SetError(err).SetAfter(after)
}
func (gc *FakeGenericControl) SetExistError(err error, after int) {
gc.existTracker.SetError(err).SetAfter(after)
}
func (gc *FakeGenericControl) SetUpdateStatusError(err error, after int) {
gc.updateStatusTracker.SetError(err).SetAfter(after)
}
Expand Down
Loading

0 comments on commit eb67f58

Please sign in to comment.