Skip to content

Commit

Permalink
Merge pull request #40 from shopgun/feature/validate-connection-confi…
Browse files Browse the repository at this point in the history
…g-in-service-and-publisher

Add connection name validation
  • Loading branch information
ptrf authored Jun 4, 2018
2 parents 17dff0c + 83664b8 commit 0ea1a0e
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 109 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ become worse.

# Changes

* *Version 1.9.4* - Introduce Connection Name Validation

- Connection names are now validated when a publisher or a service
is created. So turtle now fails, if a service or publisher is
started with a connection that is not defined in the
`connection_config` section of turtle's application environment.

- Upgrade to AMQP Client version 3.7.5, fixing builds on Erlang 19.
This build bug was caused by the ranch_proxy_protocol hex package
being packed using a buggy vendored version of erl_tar.

* *Version 1.9.3* - Dependency bump

- Upgrade Turtle to the 3.7.4 version of AMQP Client.
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

{deps, [
{lager, "3.6.1"},
{rabbit_common, "3.7.4"},
{rabbit_common, "3.7.5"},
{amqp_client, "3.7.4"},
{exometer_core, "1.5.2"},
{gproc, "0.6.1"},
Expand Down
12 changes: 6 additions & 6 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
{<<"lager">>,{pkg,<<"lager">>,<<"3.6.1">>},0},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.0.0">>},1},
{<<"quickrand">>,{pkg,<<"quickrand">>,<<"1.7.3">>},1},
{<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.7.4">>},0},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.4.0">>},1},
{<<"ranch_proxy_protocol">>,{pkg,<<"ranch_proxy_protocol">>,<<"1.4.4">>},1},
{<<"rabbit_common">>,{pkg,<<"rabbit_common">>,<<"3.7.5">>},0},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.5.0">>},1},
{<<"ranch_proxy_protocol">>,{pkg,<<"ranch_proxy_protocol">>,<<"1.5.0">>},1},
{<<"recon">>,{pkg,<<"recon">>,<<"2.3.2">>},1},
{<<"setup">>,{pkg,<<"setup">>,<<"1.8.4">>},1},
{<<"uuid">>,{pkg,<<"uuid_erl">>,<<"1.7.1">>},0}]}.
Expand All @@ -29,9 +29,9 @@
{<<"lager">>, <<"9D29C5FF7F926D25ECD9899990867C9152DCF34EEE65BAC8EC0DFC0D16A26E0C">>},
{<<"parse_trans">>, <<"9E96B1C9C3A0DF54E7B76F8F685D38BFA1EB21B31E042B1D1A5A70258E4DB1E3">>},
{<<"quickrand">>, <<"0E4FB48FAC904FE0C6E21D7E8C31A288A0700E1E81A35B38B649FC119079755D">>},
{<<"rabbit_common">>, <<"AB584F6D6E0515CC896EC8B908D3FA5397723F3722608ECDA743C0227E85FE7F">>},
{<<"ranch">>, <<"10272F95DA79340FA7E8774BA7930B901713D272905D0012B06CA6D994F8826B">>},
{<<"ranch_proxy_protocol">>, <<"8853B11757A9798E86C7D6D0FF783A8E2E87F77052AAD7F1C91108F254BA4A9C">>},
{<<"rabbit_common">>, <<"3346C2C65DF2528891C3113AA33E78AADBD5E6996E4230B937C2429F86861268">>},
{<<"ranch">>, <<"F04166F456790FEE2AC1AA05A02745CC75783C2BFB26D39FAF6AEFC9A3D3A58A">>},
{<<"ranch_proxy_protocol">>, <<"E698AAEB590AD504B649DC0D3055ABEE6CAF0B49D3CAEE1A080AE83B5B499F30">>},
{<<"recon">>, <<"4444C879BE323B1B133EEC5241CB84BD3821EA194C740D75617E106BE4744318">>},
{<<"setup">>, <<"738DB0685DC1741F45C6A9BF78478E0D5877F3D0876C0B50FD02F0210EDB5AA4">>},
{<<"uuid">>, <<"252D12D1154BC75C40BC03E2E67714BD843DDFAD40FD3BC1541F65F622C9E7FC">>}]}
Expand Down
19 changes: 19 additions & 0 deletions src/turtle_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
-module(turtle_config).
-include_lib("amqp_client/include/amqp_client.hrl").

% Query and Format API
-export([read_params/0, conn_params/1]).

% Validation API
-export([validate_conn_name/1]).

-spec read_params() -> [map()].
read_params() ->
{ok, Conf} = application:get_env(turtle, connection_config),
Conf.

