From d76956385c5622096c88b33545c3998f7cc16de8 Mon Sep 17 00:00:00 2001 From: Troy Martin Date: Mon, 11 Apr 2022 12:18:22 -0400 Subject: [PATCH] Use topology name for message and exception tags (#134) --- lib/prom_ex/plugins/broadway.ex | 32 ++++++++++++++++++-------- test/prom_ex/plugins/broadway_test.exs | 32 ++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/lib/prom_ex/plugins/broadway.ex b/lib/prom_ex/plugins/broadway.ex index 21ca1b4a..62e76849 100644 --- a/lib/prom_ex/plugins/broadway.ex +++ b/lib/prom_ex/plugins/broadway.ex @@ -31,6 +31,8 @@ if Code.ensure_loaded?(Broadway) do end ``` + ## GenStage Producer + To correctly capture per-message metrics and error rate, add the following transform to your pipeline: ``` defmodule WebApp.MyPipeline do @@ -54,15 +56,25 @@ if Code.ensure_loaded?(Broadway) do acknowledger: {__MODULE__, :ack_id, :ack_data} } end + + def ack(:ack_id, _successful_messages, _failed_messages) do + :ok + end end ``` + + ## BroadwayRabbitMQ.Producer + + There's no need to configure an acknowledger on messages when using BroadwayRabbitMQ. + + `BroadwayRabbitMQ.Producer` handles acking messages internally, which involves a unique `:delivery_tag` and `AMQP`. """ use PromEx.Plugin require Logger - alias Broadway.{BatchInfo, Message, Options} + alias Broadway.{BatchInfo, Options} alias PromEx.Utils @millisecond_duration_buckets [10, 100, 500, 1_000, 10_000, 30_000, 60_000] @@ -268,12 +280,7 @@ if Code.ensure_loaded?(Broadway) do buckets: @millisecond_duration_buckets ], tags: [:processor_key, :name], - tag_values: fn %{processor_key: processor_key, message: %Message{acknowledger: {acknowledger, _, _}}} -> - %{ - processor_key: processor_key, - name: Utils.normalize_module_name(acknowledger) - } - end, + tag_values: &extract_message_tag_values/1, unit: {:native, :millisecond} ), distribution( @@ -338,6 +345,13 @@ if Code.ensure_loaded?(Broadway) do ) end + defp extract_message_tag_values(%{processor_key: processor_key, topology_name: name}) do + %{ + processor_key: processor_key, + name: Utils.normalize_module_name(name) + } + end + defp extract_batcher_tag_values(%{batch_info: batch_info = %BatchInfo{}, topology_name: name}) do %{ name: Utils.normalize_module_name(name), @@ -362,7 +376,7 @@ if Code.ensure_loaded?(Broadway) do kind: kind, reason: reason, stacktrace: stacktrace, - message: %Message{acknowledger: {acknowledger, _, _}} + topology_name: name }) do reason = Utils.normalize_exception(kind, reason, stacktrace) @@ -370,7 +384,7 @@ if Code.ensure_loaded?(Broadway) do processor_key: processor_key, kind: kind, reason: reason, - name: Utils.normalize_module_name(acknowledger) + name: Utils.normalize_module_name(name) } end end diff --git a/test/prom_ex/plugins/broadway_test.exs b/test/prom_ex/plugins/broadway_test.exs index 2bc77def..fda33aaf 100644 --- a/test/prom_ex/plugins/broadway_test.exs +++ b/test/prom_ex/plugins/broadway_test.exs @@ -1,8 +1,16 @@ defmodule PromEx.Plugins.BroadwayTest do use ExUnit.Case, async: true + alias PromEx.MetricTypes.Event alias PromEx.Plugins.Broadway alias PromEx.Test.Support.{Events, Metrics} + alias Telemetry.Metrics.Distribution + + @default_metadata %{ + processor_key: :default, + topology_name: Elixir.SomeModule, + message: %Elixir.Broadway.Message{acknowledger: {Elixir.SomeAcker}, data: %{}} + } defmodule WebApp.PromEx do use PromEx, otp_app: :web_app @@ -24,4 +32,28 @@ defmodule PromEx.Plugins.BroadwayTest do assert metrics == Metrics.read_expected(:broadway) end + + describe "event_metrics/1" do + test "returns topology_name for message tags" do + metric = assert_event_metric(:broadway_message_event_metrics, [:broadway, :processor, :message, :stop]) + assert %{name: "SomeModule", processor_key: :default} = metric.tag_values.(@default_metadata) + end + + test "returns topology_name for message exception tags" do + metric = assert_event_metric(:broadway_message_event_metrics, [:broadway, :processor, :message, :exception]) + exception_metadata = Map.merge(@default_metadata, %{kind: Error, reason: "notsure", stacktrace: []}) + assert %{name: "SomeModule", processor_key: :default} = metric.tag_values.(exception_metadata) + end + + defp assert_event_metric(metric_group, event) do + assert event_metrics = Broadway.event_metrics(otp_app: :web_app) + + assert %Event{metrics: message_metrics} = + Enum.find(event_metrics, fn metrics -> metrics.group_name == metric_group end) + + assert %Distribution{} = metric = Enum.find(message_metrics, fn dist -> dist.event_name == event end) + + metric + end + end end