[ORB-689] sinks status synchronization #2231

Merged
merged 22 commits on Feb 19, 2023
56 changes: 37 additions & 19 deletions maestro/monitor/monitor.go
@@ -5,12 +5,13 @@ import (
"context"
"encoding/json"
"errors"
"github.com/ns1labs/orb/maestro/kubecontrol"
rediscons1 "github.com/ns1labs/orb/maestro/redis/consumer"
"io"
"strings"
"time"

"github.com/ns1labs/orb/maestro/kubecontrol"
rediscons1 "github.com/ns1labs/orb/maestro/redis/consumer"

maestroconfig "github.com/ns1labs/orb/maestro/config"
sinkspb "github.com/ns1labs/orb/sinks/pb"
"go.uber.org/zap"
@@ -21,7 +22,7 @@ import (
)

const (
idleTimeSeconds = 300
idleTimeSeconds = 600
TickerForScan = 1 * time.Minute
namespace = "otelcollectors"
)
@@ -185,31 +186,40 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
}
data.SinkID = sink.Id
data.OwnerID = sink.OwnerID
logs, err := svc.getPodLogs(ctx, collector)
if err != nil {
svc.logger.Error("error on getting logs, skipping", zap.Error(err))
continue
}
status, logsErr := svc.analyzeLogs(logs)
var idleLimit int64 = 0
if status == "fail" {
svc.logger.Error("error during analyze logs", zap.Error(logsErr))
continue
// only analyze logs if current status is active
var logsErr error
var status string
if sink.GetState() == "active" {
logs, err := svc.getPodLogs(ctx, collector)
if err != nil {
svc.logger.Error("error on getting logs, skipping", zap.Error(err))
continue
}
status, logsErr = svc.analyzeLogs(logs)
if status == "fail" {
svc.logger.Error("error during analyze logs", zap.Error(logsErr))
continue
}
}
var lastActivity int64
var activityErr error
if status == "active" {
lastActivity, activityErr = svc.eventStore.GetActivity(sink.Id)
// if logs reported 'active' status
// here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle
var idleLimit int64 = 0
if activityErr != nil || lastActivity == 0 {
svc.logger.Error("error on getting last collector activity", zap.Error(activityErr))
status = "unknown"
continue
} else {
idleLimit = time.Now().Unix() - idleTimeSeconds // within 30 minutes
idleLimit = time.Now().Unix() - idleTimeSeconds // within 10 minutes
}
if idleLimit >= lastActivity {
//changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err)
//changing state on redis sinker
data.State.SetFromString("idle")
svc.eventStore.UpdateSinkStateCache(ctx, data)
deploymentEntry, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id)
if errDeploy != nil {
svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr))
@@ -219,14 +229,22 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
continue
}
} else if sink.GetState() != status { //updating status
}
//set the new sink status if changed during checks
if sink.GetState() != status && status != "" {
svc.logger.Info("changing sink status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
if err != nil {
svc.logger.Info("updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("error_message (opt)", err.Error()), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
svc.logger.Error("error updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("error_message (opt)", err.Error()), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
} else {
svc.logger.Info("updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
// changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, status, logsErr, err)
// changing state on redis sinker
data.State.SetFromString(status)
svc.eventStore.UpdateSinkStateCache(ctx, data)
}
svc.eventStore.PublishSinkStateChange(sink, status, logsErr, err)
}
}
}
@@ -250,7 +268,7 @@ func (svc *monitorService) analyzeLogs(logEntry []string) (status string, err error) {
}
if strings.Contains(logLine, "Permanent error: remote write returned HTTP status 429 Too Many Requests") {
errorMessage := "error: remote write returned HTTP status 429 Too Many Requests"
return "error", errors.New(errorMessage)
return "warning", errors.New(errorMessage)
}
// other errors
if strings.Contains(logLine, "error") {
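The idle check in monitorSinks above boils down to a timestamp comparison: the last recorded collector activity (a Unix timestamp read via the event store) is measured against a cutoff of now minus `idleTimeSeconds` (raised here to 600 seconds, i.e. 10 minutes). A minimal standalone sketch of that comparison; `isSinkIdle` is a hypothetical helper, not code from this PR:

```go
package main

import (
	"fmt"
	"time"
)

// idleTimeSeconds mirrors the constant in monitor.go after this change (600s = 10 minutes).
const idleTimeSeconds = 600

// isSinkIdle reports whether a sink should be moved to "idle": it compares the
// last recorded collector activity (a Unix timestamp) against a cutoff computed
// from the idle window. This restates the check in monitorSinks, nothing more.
func isSinkIdle(lastActivity int64, now time.Time) bool {
	idleLimit := now.Unix() - idleTimeSeconds
	return idleLimit >= lastActivity
}

func main() {
	now := time.Now()
	recent := now.Add(-5 * time.Minute).Unix()
	stale := now.Add(-15 * time.Minute).Unix()
	fmt.Println(isSinkIdle(recent, now)) // false: activity within the 10-minute window
	fmt.Println(isSinkIdle(stale, now))  // true: no activity for longer than 10 minutes
}
```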
24 changes: 22 additions & 2 deletions maestro/redis/consumer/hashset.go
@@ -5,10 +5,11 @@ import (
"encoding/json"
"errors"
"fmt"
redis2 "github.com/go-redis/redis/v8"
"strconv"
"time"

redis2 "github.com/go-redis/redis/v8"

"github.com/ns1labs/orb/maestro/config"
"github.com/ns1labs/orb/maestro/redis"
"github.com/ns1labs/orb/pkg/types"
@@ -108,7 +109,13 @@ func (es eventStore) handleSinksUpdateCollector(ctx context.Context, event redis
if err != nil {
return err
}

// changing state on updated sink to unknown
sinkData.OwnerID = event.Owner
es.PublishSinkStateChange(sinkData, "unknown", err, err)
data.SinkID = sinkData.Id
data.OwnerID = sinkData.OwnerID
data.State.SetFromString("unknown")
es.UpdateSinkStateCache(ctx, data)
return nil
}

@@ -126,6 +133,19 @@ func (es eventStore) UpdateSinkCache(ctx context.Context, data config.SinkData)
return
}

func (es eventStore) UpdateSinkStateCache(ctx context.Context, data config.SinkData) (err error) {
keyPrefix := "sinker_key"
skey := fmt.Sprintf("%s-%s:%s", keyPrefix, data.OwnerID, data.SinkID)
bytes, err := json.Marshal(data)
if err != nil {
return err
}
if err = es.sinkerKeyRedisClient.Set(ctx, skey, bytes, 0).Err(); err != nil {
return err
}
return
}

// GetActivity collector activity
func (es eventStore) GetActivity(sinkID string) (int64, error) {
if sinkID == "" {
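For reference, the new `UpdateSinkStateCache` amounts to building the sinker cache key from the owner and sink IDs ("sinker_key-<owner>:<sinkID>"), marshalling the state payload to JSON, and writing it to Redis with `SET` and no expiration. A self-contained sketch, assuming a reduced `sinkState` struct standing in for `config.SinkData`:

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"

	redis "github.com/go-redis/redis/v8"
)

// sinkState is a reduced stand-in for maestro's config.SinkData; only the
// fields relevant to the cache key and payload are shown here.
type sinkState struct {
	SinkID  string `json:"sink_id"`
	OwnerID string `json:"owner_id"`
	State   string `json:"state"`
}

// updateSinkStateCache writes the sink state under the same key shape used by
// UpdateSinkStateCache above: "sinker_key-<owner>:<sinkID>", with the whole
// struct marshalled to JSON and stored without expiration.
func updateSinkStateCache(ctx context.Context, rdb *redis.Client, data sinkState) error {
	key := fmt.Sprintf("sinker_key-%s:%s", data.OwnerID, data.SinkID)
	payload, err := json.Marshal(data)
	if err != nil {
		return err
	}
	return rdb.Set(ctx, key, payload, 0).Err()
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	err := updateSinkStateCache(context.Background(), rdb, sinkState{
		SinkID: "sink-1", OwnerID: "owner-1", State: "unknown",
	})
	fmt.Println(err)
}
```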
35 changes: 20 additions & 15 deletions maestro/redis/consumer/streams.go
@@ -2,9 +2,10 @@ package consumer

import (
"context"
"time"

"github.com/ns1labs/orb/maestro/config"
"github.com/ns1labs/orb/pkg/errors"
"time"

"github.com/ns1labs/orb/maestro/kubecontrol"
maestroredis "github.com/ns1labs/orb/maestro/redis"
@@ -36,6 +37,7 @@ type Subscriber interface {
GetDeploymentEntryFromSinkId(ctx context.Context, sinkId string) (string, error)

UpdateSinkCache(ctx context.Context, data config.SinkData) (err error)
UpdateSinkStateCache(ctx context.Context, data config.SinkData) (err error)
PublishSinkStateChange(sink *sinkspb.SinkRes, status string, logsErr error, err error)

GetActivity(sinkID string) (int64, error)
@@ -87,20 +89,23 @@ func (es eventStore) SubscribeSinkerEvents(ctx context.Context) error {
for _, msg := range streams[0].Messages {
event := msg.Values
rte := decodeSinkerStateUpdate(event)
es.logger.Info("received message in sinker event bus", zap.Any("operation", event["operation"]))
switch event["operation"] {
case sinkerUpdate:
go func() {
err = es.handleSinkerCreateCollector(ctx, rte) //sinker request create collector
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
} else {
es.streamRedisClient.XAck(ctx, streamSinker, groupMaestro, msg.ID)
}
}()

case <-ctx.Done():
return errors.New("stopped listening to sinks, due to context cancellation")
// here we should only listen to events coming from sinker, not our own "publishState" events
if rte.State == "active" {
es.logger.Info("received message in sinker event bus", zap.Any("operation", event["operation"]))
switch event["operation"] {
case sinkerUpdate:
go func() {
err = es.handleSinkerCreateCollector(ctx, rte) //sinker request to create collector
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
} else {
es.streamRedisClient.XAck(ctx, streamSinker, groupMaestro, msg.ID)
}
}()

case <-ctx.Done():
return errors.New("stopped listening to sinks, due to context cancellation")
}
}
}
}
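The guard added to `SubscribeSinkerEvents` is what keeps maestro from reacting to its own `PublishSinkStateChange` messages: only events whose decoded state is "active" are treated as sinker requests to create a collector. A minimal sketch of that filter, with a simplified event type and a hypothetical `shouldHandle` helper:

```go
package main

import "fmt"

// sinkerStateEvent is a simplified stand-in for the decoded sinker event; in
// maestro the decoded update also carries owner, config and a message.
type sinkerStateEvent struct {
	SinkID string
	State  string
}

// shouldHandle mirrors the guard added in SubscribeSinkerEvents: maestro only
// reacts to "active" updates coming from sinker, so the state-change events it
// publishes itself ("idle", "unknown", ...) do not loop back into collector
// creation.
func shouldHandle(e sinkerStateEvent) bool {
	return e.State == "active"
}

func main() {
	fmt.Println(shouldHandle(sinkerStateEvent{SinkID: "s1", State: "active"}))  // true: handle as a create-collector request
	fmt.Println(shouldHandle(sinkerStateEvent{SinkID: "s1", State: "idle"}))    // false: ignore self-published event
	fmt.Println(shouldHandle(sinkerStateEvent{SinkID: "s1", State: "unknown"})) // false: ignore self-published event
}
```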
49 changes: 29 additions & 20 deletions sinker/otel/bridgeservice/bridge.go
@@ -2,12 +2,13 @@ package bridgeservice

import (
"context"
"sort"
"time"

fleetpb "github.com/ns1labs/orb/fleet/pb"
policiespb "github.com/ns1labs/orb/policies/pb"
"github.com/ns1labs/orb/sinker/config"
"go.uber.org/zap"
"sort"
"time"
)

type BridgeService interface {
@@ -44,33 +45,41 @@ func (bs *SinkerOtelBridgeService) NotifyActiveSink(ctx context.Context, mfOwner
}

// only update sink state if current status is Idle or Unknown
// if state is Active, we only register activity
if cfgRepo.State == config.Idle || cfgRepo.State == config.Unknown {
cfgRepo.LastRemoteWrite = time.Now()
err = cfgRepo.State.SetFromString(newState)
if err != nil {
bs.logger.Error("unable to set state", zap.String("new_state", newState), zap.Error(err))
return err
}
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
// only deploy collector if the new state is "active" and the current state is not active
if newState == "active" && cfgRepo.State != config.Active {
err = cfgRepo.State.SetFromString(newState)
if err != nil {
bs.logger.Error("unable to set state", zap.String("new_state", newState), zap.Error(err))
return err
}
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
err = bs.sinkerCache.DeployCollector(ctx, cfgRepo)
if err != nil {
bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("waking up sink to active", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
} else {
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
}
err = bs.sinkerCache.DeployCollector(ctx, cfgRepo)
if err != nil {
bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("notified active sink", zap.String("sinkID", sinkId), zap.String("newState", newState))
} else if cfgRepo.State == config.Active {
cfgRepo.LastRemoteWrite = time.Now()
bs.logger.Info("sink is already active, registering activity")
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
} else if cfgRepo.State == config.Error {
cfgRepo.Msg = message
}
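Read as a decision table, the restructured `NotifyActiveSink` branches on the cached state versus the incoming one: an idle or unknown sink reporting "active" is woken up (state set, activity registered, collector deployed); an already-active sink only has its last remote write and activity refreshed; an errored sink stores the message. A rough sketch of that branching, assuming states are plain strings rather than the real `config` constants:

```go
package main

import "fmt"

// sinkAction summarizes what NotifyActiveSink does for a given cached state
// and incoming state, as restructured in this diff. The real service also
// updates the cache and timestamps; this only captures the branching.
func sinkAction(currentState, newState string) string {
	switch {
	case (currentState == "idle" || currentState == "unknown") && newState == "active":
		return "set state to active, register activity, deploy collector"
	case currentState == "idle" || currentState == "unknown":
		return "register activity only"
	case currentState == "active":
		return "already active: update last remote write, register activity"
	case currentState == "error":
		return "store error message"
	default:
		return "no action"
	}
}

func main() {
	fmt.Println(sinkAction("idle", "active"))   // wake up the sink
	fmt.Println(sinkAction("active", "active")) // just record activity
	fmt.Println(sinkAction("error", "active"))  // keep the error message
}
```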
7 changes: 3 additions & 4 deletions sinks/redis/producer/streams.go
@@ -64,7 +64,7 @@ func (es eventStore) CreateSink(ctx context.Context, token string, s sinks.Sink)

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
es.logger.Error("error sending event to sinks event store", zap.Error(err))
}

}()
@@ -93,10 +93,9 @@ func (es eventStore) UpdateSink(ctx context.Context, token string, s sinks.Sink)

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
es.logger.Error("error sending event to sinks event store", zap.Error(err))
}
}()

return es.svc.UpdateSink(ctx, token, s)
}

@@ -144,7 +143,7 @@ func (es eventStore) DeleteSink(ctx context.Context, token, id string) (err error) {

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
es.logger.Error("error sending event to sinks event store", zap.Error(err))
return err
}
return nil