[ORB-689] sinks status synchronization (#2231)
* Update monitor.go

* Update kubecontrol.go

* change idle timeout to 10 minutes

* set an idle sink back to active when it has activity

* back to active after idle

* Update kubecontrol.go

* increase idle time to 15 minutes

* fix idle

* fix sinker activity

* add changes

* changes

* add changes

* add changes

* add changes

* add changes

* add changes

* add changes

* add changes

* add changes

* add changes

* add changes

* add changes
etaques authored Feb 19, 2023
1 parent 0adbec6 commit 502af9f
Showing 5 changed files with 111 additions and 60 deletions.
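
The commit messages above describe the new idle handling: a sink with no recorded activity for 10 minutes is marked idle, and fresh activity brings it back to active. A minimal sketch of that transition rule follows; nextState is a hypothetical helper for illustration only, not part of this PR.

package main

import (
	"fmt"
	"time"
)

// idleTimeSeconds mirrors the constant changed in monitor.go below:
// a sink with no activity for this long is considered idle (10 minutes).
const idleTimeSeconds = 600

// nextState illustrates the transition rule: active -> idle once the idle
// window elapses, and any fresh activity brings an idle sink back to active.
func nextState(current string, lastActivity int64, now time.Time) string {
	idleLimit := now.Unix() - idleTimeSeconds
	switch current {
	case "active":
		if lastActivity != 0 && idleLimit >= lastActivity {
			return "idle"
		}
	case "idle":
		if lastActivity > idleLimit {
			return "active"
		}
	}
	return current
}

func main() {
	now := time.Now()
	fmt.Println(nextState("active", now.Add(-20*time.Minute).Unix(), now)) // idle
	fmt.Println(nextState("idle", now.Add(-1*time.Minute).Unix(), now))    // active
}

In the actual change, this check lives in monitorSinks, driven by idleTimeSeconds and the last activity timestamp stored per sink.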
56 changes: 37 additions & 19 deletions maestro/monitor/monitor.go
@@ -5,12 +5,13 @@ import (
"context"
"encoding/json"
"errors"
"github.com/ns1labs/orb/maestro/kubecontrol"
rediscons1 "github.com/ns1labs/orb/maestro/redis/consumer"
"io"
"strings"
"time"

"github.com/ns1labs/orb/maestro/kubecontrol"
rediscons1 "github.com/ns1labs/orb/maestro/redis/consumer"

maestroconfig "github.com/ns1labs/orb/maestro/config"
sinkspb "github.com/ns1labs/orb/sinks/pb"
"go.uber.org/zap"
@@ -21,7 +22,7 @@ import (
)

const (
idleTimeSeconds = 300
idleTimeSeconds = 600
TickerForScan = 1 * time.Minute
namespace = "otelcollectors"
)
@@ -185,31 +186,40 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
}
data.SinkID = sink.Id
data.OwnerID = sink.OwnerID
logs, err := svc.getPodLogs(ctx, collector)
if err != nil {
svc.logger.Error("error on getting logs, skipping", zap.Error(err))
continue
}
status, logsErr := svc.analyzeLogs(logs)
var idleLimit int64 = 0
if status == "fail" {
svc.logger.Error("error during analyze logs", zap.Error(logsErr))
continue
// only analyze logs if current status is active
var logsErr error
var status string
if sink.GetState() == "active" {
logs, err := svc.getPodLogs(ctx, collector)
if err != nil {
svc.logger.Error("error on getting logs, skipping", zap.Error(err))
continue
}
status, logsErr = svc.analyzeLogs(logs)
if status == "fail" {
svc.logger.Error("error during analyze logs", zap.Error(logsErr))
continue
}
}
var lastActivity int64
var activityErr error
if status == "active" {
lastActivity, activityErr = svc.eventStore.GetActivity(sink.Id)
// if logs reported 'active' status
// here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle
var idleLimit int64 = 0
if activityErr != nil || lastActivity == 0 {
svc.logger.Error("error on getting last collector activity", zap.Error(activityErr))
status = "unknown"
continue
} else {
idleLimit = time.Now().Unix() - idleTimeSeconds // within 30 minutes
idleLimit = time.Now().Unix() - idleTimeSeconds // within 10 minutes
}
if idleLimit >= lastActivity {
//changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err)
//changing state on redis sinker
data.State.SetFromString("idle")
svc.eventStore.UpdateSinkStateCache(ctx, data)
deploymentEntry, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id)
if errDeploy != nil {
svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr))
Expand All @@ -219,14 +229,22 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
continue
}
} else if sink.GetState() != status { //updating status
}
//set the new sink status if changed during checks
if sink.GetState() != status && status != "" {
svc.logger.Info("changing sink status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
if err != nil {
svc.logger.Info("updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("error_message (opt)", err.Error()), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
svc.logger.Error("error updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("error_message (opt)", err.Error()), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
} else {
svc.logger.Info("updating status", zap.Any("before", sink.GetState()), zap.String("new status", status), zap.String("SinkID", sink.Id), zap.String("ownerID", sink.OwnerID))
// changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, status, logsErr, err)
// changing state on redis sinker
data.State.SetFromString(status)
svc.eventStore.UpdateSinkStateCache(ctx, data)
}
svc.eventStore.PublishSinkStateChange(sink, status, logsErr, err)
}
}
}
Expand All @@ -250,7 +268,7 @@ func (svc *monitorService) analyzeLogs(logEntry []string) (status string, err er
}
if strings.Contains(logLine, "Permanent error: remote write returned HTTP status 429 Too Many Requests") {
errorMessage := "error: remote write returned HTTP status 429 Too Many Requests"
return "error", errors.New(errorMessage)
return "warning", errors.New(errorMessage)
}
// other errors
if strings.Contains(logLine, "error") {
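
One behavioral change in analyzeLogs above is that a remote-write 429 response now downgrades the sink to "warning" instead of "error". A simplified, self-contained sketch of that classification; classifyLogLine is a hypothetical stand-in, not the PR's function.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// classifyLogLine mirrors the adjusted rule: a remote-write 429 is reported
// as "warning" rather than "error", while other lines containing "error"
// still mark the sink as failed.
func classifyLogLine(logLine string) (string, error) {
	if strings.Contains(logLine, "Permanent error: remote write returned HTTP status 429 Too Many Requests") {
		return "warning", errors.New("error: remote write returned HTTP status 429 Too Many Requests")
	}
	if strings.Contains(logLine, "error") {
		return "error", errors.New(logLine)
	}
	return "active", nil
}

func main() {
	status, err := classifyLogLine("Permanent error: remote write returned HTTP status 429 Too Many Requests")
	fmt.Println(status, err) // warning, plus the wrapped error message
}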
24 changes: 22 additions & 2 deletions maestro/redis/consumer/hashset.go
@@ -5,10 +5,11 @@ import (
"encoding/json"
"errors"
"fmt"
redis2 "github.com/go-redis/redis/v8"
"strconv"
"time"

redis2 "github.com/go-redis/redis/v8"

"github.com/ns1labs/orb/maestro/config"
"github.com/ns1labs/orb/maestro/redis"
"github.com/ns1labs/orb/pkg/types"
@@ -108,7 +109,13 @@ func (es eventStore) handleSinksUpdateCollector(ctx context.Context, event redis
if err != nil {
return err
}

// changing state on updated sink to unknown
sinkData.OwnerID = event.Owner
es.PublishSinkStateChange(sinkData, "unknown", err, err)
data.SinkID = sinkData.Id
data.OwnerID = sinkData.OwnerID
data.State.SetFromString("unknown")
es.UpdateSinkStateCache(ctx, data)
return nil
}

@@ -126,6 +133,19 @@ func (es eventStore) UpdateSinkCache(ctx context.Context, data config.SinkData)
return
}

func (es eventStore) UpdateSinkStateCache(ctx context.Context, data config.SinkData) (err error) {
keyPrefix := "sinker_key"
skey := fmt.Sprintf("%s-%s:%s", keyPrefix, data.OwnerID, data.SinkID)
bytes, err := json.Marshal(data)
if err != nil {
return err
}
if err = es.sinkerKeyRedisClient.Set(ctx, skey, bytes, 0).Err(); err != nil {
return err
}
return
}

// GetActivity collector activity
func (es eventStore) GetActivity(sinkID string) (int64, error) {
if sinkID == "" {
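
The new UpdateSinkStateCache writes the sink state back to the sinker's Redis cache under a "sinker_key-<owner>:<sink>" key. A minimal sketch under assumptions: sinkData is a pared-down stand-in for config.SinkData, and a local Redis at localhost:6379 is assumed.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	redis2 "github.com/go-redis/redis/v8"
)

// sinkData is a trimmed stand-in for config.SinkData; only the fields needed
// to build the cache entry are shown.
type sinkData struct {
	SinkID  string `json:"sink_id"`
	OwnerID string `json:"owner_id"`
	State   string `json:"state"`
}

// updateSinkStateCache mirrors the shape of the new method: the entry is keyed
// by "sinker_key-<owner>:<sink>" and stores the JSON-encoded sink data with no
// expiration.
func updateSinkStateCache(ctx context.Context, client *redis2.Client, data sinkData) error {
	skey := fmt.Sprintf("sinker_key-%s:%s", data.OwnerID, data.SinkID)
	bytes, err := json.Marshal(data)
	if err != nil {
		return err
	}
	return client.Set(ctx, skey, bytes, 0).Err()
}

func main() {
	client := redis2.NewClient(&redis2.Options{Addr: "localhost:6379"}) // assumed local Redis
	err := updateSinkStateCache(context.Background(), client, sinkData{
		SinkID: "sink-1", OwnerID: "owner-1", State: "unknown",
	})
	fmt.Println(err)
}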
35 changes: 20 additions & 15 deletions maestro/redis/consumer/streams.go
@@ -2,9 +2,10 @@ package consumer

import (
"context"
"time"

"github.com/ns1labs/orb/maestro/config"
"github.com/ns1labs/orb/pkg/errors"
"time"

"github.com/ns1labs/orb/maestro/kubecontrol"
maestroredis "github.com/ns1labs/orb/maestro/redis"
@@ -36,6 +37,7 @@ type Subscriber interface {
GetDeploymentEntryFromSinkId(ctx context.Context, sinkId string) (string, error)

UpdateSinkCache(ctx context.Context, data config.SinkData) (err error)
UpdateSinkStateCache(ctx context.Context, data config.SinkData) (err error)
PublishSinkStateChange(sink *sinkspb.SinkRes, status string, logsErr error, err error)

GetActivity(sinkID string) (int64, error)
@@ -87,20 +89,23 @@ func (es eventStore) SubscribeSinkerEvents(ctx context.Context) error {
for _, msg := range streams[0].Messages {
event := msg.Values
rte := decodeSinkerStateUpdate(event)
es.logger.Info("received message in sinker event bus", zap.Any("operation", event["operation"]))
switch event["operation"] {
case sinkerUpdate:
go func() {
err = es.handleSinkerCreateCollector(ctx, rte) //sinker request create collector
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
} else {
es.streamRedisClient.XAck(ctx, streamSinker, groupMaestro, msg.ID)
}
}()

case <-ctx.Done():
return errors.New("stopped listening to sinks, due to context cancellation")
// here we should listen only to events coming from the sinker, not our own "publishState" events
if rte.State == "active" {
es.logger.Info("received message in sinker event bus", zap.Any("operation", event["operation"]))
switch event["operation"] {
case sinkerUpdate:
go func() {
err = es.handleSinkerCreateCollector(ctx, rte) //sinker request to create collector
if err != nil {
es.logger.Error("Failed to handle sinks event", zap.Any("operation", event["operation"]), zap.Error(err))
} else {
es.streamRedisClient.XAck(ctx, streamSinker, groupMaestro, msg.ID)
}
}()

case <-ctx.Done():
return errors.New("stopped listening to sinks, due to context cancellation")
}
}
}
}
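
SubscribeSinkerEvents now reacts only to "active" state updates coming from the sinker, so maestro's own published state changes (idle, unknown, ...) do not feed back into collector creation. A trimmed sketch of that guard; stateUpdate and the "sinker_update" operation value are assumptions for illustration, since the constant's string value is not shown in this diff.

package main

import "fmt"

// stateUpdate is a trimmed stand-in for the decoded sinker event.
type stateUpdate struct {
	Operation string
	State     string
}

// shouldHandle reflects the new guard: only "active" updates from the sinker
// trigger collector creation; any other state is ignored to avoid reacting to
// maestro's own publishState events.
func shouldHandle(evt stateUpdate) bool {
	return evt.State == "active" && evt.Operation == "sinker_update"
}

func main() {
	fmt.Println(shouldHandle(stateUpdate{Operation: "sinker_update", State: "active"})) // true
	fmt.Println(shouldHandle(stateUpdate{Operation: "sinker_update", State: "idle"}))   // false
}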
49 changes: 29 additions & 20 deletions sinker/otel/bridgeservice/bridge.go
@@ -2,12 +2,13 @@ package bridgeservice

import (
"context"
"sort"
"time"

fleetpb "github.com/ns1labs/orb/fleet/pb"
policiespb "github.com/ns1labs/orb/policies/pb"
"github.com/ns1labs/orb/sinker/config"
"go.uber.org/zap"
"sort"
"time"
)

type BridgeService interface {
@@ -44,33 +45,41 @@ func (bs *SinkerOtelBridgeService) NotifyActiveSink(ctx context.Context, mfOwner
}

// only updates sink state if status Idle or Unknown
// if state is Active, we should register activity
if cfgRepo.State == config.Idle || cfgRepo.State == config.Unknown {
cfgRepo.LastRemoteWrite = time.Now()
err = cfgRepo.State.SetFromString(newState)
if err != nil {
bs.logger.Error("unable to set state", zap.String("new_state", newState), zap.Error(err))
return err
}
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
// only deploy the collector if the new state is "active" and the current state is not active
if newState == "active" && cfgRepo.State != config.Active {
err = cfgRepo.State.SetFromString(newState)
if err != nil {
bs.logger.Error("unable to set state", zap.String("new_state", newState), zap.Error(err))
return err
}
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
err = bs.sinkerCache.DeployCollector(ctx, cfgRepo)
if err != nil {
bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("waking up sink to active", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
} else {
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
}
err = bs.sinkerCache.DeployCollector(ctx, cfgRepo)
if err != nil {
bs.logger.Error("error during update sink cache", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("notified active sink", zap.String("sinkID", sinkId), zap.String("newState", newState))
} else if cfgRepo.State == config.Active {
cfgRepo.LastRemoteWrite = time.Now()
bs.logger.Info("sink is already active, registering activity")
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
} else if cfgRepo.State == config.Error {
cfgRepo.Msg = message
}
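
NotifyActiveSink now distinguishes waking up an idle or unknown sink (state change, activity registration, collector deployment) from simply refreshing activity on an already-active one. A hypothetical, stripped-down version of that decision, with plain strings standing in for the config.SinkState constants:

package main

import "fmt"

// These strings stand in for the config state constants used in bridge.go.
const (
	stateActive  = "active"
	stateIdle    = "idle"
	stateUnknown = "unknown"
)

// notifyActiveSink sketches the flow: an idle or unknown sink reporting
// "active" is woken up, while an already-active sink only has its last
// remote-write activity refreshed.
func notifyActiveSink(current, newState string) (actions []string) {
	switch {
	case (current == stateIdle || current == stateUnknown) && newState == stateActive:
		actions = append(actions, "set state=active", "record activity", "deploy collector")
	case current == stateActive:
		actions = append(actions, "record activity")
	}
	return actions
}

func main() {
	fmt.Println(notifyActiveSink(stateIdle, stateActive))   // wake up: state change, activity, deployment
	fmt.Println(notifyActiveSink(stateActive, stateActive)) // already active: just record activity
}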
7 changes: 3 additions & 4 deletions sinks/redis/producer/streams.go
@@ -64,7 +64,7 @@ func (es eventStore) CreateSink(ctx context.Context, token string, s sinks.Sink)

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
es.logger.Error("error sending event to sinks event store", zap.Error(err))
}

}()
@@ -93,10 +93,9 @@ func (es eventStore) UpdateSink(ctx context.Context, token string, s sinks.Sink)

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
es.logger.Error("error sending event to sinks event store", zap.Error(err))
}
}()

return es.svc.UpdateSink(ctx, token, s)
}

@@ -144,7 +143,7 @@ func (es eventStore) DeleteSink(ctx context.Context, token, id string) (err erro

err = es.client.XAdd(ctx, record).Err()
if err != nil {
es.logger.Error("error sending event to sinker event store", zap.Error(err))
es.logger.Error("error sending event to sinks event store", zap.Error(err))
return err
}
return nil
