Skip to content

Commit

Permalink
feat: add telemetry (#298)
Browse files Browse the repository at this point in the history
* feat: add server telemetry

* fix: restore call arg

* feat: add clien-side telemetry

* test: add tests

* wip: add livebook for grpc_prometheus replacement

* feat: add server telemetry prometheus conversion example

* chore: format

* test: test client telemetry events

* refactor: use :telemetry.span for server telemetry

* refactor: use telemetry span for client telemetry
  • Loading branch information
polvalente authored Jun 8, 2023
1 parent fc0b9bb commit 80a0858
Show file tree
Hide file tree
Showing 11 changed files with 744 additions and 29 deletions.
39 changes: 15 additions & 24 deletions lib/grpc/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -182,35 +182,26 @@ defmodule GRPC.Server do
%{server: server, endpoint: endpoint} = stream,
req
) do
last = fn r, s ->
reply = apply(server, func_name, [r, s])

if res_stream do
{:ok, stream}
else
{:ok, stream, reply}
GRPC.Telemetry.server_span(server, endpoint, func_name, stream, fn ->
last = fn r, s ->
reply = apply(server, func_name, [r, s])

if res_stream do
{:ok, stream}
else
{:ok, stream, reply}
end
end
end

interceptors = interceptors(endpoint, server)
interceptors = interceptors(endpoint, server)

next =
Enum.reduce(interceptors, last, fn {interceptor, opts}, acc ->
fn r, s -> interceptor.call(r, s, acc, opts) end
end)
next =
Enum.reduce(interceptors, last, fn {interceptor, opts}, acc ->
fn r, s -> interceptor.call(r, s, acc, opts) end
end)

try do
next.(req, stream)
rescue
e in GRPC.RPCError ->
{:error, e}
catch
kind, reason ->
stack = __STACKTRACE__
Logger.error(Exception.format(kind, reason, stack))
reason = Exception.normalize(kind, reason, stack)
{:error, %{kind: kind, reason: reason, stack: stack}}
end
end)
end

defp interceptors(nil, _), do: []
Expand Down
4 changes: 3 additions & 1 deletion lib/grpc/stub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ defmodule GRPC.Stub do
accepted_compressors: accepted_compressors
}

do_call(req_stream, stream, request, opts)
GRPC.Telemetry.client_span(stream, fn ->
do_call(req_stream, stream, request, opts)
end)
end

