diff --git a/maestro/kubecontrol/kubecontrol.go b/maestro/kubecontrol/kubecontrol.go
index 3323b511c..8dc251dfb 100644
--- a/maestro/kubecontrol/kubecontrol.go
+++ b/maestro/kubecontrol/kubecontrol.go
@@ -6,7 +6,7 @@ import (
 	"github.com/ns1labs/orb/pkg/errors"
 	"github.com/plgd-dev/kit/v2/codec/json"
 	"go.uber.org/zap"
-	v1 "k8s.io/api/core/v1"
+	k8sappsv1 "k8s.io/api/apps/v1"
 	k8smetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	k8sv1 "k8s.io/client-go/applyconfigurations/apps/v1"
 	"k8s.io/client-go/kubernetes"
@@ -142,20 +142,18 @@ func (svc *deployService) getDeploymentState(ctx context.Context, _, sinkId stri
 	// Since this can take a while to be retrieved, we need to have a wait mechanism
 	for i := 0; i < 5; i++ {
 		time.Sleep(5 * time.Second)
-		pods, err2 := svc.clientSet.CoreV1().Pods(namespace).List(ctx, k8smetav1.ListOptions{})
+		deploymentList, err2 := svc.clientSet.AppsV1().Deployments(namespace).List(ctx, k8smetav1.ListOptions{})
 		if err2 != nil {
 			svc.logger.Error("error on reading pods", zap.Error(err2))
 			return "", "", err2
 		}
-		for _, pod := range pods.Items {
-			if strings.Contains(pod.Name, sinkId) {
-				deploymentName = pod.Name
-				if pod.Status.Phase == v1.PodFailed {
-					svc.logger.Error("error on retrieving collector, pod is broken")
-					return "", "broken", errors.New(pod.Status.Message)
-				}
-				if pod.Status.Phase != v1.PodRunning {
-					break
+		for _, deployment := range deploymentList.Items {
+			if strings.Contains(deployment.Name, sinkId) {
+				svc.logger.Info("found deployment for sink")
+				deploymentName = deployment.Name
+				if len(deployment.Status.Conditions) == 0 || deployment.Status.Conditions[0].Type == k8sappsv1.DeploymentReplicaFailure {
+					svc.logger.Error("error on retrieving collector, deployment is broken")
+					return "", "broken", errors.New("error on retrieving collector, deployment is broken")
 				}
 				status = "active"
 				return
diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go
index a841340dd..76367ca7e 100644
--- a/maestro/monitor/monitor.go
+++ b/maestro/monitor/monitor.go
@@ -152,7 +152,6 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
 		return
 	}
 	sinksRes, err := svc.sinksClient.RetrieveSinks(ctx, &sinkspb.SinksFilterReq{OtelEnabled: "enabled"})
-
 	if err != nil {
 		svc.logger.Error("error collecting sinks", zap.Error(err))
 		return
@@ -161,8 +160,8 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
 	for _, collector := range runningCollectors {
 		var sink *sinkspb.SinkRes
 		for _, sinkRes := range sinksRes.Sinks {
-			svc.logger.Info("Debug collector name, collector id", zap.String("name", collector.Name), zap.String("sinkID", sinkRes.Id))
 			if strings.Contains(collector.Name, sinkRes.Id) {
+				svc.logger.Warn("collector found for sink", zap.String("collector name", collector.Name), zap.String("sink", sinkRes.Id))
 				sink = sinkRes
 				break
 			}
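
For reference, a minimal sketch of how a Deployment's state can be derived from its status conditions using the same `k8s.io/api/apps/v1` types the patch now imports. This is not part of the change: the helper name `classifyDeployment`, the returned state strings, and the walk over every condition (the patch itself only inspects `Conditions[0]`) are illustrative assumptions.

```go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

// classifyDeployment is a hypothetical helper: it walks all status conditions,
// treating a true ReplicaFailure condition as "broken", a true Available
// condition as "active", and anything else as still "pending".
func classifyDeployment(dep appsv1.Deployment) string {
	for _, cond := range dep.Status.Conditions {
		switch cond.Type {
		case appsv1.DeploymentReplicaFailure:
			if cond.Status == corev1.ConditionTrue {
				return "broken"
			}
		case appsv1.DeploymentAvailable:
			if cond.Status == corev1.ConditionTrue {
				return "active"
			}
		}
	}
	return "pending"
}

func main() {
	// Fake deployment with an Available=True condition; prints "active".
	dep := appsv1.Deployment{
		Status: appsv1.DeploymentStatus{
			Conditions: []appsv1.DeploymentCondition{
				{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue},
			},
		},
	}
	fmt.Println(classifyDeployment(dep))
}
```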