Skip to content

Commit

Permalink
Update to ranch 2.0
Browse files Browse the repository at this point in the history
(cherry picked from commit 61f7b2a)
  • Loading branch information
dcorbacho authored and michaelklishin committed Apr 22, 2021
1 parent af07bda commit 80b8fdd
Show file tree
Hide file tree
Showing 16 changed files with 921 additions and 76 deletions.
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
-behaviour(supervisor2).
-behaviour(ranch_protocol).

-export([start_link/4, reader/1]).
-export([start_link/3, reader/1]).

-export([init/1]).

-include_lib("rabbit_common/include/rabbit.hrl").

%%----------------------------------------------------------------------------

-spec start_link(any(), rabbit_net:socket(), module(), any()) ->
-spec start_link(any(), module(), any()) ->
{'ok', pid(), pid()}.

start_link(Ref, _Sock, _Transport, _Opts) ->
start_link(Ref, _Transport, _Opts) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

-include_lib("rabbit_common/include/rabbit.hrl").

-export([start_link/4, start_keepalive_link/0]).
-export([start_link/3, start_keepalive_link/0]).

-export([init/1]).

%%----------------------------------------------------------------------------

start_link(Ref, _Sock, _Transport, []) ->
start_link(Ref, _Transport, []) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, KeepaliveSup} = supervisor2:start_child(
SupPid,
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_stomp/src/rabbit_stomp_client_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

-include_lib("rabbit_common/include/rabbit.hrl").

-export([start_link/4, init/1]).
-export([start_link/3, init/1]).

start_link(Ref, _Sock, _Transport, Configuration) ->
start_link(Ref, _Transport, Configuration) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, HelperPid} =
supervisor2:start_child(SupPid,
Expand Down
363 changes: 363 additions & 0 deletions deps/rabbitmq_stream_management/rabbitmq-components.mk

Large diffs are not rendered by default.

363 changes: 363 additions & 0 deletions deps/rabbitmq_stream_prometheus/rabbitmq-components.mk

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion deps/rabbitmq_web_dispatch/src/rabbit_web_dispatch_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ensure_listener(Listener) ->
[rabbit_cowboy_middleware, cowboy_router, cowboy_handler],
stream_handlers => StreamHandlers},
ProtoOptsMap),
Child = ranch:child_spec(rabbit_networking:ranch_ref(Listener), 100,
Child = ranch:child_spec(rabbit_networking:ranch_ref(Listener),
Transport, TransportOpts,
cowboy_clear, CowboyOptsMap),
case supervisor:start_child(?SUP, Child) of
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ mqtt_init() ->
]}]),
CowboyOpts = CowboyOpts0#{env => #{dispatch => Routes},
middlewares => [cowboy_router, rabbit_web_mqtt_middleware, cowboy_handler],
proxy_header => get_env(proxy_protocol, false)},
proxy_header => get_env(proxy_protocol, false),
stream_handlers => [rabbit_web_mqtt_stream_handler, cowboy_stream_h]},
case get_env(tcp_config, []) of
[] -> ok;
TCPConf0 -> start_tcp_listener(TCPConf0, CowboyOpts)
Expand Down
9 changes: 4 additions & 5 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

-include_lib("rabbit_common/include/rabbit.hrl").

-export([start_link/4, start_keepalive_link/0]).
-export([start_link/3, start_keepalive_link/0]).

-export([init/1]).

%%----------------------------------------------------------------------------

start_link(Ref, Sock, Transport, CowboyOpts0) ->
start_link(Ref, Transport, CowboyOpts0) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, KeepaliveSup} = supervisor2:start_child(
SupPid,
Expand All @@ -31,16 +31,15 @@ start_link(Ref, Sock, Transport, CowboyOpts0) ->
%% then have the middleware rabbit_web_mqtt_middleware place it
%% in the initial handler state.
Env = maps:get(env, CowboyOpts0),
CowboyOpts = CowboyOpts0#{env => Env#{keepalive_sup => KeepaliveSup,
socket => Sock}},
CowboyOpts = CowboyOpts0#{env => Env#{keepalive_sup => KeepaliveSup}},
Protocol = case Transport of
ranch_tcp -> cowboy_clear;
ranch_ssl -> cowboy_tls
end,
{ok, ReaderPid} = supervisor2:start_child(
SupPid,
{Protocol,
{Protocol, start_link, [Ref, Sock, Transport, CowboyOpts]},
{Protocol, start_link, [Ref, Transport, CowboyOpts]},
intrinsic, ?WORKER_WAIT, worker, [Protocol]}),
{ok, SupPid, ReaderPid}.

