Skip to content

Commit

Permalink
feat(agent): fix context propagation for failure in otel (#2166)
Browse files Browse the repository at this point in the history
* feat(agent): fix context propagation for failure in otel

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

* feat(agent): add context propagation.

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>

---------

Signed-off-by: Luiz Pegoraro <luiz.pegoraro@encora.com>
  • Loading branch information
lpegoraro authored Feb 3, 2023
1 parent 944e778 commit 2327fdb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 14 deletions.
9 changes: 6 additions & 3 deletions agent/backend/pktvisor/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er
return metrics, nil
}

func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context) (component.MetricsExporter, error) {
func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (component.MetricsExporter, error) {

bridgeService := otel.NewBridgeService(&p.policyRepo, p.agentTags)
bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags)
if p.mqttClient != nil {
cfg := otlpmqttexporter.CreateConfigClient(p.mqttClient, p.otlpMetricsTopic, p.pktvisorVersion, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(p.logger)
Expand Down Expand Up @@ -168,7 +168,7 @@ func (p *pktvisorBackend) scrapeOpenTelemetry(ctx context.Context) {
if p.mqttClient != nil {
if !ok {
var errStartExp error
p.exporter[policyID], errStartExp = p.createOtlpMqttExporter(exeCtx)
p.exporter[policyID], errStartExp = p.createOtlpMqttExporter(exeCtx, execCancelF)
if errStartExp != nil {
p.logger.Error("failed to create a exporter", zap.Error(err))
return
Expand Down Expand Up @@ -204,6 +204,9 @@ func (p *pktvisorBackend) scrapeOpenTelemetry(ctx context.Context) {
}
}
select {
case <-exeCtx.Done():
ctx.Done()
p.cancelFunc()
case <-ctx.Done():
err := p.exporter[policyID].Shutdown(exeCtx)
if err != nil {
Expand Down
27 changes: 18 additions & 9 deletions agent/otel/bridgeservice.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package otel

import (
"context"
"github.com/ns1labs/orb/agent/policies"
"strings"
)

type AgentBridgeService interface {
RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error)
NotifyAgentDisconnection(ctx context.Context, err error)
}

type AgentDataPerPolicy struct {
Expand All @@ -15,21 +17,23 @@ type AgentDataPerPolicy struct {
AgentTags map[string]string
}

var _ AgentBridgeService = (*bridgeService)(nil)
var _ AgentBridgeService = (*BridgeService)(nil)

type bridgeService struct {
policyRepo policies.PolicyRepo
AgentTags map[string]string
type BridgeService struct {
bridgeContext context.Context
policyRepo policies.PolicyRepo
AgentTags map[string]string
}

func NewBridgeService(policyRepo *policies.PolicyRepo, agentTags map[string]string) *bridgeService {
return &bridgeService{
policyRepo: *policyRepo,
AgentTags: agentTags,
func NewBridgeService(ctx context.Context, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService {
return &BridgeService{
bridgeContext: ctx,
policyRepo: *policyRepo,
AgentTags: agentTags,
}
}

func (b *bridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) {
func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) {
pData, err := b.policyRepo.GetByName(policyName)
if err != nil {
return nil, err
Expand All @@ -40,3 +44,8 @@ func (b *bridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent
AgentTags: b.AgentTags,
}, nil
}

func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) {
ctx.Done()
b.bridgeContext.Done()
}
5 changes: 3 additions & 2 deletions agent/otel/otlpmqttexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (e *exporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
e.logger.Info("request metrics count per policyID", zap.String("policyID", e.policyID), zap.Int("metric_count", md.MetricCount()))
err = e.export(ctx, e.config.MetricsTopic, request)
if err != nil {
defer ctx.Done()
ctx.Done()
return err
}
return err
Expand All @@ -240,11 +240,12 @@ func (e *exporter) pushLogs(_ context.Context, _ plog.Logs) error {
return fmt.Errorf("not implemented")
}

func (e *exporter) export(_ context.Context, metricsTopic string, request []byte) error {
func (e *exporter) export(ctx context.Context, metricsTopic string, request []byte) error {
compressedPayload := e.compressBrotli(request)
c := *e.config.Client
if token := c.Publish(metricsTopic, 1, false, compressedPayload); token.Wait() && token.Error() != nil {
e.logger.Error("error sending metrics RPC", zap.String("topic", metricsTopic), zap.Error(token.Error()))
e.config.OrbAgentService.NotifyAgentDisconnection(ctx, token.Error())
return token.Error()
}
e.logger.Info("scraped and published metrics", zap.String("topic", metricsTopic), zap.Int("payload_size_b", len(request)), zap.Int("compressed_payload_size_b", len(compressedPayload)))
Expand Down
1 change: 1 addition & 0 deletions maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can
svc.logger.Warn("failed to unmarshal sink, skipping", zap.String("sink-id", sinkRes.Id))
continue
}
svc.logger.Info("DEBUG sinkres", zap.Any("sinkres", sinkRes))

if val, _ := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkRes.Id); val != "" {
svc.logger.Info("Skipping deploymentEntry because it is already created")
Expand Down

0 comments on commit 2327fdb

Please sign in to comment.