Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for grpc-web #206

Merged
merged 19 commits into from
Jul 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions lib/grpc/codec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@ defmodule GRPC.Codec do

@doc ~s(Name is identity of the codec, which will be suffix after content-type "application/grpc+" such as "proto".)
@callback name() :: String.t()
@callback encode(any()) :: binary()
@callback decode(any(), module()) :: any()
@callback encode(any) :: binary
@callback decode(any, module :: atom) :: any

@doc """
This function is invoked before the gRPC payload is transformed into a protobuf message whenever it is defined.

This can be used to apply a transform over the gRPC message before decoding it. For instance grpc-web using the `application/grpc-web-text`
content type requires the message to be Base64-encoded, so a server receving messages using grpc-web-text will be required to
do a Base64 decode on the payload before decoding the gRPC message.
"""
@callback unpack_from_channel(binary) :: binary

@doc """
This function is invoked whenever it is defined after the protobuf message has been transformed into a gRPC payload.

This can be used to apply a transform over the gRPC message before sending it.
For instance grpc-web using the `application/grpc-web-text` content type requires the message to be Base64-encoded, so a server sending messages using grpc-web-text will be required to
do a Base64 encode on the payload before sending the gRPC message.
"""
@callback pack_for_channel(iodata()) :: binary
@optional_callbacks unpack_from_channel: 1, pack_for_channel: 1
end
29 changes: 29 additions & 0 deletions lib/grpc/codec/web_text.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule GRPC.Codec.WebText do
@behaviour GRPC.Codec

def name() do
"text"
end

def encode(struct) do
Protobuf.Encoder.encode(struct)
end

def pack_for_channel(data) when is_list(data) do
data
|> IO.iodata_to_binary()
|> Base.encode64()
end

def pack_for_channel(binary) do
Base.encode64(binary)
end

def unpack_from_channel(binary) do
Base.decode64!(binary)
end

def decode(binary, module) do
Protobuf.Decoder.decode(binary, module)
end
end
8 changes: 8 additions & 0 deletions lib/grpc/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule GRPC.Message do
def to_data(message, opts \\ []) do
compressor = opts[:compressor]
iolist = opts[:iolist]
codec = opts[:codec]
max_length = opts[:max_message_length] || @max_message_length

{compress_flag, message} =
Expand All @@ -59,7 +60,14 @@ defmodule GRPC.Message do
{:error, "Encoded message is too large (#{length} bytes)"}
else
result = [compress_flag, <<length::size(4)-unit(8)>>, message]

result =
if function_exported?(codec, :pack_for_channel, 1),
do: codec.pack_for_channel(result),
else: result

result = if iolist, do: result, else: IO.iodata_to_binary(result)

{:ok, result, length + 5}
end
end
Expand Down
11 changes: 9 additions & 2 deletions lib/grpc/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule GRPC.Server do
quote bind_quoted: [opts: opts], location: :keep do
service_mod = opts[:service]
service_name = service_mod.__meta__(:name)
codecs = opts[:codecs] || [GRPC.Codec.Proto]
codecs = opts[:codecs] || [GRPC.Codec.Proto, GRPC.Codec.WebText]
compressors = opts[:compressors] || []

Enum.each(service_mod.__rpc_calls__, fn {name, _, _} = rpc ->
Expand Down Expand Up @@ -124,7 +124,14 @@ defmodule GRPC.Server do
) do
{:ok, data} = adapter.read_body(payload)

case GRPC.Message.from_data(stream, data) do
body =
if function_exported?(codec, :unpack_from_channel, 1) do
codec.unpack_from_channel(data)
else
data
end

case GRPC.Message.from_data(stream, body) do
{:ok, message} ->
request = codec.decode(message, req_mod)

Expand Down
10 changes: 8 additions & 2 deletions lib/grpc/server/adapters/cowboy/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
req = send_error(req, state, msg)
{:stop, req, state}
else
case GRPC.Message.to_data(data, compressor: compressor) do
case GRPC.Message.to_data(data, compressor: compressor, codec: opts[:codec]) do
{:ok, data, _size} ->
req = check_sent_resp(req)
:cowboy_req.stream_body(data, is_fin, req)
Expand Down Expand Up @@ -447,10 +447,16 @@ defmodule GRPC.Server.Adapters.Cowboy.Handler do
defp extract_subtype("application/grpc"), do: {:ok, "proto"}
defp extract_subtype("application/grpc+"), do: {:ok, "proto"}
defp extract_subtype("application/grpc;"), do: {:ok, "proto"}

