Skip to content

Commit

Permalink
wip: working pool
Browse files Browse the repository at this point in the history
pool is now working.

- connections are lazily handled
- fix #181
- fix #206
- fix #226
- fix #247
- fix #183

TODO:
-----

- For now HTTP requests don't work. it needs to be linked to the new
  connection system
- unitests needs to be added
- pool metrics
  • Loading branch information
benoitc committed Oct 25, 2015
1 parent 60e350e commit 769b6ce
Show file tree
Hide file tree
Showing 25 changed files with 1,082 additions and 1,374 deletions.
16 changes: 16 additions & 0 deletions include/hackney.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,19 @@
method = nil,
path,
ctype = nil}).



-define(DEFAULT_CACHE_SIZE, 1000).
-define(TAB, hackney_server).
-define(LOOKUP_CACHE, hackney_lookup).

%% default pool info
-define(DEFAULT_IDLE_TIMEOUT, 150000). %% default time until a connectino is forced to closed
-define(DEFAULT_GROUP_LIMIT, 6). %% max number of connections kept for a group
-define(DEFAULT_PROXY_LIMIT, 20). %% max number of connections cached / proxy
-define(DEFAULT_MAX_CONNS, 200). %% maximum number of connections kept

%% connectors options
-define(DEFAULT_NB_CONNECTORS, 20).
-define(DEFAULT_FALLBACK_TIME, 300).
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
{idna, "1.0.2"},
{mimerl, "1.0.0"},
{certifi, "0.1.1"},
{ssl_verify_hostname, "1.0.5"}
{ssl_verify_hostname, "1.0.5"},
{lru, "1.2.0"}
]}.