Expand Down
103 changes: 62 additions & 41 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

-module(rabbit_web_mqtt_handler).
-behaviour(cowboy_websocket).
-behaviour(cowboy_sub_protocol).

-export([
init/2,
Expand All @@ -17,6 +18,11 @@
]).
-export([close_connection/2]).

%% cowboy_sub_protocol
-export([upgrade/4,
upgrade/5,
takeover/7]).

-include_lib("amqp_client/include/amqp_client.hrl").

-record(state, {
Expand All @@ -33,55 +39,70 @@
connection
}).

%% cowboy_sub_protcol
upgrade(Req, Env, Handler, HandlerState) ->
upgrade(Req, Env, Handler, HandlerState, #{}).

upgrade(Req, Env, Handler, HandlerState, Opts) ->
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).

takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
Sock = case HandlerState#state.socket of
undefined ->
Socket;
ProxyInfo ->
{rabbit_proxy_socket, Socket, ProxyInfo}
end,
cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
{Handler, HandlerState#state{socket = Sock}}).

%% cowboy_websocket
init(Req, Opts) ->
{PeerAddr, _PeerPort} = maps:get(peer, Req),
{_, KeepaliveSup} = lists:keyfind(keepalive_sup, 1, Opts),
{_, Sock0} = lists:keyfind(socket, 1, Opts),
Sock = case maps:get(proxy_header, Req, undefined) of
undefined ->
Sock0;
ProxyInfo ->
{rabbit_proxy_socket, Sock0, ProxyInfo}
end,
SockInfo = maps:get(proxy_header, Req, undefined),
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
Req2 = case cowboy_req:header(<<"sec-websocket-protocol">>, Req) of
undefined -> Req;
SecWsProtocol ->
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, SecWsProtocol, Req)
end,
{?MODULE, Req2, #state{
keepalive = {none, none},
keepalive_sup = KeepaliveSup,
parse_state = rabbit_mqtt_frame:initial_state(),
state = running,
conserve_resources = false,
socket = SockInfo,
peername = PeerAddr
}, WsOpts}.

