From 80a08583a8091bc564410821c3b95491a414a56f Mon Sep 17 00:00:00 2001 From: Paulo Valente <16843419+polvalente@users.noreply.github.com> Date: Thu, 8 Jun 2023 04:17:44 -0300 Subject: [PATCH] feat: add telemetry (#298) * feat: add server telemetry * fix: restore call arg * feat: add clien-side telemetry * test: add tests * wip: add livebook for grpc_prometheus replacement * feat: add server telemetry prometheus conversion example * chore: format * test: test client telemetry events * refactor: use :telemetry.span for server telemetry * refactor: use telemetry span for client telemetry --- lib/grpc/server.ex | 39 +-- lib/grpc/stub.ex | 4 +- lib/grpc/telemetry.ex | 159 ++++++++++ livebooks/telemetry.livemd | 283 ++++++++++++++++++ mix.exs | 3 +- mix.lock | 1 + .../integration/client_interceptor_test.exs | 82 ++++- test/grpc/integration/server_test.exs | 181 ++++++++++- test/support/integration_test_case.ex | 19 ++ test/support/proto/helloworld.pb.ex | 1 + test/support/proto/helloworld.proto | 1 + 11 files changed, 744 insertions(+), 29 deletions(-) create mode 100644 lib/grpc/telemetry.ex create mode 100644 livebooks/telemetry.livemd diff --git a/lib/grpc/server.ex b/lib/grpc/server.ex index 0cbed721..f1bacb68 100644 --- a/lib/grpc/server.ex +++ b/lib/grpc/server.ex @@ -182,35 +182,26 @@ defmodule GRPC.Server do %{server: server, endpoint: endpoint} = stream, req ) do - last = fn r, s -> - reply = apply(server, func_name, [r, s]) - - if res_stream do - {:ok, stream} - else - {:ok, stream, reply} + GRPC.Telemetry.server_span(server, endpoint, func_name, stream, fn -> + last = fn r, s -> + reply = apply(server, func_name, [r, s]) + + if res_stream do + {:ok, stream} + else + {:ok, stream, reply} + end end - end - interceptors = interceptors(endpoint, server) + interceptors = interceptors(endpoint, server) - next = - Enum.reduce(interceptors, last, fn {interceptor, opts}, acc -> - fn r, s -> interceptor.call(r, s, acc, opts) end - end) + next = + Enum.reduce(interceptors, last, fn {interceptor, opts}, acc -> + fn r, s -> interceptor.call(r, s, acc, opts) end + end) - try do next.(req, stream) - rescue - e in GRPC.RPCError -> - {:error, e} - catch - kind, reason -> - stack = __STACKTRACE__ - Logger.error(Exception.format(kind, reason, stack)) - reason = Exception.normalize(kind, reason, stack) - {:error, %{kind: kind, reason: reason, stack: stack}} - end + end) end defp interceptors(nil, _), do: [] diff --git a/lib/grpc/stub.ex b/lib/grpc/stub.ex index 0efec677..06caae07 100644 --- a/lib/grpc/stub.ex +++ b/lib/grpc/stub.ex @@ -265,7 +265,9 @@ defmodule GRPC.Stub do accepted_compressors: accepted_compressors } - do_call(req_stream, stream, request, opts) + GRPC.Telemetry.client_span(stream, fn -> + do_call(req_stream, stream, request, opts) + end) end defp do_call( diff --git a/lib/grpc/telemetry.ex b/lib/grpc/telemetry.ex new file mode 100644 index 00000000..d7d5b84a --- /dev/null +++ b/lib/grpc/telemetry.ex @@ -0,0 +1,159 @@ +defmodule GRPC.Telemetry do + @moduledoc """ + Events published by GRPC + + These can be divided in client-side events and server-side events. + + ## Client-side Events + + * `[:grpc, :client, :rpc, :start]` - Published before all interceptors are executed. + * `[:grpc, :client, :rpc, :stop]` - Published after all interceptors executed successfully. + * `:duration` - the duration as measured through `System.monotonic_time()` + for the whole interceptor pipeline. + * `[:grpc, :client, :rpc, :exception]` - Published if any exception occurs while receiving a message. + * `:duration` - the duration as measured through `System.monotonic_time()` + for the execution since the start of the pipeline until the exception happened. + + | event | measurements | metadata | + |--------------|--------------|----------| + | `[:rpc, :start]` | `:count` | `:stream` | + | `[:rpc, :stop]` | `:duration` | `:stream` | + | `[:rpc, :exception]` | `:duration` | `:stream`, `:kind`, `:reason`, `:stacktrace` | + + ### Metadata + + * `:stream` - the `%GRPC.Server.Stream{}` for the request + * `:function_name` - the name of the function called + * `:server` - the server module name + * `:endpoint` - the endpoint module name + + `:exception` events also include some error metadata: + + * `:reason` is the error value in case of `catch` or the actual exception in case of `rescue`. + * `:kind` can be one of: + * `:error` — from an `{:error, error}` return value. Some Erlang functions may also throw an + `:error` tuple, which will be reported as `:error`. + * `:exit` — from a caught process exit. + * `:throw` — from a caught value, this doesn't necessarily mean that an error occurred. + + ## Server-side Events + + * `[:grpc, :server, :rpc, :start]` - Published before all interceptors are executed. + * `[:grpc, :server, :rpc, :stop]` - Published after all interceptors executed successfully. + * `:duration` - the duration as measured through `System.monotonic_time()` + for the whole interceptor pipeline. + * `[:grpc, :server, :rpc, :exception]` - Published if any exception occurs while receiving a message. + * `:duration` - the duration as measured through `System.monotonic_time()` + for the execution since the start of the pipeline until the exception happened. + + | event | measurements | metadata | + |--------------|--------------|----------| + | `[:rpc, :start]` | `:count` | `:stream`, `:server`, `:endpoint`, `:function_name` | + | `[:rpc, :stop]` | `:duration` | `:stream`, `:server`, `:endpoint`, `:function_name` , `:result` | + | `[:rpc, :exception]` | `:duration` | `:stream`, `:server`, `:endpoint`, `:function_name`, `:kind`, `:reason`, `:stacktrace` | + + ### Metadata + + * `:stream` - the `%GRPC.Server.Stream{}` for the request. + * `:function_name` - the name of the function called. + * `:server` - the server module name. + * `:endpoint` - the endpoint module name. + * `:result` - the result returned from the interceptor pipeline. + + `:exception` events also include some error metadata: + + * `:reason` is the error value in case of `catch` or the actual exception in case of `rescue`. + * `:kind` can be one of: + * `:error` — from an `{:error, error}` return value. Some Erlang functions may also throw an + `:error` tuple, which will be reported as `:error`. + * `:exit` — from a caught process exit. + * `:throw` — from a caught value, this doesn't necessarily mean that an error occurred. + """ + + require Logger + + @server_rpc [:grpc, :server, :rpc] + @client_rpc [:grpc, :client, :rpc] + + @doc "The server telemetry event prefix." + def server_rpc_prefix, do: @server_rpc + + @doc "The client telemetry event prefix." + def client_rpc_prefix, do: @client_rpc + + @doc false + def server_span(server, endpoint, func_name, stream, span_fn) do + start_metadata = %{ + server: server, + endpoint: endpoint, + function_name: func_name, + stream: stream + } + + :telemetry.span(@server_rpc, start_metadata, fn -> + result = span_fn.() + + {result, Map.put(start_metadata, :result, result)} + end) + rescue + e in GRPC.RPCError -> + {:error, e} + catch + kind, reason -> + stacktrace = __STACKTRACE__ + Logger.error(Exception.format(kind, reason, stacktrace)) + {:error, %{kind: kind, reason: reason, stack: stacktrace}} + end + + @client_rpc_start_name @client_rpc ++ [:start] + @doc false + def client_rpc_start_name, do: @client_rpc_start_name + + @doc false + def client_span(stream, span_fn) do + start_metadata = %{stream: stream} + + :telemetry.span(@client_rpc, start_metadata, fn -> + try do + {span_fn.(), start_metadata} + rescue + e -> + :erlang.error(Exception.normalize(:error, e, __STACKTRACE__)) + end + end) + catch + kind, reason -> + stacktrace = __STACKTRACE__ + Logger.error(Exception.format(kind, reason, stacktrace)) + :erlang.raise(kind, reason, stacktrace) + end + + @client_rpc_stop_name @client_rpc ++ [:stop] + @doc false + def client_rpc_stop_name, do: @client_rpc_stop_name + + @doc false + def client_rpc_stop(stream, duration) do + :telemetry.execute(@client_rpc_stop_name, %{duration: duration}, %{stream: stream}) + end + + @client_rpc_exception_name @client_rpc ++ [:exception] + @doc false + def client_rpc_exception_name, do: @client_rpc_exception_name + + @doc false + def client_rpc_exception( + stream, + kind, + reason, + stacktrace, + duration + ) do + :telemetry.execute(@client_rpc_exception_name, %{duration: duration}, %{ + stream: stream, + kind: kind, + reason: reason, + stacktrace: stacktrace + }) + end +end diff --git a/livebooks/telemetry.livemd b/livebooks/telemetry.livemd new file mode 100644 index 00000000..328f81a2 --- /dev/null +++ b/livebooks/telemetry.livemd @@ -0,0 +1,283 @@ +# Telemetry + +```elixir +my_app_root = Path.join(__DIR__, "..") + +Mix.install( + [ + {:grpc, path: my_app_root, env: :dev}, + {:telemetry_metrics, "~> 0.6"}, + {:telemetry_metrics_prometheus, "~> 1.1"}, + {:req, "~> 0.3"} + ], + config_path: Path.join(my_app_root, "config/config.exs"), + lockfile: Path.join(my_app_root, "mix.lock") +) +``` + +## Telemetry Events + +We know from the documentation `GRPC.Telemetry` that some server-side events and some client-side events are published. We can use those events to build a Prometheus metrics export through `Telemetry.Metrics` and [`TelemetryMetricsPrometheus`](https://hexdocs.pm/telemetry_metrics_prometheus/TelemetryMetricsPrometheus.html) that's retrocompatible to the deprecated `:grpc_prometheus` library. + +First, let's create a mock server and client that we can use. + +## GRPC Server and Client + +```elixir +# This whole code block is taken from test/support/proto/helloword.pb.ex +# Normally, this would be autogenerated from a valid protobuf file, +# but we're using this hardcoded version so that this Livebook is self-contained. + +defmodule Helloworld.HelloRequest do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + field(:name, 1, type: :string) + field(:duration, 2, proto3_optional: true, type: :int32) +end + +defmodule Helloworld.HelloReply do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + field(:message, 1, type: :string) +end + +defmodule Helloworld.HeaderRequest do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 +end + +defmodule Helloworld.HeaderReply do + @moduledoc false + use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 + + field(:authorization, 1, type: :string) +end + +defmodule Helloworld.Greeter.Service do + @moduledoc false + use GRPC.Service, name: "helloworld.Greeter", protoc_gen_elixir_version: "0.11.0" + + rpc(:SayHello, Helloworld.HelloRequest, Helloworld.HelloReply) + + rpc(:CheckHeaders, Helloworld.HeaderRequest, Helloworld.HeaderReply) +end + +defmodule Helloworld.Greeter.Stub do + @moduledoc false + use GRPC.Stub, service: Helloworld.Greeter.Service +end +``` + +```elixir +defmodule HelloServer do + use GRPC.Server, service: Helloworld.Greeter.Service + + def say_hello(%{name: "raise", duration: duration}, _stream) do + Process.sleep(duration) + raise ArgumentError, "exception raised" + end + + def say_hello(%{name: "ok", duration: duration}, _stream) do + Process.sleep(duration) + Helloworld.HelloReply.new(message: "Hello") + end + + def say_hello(%{name: "not_found", duration: duration}, _stream) do + Process.sleep(duration) + raise GRPC.RPCError, status: GRPC.Status.not_found() + end + + def check_headers(_req, stream) do + token = GRPC.Stream.get_headers(stream)["authorization"] + Helloworld.HeaderReply.new(authorization: token) + end +end +``` + +```elixir +defmodule MetricsSupervisor do + use Supervisor + import Telemetry.Metrics + + def start_link(arg) do + Supervisor.start_link(__MODULE__, arg, name: __MODULE__) + end + + def init(_arg) do + # We attach to these events to combine them into a single published metric + # This can actually be done directly in Prometheus through Recording Rules, + # but this shows how to build a drop-in replacement for :grpc_prometheus + :telemetry.attach_many( + "handler-#{__MODULE__}", + [ + [:grpc, :server, :rpc, :stop], + [:grpc, :server, :rpc, :exception] + ], + fn [:grpc, :server, :rpc, event_kind], + %{duration: duration}, + %{stream: stream} = metadata, + _opts -> + code = + case {event_kind, metadata[:result], metadata[:reason]} do + {:stop, {:ok, _}, _} -> GRPC.Status.code_name(0) + {:stop, {:ok, _, _}, _} -> GRPC.Status.code_name(0) + {:stop, {:error, %GRPC.RPCError{status: status}}, _} -> GRPC.Status.code_name(status) + {:exception, _, %GRPC.RPCError{status: status}} -> GRPC.Status.code_name(status) + _ -> GRPC.Status.code_name(GRPC.Status.unknown()) + end + + metadata = %{ + grpc_service: stream.service_name, + grpc_method: stream.method_name, + grpc_type: stream.grpc_type, + grpc_code: code + } + + if is_message(stream) do + :telemetry.execute([:custom_grpc, :server_rpc, :sent], %{duration: duration}, metadata) + end + + :telemetry.execute([:custom_grpc, :server_rpc, :handled], %{duration: duration}, metadata) + end, + nil + ) + + # This can also be achieved through some clever use of tags+tag_values, + # without having to attach and publish a new event. However, that would + # end up leaking an extraneous tag to Prometheus. + # This is cleaner in that sense. + :telemetry.attach( + "handler-#{__MODULE__}-start", + [:grpc, :server, :rpc, :start], + fn _event, _, %{stream: stream}, _opts -> + if is_message(stream) do + :telemetry.execute([:custom_grpc, :server_rpc, :message_received], %{count: 1}) + end + end, + nil + ) + + children = [ + {TelemetryMetricsPrometheus, [metrics: metrics(), port: 9568]} + ] + + Supervisor.init(children, strategy: :one_for_one) + end + + defp is_message(stream) do + stream.grpc_type in [:client_stream, :bidi_stream] + end + + @histogram_buckets_seconds [5.0e-3, 10.0e-3, 25.0e-3, 50.0e-3, 0.1, 0.25, 0.5, 1, 2.5, 5, 10] + + @tags [:grpc_service, :grpc_method, :grpc_type] + @tags_with_code [:grpc_code | @tags] + + defp metrics do + [ + # Server Metrics + counter( + "grpc_server_started_total", + event_name: "grpc.server.rpc.start", + measurement: :count, + tags: @tags, + tag_values: &extract_tags_from_stream/1, + description: "Total number of RPCs started on the server" + ), + counter( + "grpc_server_msg_received_total", + event_name: "custom_grpc.server_rpc.message_received", + measurement: :count, + tags: @tags, + tag_values: &extract_tags_from_stream/1, + description: "Total number of RPC stream messages received on the server" + ), + counter( + "grpc_server_msg_sent_total", + event_name: "custom_grpc.server_rpc.sent", + measurement: :duration, + tags: @tags, + description: "Total number of gRPC stream messages sent by the server." + ), + counter( + "grpc_server_handled_total", + event_name: "custom_grpc.server_rpc.handled", + measurement: :duration, + tags: @tags_with_code, + description: + "Total number of RPCs completed on the server, regardless of success or failure." + ), + distribution( + "grpc_server_handled_latency_seconds", + event_name: "custom_grpc.server_rpc.handled", + description: "Histogram of response latency of rpcs handled by the server, in seconds.", + measurement: :duration, + tags: @tags_with_code, + unit: {:native, :second}, + reporter_options: [ + buckets: @histogram_buckets_seconds + ] + ) + + # Client Metrics + # TO-DO + ] + end + + defp extract_tags_from_stream(%{ + stream: %{ + service_name: service_name, + method_name: method_name, + grpc_type: grpc_type + } + }) do + %{ + grpc_service: service_name, + grpc_method: method_name, + grpc_type: grpc_type + } + end + + defp extract_tags_from_stream(_) do + %{grpc_service: nil, grpc_method: nil, grpc_type: nil} + end +end +``` + +```elixir +MetricsSupervisor.start_link([]) +GRPC.Server.start([HelloServer], 1337, []) +``` + +```elixir +{:ok, channel} = GRPC.Stub.connect("localhost:1337") + +# the requests take some time internally, so we might not get _exactly_ +# the bucket distribution we expect +for duration <- [0, 6, 101, 101, 501, 1001] do + success_request = Helloworld.HelloRequest.new(name: "ok", duration: duration) + exception_request = Helloworld.HelloRequest.new(name: "raise", duration: duration) + not_found_request = Helloworld.HelloRequest.new(name: "not_found", duration: duration) + + {:ok, _} = Helloworld.Greeter.Stub.say_hello(channel, success_request) + + {:error, %GRPC.RPCError{status: 2}} = + Helloworld.Greeter.Stub.say_hello(channel, exception_request) + + {:error, %GRPC.RPCError{status: 5}} = + Helloworld.Greeter.Stub.say_hello(channel, not_found_request) +end +``` + +```elixir +# Checking which metrics our endpoint returns +# TO-DO: replace this by actual server calls + +{:ok, %Req.Response{status: 200, body: body}} = Req.request(url: "http://localhost:9568/metrics") + +# Print the output in a readable format +IO.puts(body) +nil +``` diff --git a/mix.exs b/mix.exs index 431ad5c5..cc24060c 100644 --- a/mix.exs +++ b/mix.exs @@ -46,7 +46,8 @@ defmodule GRPC.Mixfile do {:protobuf, "~> 0.11", only: [:dev, :test]}, {:ex_doc, "~> 0.28.6", only: :dev}, {:dialyxir, "~> 1.1.0", only: [:dev, :test], runtime: false}, - {:ex_parameterized, "~> 1.3.7", only: :test} + {:ex_parameterized, "~> 1.3.7", only: :test}, + {:telemetry, "~> 1.0"} ] end diff --git a/mix.lock b/mix.lock index 4b98e57a..690168b5 100644 --- a/mix.lock +++ b/mix.lock @@ -17,4 +17,5 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "protobuf": {:hex, :protobuf, "0.11.0", "58d5531abadea3f71135e97bd214da53b21adcdb5b1420aee63f4be8173ec927", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "30ad9a867a5c5a0616cac9765c4d2c2b7b0030fa81ea6d0c14c2eb5affb6ac52"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, } diff --git a/test/grpc/integration/client_interceptor_test.exs b/test/grpc/integration/client_interceptor_test.exs index b24ef7cf..d01947ed 100644 --- a/test/grpc/integration/client_interceptor_test.exs +++ b/test/grpc/integration/client_interceptor_test.exs @@ -28,10 +28,25 @@ defmodule GRPC.Integration.ClientInterceptorTest do end end + defmodule RaiseClientInterceptor do + @behaviour GRPC.Client.Interceptor + + def init(opts), do: opts + + def call(_stream, _req, _next, %{ + error_function: error_function, + delay: delay, + message: message + }) do + Process.sleep(delay) + error_function.(message) + end + end + defmodule HelloEndpoint do use GRPC.Endpoint - run HelloServer + run(HelloServer) end test "client sends headers" do @@ -49,4 +64,69 @@ defmodule GRPC.Integration.ClientInterceptorTest do assert reply.message == "Hello, Elixir one two" end) end + + test "sends exception event upon client exception" do + message = "exception-#{inspect(self())}" + + for {function, kind, reason} <- [ + {&throw/1, :throw, message}, + {&:erlang.exit/1, :exit, message}, + {&raise/1, :error, %RuntimeError{message: message}}, + {&:erlang.error/1, :error, %ErlangError{original: message}} + ] do + client_prefix = GRPC.Telemetry.client_rpc_prefix() + stop_client_name = client_prefix ++ [:stop] + exception_client_name = client_prefix ++ [:exception] + + attach_events([ + stop_client_name, + exception_client_name + ]) + + run_endpoint(HelloEndpoint, fn port -> + delay = floor(:rand.uniform() * 500) + 500 + + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", + interceptors: [ + {RaiseClientInterceptor, + %{error_function: function, message: message, delay: delay}} + ] + ) + + req = Helloworld.HelloRequest.new(name: "Elixir") + + try do + Helloworld.Greeter.Stub.say_hello(channel, req) + rescue + _ -> :ok + catch + _, _ -> :ok + else + _ -> flunk("did not raise") + end + + assert_received {^exception_client_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > delay + + assert %{kind: ^kind, reason: ^reason, stacktrace: stacktrace} = metadata + + assert is_list(stacktrace) + + Enum.each(stacktrace, fn entry -> + # ensure stacktrace is a pure stacktrace + assert {mod, fun, arity, meta} = entry + assert is_atom(mod) + assert is_atom(fun) + assert is_integer(arity) + assert is_list(meta) + end) + end) + + assert_receive {:gun_down, _, _, _, _} + + refute_receive _ + end + end end diff --git a/test/grpc/integration/server_test.exs b/test/grpc/integration/server_test.exs index e2455719..378a6754 100644 --- a/test/grpc/integration/server_test.exs +++ b/test/grpc/integration/server_test.exs @@ -12,6 +12,16 @@ defmodule GRPC.Integration.ServerTest do defmodule HelloServer do use GRPC.Server, service: Helloworld.Greeter.Service + def say_hello(%{name: "raise", duration: duration}, _stream) do + Process.sleep(duration) + raise ArgumentError, "exception raised" + end + + def say_hello(%{name: "delay", duration: duration}, _stream) do + Process.sleep(duration) + Helloworld.HelloReply.new(message: "Hello") + end + def say_hello(%{name: "large response"}, _stream) do name = String.duplicate("a", round(:math.pow(2, 14))) Helloworld.HelloReply.new(message: "Hello, #{name}") @@ -234,11 +244,178 @@ defmodule GRPC.Integration.ServerTest do test "get cert returns correct client certificate when not present" do run_server([HelloServer], fn port -> - {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + assert {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") req = Helloworld.HelloRequest.new(name: "get cert") - {:ok, reply} = channel |> Helloworld.Greeter.Stub.say_hello(req) + assert {:ok, reply} = channel |> Helloworld.Greeter.Stub.say_hello(req) assert reply.message == "Hello, unauthenticated" end) end + + describe "telemetry" do + test "sends server start+stop events on success" do + server_rpc_prefix = GRPC.Telemetry.server_rpc_prefix() + start_server_name = server_rpc_prefix ++ [:start] + stop_server_name = server_rpc_prefix ++ [:stop] + exception_server_name = server_rpc_prefix ++ [:exception] + + client_rpc_prefix = GRPC.Telemetry.client_rpc_prefix() + start_client_name = client_rpc_prefix ++ [:start] + stop_client_name = client_rpc_prefix ++ [:stop] + exception_client_name = client_rpc_prefix ++ [:exception] + + attach_events([ + start_server_name, + stop_server_name, + exception_server_name, + start_client_name, + stop_client_name, + exception_client_name + ]) + + run_server([HelloServer], fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + + req = Helloworld.HelloRequest.new(name: "delay", duration: 1000) + + assert {:ok, _} = Helloworld.Greeter.Stub.say_hello(channel, req) + end) + + assert_received {^start_server_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{} + } = metadata + + assert_received {^stop_server_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1000 + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{} + } = metadata + + assert_received {:gun_down, _, _, _, _} + + assert_received {^start_client_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {"say_hello", {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}} + } + } = metadata + + assert_received {^stop_client_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1100 + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {"say_hello", {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}} + } + } = metadata + + refute_receive _ + end + + test "sends server start+exception events on success" do + server_rpc_prefix = GRPC.Telemetry.server_rpc_prefix() + start_server_name = server_rpc_prefix ++ [:start] + stop_server_name = server_rpc_prefix ++ [:stop] + exception_server_name = server_rpc_prefix ++ [:exception] + + client_rpc_prefix = GRPC.Telemetry.client_rpc_prefix() + start_client_name = client_rpc_prefix ++ [:start] + stop_client_name = client_rpc_prefix ++ [:stop] + exception_client_name = client_rpc_prefix ++ [:exception] + + attach_events([ + start_server_name, + stop_server_name, + exception_server_name, + start_client_name, + stop_client_name, + exception_client_name + ]) + + run_server([HelloServer], fn port -> + {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + + req = Helloworld.HelloRequest.new(name: "raise", duration: 1100) + + assert {:error, %GRPC.RPCError{status: 2}} = + Helloworld.Greeter.Stub.say_hello(channel, req) + end) + + assert_received {^start_server_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{} + } = metadata + + assert_received {^exception_server_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1100 + + assert %{ + server: HelloServer, + endpoint: nil, + function_name: :say_hello, + stream: %GRPC.Server.Stream{}, + kind: :error, + reason: %ArgumentError{message: "exception raised"}, + stacktrace: stacktrace + } = metadata + + assert is_list(stacktrace) + + Enum.each(stacktrace, fn entry -> + # ensure stacktrace is a pure stacktrace + assert {mod, fun, arity, meta} = entry + assert is_atom(mod) + assert is_atom(fun) + assert is_integer(arity) + assert is_list(meta) + end) + + assert_received {^start_client_name, measurements, metadata} + assert %{monotonic_time: _, system_time: _} = measurements + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {"say_hello", {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}} + } + } = metadata + + assert_received {^stop_client_name, measurements, metadata} + assert %{duration: duration} = measurements + assert duration > 1100 + + assert %{ + stream: %GRPC.Client.Stream{ + rpc: + {"say_hello", {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}} + } + } = metadata + + assert_received {:gun_down, _, _, _, _} + + refute_receive _ + end + end end diff --git a/test/support/integration_test_case.ex b/test/support/integration_test_case.ex index 24467ca2..484342f9 100644 --- a/test/support/integration_test_case.ex +++ b/test/support/integration_test_case.ex @@ -50,4 +50,23 @@ defmodule GRPC.Integration.TestCase do result end end + + def attach_events(event_names) do + test_pid = self() + + handler_id = "handler-#{inspect(test_pid)}" + + :telemetry.attach_many( + handler_id, + event_names, + fn name, measurements, metadata, [] -> + send(test_pid, {name, measurements, metadata}) + end, + [] + ) + + on_exit(fn -> + :telemetry.detach(handler_id) + end) + end end diff --git a/test/support/proto/helloworld.pb.ex b/test/support/proto/helloworld.pb.ex index 34e6b4f9..02e5b1d9 100644 --- a/test/support/proto/helloworld.pb.ex +++ b/test/support/proto/helloworld.pb.ex @@ -3,6 +3,7 @@ defmodule Helloworld.HelloRequest do use Protobuf, protoc_gen_elixir_version: "0.11.0", syntax: :proto3 field :name, 1, type: :string + field :duration, 2, proto3_optional: true, type: :int32 end defmodule Helloworld.HelloReply do diff --git a/test/support/proto/helloworld.proto b/test/support/proto/helloworld.proto index 0436df68..e0fac966 100644 --- a/test/support/proto/helloworld.proto +++ b/test/support/proto/helloworld.proto @@ -12,6 +12,7 @@ service Greeter { // The request message containing the user's name. message HelloRequest { string name = 1; + optional int32 duration = 2; } // The response message containing the greetings