diff --git a/interop/lib/interop/client.ex b/interop/lib/interop/client.ex index d8661ea9..76013736 100644 --- a/interop/lib/interop/client.ex +++ b/interop/lib/interop/client.ex @@ -1,8 +1,13 @@ defmodule Interop.Client do + import ExUnit.Assertions, only: [refute: 1] require Logger + # To better understand the behavior of streams used in this module + # we suggest you to check the documentation for `GRPC.Stub.recv/2` + # there is some unusual behavior that can be observed. + def connect(host, port, opts \\ []) do {:ok, ch} = GRPC.Stub.connect(host, port, opts) ch @@ -103,12 +108,9 @@ defmodule Interop.Client do params = Enum.map([31415, 9, 2653, 58979], &res_param(&1)) req = Grpc.Testing.StreamingOutputCallRequest.new(response_parameters: params) {:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req) - result = Enum.map([31415, 9, 2653, 58979], &String.duplicate(<<0>>, &1)) + result = Enum.map([9, 2653, 31415, 58979], &String.duplicate(<<0>>, &1)) - ^result = - Enum.map(res_enum, fn {:ok, res} -> - res.payload.body - end) + ^result = res_enum |> Enum.map(fn {:ok, res} -> res.payload.body end) |> Enum.sort() end def server_compressed_streaming!(ch) do @@ -122,10 +124,7 @@ defmodule Interop.Client do {:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req) result = Enum.map([31415, 92653], &String.duplicate(<<0>>, &1)) - ^result = - Enum.map(res_enum, fn {:ok, res} -> - res.payload.body - end) + ^result = res_enum |> Enum.map(fn {:ok, res} -> res.payload.body end) |> Enum.sort() end def ping_pong!(ch) do @@ -143,15 +142,13 @@ defmodule Interop.Client do {:ok, res_enum} = GRPC.Stub.recv(stream) reply = String.duplicate(<<0>>, 31415) - {:ok, %{payload: %{body: ^reply}}} = - Stream.take(res_enum, 1) |> Enum.to_list() |> List.first() + {:ok, %{payload: %{body: ^reply}}} = Enum.at(res_enum, 0) Enum.each([{9, 8}, {2653, 1828}, {58979, 45904}], fn {res, payload} -> GRPC.Stub.send_request(stream, req.(res, payload)) reply = String.duplicate(<<0>>, res) - {:ok, %{payload: %{body: ^reply}}} = - Stream.take(res_enum, 1) |> Enum.to_list() |> List.first() + {:ok, %{payload: %{body: ^reply}}} = Enum.at(res_enum, 0) end) GRPC.Stub.end_stream(stream) @@ -191,19 +188,32 @@ defmodule Interop.Client do payload: payload(271_828) ) - {:ok, res_enum, %{headers: new_headers}} = + {headers, data, trailers} = ch |> Grpc.Testing.TestService.Stub.full_duplex_call(metadata: metadata) |> GRPC.Stub.send_request(req, end_stream: true) |> GRPC.Stub.recv(return_headers: true) + |> process_full_duplex_response() reply = String.duplicate(<<0>>, 314_159) - {:ok, %{payload: %{body: ^reply}}} = - Stream.take(res_enum, 1) |> Enum.to_list() |> List.first() + %{payload: %{body: ^reply}} = data - {:trailers, new_trailers} = Stream.take(res_enum, 1) |> Enum.to_list() |> List.first() - validate_headers!(new_headers, new_trailers) + validate_headers!(headers, trailers) + end + + defp process_full_duplex_response({:ok, res_enum, %{headers: new_headers}}) do + {:ok, data} = Enum.at(res_enum, 0) + {:trailers, new_trailers} = Enum.at(res_enum, 0) + {new_headers, data, new_trailers} + end + + + defp process_full_duplex_response({:ok, res_enum}) do + {:headers, headers} = Enum.at(res_enum, 0) + {:ok, data} = Enum.at(res_enum, 0) + {:trailers, trailers} = Enum.at(res_enum, 0) + {headers, data, trailers} end def status_code_and_message!(ch) do @@ -226,6 +236,10 @@ defmodule Interop.Client do |> Grpc.Testing.TestService.Stub.full_duplex_call() |> GRPC.Stub.send_request(req, end_stream: true) |> GRPC.Stub.recv() + |> case do + {:ok, stream} -> Enum.at(stream, 0) + error -> error + end end def unimplemented_service!(ch) do @@ -260,7 +274,7 @@ defmodule Interop.Client do |> GRPC.Stub.send_request(req) |> GRPC.Stub.recv() - {:ok, _} = Stream.take(res_enum, 1) |> Enum.to_list() |> List.first() + {:ok, _} = Enum.at(res_enum, 0) stream = GRPC.Stub.cancel(stream) {:error, %GRPC.RPCError{status: 1}} = GRPC.Stub.recv(stream) end diff --git a/interop/mix.lock b/interop/mix.lock index 43470da5..fe9d6037 100644 --- a/interop/mix.lock +++ b/interop/mix.lock @@ -7,6 +7,8 @@ "grpc_prometheus": {:hex, :grpc_prometheus, "0.1.0", "a2f45ca83018c4ae59e4c293b7455634ac09e38c36cba7cc1fb8affdf462a6d5", [:mix], [{:grpc, ">= 0.0.0", [hex: :grpc, repo: "hexpm", optional: true]}, {:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm", "8b9ab3098657e7daec0b3edc78e1d02418bc0871618d8ca89b51b74a8086bb71"}, "grpc_statsd": {:hex, :grpc_statsd, "0.1.0", "a95ae388188486043f92a3c5091c143f5a646d6af80c9da5ee616546c4d8f5ff", [:mix], [{:grpc, ">= 0.0.0", [hex: :grpc, repo: "hexpm", optional: true]}, {:statix, ">= 0.0.0", [hex: :statix, repo: "hexpm", optional: true]}], "hexpm", "de0c05db313c7b3ffeff345855d173fd82fec3de16591a126b673f7f698d9e74"}, "gun": {:hex, :grpc_gun, "2.0.1", "221b792df3a93e8fead96f697cbaf920120deacced85c6cd3329d2e67f0871f8", [:rebar3], [{:cowlib, "~> 2.11", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "795a65eb9d0ba16697e6b0e1886009ce024799e43bb42753f0c59b029f592831"}, + "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, + "mint": {:hex, :mint, "1.4.2", "50330223429a6e1260b2ca5415f69b0ab086141bc76dc2fbf34d7c389a6675b2", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "ce75a5bbcc59b4d7d8d70f8b2fc284b1751ffb35c7b6a6302b5192f8ab4ddd80"}, "prometheus": {:hex, :prometheus, "4.2.2", "a830e77b79dc6d28183f4db050a7cac926a6c58f1872f9ef94a35cd989aceef8", [:mix, :rebar3], [], "hexpm", "b479a33d4aa4ba7909186e29bb6c1240254e0047a8e2a9f88463f50c0089370e"}, "prometheus_ex": {:hex, :prometheus_ex, "3.0.5", "fa58cfd983487fc5ead331e9a3e0aa622c67232b3ec71710ced122c4c453a02f", [:mix], [{:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm", "9fd13404a48437e044b288b41f76e64acd9735fb8b0e3809f494811dfa66d0fb"}, "prometheus_httpd": {:hex, :prometheus_httpd, "2.1.11", "f616ed9b85b536b195d94104063025a91f904a4cfc20255363f49a197d96c896", [:rebar3], [{:accept, "~> 0.3", [hex: :accept, repo: "hexpm", optional: false]}, {:prometheus, "~> 4.2", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm", "0bbe831452cfdf9588538eb2f570b26f30c348adae5e95a7d87f35a5910bcf92"}, diff --git a/interop/script/run.exs b/interop/script/run.exs index 99d334e7..8806229d 100644 --- a/interop/script/run.exs +++ b/interop/script/run.exs @@ -12,50 +12,49 @@ Logger.configure(level: level) Logger.info("Rounds: #{rounds}; concurrency: #{concurrency}; port: #{port}") +alias GRPC.Client.Adapters.Gun +alias GRPC.Client.Adapters.Mint alias Interop.Client {:ok, _pid, port} = GRPC.Server.start_endpoint(Interop.Endpoint, port) -1..concurrency -|> Task.async_stream(fn _cli -> - ch = Client.connect("127.0.0.1", port, interceptors: [GRPCPrometheus.ClientInterceptor, GRPC.Client.Interceptors.Logger]) - - for _ <- 1..rounds do - Client.empty_unary!(ch) - Client.cacheable_unary!(ch) - Client.large_unary!(ch) - Client.large_unary2!(ch) - Client.client_compressed_unary!(ch) - Client.server_compressed_unary!(ch) - Client.client_streaming!(ch) - Client.client_compressed_streaming!(ch) - Client.server_streaming!(ch) - Client.server_compressed_streaming!(ch) - Client.ping_pong!(ch) - Client.empty_stream!(ch) - Client.custom_metadata!(ch) - Client.status_code_and_message!(ch) - Client.unimplemented_service!(ch) - Client.cancel_after_begin!(ch) - Client.cancel_after_first_response!(ch) - Client.timeout_on_sleeping_server!(ch) +defmodule InteropTestRunner do + def run(_cli, adapter, port, rounds) do + opts = [interceptors: [GRPCPrometheus.ClientInterceptor, GRPC.Client.Interceptors.Logger], adapter: adapter] + ch = Client.connect("127.0.0.1", port, opts) + + for _ <- 1..rounds do + Client.empty_unary!(ch) + Client.cacheable_unary!(ch) + Client.large_unary!(ch) + Client.large_unary2!(ch) + Client.client_compressed_unary!(ch) + Client.server_compressed_unary!(ch) + Client.client_streaming!(ch) + Client.client_compressed_streaming!(ch) + Client.server_streaming!(ch) + Client.server_compressed_streaming!(ch) + Client.ping_pong!(ch) + Client.empty_stream!(ch) + Client.custom_metadata!(ch) + Client.status_code_and_message!(ch) + Client.unimplemented_service!(ch) + Client.cancel_after_begin!(ch) + Client.cancel_after_first_response!(ch) + Client.timeout_on_sleeping_server!(ch) + end + :ok end - :ok -end, max_concurrency: concurrency, ordered: false, timeout: :infinity) -|> Enum.to_list() - -# defmodule Helper do -# def flush() do -# receive do -# msg -> -# IO.inspect(msg) -# flush() -# after -# 0 -> :ok -# end -# end -# end -# Helper.flush() +end + +for adapter <- [Gun, Mint] do + Logger.info("Starting run for adapter: #{adapter}") + args = [adapter, port, rounds] + stream_opts = [max_concurrency: concurrency, ordered: false, timeout: :infinity] + 1..concurrency + |> Task.async_stream(InteropTestRunner, :run, args, stream_opts) + |> Enum.to_list() +end Logger.info("Succeed!") :ok = GRPC.Server.stop_endpoint(Interop.Endpoint) diff --git a/lib/grpc/client/adapter.ex b/lib/grpc/client/adapter.ex index b1f684a6..5160d55f 100644 --- a/lib/grpc/client/adapter.ex +++ b/lib/grpc/client/adapter.ex @@ -22,4 +22,30 @@ defmodule GRPC.Client.Adapter do """ @callback receive_data(stream :: Stream.t(), opts :: keyword()) :: GRPC.Stub.receive_data_return() | {:error, any()} + + @doc """ + This callback is used to open a stream connection to the server. + Mostly used when the payload for this request is streamed. + To send data using the open stream request, you should use `send_data/3` + """ + @callback send_headers(stream :: Stream.t(), opts :: keyword()) :: Stream.t() + + @doc """ + This callback will be responsible to send data to the server on a stream + request is open using `send_headers/2` + Opts: + - :send_end_stream (optional) - ends the request stream + """ + @callback send_data(stream :: Stream.t(), message :: binary(), opts :: keyword()) :: Stream.t() + + @doc """ + Similarly to the option sent on `send_data/2` - :send_end_stream - + this callback will end request stream + """ + @callback end_stream(stream :: Stream.t()) :: Stream.t() + + @doc """ + Cancel a stream in a streaming client. + """ + @callback cancel(stream :: Stream.t()) :: :ok | {:error, any()} end diff --git a/lib/grpc/client/adapters/gun.ex b/lib/grpc/client/adapters/gun.ex index b602b704..47f12843 100644 --- a/lib/grpc/client/adapters/gun.ex +++ b/lib/grpc/client/adapters/gun.ex @@ -107,6 +107,7 @@ defmodule GRPC.Client.Adapters.Gun do :gun.post(conn_pid, path, headers, data) end + @impl true def send_headers( %{channel: %{adapter_payload: %{conn_pid: conn_pid}}, path: path} = stream, opts @@ -116,6 +117,7 @@ defmodule GRPC.Client.Adapters.Gun do GRPC.Client.Stream.put_payload(stream, :stream_ref, stream_ref) end + @impl true def send_data(%{channel: channel, payload: %{stream_ref: stream_ref}} = stream, message, opts) do conn_pid = channel.adapter_payload[:conn_pid] fin = if opts[:send_end_stream], do: :fin, else: :nofin @@ -124,13 +126,20 @@ defmodule GRPC.Client.Adapters.Gun do stream end + @impl true def end_stream(%{channel: channel, payload: %{stream_ref: stream_ref}} = stream) do conn_pid = channel.adapter_payload[:conn_pid] :gun.data(conn_pid, stream_ref, :fin, "") stream end - def cancel(%{conn_pid: conn_pid}, %{stream_ref: stream_ref}) do + @impl true + def cancel(stream) do + %{ + channel: %{adapter_payload: %{conn_pid: conn_pid}}, + payload: %{stream_ref: stream_ref} + } = stream + :gun.cancel(conn_pid, stream_ref) end diff --git a/lib/grpc/client/adapters/mint.ex b/lib/grpc/client/adapters/mint.ex new file mode 100644 index 00000000..eea1c8a2 --- /dev/null +++ b/lib/grpc/client/adapters/mint.ex @@ -0,0 +1,212 @@ +defmodule GRPC.Client.Adapters.Mint do + @moduledoc """ + A client adapter using mint + """ + + alias GRPC.Channel + alias GRPC.Client.Adapters.Mint.ConnectionProcess + alias GRPC.Client.Adapters.Mint.StreamResponseProcess + alias GRPC.Credential + + @behaviour GRPC.Client.Adapter + + @default_connect_opts [protocols: [:http2]] + @default_transport_opts [timeout: :infinity] + + @impl true + def connect(%{host: host, port: port} = channel, opts \\ []) do + opts = Keyword.merge(@default_connect_opts, connect_opts(channel, opts)) + Process.flag(:trap_exit, true) + + channel + |> mint_scheme() + |> ConnectionProcess.start_link(host, port, opts) + |> case do + {:ok, pid} -> + {:ok, %{channel | adapter_payload: %{conn_pid: pid}}} + + error -> + {:error, "Error while opening connection: #{inspect(error)}"} + end + catch + :exit, reason -> + {:error, "Error while opening connection: #{inspect(reason)}"} + end + + @impl true + def disconnect(%{adapter_payload: %{conn_pid: pid}} = channel) + when is_pid(pid) do + :ok = ConnectionProcess.disconnect(pid) + {:ok, %{channel | adapter_payload: nil}} + end + + def disconnect(%{adapter_payload: nil} = channel) do + {:ok, channel} + end + + @impl true + def send_request(%{channel: %{adapter_payload: nil}}, _message, _opts), + do: raise(ArgumentError, "Can't perform a request without a connection process") + + def send_request(stream, message, opts) do + {:ok, data, _} = GRPC.Message.to_data(message, opts) + do_request(stream, opts, data) + end + + @impl true + def receive_data(stream, opts) do + if success_response?(stream) do + do_receive_data(stream, stream.grpc_type, opts) + else + handle_errors_receive_data(stream, opts) + end + end + + @impl true + def send_headers(%{channel: %{adapter_payload: nil}}, _opts), + do: raise("Can't start a client stream without a connection process") + + def send_headers(stream, opts) do + do_request(stream, opts, :stream) + end + + @impl true + def send_data( + %{ + channel: %{adapter_payload: %{conn_pid: pid}}, + payload: %{response: {:ok, %{request_ref: request_ref}}} + } = stream, + message, + opts + ) do + {:ok, data, _} = GRPC.Message.to_data(message, opts) + :ok = ConnectionProcess.stream_request_body(pid, request_ref, data) + if opts[:send_end_stream], do: ConnectionProcess.stream_request_body(pid, request_ref, :eof) + stream + end + + @impl true + def end_stream( + %{ + channel: %{adapter_payload: %{conn_pid: pid}}, + payload: %{response: {:ok, %{request_ref: request_ref}}} + } = stream + ) do + ConnectionProcess.stream_request_body(pid, request_ref, :eof) + stream + end + + @impl true + def cancel(stream) do + %{ + channel: %{adapter_payload: %{conn_pid: conn_pid}}, + payload: %{response: {:ok, %{request_ref: request_ref}}} + } = stream + + ConnectionProcess.cancel(conn_pid, request_ref) + end + + defp connect_opts(%Channel{scheme: "https"} = channel, opts) do + %Credential{ssl: ssl} = Map.get(channel, :cred) || %Credential{} + + transport_opts = + opts + |> Keyword.get(:transport_opts, []) + |> Keyword.merge(ssl) + + [transport_opts: Keyword.merge(@default_transport_opts, transport_opts)] + end + + defp connect_opts(_channel, opts) do + transport_opts = Keyword.get(opts, :transport_opts, []) + [transport_opts: Keyword.merge(@default_transport_opts, transport_opts)] + end + + defp mint_scheme(%Channel{scheme: "https"} = _channel), do: :https + defp mint_scheme(_channel), do: :http + + defp do_receive_data(%{payload: %{stream_response_pid: pid}}, request_type, opts) + when request_type in [:bidirectional_stream, :server_stream] do + produce_trailers? = opts[:return_headers] == true + stream = StreamResponseProcess.build_stream(pid, produce_trailers?) + headers_or_error = Enum.at(stream, 0) + # if this check fails then an error tuple will be returned + with {:headers, headers} <- headers_or_error do + if opts[:return_headers] do + {:ok, stream, %{headers: headers}} + else + {:ok, stream} + end + end + end + + defp do_receive_data( + %{payload: %{stream_response_pid: pid}}, + request_type, + opts + ) + when request_type in [:client_stream, :unary] do + responses = pid |> StreamResponseProcess.build_stream() |> Enum.to_list() + + with :ok <- check_for_error(responses) do + data = Keyword.fetch!(responses, :ok) + + if opts[:return_headers] do + {:ok, data, get_headers_and_trailers(responses)} + else + {:ok, data} + end + end + end + + def handle_errors_receive_data(%GRPC.Client.Stream{payload: %{response: response}}, _opts) do + {:error, "Error occurred while receiving data: #{inspect(response)}"} + end + + defp success_response?(%GRPC.Client.Stream{ + payload: %{response: {:ok, _resp}} + }), + do: true + + defp success_response?(_stream), do: false + + defp do_request( + %{channel: %{adapter_payload: %{conn_pid: pid}}, path: path} = stream, + opts, + body + ) do + headers = GRPC.Transport.HTTP2.client_headers_without_reserved(stream, opts) + + {:ok, stream_response_pid} = + StreamResponseProcess.start_link(stream, return_headers_for_request?(stream, opts)) + + response = + ConnectionProcess.request(pid, "POST", path, headers, body, + stream_response_pid: stream_response_pid + ) + + stream + |> GRPC.Client.Stream.put_payload(:response, response) + |> GRPC.Client.Stream.put_payload(:stream_response_pid, stream_response_pid) + end + + defp get_headers_and_trailers(responses) do + %{headers: Keyword.get(responses, :headers), trailers: Keyword.get(responses, :trailers)} + end + + def check_for_error(responses) do + error = Keyword.get(responses, :error) + + if error, do: {:error, error}, else: :ok + end + + defp return_headers_for_request?(%GRPC.Client.Stream{grpc_type: type}, _opts) + when type in [:bidirectional_stream, :server_stream] do + true + end + + defp return_headers_for_request?(_stream, opts) do + # Explicitly check for true to ensure the boolean type here + opts[:return_headers] == true + end +end diff --git a/lib/grpc/client/adapters/mint/connection_process/connection_process.ex b/lib/grpc/client/adapters/mint/connection_process/connection_process.ex new file mode 100644 index 00000000..0fab29d9 --- /dev/null +++ b/lib/grpc/client/adapters/mint/connection_process/connection_process.ex @@ -0,0 +1,400 @@ +defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do + @moduledoc false + + # This module is responsible for managing a connection with a gRPC server. + # It's also responsible for managing requests, which also includes checks for the + # connection/request window size, splitting a given payload into appropriate sized chunks + # and streaming those to the server using an internal queue. + + use GenServer + + alias GRPC.Client.Adapters.Mint.ConnectionProcess.State + alias GRPC.Client.Adapters.Mint.StreamResponseProcess + + require Logger + + @connection_closed_error "the connection is closed" + + @doc """ + Starts and link connection process + """ + @spec start_link(Mint.Types.scheme(), Mint.Types.address(), :inet.port_number(), keyword()) :: + GenServer.on_start() + def start_link(scheme, host, port, opts \\ []) do + opts = Keyword.put(opts, :parent, self()) + GenServer.start_link(__MODULE__, {scheme, host, port, opts}) + end + + @doc """ + Sends a request to the connected server. + + ## Options + + * :stream_response_pid (required) - the process to where send the responses coming from the connection will be sent to be processed + """ + @spec request( + pid :: pid(), + method :: String.t(), + path :: String.t(), + Mint.Types.headers(), + body :: iodata() | nil | :stream, + opts :: keyword() + ) :: {:ok, %{request_ref: Mint.Types.request_ref()}} | {:error, Mint.Types.error()} + def request(pid, method, path, headers, body, opts \\ []) do + GenServer.call(pid, {:request, method, path, headers, body, opts}) + end + + @doc """ + Closes the given connection. + """ + @spec disconnect(pid :: pid()) :: :ok + def disconnect(pid) do + GenServer.call(pid, :disconnect) + end + + @doc """ + Streams a chunk of the request body on the connection or signals the end of the body. + """ + @spec stream_request_body( + pid(), + Mint.Types.request_ref(), + iodata() | :eof | {:eof, trailing_headers :: Mint.Types.headers()} + ) :: :ok | {:error, Mint.Types.error()} + def stream_request_body(pid, request_ref, body) do + GenServer.call(pid, {:stream_body, request_ref, body}) + end + + @doc """ + cancels an open request request + """ + @spec cancel(pid(), Mint.Types.request_ref()) :: :ok | {:error, Mint.Types.error()} + def cancel(pid, request_ref) do + GenServer.call(pid, {:cancel_request, request_ref}) + end + + ## Callbacks + + @impl true + def init({scheme, host, port, opts}) do + case Mint.HTTP.connect(scheme, host, port, opts) do + {:ok, conn} -> + {:ok, State.new(conn, opts[:parent])} + + {:error, reason} -> + Logger.error("unable to establish a connection. reason: #{inspect(reason)}") + {:stop, reason} + end + catch + :exit, reason -> + Logger.error("unable to establish a connection. reason: #{inspect(reason)}") + {:stop, reason} + end + + @impl true + def handle_call(:disconnect, _from, state) do + # TODO add a code to if disconnect is brutal we just stop if is friendly we wait for pending requests + {:ok, conn} = Mint.HTTP.close(state.conn) + {:stop, :normal, :ok, State.update_conn(state, conn)} + end + + def handle_call(_request, _from, %{conn: %Mint.HTTP2{state: :closed}} = state) do + {:reply, {:error, "the connection is closed"}, state} + end + + def handle_call( + {:request, method, path, headers, :stream, opts}, + _from, + state + ) do + case Mint.HTTP.request(state.conn, method, path, headers, :stream) do + {:ok, conn, request_ref} -> + new_state = + state + |> State.update_conn(conn) + |> State.put_empty_ref_state(request_ref, opts[:stream_response_pid]) + + {:reply, {:ok, %{request_ref: request_ref}}, new_state} + + {:error, conn, reason} -> + new_state = State.update_conn(state, conn) + {:reply, {:error, reason}, new_state} + end + end + + def handle_call( + {:request, method, path, headers, body, opts}, + _from, + state + ) do + case Mint.HTTP.request(state.conn, method, path, headers, :stream) do + {:ok, conn, request_ref} -> + queue = :queue.in({request_ref, body, nil}, state.request_stream_queue) + + new_state = + state + |> State.update_conn(conn) + |> State.update_request_stream_queue(queue) + |> State.put_empty_ref_state(request_ref, opts[:stream_response_pid]) + + {:reply, {:ok, %{request_ref: request_ref}}, new_state, + {:continue, :process_request_stream_queue}} + + {:error, conn, reason} -> + new_state = State.update_conn(state, conn) + {:reply, {:error, reason}, new_state} + end + end + + def handle_call({:stream_body, request_ref, :eof}, _from, state) do + case Mint.HTTP.stream_request_body(state.conn, request_ref, :eof) do + {:ok, conn} -> + {:reply, :ok, State.update_conn(state, conn)} + + {:error, conn, error} -> + {:reply, {:error, error}, State.update_conn(state, conn)} + end + end + + def handle_call({:stream_body, request_ref, body}, from, state) do + queue = :queue.in({request_ref, body, from}, state.request_stream_queue) + + {:noreply, State.update_request_stream_queue(state, queue), + {:continue, :process_request_stream_queue}} + end + + def handle_call({:cancel_request, request_ref}, _from, state) do + state = process_response({:done, request_ref}, state) + + case Mint.HTTP2.cancel_request(state.conn, request_ref) do + {:ok, conn} -> {:reply, :ok, State.update_conn(state, conn)} + {:error, conn, error} -> {:reply, {:error, error}, State.update_conn(state, conn)} + end + end + + @impl true + def handle_info(message, state) do + case Mint.HTTP.stream(state.conn, message) do + :unknown -> + Logger.debug(fn -> "Received unknown message: " <> inspect(message) end) + {:noreply, state} + + {:ok, conn, responses} -> + state = State.update_conn(state, conn) + + state = + case state.requests do + requests when map_size(requests) == 0 -> + state + + _ -> + Enum.reduce(responses, state, &process_response/2) + end + + check_connection_status(state) + + {:error, conn, _error, _responses} -> + state = State.update_conn(state, conn) + check_connection_status(state) + end + end + + @impl true + def handle_continue(:process_request_stream_queue, state) do + {{:value, request}, queue} = :queue.out(state.request_stream_queue) + {ref, body, _from} = request + window_size = get_window_size(state.conn, ref) + dequeued_state = State.update_request_stream_queue(state, queue) + + cond do + # Do nothing, wait for server (on stream/2) to give us more window size + window_size == 0 -> {:noreply, state} + IO.iodata_length(body) > window_size -> chunk_body_and_enqueue_rest(request, dequeued_state) + true -> stream_body_and_reply(request, dequeued_state) + end + end + + @impl true + def terminate(_reason, _state) do + :normal + end + + defp process_response({:status, request_ref, status}, state) do + State.update_response_status(state, request_ref, status) + end + + defp process_response({:headers, request_ref, headers}, state) do + if State.empty_headers?(state, request_ref) do + new_state = State.update_response_headers(state, request_ref, headers) + + :ok = + new_state + |> State.stream_response_pid(request_ref) + |> StreamResponseProcess.consume(:headers, headers) + + new_state + else + :ok = + state + |> State.stream_response_pid(request_ref) + |> StreamResponseProcess.consume(:trailers, headers) + + state + end + end + + defp process_response({:data, request_ref, new_data}, state) do + :ok = + state + |> State.stream_response_pid(request_ref) + |> StreamResponseProcess.consume(:data, new_data) + + state + end + + defp process_response({:done, request_ref}, state) do + :ok = + state + |> State.stream_response_pid(request_ref) + |> StreamResponseProcess.done() + + {_ref, new_state} = State.pop_ref(state, request_ref) + new_state + end + + defp chunk_body_and_enqueue_rest({request_ref, body, from}, state) do + {head, tail} = chunk_body(body, get_window_size(state.conn, request_ref)) + + case Mint.HTTP.stream_request_body(state.conn, request_ref, head) do + {:ok, conn} -> + queue = :queue.in_r({request_ref, tail, from}, state.request_stream_queue) + + new_state = + state + |> State.update_conn(conn) + |> State.update_request_stream_queue(queue) + + {:noreply, new_state} + + {:error, conn, error} -> + if from != nil do + # We need an explicit reply here because the process that called this GenServer + # isn't the same one that's expecting the reply. + GenServer.reply(from, {:error, error}) + else + :ok = + state + |> State.stream_response_pid(request_ref) + |> StreamResponseProcess.consume(:error, error) + end + + {:noreply, State.update_conn(state, conn)} + end + end + + defp stream_body_and_reply({request_ref, body, from}, state) do + send_eof? = is_nil(from) + + case stream_body(state.conn, request_ref, body, send_eof?) do + {:ok, conn} -> + if not send_eof? do + GenServer.reply(from, :ok) + end + + check_request_stream_queue(State.update_conn(state, conn)) + + {:error, conn, error} -> + if not send_eof? do + GenServer.reply(from, {:error, error}) + else + :ok = + state + |> State.stream_response_pid(request_ref) + |> StreamResponseProcess.consume(:error, error) + end + + check_request_stream_queue(State.update_conn(state, conn)) + end + end + + defp stream_body(conn, request_ref, body, true = _stream_eof?) do + case Mint.HTTP.stream_request_body(conn, request_ref, body) do + {:ok, conn} -> Mint.HTTP.stream_request_body(conn, request_ref, :eof) + error -> error + end + end + + defp stream_body(conn, request_ref, body, false = _stream_eof?) do + Mint.HTTP.stream_request_body(conn, request_ref, body) + end + + def check_request_stream_queue(state) do + if :queue.is_empty(state.request_stream_queue) do + {:noreply, state} + else + {:noreply, state, {:continue, :process_request_stream_queue}} + end + end + + defp chunk_body(body, bytes_length) do + case body do + <> -> {head, tail} + _other -> {body, <<>>} + end + end + + def get_window_size(conn, ref) do + min( + Mint.HTTP2.get_window_size(conn, {:request, ref}), + Mint.HTTP2.get_window_size(conn, :connection) + ) + end + + defp finish_all_pending_requests(state) do + new_state = + state.request_stream_queue + |> :queue.to_list() + |> Enum.reduce(state, fn {request_ref, _, _} = request, acc_state -> + case request do + {ref, _body, nil} -> + acc_state + |> State.stream_response_pid(ref) + |> send_connection_close_and_end_stream_response() + + {ref, _body, from} -> + acc_state + |> State.stream_response_pid(ref) + |> send_connection_close_and_end_stream_response() + + GenServer.reply(from, {:error, @connection_closed_error}) + end + + {_request_data, new_state} = State.pop_ref(acc_state, request_ref) + new_state + end) + + # Inform the parent that the connection is down + send(new_state.parent, {:elixir_grpc, :connection_down, self()}) + + new_state.requests + |> Enum.each(fn {ref, _} -> + new_state + |> State.stream_response_pid(ref) + |> send_connection_close_and_end_stream_response() + end) + + {:noreply, State.update_request_stream_queue(%{new_state | requests: %{}}, :queue.new())} + end + + defp send_connection_close_and_end_stream_response(pid) do + :ok = StreamResponseProcess.consume(pid, :error, @connection_closed_error) + :ok = StreamResponseProcess.done(pid) + end + + defp check_connection_status(state) do + if Mint.HTTP.open?(state.conn) do + check_request_stream_queue(state) + else + finish_all_pending_requests(state) + end + end +end diff --git a/lib/grpc/client/adapters/mint/connection_process/state.ex b/lib/grpc/client/adapters/mint/connection_process/state.ex new file mode 100644 index 00000000..bad73233 --- /dev/null +++ b/lib/grpc/client/adapters/mint/connection_process/state.ex @@ -0,0 +1,55 @@ +defmodule GRPC.Client.Adapters.Mint.ConnectionProcess.State do + @moduledoc false + + defstruct [:conn, :parent, requests: %{}, request_stream_queue: :queue.new()] + + @type t :: %__MODULE__{ + conn: Mint.HTTP.t(), + requests: map(), + parent: pid() + } + + def new(conn, parent) do + %__MODULE__{conn: conn, request_stream_queue: :queue.new(), parent: parent} + end + + def update_conn(state, conn) do + %{state | conn: conn} + end + + def update_request_stream_queue(state, queue) do + %{state | request_stream_queue: queue} + end + + def put_empty_ref_state(state, ref, response_pid) do + put_in(state.requests[ref], %{ + stream_response_pid: response_pid, + done: false, + response: %{} + }) + end + + def update_response_status(state, ref, status) do + put_in(state.requests[ref].response[:status], status) + end + + def update_response_headers(state, ref, headers) do + put_in(state.requests[ref].response[:headers], headers) + end + + def empty_headers?(state, ref) do + is_nil(state.requests[ref].response[:headers]) + end + + def stream_response_pid(state, ref) do + state.requests[ref].stream_response_pid + end + + def pop_ref(state, ref) do + pop_in(state.requests[ref]) + end + + def append_response_data(state, ref, new_data) do + update_in(state.requests[ref].response[:data], fn data -> (data || "") <> new_data end) + end +end diff --git a/lib/grpc/client/adapters/mint/stream_response_process.ex b/lib/grpc/client/adapters/mint/stream_response_process.ex new file mode 100644 index 00000000..249bd280 --- /dev/null +++ b/lib/grpc/client/adapters/mint/stream_response_process.ex @@ -0,0 +1,205 @@ +defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do + @moduledoc false + # This module represents the process responsible for consuming the + # incoming messages from a connection. For each request, there will be + # a process responsible for consuming its messages. At the end of a stream + # this process will automatically be killed. + + # TODO: Refactor the GenServer.call/3 occurrences on this module to produce + # telemetry events and log entries in case of failures + + @typep accepted_types :: :data | :trailers | :headers | :error + @typep data_types :: binary() | Mint.Types.headers() | Mint.Types.error() + + @accepted_types [:data, :trailers, :headers, :error] + @header_types [:headers, :trailers] + + use GenServer + + @spec start_link(GRPC.Client.Stream.t(), send_headers_or_trailers? :: boolean()) :: + GenServer.on_start() + def start_link(stream, send_headers_or_trailers?) do + GenServer.start_link(__MODULE__, {stream, send_headers_or_trailers?}) + end + + @doc """ + Given a pid from this process, build an Elixir.Stream that will consume the accumulated + data inside this process + """ + @spec build_stream(pid(), produce_trailers? :: boolean) :: Enumerable.t() + def build_stream(pid, produce_trailers? \\ true) do + Stream.unfold(pid, fn pid -> + pid + |> GenServer.call(:get_response, :infinity) + |> process_response(produce_trailers?, pid) + end) + end + + defp process_response(nil = _response, _produce_trailers, _pid), do: nil + + defp process_response({:trailers, _trailers}, false = produce_trailers?, pid) do + pid + |> GenServer.call(:get_response, :infinity) + |> process_response(produce_trailers?, pid) + end + + defp process_response(response, _produce_trailers, pid) do + {response, pid} + end + + @doc """ + Cast a message to process to inform that the stream has finished + once all messages are produced. This process will automatically + be killed. + """ + @spec done(pid()) :: :ok + def done(pid) do + :ok = GenServer.call(pid, {:consume_response, :done}) + :ok + end + + @doc """ + Consume an incoming data or trailers/headers + """ + @spec consume(pid(), type :: accepted_types, data :: data_types) :: :ok + def consume(pid, type, data) when type in @accepted_types do + :ok = GenServer.call(pid, {:consume_response, {type, data}}) + :ok + end + + # Callbacks + + @impl true + def init({stream, send_headers_or_trailers?}) do + state = %{ + grpc_stream: stream, + send_headers_or_trailers: send_headers_or_trailers?, + buffer: <<>>, + responses: [], + done: false, + from: nil, + compressor: nil + } + + {:ok, state} + end + + @impl true + def handle_call(:get_response, from, state) do + {:noreply, put_in(state[:from], from), {:continue, :produce_response}} + end + + def handle_call({:consume_response, {:data, data}}, _from, state) do + %{ + buffer: buffer, + grpc_stream: %{response_mod: res_mod, codec: codec}, + responses: responses + } = state + + case GRPC.Message.get_message(buffer <> data, state.compressor) do + {{_, message}, rest} -> + # TODO add code here to handle compressor headers + response = codec.decode(message, res_mod) + new_responses = [{:ok, response} | responses] + new_state = %{state | buffer: rest, responses: new_responses} + {:reply, :ok, new_state, {:continue, :produce_response}} + + _ -> + new_state = %{state | buffer: buffer <> data} + {:reply, :ok, new_state, {:continue, :produce_response}} + end + end + + def handle_call( + {:consume_response, {type, headers}}, + _from, + %{send_headers_or_trailers: true, responses: responses} = state + ) + when type in @header_types do + state = update_compressor({type, headers}, state) + new_responses = [get_headers_response(headers, type) | responses] + {:reply, :ok, %{state | responses: new_responses}, {:continue, :produce_response}} + end + + def handle_call( + {:consume_response, {type, headers}}, + _from, + %{send_headers_or_trailers: false, responses: responses} = state + ) + when type in @header_types do + state = update_compressor({type, headers}, state) + + case get_headers_response(headers, type) do + {:error, _rpc_error} = error -> + {:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}} + + _any -> + {:reply, :ok, state, {:continue, :produce_response}} + end + end + + def handle_call( + {:consume_response, {:error, _error} = error}, + _from, + %{responses: responses} = state + ) do + {:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}} + end + + def handle_call({:consume_response, :done}, _from, state) do + {:reply, :ok, %{state | done: true}, {:continue, :produce_response}} + end + + @impl true + def handle_continue(:produce_response, state) do + case state do + %{from: nil} -> + {:noreply, state} + + %{from: from, responses: [], done: true} -> + GenServer.reply(from, nil) + {:stop, :normal, state} + + %{responses: []} -> + {:noreply, state} + + %{responses: [response | rest], from: from} -> + GenServer.reply(from, response) + {:noreply, %{state | responses: rest, from: nil}} + end + end + + defp get_headers_response(headers, type) do + decoded_trailers = GRPC.Transport.HTTP2.decode_headers(headers) + status = String.to_integer(decoded_trailers["grpc-status"] || "0") + + if status == GRPC.Status.ok() do + {type, decoded_trailers} + else + rpc_error = %GRPC.RPCError{status: status, message: decoded_trailers["grpc-message"]} + {:error, rpc_error} + end + end + + defp update_compressor({:headers, headers}, state) do + decoded_trailers = GRPC.Transport.HTTP2.decode_headers(headers) + + compressor = + get_compressor(decoded_trailers["grpc-encoding"], state.grpc_stream.accepted_compressors) + + %{state | compressor: compressor} + end + + defp update_compressor(_headers, state), do: state + + defp get_compressor(nil = _encoding_name, _accepted_compressors), do: nil + + defp get_compressor(encoding_name, accepted_compressors) do + Enum.find(accepted_compressors, nil, fn c -> c.name() == encoding_name end) + end + + @impl true + def terminate(_reason, _state) do + :normal + end +end diff --git a/lib/grpc/credential.ex b/lib/grpc/credential.ex index 1df78ea7..db15d368 100644 --- a/lib/grpc/credential.ex +++ b/lib/grpc/credential.ex @@ -17,7 +17,7 @@ defmodule GRPC.Credential do """ @type t :: %__MODULE__{ssl: [:ssl.tls_option()]} - defstruct [:ssl] + defstruct ssl: [] @doc """ Creates credential. diff --git a/lib/grpc/message.ex b/lib/grpc/message.ex index 2961cbb2..03ac2e33 100644 --- a/lib/grpc/message.ex +++ b/lib/grpc/message.ex @@ -172,4 +172,28 @@ defmodule GRPC.Message do def get_message(_) do false end + + def get_message(data, nil = _compressor) do + case data do + <> -> + {{flag, message}, rest} + + _other -> + data + end + end + + def get_message(data, compressor) do + case data do + <<1::8, length::unsigned-integer-32, message::bytes-size(length), rest::binary>> -> + {{1, compressor.decompress(message)}, rest} + + <<0::8, length::unsigned-integer-32, message::bytes-size(length), rest::binary>> -> + {{0, message}, rest} + + _other -> + data + end + end end diff --git a/lib/grpc/service.ex b/lib/grpc/service.ex index 5357b6c2..9eaf717c 100644 --- a/lib/grpc/service.ex +++ b/lib/grpc/service.ex @@ -57,5 +57,5 @@ defmodule GRPC.Service do def grpc_type({_, {_, false}, {_, false}}), do: :unary def grpc_type({_, {_, true}, {_, false}}), do: :client_stream def grpc_type({_, {_, false}, {_, true}}), do: :server_stream - def grpc_type({_, {_, true}, {_, true}}), do: :bidi_stream + def grpc_type({_, {_, true}, {_, true}}), do: :bidirectional_stream end diff --git a/lib/grpc/stub.ex b/lib/grpc/stub.ex index 3aec8dd8..0efec677 100644 --- a/lib/grpc/stub.ex +++ b/lib/grpc/stub.ex @@ -218,26 +218,24 @@ defmodule GRPC.Stub do adapter.disconnect(channel) end - @doc """ - The actual function invoked when invoking a rpc function. - - ## Returns - - * Unary calls. `{:ok, reply} | {:ok, headers_map} | {:error, error}` - * Client streaming. A `GRPC.Client.Stream` - * Server streaming. `{:ok, Enumerable.t} | {:ok, Enumerable.t, trailers_map} | {:error, error}` - - ## Options - - * `:timeout` - request timeout. Default is 10s for unary calls and `:infinity` for - client or server streaming calls - * `:deadline` - when the request is timeout, will override timeout - * `:metadata` - a map, your custom metadata - * `:return_headers` - default is false. When it's true, a three elem tuple will be returned - with the last elem being a map of headers `%{headers: headers, trailers: trailers}`(unary) or - `%{headers: headers}`(server streaming) - """ - @spec call(atom(), tuple(), GRPC.Client.Stream.t(), struct() | nil, keyword()) :: rpc_return + @doc false + # # The actual function invoked when invoking an RPC function. + # + # Returns + # + # * Unary calls. `{:ok, reply} | {:ok, headers_map} | {:error, error}` + # * Client streaming. A `GRPC.Client.Stream` + # * Server streaming. `{:ok, Enumerable.t} | {:ok, Enumerable.t, trailers_map} | {:error, error}` + # + # Options + # + # * `:timeout` - request timeout. Default is 10s for unary calls and `:infinity` for + # client or server streaming calls + # * `:deadline` - when the request is timeout, will override timeout + # * `:metadata` - a map, your custom metadata + # * `:return_headers` - default is false. When it's true, a three elem tuple will be returned + # with the last elem being a map of headers `%{headers: headers, trailers: trailers}`(unary) or + # `%{headers: headers}`(server streaming) def call(_service_mod, rpc, %{channel: channel} = stream, request, opts) do {_, {req_mod, req_stream}, {res_mod, response_stream}} = rpc @@ -350,8 +348,8 @@ defmodule GRPC.Stub do After that, callings to `recv/2` will return a CANCEL error. """ - def cancel(%{channel: channel, payload: payload} = stream) do - case channel.adapter.cancel(channel.adapter_payload, payload) do + def cancel(%{channel: channel} = stream) do + case channel.adapter.cancel(stream) do :ok -> %{stream | canceled: true} other -> other end @@ -373,14 +371,32 @@ defmodule GRPC.Stub do {:ok, reply} = GRPC.Stub.recv(stream) # Reply is streaming - {:ok, enum} = GRPC.Stub.recv(stream) - replies = Enum.map(enum, fn({:ok, reply}) -> reply end) + {:ok, ex_stream} = GRPC.Stub.recv(stream) + replies = Enum.map(ex_stream, fn({:ok, reply}) -> reply end) ## Options * `:timeout` - request timeout * `:deadline` - when the request is timeout, will override timeout * `:return_headers` - when true, headers will be returned. + + ## Stream behavior + We build the Stream struct using `Stream.unfold/2`. + + The unfold function is built in such a way that - for both adapters - the accumulator is a map used to find the + `connection_stream`process and the `next_fun` argument is a function that reads directly from the `connection_stream` + that is producing data. + Every time we execute `next_fun` we read a chunk of data. This means that `next_fun` will have the side effect of updating the state of the `connection_stream` process, removing the chunk of data that's being read from the underlying `GenServer`'s state. + + + ## Examples + + iex> ex_stream |> Stream.take(1) |> Enum.to_list() + [1] + iex> ex_stream |> Enum.to_list() + [2, 3] + iex> ex_stream |> Enum.to_list() + [] """ @spec recv(GRPC.Client.Stream.t(), keyword()) :: {:ok, struct()} diff --git a/mix.exs b/mix.exs index 6dc7a1d0..8d8fbe74 100644 --- a/mix.exs +++ b/mix.exs @@ -43,10 +43,12 @@ defmodule GRPC.Mixfile do # This is the same as :gun 2.0.0-rc.2, # but we can't depend on an RC for releases {:gun, "~> 2.0.1", hex: :grpc_gun}, + {:mint, "~> 1.4.2"}, {:cowlib, "~> 2.11"}, {:protobuf, "~> 0.11", only: [:dev, :test]}, {:ex_doc, "~> 0.28.0", only: :dev}, - {:dialyxir, "~> 1.1.0", only: [:dev, :test], runtime: false} + {:dialyxir, "~> 1.1.0", only: [:dev, :test], runtime: false}, + {:ex_parameterized, "~> 1.3.7", only: :test} ] end diff --git a/mix.lock b/mix.lock index 37a8e4f0..157cc46b 100644 --- a/mix.lock +++ b/mix.lock @@ -6,11 +6,14 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.26", "f4291134583f373c7d8755566122908eb9662df4c4b63caa66a0eabe06569b0a", [:mix], [], "hexpm", "48d460899f8a0c52c5470676611c01f64f3337bad0b26ddab43648428d94aabc"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.28.4", "001a0ea6beac2f810f1abc3dbf4b123e9593eaa5f00dd13ded024eae7c523298", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bf85d003dd34911d89c8ddb8bda1a958af3471a274a4c2150a9c01c78ac3f8ed"}, + "ex_parameterized": {:hex, :ex_parameterized, "1.3.7", "801f85fc4651cb51f11b9835864c6ed8c5e5d79b1253506b5bb5421e8ab2f050", [:mix], [], "hexpm", "1fb0dc4aa9e8c12ae23806d03bcd64a5a0fc9cd3f4c5602ba72561c9b54a625c"}, "gun": {:hex, :grpc_gun, "2.0.1", "221b792df3a93e8fead96f697cbaf920120deacced85c6cd3329d2e67f0871f8", [:rebar3], [{:cowlib, "~> 2.11", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "795a65eb9d0ba16697e6b0e1886009ce024799e43bb42753f0c59b029f592831"}, + "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, + "mint": {:hex, :mint, "1.4.2", "50330223429a6e1260b2ca5415f69b0ab086141bc76dc2fbf34d7c389a6675b2", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "ce75a5bbcc59b4d7d8d70f8b2fc284b1751ffb35c7b6a6302b5192f8ab4ddd80"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, "protobuf": {:hex, :protobuf, "0.11.0", "58d5531abadea3f71135e97bd214da53b21adcdb5b1420aee63f4be8173ec927", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "30ad9a867a5c5a0616cac9765c4d2c2b7b0030fa81ea6d0c14c2eb5affb6ac52"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, diff --git a/test/grpc/adapter/gun_test.exs b/test/grpc/client/adapters/gun_test.exs similarity index 98% rename from test/grpc/adapter/gun_test.exs rename to test/grpc/client/adapters/gun_test.exs index b3d089e0..405a2a3d 100644 --- a/test/grpc/adapter/gun_test.exs +++ b/test/grpc/client/adapters/gun_test.exs @@ -1,7 +1,5 @@ defmodule GRPC.Client.Adapters.GunTest do - use ExUnit.Case, async: true - - import GRPC.Factory + use GRPC.DataCase, async: true alias GRPC.Client.Adapters.Gun diff --git a/test/grpc/client/adapters/mint/connection_process_test.exs b/test/grpc/client/adapters/mint/connection_process_test.exs new file mode 100644 index 00000000..8989a6ae --- /dev/null +++ b/test/grpc/client/adapters/mint/connection_process_test.exs @@ -0,0 +1,455 @@ +defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do + use GRPC.DataCase + alias GRPC.Client.Adapters.Mint.ConnectionProcess + alias GRPC.Client.Adapters.Mint.StreamResponseProcess + + import ExUnit.CaptureLog + + setup do + {:ok, _, port} = GRPC.Server.start(FeatureServer, 0) + + on_exit(fn -> + :ok = GRPC.Server.stop(FeatureServer) + end) + + %{port: port} + end + + describe "start_link/4" do + test "non-successful connection stops the connection process without exit it's caller" do + logs = + capture_log(fn -> + assert {:error, %Mint.TransportError{reason: :econnrefused}} == + ConnectionProcess.start_link(:http, "localhost", 12345) + end) + + assert logs =~ "unable to establish a connection" + end + + test "connects insecurely (default options)", %{port: port} do + {:ok, pid} = ConnectionProcess.start_link(:http, "localhost", port) + assert Process.alive?(pid) + end + end + + describe "disconnect/1" do + test "stop the process when disconnecting", %{port: port} do + {:ok, pid} = ConnectionProcess.start_link(:http, "localhost", port) + assert :ok == ConnectionProcess.disconnect(pid) + refute Process.alive?(pid) + end + + test "close the connection when disconnect", %{port: port} do + {:ok, pid} = ConnectionProcess.start_link(:http, "localhost", port) + state = :sys.get_state(pid) + + assert {:stop, :normal, :ok, new_state} = + ConnectionProcess.handle_call(:disconnect, nil, state) + + refute Mint.HTTP.open?(new_state.conn) + end + end + + describe "handle_call/2 - request - :stream" do + setup :valid_connection + + test "start stream request and put empty state for ref", %{ + request: {method, path, headers}, + state: state + } do + request = {:request, method, path, headers, :stream, [stream_response_pid: self()]} + response = ConnectionProcess.handle_call(request, nil, state) + + assert {:reply, {:ok, %{request_ref: request_ref}}, new_state} = response + assert is_reference(request_ref) + + assert %{ + stream_response_pid: self(), + done: false, + response: %{} + } == new_state.requests[request_ref] + + assert state.conn != new_state.conn + end + + test "returns error when connection is closed", %{ + request: {method, path, headers}, + state: state + } do + {:ok, conn} = Mint.HTTP.close(state.conn) + request = {:request, method, path, headers, :stream, [stream_response_pid: self()]} + response = ConnectionProcess.handle_call(request, nil, %{state | conn: conn}) + + assert {:reply, {:error, error}, new_state} = response + assert state.conn != new_state.conn + assert "the connection is closed" == error + end + + test "returns error response when mint returns an error when starting stream request", %{ + request: {method, path, headers}, + state: state + } do + # Simulates the server closing the connection before we update the state + {:ok, _conn} = Mint.HTTP.close(state.conn) + + request = {:request, method, path, headers, :stream, [stream_response_pid: self()]} + response = ConnectionProcess.handle_call(request, nil, state) + + assert {:reply, {:error, error}, new_state} = response + assert state.conn == new_state.conn + assert %Mint.TransportError{__exception__: true, reason: :closed} == error + end + end + + describe "handle_call/2 - request - payload" do + setup :valid_connection + + test "start stream request, enqueue payload to be process and continue", %{ + request: {method, path, headers}, + state: state + } do + body = <<1, 2, 3>> + request = {:request, method, path, headers, body, [stream_response_pid: self()]} + response = ConnectionProcess.handle_call(request, nil, state) + + assert {:reply, {:ok, %{request_ref: request_ref}}, new_state, + {:continue, :process_request_stream_queue}} = response + + assert is_reference(request_ref) + + assert %{ + stream_response_pid: self(), + done: false, + response: %{} + } == new_state.requests[request_ref] + + assert {[{request_ref, body, nil}], []} == new_state.request_stream_queue + assert state.conn != new_state.conn + end + + test "returns error response when connection is closed", %{ + request: {method, path, headers}, + state: state + } do + {:ok, conn} = Mint.HTTP.close(state.conn) + body = <<1, 2, 3>> + request = {:request, method, path, headers, body, [stream_response_pid: self()]} + response = ConnectionProcess.handle_call(request, nil, %{state | conn: conn}) + + assert {:reply, {:error, error}, new_state} = response + assert state.conn != new_state.conn + assert "the connection is closed" == error + end + + test "returns error response when mint returns an error when starting stream request", %{ + request: {method, path, headers}, + state: state + } do + body = <<1, 2, 3>> + request = {:request, method, path, headers, body, [stream_response_pid: self()]} + + # Simulates the server closing the connection before we update the state + {:ok, _conn} = Mint.HTTP.close(state.conn) + + response = ConnectionProcess.handle_call(request, nil, state) + + assert {:reply, {:error, error}, _new_state} = response + assert %Mint.TransportError{reason: :closed} == error + end + end + + describe "handle_call/2 - stream_body" do + setup :valid_connection + setup :valid_stream_request + + test "reply with :ok when stream :eof is successful", %{ + request_ref: request_ref, + state: state + } do + response = ConnectionProcess.handle_call({:stream_body, request_ref, :eof}, nil, state) + assert {:reply, :ok, new_state} = response + assert new_state.conn != state.conn + end + + test "reply with error when stream :eof is errors", %{request_ref: request_ref, state: state} do + # Simulates the server closing the connection before we update the state + {:ok, _conn} = Mint.HTTP.close(state.conn) + + response = ConnectionProcess.handle_call({:stream_body, request_ref, :eof}, nil, state) + + assert {:reply, {:error, error}, _new_state} = response + assert %Mint.TransportError{__exception__: true, reason: :closed} == error + end + + test "continue to process payload stream", %{request_ref: request_ref, state: state} do + response = + ConnectionProcess.handle_call({:stream_body, request_ref, <<1, 2, 3>>}, self(), state) + + assert {:noreply, new_state, {:continue, :process_request_stream_queue}} = response + assert {[{request_ref, <<1, 2, 3>>, self()}], []} == new_state.request_stream_queue + assert new_state.conn == state.conn + end + end + + describe "handle_call/2 - cancel_request" do + setup :valid_connection + setup :valid_stream_request + setup :valid_stream_response + + test "reply with :ok when canceling the request is successful, also set stream response pid to done and remove request ref from state", + %{ + request_ref: request_ref, + stream_response_pid: response_pid, + state: state + } do + response = ConnectionProcess.handle_call({:cancel_request, request_ref}, nil, state) + assert {:reply, :ok, new_state} = response + assert %{} == new_state.requests + response_state = :sys.get_state(response_pid) + assert [] == response_state.responses + assert true == response_state.done + end + end + + describe "handle_continue/2 - :process_stream_queue" do + setup :valid_connection + setup :valid_stream_request + setup :valid_stream_response + + test "do nothing when there is no window_size in the connection", %{ + request_ref: request_ref, + state: state + } do + # hacky to simulate a window size of zero since this is usually updated with the requests interaction + state = %{state | conn: %{state.conn | window_size: 0}} + # enqueue the payload onto the request queue + {_, state, _} = + ConnectionProcess.handle_call({:stream_body, request_ref, <<1, 2, 3>>}, self(), state) + + assert {:noreply, new_state} = + ConnectionProcess.handle_continue(:process_request_stream_queue, state) + + assert new_state == state + end + + test "(body_size > window_size) chunk payload stream what is possible and enqueue the rest at the begining of the queue to give priority to the current request", + %{request_ref: request_ref, state: state} do + # hacky to simulate a window size of 2 bytes since this is usually updated with the requests interaction + state = %{state | conn: %{state.conn | window_size: 2}} + # enqueue the payload onto the request queue. Add to items to the queue, + # this way we can check is the rest is of the payload goes to the first position to the queue + {_, state, _} = + ConnectionProcess.handle_call({:stream_body, request_ref, <<1, 2, 3>>}, self(), state) + + {_, state, _} = + ConnectionProcess.handle_call({:stream_body, request_ref, <<4, 5, 6>>}, self(), state) + + assert {:noreply, new_state} = + ConnectionProcess.handle_continue(:process_request_stream_queue, state) + + # mint update window_size for us. + # This is how we check if the body was streamed + assert new_state.conn.window_size == 0 + assert {{:value, head_of_queue}, queue} = :queue.out(new_state.request_stream_queue) + assert {{:value, rest}, {[], []}} = :queue.out(queue) + + # <<1, 2, 3>> got enqueue first, we streamed 2 bytes, now we have only one left <<3>> + assert {request_ref, <<3>>, self()} == head_of_queue + + # Next to be processed is <<4, 5, 6>> + assert {request_ref, <<4, 5, 6>>, self()} == rest + end + + test "(window_size >= body_size) stream bod, send end_stream message and reply caller process (when process ref is present)", + %{request_ref: request_ref, state: state} do + {_, state, _} = + ConnectionProcess.handle_call( + {:stream_body, request_ref, <<1, 2, 3>>}, + {self(), :tag}, + state + ) + + assert {:noreply, _new_state} = + ConnectionProcess.handle_continue(:process_request_stream_queue, state) + + assert_receive {:tag, :ok}, 500 + end + + test "(window_size >= body_size) stream body, send end_stream message and don't reply caller process (when precess ref is nil)", + %{request_ref: request_ref, state: state} do + {_, state, _} = + ConnectionProcess.handle_call({:stream_body, request_ref, <<1, 2, 3>>}, nil, state) + + assert {:noreply, _new_state} = + ConnectionProcess.handle_continue(:process_request_stream_queue, state) + + refute_receive {:tag, :ok}, 500 + end + + test "(window_size >= body_size) stream body, send end_stream message and check request_queue when queue is not empty", + %{request_ref: request_ref, state: state} do + {_, state, _} = + ConnectionProcess.handle_call( + {:stream_body, request_ref, <<1, 2, 3>>}, + {self(), :tag}, + state + ) + + {_, state, _} = + ConnectionProcess.handle_call( + {:stream_body, request_ref, <<4, 5, 6>>}, + {self(), :tag}, + state + ) + + assert {:noreply, _new_state, {:continue, :process_request_stream_queue}} = + ConnectionProcess.handle_continue(:process_request_stream_queue, state) + + assert_receive {:tag, :ok}, 500 + end + + test "send error to the caller process when server return an error and there is a process ref", + %{request_ref: request_ref, state: state} do + # Simulates the server closing the connection before we update the state + {:ok, _conn} = Mint.HTTP.close(state.conn) + + {_, state, _} = + ConnectionProcess.handle_call( + {:stream_body, request_ref, <<1, 2, 3>>}, + {self(), :tag}, + state + ) + + assert {:noreply, _new_state} = + ConnectionProcess.handle_continue(:process_request_stream_queue, state) + + assert_receive {:tag, {:error, %Mint.TransportError{reason: :closed, __exception__: true}}}, + 500 + end + + test "send error message to stream response process when caller process ref is empty", + %{request_ref: request_ref, state: state, stream_response_pid: response_pid} do + {_, state, _} = + ConnectionProcess.handle_call({:stream_body, request_ref, <<1>>}, nil, state) + + # Close connection to simulate an error (like the server closing the connection before we update state) + {:ok, _conn} = Mint.HTTP.close(state.conn) + + assert {:noreply, _new_state} = + ConnectionProcess.handle_continue(:process_request_stream_queue, state) + + response_state = :sys.get_state(response_pid) + + assert [error: %Mint.TransportError{reason: :closed, __exception__: true}] == + response_state.responses + end + end + + describe "handle_info - connection_closed - no requests" do + setup :valid_connection + + test "send a message to parent process to inform the connection is down", %{ + state: state + } do + socket = state.conn.socket + # this is a mocked message to inform the connection is closed + tcp_message = {:tcp_closed, socket} + + assert {:noreply, new_state} = ConnectionProcess.handle_info(tcp_message, state) + assert new_state.conn.state == :closed + assert_receive {:elixir_grpc, :connection_down, pid}, 500 + assert pid == self() + end + end + + describe "handle_info - connection_closed - with request" do + setup :valid_connection + setup :valid_stream_request + setup :valid_stream_response + + test "send a message to parent process to inform the connection is down and end stream response process", + %{ + state: state, + stream_response_pid: response_pid + } do + socket = state.conn.socket + # this is a mocked message to inform the connection is closed + tcp_message = {:tcp_closed, socket} + + assert {:noreply, new_state} = ConnectionProcess.handle_info(tcp_message, state) + assert new_state.conn.state == :closed + assert_receive {:elixir_grpc, :connection_down, pid}, 500 + response_state = :sys.get_state(response_pid) + assert [error: "the connection is closed"] == response_state.responses + assert true == response_state.done + assert pid == self() + end + + test "send a message to parent process to inform the connection is down and reply pending process", + %{ + state: state, + request_ref: request_ref, + stream_response_pid: response_pid + } do + socket = state.conn.socket + # this is a mocked message to inform the connection is closed + tcp_message = {:tcp_closed, socket} + + response = + ConnectionProcess.handle_call( + {:stream_body, request_ref, <<1, 2, 3>>}, + {self(), :tag}, + state + ) + + {:noreply, state, {:continue, :process_request_stream_queue}} = response + + assert {:noreply, new_state} = ConnectionProcess.handle_info(tcp_message, state) + assert new_state.conn.state == :closed + assert_receive {:elixir_grpc, :connection_down, pid}, 500 + response_state = :sys.get_state(response_pid) + assert [error: "the connection is closed"] == response_state.responses + assert true == response_state.done + assert pid == self() + end + end + + defp valid_connection(%{port: port}) do + {:ok, pid} = ConnectionProcess.start_link(:http, "localhost", port, protocols: [:http2]) + state = :sys.get_state(pid) + version = Application.spec(:grpc) |> Keyword.get(:vsn) + + headers = [ + {"content-type", "application/grpc"}, + {"user-agent", "grpc-elixir/#{version}"}, + {"te", "trailers"} + ] + + %{ + process_pid: pid, + state: state, + request: {"POST", "/routeguide.RouteGuide/RecordRoute", headers} + } + end + + defp valid_stream_request(%{request: {method, path, headers}, process_pid: pid}) do + {:ok, %{request_ref: request_ref}} = + ConnectionProcess.request(pid, method, path, headers, :stream, stream_response_pid: self()) + + state = :sys.get_state(pid) + %{request_ref: request_ref, state: state} + end + + defp valid_stream_response(%{request_ref: request_ref, state: state} = ctx) do + stream = build(:client_stream) + {:ok, pid} = StreamResponseProcess.start_link(stream, true) + state = update_stream_response_process_to_test_pid(state, request_ref, pid) + Map.merge(ctx, %{state: state, stream_response_pid: pid}) + end + + def update_stream_response_process_to_test_pid(state, request_ref, test_pid) do + request_ref_state = state.requests[request_ref] + + %{state | requests: %{request_ref => %{request_ref_state | stream_response_pid: test_pid}}} + end +end diff --git a/test/grpc/client/adapters/mint/stream_response_process_test.exs b/test/grpc/client/adapters/mint/stream_response_process_test.exs new file mode 100644 index 00000000..b7baad16 --- /dev/null +++ b/test/grpc/client/adapters/mint/stream_response_process_test.exs @@ -0,0 +1,353 @@ +defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do + use GRPC.DataCase + use ExUnit.Parameterized + + alias GRPC.Client.Adapters.Mint.StreamResponseProcess + + setup do + state = %{ + buffer: "", + done: false, + from: nil, + grpc_stream: build(:client_stream), + responses: [], + compressor: nil, + send_headers_or_trailers: false + } + + %{state: state} + end + + describe "handle_call/3 - data" do + setup do + part_1 = <<0, 0, 0, 0, 12, 10, 10, 72, 101, 108>> + part_2 = <<108, 111, 32, 76, 117, 105, 115>> + full_message = part_1 <> part_2 + %{data: {part_1, part_2, full_message}} + end + + test "append message to buffer when message is incomplete", %{ + state: state, + data: {part1, _, _} + } do + response = + StreamResponseProcess.handle_call({:consume_response, {:data, part1}}, self(), state) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert new_state.buffer == part1 + end + + test "decode full message when incoming date is complete", %{ + state: state, + data: {_, _, full_message} + } do + expected_response_message = build(:hello_reply_rpc) + + response = + StreamResponseProcess.handle_call( + {:consume_response, {:data, full_message}}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert new_state.buffer == <<>> + assert [{:ok, response_message}] = new_state.responses + assert expected_response_message == response_message + end + + test "append incoming message to existing buffer", %{state: state, data: {part1, part2, _}} do + state = %{state | buffer: part1} + expected_response_message = build(:hello_reply_rpc) + + response = + StreamResponseProcess.handle_call({:consume_response, {:data, part2}}, self(), state) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert new_state.buffer == <<>> + assert [{:ok, response_message}] = new_state.responses + assert expected_response_message == response_message + end + + test "decode message and put rest on buffer", %{state: state, data: {_, _, full}} do + extra_data = <<0, 1, 2>> + data = full <> extra_data + expected_response_message = build(:hello_reply_rpc) + + response = + StreamResponseProcess.handle_call({:consume_response, {:data, data}}, self(), state) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert new_state.buffer == extra_data + assert [{:ok, response_message}] = new_state.responses + assert expected_response_message == response_message + end + end + + describe "handle_call/3 - headers/trailers" do + test_with_params( + "put error in responses when incoming headers has error status", + %{state: state}, + fn %{type: type, is_header_enabled: header_enabled?} -> + state = %{state | send_headers_or_trailers: header_enabled?} + + headers = [ + {"content-length", "0"}, + {"content-type", "application/grpc+proto"}, + {"grpc-message", "Internal Server Error"}, + {"grpc-status", "2"}, + {"server", "Cowboy"} + ] + + response = + StreamResponseProcess.handle_call( + {:consume_response, {type, headers}}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert [{:error, error}] = new_state.responses + assert %GRPC.RPCError{message: "Internal Server Error", status: 2} == error + end, + do: [ + {%{type: :headers, is_header_enabled: false}}, + {%{type: :headers, is_header_enabled: true}}, + {%{type: :trailers, is_header_enabled: true}}, + {%{type: :trailers, is_header_enabled: false}} + ] + ) + + test_with_params( + "append headers to response when headers are enabled", + %{state: state}, + fn type -> + state = %{state | send_headers_or_trailers: true} + + headers = [ + {"content-length", "0"}, + {"content-type", "application/grpc+proto"}, + {"grpc-message", ""}, + {"grpc-status", "0"}, + {"server", "Cowboy"} + ] + + response = + StreamResponseProcess.handle_call( + {:consume_response, {type, headers}}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert [{type_response, response_headers}] = new_state.responses + assert type == type_response + + assert %{ + "content-length" => "0", + "content-type" => "application/grpc+proto", + "grpc-message" => "", + "grpc-status" => "0", + "server" => "Cowboy" + } == response_headers + end, + do: [{:headers}, {:trailers}] + ) + + test_with_params( + "skip produce headers when flag is disabled and there are no errors", + %{state: state}, + fn type -> + headers = [ + {"content-length", "0"}, + {"content-type", "application/grpc+proto"}, + {"grpc-message", ""}, + {"grpc-status", "0"}, + {"server", "Cowboy"} + ] + + response = + StreamResponseProcess.handle_call( + {:consume_response, {type, headers}}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert [] == new_state.responses + end, + do: [{:headers}, {:trailers}] + ) + + test "add compressor to state when incoming headers match available compressor", %{ + state: state + } do + headers = [ + {"content-length", "0"}, + {"content-type", "application/grpc+proto"}, + {"grpc-message", ""}, + {"grpc-status", "0"}, + {"server", "Cowboy"}, + {"grpc-encoding", "gzip"} + ] + + response = + StreamResponseProcess.handle_call( + {:consume_response, {:headers, headers}}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert GRPC.Compressor.Gzip == new_state.compressor + end + + test "don't update compressor when unsupported compressor is returned by the server", %{ + state: state + } do + headers = [ + {"content-length", "0"}, + {"content-type", "application/grpc+proto"}, + {"grpc-message", ""}, + {"grpc-status", "0"}, + {"server", "Cowboy"}, + {"grpc-encoding", "suzana"} + ] + + response = + StreamResponseProcess.handle_call( + {:consume_response, {:headers, headers}}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert nil == new_state.compressor + end + end + + describe "handle_call/3 - errors" do + test "add error tuple to responses", %{state: state} do + error = {:error, "howdy"} + + response = + StreamResponseProcess.handle_call( + {:consume_response, error}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert [response_error] = new_state.responses + assert response_error == error + end + end + + describe "handle_call/3 - done" do + test "set state to done", %{state: state} do + response = + StreamResponseProcess.handle_call( + {:consume_response, :done}, + self(), + state + ) + + assert {:reply, :ok, new_state, {:continue, :produce_response}} = response + assert true == new_state.done + end + end + + describe "handle_continue/2 - produce_response" do + test "noreply when process ref is empty", %{state: state} do + {:noreply, new_state} = StreamResponseProcess.handle_continue(:produce_response, state) + assert new_state == state + end + + test "send nil message to caller process (ends Elixir.Stream) when all responses are sent and stream has ended (done: true)", + %{state: state} do + state = %{state | from: {self(), :tag}, done: true} + + {:stop, :normal, _new_state} = + StreamResponseProcess.handle_continue(:produce_response, state) + + assert_receive {:tag, nil} + end + + test "continue when there are no response to be sent and stream is not done yet", %{ + state: state + } do + state = %{state | from: {self(), :tag}, done: false} + {:noreply, new_state} = StreamResponseProcess.handle_continue(:produce_response, state) + assert state == new_state + end + + test "send response to caller when there are responses in the queue", %{state: state} do + state = %{state | from: {self(), :tag}, done: false, responses: [1, 2]} + {:noreply, new_state} = StreamResponseProcess.handle_continue(:produce_response, state) + %{from: from, responses: responses} = new_state + assert nil == from + assert [2] == responses + assert_receive {:tag, 1} + end + end + + describe "build_stream/1" do + setup do + {:ok, pid} = StreamResponseProcess.start_link(build(:client_stream), true) + + %{pid: pid} + end + + test "ends stream when done message is passed", %{pid: pid} do + stream = StreamResponseProcess.build_stream(pid) + StreamResponseProcess.done(pid) + assert Enum.to_list(stream) == [] + end + + test "emits error tuple on stream when error is given to consume", %{pid: pid} do + stream = StreamResponseProcess.build_stream(pid) + StreamResponseProcess.consume(pid, :error, "an error") + StreamResponseProcess.done(pid) + assert [error] = Enum.to_list(stream) + assert {:error, "an error"} == error + end + + test "emits an ok tuple with data", %{pid: pid} do + data_to_consume = <<0, 0, 0, 0, 12, 10, 10, 72, 101, 108, 108, 111, 32, 76, 117, 105, 115>> + stream = StreamResponseProcess.build_stream(pid) + StreamResponseProcess.consume(pid, :data, data_to_consume) + StreamResponseProcess.done(pid) + assert [data] = Enum.to_list(stream) + assert {:ok, build(:hello_reply_rpc)} == data + end + + test_with_params( + "emits headers to stream", + %{pid: pid}, + fn type -> + headers = [ + {"content-length", "0"}, + {"content-type", "application/grpc+proto"}, + {"grpc-message", ""}, + {"grpc-status", "0"}, + {"server", "Cowboy"} + ] + + stream = StreamResponseProcess.build_stream(pid) + StreamResponseProcess.consume(pid, type, headers) + StreamResponseProcess.done(pid) + assert [{response_type, response_headers}] = Enum.to_list(stream) + assert type == response_type + + assert %{ + "content-length" => "0", + "content-type" => "application/grpc+proto", + "grpc-message" => "", + "grpc-status" => "0", + "server" => "Cowboy" + } == response_headers + end, + do: [{:headers}, {:trailers}] + ) + end +end diff --git a/test/grpc/client/adapters/mint_test.exs b/test/grpc/client/adapters/mint_test.exs new file mode 100644 index 00000000..6b849be0 --- /dev/null +++ b/test/grpc/client/adapters/mint_test.exs @@ -0,0 +1,36 @@ +defmodule GRPC.Client.Adapters.MintTest do + use GRPC.DataCase + + alias GRPC.Client.Adapters.Mint + + describe "connect/2" do + setup do + {:ok, _, port} = GRPC.Server.start(FeatureServer, 0) + + on_exit(fn -> + :ok = GRPC.Server.stop(FeatureServer) + end) + + %{port: port} + end + + test "connects insecurely (default options)", %{port: port} do + channel = build(:channel, port: port, host: "localhost") + + assert {:ok, result} = Mint.connect(channel, []) + assert %{channel | adapter_payload: %{conn_pid: result.adapter_payload.conn_pid}} == result + end + + test "connects insecurely (custom options)", %{port: port} do + channel = build(:channel, port: port, host: "localhost") + + assert {:ok, result} = Mint.connect(channel, transport_opts: [ip: :loopback]) + assert %{channel | adapter_payload: %{conn_pid: result.adapter_payload.conn_pid}} == result + + # Ensure that changing one of the options breaks things + assert {:error, message} = Mint.connect(channel, transport_opts: [ip: "256.0.0.0"]) + + assert message == "Error while opening connection: {:error, :badarg}" + end + end +end diff --git a/test/support/data_case.ex b/test/support/data_case.ex new file mode 100644 index 00000000..8e0de59d --- /dev/null +++ b/test/support/data_case.ex @@ -0,0 +1,9 @@ +defmodule GRPC.DataCase do + use ExUnit.CaseTemplate + + using do + quote do + import GRPC.Factory + end + end +end diff --git a/test/support/factory.ex b/test/support/factory.ex index fceaae03..d38f10a2 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -4,10 +4,6 @@ defmodule GRPC.Factory do alias GRPC.Channel alias GRPC.Credential - @cert_path Path.expand("./tls/server1.pem", :code.priv_dir(:grpc)) - @key_path Path.expand("./tls/server1.key", :code.priv_dir(:grpc)) - @ca_path Path.expand("./tls/ca.pem", :code.priv_dir(:grpc)) - def build(resource, attrs \\ %{}) do name = :"#{resource}_factory" @@ -38,15 +34,47 @@ defmodule GRPC.Factory do end def credential_factory do + cert_path = Path.expand("./tls/server1.pem", :code.priv_dir(:grpc)) + key_path = Path.expand("./tls/server1.key", :code.priv_dir(:grpc)) + ca_path = Path.expand("./tls/ca.pem", :code.priv_dir(:grpc)) + %Credential{ ssl: [ - certfile: @cert_path, - cacertfile: @ca_path, - keyfile: @key_path, + certfile: cert_path, + cacertfile: ca_path, + keyfile: key_path, verify: :verify_peer, fail_if_no_peer_cert: true, versions: [:"tlsv1.2"] ] } end + + def client_stream_factory do + %GRPC.Client.Stream{ + __interface__: %{ + receive_data: &GRPC.Client.Stream.receive_data/2, + send_request: &GRPC.Client.Stream.send_request/3 + }, + canceled: false, + channel: build(:channel, adapter: GRPC.Client.Adapters.Mint), + codec: GRPC.Codec.Proto, + compressor: nil, + grpc_type: :unary, + headers: %{}, + method_name: "SayHello", + path: "/helloworld.Greeter/SayHello", + payload: %{}, + request_mod: Helloworld.HelloRequest, + response_mod: Helloworld.HelloReply, + rpc: {"say_hello", {Helloworld.HelloRequest, false}, {Helloworld.HelloReply, false}}, + server_stream: false, + service_name: "helloworld.Greeter", + accepted_compressors: [GRPC.Compressor.Gzip] + } + end + + def hello_reply_rpc_factory do + %Helloworld.HelloReply{message: "Hello Luis"} + end end diff --git a/test/support/feature_server.ex b/test/support/feature_server.ex index 3951ea8b..a3e70c7e 100644 --- a/test/support/feature_server.ex +++ b/test/support/feature_server.ex @@ -2,6 +2,10 @@ defmodule FeatureServer do use GRPC.Server, service: Routeguide.RouteGuide.Service def get_feature(point, _stream) do - Routeguide.Feature.new(location: point, name: "#{point.latitude},#{point.longitude}") + if point.latitude != 0 do + Routeguide.Feature.new(location: point, name: "#{point.latitude},#{point.longitude}") + else + {:error, "server error"} + end end end diff --git a/test/support/helloworld.pb.ex b/test/support/proto/helloworld.pb.ex similarity index 100% rename from test/support/helloworld.pb.ex rename to test/support/proto/helloworld.pb.ex diff --git a/test/support/helloworld.proto b/test/support/proto/helloworld.proto similarity index 100% rename from test/support/helloworld.proto rename to test/support/proto/helloworld.proto diff --git a/test/support/route_guide.pb.ex b/test/support/proto/route_guide.pb.ex similarity index 100% rename from test/support/route_guide.pb.ex rename to test/support/proto/route_guide.pb.ex diff --git a/test/support/route_guide.proto b/test/support/proto/route_guide.proto similarity index 100% rename from test/support/route_guide.proto rename to test/support/proto/route_guide.proto diff --git a/test/support/test_adapter.exs b/test/support/test_adapter.exs index 3f5d84d0..ff603557 100644 --- a/test/support/test_adapter.exs +++ b/test/support/test_adapter.exs @@ -5,6 +5,10 @@ defmodule GRPC.Test.ClientAdapter do def disconnect(channel), do: {:ok, channel} def send_request(stream, _message, _opts), do: stream def receive_data(_stream, _opts), do: {:ok, nil} + def send_data(stream, _message, _opts), do: stream + def send_headers(stream, _opts), do: stream + def end_stream(stream), do: stream + def cancel(stream), do: stream end defmodule GRPC.Test.ServerAdapter do