Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): fix context propagation for failure in otel #2166

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions agent/backend/pktvisor/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er
return metrics, nil
}

func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context) (component.MetricsExporter, error) {
func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (component.MetricsExporter, error) {

bridgeService := otel.NewBridgeService(&p.policyRepo, p.agentTags)
bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags)
if p.mqttClient != nil {
cfg := otlpmqttexporter.CreateConfigClient(p.mqttClient, p.otlpMetricsTopic, p.pktvisorVersion, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(p.logger)
Expand Down Expand Up @@ -168,7 +168,7 @@ func (p *pktvisorBackend) scrapeOpenTelemetry(ctx context.Context) {
if p.mqttClient != nil {
if !ok {
var errStartExp error
p.exporter[policyID], errStartExp = p.createOtlpMqttExporter(exeCtx)
p.exporter[policyID], errStartExp = p.createOtlpMqttExporter(exeCtx, execCancelF)
if errStartExp != nil {
p.logger.Error("failed to create a exporter", zap.Error(err))
return
Expand Down Expand Up @@ -204,6 +204,9 @@ func (p *pktvisorBackend) scrapeOpenTelemetry(ctx context.Context) {
}
}
select {
case <-exeCtx.Done():
ctx.Done()
p.cancelFunc()
case <-ctx.Done():
err := p.exporter[policyID].Shutdown(exeCtx)
if err != nil {
Expand Down
27 changes: 18 additions & 9 deletions agent/otel/bridgeservice.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package otel

import (
"context"
"github.com/ns1labs/orb/agent/policies"
"strings"
)

// AgentBridgeService is the set of agent-side operations made available to
// the OTLP MQTT exporter.
type AgentBridgeService interface {
// RetrieveAgentInfoByPolicyName returns the agent data recorded for the
// policy with the given name, or an error when it cannot be retrieved.
RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error)
// NotifyAgentDisconnection is called by the exporter with the publish
// error when sending metrics over MQTT fails.
NotifyAgentDisconnection(ctx context.Context, err error)
}

type AgentDataPerPolicy struct {
Expand All @@ -15,21 +17,23 @@ type AgentDataPerPolicy struct {
AgentTags map[string]string
}

var _ AgentBridgeService = (*bridgeService)(nil)
var _ AgentBridgeService = (*BridgeService)(nil)

type bridgeService struct {
policyRepo policies.PolicyRepo
AgentTags map[string]string
// BridgeService implements AgentBridgeService on top of the agent's policy
// repository and tags.
type BridgeService struct {
// bridgeContext is the context supplied to NewBridgeService.
// NOTE(review): the context package documentation advises against storing
// a Context in a struct, and no CancelFunc is kept alongside it, so this
// type cannot cancel it — confirm this is intended.
bridgeContext context.Context
// policyRepo is the dereferenced repository value given to NewBridgeService;
// it is used to look policies up by name.
policyRepo policies.PolicyRepo
// AgentTags is copied as-is into every AgentDataPerPolicy result.
AgentTags map[string]string
}

func NewBridgeService(policyRepo *policies.PolicyRepo, agentTags map[string]string) *bridgeService {
return &bridgeService{
policyRepo: *policyRepo,
AgentTags: agentTags,
// NewBridgeService returns a BridgeService that remembers ctx, the repository
// value that policyRepo points to, and agentTags.
//
// policyRepo is dereferenced here, so it must not be nil.
func NewBridgeService(ctx context.Context, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService {
	svc := new(BridgeService)
	svc.bridgeContext = ctx
	svc.policyRepo = *policyRepo
	svc.AgentTags = agentTags
	return svc
}

func (b *bridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) {
func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) {
pData, err := b.policyRepo.GetByName(policyName)
if err != nil {
return nil, err
Expand All @@ -40,3 +44,8 @@ func (b *bridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent
AgentTags: b.AgentTags,
}, nil
}

// NotifyAgentDisconnection is the hook the exporter calls when publishing
// metrics fails; err carries the publish error and is currently unused.
//
// NOTE(review): Context.Done only returns the context's done channel. Calling
// it and dropping the result cancels neither ctx nor the stored bridge
// context, so no disconnection is actually signalled from here. Signalling
// would need a context.CancelFunc, which this type does not hold.
func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) {
	_ = ctx.Done()
	_ = b.bridgeContext.Done()
}
5 changes: 3 additions & 2 deletions agent/otel/otlpmqttexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (e *exporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
e.logger.Info("request metrics count per policyID", zap.String("policyID", e.policyID), zap.Int("metric_count", md.MetricCount()))
err = e.export(ctx, e.config.MetricsTopic, request)
if err != nil {
defer ctx.Done()
ctx.Done()
return err
}
return err
Expand All @@ -240,11 +240,12 @@ func (e *exporter) pushLogs(_ context.Context, _ plog.Logs) error {
return fmt.Errorf("not implemented")
}

func (e *exporter) export(_ context.Context, metricsTopic string, request []byte) error {
func (e *exporter) export(ctx context.Context, metricsTopic string, request []byte) error {
compressedPayload := e.compressBrotli(request)
c := *e.config.Client
if token := c.Publish(metricsTopic, 1, false, compressedPayload); token.Wait() && token.Error() != nil {
e.logger.Error("error sending metrics RPC", zap.String("topic", metricsTopic), zap.Error(token.Error()))
e.config.OrbAgentService.NotifyAgentDisconnection(ctx, token.Error())
return token.Error()
}
e.logger.Info("scraped and published metrics", zap.String("topic", metricsTopic), zap.Int("payload_size_b", len(request)), zap.Int("compressed_payload_size_b", len(compressedPayload)))
Expand Down
1 change: 1 addition & 0 deletions maestro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can
svc.logger.Warn("failed to unmarshal sink, skipping", zap.String("sink-id", sinkRes.Id))
continue
}
svc.logger.Info("DEBUG sinkres", zap.Any("sinkres", sinkRes))

if val, _ := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkRes.Id); val != "" {
svc.logger.Info("Skipping deploymentEntry because it is already created")
Expand Down