defp extract_subtype(<<"application/grpc+", rest::binary>>), do: {:ok, rest}
defp extract_subtype(<<"application/grpc;", rest::binary>>), do: {:ok, rest}

defp extract_subtype("application/grpc-web"), do: {:ok, "proto"}
defp extract_subtype("application/grpc-web+"), do: {:ok, "proto"}
defp extract_subtype("application/grpc-web;"), do: {:ok, "proto"}
defp extract_subtype("application/grpc-web-text"), do: {:ok, "text"}
defp extract_subtype("application/grpc-web+" <> rest), do: {:ok, rest}
defp extract_subtype("application/grpc-web-text+" <> rest), do: {:ok, rest}

defp extract_subtype(type) do
Logger.warn("Got unknown content-type #{type}, please create an issue.")
{:ok, "proto"}
Expand Down
2 changes: 1 addition & 1 deletion lib/grpc/server/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule GRPC.Server.Stream do
def send_reply(%{adapter: adapter, codec: codec} = stream, reply, opts) do
# {:ok, data, _size} = reply |> codec.encode() |> GRPC.Message.to_data()
data = codec.encode(reply)
adapter.send_reply(stream.payload, data, opts)
adapter.send_reply(stream.payload, data, Keyword.put(opts, :codec, codec))
stream
end
end
7 changes: 7 additions & 0 deletions lib/grpc/stub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,13 @@ defmodule GRPC.Stub do
nil
end

body =
if function_exported?(codec, :unpack_from_channel, 1) do
codec.unpack_from_channel(body)
else
body
end

case GRPC.Message.from_data(%{compressor: compressor}, body) do
{:ok, msg} ->
{:ok, codec.decode(msg, res_mod)}
Expand Down
20 changes: 10 additions & 10 deletions lib/grpc/transport/http2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ defmodule GRPC.Transport.HTTP2 do

require Logger

def server_headers(%{codec: GRPC.Codec.WebText = codec}) do
%{"content-type" => "application/grpc-web-#{codec.name()}"}
end

def server_headers(%{codec: codec}) do
%{"content-type" => "application/grpc+#{codec.name}"}
%{"content-type" => "application/grpc+#{codec.name()}"}
end

@spec server_trailers(integer, String.t()) :: map
Expand Down Expand Up @@ -57,15 +61,11 @@ defmodule GRPC.Transport.HTTP2 do

defp content_type(custom, _codec) when is_binary(custom), do: custom

defp content_type(_, codec) do
# Some gRPC implementations don't support application/grpc+xyz,
# to avoid this kind of trouble, use application/grpc by default
if codec == GRPC.Codec.Proto do
"application/grpc"
else
"application/grpc+#{codec.name}"
end
end
# Some gRPC implementations don't support application/grpc+xyz,
# to avoid this kind of trouble, use application/grpc by default
defp content_type(_, GRPC.Codec.Proto), do: "application/grpc"
defp content_type(_, codec = GRPC.Codec.WebText), do: "application/grpc-web-#{codec.name()}"
defp content_type(_, codec), do: "application/grpc+#{codec.name()}"

def extract_metadata(headers) do
headers
Expand Down
37 changes: 27 additions & 10 deletions test/grpc/integration/codec_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,54 @@ defmodule GRPC.Integration.CodecTest do
defmodule HelloServer do
use GRPC.Server,
service: Helloworld.Greeter.Service,
codecs: [GRPC.Codec.Proto, GRPC.Codec.Erlpack]
codecs: [GRPC.Codec.Proto, GRPC.Codec.Erlpack, GRPC.Codec.WebText]

def say_hello(req, _stream) do
Helloworld.HelloReply.new(message: "Hello, #{req.name}")
end
end

