Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Telemetry via cowboy stream_handler #39

Closed
50 changes: 49 additions & 1 deletion lib/plug/cowboy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ defmodule Plug.Cowboy do
other -> :erlang.error({:badarg, [other]})
end

:telemetry.attach(
:plug_cowboy,
[:cowboy, :request, :early_error],
&handle_event/4,
nil
)

apply(:cowboy, start, args(scheme, plug, opts, cowboy_options))
end

Expand Down Expand Up @@ -303,7 +310,7 @@ defmodule Plug.Cowboy do
[ref || build_ref(plug, scheme), transport_options, protocol_options]
end

@default_stream_handlers [Plug.Cowboy.Stream]
@default_stream_handlers [:cowboy_telemetry_h, :cowboy_stream_h]

defp set_stream_handlers(opts) do
compress = Keyword.get(opts, :compress)
Expand Down Expand Up @@ -338,6 +345,47 @@ defmodule Plug.Cowboy do
raise ArgumentError, "could not start Cowboy2 adapter, " <> message
end

def handle_event(
[:cowboy, :request, :early_error],
_,
%{reason: {:connection_error, :limit_reached, specific_reason}, partial_req: partial_req},
_
) do
Logger.error("""
Cowboy returned 431 because it was unable to parse the request headers.

This may happen because there are no headers, or there are too many headers
or the header name or value are too large (such as a large cookie).

More specific reason is:

#{inspect(specific_reason)}

You can customize those limits when configuring your http/https
server. The configuration option and default values are shown below:

protocol_options: [
max_header_name_length: 64,
max_header_value_length: 4096,
max_headers: 100
]

Request info:

peer: #{format_peer(partial_req.peer)}
method: #{partial_req.method || "<unable to parse>"}
path: #{partial_req.path || "<unable to parse>"}
""")
end

def handle_event(_, _, _, _) do
:ok
end

defp format_peer({addr, port}) do
"#{:inet_parse.ntoa(addr)}:#{port}"
end

defp option_deprecation_warning(:acceptors),
do: option_deprecation_warning(:acceptors, :num_acceptors)

Expand Down
78 changes: 0 additions & 78 deletions lib/plug/cowboy/stream.ex

This file was deleted.

63 changes: 63 additions & 0 deletions src/cowboy_telemetry_h.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-module(cowboy_telemetry_h).
-behavior(cowboy_stream).

-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).
-export([early_error/5]).

init(StreamID, Req, Opts) ->
SystemTime = erlang:system_time(),
StartTime = erlang:monotonic_time(),
telemetry:execute(
[cowboy, request, start],
#{system_time => SystemTime},
#{stream_id => StreamID, req => Req}
),
{Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
{Commands, [Next | StartTime]}.

data(StreamID, IsFin, Data, [Next0 | StartTime]) ->
{Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
{Commands, [Next | StartTime]}.

info(StreamID, Info, [Next0 | StartTime]) ->
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
case Commands of
[{response, _, _, _} = Response] ->
EndTime = erlang:monotonic_time(),
telemetry:execute(
[cowboy, request, stop],
#{duration => EndTime - StartTime},
#{stream_id => StreamID, response => Response}
);
[{error_response, _, _, _} = ErrorResponse | Commands1] ->
EndTime = erlang:monotonic_time(),
case lists:keyfind(internal_error, 1, Commands1) of
{internal_error, {'EXIT', _, Reason}, _} ->
telemetry:execute(
[cowboy, request, exception],
#{duration => EndTime - StartTime},
#{stream_id => StreamID, kind => exit, reason => Reason, error_response => ErrorResponse}
);
_ ->
ignore
end;
_ ->
ignore
end,
{Commands, [Next | StartTime]}.

terminate(StreamID, Reason, [Next | _]) ->
cowboy_stream:terminate(StreamID, Reason, Next).

early_error(StreamID, Reason, PartialReq, Resp0, Opts) ->
SystemTime = erlang:system_time(),
Resp = cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp0, Opts),
telemetry:execute(
[cowboy, request, early_error],
#{system_time => SystemTime},
#{stream_id => StreamID, reason => Reason, partial_req => PartialReq, response => Resp}
),
Resp.
82 changes: 71 additions & 11 deletions test/plug/cowboy/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,78 @@ defmodule Plug.Cowboy.ConnTest do
assert {200, _, _} = request(:get, "/headers", [{"foo", "bar"}, {"baz", "bat"}])
end

def telemetry(conn) do
Process.sleep(30)
send_resp(conn, 200, "TELEMETRY")
end

def telemetry_exception(conn) do
# send first because of the `rescue` in `call`
send_resp(conn, 200, "Fail")
raise "BadTimes"
end

test "emits telemetry events for start/stop" do
:telemetry.attach_many(
:start_stop_test,
[
[:cowboy, :request, :start],
[:cowboy, :request, :stop],
[:cowboy, :request, :exception]
],
fn event, measurements, metadata, test ->
send(test, {:telemetry, event, measurements, metadata})
end,
self()
)

assert {200, _, "TELEMETRY"} = request(:get, "/telemetry?foo=bar")

assert_receive {:telemetry, [:cowboy, :request, :start], %{system_time: _},
%{stream_id: _, req: req}}

assert req.path == "/telemetry"

assert_receive {:telemetry, [:cowboy, :request, :stop], %{duration: duration},
%{response: {:response, _, _, _}}}

duration_ms = System.convert_time_unit(duration, :native, :millisecond)

assert duration_ms >= 30
assert duration_ms < 100

refute_received {:telemetry, [:cowboy, :request, :exception], _, _}

:telemetry.detach(:start_stop_test)
end

test "emits telemetry events for exception" do
:telemetry.attach_many(
:exception_test,
[
[:cowboy, :request, :start],
[:cowboy, :request, :exception]
],
fn event, measurements, metadata, test ->
send(test, {:telemetry, event, measurements, metadata})
end,
self()
)

request(:get, "/telemetry_exception")

assert_receive {:telemetry, [:cowboy, :request, :start], _, _}

assert_receive {:telemetry, [:cowboy, :request, :exception], %{},
%{kind: :exit, reason: _reason}}

:telemetry.detach(:exception_test)
end

test "fails on large headers" do
:telemetry.attach(
:early_error_test,
[:plug_cowboy, :early_error],
[:cowboy, :request, :early_error],
fn name, measurements, metadata, test ->
send(test, {:event, name, measurements, metadata})
end,
Expand All @@ -153,21 +221,13 @@ defmodule Plug.Cowboy.ConnTest do
assert {200, _, _} = request(:get, "/headers", [{"foo", "bar"}, {"baz", "bat"}])
end) =~ "Cowboy returned 431 because it was unable to parse the request headers"

assert_receive {:event, [:plug_cowboy, :early_error],
assert_receive {:event, [:cowboy, :request, :early_error],
%{
system_time: _
},
%{
reason: {:connection_error, :limit_reached, _},
request: %{
method: "GET",
path: "/headers"
},
response: %{
status: 431,
headers: _,
body: _
}
partial_req: %{}
}}

:telemetry.detach(:early_error_test)
Expand Down
2 changes: 1 addition & 1 deletion test/plug/cowboy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ defmodule Plug.CowboyTest do
%{num_acceptors: 100, max_connections: 16_384, socket_opts: [port: 3000]},
%{
env: %{dispatch: @dispatch},
stream_handlers: [:cowboy_compress_h, Plug.Cowboy.Stream]
stream_handlers: [:cowboy_compress_h, :cowboy_telemetry_h, :cowboy_stream_h]
}
] = args(:http, __MODULE__, [], port: 3000, compress: true)
end
Expand Down