From b901c080de17e101d24a7cabd019af5294825422 Mon Sep 17 00:00:00 2001 From: Amir Blum Date: Fri, 18 Oct 2024 09:12:42 +0300 Subject: [PATCH] feat: node collector info in odigos describe (#1597) This PR: - record the status of autoscaler node collector info into the collector group condition to make it available for display. - enhance the `odigos describe` command to include info about the node collector deployment: ![image](https://github.com/user-attachments/assets/9d45ebb0-8a3d-44c7-a3da-de3daaacb167) --- .../controllers/common/deployedcondition.go | 44 +++++++ .../controllers/datacollection/daemonset.go | 14 ++- autoscaler/controllers/gateway/root.go | 45 +------ k8sutils/pkg/describe/odigos.go | 115 +++++++++++++++--- .../controllers/collectorsgroup_controller.go | 1 + 5 files changed, 160 insertions(+), 59 deletions(-) create mode 100644 autoscaler/controllers/common/deployedcondition.go diff --git a/autoscaler/controllers/common/deployedcondition.go b/autoscaler/controllers/common/deployedcondition.go new file mode 100644 index 000000000..efcf4ee55 --- /dev/null +++ b/autoscaler/controllers/common/deployedcondition.go @@ -0,0 +1,44 @@ +package common + +import ( + "encoding/json" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func GetCollectorsGroupDeployedConditionsPatch(err error) string { + + status := metav1.ConditionTrue + if err != nil { + status = metav1.ConditionFalse + } + + message := "Gateway collector is deployed in the cluster" + if err != nil { + message = err.Error() + } + + reason := "GatewayDeployedCreatedSuccessfully" + if err != nil { + // in the future, we can be more specific and break it down to + // more detailed reasons about what exactly failed + reason = "GatewayDeployedCreationFailed" + } + + patch := map[string]interface{}{ + "status": map[string]interface{}{ + "conditions": []metav1.Condition{{ + Type: "Deployed", + Status: status, + Reason: reason, + Message: message, + LastTransitionTime: metav1.NewTime(time.Now()), + }}, + }, + } + + patchData, _ := json.Marshal(patch) + // marshal error is ignored as it is not expected to happen + return string(patchData) +} diff --git a/autoscaler/controllers/datacollection/daemonset.go b/autoscaler/controllers/datacollection/daemonset.go index 3f0bd968a..0ea7df5a9 100644 --- a/autoscaler/controllers/datacollection/daemonset.go +++ b/autoscaler/controllers/datacollection/daemonset.go @@ -16,6 +16,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -56,10 +57,20 @@ func (dm *DelayManager) RunSyncDaemonSetWithDelayAndSkipNewCalls(delay time.Dura // Finish the function execution after the delay time.AfterFunc(delay, func() { + var err error + logger := log.FromContext(ctx) + dm.mu.Lock() defer dm.mu.Unlock() defer dm.finishProgress() - var err error + defer func() { + statusPatchString := common.GetCollectorsGroupDeployedConditionsPatch(err) + statusErr := c.Status().Patch(ctx, collection, client.RawPatch(types.MergePatchType, []byte(statusPatchString))) + if statusErr != nil { + logger.Error(statusErr, "Failed to patch collectors group status") + // just log the error, do not fail the reconciliation + } + }() for i := 0; i < retries; i++ { _, err = syncDaemonSet(ctx, dests, collection, c, scheme, secrets, version) @@ -67,6 +78,7 @@ func (dm *DelayManager) RunSyncDaemonSetWithDelayAndSkipNewCalls(delay time.Dura return } } + log.FromContext(ctx).Error(err, "Failed to sync DaemonSet") }) } diff --git a/autoscaler/controllers/gateway/root.go b/autoscaler/controllers/gateway/root.go index e7492cd03..c9552203b 100644 --- a/autoscaler/controllers/gateway/root.go +++ b/autoscaler/controllers/gateway/root.go @@ -2,18 +2,15 @@ package gateway import ( "context" - "encoding/json" - "time" appsv1 "k8s.io/api/apps/v1" odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common" - "github.com/odigos-io/odigos/common" + odigoscommon "github.com/odigos-io/odigos/common" k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts" "github.com/odigos-io/odigos/k8sutils/pkg/env" "github.com/odigos-io/odigos/k8sutils/pkg/utils" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -26,42 +23,6 @@ var ( } ) -func getCollectorsGroupDeployedConditionsPatch(err error) string { - - status := metav1.ConditionTrue - if err != nil { - status = metav1.ConditionFalse - } - - message := "Gateway collector is deployed in the cluster" - if err != nil { - message = err.Error() - } - - reason := "GatewayDeployedCreatedSuccessfully" - if err != nil { - // in the future, we can be more specific and break it down to - // more detailed reasons about what exactly failed - reason = "GatewayDeployedCreationFailed" - } - - patch := map[string]interface{}{ - "status": map[string]interface{}{ - "conditions": []metav1.Condition{{ - Type: "Deployed", - Status: status, - Reason: reason, - Message: message, - LastTransitionTime: metav1.NewTime(time.Now()), - }}, - }, - } - - patchData, _ := json.Marshal(patch) - // marshal error is ignored as it is not expected to happen - return string(patchData) -} - func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) error { logger := log.FromContext(ctx) @@ -95,7 +56,7 @@ func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme, } err = syncGateway(&dests, &processors, &gatewayCollectorGroup, ctx, k8sClient, scheme, imagePullSecrets, odigosVersion, &odigosConfig) - statusPatchString := getCollectorsGroupDeployedConditionsPatch(err) + statusPatchString := commonconf.GetCollectorsGroupDeployedConditionsPatch(err) statusErr := k8sClient.Status().Patch(ctx, &gatewayCollectorGroup, client.RawPatch(types.MergePatchType, []byte(statusPatchString))) if statusErr != nil { logger.Error(statusErr, "Failed to patch collectors group status") @@ -106,7 +67,7 @@ func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme, func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, - c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigosConfig *common.OdigosConfiguration) error { + c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigosConfig *odigoscommon.OdigosConfiguration) error { logger := log.FromContext(ctx) logger.V(0).Info("Syncing gateway") diff --git a/k8sutils/pkg/describe/odigos.go b/k8sutils/pkg/describe/odigos.go index e80ab76ea..c8ed1566e 100644 --- a/k8sutils/pkg/describe/odigos.go +++ b/k8sutils/pkg/describe/odigos.go @@ -22,6 +22,18 @@ type clusterCollectorResources struct { LatestRevisionPods *corev1.PodList } +type nodeCollectorResources struct { + CollectorsGroup *odigosv1.CollectorsGroup + DaemonSet *appsv1.DaemonSet +} + +type odigosResources struct { + ClusterCollector clusterCollectorResources + NodeCollector nodeCollectorResources + Destinations *odigosv1.DestinationList + InstrumentationConfigs *odigosv1.InstrumentationConfigList +} + func getClusterCollectorResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (clusterCollector clusterCollectorResources, err error) { clusterCollector = clusterCollectorResources{} @@ -77,19 +89,41 @@ func getClusterCollectorResources(ctx context.Context, kubeClient kubernetes.Int return } -func getRelevantOdigosResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (clusterCollector clusterCollectorResources, destinations *odigosv1.DestinationList, instrumentationConfigs *odigosv1.InstrumentationConfigList, err error) { +func getNodeCollectorResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (nodeCollector nodeCollectorResources, err error) { + + nodeCollector = nodeCollectorResources{} + + nodeCollector.CollectorsGroup, err = odigosClient.CollectorsGroups(odigosNs).Get(ctx, consts.OdigosNodeCollectorCollectorGroupName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return + } + + nodeCollector.DaemonSet, err = kubeClient.AppsV1().DaemonSets(odigosNs).Get(ctx, consts.OdigosNodeCollectorDaemonSetName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return + } + + return +} + +func getRelevantOdigosResources(ctx context.Context, kubeClient kubernetes.Interface, odigosClient odigosclientset.OdigosV1alpha1Interface, odigosNs string) (odigos odigosResources, err error) { + + odigos.ClusterCollector, err = getClusterCollectorResources(ctx, kubeClient, odigosClient, odigosNs) + if err != nil { + return + } - clusterCollector, err = getClusterCollectorResources(ctx, kubeClient, odigosClient, odigosNs) + odigos.NodeCollector, err = getNodeCollectorResources(ctx, kubeClient, odigosClient, odigosNs) if err != nil { return } - destinations, err = odigosClient.Destinations(odigosNs).List(ctx, metav1.ListOptions{}) + odigos.Destinations, err = odigosClient.Destinations(odigosNs).List(ctx, metav1.ListOptions{}) if err != nil { return } - instrumentationConfigs, err = odigosClient.InstrumentationConfigs("").List(ctx, metav1.ListOptions{}) + odigos.InstrumentationConfigs, err = odigosClient.InstrumentationConfigs("").List(ctx, metav1.ListOptions{}) if err != nil { return } @@ -109,8 +143,8 @@ func printOdigosPipelineStatus(numInstrumentationConfigs, numDestinations int, e } } -func printClusterGatewayStatus(clusterCollector clusterCollectorResources, expectingPipeline bool, sb *strings.Builder) { - describeText(sb, 1, "Cluster Gateway:") +func printClusterCollectorStatus(clusterCollector clusterCollectorResources, expectingPipeline bool, sb *strings.Builder) { + describeText(sb, 1, "Cluster Collector:") if clusterCollector.CollectorsGroup == nil { describeText(sb, 2, wrapTextSuccessOfFailure("Collectors Group Not Created", !expectingPipeline)) return @@ -129,13 +163,16 @@ func printClusterGatewayStatus(clusterCollector clusterCollectorResources, expec describeText(sb, 2, wrapTextInRed("Deployed: Status Unavailable")) } else { if deployedCondition.Status == metav1.ConditionTrue { - describeText(sb, 2, wrapTextInGreen("Deployed: True")) + describeText(sb, 2, wrapTextInGreen("Deployed: true")) } else { - describeText(sb, 2, wrapTextInRed("Deployed: False")) + describeText(sb, 2, wrapTextInRed("Deployed: false")) describeText(sb, 2, wrapTextInRed(fmt.Sprintf("Reason: %s", deployedCondition.Message))) } } + ready := clusterCollector.CollectorsGroup.Status.Ready + describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Ready: %t", ready), ready)) + if clusterCollector.LatestRevisionPods == nil || clusterCollector.Deployment == nil { describeText(sb, 2, wrapTextInRed("Number of Replicas: Status Unavailable")) } else { @@ -173,23 +210,69 @@ func printClusterGatewayStatus(clusterCollector clusterCollectorResources, expec } } -func printOdigosPipeline(clusterCollector clusterCollectorResources, destinations *odigosv1.DestinationList, instrumentationConfigs *odigosv1.InstrumentationConfigList, sb *strings.Builder) { +func printNodeCollectorStatus(nodeCollector nodeCollectorResources, expectingNodeCollector bool, sb *strings.Builder) { + describeText(sb, 1, "Node Collector:") + if nodeCollector.CollectorsGroup == nil { + describeText(sb, 2, wrapTextSuccessOfFailure("Collectors Group Not Created", !expectingNodeCollector)) + return + } + + describeText(sb, 2, wrapTextSuccessOfFailure("Collectors Group Created", expectingNodeCollector)) + + var deployedCondition *metav1.Condition + for _, condition := range nodeCollector.CollectorsGroup.Status.Conditions { + if condition.Type == "Deployed" { + deployedCondition = &condition + break + } + } + if deployedCondition == nil { + describeText(sb, 2, wrapTextInRed("Deployed: Status Unavailable")) + } else { + if deployedCondition.Status == metav1.ConditionTrue { + describeText(sb, 2, wrapTextInGreen("Deployed: True")) + } else { + describeText(sb, 2, wrapTextInRed("Deployed: False")) + describeText(sb, 2, wrapTextInRed(fmt.Sprintf("Reason: %s", deployedCondition.Message))) + } + } + + ready := nodeCollector.CollectorsGroup.Status.Ready + describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Ready: %t", ready), ready)) + + // this is copied from k8sutils/pkg/describe/describe.go + // I hope the info is accurate since there can be many edge cases + describeText(sb, 2, "Desired Number of Nodes Scheduled: %d", nodeCollector.DaemonSet.Status.DesiredNumberScheduled) + currentMeetsDesired := nodeCollector.DaemonSet.Status.DesiredNumberScheduled == nodeCollector.DaemonSet.Status.CurrentNumberScheduled + describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Current Number of Nodes Scheduled: %d", nodeCollector.DaemonSet.Status.CurrentNumberScheduled), currentMeetsDesired)) + updatedMeetsDesired := nodeCollector.DaemonSet.Status.DesiredNumberScheduled == nodeCollector.DaemonSet.Status.UpdatedNumberScheduled + describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Number of Nodes Scheduled with Up-to-date Pods: %d", nodeCollector.DaemonSet.Status.UpdatedNumberScheduled), updatedMeetsDesired)) + availableMeetsDesired := nodeCollector.DaemonSet.Status.DesiredNumberScheduled == nodeCollector.DaemonSet.Status.NumberAvailable + describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Number of Nodes Scheduled with Available Pods: %d", nodeCollector.DaemonSet.Status.NumberAvailable), availableMeetsDesired)) + noMisscheduled := nodeCollector.DaemonSet.Status.NumberMisscheduled == 0 + describeText(sb, 2, wrapTextSuccessOfFailure(fmt.Sprintf("Number of Nodes Misscheduled: %d", nodeCollector.DaemonSet.Status.NumberMisscheduled), noMisscheduled)) +} + +func printOdigosPipeline(odigosResources odigosResources, sb *strings.Builder) { describeText(sb, 0, "Odigos Pipeline:") - numDestinations := len(destinations.Items) - numInstrumentationConfigs := len(instrumentationConfigs.Items) + numDestinations := len(odigosResources.Destinations.Items) + numInstrumentationConfigs := len(odigosResources.InstrumentationConfigs.Items) // odigos will only initiate pipeline if there are any sources or destinations expectingPipeline := numDestinations > 0 || numInstrumentationConfigs > 0 printOdigosPipelineStatus(numInstrumentationConfigs, numDestinations, expectingPipeline, sb) - printClusterGatewayStatus(clusterCollector, expectingPipeline, sb) + printClusterCollectorStatus(odigosResources.ClusterCollector, expectingPipeline, sb) + sb.WriteString("\n") + expectingNodeCollector := odigosResources.ClusterCollector.CollectorsGroup != nil && odigosResources.ClusterCollector.CollectorsGroup.Status.Ready && numInstrumentationConfigs > 0 + printNodeCollectorStatus(odigosResources.NodeCollector, expectingNodeCollector, sb) } -func printDescribeOdigos(odigosVersion string, clusterCollector clusterCollectorResources, destinations *odigosv1.DestinationList, instrumentationConfigs *odigosv1.InstrumentationConfigList) string { +func printDescribeOdigos(odigosVersion string, odigosResources odigosResources) string { var sb strings.Builder printOdigosVersion(odigosVersion, &sb) sb.WriteString("\n") - printOdigosPipeline(clusterCollector, destinations, instrumentationConfigs, &sb) + printOdigosPipeline(odigosResources, &sb) return sb.String() } @@ -201,10 +284,10 @@ func DescribeOdigos(ctx context.Context, kubeClient kubernetes.Interface, odigos return fmt.Sprintf("Error: %v\n", err) } - clusterCollector, destinations, instrumentationConfigs, err := getRelevantOdigosResources(ctx, kubeClient, odigosClient, odigosNs) + odigosResources, err := getRelevantOdigosResources(ctx, kubeClient, odigosClient, odigosNs) if err != nil { return fmt.Sprintf("Error: %v\n", err) } - return printDescribeOdigos(odigosVersion, clusterCollector, destinations, instrumentationConfigs) + return printDescribeOdigos(odigosVersion, odigosResources) } diff --git a/scheduler/controllers/collectorsgroup_controller.go b/scheduler/controllers/collectorsgroup_controller.go index edf563af4..17f7ab728 100644 --- a/scheduler/controllers/collectorsgroup_controller.go +++ b/scheduler/controllers/collectorsgroup_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/k8sutils/pkg/utils" "github.com/odigos-io/odigos/scheduler/controllers/collectorgroups"