websocket_init(State0 = #state{socket = Sock, peername = PeerAddr}) ->
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
Req2 = case cowboy_req:header(<<"sec-websocket-protocol">>, Req) of
undefined -> Req;
SecWsProtocol ->
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, SecWsProtocol, Req)
end,
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
{cowboy_websocket, Req2, #state{
conn_name = ConnStr,
keepalive = {none, none},
keepalive_sup = KeepaliveSup,
parse_state = rabbit_mqtt_frame:initial_state(),
state = running,
conserve_resources = false,
socket = Sock,
peername = PeerAddr
}, WsOpts};
State = State0#state{
conn_name = ConnStr,
socket = Sock
},
rabbit_log_connection:info("accepting Web MQTT connection ~p (~s)~n", [self(), ConnStr]),
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'Web MQTT', "N/A"}),
RealSocket = rabbit_net:unwrap_socket(Sock),
ProcessorState = rabbit_mqtt_processor:initial_state(Sock,
rabbit_mqtt_reader:ssl_login_name(RealSocket),
AdapterInfo,
fun send_reply/2,
PeerAddr),
process_flag(trap_exit, true),
{ok,
rabbit_event:init_stats_timer(
State#state{proc_state = ProcessorState},
#state.stats_timer),
hibernate};
_ ->
{stop, Req}
{stop, State0}
end.

websocket_init(State = #state{conn_name = ConnStr, socket = Sock, peername = PeerAddr}) ->
rabbit_log_connection:info("accepting Web MQTT connection ~p (~s)~n", [self(), ConnStr]),
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'Web MQTT', "N/A"}),
RealSocket = rabbit_net:unwrap_socket(Sock),
ProcessorState = rabbit_mqtt_processor:initial_state(Sock,
rabbit_mqtt_reader:ssl_login_name(RealSocket),
AdapterInfo,
fun send_reply/2,
PeerAddr),
process_flag(trap_exit, true),
{ok,
rabbit_event:init_stats_timer(
State#state{proc_state = ProcessorState},
#state.stats_timer),
hibernate}.

-spec close_connection(pid(), string()) -> 'ok'.
close_connection(Pid, Reason) ->
rabbit_log_connection:info("Web MQTT: will terminate connection process ~p, reason: ~s",
Expand Down
4 changes: 1 addition & 3 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@

execute(Req, Env) ->
#{keepalive_sup := KeepaliveSup} = Env,
Sock = maps:get(socket, Env),
case maps:get(handler_opts, Env, undefined) of
undefined -> {ok, Req, Env};
Opts when is_list(Opts) ->
{ok, Req, Env#{handler_opts => [{keepalive_sup, KeepaliveSup},
{socket, Sock}
{ok, Req, Env#{handler_opts => [{keepalive_sup, KeepaliveSup}
|Opts]}}
end.
41 changes: 41 additions & 0 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_stream_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_web_mqtt_stream_handler).

-behavior(cowboy_stream).

-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).
-export([early_error/5]).


-record(state, {next}).

init(StreamID, Req, Opts) ->
{Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
{Commands, #state{next = Next}}.

data(StreamID, IsFin, Data, State = #state{next = Next0}) ->
{Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
{Commands, State#state{next = Next}}.

info(StreamID, {switch_protocol, Headers, _, InitialState}, State) ->
do_info(StreamID, {switch_protocol, Headers, rabbit_web_mqtt_handler, InitialState}, State);
info(StreamID, Info, State) ->
do_info(StreamID, Info, State).

do_info(StreamID, Info, State = #state{next = Next0}) ->
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
{Commands, State#state{next = Next}}.

terminate(StreamID, Reason, State = #state{next = Next}) ->
cowboy_stream:terminate(StreamID, Reason, Next).

early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
10 changes: 5 additions & 5 deletions deps/rabbitmq_web_stomp/src/rabbit_web_stomp_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@

-include_lib("rabbit_common/include/rabbit.hrl").

-export([start_link/4, start_keepalive_link/0]).
-export([start_link/3, start_keepalive_link/0]).
-export([init/1]).

%%----------------------------------------------------------------------------

start_link(Ref, Sock, Transport, CowboyOpts0) ->
start_link(Ref, Transport, CowboyOpts0) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, KeepaliveSup} = supervisor2:start_child(
SupPid,
Expand All @@ -29,16 +29,16 @@ start_link(Ref, Sock, Transport, CowboyOpts0) ->
%% then have the middleware rabbit_web_mqtt_middleware place it
%% in the initial handler state.
Env = maps:get(env, CowboyOpts0),
CowboyOpts = CowboyOpts0#{env => Env#{keepalive_sup => KeepaliveSup,
socket => Sock}},
CowboyOpts = CowboyOpts0#{env => Env#{keepalive_sup => KeepaliveSup},
stream_handlers => [rabbit_web_stomp_stream_handler, cowboy_stream_h]},
Protocol = case Transport of
ranch_tcp -> cowboy_clear;
ranch_ssl -> cowboy_tls
end,
{ok, ReaderPid} = supervisor2:start_child(
SupPid,
{Protocol,
{Protocol, start_link, [Ref, Sock, Transport, CowboyOpts]},
{Protocol, start_link, [Ref, Transport, CowboyOpts]},
intrinsic, ?WORKER_WAIT, worker, [Protocol]}),
{ok, SupPid, ReaderPid}.

Expand Down
Loading

0 comments on commit 80b8fdd

Please sign in to comment.