From 3f82f26c9df076789778a7c9db6e8a04ee14e89c Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 31 Jan 2023 14:47:42 -0300 Subject: [PATCH 1/2] feat(agent): fix context propagation for failure in otel Signed-off-by: Luiz Pegoraro --- agent/backend/pktvisor/scrape.go | 8 +++++--- agent/otel/bridgeservice.go | 26 +++++++++++++++++--------- agent/otel/otlpmqttexporter/otlp.go | 5 +++-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/agent/backend/pktvisor/scrape.go b/agent/backend/pktvisor/scrape.go index cee984a76..974f43597 100644 --- a/agent/backend/pktvisor/scrape.go +++ b/agent/backend/pktvisor/scrape.go @@ -35,9 +35,9 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er return metrics, nil } -func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context) (component.MetricsExporter, error) { +func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (component.MetricsExporter, error) { - bridgeService := otel.NewBridgeService(&p.policyRepo, p.agentTags) + bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags) if p.mqttClient != nil { cfg := otlpmqttexporter.CreateConfigClient(p.mqttClient, p.otlpMetricsTopic, p.pktvisorVersion, bridgeService) set := otlpmqttexporter.CreateDefaultSettings(p.logger) @@ -168,7 +168,7 @@ func (p *pktvisorBackend) scrapeOpenTelemetry(ctx context.Context) { if p.mqttClient != nil { if !ok { var errStartExp error - p.exporter[policyID], errStartExp = p.createOtlpMqttExporter(exeCtx) + p.exporter[policyID], errStartExp = p.createOtlpMqttExporter(exeCtx, execCancelF) if errStartExp != nil { p.logger.Error("failed to create a exporter", zap.Error(err)) return @@ -204,6 +204,8 @@ func (p *pktvisorBackend) scrapeOpenTelemetry(ctx context.Context) { } } select { + case <-exeCtx.Done(): + ctx.Done() case <-ctx.Done(): err := p.exporter[policyID].Shutdown(exeCtx) if err != nil { diff --git a/agent/otel/bridgeservice.go b/agent/otel/bridgeservice.go index 2a625551d..9278b9755 100644 --- a/agent/otel/bridgeservice.go +++ b/agent/otel/bridgeservice.go @@ -1,12 +1,14 @@ package otel import ( + "context" "github.com/ns1labs/orb/agent/policies" "strings" ) type AgentBridgeService interface { RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) + NotifyAgentDisconnection(ctx context.Context, err error) } type AgentDataPerPolicy struct { @@ -15,21 +17,23 @@ type AgentDataPerPolicy struct { AgentTags map[string]string } -var _ AgentBridgeService = (*bridgeService)(nil) +var _ AgentBridgeService = (*BridgeService)(nil) -type bridgeService struct { - policyRepo policies.PolicyRepo - AgentTags map[string]string +type BridgeService struct { + bridgeContext context.Context + policyRepo policies.PolicyRepo + AgentTags map[string]string } -func NewBridgeService(policyRepo *policies.PolicyRepo, agentTags map[string]string) *bridgeService { - return &bridgeService{ - policyRepo: *policyRepo, - AgentTags: agentTags, +func NewBridgeService(ctx context.Context, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService { + return &BridgeService{ + bridgeContext: ctx, + policyRepo: *policyRepo, + AgentTags: agentTags, } } -func (b *bridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) { +func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) { pData, err := b.policyRepo.GetByName(policyName) if err != nil { return nil, err @@ -40,3 +44,7 @@ func (b *bridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent AgentTags: b.AgentTags, }, nil } + +func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) { + +} diff --git a/agent/otel/otlpmqttexporter/otlp.go b/agent/otel/otlpmqttexporter/otlp.go index f4eaf901b..9b2020b5c 100644 --- a/agent/otel/otlpmqttexporter/otlp.go +++ b/agent/otel/otlpmqttexporter/otlp.go @@ -230,7 +230,7 @@ func (e *exporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { e.logger.Info("request metrics count per policyID", zap.String("policyID", e.policyID), zap.Int("metric_count", md.MetricCount())) err = e.export(ctx, e.config.MetricsTopic, request) if err != nil { - defer ctx.Done() + ctx.Done() return err } return err @@ -240,11 +240,12 @@ func (e *exporter) pushLogs(_ context.Context, _ plog.Logs) error { return fmt.Errorf("not implemented") } -func (e *exporter) export(_ context.Context, metricsTopic string, request []byte) error { +func (e *exporter) export(ctx context.Context, metricsTopic string, request []byte) error { compressedPayload := e.compressBrotli(request) c := *e.config.Client if token := c.Publish(metricsTopic, 1, false, compressedPayload); token.Wait() && token.Error() != nil { e.logger.Error("error sending metrics RPC", zap.String("topic", metricsTopic), zap.Error(token.Error())) + e.config.OrbAgentService.NotifyAgentDisconnection(ctx, token.Error()) return token.Error() } e.logger.Info("scraped and published metrics", zap.String("topic", metricsTopic), zap.Int("payload_size_b", len(request)), zap.Int("compressed_payload_size_b", len(compressedPayload))) From 974cf5e9b384980ccd0fe399289ac7fa66259b49 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 2 Feb 2023 17:10:52 -0300 Subject: [PATCH 2/2] feat(agent): add context propagation. Signed-off-by: Luiz Pegoraro --- agent/backend/pktvisor/scrape.go | 1 + agent/otel/bridgeservice.go | 3 ++- maestro/service.go | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/agent/backend/pktvisor/scrape.go b/agent/backend/pktvisor/scrape.go index 974f43597..de00f8bcc 100644 --- a/agent/backend/pktvisor/scrape.go +++ b/agent/backend/pktvisor/scrape.go @@ -206,6 +206,7 @@ func (p *pktvisorBackend) scrapeOpenTelemetry(ctx context.Context) { select { case <-exeCtx.Done(): ctx.Done() + p.cancelFunc() case <-ctx.Done(): err := p.exporter[policyID].Shutdown(exeCtx) if err != nil { diff --git a/agent/otel/bridgeservice.go b/agent/otel/bridgeservice.go index 9278b9755..d260ca879 100644 --- a/agent/otel/bridgeservice.go +++ b/agent/otel/bridgeservice.go @@ -46,5 +46,6 @@ func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent } func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) { - + ctx.Done() + b.bridgeContext.Done() } diff --git a/maestro/service.go b/maestro/service.go index 8ee979f3a..71f08343e 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -85,6 +85,7 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can svc.logger.Warn("failed to unmarshal sink, skipping", zap.String("sink-id", sinkRes.Id)) continue } + svc.logger.Info("DEBUG sinkres", zap.Any("sinkres", sinkRes)) if val, _ := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkRes.Id); val != "" { svc.logger.Info("Skipping deploymentEntry because it is already created")