
Fix multi decommissioning #235

Merged · 13 commits · Jul 22, 2020
13 changes: 3 additions & 10 deletions .circleci/config.yml
@@ -40,10 +40,9 @@ orbs:
       - run:
           name: Set up K3d
           command: |
-            wget -q -O - https://raw.githubusercontent.com/rancher/k3d/master/install.sh | TAG=v3.0.0-rc.1 bash
-            k3d create cluster --image ${K3S_IMAGE} --wait 0
+            wget -q -O - https://raw.githubusercontent.com/rancher/k3d/main/install.sh | TAG=v3.0.0 bash
+            k3d cluster create --image ${K3S_IMAGE} --wait 0
             mkdir -p ${HOME}/.kube
-            k3d get kubeconfig 0
       - checkout:
           path: /home/circleci/casskop
       - run:
@@ -61,7 +60,7 @@ orbs:
               docker pull $(cat casskop-build-image-tar)
               docker save $(cat casskop-build-image-tar) > casskop-build-image.tar
             fi
-            k3d load image casskop-build-image.tar
+            k3d image import casskop-build-image.tar
       - save_cache:
           name: Save Casskop build image
           key: '{{ checksum "casskop-build-image-tar" }}'
@@ -74,14 +73,8 @@
             sudo chmod o+w /usr/local/bin/
             wget -P /usr/local/bin/ https://storage.googleapis.com/kubernetes-release/release/${K8S_VERSION}/bin/linux/amd64/kubectl
             chmod +x /usr/local/bin/kubectl
-            KUBECONFIG="$(k3d get-kubeconfig)" kubectl get nodes
       - attach_workspace: # Attach artifact from workdir
           at: /home/circleci
-      - run: # Check we correctly access to K8s information with kubectl cli
-          name: Test k8s
-          command: |
-            kubectl get nodes
-            kubectl get pods
       # Acceptance test
       - run: # Run acceptance test through 'docker-e2e-test-fix-arg' makefile step
           name: Operator acceptance test
3,082 changes: 1,563 additions & 1,519 deletions deploy/crds/db.orange.com_cassandraclusters_crd.yaml

Large diffs are not rendered by default.

25 changes: 11 additions & 14 deletions pkg/apis/db/v1alpha1/cassandracluster_types.go
@@ -85,6 +85,7 @@ var (

 	ActionCorrectCRDConfig = ClusterStateInfo{11, "CorrectCRDConfig"} //The Operator has correct a bad CRD configuration

+	regexDCRackName = regexp.MustCompile("^[a-z]([-a-z0-9]*[a-z0-9])?$")
 )

const (
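A note on the var block above: hoisting regexp.MustCompile to a package-level variable means the DNS-1035 pattern is compiled exactly once, at package init, instead of on every GetDCRackName call (the per-call compilation is removed in a later hunk of this file), and a bad pattern panics at startup rather than deep in a reconcile loop. A minimal standalone sketch of the idiom, with hypothetical names:

package main

import (
	"fmt"
	"regexp"
)

// Compiled once at package initialization; MustCompile panics on an invalid
// pattern, so a typo in the expression fails fast at startup instead of at
// every call site.
var dns1035Label = regexp.MustCompile(`^[a-z]([-a-z0-9]*[a-z0-9])?$`)

// isValidLabel reuses the pre-compiled program on every call.
func isValidLabel(name string) bool {
	return dns1035Label.MatchString(name)
}

func main() {
	fmt.Println(isValidLabel("dc1-rack1")) // true
	fmt.Println(isValidLabel("DC1_rack1")) // false: uppercase and '_' are rejected
}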
@@ -264,12 +265,9 @@ func (cc *CassandraCluster) GetRackSize(dc int) int {
 	return len(cc.Spec.Topology.DC[dc].Rack)
 }

-//GetRackName return the Name of the rack for DC at indice dc and Rack at indice rack
+//GetRackName return the Name of the rack for DC at index dc and Rack at index rack
 func (cc *CassandraCluster) GetRackName(dc int, rack int) string {
-	if dc >= cc.GetDCSize() {
-		return DefaultCassandraRack
-	}
-	if rack >= cc.GetRackSize(dc) {
+	if dc >= cc.GetDCSize() || rack >= cc.GetRackSize(dc) {
 		return DefaultCassandraRack
 	}
 	return cc.Spec.Topology.DC[dc].Rack[rack].Name
@@ -278,14 +276,13 @@ func (cc *CassandraCluster) GetRackName(dc int, rack int) string {
 // GetDCRackName compute dcName + RackName to be used in statefulsets, services..
 // it return empty if the name don't match with kubernetes domain name validation regexp
 func (cc *CassandraCluster) GetDCRackName(dcName string, rackName string) string {
-	var dcRackName string
-	dcRackName = dcName + "-" + rackName
-	var regex_name = regexp.MustCompile("^[a-z]([-a-z0-9]*[a-z0-9])?$")
-	if !regex_name.MatchString(dcRackName) {
-		logrus.Errorf("%s don't match valide name service: a DNS-1035 label must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character", dcRackName)
-		return ""
+	dcRackName := dcName + "-" + rackName
+	if regexDCRackName.MatchString(dcRackName) {
+		return dcRackName
 	}
-	return dcRackName
+	logrus.Errorf("%s is not a valid service name: a DNS-1035 label must consist of lower case "+
+		"alphanumeric characters or '-', and must start and end with an alphanumeric character", dcRackName)
+	return ""
 }

//GetDCFromDCRackName send dc name from dcRackName (dc-rack)
@@ -304,10 +301,10 @@ func (cc *CassandraCluster) GetDCAndRackFromDCRackName(dcRackName string) (strin
 func (cc *CassandraCluster) initTopology(dcName string, rackName string) {
 	cc.Spec.Topology = Topology{
 		DC: []DC{
-			DC{
+			{
 				Name: dcName,
 				Rack: []Rack{
-					Rack{
+					{
 						Name: rackName,
 					},
 				},
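The initTopology hunk is purely cosmetic: Go infers the element type of a composite literal from the enclosing slice type, so the inner DC{...} and Rack{...} can be elided, and `gofmt -s` rewrites the long form into the short one. A self-contained illustration (the types are local stand-ins for the real API structs):

package main

import "fmt"

type Rack struct{ Name string }

type DC struct {
	Name string
	Rack []Rack
}

func main() {
	// Long form: the element type is repeated inside each slice literal.
	long := []DC{DC{Name: "dc1", Rack: []Rack{Rack{Name: "rack1"}}}}

	// Simplified form: the element type is inferred from the slice type.
	// This is what `gofmt -s` rewrites the long form to.
	short := []DC{{Name: "dc1", Rack: []Rack{{Name: "rack1"}}}}

	fmt.Println(long[0].Rack[0].Name == short[0].Rack[0].Name) // true
}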
98 changes: 45 additions & 53 deletions pkg/controller/cassandracluster/cassandra_status.go
@@ -16,6 +16,7 @@ package cassandracluster

 import (
 	"context"
+	"fmt"
 	"reflect"
 	"strconv"
 	"time"
@@ -61,7 +62,7 @@ func (rcc *ReconcileCassandraCluster) updateCassandraStatus(cc *api.CassandraClu
 }

 // getNextCassandraClusterStatus goal is to detect some changes in the status between cassandracluster and its statefulset
-// We follow only one change at a Time : so this function will return on first changed found
+// We follow only one change at a Time : so this function will return on the first change found
 func (rcc *ReconcileCassandraCluster) getNextCassandraClusterStatus(cc *api.CassandraCluster, dc,
 	rack int, dcName, rackName string, storedStatefulSet *appsv1.StatefulSet, status *api.CassandraClusterStatus) error {

@@ -77,14 +78,14 @@ func (rcc *ReconcileCassandraCluster) getNextCassandraClusterStatus(cc *api.Cass
 	}

 	//If we set up UnlockNextOperation in CRD we allow to see mode change even last operation didn't ended correctly
-	needSpecificChange := false
+	unlockNextOperation := false
 	if cc.Spec.UnlockNextOperation &&
 		rcc.hasUnschedulablePod(cc.Namespace, dcName, rackName) {
-		needSpecificChange = true
+		unlockNextOperation = true
 	}
 	//Do nothing in Initial phase except if we force it
 	if status.CassandraRackStatus[dcRackName].Phase == api.ClusterPhaseInitial.Name {
-		if !needSpecificChange {
+		if !unlockNextOperation {
 			ClusterPhaseMetric.set(api.ClusterPhaseInitial, cc.Name)
 			return nil
 		}
@@ -101,7 +102,7 @@
 	// decommission more
 	// We don't want to check for new operation while there are already ongoing one in order not to break them (ie decommission..)
 	// Meanwhile we allow to check for new changes if unlockNextOperation has been set (to recover from problems)
-	if needSpecificChange ||
+	if unlockNextOperation ||
 		(!rcc.thereIsPodDisruption() &&
 			lastAction.Status != api.StatusOngoing &&
 			lastAction.Status != api.StatusToDo &&
@@ -150,22 +151,23 @@
 }

 //needToWaitDelayBeforeCheck will return if last action start time is < to api.DefaultDelayWait
-//that mean start operation is too soon to check to an end operation or other available operations
-//this is mostly to let the cassandra cluster and the operator to have the time to correctly stage the action
+//that means the last operation was started only a few seconds ago and checking now would not make any sense
+//this is mostly to give cassandra and the operator enough time to correctly stage the action
 //DefaultDelayWait is of 2 minutes
 func needToWaitDelayBeforeCheck(cc *api.CassandraCluster, dcRackName string, storedStatefulSet *appsv1.StatefulSet,
 	status *api.CassandraClusterStatus) bool {
 	lastAction := &status.CassandraRackStatus[dcRackName].CassandraLastAction

 	if lastAction.StartTime != nil {
 		t := *lastAction.StartTime
 		now := metav1.Now()

 		if t.Add(api.DefaultDelayWait * time.Second).After(now.Time) {
 			logrus.WithFields(logrus.Fields{"cluster": cc.Name,
-				"rack": dcRackName}).Info("The Operator Waits " + strconv.Itoa(api.
-				DefaultDelayWait) + " seconds for the action to start correctly")
+				"rack": dcRackName}).Info(
+				fmt.Sprintf("The Operator Waits %s seconds for the action to start correctly",
+					strconv.Itoa(api.DefaultDelayWait)),
+			)
 			return true
 		}
 	}
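Apart from the fmt.Sprintf rewording of the log message, the guard itself is unchanged: an action whose StartTime is less than api.DefaultDelayWait seconds in the past is not inspected yet. A standard-library-only sketch of that check (the constant's value is an assumption taken from the "2 minutes" comment above):

package main

import (
	"fmt"
	"time"
)

// defaultDelayWait stands in for api.DefaultDelayWait; 120 is an assumption
// based on the "DefaultDelayWait is of 2 minutes" comment.
const defaultDelayWait = 120

// tooSoonToCheck reports whether startTime is less than defaultDelayWait
// seconds in the past, i.e. whether inspecting the action now is premature.
func tooSoonToCheck(startTime, now time.Time) bool {
	return startTime.Add(defaultDelayWait * time.Second).After(now)
}

func main() {
	started := time.Now().Add(-30 * time.Second)     // action began 30s ago
	fmt.Println(tooSoonToCheck(started, time.Now())) // true: inside the grace window
	old := time.Now().Add(-5 * time.Minute)
	fmt.Println(tooSoonToCheck(old, time.Now())) // false: safe to check the outcome
}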
@@ -426,10 +428,14 @@ func (rcc *ReconcileCassandraCluster) UpdateStatusIfActionEnded(cc *api.Cassandr
 // The Phase is: Initializing -> Running <--> Pending
 // The Phase is a very high level view of the cluster, for a better view we need to see Actions and Pod Operations
 func (rcc *ReconcileCassandraCluster) UpdateCassandraRackStatusPhase(cc *api.CassandraCluster, dcName string,
-	rackName string, storedStatefulSet *appsv1.StatefulSet, status *api.CassandraClusterStatus) error {
+	rackName string, storedStatefulSet *appsv1.StatefulSet, status *api.CassandraClusterStatus) {
 	dcRackName := cc.GetDCRackName(dcName, rackName)
 	lastAction := &status.CassandraRackStatus[dcRackName].CassandraLastAction

+	logrusFields := logrus.Fields{"cluster": cc.Name, "rack": dcRackName,
+		"ReadyReplicas": storedStatefulSet.Status.ReadyReplicas, "Replicas": storedStatefulSet.Status.Replicas,
+		"RequestedReplicas": *storedStatefulSet.Spec.Replicas}
+
 	if status.CassandraRackStatus[dcRackName].Phase == api.ClusterPhaseInitial.Name {
 		nodesPerRacks := cc.GetNodesPerRacks(dcRackName)
 		//If we are stuck in initializing state, we can rollback the add of dc which implies decommissioning nodes
@@ -438,59 +444,45 @@
"rack": dcRackName}).Warn("Aborting Initializing..., start ScaleDown")
setDecommissionStatus(status, dcRackName)
ClusterPhaseMetric.set(api.ClusterPhasePending, cc.Name)
return nil
return
}

ClusterPhaseMetric.set(api.ClusterPhaseInitial, cc.Name)

//Do we have reach requested number of replicas ?
if isStatefulSetNotReady(storedStatefulSet) {
logrus.Infof("[%s][%s]: Initializing StatefulSet: Replicas Number Not OK: %d on %d, ready[%d]",
cc.Name, dcRackName, storedStatefulSet.Status.Replicas, *storedStatefulSet.Spec.Replicas,
storedStatefulSet.Status.ReadyReplicas)
} else {
//If yes, just check that lastPod is running
podsList, err := rcc.ListPods(cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
nb := len(podsList.Items)
if err != nil || nb < 1 {
return nil
}
nodesPerRacks := cc.GetNodesPerRacks(dcRackName)
if len(podsList.Items) < int(nodesPerRacks) {
logrus.Infof("[%s][%s]: StatefulSet is waiting for scaleUp", cc.Name, dcRackName)
return nil
}
pod := podsList.Items[nodesPerRacks-1]
if cassandraPodIsReady(&pod) {
status.CassandraRackStatus[dcRackName].Phase = api.ClusterPhaseRunning.Name
ClusterPhaseMetric.set(api.ClusterPhaseRunning, cc.Name)
now := metav1.Now()
lastAction.EndTime = &now
lastAction.Status = api.StatusDone
logrus.Infof("[%s][%s]: StatefulSet(%s): Replicas Number OK: ready[%d]", cc.Name, dcRackName, lastAction.Name, storedStatefulSet.Status.ReadyReplicas)
return nil
}
return nil

logrus.WithFields(logrusFields).Infof("Initializing StatefulSet: Replicas count is not okay")
return
}

} else {

//We are no more in Initializing state
if isStatefulSetNotReady(storedStatefulSet) {
logrus.Infof("[%s][%s]: StatefulSet(%s) Replicas Number Not OK: %d on %d, ready[%d]", cc.Name,
dcRackName, lastAction.Name, storedStatefulSet.Status.Replicas, *storedStatefulSet.Spec.Replicas,
storedStatefulSet.Status.ReadyReplicas)
status.CassandraRackStatus[dcRackName].Phase = api.ClusterPhasePending.Name
ClusterPhaseMetric.set(api.ClusterPhasePending, cc.Name)
} else if status.CassandraRackStatus[dcRackName].Phase != api.ClusterPhaseRunning.Name {
logrus.Infof("[%s][%s]: StatefulSet(%s): Replicas Number OK: ready[%d]", cc.Name, dcRackName,
lastAction.Name, storedStatefulSet.Status.ReadyReplicas)
//If yes, just check that lastPod is running
podsList, err := rcc.ListPods(cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
if err != nil || len(podsList.Items) < 1 {
return
}
if len(podsList.Items) < int(nodesPerRacks) {
logrus.WithFields(logrusFields).Infof("StatefulSet is scaling up")
}
pod := podsList.Items[nodesPerRacks-1]
if cassandraPodIsReady(&pod) {
status.CassandraRackStatus[dcRackName].Phase = api.ClusterPhaseRunning.Name
ClusterPhaseMetric.set(api.ClusterPhaseRunning, cc.Name)
now := metav1.Now()
lastAction.EndTime = &now
lastAction.Status = api.StatusDone
logrus.WithFields(logrusFields).Infof("StatefulSet: Replicas count is okay")
}
}
return nil

//No more in Initializing state
if isStatefulSetNotReady(storedStatefulSet) {
logrus.WithFields(logrusFields).Infof("StatefulSet: Replicas count is not okay")
status.CassandraRackStatus[dcRackName].Phase = api.ClusterPhasePending.Name
ClusterPhaseMetric.set(api.ClusterPhasePending, cc.Name)
} else if status.CassandraRackStatus[dcRackName].Phase != api.ClusterPhaseRunning.Name {
logrus.WithFields(logrusFields).Infof("StatefulSet: Replicas count is not okay")
status.CassandraRackStatus[dcRackName].Phase = api.ClusterPhaseRunning.Name
ClusterPhaseMetric.set(api.ClusterPhaseRunning, cc.Name)
}
}

func setDecommissionStatus(status *api.CassandraClusterStatus, dcRackName string) {
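The dominant change in this file is logging hygiene: the hand-rolled "[%s][%s]" prefixes and per-message replica counts are replaced by a single logrus.Fields map built once in UpdateCassandraRackStatusPhase and reused for every message. A minimal sketch of that pattern, with made-up values:

package main

import "github.com/sirupsen/logrus"

func main() {
	// Build the field set once per reconcile pass and reuse it, so the
	// replica counts travel as structured fields instead of being
	// interpolated into every format string. The values are hypothetical.
	fields := logrus.Fields{
		"cluster":           "cassandra-demo",
		"rack":              "dc1-rack1",
		"ReadyReplicas":     int32(2),
		"Replicas":          int32(3),
		"RequestedReplicas": int32(3),
	}
	logrus.WithFields(fields).Infof("StatefulSet: Replicas count is not okay")
	logrus.WithFields(fields).Infof("StatefulSet: Replicas count is okay")
}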
16 changes: 7 additions & 9 deletions pkg/controller/cassandracluster/cassandra_status_test.go
@@ -164,18 +164,14 @@ func helperCreateCassandraCluster(t *testing.T, cassandraClusterFileName string)
 	if err != nil {
 		t.Fatalf("get statefulset: (%v)", err)
 	}
-	// Check if the quantity of Replicas for this deployment is equals the specification
-	dsize := *sts.Spec.Replicas
-	if dsize != 1 {
-		t.Errorf("dep size (%d) is not the expected size (%d)", dsize, cc.Spec.NodesPerRacks)
-	}

 	//Now simulate sts to be ready for CassKop
 	sts.Status.Replicas = *sts.Spec.Replicas
 	sts.Status.ReadyReplicas = *sts.Spec.Replicas
 	rcc.UpdateStatefulSet(sts)

 	//Create Statefulsets associated fake Pods
-	pod := &v1.Pod{
+	podTemplate := v1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			Kind:       "Pod",
 			APIVersion: "v1",
@@ -196,18 +192,20 @@ func helperCreateCassandraCluster(t *testing.T, cassandraClusterFileName string)
 			Phase: v1.PodRunning,
 			ContainerStatuses: []v1.ContainerStatus{
 				{
-					Name: "cassandra",
-					//Image: cc.Spec.BaseImage + ":" + cc.Spec.Version
+					Name:  "cassandra",
+					Ready: true,
 				},
 			},
 		},
 	}

 	for i := 0; i < int(sts.Status.Replicas); i++ {
+		pod := podTemplate.DeepCopy()
 		pod.Name = sts.Name + strconv.Itoa(i)
 		pod.Spec.Hostname = pod.Name
 		pod.Spec.Subdomain = cc.Name
 		if err = rcc.CreatePod(pod); err != nil {
-			t.Fatalf("can't create pod: (%v)", err)
+			t.Fatalf("can't create pod %s: (%v)", pod.Name, err)
 		}
 	}
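The old test mutated a single pod object across loop iterations, so every CreatePod call saw the same instance; the fix builds one template and stamps out an independent copy per replica. DeepCopy is generated for all Kubernetes API types and guarantees the copies share no pointers. A trimmed-down sketch of the pattern (names and namespace are hypothetical):

package main

import (
	"fmt"
	"strconv"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// One template, N independent fake pods: mutating one copy cannot
	// leak into another.
	podTemplate := v1.Pod{
		TypeMeta:   metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
		ObjectMeta: metav1.ObjectMeta{Namespace: "default"}, // hypothetical namespace
	}
	for i := 0; i < 3; i++ {
		pod := podTemplate.DeepCopy()
		pod.Name = "cassandra-demo-dc1-rack1-" + strconv.Itoa(i) // hypothetical name
		pod.Spec.Hostname = pod.Name
		fmt.Println(pod.Name)
	}
}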
@@ -120,8 +120,7 @@ func (rcc *ReconcileCassandraCluster) Reconcile(request reconcile.Request) (reco
 	}
 	cc.CheckDefaults()

-	err = rcc.CheckDeletePVC(cc)
-	if err != nil {
+	if err = rcc.CheckDeletePVC(cc); err != nil {
 		return forget, err
 	}

@@ -145,8 +144,7 @@
 	}

 	//ReconcileRack will also add and initiate new racks, we must not go through racks before this method
-	err = rcc.ReconcileRack(cc, status)
-	if err != nil {
+	if err = rcc.ReconcileRack(cc, status); err != nil {
 		return requeue5, err
 	}

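Both hunks in this file are the same mechanical tidy-up: the assignment to err and the nil check are folded into Go's if-with-initializer form (the PR uses = because err already exists in scope; with := the variable is additionally scoped to the if). A tiny standalone illustration with stand-in functions:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the reconciler calls in the real code.
func checkDeletePVC() error { return nil }
func reconcileRack() error  { return errors.New("rack not ready") }

func main() {
	// Folding the call and its error test into one if statement reads as a
	// single "do X or bail" step.
	if err := checkDeletePVC(); err != nil {
		fmt.Println("requeue:", err)
		return
	}
	if err := reconcileRack(); err != nil {
		fmt.Println("requeue:", err)
		return
	}
	fmt.Println("reconcile done")
}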