Re-create missing pod which has an incomplete downscale operation #824

Merged
merged 20 commits on Jul 6, 2022
Changes from 6 commits
6 changes: 5 additions & 1 deletion api/v1beta1/common_types.go
@@ -14,7 +14,9 @@

package v1beta1

import "strings"
import (
"strings"
)

// RackAwarenessState stores info about rack awareness status
type RackAwarenessState string
@@ -197,6 +199,8 @@ type BrokerState struct {
Version string `json:"version,omitempty"`
// Image specifies the current docker image of the broker
Image string `json:"image,omitempty"`
// Compressed data from broker configuration to restore broker pod in specific cases
ConfigurationBackup string `json:"configurationBackup,omitempty"`
}

const (
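The new ConfigurationBackup field stores a gzip-compressed, base64-encoded copy of the broker's spec entry so a deleted pod can later be reconstructed. As a rough illustration only, a backup string of this shape could be produced as in the sketch below; the real helper is util.GzipAndBase64BrokerConfiguration (called in pkg/k8sutil/status.go further down), and the JSON serialization shown here is an assumption rather than the confirmed format.

package util

import (
    "bytes"
    "compress/gzip"
    "encoding/base64"
    "encoding/json"

    "github.com/banzaicloud/koperator/api/v1beta1"
)

// gzipAndBase64Broker marshals a broker spec to JSON, compresses it with gzip,
// and base64-encodes the result so it can be stored in the status subresource.
func gzipAndBase64Broker(broker *v1beta1.Broker) (string, error) {
    raw, err := json.Marshal(broker)
    if err != nil {
        return "", err
    }
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    if _, err := zw.Write(raw); err != nil {
        return "", err
    }
    // Close flushes the gzip footer; without it the stream would be truncated.
    if err := zw.Close(); err != nil {
        return "", err
    }
    return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}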
4 changes: 4 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
@@ -19132,6 +19132,10 @@ spec:
additionalProperties:
description: BrokerState holds information about broker state
properties:
configurationBackup:
description: Compressed data from broker configuration to restore
broker pod in specific cases
type: string
configurationState:
description: ConfigurationState holds info about the config
type: string
4 changes: 4 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
@@ -19131,6 +19131,10 @@ spec:
additionalProperties:
description: BrokerState holds information about broker state
properties:
configurationBackup:
description: Compressed data from broker configuration to restore
broker pod in specific cases
type: string
configurationState:
description: ConfigurationState holds info about the config
type: string
53 changes: 53 additions & 0 deletions pkg/k8sutil/status.go
@@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/util"
clientutil "github.com/banzaicloud/koperator/pkg/util/client"
)

@@ -43,6 +44,58 @@ func IsMarkedForDeletion(m metav1.ObjectMeta) bool {
return m.GetDeletionTimestamp() != nil
}

// UpdateBrokerConfigurationBackup updates the broker status with a backup of the Kafka broker configurations
func UpdateBrokerConfigurationBackup(c client.Client, cluster *banzaicloudv1beta1.KafkaCluster) error {
needsUpdate, err := generateBrokerConfigurationBackups(cluster)
if err != nil {
return err
}
if !needsUpdate {
return nil
}
ctx := context.Background()
if err := c.Status().Update(ctx, cluster); err != nil {
if !apierrors.IsConflict(err) {
return errors.WrapIff(err, "could not update Kafka broker(s) configuration backup state")
}
if err := c.Get(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Name}, cluster); err != nil {
return errors.WrapIf(err, "could not get config for updating status")
}
needsUpdate, err := generateBrokerConfigurationBackups(cluster)
if err != nil {
return err
}
if !needsUpdate {
return nil
}
if err = c.Status().Update(ctx, cluster); err != nil {
return errors.WrapIff(err, "could not update Kafka broker(s) configuration backup state")
}
}
return nil
}

func generateBrokerConfigurationBackups(cluster *banzaicloudv1beta1.KafkaCluster) (bool, error) {
needsUpdate := false
if cluster.Status.BrokersState == nil {
cluster.Status.BrokersState = make(map[string]banzaicloudv1beta1.BrokerState)
}

for _, broker := range cluster.Spec.Brokers {
brokerState := cluster.Status.BrokersState[fmt.Sprint(broker.Id)]
configurationBackup, err := util.GzipAndBase64BrokerConfiguration(&broker)
if err != nil {
return false, errors.WrapIfWithDetails(err, "could not generate broker configuration backup", "brokerId", broker.Id)
}
if !needsUpdate && configurationBackup != brokerState.ConfigurationBackup {
needsUpdate = true
}
brokerState.ConfigurationBackup = configurationBackup
cluster.Status.BrokersState[fmt.Sprint(broker.Id)] = brokerState
}
return needsUpdate, nil
}

// UpdateBrokerStatus updates the broker status with rack and configuration infos
func UpdateBrokerStatus(c client.Client, brokerIDs []string, cluster *banzaicloudv1beta1.KafkaCluster, state interface{}, logger logr.Logger) error {
typeMeta := cluster.TypeMeta
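The inverse helper, util.GetBrokerFromBrokerConfigurationBackup, is used by reorderBrokers in the kafka.go changes below to rebuild a broker spec from the stored backup. A plausible sketch of that decode path, under the same JSON assumption as above and with no claim about the real helper's internals:

package util

import (
    "bytes"
    "compress/gzip"
    "encoding/base64"
    "encoding/json"
    "io"

    "github.com/banzaicloud/koperator/api/v1beta1"
)

// getBrokerFromBackup base64-decodes the stored backup, gunzips it, and
// unmarshals the JSON payload back into a Broker spec.
func getBrokerFromBackup(backup string) (v1beta1.Broker, error) {
    var broker v1beta1.Broker
    compressed, err := base64.StdEncoding.DecodeString(backup)
    if err != nil {
        return broker, err
    }
    zr, err := gzip.NewReader(bytes.NewReader(compressed))
    if err != nil {
        return broker, err
    }
    defer zr.Close()
    raw, err := io.ReadAll(zr)
    if err != nil {
        return broker, err
    }
    err = json.Unmarshal(raw, &broker)
    return broker, err
}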
83 changes: 64 additions & 19 deletions pkg/resources/kafka/kafka.go
@@ -76,8 +76,10 @@ const (
MetricsHealthCheck = "/-/healthy"
MetricsPort = 9020

// missingBrokerDownScaleRunningPriority the priority used for missing brokers that have an incomplete downscale operation
missingBrokerDownScaleRunningPriority brokerReconcilePriority = iota
// newBrokerReconcilePriority the priority used for brokers that were just added to the cluster used to define its priority in the reconciliation order
newBrokerReconcilePriority brokerReconcilePriority = iota
newBrokerReconcilePriority
// missingBrokerReconcilePriority the priority used for missing brokers used to define its priority in the reconciliation order
missingBrokerReconcilePriority
// nonControllerBrokerReconcilePriority the priority used for running non-controller brokers used to define its priority in the reconciliation order
@@ -148,6 +150,11 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {

log.V(1).Info("Reconciling")

ctx := context.Background()
if err := k8sutil.UpdateBrokerConfigurationBackup(r.Client, r.KafkaCluster); err != nil {
log.Error(err, "failed to update broker configuration backup")
}

if r.KafkaCluster.Spec.HeadlessServiceEnabled {
// reconcile headless service
o := r.headlessService()
@@ -187,15 +194,15 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
return errors.WrapIf(err, "could not update status for external listeners")
}
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster)
err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, intListenerStatuses, extListenerStatuses)
err = k8sutil.UpdateListenerStatuses(ctx, r.Client, r.KafkaCluster, intListenerStatuses, extListenerStatuses)
if err != nil {
return errors.WrapIf(err, "failed to update listener statuses")
}

// Setup the PKI if using SSL
if r.KafkaCluster.Spec.ListenersConfig.SSLSecrets != nil {
// reconcile the PKI
if err := pki.GetPKIManager(r.Client, r.KafkaCluster, v1beta1.PKIBackendProvided).ReconcilePKI(context.TODO(), extListenerStatuses); err != nil {
if err := pki.GetPKIManager(r.Client, r.KafkaCluster, v1beta1.PKIBackendProvided).ReconcilePKI(ctx, extListenerStatuses); err != nil {
return err
}
}
@@ -232,7 +239,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {

var brokerPods corev1.PodList
matchingLabels := client.MatchingLabels(apiutil.LabelsForKafka(r.KafkaCluster.Name))
err = r.Client.List(context.TODO(), &brokerPods, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels))
err = r.Client.List(ctx, &brokerPods, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels))
if err != nil {
return errors.WrapIf(err, "failed to list broker pods that belong to Kafka cluster")
}
@@ -242,7 +249,13 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
log.Error(err, "could not find controller broker")
}

reorderedBrokers := reorderBrokers(brokerPods, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID)
var pvcList corev1.PersistentVolumeClaimList
err = r.Client.List(ctx, &pvcList, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels))
if err != nil {
return errors.WrapIf(err, "failed to list broker pvcs that belong to Kafka cluster")
}

reorderedBrokers := reorderBrokers(brokerPods, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, pvcList, controllerID, log)
allBrokerDynamicConfigSucceeded := true
for _, broker := range reorderedBrokers {
brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec)
@@ -405,8 +418,7 @@ func (r *Reconciler) reconcileKafkaPodDelete(log logr.Logger) error {
for _, brokerID := range brokersToUpdateInStatus {
if brokerState, ok := r.KafkaCluster.Status.BrokersState[brokerID]; ok {
ccState := brokerState.GracefulActionState.CruiseControlState
if ccState != v1beta1.GracefulDownscaleRunning &&
(ccState == v1beta1.GracefulUpscaleSucceeded || ccState == v1beta1.GracefulUpscaleRequired) {
if ccState == v1beta1.GracefulUpscaleSucceeded || ccState == v1beta1.GracefulUpscaleRequired {
brokersPendingGracefulDownscale = append(brokersPendingGracefulDownscale, brokerID)
}
}
@@ -434,8 +446,7 @@ func (r *Reconciler) reconcileKafkaPodDelete(log logr.Logger) error {

if brokerState, ok := r.KafkaCluster.Status.BrokersState[broker.Labels["brokerId"]]; ok &&
brokerState.GracefulActionState.CruiseControlState != v1beta1.GracefulDownscaleSucceeded &&
brokerState.GracefulActionState.CruiseControlState != v1beta1.GracefulUpscaleRequired &&
broker.Status.Phase != corev1.PodPending {
brokerState.GracefulActionState.CruiseControlState != v1beta1.GracefulUpscaleRequired {
if brokerState.GracefulActionState.CruiseControlState == v1beta1.GracefulDownscaleRunning {
log.Info("cc task is still running for broker", "brokerId", broker.Labels["brokerId"], "taskId", brokerState.GracefulActionState.CruiseControlTaskId)
}
@@ -699,15 +710,19 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return errorfactory.New(errorfactory.StatusUpdateError{}, statusErr, "updating per broker config status for resource failed", "kind", desiredType)
}

if val, ok := r.KafkaCluster.Status.BrokersState[desiredPod.Labels["brokerId"]]; ok && val.GracefulActionState.CruiseControlState != v1beta1.GracefulUpscaleSucceeded {
gracefulActionState := v1beta1.GracefulActionState{ErrorMessage: "CruiseControl not yet ready", CruiseControlState: v1beta1.GracefulUpscaleSucceeded}
if val, hasBrokerState := r.KafkaCluster.Status.BrokersState[desiredPod.Labels["brokerId"]]; hasBrokerState {
ccState := val.GracefulActionState.CruiseControlState
incompleteDownscale := ccState == v1beta1.GracefulDownscaleRequired || ccState == v1beta1.GracefulDownscaleRunning
if ccState != v1beta1.GracefulUpscaleSucceeded && !incompleteDownscale {
gracefulActionState := v1beta1.GracefulActionState{ErrorMessage: "CruiseControl not yet ready", CruiseControlState: v1beta1.GracefulUpscaleSucceeded}

if r.KafkaCluster.Status.CruiseControlTopicStatus == v1beta1.CruiseControlTopicReady {
gracefulActionState = v1beta1.GracefulActionState{ErrorMessage: "", CruiseControlState: v1beta1.GracefulUpscaleRequired}
}
statusErr = k8sutil.UpdateBrokerStatus(r.Client, []string{desiredPod.Labels["brokerId"]}, r.KafkaCluster, gracefulActionState, log)
if statusErr != nil {
return errorfactory.New(errorfactory.StatusUpdateError{}, statusErr, "could not update broker graceful action state")
if r.KafkaCluster.Status.CruiseControlTopicStatus == v1beta1.CruiseControlTopicReady {
gracefulActionState = v1beta1.GracefulActionState{ErrorMessage: "", CruiseControlState: v1beta1.GracefulUpscaleRequired}
}
statusErr = k8sutil.UpdateBrokerStatus(r.Client, []string{desiredPod.Labels["brokerId"]}, r.KafkaCluster, gracefulActionState, log)
if statusErr != nil {
return errorfactory.New(errorfactory.StatusUpdateError{}, statusErr, "could not update broker graceful action state")
}
}
}
log.Info("resource created")
@@ -1203,20 +1218,50 @@ func getServiceFromExternalListener(client client.Client, cluster *v1beta1.Kafka
// - prioritize upscale in order to allow upscaling the cluster even when there is a stuck RU
// - prioritize missing broker pods to be able to escape from offline partitions and out-of-sync replicas, which
// could stall the RU flow
func reorderBrokers(brokerPods corev1.PodList, desiredBrokers []v1beta1.Broker, brokersState map[string]v1beta1.BrokerState, controllerBrokerID int32) []v1beta1.Broker {
func reorderBrokers(brokerPods corev1.PodList, desiredBrokers []v1beta1.Broker, brokersState map[string]v1beta1.BrokerState, pvcList corev1.PersistentVolumeClaimList, controllerBrokerID int32, log logr.Logger) []v1beta1.Broker {
runningBrokers := make(map[string]struct{})
for _, b := range brokerPods.Items {
brokerID := b.GetLabels()["brokerId"]
runningBrokers[brokerID] = struct{}{}
}

presentPersistentVolumeClaims := make(map[string]struct{})
for _, pvc := range pvcList.Items {
brokerID := pvc.GetLabels()["brokerId"]
presentPersistentVolumeClaims[brokerID] = struct{}{}
}

brokersReconcilePriority := make(map[string]brokerReconcilePriority, len(desiredBrokers))
missingBrokerDownScaleRunning := make(map[string]struct{})
// handle the case when a broker pod was removed before its downscale operation completed
for id, brokerState := range brokersState {
_, running := runningBrokers[id]
_, pvcPresent := presentPersistentVolumeClaims[id]
ccState := brokerState.GracefulActionState.CruiseControlState
if !running && (ccState == v1beta1.GracefulDownscaleRequired || ccState == v1beta1.GracefulDownscaleRunning) {
log.Info("missing broker found with incomplete downscale operation", "brokerID", id)
if pvcPresent {
unfinishedBroker, err := util.GetBrokerFromBrokerConfigurationBackup(brokerState.ConfigurationBackup)
if err != nil {
log.Error(err, "unable to restore broker configuration from configuration backup", "brokerID", id)
continue
}
log.Info("re-creating broker pod to continue downscale operation", "brokerID", id)
missingBrokerDownScaleRunning[id] = struct{}{}
desiredBrokers = append(desiredBrokers, unfinishedBroker)
} else {
log.Info("pvc is missing, unable to reconstruct missing broker pod", "brokerID", id)
}
}
}

for _, b := range desiredBrokers {
brokerID := fmt.Sprintf("%d", b.Id)
brokersReconcilePriority[brokerID] = nonControllerBrokerReconcilePriority

if _, ok := brokersState[brokerID]; !ok {
if _, ok := missingBrokerDownScaleRunning[brokerID]; ok {
brokersReconcilePriority[brokerID] = missingBrokerDownScaleRunningPriority
} else if _, ok := brokersState[brokerID]; !ok {
brokersReconcilePriority[brokerID] = newBrokerReconcilePriority
} else if _, ok := runningBrokers[brokerID]; !ok {
brokersReconcilePriority[brokerID] = missingBrokerReconcilePriority
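For reading the test expectations that follow: the priority constants ascend from missingBrokerDownScaleRunningPriority through newBrokerReconcilePriority and missingBrokerReconcilePriority to nonControllerBrokerReconcilePriority, and the controller broker is reconciled last. The sketch below shows how the priority map could produce that order; it assumes a stable ascending sort and a separate, largest priority value for the controller broker, since the actual sorting code is outside this hunk.

package kafka

import (
    "fmt"
    "sort"

    "github.com/banzaicloud/koperator/api/v1beta1"
)

// sortByReconcilePriority is a sketch: given the per-broker priorities computed
// by reorderBrokers, a stable ascending sort reproduces the order the tests
// expect, with equal-priority brokers keeping their spec order.
func sortByReconcilePriority(brokers []v1beta1.Broker, prio map[string]brokerReconcilePriority) []v1beta1.Broker {
    out := append([]v1beta1.Broker{}, brokers...)
    sort.SliceStable(out, func(i, j int) bool {
        return prio[fmt.Sprint(out[i].Id)] < prio[fmt.Sprint(out[j].Id)]
    })
    return out
}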
88 changes: 87 additions & 1 deletion pkg/resources/kafka/kafka_test.go
@@ -21,6 +21,7 @@ import (

"errors"

"github.com/go-logr/logr"
"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -309,6 +310,7 @@ func TestReorderBrokers(t *testing.T) {
testCases := []struct {
testName string
brokerPods corev1.PodList
brokersPVC corev1.PersistentVolumeClaimList
desiredBrokers []v1beta1.Broker
brokersState map[string]v1beta1.BrokerState
controllerBrokerID int32
@@ -438,6 +440,90 @@
{Id: 1}, // controller broker should be last
},
},
{
testName: "some missing broker pods, newly added brokers, and missing bokers with pvc and incompleted downscale operation",
brokerPods: corev1.PodList{
Items: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"brokerId": "0"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"brokerId": "1"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"brokerId": "2"}}},
},
},
brokersPVC: corev1.PersistentVolumeClaimList{
TypeMeta: metav1.TypeMeta{},
ListMeta: metav1.ListMeta{},
Items: []corev1.PersistentVolumeClaim{
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"brokerId": "5"}}},
},
},
desiredBrokers: []v1beta1.Broker{
{Id: 0},
{Id: 1},
{Id: 2},
{Id: 3},
{Id: 4},
{Id: 6}, // Kafka cluster upscaled by adding broker 6 to it
},
brokersState: map[string]v1beta1.BrokerState{
"0": {ConfigurationState: v1beta1.ConfigOutOfSync},
"1": {ConfigurationState: v1beta1.ConfigOutOfSync},
"2": {ConfigurationState: v1beta1.ConfigOutOfSync},
"3": {ConfigurationState: v1beta1.ConfigOutOfSync},
"4": {ConfigurationState: v1beta1.ConfigOutOfSync},
"5": {
ConfigurationState: v1beta1.ConfigInSync,
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulDownscaleRequired},
ConfigurationBackup: "H4sIAAxcrGIAA6tWykxRsjKt5QIAiMWU3gkAAAA=",
}},
controllerBrokerID: 1,
expectedReorderedBrokers: []v1beta1.Broker{
{Id: 5}, // broker pod 5 missing and there is an incomplete downscale operation thus should have highest prio
{Id: 6}, // broker 6 is newly added thus should have higher prio
{Id: 3}, // broker pod 3 missing thus should have higher prio
{Id: 4}, // broker pod 4 missing thus should have higher prio
{Id: 0},
{Id: 2},
{Id: 1}, // controller broker should be last
},
},
{
testName: "some missing broker pods, newly added brokers, and missing bokers without pvc and incompleted downscale operation",
brokerPods: corev1.PodList{
Items: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"brokerId": "0"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"brokerId": "1"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"brokerId": "2"}}},
},
},
desiredBrokers: []v1beta1.Broker{
{Id: 0},
{Id: 1},
{Id: 2},
{Id: 3},
{Id: 4},
{Id: 6}, // Kafka cluster upscaled by adding broker 6 to it
},
brokersState: map[string]v1beta1.BrokerState{
"0": {ConfigurationState: v1beta1.ConfigOutOfSync},
"1": {ConfigurationState: v1beta1.ConfigOutOfSync},
"2": {ConfigurationState: v1beta1.ConfigOutOfSync},
"3": {ConfigurationState: v1beta1.ConfigOutOfSync},
"4": {ConfigurationState: v1beta1.ConfigOutOfSync},
"5": {
ConfigurationState: v1beta1.ConfigInSync,
GracefulActionState: v1beta1.GracefulActionState{CruiseControlState: v1beta1.GracefulDownscaleRequired},
ConfigurationBackup: "H4sIAAxcrGIAA6tWykxRsjKt5QIAiMWU3gkAAAA=",
}},
controllerBrokerID: 1,
expectedReorderedBrokers: []v1beta1.Broker{
{Id: 6}, // broker 6 is newly added thus should have higher prio
{Id: 3}, // broker pod 3 missing thus should have higher prio
{Id: 4}, // broker pod 4 missing thus should have higher prio
{Id: 0},
{Id: 2},
{Id: 1}, // controller broker should be last
},
},
}

t.Parallel()
@@ -447,7 +533,7 @@ func TestReorderBrokers(t *testing.T) {

t.Run(test.testName, func(t *testing.T) {
g := gomega.NewWithT(t)
reorderedBrokers := reorderBrokers(test.brokerPods, test.desiredBrokers, test.brokersState, test.controllerBrokerID)
reorderedBrokers := reorderBrokers(test.brokerPods, test.desiredBrokers, test.brokersState, test.brokersPVC, test.controllerBrokerID, logr.Discard())

g.Expect(reorderedBrokers).To(gomega.Equal(test.expectedReorderedBrokers))
})
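The ConfigurationBackup fixture used above is a base64-encoded gzip stream (the leading "H4sI" is the base64 form of the gzip magic bytes). If such a fixture ever needs regenerating, a small round-trip test against the helpers this PR introduces keeps it consistent. The sketch below assumes only the signatures visible in this diff (encode takes *v1beta1.Broker and returns a string, decode takes a string and returns a v1beta1.Broker) and is not part of the PR itself.

package kafka

import (
    "testing"

    "github.com/banzaicloud/koperator/api/v1beta1"
    "github.com/banzaicloud/koperator/pkg/util"
)

// TestConfigurationBackupRoundTrip encodes a broker spec and decodes it again,
// asserting that the backup preserves the spec it was generated from.
func TestConfigurationBackupRoundTrip(t *testing.T) {
    broker := v1beta1.Broker{Id: 5}
    backup, err := util.GzipAndBase64BrokerConfiguration(&broker)
    if err != nil {
        t.Fatalf("encoding broker configuration backup: %v", err)
    }
    decoded, err := util.GetBrokerFromBrokerConfigurationBackup(backup)
    if err != nil {
        t.Fatalf("decoding broker configuration backup: %v", err)
    }
    if decoded.Id != broker.Id {
        t.Errorf("expected broker id %d, got %d", broker.Id, decoded.Id)
    }
}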