defp do_call(
Expand Down
159 changes: 159 additions & 0 deletions lib/grpc/telemetry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
defmodule GRPC.Telemetry do
@moduledoc """
Events published by GRPC
These can be divided in client-side events and server-side events.
## Client-side Events
* `[:grpc, :client, :rpc, :start]` - Published before all interceptors are executed.
* `[:grpc, :client, :rpc, :stop]` - Published after all interceptors executed successfully.
* `:duration` - the duration as measured through `System.monotonic_time()`
for the whole interceptor pipeline.
* `[:grpc, :client, :rpc, :exception]` - Published if any exception occurs while receiving a message.
* `:duration` - the duration as measured through `System.monotonic_time()`
for the execution since the start of the pipeline until the exception happened.
| event | measurements | metadata |
|--------------|--------------|----------|
| `[:rpc, :start]` | `:count` | `:stream` |
| `[:rpc, :stop]` | `:duration` | `:stream` |
| `[:rpc, :exception]` | `:duration` | `:stream`, `:kind`, `:reason`, `:stacktrace` |
### Metadata
* `:stream` - the `%GRPC.Server.Stream{}` for the request
* `:function_name` - the name of the function called
* `:server` - the server module name
* `:endpoint` - the endpoint module name
`:exception` events also include some error metadata:
* `:reason` is the error value in case of `catch` or the actual exception in case of `rescue`.
* `:kind` can be one of:
* `:error` — from an `{:error, error}` return value. Some Erlang functions may also throw an
`:error` tuple, which will be reported as `:error`.
* `:exit` — from a caught process exit.
* `:throw` — from a caught value, this doesn't necessarily mean that an error occurred.
## Server-side Events
* `[:grpc, :server, :rpc, :start]` - Published before all interceptors are executed.
* `[:grpc, :server, :rpc, :stop]` - Published after all interceptors executed successfully.
* `:duration` - the duration as measured through `System.monotonic_time()`
for the whole interceptor pipeline.
* `[:grpc, :server, :rpc, :exception]` - Published if any exception occurs while receiving a message.
* `:duration` - the duration as measured through `System.monotonic_time()`
for the execution since the start of the pipeline until the exception happened.
| event | measurements | metadata |
|--------------|--------------|----------|
| `[:rpc, :start]` | `:count` | `:stream`, `:server`, `:endpoint`, `:function_name` |
| `[:rpc, :stop]` | `:duration` | `:stream`, `:server`, `:endpoint`, `:function_name` , `:result` |
| `[:rpc, :exception]` | `:duration` | `:stream`, `:server`, `:endpoint`, `:function_name`, `:kind`, `:reason`, `:stacktrace` |
### Metadata
* `:stream` - the `%GRPC.Server.Stream{}` for the request.
* `:function_name` - the name of the function called.
* `:server` - the server module name.
* `:endpoint` - the endpoint module name.
* `:result` - the result returned from the interceptor pipeline.
`:exception` events also include some error metadata:
* `:reason` is the error value in case of `catch` or the actual exception in case of `rescue`.
* `:kind` can be one of:
* `:error` — from an `{:error, error}` return value. Some Erlang functions may also throw an
`:error` tuple, which will be reported as `:error`.
* `:exit` — from a caught process exit.
* `:throw` — from a caught value, this doesn't necessarily mean that an error occurred.
"""

require Logger

@server_rpc [:grpc, :server, :rpc]
@client_rpc [:grpc, :client, :rpc]

@doc "The server telemetry event prefix."
def server_rpc_prefix, do: @server_rpc

@doc "The client telemetry event prefix."
def client_rpc_prefix, do: @client_rpc

@doc false
def server_span(server, endpoint, func_name, stream, span_fn) do
start_metadata = %{
server: server,
endpoint: endpoint,
function_name: func_name,
stream: stream
}

:telemetry.span(@server_rpc, start_metadata, fn ->
result = span_fn.()

{result, Map.put(start_metadata, :result, result)}
end)
rescue
e in GRPC.RPCError ->
{:error, e}
catch
kind, reason ->
stacktrace = __STACKTRACE__
Logger.error(Exception.format(kind, reason, stacktrace))
{:error, %{kind: kind, reason: reason, stack: stacktrace}}
end

@client_rpc_start_name @client_rpc ++ [:start]
@doc false
def client_rpc_start_name, do: @client_rpc_start_name

@doc false
def client_span(stream, span_fn) do
start_metadata = %{stream: stream}

:telemetry.span(@client_rpc, start_metadata, fn ->
try do
{span_fn.(), start_metadata}
rescue
e ->
:erlang.error(Exception.normalize(:error, e, __STACKTRACE__))
end
end)
catch
kind, reason ->
stacktrace = __STACKTRACE__
Logger.error(Exception.format(kind, reason, stacktrace))
:erlang.raise(kind, reason, stacktrace)
end

@client_rpc_stop_name @client_rpc ++ [:stop]
@doc false
def client_rpc_stop_name, do: @client_rpc_stop_name

@doc false
def client_rpc_stop(stream, duration) do
:telemetry.execute(@client_rpc_stop_name, %{duration: duration}, %{stream: stream})
end

@client_rpc_exception_name @client_rpc ++ [:exception]
@doc false
def client_rpc_exception_name, do: @client_rpc_exception_name

@doc false
def client_rpc_exception(
stream,
kind,
reason,
stacktrace,
duration
) do
:telemetry.execute(@client_rpc_exception_name, %{duration: duration}, %{
stream: stream,
kind: kind,
reason: reason,
stacktrace: stacktrace
})
end
end
Loading

0 comments on commit 80a0858

Please sign in to comment.