From 62a561f5a1926e977f47b2ec6d8ffd5ee6f41a7a Mon Sep 17 00:00:00 2001 From: Luiz Henrique Pegoraro Date: Fri, 10 Feb 2023 09:38:31 -0500 Subject: [PATCH] fix(maestro): clean up code and create wait mechanism for apply deployments (#2197) * fix(maestro): fix extract id. Signed-off-by: Luiz Pegoraro * fix(maestro): clean up code and enhanced verification of collector apply. Signed-off-by: Luiz Pegoraro * fix(maestro): fix method calls. Signed-off-by: Luiz Pegoraro * fix(maestro): fix method calls. Signed-off-by: Luiz Pegoraro --------- Signed-off-by: Luiz Pegoraro --- maestro/kubecontrol/kubecontrol.go | 142 +++++++++++------------------ maestro/monitor/monitor.go | 12 +-- maestro/redis/consumer/hashset.go | 4 +- maestro/redis/consumer/streams.go | 4 +- 4 files changed, 60 insertions(+), 102 deletions(-) diff --git a/maestro/kubecontrol/kubecontrol.go b/maestro/kubecontrol/kubecontrol.go index e154b92e8..3323b511c 100644 --- a/maestro/kubecontrol/kubecontrol.go +++ b/maestro/kubecontrol/kubecontrol.go @@ -3,7 +3,6 @@ package kubecontrol import ( "bufio" "context" - "fmt" "github.com/ns1labs/orb/pkg/errors" "github.com/plgd-dev/kit/v2/codec/json" "go.uber.org/zap" @@ -46,115 +45,80 @@ type Service interface { CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error // DeleteOtelCollector - delete an existing collector by id - DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error + DeleteOtelCollector(ctx context.Context, ownerID, sinkID string) error // UpdateOtelCollector - update an existing collector by id UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error } -func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error { - - fileContent := []byte(manifest) - tmp := strings.Split(string(fileContent), "\n") - newContent := strings.Join(tmp[1:], "\n") - - _, status, err := svc.getDeploymentState(ctx, ownerID, sinkId) +func (svc *deployService) removeDeployment(ctx context.Context, ownerID string, sinkId string) (error, bool) { + deploymentName, status, err := svc.getDeploymentState(ctx, ownerID, sinkId) if err != nil { if status == "broken" { - operation = "delete" + svc.logger.Info("get the deployed pod broke", zap.String("sinkID", sinkId), zap.Error(err)) + return nil, true } } - if operation == "apply" { - if status == "active" { - svc.logger.Info("Already applied Sink ID=" + sinkId) - return nil - } - } else if operation == "delete" { - if status == "deleted" { - svc.logger.Info("Already deleted Sink ID=" + sinkId) - return nil - } + if status == "deleted" { + svc.logger.Info("Already deleted collector for Sink ID", zap.String("sinkID", sinkId)) + return nil, true } - - err = os.WriteFile("/tmp/otel-collector-"+sinkId+".json", []byte(newContent), 0644) + err = svc.clientSet.AppsV1().Deployments(namespace).Delete(ctx, deploymentName, k8smetav1.DeleteOptions{}) if err != nil { - svc.logger.Error("failed to write file content", zap.Error(err)) - return err - } - - stdOutListenFunction := func(out *bufio.Scanner, err *bufio.Scanner) { - for out.Scan() { - svc.logger.Info("Deploy Info: " + out.Text()) - } - for err.Scan() { - svc.logger.Info("Deploy Error: " + err.Text()) - } + svc.logger.Info("failed to remove deployment", zap.Error(err)) + return err, true } - - // execute action - cmd := exec.Command("kubectl", operation, "-f", "/tmp/otel-collector-"+sinkId+".json", "-n", namespace) - _, _, err = execCmd(ctx, cmd, svc.logger, stdOutListenFunction) - - if err == nil { - svc.logger.Info(fmt.Sprintf("successfully %s the otel-collector for sink-id: %s", operation, sinkId)) - } - svc.logger.Info(fmt.Sprintf("successfully %s the otel-collector for sink-id: %s", operation, sinkId)) - - return nil + return nil, false } -func (svc *deployService) newCollectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error { +func (svc *deployService) applyDeployment(ctx context.Context, ownerID string, sinkId string, manifest string) (error, bool) { fileContent := []byte(manifest) tmp := strings.Split(string(fileContent), "\n") newContent := strings.Join(tmp[1:], "\n") - deploymentName, status, err := svc.getDeploymentState(ctx, ownerID, sinkId) + _, status, err := svc.getDeploymentState(ctx, ownerID, sinkId) if err != nil { - if status == "broken" { - operation = "delete" - } + return err, false } fileName := "/tmp/otel-collector-" + sinkId + ".json" err = os.WriteFile(fileName, []byte(newContent), 0644) if err != nil { svc.logger.Error("failed to write file content", zap.Error(err)) - return err + return err, true } - if operation == "apply" { - if status == "active" { - err := os.Remove(fileName) - if err != nil { - svc.logger.Info("failed to remove file", zap.Error(err)) - return err - } - svc.logger.Info("Already applied collector for Sink ID", zap.String("sinkID", sinkId)) - return nil - } - var applyConfig k8sv1.DeploymentApplyConfiguration - err := json.Decode(fileContent, applyConfig) + if status == "active" { + err := os.Remove(fileName) if err != nil { - return err + svc.logger.Info("failed to remove file", zap.Error(err)) + return err, true } - svc.logger.Info("DEBUG applyConfig", zap.Any("applyConfig", applyConfig)) - result, err := svc.clientSet.AppsV1().Deployments(namespace).Apply(ctx, &applyConfig, k8smetav1.ApplyOptions{ - Force: true, - }) - if err != nil { - svc.logger.Info("failed to apply deployment", zap.Error(err)) - return err - } - svc.logger.Info("DEBUG result", zap.Any("result", result)) - } else if operation == "delete" { - if status == "deleted" { - svc.logger.Info("Already deleted collector for Sink ID", zap.String("sinkID", sinkId)) - return nil + svc.logger.Info("Already applied collector for Sink ID", zap.String("sinkID", sinkId)) + return nil, true + } + var applyConfig k8sv1.DeploymentApplyConfiguration + err = json.Decode(fileContent, applyConfig) + if err != nil { + return err, true + } + _, err = svc.clientSet.AppsV1().Deployments(namespace).Apply(ctx, &applyConfig, k8smetav1.ApplyOptions{ + Force: true, + }) + if err != nil { + svc.logger.Info("failed to apply deployment", zap.Error(err)) + return err, true + } + // wait until deployment is active before returning + for i := 0; i < 4; i++ { + time.Sleep(1 * time.Second) + _, status, err := svc.getDeploymentState(ctx, ownerID, sinkId) + if status == "broken" || err != nil { + svc.logger.Error("Failed during deployment, deployment is not active", zap.String("sinkID", sinkId), zap.Error(err)) + return err, false } - err := svc.clientSet.AppsV1().Deployments(namespace).Delete(ctx, deploymentName, k8smetav1.DeleteOptions{}) - if err != nil { - svc.logger.Info("failed to remove deployment", zap.Error(err)) - return err + if status == "active" { + break } } - return nil + return nil, false } func execCmd(_ context.Context, cmd *exec.Cmd, logger *zap.Logger, stdOutFunc func(stdOut *bufio.Scanner, stdErr *bufio.Scanner)) (*bufio.Scanner, *bufio.Scanner, error) { @@ -203,16 +167,16 @@ func (svc *deployService) getDeploymentState(ctx context.Context, _, sinkId stri } func (svc *deployService) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error { - err := svc.newCollectorDeploy(ctx, "apply", ownerID, sinkID, deploymentEntry) - if err != nil { - return err + err2, done := svc.applyDeployment(ctx, ownerID, sinkID, deploymentEntry) + if done { + return err2 } return nil } func (svc *deployService) UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error { - err := svc.DeleteOtelCollector(ctx, ownerID, sinkID, deploymentEntry) + err := svc.DeleteOtelCollector(ctx, ownerID, sinkID) if err != nil { return err } @@ -225,10 +189,10 @@ func (svc *deployService) UpdateOtelCollector(ctx context.Context, ownerID, sink return nil } -func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error { - err := svc.newCollectorDeploy(ctx, "delete", ownerID, sinkID, deploymentEntry) - if err != nil { - return err +func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sinkID string) error { + err2, done := svc.removeDeployment(ctx, ownerID, sinkID) + if done { + return err2 } return nil } diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index efccba152..a841340dd 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -170,13 +170,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { if sink == nil { svc.logger.Warn("collector not found for sink, depleting collector", zap.String("collector name", collector.Name)) sinkId := collector.Name[5:51] - svc.logger.Info("Debug extracted sinkID", zap.String("sinkID", sinkId)) - deployment, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId) - if errDeploy != nil { - svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(errDeploy)) - continue - } - err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deployment) + err = svc.kubecontrol.DeleteOtelCollector(ctx, "", sinkId) if err != nil { svc.logger.Error("error removing otel collector", zap.Error(err)) } @@ -220,12 +214,12 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { svc.logger.Error("error on remove sink activity", zap.Error(err)) continue } - deployment, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id) + _, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id) if errDeploy != nil { svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr)) continue } - err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deployment) + err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id) if err != nil { svc.logger.Error("error removing otel collector", zap.Error(err)) } diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index 4a32f6c7e..36cc7f5eb 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -33,12 +33,12 @@ func (es eventStore) GetDeploymentEntryFromSinkId(ctx context.Context, sinkId st // handleSinksDeleteCollector will delete Deployment Entry and force delete otel collector func (es eventStore) handleSinksDeleteCollector(ctx context.Context, event redis.SinksUpdateEvent) error { es.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) - deployment, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID) + _, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID) if err != nil { es.logger.Error("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) return err } - err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID, deployment) + err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID) if err != nil { return err } diff --git a/maestro/redis/consumer/streams.go b/maestro/redis/consumer/streams.go index 6518f39cc..f95ecda59 100644 --- a/maestro/redis/consumer/streams.go +++ b/maestro/redis/consumer/streams.go @@ -136,11 +136,11 @@ func (es eventStore) Subscribe(context context.Context) error { // handleSinkerDeleteCollector Delete collector func (es eventStore) handleSinkerDeleteCollector(ctx context.Context, event redis2.SinkerUpdateEvent) error { es.logger.Info("Received maestro DELETE event from sinker, sink state", zap.String("state", event.State), zap.String("sinkdID", event.SinkID), zap.String("ownerID", event.Owner)) - deployment, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID) + _, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID) if err != nil { return err } - err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID, deployment) + err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID) if err != nil { return err }