defmodule HelloErlpackStub do
defmodule HelloStub do
use GRPC.Stub, service: Helloworld.Greeter.Service
end

test "Says hello over erlpack" do
test "Says hello over erlpack, GRPC-web-text" do
run_server(HelloServer, fn port ->
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
name = "Mairbek"
req = Helloworld.HelloRequest.new(name: name)

{:ok, reply} = channel |> HelloErlpackStub.say_hello(req, codec: GRPC.Codec.Erlpack)
assert reply.message == "Hello, #{name}"

# verify that proto still works
{:ok, reply} = channel |> HelloErlpackStub.say_hello(req, codec: GRPC.Codec.Proto)
assert reply.message == "Hello, #{name}"
for codec <- [GRPC.Codec.Erlpack, GRPC.Codec.WebText, GRPC.Codec.Proto] do
{:ok, reply} = HelloStub.say_hello(channel, req, codec: codec)
assert reply.message == "Hello, #{name}"
end

# codec not registered
{:error, reply} = channel |> HelloErlpackStub.say_hello(req, codec: NotRegisteredCodec)
{:error, reply} = HelloStub.say_hello(channel, req, codec: NotRegisteredCodec)

assert %GRPC.RPCError{
status: GRPC.Status.unimplemented(),
message: "No codec registered for content-type application/grpc+not-registered"
} == reply
end)
end

test "sets the correct content-type based on codec name" do
run_server(HelloServer, fn port ->
{:ok, channel} = GRPC.Stub.connect("localhost:#{port}")
name = "Mairbek"
req = Helloworld.HelloRequest.new(name: name)

for {expected_content_type, codec} <- [
{"grpc-web-text", GRPC.Codec.WebText},
{"grpc+erlpack", GRPC.Codec.Erlpack},
{"grpc+proto", GRPC.Codec.Proto}
] do
{:ok, _reply, headers} =
HelloStub.say_hello(channel, req, codec: codec, return_headers: true)

assert headers[:headers]["content-type"] == "application/#{expected_content_type}"
end
end)
end
end
6 changes: 6 additions & 0 deletions test/grpc/message_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ defmodule GRPC.MessageTest do
assert {:ok, IO.iodata_to_binary(message)} ==
GRPC.Message.from_data(%{compressor: GRPC.Compressor.Gzip}, binary)
end

test "to_data/2 invokes codec.pack_for_channel on the gRPC body if codec implements it" do
message = "web-text"
assert {:ok, base64_payload, _} = GRPC.Message.to_data(message, %{codec: GRPC.Codec.WebText})
assert message == GRPC.Message.from_data(Base.decode64!(base64_payload))
end
end
16 changes: 15 additions & 1 deletion test/grpc/transport/http2_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ defmodule GRPC.Transport.HTTP2Test do
alias GRPC.Channel
alias GRPC.Transport.HTTP2

@channel %Channel{scheme: "http", host: "grpc.io"}
alias GRPC.Client.Stream
alias GRPC.Server.Stream, as: ServerStream

@channel %Channel{scheme: "http", host: "grpc.io"}

defp assert_header({key, _v} = pair, headers) do
assert pair == Enum.find(headers, nil, fn {k, _v} -> if k == key, do: true end)
Expand Down Expand Up @@ -99,4 +101,16 @@ defmodule GRPC.Transport.HTTP2Test do
assert {_, "application/grpc+custom-codec"} =
Enum.find(headers, fn {key, _} -> key == "content-type" end)
end

test "server_headers/3 sets content-type based on the codec name" do
for {expected_content_type, codec} <- [
{"grpc-web-text", GRPC.Codec.WebText},
{"grpc+erlpack", GRPC.Codec.Erlpack}
] do
stream = %ServerStream{codec: codec}

assert %{"content-type" => "application/" <> ^expected_content_type} =
HTTP2.server_headers(stream)
end
end
end
7 changes: 7 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
Code.require_file("./support/test_adapter.exs", __DIR__)

codecs = [
GRPC.Codec.Erlpack,
GRPC.Codec.WebText,
GRPC.Codec.Proto
]

Enum.each(codecs, &Code.ensure_loaded/1)
ExUnit.start(capture_log: true)