Skip to content

Commit

Permalink
Use topology name for message and exception tags (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmartin8080 authored Apr 11, 2022
1 parent ad2c47e commit d769563
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
32 changes: 23 additions & 9 deletions lib/prom_ex/plugins/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ if Code.ensure_loaded?(Broadway) do
end
```
## GenStage Producer
To correctly capture per-message metrics and error rate, add the following transform to your pipeline:
```
defmodule WebApp.MyPipeline do
Expand All @@ -54,15 +56,25 @@ if Code.ensure_loaded?(Broadway) do
acknowledger: {__MODULE__, :ack_id, :ack_data}
}
end
def ack(:ack_id, _successful_messages, _failed_messages) do
:ok
end
end
```
## BroadwayRabbitMQ.Producer
There's no need to configure an acknowledger on messages when using BroadwayRabbitMQ.
`BroadwayRabbitMQ.Producer` handles acking messages internally, which involves a unique `:delivery_tag` and `AMQP`.
"""

use PromEx.Plugin

require Logger

alias Broadway.{BatchInfo, Message, Options}
alias Broadway.{BatchInfo, Options}
alias PromEx.Utils

@millisecond_duration_buckets [10, 100, 500, 1_000, 10_000, 30_000, 60_000]
Expand Down Expand Up @@ -268,12 +280,7 @@ if Code.ensure_loaded?(Broadway) do
buckets: @millisecond_duration_buckets
],
tags: [:processor_key, :name],
tag_values: fn %{processor_key: processor_key, message: %Message{acknowledger: {acknowledger, _, _}}} ->
%{
processor_key: processor_key,
name: Utils.normalize_module_name(acknowledger)
}
end,
tag_values: &extract_message_tag_values/1,
unit: {:native, :millisecond}
),
distribution(
Expand Down Expand Up @@ -338,6 +345,13 @@ if Code.ensure_loaded?(Broadway) do
)
end

defp extract_message_tag_values(%{processor_key: processor_key, topology_name: name}) do
%{
processor_key: processor_key,
name: Utils.normalize_module_name(name)
}
end

defp extract_batcher_tag_values(%{batch_info: batch_info = %BatchInfo{}, topology_name: name}) do
%{
name: Utils.normalize_module_name(name),
Expand All @@ -362,15 +376,15 @@ if Code.ensure_loaded?(Broadway) do
kind: kind,
reason: reason,
stacktrace: stacktrace,
message: %Message{acknowledger: {acknowledger, _, _}}
topology_name: name
}) do
reason = Utils.normalize_exception(kind, reason, stacktrace)

%{
processor_key: processor_key,
kind: kind,
reason: reason,
name: Utils.normalize_module_name(acknowledger)
name: Utils.normalize_module_name(name)
}
end
end
Expand Down
32 changes: 32 additions & 0 deletions test/prom_ex/plugins/broadway_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
defmodule PromEx.Plugins.BroadwayTest do
use ExUnit.Case, async: true

alias PromEx.MetricTypes.Event
alias PromEx.Plugins.Broadway
alias PromEx.Test.Support.{Events, Metrics}
alias Telemetry.Metrics.Distribution

@default_metadata %{
processor_key: :default,
topology_name: Elixir.SomeModule,
message: %Elixir.Broadway.Message{acknowledger: {Elixir.SomeAcker}, data: %{}}
}

defmodule WebApp.PromEx do
use PromEx, otp_app: :web_app
Expand All @@ -24,4 +32,28 @@ defmodule PromEx.Plugins.BroadwayTest do

assert metrics == Metrics.read_expected(:broadway)
end

describe "event_metrics/1" do
test "returns topology_name for message tags" do
metric = assert_event_metric(:broadway_message_event_metrics, [:broadway, :processor, :message, :stop])
assert %{name: "SomeModule", processor_key: :default} = metric.tag_values.(@default_metadata)
end

test "returns topology_name for message exception tags" do
metric = assert_event_metric(:broadway_message_event_metrics, [:broadway, :processor, :message, :exception])
exception_metadata = Map.merge(@default_metadata, %{kind: Error, reason: "notsure", stacktrace: []})
assert %{name: "SomeModule", processor_key: :default} = metric.tag_values.(exception_metadata)
end

defp assert_event_metric(metric_group, event) do
assert event_metrics = Broadway.event_metrics(otp_app: :web_app)

assert %Event{metrics: message_metrics} =
Enum.find(event_metrics, fn metrics -> metrics.group_name == metric_group end)

assert %Distribution{} = metric = Enum.find(message_metrics, fn dist -> dist.event_name == event end)

metric
end
end
end

0 comments on commit d769563

Please sign in to comment.