Skip to content

Commit

Permalink
feat: report event when the service doesn't exist/when endpointslice …
Browse files Browse the repository at this point in the history
…api is disabled

Signed-off-by: jwcesign <jwcesign@gmail.com>
  • Loading branch information
jwcesign committed Dec 18, 2023
1 parent 21b2842 commit 230d31b
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *EndpointsliceDispatchController) newClusterFunc() handler.MapFunc {

var requests []reconcile.Request
for _, mcs := range mcsList.Items {
clusterSet, _, err := helper.GetConsumerClustres(c.Client, mcs.DeepCopy())
clusterSet, err := helper.GetConsumerClustres(c.Client, mcs.DeepCopy())
if err != nil {
klog.Errorf("Failed to get provider clusters, error: %v", err)
continue
Expand Down Expand Up @@ -283,7 +283,7 @@ func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx
continue
}

consumerClusters, _, err := helper.GetConsumerClustres(c.Client, mcs)
consumerClusters, err := helper.GetConsumerClustres(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get consumer clusters, error is: %v", err)
return err
Expand Down Expand Up @@ -315,7 +315,7 @@ func (c *EndpointsliceDispatchController) dispatchEndpointSlice(ctx context.Cont
return err
}

consumerClusters, _, err := helper.GetConsumerClustres(c.Client, mcs)
consumerClusters, err := helper.GetConsumerClustres(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get consumer clusters, error is: %v", err)
return err
Expand All @@ -324,62 +324,26 @@ func (c *EndpointsliceDispatchController) dispatchEndpointSlice(ctx context.Cont
if clusterName == epsSourceCluster {
continue
}

// It couldn't happen here
if len(work.Spec.Workload.Manifests) == 0 {
continue
}

// There should be only one manifest in the work, let's use the first one.
manifest := work.Spec.Workload.Manifests[0]
unstructuredObj := &unstructured.Unstructured{}
if err := unstructuredObj.UnmarshalJSON(manifest.Raw); err != nil {
klog.Errorf("Failed to unmarshal work manifest, error is: %v", err)
return err
}

endpointSlice := &discoveryv1.EndpointSlice{}
if err := helper.ConvertToTypedObject(unstructuredObj, endpointSlice); err != nil {
klog.Errorf("Failed to convert unstructured object to typed object, error is: %v", err)
clusterObj, err := util.GetCluster(c.Client, clusterName)
if err != nil {
if apierrors.IsNotFound(err) {
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonClusterNotFound, "Consumer cluster %s is not found", clusterName)
continue
}
klog.Errorf("Failed to get cluster %s, error is: %v", clusterName, err)
return err
}

// Use this name to avoid naming conflicts and locate the EPS source cluster.
endpointSlice.Name = epsSourceCluster + "-" + endpointSlice.Name
clusterNamespace := names.GenerateExecutionSpaceName(clusterName)
endpointSlice.Labels = map[string]string{
discoveryv1.LabelServiceName: mcs.Name,
workv1alpha1.WorkNamespaceLabel: clusterNamespace,
workv1alpha1.WorkNameLabel: work.Name,
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
discoveryv1.LabelManagedBy: util.EndpointSliceDispatchControllerLabelValue,
if !util.IsClusterReady(&clusterObj.Status) {
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceFailed,
"Consumer cluster %s is not ready, skip to propagate MultiClusterService", clusterName)
continue
}
endpointSlice.Annotations = map[string]string{
// This annotation is used to identify the source cluster of EndpointSlice and whether the eps are the newest version
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
if !helper.IsAPIEnabled(clusterObj.Status.APIEnablements, util.EndpointSliceGVK.GroupVersion().String(), util.EndpointSliceGVK.Kind) {
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonAPIIncompatible, "Consumer cluster %s does not support EndpointSlice", clusterName)
continue
}

workMeta := metav1.ObjectMeta{
Name: work.Name,
Namespace: clusterNamespace,
Finalizers: []string{util.ExecutionControllerFinalizer},
Annotations: map[string]string{
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
},
Labels: map[string]string{
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
util.MultiClusterServiceNameLabel: mcs.Name,
util.MultiClusterServiceNamespaceLabel: mcs.Namespace,
},
}
unstructuredEPS, err := helper.ToUnstructured(endpointSlice)
if err != nil {
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), epsSourceCluster, clusterName, err)
if err := c.ensureEndpointSliceWork(mcs, work, epsSourceCluster, clusterName); err != nil {
return err
}
}
Expand All @@ -394,6 +358,69 @@ func (c *EndpointsliceDispatchController) dispatchEndpointSlice(ctx context.Cont
return nil
}

func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(mcs *networkingv1alpha1.MultiClusterService,
work *workv1alpha1.Work, providerCluster, consumerCluster string) error {
// It couldn't happen here
if len(work.Spec.Workload.Manifests) == 0 {
return nil
}

// There should be only one manifest in the work, let's use the first one.
manifest := work.Spec.Workload.Manifests[0]
unstructuredObj := &unstructured.Unstructured{}
if err := unstructuredObj.UnmarshalJSON(manifest.Raw); err != nil {
klog.Errorf("Failed to unmarshal work manifest, error is: %v", err)
return err
}

endpointSlice := &discoveryv1.EndpointSlice{}
if err := helper.ConvertToTypedObject(unstructuredObj, endpointSlice); err != nil {
klog.Errorf("Failed to convert unstructured object to typed object, error is: %v", err)
return err
}

// Use this name to avoid naming conflicts and locate the EPS source cluster.
endpointSlice.Name = providerCluster + "-" + endpointSlice.Name
clusterNamespace := names.GenerateExecutionSpaceName(consumerCluster)
endpointSlice.Labels = map[string]string{
discoveryv1.LabelServiceName: mcs.Name,
workv1alpha1.WorkNamespaceLabel: clusterNamespace,
workv1alpha1.WorkNameLabel: work.Name,
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
discoveryv1.LabelManagedBy: util.EndpointSliceDispatchControllerLabelValue,
}
endpointSlice.Annotations = map[string]string{
// This annotation is used to identify the source cluster of EndpointSlice and whether the eps are the newest version
util.EndpointSliceProvisionClusterAnnotation: providerCluster,
}

workMeta := metav1.ObjectMeta{
Name: work.Name,
Namespace: clusterNamespace,
Finalizers: []string{util.ExecutionControllerFinalizer},
Annotations: map[string]string{
util.EndpointSliceProvisionClusterAnnotation: providerCluster,
},
Labels: map[string]string{
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
util.MultiClusterServiceNameLabel: mcs.Name,
util.MultiClusterServiceNamespaceLabel: mcs.Namespace,
},
}
unstructuredEPS, err := helper.ToUnstructured(endpointSlice)
if err != nil {
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
return err
}

return nil
}

func (c *EndpointsliceDispatchController) cleanupEndpointSliceFromConsumerClusters(ctx context.Context, work *workv1alpha1.Work) error {
// TBD: There may be a better way without listing all works.
workList := &workv1alpha1.WorkList{}
Expand Down
66 changes: 37 additions & 29 deletions pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package multiclusterservice
import (
"context"
"fmt"
"strings"

"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -86,23 +85,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
return c.handleMultiClusterServiceDelete(mcs.DeepCopy())
}

providerClusters, noneExistProviderClusters, err := helper.GetProviderClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get provider clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return controllerruntime.Result{}, err
}
consumerClusters, noneExistConsumerClusters, err := helper.GetConsumerClustres(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get consumer clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return controllerruntime.Result{}, err
}
if len(noneExistProviderClusters) != 0 || len(noneExistConsumerClusters) != 0 {
msgProvider := strings.Join(noneExistProviderClusters.UnsortedList(), ",")
msgConsumer := strings.Join(noneExistConsumerClusters.UnsortedList(), ",")
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonConfigurationRedundant,
fmt.Sprintf("ProviderClusters(%s)/ConsumerClusters(%s) dont's exist, Karmada will ignore them", msgProvider, msgConsumer))
}

var err error
defer func() {
if err != nil {
_ = c.updateMultiClusterServiceStatus(mcs, metav1.ConditionFalse, "ServiceAppliedFailed", err.Error())
Expand All @@ -113,7 +96,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonSyncServiceSucceed, "Service is propagated to target clusters.")
}()

if err = c.handleMultiClusterServiceCreateOrUpdate(mcs.DeepCopy(), providerClusters, consumerClusters); err != nil {
if err = c.handleMultiClusterServiceCreateOrUpdate(mcs.DeepCopy()); err != nil {
return controllerruntime.Result{}, err
}
return controllerruntime.Result{}, nil
Expand Down Expand Up @@ -230,9 +213,20 @@ func (c *MCSController) cleanProviderEndpointSliceWork(work *workv1alpha1.Work)
return nil
}

func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(mcs *networkingv1alpha1.MultiClusterService, providerClusters, consumerClusters sets.Set[string]) error {
func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(mcs *networkingv1alpha1.MultiClusterService) error {
klog.V(4).Infof("Begin to handle MultiClusterService(%s/%s) create or update event", mcs.Namespace, mcs.Name)

providerClusters, err := helper.GetProviderClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get provider clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}
consumerClusters, err := helper.GetConsumerClustres(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get consumer clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}

// 1. if mcs not contain CrossCluster type, delete service work if needed
if !helper.MultiClusterServiceCrossClusterEnabled(mcs) {
if err := c.retrieveService(mcs); err != nil {
Expand Down Expand Up @@ -265,18 +259,16 @@ func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(mcs *networkingv

// 5. make sure service exist
svc := &corev1.Service{}
err := c.Client.Get(context.Background(), types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}, svc)
err = c.Client.Get(context.Background(), types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}, svc)
// If the Serivice are deleted, the Service's ResourceBinding will be cleaned by GC
if err != nil && !apierrors.IsNotFound(err) {
if err != nil {
klog.Errorf("Failed to get service(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}

// 6. if service exists, create or update corresponding ResourceBinding
if err == nil {
if err := c.propagateService(context.Background(), mcs, svc, providerClusters, consumerClusters); err != nil {
return err
}
if err := c.propagateService(context.Background(), mcs, svc, providerClusters, consumerClusters); err != nil {
return err
}

klog.V(4).Infof("Success to reconcile MultiClusterService(%s/%s)", mcs.Namespace, mcs.Name)
Expand All @@ -285,9 +277,25 @@ func (c *MCSController) handleMultiClusterServiceCreateOrUpdate(mcs *networkingv

func (c *MCSController) propagateMultiClusterService(mcs *networkingv1alpha1.MultiClusterService, providerClusters sets.Set[string]) error {
for clusterName := range providerClusters {
if !c.IsClusterReady(clusterName) {
clusterObj, err := util.GetCluster(c.Client, clusterName)
if err != nil {
if apierrors.IsNotFound(err) {
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonClusterNotFound, "Provider cluster %s is not found", clusterName)
continue
}
klog.Errorf("Failed to get cluster %s, error is: %v", clusterName, err)
return err
}
if !util.IsClusterReady(&clusterObj.Status) {
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceFailed,
"Provider cluster %s is not ready, skip to propagate MultiClusterService", clusterName)
continue
}
if !helper.IsAPIEnabled(clusterObj.Status.APIEnablements, util.EndpointSliceGVK.GroupVersion().String(), util.EndpointSliceGVK.Kind) {
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonAPIIncompatible, "Provider cluster %s does not support EndpointSlice", clusterName)
continue
}

workMeta := metav1.ObjectMeta{
Name: names.GenerateWorkName(mcs.Kind, mcs.Name, mcs.Namespace),
Namespace: names.GenerateExecutionSpaceName(clusterName),
Expand Down Expand Up @@ -639,7 +647,7 @@ func (c *MCSController) needSyncMultiClusterService(mcs *networkingv1alpha1.Mult
return true, nil
}

providerClusters, _, err := helper.GetProviderClusters(c.Client, mcs)
providerClusters, err := helper.GetProviderClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get provider clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return false, err
Expand All @@ -648,7 +656,7 @@ func (c *MCSController) needSyncMultiClusterService(mcs *networkingv1alpha1.Mult
return true, nil
}

consumerClusters, _, err := helper.GetConsumerClustres(c.Client, mcs)
consumerClusters, err := helper.GetConsumerClustres(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get consumer clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return false, err
Expand Down
6 changes: 4 additions & 2 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ const (
EventReasonDispatchEndpointSliceFailed = "DispatchEndpointSliceFailed"
// EventReasonDispatchEndpointSliceSucceed indicates that dispatch endpointslice succeed.
EventReasonDispatchEndpointSliceSucceed = "DispatchEndpointSliceSucceed"
// EventReasonConfigurationRedundant indicates that MultiClusterService configuration redundant.
EventReasonConfigurationRedundant = "ConfigurationRedundant"
// EventReasonClusterNotFound indicates that the cluster configured in MultiClusterService does not exist.
EventReasonClusterNotFound = "ClusterNotFound"
// EventReasonAPIIncompatible indicates that the MultiClusterService may not function properly as some member clusters do not support EndpointSlice.
EventReasonAPIIncompatible = "APIIncompatible"
)
10 changes: 9 additions & 1 deletion pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ limitations under the License.

package util

import "time"
import (
"time"

discoveryv1 "k8s.io/api/discovery/v1"
)

// Define labels used by karmada system.
const (
Expand Down Expand Up @@ -211,3 +215,7 @@ const (
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
CacheSyncTimeout = 2 * time.Minute
)

var (
EndpointSliceGVK = discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice")
)
24 changes: 12 additions & 12 deletions pkg/util/helper/mcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,34 @@ func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiCluster
return false
}

func GetProviderClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (existClusters, noneExistClusters sets.Set[string], err error) {
func GetProviderClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
providerClusters := sets.New[string]()
for _, p := range mcs.Spec.ProviderClusters {
providerClusters.Insert(p.Name)
}
if len(providerClusters) != 0 {
return providerClusters, nil
}
allClusters, err := util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, nil, err
}
if len(providerClusters) == 0 {
return allClusters, nil, nil
return nil, err
}
return providerClusters.Clone().Intersection(allClusters), providerClusters.Clone().Delete(allClusters.UnsortedList()...), nil
return allClusters, nil
}

func GetConsumerClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (existClusters, noneExistClusters sets.Set[string], err error) {
func GetConsumerClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
consumerClusters := sets.New[string]()
for _, c := range mcs.Spec.ConsumerClusters {
consumerClusters.Insert(c.Name)
}
if len(consumerClusters) != 0 {
return consumerClusters, nil
}
allClusters, err := util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, nil, err
}
if len(consumerClusters) == 0 {
return allClusters, nil, nil
return nil, err
}
return consumerClusters.Clone().Intersection(allClusters), consumerClusters.Clone().Delete(allClusters.UnsortedList()...), nil
return allClusters, nil
}

0 comments on commit 230d31b

Please sign in to comment.