%% Not yet supported in rebar3
Expand Down
1 change: 1 addition & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[{<<"certifi">>,{pkg,<<"certifi">>,<<"0.1.1">>},0},
{<<"idna">>,{pkg,<<"idna">>,<<"1.0.2">>},0},
{<<"lru">>,{pkg,<<"lru">>,<<"1.2.0">>},0},
{<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.0.0">>},0},
{<<"ssl_verify_hostname">>,{pkg,<<"ssl_verify_hostname">>,<<"1.0.5">>},0}].
4 changes: 2 additions & 2 deletions src/hackney.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
{application, hackney,
[
{description, "simple HTTP client"},
{vsn, "1.3.2"},
{registered, [hackney_pool]},
{vsn, "2.0.0"},
{registered, [hackney_sup, hackney_server]},
{applications, [kernel,
stdlib,
crypto,
Expand Down
31 changes: 24 additions & 7 deletions src/hackney.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
%%%

-module(hackney).
-export([start/0, start/1, stop/0]).
-export([start/0, stop/0]).
-export([start_pool/2, stop_pool/1, child_spec/2]).

-export([connect/1, connect/2, connect/3, connect/4,
close/1,
request_info/1,
Expand Down Expand Up @@ -55,15 +57,30 @@ start() ->
hackney_app:ensure_deps_started(),
application:start(hackney).

start(PoolHandler) ->
application:set_env(hackney, pool_handler, PoolHandler),
start().

%% @doc Stop the hackney process. Useful when testing using the shell.
stop() ->
application:stop(hackney).


start_pool(Name, Opts) ->
supervisor:start_child(hackney_sup, child_spec(Name, Opts)).


stop_pool(Name) ->
case supervisor:terminate_child(hackney_sup, {hackney_pool_sup, Name}) of
ok ->
%% make sure we delete the child on old erlang version
_ = supervisor:delete_child(hackney_sup, {hackney_pool_sup, Name}),
ok;
{error, Reason} ->
{error, Reason}
end.

child_spec(Name, Opts) ->
{{hackney_pool_sup, Name}, {hackney_pool_sup, start_link, [Name, Opts]},
permanent, infinity, supervisor, [hackney_pool_sup]}.


connect(URL) ->
connect(URL, []).

Expand Down Expand Up @@ -303,7 +320,7 @@ request(Method, #hackney_url{}=URL0, Headers, Body, Options0) ->
{body, Body},
{options, Options0}]),

#hackney_url{transport=Transport,
#hackney_url{scheme=Scheme,
host = Host,
port = Port,
user = User,
Expand All @@ -317,7 +334,7 @@ request(Method, #hackney_url{}=URL0, Headers, Body, Options0) ->
{basic_auth, {User, Password}})
end,

case maybe_proxy(Transport, Host, Port, Options) of
case maybe_proxy(Scheme, Host, Port, Options) of
{ok, Ref, AbsolutePath} ->
Request = make_request(Method, URL, Headers, Body,
Options, AbsolutePath),
Expand Down
1 change: 1 addition & 0 deletions src/hackney_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ get_app_env(Key, Default) ->
{ok, Val} -> Val;
undefined -> Default
end.

84 changes: 84 additions & 0 deletions src/hackney_server.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
-module(hackney_server).
-behaviour(gen_server).


-export([start_link/0]).
-export([get_pool/1,
register_pool/2]).


-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

-include("hackney.hrl").

-define(SERVER, ?MODULE).
-record(state, {monitors=[]}).

start_link() ->
_ = create_tabs(),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).




get_pool(Ref) ->
ets:lookup_element(?TAB, {pool, Ref}, 2).

register_pool(Ref, Pid) ->
case ets:insert_new(?TAB, {{pool, Ref}, self()}) of
false -> false;
true ->
%% registering a pool is synchronous
call({monitor_pool, Ref, Pid}),
true
end.


call(Msg) ->
gen_server:call(?MODULE, Msg).

create_tabs() ->
case ets:info(?TAB, name) of
undefined ->
ets:new(?TAB, [ordered_set, public, named_table,
{read_concurrency, true},
{write_concurrency, true}]);
_ ->
ok
end.


init([]) ->
%% reste monitor for pools
Monitors = init_monitors(),
{ok, #state{monitors=Monitors}}.

handle_call({monitor_pool, Ref, Pid}, _From, State) ->
#state{monitors=Monitors}=State,
MRef = erlang:monitor(process, Pid),
{reply, ok, State#state{monitors=[{{MRef, Pid}, Ref} | Monitors]}};

handle_call(_Msg, _From, State) ->
{reply, badarg, State}.

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'DOWN', MRef, process, Pid}, #state{monitors=Monitors}=State) ->
{_, Ref} = lists:keyfind({MRef, Pid}, 1, Monitors),
true = ets:delete(?TAB, {pool, Ref}),
{noreply, State#state{monitors=lists:keydelete({MRef, Pid}, 1, Monitors)}};
handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.


init_monitors() ->
[{{erlang:monitor(process, Pid), Pid}, Ref} ||
[Ref, Pid] <- ets:match(?TAB, {{pool, '$1'}, '$2'})].
48 changes: 41 additions & 7 deletions src/hackney_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
%% Supervisor callbacks
-export([init/1]).


-include("hackney.hrl").

%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

Expand All @@ -25,19 +28,50 @@

start_link() ->
{ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
%% start the pool handler
PoolHandler = hackney_app:get_app_env(pool_handler, hackney_pool),
ok = PoolHandler:start(),

%% finish to start the application
ok = init_pools(),
{ok, Pid}.

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================

init([]) ->

Manager = ?CHILD(hackney_manager, worker),

{ok, { {one_for_one, 10, 1}, [Manager]}}.
%% server maintaing some meta data related to connections
Server = {hackney_server,
{hackney_server, start_link, []},
permanent, 5000, worker, [hackney_server]},

%% cache used to store host lookup prefereneces
CacheSize = application:get_env(hackney, lookup_cache_size, ?DEFAULT_CACHE_SIZE),
Cache = {?LOOKUP_CACHE,
{lru, start_link, [{local, ?LOOKUP_CACHE}, CacheSize, []]},
permanent, 5000, worker, [lru]},


{ok, { {one_for_one, 10, 1}, [Manager, Server, Cache]}}.


default_pool() ->
IdleTimeout = hackney_util:get_env(idle_timeout, ?DEFAULT_IDLE_TIMEOUT),
GroupLimit = hackney_util:get_env(group_limit, ?DEFAULT_GROUP_LIMIT),
ProxyLimit = hackney_util:get_env(proxy_limit, ?DEFAULT_PROXY_LIMIT),
MaxConns = hackney_util:get_env(max_connections, ?DEFAULT_MAX_CONNS),

[{idle_timeout, IdleTimeout},
{group_limit, GroupLimit},
{proxy_limit, ProxyLimit},
{max_conns, MaxConns}].

init_pools() ->
Pools0 = hackney_util:get_env(pools, []),
DefaultPool = default_pool(),
Pools = case lists:member(default, Pools0) of
false -> [{default, DefaultPool} | Pools0];
true -> Pools0
end,
lists:foreach(fun({Name, Opts}) ->
{ok, _} = hackney:start_pool(Name, Opts)
end, Pools),
ok.
21 changes: 20 additions & 1 deletion src/hackney_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
-export([privdir/0]).
-export([mod_metrics/0]).
-export([to_atom/1]).

-export([get_env/1, get_env/2]).
-export([get_opt/2, get_opt/3]).

-include("hackney.hrl").

Expand Down Expand Up @@ -127,3 +128,21 @@ to_atom(V) when is_binary(V) ->
to_atom(binary_to_list(V));
to_atom(V) when is_atom(V) ->
V.

get_env(Key) ->
get_env(Key, undefined).

get_env(Key, Default) ->
case application:get_env(hackney, Key) of
{ok, Value} -> Value;
_ -> Default
end.

get_opt(Key, Opts) ->
get_opt(Key, Opts, undefined).

get_opt(Key, Opts, Default) ->
case proplists:get_value(Key, Opts) of
undefined -> get_env(Key, Default);
Value -> Value
end.
32 changes: 14 additions & 18 deletions src/http/hackney_url.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
-module(hackney_url).

-export([parse_url/1,
transport_scheme/1,
unparse_url/1,
urldecode/1, urldecode/2,
urlencode/1, urlencode/2,
Expand All @@ -21,7 +20,8 @@
make_url/3,
fix_path/1,
pathencode/1,
normalize/1]).
normalize/1,
is_secure/1]).

-include("hackney_lib.hrl").

Expand All @@ -37,14 +37,11 @@ parse_url(URL) when is_list(URL) ->
parse_url(unicode:characters_to_binary(list_to_binary(URL)))
end;
parse_url(<<"http://", Rest/binary>>) ->
parse_url(Rest, #hackney_url{transport=hackney_tcp,
scheme=http});
parse_url(Rest, #hackney_url{scheme=http});
parse_url(<<"https://", Rest/binary>>) ->
parse_url(Rest, #hackney_url{transport=hackney_ssl,
scheme=https});
parse_url(Rest, #hackney_url{scheme=https});
parse_url(URL) ->
parse_url(URL, #hackney_url{transport=hackney_tcp,
scheme=http}).
parse_url(URL, #hackney_url{scheme=http}).
parse_url(URL, S) ->
{Addr, RawPath} =
case binary:split(URL, <<"/">>) of
Expand All @@ -67,6 +64,9 @@ parse_url(URL, S) ->
fragment = Fragment})
end.

is_secure(#hackney_url{scheme=https}) -> true;
is_secure(_) -> false.

%% @doc Normalizes the encoding of a Url
normalize(Url) when is_list(Url) orelse is_binary(Url) ->
normalize(parse_url(Url));
Expand Down Expand Up @@ -99,10 +99,6 @@ normalize(#hackney_url{}=Url) ->
Path1 = pathencode(Path),
Url#hackney_url{host=Host, netloc=Netloc, path=Path1}.

transport_scheme(hackney_tcp) ->
http;
transport_scheme(hackney_ssl) ->
https.

unparse_url(#hackney_url{}=Url) ->
#hackney_url{scheme = Scheme,
Expand Down Expand Up @@ -165,11 +161,11 @@ parse_addr(Addr, S) ->

end.

parse_netloc(<<"[", Rest/binary>>, #hackney_url{transport=Transport}=S) ->
parse_netloc(<<"[", Rest/binary>>, #hackney_url{scheme=Scheme}=S) ->
case binary:split(Rest, <<"]">>) of
[Host, <<>>] when Transport =:= hackney_tcp ->
[Host, <<>>] when Scheme =:= http ->
S#hackney_url{host=binary_to_list(Host), port=80};
[Host, <<>>] when Transport =:= hackney_ssl ->
[Host, <<>>] when Scheme =:= https ->
S#hackney_url{host=binary_to_list(Host), port=443};
[Host, <<":", Port/binary>>] ->
S#hackney_url{host=binary_to_list(Host),
Expand All @@ -178,12 +174,12 @@ parse_netloc(<<"[", Rest/binary>>, #hackney_url{transport=Transport}=S) ->
parse_netloc(Rest, S)
end;

parse_netloc(Netloc, #hackney_url{transport=Transport}=S) ->
parse_netloc(Netloc, #hackney_url{scheme=Scheme}=S) ->
case binary:split(Netloc, <<":">>) of
[Host] when Transport =:= hackney_tcp ->
[Host] when Scheme =:= http ->
S#hackney_url{host=unicode:characters_to_list((Host)),
port=80};
[Host] when Transport =:= hackney_ssl ->
[Host] when Scheme =:= https ->
S#hackney_url{host=unicode:characters_to_list(Host),
port=443};
[Host, Port] ->
Expand Down
Loading

0 comments on commit 769b6ce

Please sign in to comment.