diff --git a/lib/plug/cowboy.ex b/lib/plug/cowboy.ex index e7ace50..1fc007a 100644 --- a/lib/plug/cowboy.ex +++ b/lib/plug/cowboy.ex @@ -240,6 +240,13 @@ defmodule Plug.Cowboy do other -> :erlang.error({:badarg, [other]}) end + :telemetry.attach( + :plug_cowboy, + [:cowboy, :request, :early_error], + &handle_event/4, + nil + ) + apply(:cowboy, start, args(scheme, plug, opts, cowboy_options)) end @@ -303,7 +310,7 @@ defmodule Plug.Cowboy do [ref || build_ref(plug, scheme), transport_options, protocol_options] end - @default_stream_handlers [Plug.Cowboy.Stream] + @default_stream_handlers [:cowboy_telemetry_h, :cowboy_stream_h] defp set_stream_handlers(opts) do compress = Keyword.get(opts, :compress) @@ -338,6 +345,47 @@ defmodule Plug.Cowboy do raise ArgumentError, "could not start Cowboy2 adapter, " <> message end + def handle_event( + [:cowboy, :request, :early_error], + _, + %{reason: {:connection_error, :limit_reached, specific_reason}, partial_req: partial_req}, + _ + ) do + Logger.error(""" + Cowboy returned 431 because it was unable to parse the request headers. + + This may happen because there are no headers, or there are too many headers + or the header name or value are too large (such as a large cookie). + + More specific reason is: + + #{inspect(specific_reason)} + + You can customize those limits when configuring your http/https + server. The configuration option and default values are shown below: + + protocol_options: [ + max_header_name_length: 64, + max_header_value_length: 4096, + max_headers: 100 + ] + + Request info: + + peer: #{format_peer(partial_req.peer)} + method: #{partial_req.method || ""} + path: #{partial_req.path || ""} + """) + end + + def handle_event(_, _, _, _) do + :ok + end + + defp format_peer({addr, port}) do + "#{:inet_parse.ntoa(addr)}:#{port}" + end + defp option_deprecation_warning(:acceptors), do: option_deprecation_warning(:acceptors, :num_acceptors) diff --git a/lib/plug/cowboy/stream.ex b/lib/plug/cowboy/stream.ex deleted file mode 100644 index ecc23e7..0000000 --- a/lib/plug/cowboy/stream.ex +++ /dev/null @@ -1,78 +0,0 @@ -defmodule Plug.Cowboy.Stream do - @moduledoc false - - require Logger - - def init(stream_id, req, opts) do - :cowboy_stream_h.init(stream_id, req, opts) - end - - def data(stream_id, is_fin, data, state) do - :cowboy_stream_h.data(stream_id, is_fin, data, state) - end - - def info(stream_id, info, state) do - :cowboy_stream_h.info(stream_id, info, state) - end - - def terminate(_stream_id, _reason, :undefined) do - :ok - end - - def terminate(stream_id, reason, state) do - :cowboy_stream_h.info(stream_id, reason, state) - :ok - end - - def early_error(_stream_id, reason, partial_req, resp, _opts) do - {:response, status, headers, body} = resp - - :telemetry.execute( - [:plug_cowboy, :early_error], - %{system_time: System.system_time()}, - %{ - reason: reason, - request: %{method: partial_req[:method], path: partial_req[:path]}, - response: %{status: status, headers: headers, body: body} - } - ) - - case reason do - {:connection_error, :limit_reached, specific_reason} -> - Logger.error(""" - Cowboy returned 431 because it was unable to parse the request headers. - - This may happen because there are no headers, or there are too many headers - or the header name or value are too large (such as a large cookie). - - More specific reason is: - - #{inspect(specific_reason)} - - You can customize those limits when configuring your http/https - server. The configuration option and default values are shown below: - - protocol_options: [ - max_header_name_length: 64, - max_header_value_length: 4096, - max_headers: 100 - ] - - Request info: - - peer: #{format_peer(partial_req.peer)} - method: #{Map.get(partial_req, :method, "")} - path: #{Map.get(partial_req, :path, "")} - """) - - _ -> - :ok - end - - resp - end - - defp format_peer({addr, port}) do - "#{:inet_parse.ntoa(addr)}:#{port}" - end -end diff --git a/src/cowboy_telemetry_h.erl b/src/cowboy_telemetry_h.erl new file mode 100644 index 0000000..60e35f0 --- /dev/null +++ b/src/cowboy_telemetry_h.erl @@ -0,0 +1,63 @@ +-module(cowboy_telemetry_h). +-behavior(cowboy_stream). + +-export([init/3]). +-export([data/4]). +-export([info/3]). +-export([terminate/3]). +-export([early_error/5]). + +init(StreamID, Req, Opts) -> + SystemTime = erlang:system_time(), + StartTime = erlang:monotonic_time(), + telemetry:execute( + [cowboy, request, start], + #{system_time => SystemTime}, + #{stream_id => StreamID, req => Req} + ), + {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts), + {Commands, [Next | StartTime]}. + +data(StreamID, IsFin, Data, [Next0 | StartTime]) -> + {Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0), + {Commands, [Next | StartTime]}. + +info(StreamID, Info, [Next0 | StartTime]) -> + {Commands, Next} = cowboy_stream:info(StreamID, Info, Next0), + case Commands of + [{response, _, _, _} = Response] -> + EndTime = erlang:monotonic_time(), + telemetry:execute( + [cowboy, request, stop], + #{duration => EndTime - StartTime}, + #{stream_id => StreamID, response => Response} + ); + [{error_response, _, _, _} = ErrorResponse | Commands1] -> + EndTime = erlang:monotonic_time(), + case lists:keyfind(internal_error, 1, Commands1) of + {internal_error, {'EXIT', _, Reason}, _} -> + telemetry:execute( + [cowboy, request, exception], + #{duration => EndTime - StartTime}, + #{stream_id => StreamID, kind => exit, reason => Reason, error_response => ErrorResponse} + ); + _ -> + ignore + end; + _ -> + ignore + end, + {Commands, [Next | StartTime]}. + +terminate(StreamID, Reason, [Next | _]) -> + cowboy_stream:terminate(StreamID, Reason, Next). + +early_error(StreamID, Reason, PartialReq, Resp0, Opts) -> + SystemTime = erlang:system_time(), + Resp = cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp0, Opts), + telemetry:execute( + [cowboy, request, early_error], + #{system_time => SystemTime}, + #{stream_id => StreamID, reason => Reason, partial_req => PartialReq, response => Resp} + ), + Resp. diff --git a/test/plug/cowboy/conn_test.exs b/test/plug/cowboy/conn_test.exs index 4a3d16f..0a3a825 100644 --- a/test/plug/cowboy/conn_test.exs +++ b/test/plug/cowboy/conn_test.exs @@ -136,10 +136,78 @@ defmodule Plug.Cowboy.ConnTest do assert {200, _, _} = request(:get, "/headers", [{"foo", "bar"}, {"baz", "bat"}]) end + def telemetry(conn) do + Process.sleep(30) + send_resp(conn, 200, "TELEMETRY") + end + + def telemetry_exception(conn) do + # send first because of the `rescue` in `call` + send_resp(conn, 200, "Fail") + raise "BadTimes" + end + + test "emits telemetry events for start/stop" do + :telemetry.attach_many( + :start_stop_test, + [ + [:cowboy, :request, :start], + [:cowboy, :request, :stop], + [:cowboy, :request, :exception] + ], + fn event, measurements, metadata, test -> + send(test, {:telemetry, event, measurements, metadata}) + end, + self() + ) + + assert {200, _, "TELEMETRY"} = request(:get, "/telemetry?foo=bar") + + assert_receive {:telemetry, [:cowboy, :request, :start], %{system_time: _}, + %{stream_id: _, req: req}} + + assert req.path == "/telemetry" + + assert_receive {:telemetry, [:cowboy, :request, :stop], %{duration: duration}, + %{response: {:response, _, _, _}}} + + duration_ms = System.convert_time_unit(duration, :native, :millisecond) + + assert duration_ms >= 30 + assert duration_ms < 100 + + refute_received {:telemetry, [:cowboy, :request, :exception], _, _} + + :telemetry.detach(:start_stop_test) + end + + test "emits telemetry events for exception" do + :telemetry.attach_many( + :exception_test, + [ + [:cowboy, :request, :start], + [:cowboy, :request, :exception] + ], + fn event, measurements, metadata, test -> + send(test, {:telemetry, event, measurements, metadata}) + end, + self() + ) + + request(:get, "/telemetry_exception") + + assert_receive {:telemetry, [:cowboy, :request, :start], _, _} + + assert_receive {:telemetry, [:cowboy, :request, :exception], %{}, + %{kind: :exit, reason: _reason}} + + :telemetry.detach(:exception_test) + end + test "fails on large headers" do :telemetry.attach( :early_error_test, - [:plug_cowboy, :early_error], + [:cowboy, :request, :early_error], fn name, measurements, metadata, test -> send(test, {:event, name, measurements, metadata}) end, @@ -153,21 +221,13 @@ defmodule Plug.Cowboy.ConnTest do assert {200, _, _} = request(:get, "/headers", [{"foo", "bar"}, {"baz", "bat"}]) end) =~ "Cowboy returned 431 because it was unable to parse the request headers" - assert_receive {:event, [:plug_cowboy, :early_error], + assert_receive {:event, [:cowboy, :request, :early_error], %{ system_time: _ }, %{ reason: {:connection_error, :limit_reached, _}, - request: %{ - method: "GET", - path: "/headers" - }, - response: %{ - status: 431, - headers: _, - body: _ - } + partial_req: %{} }} :telemetry.detach(:early_error_test) diff --git a/test/plug/cowboy_test.exs b/test/plug/cowboy_test.exs index f87a5b2..077cc50 100644 --- a/test/plug/cowboy_test.exs +++ b/test/plug/cowboy_test.exs @@ -153,7 +153,7 @@ defmodule Plug.CowboyTest do %{num_acceptors: 100, max_connections: 16_384, socket_opts: [port: 3000]}, %{ env: %{dispatch: @dispatch}, - stream_handlers: [:cowboy_compress_h, Plug.Cowboy.Stream] + stream_handlers: [:cowboy_compress_h, :cowboy_telemetry_h, :cowboy_stream_h] } ] = args(:http, __MODULE__, [], port: 3000, compress: true) end