Skip to content

Commit

Permalink
fix(maestro): clean up code and create wait mechanism for apply deployments (#2197)
Browse files Browse the repository at this point in the history

* fix(maestro): fix extract id.

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

* fix(maestro): clean up code and enhance verification of collector apply.

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

* fix(maestro): fix method calls.

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

* fix(maestro): fix method calls.

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

---------

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>
  • Loading branch information
lpegoraro authored Feb 10, 2023
1 parent eda32b5 commit 62a561f
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 102 deletions.
142 changes: 53 additions & 89 deletions maestro/kubecontrol/kubecontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kubecontrol
import (
"bufio"
"context"
"fmt"
"github.com/ns1labs/orb/pkg/errors"
"github.com/plgd-dev/kit/v2/codec/json"
"go.uber.org/zap"
Expand Down Expand Up @@ -46,115 +45,80 @@ type Service interface {
CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error

// DeleteOtelCollector - delete an existing collector by id
DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error
DeleteOtelCollector(ctx context.Context, ownerID, sinkID string) error

// UpdateOtelCollector - update an existing collector by id
UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error
}

func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error {

fileContent := []byte(manifest)
tmp := strings.Split(string(fileContent), "\n")
newContent := strings.Join(tmp[1:], "\n")

_, status, err := svc.getDeploymentState(ctx, ownerID, sinkId)
func (svc *deployService) removeDeployment(ctx context.Context, ownerID string, sinkId string) (error, bool) {
deploymentName, status, err := svc.getDeploymentState(ctx, ownerID, sinkId)
if err != nil {
if status == "broken" {
operation = "delete"
svc.logger.Info("get the deployed pod broke", zap.String("sinkID", sinkId), zap.Error(err))
return nil, true
}
}
if operation == "apply" {
if status == "active" {
svc.logger.Info("Already applied Sink ID=" + sinkId)
return nil
}
} else if operation == "delete" {
if status == "deleted" {
svc.logger.Info("Already deleted Sink ID=" + sinkId)
return nil
}
if status == "deleted" {
svc.logger.Info("Already deleted collector for Sink ID", zap.String("sinkID", sinkId))
return nil, true
}

err = os.WriteFile("/tmp/otel-collector-"+sinkId+".json", []byte(newContent), 0644)
err = svc.clientSet.AppsV1().Deployments(namespace).Delete(ctx, deploymentName, k8smetav1.DeleteOptions{})
if err != nil {
svc.logger.Error("failed to write file content", zap.Error(err))
return err
}

stdOutListenFunction := func(out *bufio.Scanner, err *bufio.Scanner) {
for out.Scan() {
svc.logger.Info("Deploy Info: " + out.Text())
}
for err.Scan() {
svc.logger.Info("Deploy Error: " + err.Text())
}
svc.logger.Info("failed to remove deployment", zap.Error(err))
return err, true
}

// execute action
cmd := exec.Command("kubectl", operation, "-f", "/tmp/otel-collector-"+sinkId+".json", "-n", namespace)
_, _, err = execCmd(ctx, cmd, svc.logger, stdOutListenFunction)

if err == nil {
svc.logger.Info(fmt.Sprintf("successfully %s the otel-collector for sink-id: %s", operation, sinkId))
}
svc.logger.Info(fmt.Sprintf("successfully %s the otel-collector for sink-id: %s", operation, sinkId))

return nil
return nil, false
}

func (svc *deployService) newCollectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error {
func (svc *deployService) applyDeployment(ctx context.Context, ownerID string, sinkId string, manifest string) (error, bool) {
fileContent := []byte(manifest)
tmp := strings.Split(string(fileContent), "\n")
newContent := strings.Join(tmp[1:], "\n")
deploymentName, status, err := svc.getDeploymentState(ctx, ownerID, sinkId)
_, status, err := svc.getDeploymentState(ctx, ownerID, sinkId)
if err != nil {
if status == "broken" {
operation = "delete"
}
return err, false
}
fileName := "/tmp/otel-collector-" + sinkId + ".json"
err = os.WriteFile(fileName, []byte(newContent), 0644)
if err != nil {
svc.logger.Error("failed to write file content", zap.Error(err))
return err
return err, true
}
if operation == "apply" {
if status == "active" {
err := os.Remove(fileName)
if err != nil {
svc.logger.Info("failed to remove file", zap.Error(err))
return err
}
svc.logger.Info("Already applied collector for Sink ID", zap.String("sinkID", sinkId))
return nil
}
var applyConfig k8sv1.DeploymentApplyConfiguration
err := json.Decode(fileContent, applyConfig)
if status == "active" {
err := os.Remove(fileName)
if err != nil {
return err
svc.logger.Info("failed to remove file", zap.Error(err))
return err, true
}
svc.logger.Info("DEBUG applyConfig", zap.Any("applyConfig", applyConfig))
result, err := svc.clientSet.AppsV1().Deployments(namespace).Apply(ctx, &applyConfig, k8smetav1.ApplyOptions{
Force: true,
})
if err != nil {
svc.logger.Info("failed to apply deployment", zap.Error(err))
return err
}
svc.logger.Info("DEBUG result", zap.Any("result", result))
} else if operation == "delete" {
if status == "deleted" {
svc.logger.Info("Already deleted collector for Sink ID", zap.String("sinkID", sinkId))
return nil
svc.logger.Info("Already applied collector for Sink ID", zap.String("sinkID", sinkId))
return nil, true
}
var applyConfig k8sv1.DeploymentApplyConfiguration
err = json.Decode(fileContent, applyConfig)
if err != nil {
return err, true
}
_, err = svc.clientSet.AppsV1().Deployments(namespace).Apply(ctx, &applyConfig, k8smetav1.ApplyOptions{
Force: true,
})
if err != nil {
svc.logger.Info("failed to apply deployment", zap.Error(err))
return err, true
}
// wait until deployment is active before returning
for i := 0; i < 4; i++ {
time.Sleep(1 * time.Second)
_, status, err := svc.getDeploymentState(ctx, ownerID, sinkId)
if status == "broken" || err != nil {
svc.logger.Error("Failed during deployment, deployment is not active", zap.String("sinkID", sinkId), zap.Error(err))
return err, false
}
err := svc.clientSet.AppsV1().Deployments(namespace).Delete(ctx, deploymentName, k8smetav1.DeleteOptions{})
if err != nil {
svc.logger.Info("failed to remove deployment", zap.Error(err))
return err
if status == "active" {
break
}
}
return nil
return nil, false
}

func execCmd(_ context.Context, cmd *exec.Cmd, logger *zap.Logger, stdOutFunc func(stdOut *bufio.Scanner, stdErr *bufio.Scanner)) (*bufio.Scanner, *bufio.Scanner, error) {
Expand Down Expand Up @@ -203,16 +167,16 @@ func (svc *deployService) getDeploymentState(ctx context.Context, _, sinkId stri
}

func (svc *deployService) CreateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
err := svc.newCollectorDeploy(ctx, "apply", ownerID, sinkID, deploymentEntry)
if err != nil {
return err
err2, done := svc.applyDeployment(ctx, ownerID, sinkID, deploymentEntry)
if done {
return err2
}

return nil
}

func (svc *deployService) UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
err := svc.DeleteOtelCollector(ctx, ownerID, sinkID, deploymentEntry)
err := svc.DeleteOtelCollector(ctx, ownerID, sinkID)
if err != nil {
return err
}
Expand All @@ -225,10 +189,10 @@ func (svc *deployService) UpdateOtelCollector(ctx context.Context, ownerID, sink
return nil
}

func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error {
err := svc.newCollectorDeploy(ctx, "delete", ownerID, sinkID, deploymentEntry)
if err != nil {
return err
func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sinkID string) error {
err2, done := svc.removeDeployment(ctx, ownerID, sinkID)
if done {
return err2
}
return nil
}
12 changes: 3 additions & 9 deletions maestro/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
if sink == nil {
svc.logger.Warn("collector not found for sink, depleting collector", zap.String("collector name", collector.Name))
sinkId := collector.Name[5:51]
svc.logger.Info("Debug extracted sinkID", zap.String("sinkID", sinkId))
deployment, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId)
if errDeploy != nil {
svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(errDeploy))
continue
}
err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deployment)
err = svc.kubecontrol.DeleteOtelCollector(ctx, "", sinkId)
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
Expand Down Expand Up @@ -220,12 +214,12 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
svc.logger.Error("error on remove sink activity", zap.Error(err))
continue
}
deployment, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id)
_, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id)
if errDeploy != nil {
svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr))
continue
}
err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deployment)
err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id)
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
Expand Down
4 changes: 2 additions & 2 deletions maestro/redis/consumer/hashset.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func (es eventStore) GetDeploymentEntryFromSinkId(ctx context.Context, sinkId st
// handleSinksDeleteCollector will delete Deployment Entry and force delete otel collector
func (es eventStore) handleSinksDeleteCollector(ctx context.Context, event redis.SinksUpdateEvent) error {
es.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
deployment, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID)
_, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID)
if err != nil {
es.logger.Error("did not find collector entry for sink", zap.String("sink-id", event.SinkID))
return err
}
err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID, deployment)
err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions maestro/redis/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (es eventStore) Subscribe(context context.Context) error {
// handleSinkerDeleteCollector Delete collector
func (es eventStore) handleSinkerDeleteCollector(ctx context.Context, event redis2.SinkerUpdateEvent) error {
es.logger.Info("Received maestro DELETE event from sinker, sink state", zap.String("state", event.State), zap.String("sinkdID", event.SinkID), zap.String("ownerID", event.Owner))
deployment, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID)
_, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID)
if err != nil {
return err
}
err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID, deployment)
err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID)
if err != nil {
return err
}
Expand Down

0 comments on commit 62a561f

Please sign in to comment.