Skip to content

Commit

Permalink
fix(maestro): change from pod, to deployment and added logs for match…
Browse files Browse the repository at this point in the history
…ing items (#2198)

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>
  • Loading branch information
lpegoraro authored Feb 10, 2023
1 parent 62a561f commit 03a5e15
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
20 changes: 9 additions & 11 deletions maestro/kubecontrol/kubecontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/ns1labs/orb/pkg/errors"
"github.com/plgd-dev/kit/v2/codec/json"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
k8sappsv1 "k8s.io/api/apps/v1"
k8smetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sv1 "k8s.io/client-go/applyconfigurations/apps/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -142,20 +142,18 @@ func (svc *deployService) getDeploymentState(ctx context.Context, _, sinkId stri
// Since this can take a while to be retrieved, we need to have a wait mechanism
for i := 0; i < 5; i++ {
time.Sleep(5 * time.Second)
pods, err2 := svc.clientSet.CoreV1().Pods(namespace).List(ctx, k8smetav1.ListOptions{})
deploymentList, err2 := svc.clientSet.AppsV1().Deployments(namespace).List(ctx, k8smetav1.ListOptions{})
if err2 != nil {
svc.logger.Error("error on reading pods", zap.Error(err2))
return "", "", err2
}
for _, pod := range pods.Items {
if strings.Contains(pod.Name, sinkId) {
deploymentName = pod.Name
if pod.Status.Phase == v1.PodFailed {
svc.logger.Error("error on retrieving collector, pod is broken")
return "", "broken", errors.New(pod.Status.Message)
}
if pod.Status.Phase != v1.PodRunning {
break
for _, deployment := range deploymentList.Items {
if strings.Contains(deployment.Name, sinkId) {
svc.logger.Info("found deployment for sink")
deploymentName = deployment.Name
if len(deployment.Status.Conditions) == 0 || deployment.Status.Conditions[0].Type == k8sappsv1.DeploymentReplicaFailure {
svc.logger.Error("error on retrieving collector, deployment is broken")
return "", "broken", errors.New("error on retrieving collector, deployment is broken")
}
status = "active"
return
Expand Down
3 changes: 1 addition & 2 deletions maestro/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
return
}
sinksRes, err := svc.sinksClient.RetrieveSinks(ctx, &sinkspb.SinksFilterReq{OtelEnabled: "enabled"})

if err != nil {
svc.logger.Error("error collecting sinks", zap.Error(err))
return
Expand All @@ -161,8 +160,8 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
for _, collector := range runningCollectors {
var sink *sinkspb.SinkRes
for _, sinkRes := range sinksRes.Sinks {
svc.logger.Info("Debug collector name, collector id", zap.String("name", collector.Name), zap.String("sinkID", sinkRes.Id))
if strings.Contains(collector.Name, sinkRes.Id) {
svc.logger.Warn("collector found for sink", zap.String("collector name", collector.Name), zap.String("sink", sinkRes.Id))
sink = sinkRes
break
}
Expand Down

0 comments on commit 03a5e15

Please sign in to comment.