-spec conn_params(map()) -> term(). % @todo fix this typespec
conn_params(Ps) ->
#amqp_params_network {
username = username(Ps),
Expand All @@ -30,3 +36,16 @@ conn_params(Ps) ->
username(#{ username := U }) -> list_to_binary(U).
password(#{ password := PW }) -> list_to_binary(PW).
virtual_host(#{ virtual_host := VH }) -> list_to_binary(VH).


-spec validate_conn_name(term()) -> ok | unknown_conn_name.
validate_conn_name(Name) ->
ConfigList = application:get_env(turtle, connection_config, []),
validate_conn_name(Name, ConfigList).

validate_conn_name(_, []) ->
unknown_conn_name;
validate_conn_name(Name, [#{ conn_name := Name } | _]) ->
ok;
validate_conn_name(Name, [_ | ConfigList]) ->
validate_conn_name(Name, ConfigList).
68 changes: 36 additions & 32 deletions src/turtle_publisher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

%% API
-export([
publish/6,
publish_sync/6,
rpc_call/6,
rpc_cancel/2,
publish/6,
publish_sync/6,
rpc_call/6,
rpc_cancel/2,
update_configuration/4
]).

%% gen_server Callbacks
-export([
init/1,
handle_call/3,
Expand All @@ -31,21 +32,21 @@
]).

-record(track_db, {
monitors = #{},
live = #{}
monitors = #{},
live = #{}
}).

-record(state, {
conn_name,
name,
channel,
conn_ref,
confirms,
reply_queue,
corr_id,
consumer_tag,
in_flight = #track_db{},
unacked = gb_trees:empty()
conn_name,
name,
channel,
conn_ref,
confirms,
reply_queue,
corr_id,
consumer_tag,
in_flight = #track_db{},
unacked = gb_trees:empty()
}).

-define(DEFAULT_OPTIONS,
Expand Down Expand Up @@ -74,6 +75,7 @@ child_spec(Name, Conn, Decls, Options) ->
%% be executed against AMQP when setting up.
%% @end
start_link(Name, Connection, Declarations) ->
ok = turtle_config:validate_conn_name(Connection),
Options = maps:merge(?DEFAULT_OPTIONS, #{ declarations => Declarations}),
gen_server:start_link(?MODULE, [Name, Connection, Options], []).

Expand All @@ -88,9 +90,10 @@ start_link(Name, Connection, Declarations) ->
%% </dl>
%% @end
start_link(Name, Connection, Declarations, InOptions) ->
ok = turtle_config:validate_conn_name(Connection),
Options = maps:merge(?DEFAULT_OPTIONS, InOptions),
gen_server:start_link(?MODULE,
[Name, Connection, Options#{ declarations := Declarations }], []).
[Name, Connection, Options#{ declarations := Declarations }], []).

%% @doc
%% This variant of publisher dynamically updates configuration of
Expand All @@ -99,6 +102,7 @@ start_link(Name, Connection, Declarations, InOptions) ->
%% new publisher and closes down
%% @end
start_link(takeover, Name, Connection, Declarations, InOptions) ->
ok = turtle_config:validate_conn_name(Connection),
Options = maps:merge(?DEFAULT_OPTIONS, InOptions),
gen_server:start_link(?MODULE, [{takeover, Name}, Connection,
Options#{declarations := Declarations }], []).
Expand Down Expand Up @@ -170,7 +174,7 @@ handle_call(_Pub, _From, {initializing, _, _, _, _} = Init) ->
handle_call(_Pub, _From, {initializing_takeover, _, _, _, _} = Init) ->
{reply, {error, initializing_takeover}, Init};
handle_call({Kind, {publish, Pub, Props, Payload}}, From,
#state {conn_name = ConnName, name = Name } = InState) ->
#state {conn_name = ConnName, name = Name } = InState) ->
Res = publish({Kind, From}, Pub, #amqp_msg { props = Props, payload = Payload }, InState),
exometer:update([ConnName, Name, Kind], 1),
Res;
Expand All @@ -195,7 +199,7 @@ handle_cast(Pub, {initializing_takeover, _, _, _, _} = Init) ->
lager:warning("Publish while takeover initialization: ~p", [Pub]),
{noreply, Init};
handle_cast({publish, Pub, Props, Payload},
#state { conn_name = ConnName, name = Name } = InState) ->
#state { conn_name = ConnName, name = Name } = InState) ->
case publish({cast, undefined}, Pub, #amqp_msg { props = Props, payload = Payload }, InState) of
{noreply, State} ->
exometer:update([ConnName, Name, casts], 1),
Expand Down Expand Up @@ -253,11 +257,11 @@ handle_info({gproc, Ref, registered, {_, Pid, _}}, {initializing_takeover, N, Re
consumer_tag = Tag,
name = N}};
handle_info(#'basic.ack' { delivery_tag = Seq, multiple = Multiple},
#state { confirms = true } = InState) ->
#state { confirms = true } = InState) ->
{ok, State} = confirm(ack, Seq, Multiple, InState),
{noreply, State};
handle_info(#'basic.nack' { delivery_tag = Seq, multiple = Multiple },
#state { confirms = true } = State) ->
#state { confirms = true } = State) ->
{ok, State} = confirm(nack, Seq, Multiple, State),
{noreply, State};
handle_info({channel_closed, Ch, Reason}, #state { channel = Ch } = State) ->
Expand Down Expand Up @@ -322,7 +326,7 @@ handle_rpc(_, _) -> {ok, undefined, undefined}.
%% @doc handle_deliver/3 handles delivery of responses from RPC calls
%% @end
handle_deliver(Tag, #amqp_msg { payload = Payload, props = Props },
#state { in_flight = IF, channel = Ch } = State) ->
#state { in_flight = IF, channel = Ch } = State) ->
#'P_basic' { content_type = Type, correlation_id = <<CorrID:64/integer>> } = Props,
ok = amqp_channel:cast(Ch, #'basic.ack' { delivery_tag = Tag }),
case track_lookup(CorrID, IF) of
Expand Down Expand Up @@ -354,12 +358,12 @@ mk_publish(Exch, Key, ContentType, IODataPayload, Opts) ->
routing_key = Key
},
Props = properties(ContentType, Opts),

%% Much to our dismay, the amqp_client will only accept payloads which are
%% already flattened binaries. We claim to support general iodata() input, so
%% we have to convert the payload into the right form, which the amqp_client
%% understands

Payload = iolist_to_binary(IODataPayload),
{publish, Pub, Props, Payload}.

Expand All @@ -374,13 +378,13 @@ publish({call, From}, Pub, AMQPMsg, #state{ channel = Ch, confirms = true, unac
T = turtle_time:monotonic_time(),
{noreply, State#state{ unacked = gb_trees:insert(Seq, {From, T}, UA) }};
publish({rpc_call, From}, Pub, AMQPMsg,
#state {
channel = Ch,
confirms = true,
unacked = UA,
in_flight = IF,
corr_id = CorrID,
reply_queue = ReplyQ } = State) ->
#state {
channel = Ch,
confirms = true,
unacked = UA,
in_flight = IF,
corr_id = CorrID,
reply_queue = ReplyQ } = State) ->
Seq = amqp_channel:next_publish_seqno(Ch),
#amqp_msg { props = Props } = AMQPMsg,
WithReply = AMQPMsg#amqp_msg{ props = Props#'P_basic' {
Expand All @@ -394,7 +398,7 @@ publish({rpc_call, From}, Pub, AMQPMsg,
unacked = gb_trees:insert(Seq, {rpc, From, T, CorrID}, UA),
in_flight = track(From, CorrID, T, IF) }};
publish({rpc_call, From}, Pub, AMQPMsg,
#state { channel = Ch, confirms = false, in_flight = IF, corr_id = CorrID, reply_queue = ReplyQ } = State) ->
#state { channel = Ch, confirms = false, in_flight = IF, corr_id = CorrID, reply_queue = ReplyQ } = State) ->
#amqp_msg { props = Props } = AMQPMsg,
WithReply = AMQPMsg#amqp_msg{ props = Props#'P_basic' {
reply_to = ReplyQ,
Expand Down
25 changes: 13 additions & 12 deletions src/turtle_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
%% For the definition of the `Conf' parameter, see {@link child_spec/1}
%% @end
start_link(#{ name := Name } = Conf) ->
validate_config(Conf),
supervisor:start_link({via, gproc, {n,l,{turtle,service,Name}}}, ?MODULE, [Conf]).

%% @doc Generate a child specification for this supervisor
Expand Down Expand Up @@ -54,7 +55,6 @@ start_link(#{ name := Name } = Conf) ->
%% <dd>true/false value - designates if we should force passive queue/exchange creation.</dd>
%% </dl>
child_spec(#{ name := Name } = Conf) ->
validate_config(Conf),
{Name, {?MODULE, start_link, [Conf]},
permanent, infinity, supervisor, [?MODULE]}.

Expand All @@ -74,17 +74,17 @@ init([#{ name := Name } = Conf]) ->
{turtle_subscriber_pool, start_link, [Name]},
permanent, infinity, supervisor, [turtle_subscriber_pool]},
{ok, { { one_for_all, 5, 3600}, [ChanMgr, Pool]}}.

validate_config(#{
name := N,
connection := C,
function := F,
handle_info := HI,
init_state := _IS,
declarations := Decls,
subscriber_count := SC,
prefetch_count := PC,
consume_queue := Q } = Conf)
name := N,
connection := C,
function := F,
handle_info := HI,
init_state := _IS,
declarations := Decls,
subscriber_count := SC,
prefetch_count := PC,
consume_queue := Q } = Conf)
when
is_atom(N),
is_atom(C),
Expand All @@ -94,7 +94,8 @@ validate_config(#{
is_integer(SC), SC > 0,
is_integer(PC), PC >= 0,
is_binary(Q) ->
ok = mode_ok(Conf).
ok = mode_ok(Conf),
ok = turtle_config:validate_conn_name(C).

mode_ok(#{ mode := Mode }) when Mode == bulk; Mode == single -> ok;
mode_ok(#{}) -> ok.
Loading

0 comments on commit 0ea1a0e

Please sign in to comment.