diff --git a/include/hackney.hrl b/include/hackney.hrl index 845f6380..b606308a 100644 --- a/include/hackney.hrl +++ b/include/hackney.hrl @@ -43,3 +43,19 @@ method = nil, path, ctype = nil}). + + + +-define(DEFAULT_CACHE_SIZE, 1000). +-define(TAB, hackney_server). +-define(LOOKUP_CACHE, hackney_lookup). + +%% default pool info +-define(DEFAULT_IDLE_TIMEOUT, 150000). %% default time until a connectino is forced to closed +-define(DEFAULT_GROUP_LIMIT, 6). %% max number of connections kept for a group +-define(DEFAULT_PROXY_LIMIT, 20). %% max number of connections cached / proxy +-define(DEFAULT_MAX_CONNS, 200). %% maximum number of connections kept + +%% connectors options +-define(DEFAULT_NB_CONNECTORS, 20). +-define(DEFAULT_FALLBACK_TIME, 300). diff --git a/rebar.config b/rebar.config index f0ecc6e0..f68478b7 100644 --- a/rebar.config +++ b/rebar.config @@ -20,7 +20,8 @@ {idna, "1.0.2"}, {mimerl, "1.0.0"}, {certifi, "0.1.1"}, - {ssl_verify_hostname, "1.0.5"} + {ssl_verify_hostname, "1.0.5"}, + {lru, "1.2.0"} ]}. %% Not yet supported in rebar3 diff --git a/rebar.lock b/rebar.lock index 97fd7e52..4db65518 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,4 +1,5 @@ [{<<"certifi">>,{pkg,<<"certifi">>,<<"0.1.1">>},0}, {<<"idna">>,{pkg,<<"idna">>,<<"1.0.2">>},0}, + {<<"lru">>,{pkg,<<"lru">>,<<"1.2.0">>},0}, {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.0.0">>},0}, {<<"ssl_verify_hostname">>,{pkg,<<"ssl_verify_hostname">>,<<"1.0.5">>},0}]. diff --git a/src/hackney.app.src b/src/hackney.app.src index 17ca44be..49e18a6d 100644 --- a/src/hackney.app.src +++ b/src/hackney.app.src @@ -4,8 +4,8 @@ {application, hackney, [ {description, "simple HTTP client"}, - {vsn, "1.3.2"}, - {registered, [hackney_pool]}, + {vsn, "2.0.0"}, + {registered, [hackney_sup, hackney_server]}, {applications, [kernel, stdlib, crypto, diff --git a/src/hackney.erl b/src/hackney.erl index 7b557f82..95b2f2d9 100644 --- a/src/hackney.erl +++ b/src/hackney.erl @@ -5,7 +5,9 @@ %%% -module(hackney). --export([start/0, start/1, stop/0]). +-export([start/0, stop/0]). +-export([start_pool/2, stop_pool/1, child_spec/2]). + -export([connect/1, connect/2, connect/3, connect/4, close/1, request_info/1, @@ -55,15 +57,30 @@ start() -> hackney_app:ensure_deps_started(), application:start(hackney). -start(PoolHandler) -> - application:set_env(hackney, pool_handler, PoolHandler), - start(). - %% @doc Stop the hackney process. Useful when testing using the shell. stop() -> application:stop(hackney). +start_pool(Name, Opts) -> + supervisor:start_child(hackney_sup, child_spec(Name, Opts)). + + +stop_pool(Name) -> + case supervisor:terminate_child(hackney_sup, {hackney_pool_sup, Name}) of + ok -> + %% make sure we delete the child on old erlang version + _ = supervisor:delete_child(hackney_sup, {hackney_pool_sup, Name}), + ok; + {error, Reason} -> + {error, Reason} + end. + +child_spec(Name, Opts) -> + {{hackney_pool_sup, Name}, {hackney_pool_sup, start_link, [Name, Opts]}, + permanent, infinity, supervisor, [hackney_pool_sup]}. + + connect(URL) -> connect(URL, []). @@ -303,7 +320,7 @@ request(Method, #hackney_url{}=URL0, Headers, Body, Options0) -> {body, Body}, {options, Options0}]), - #hackney_url{transport=Transport, + #hackney_url{scheme=Scheme, host = Host, port = Port, user = User, @@ -317,7 +334,7 @@ request(Method, #hackney_url{}=URL0, Headers, Body, Options0) -> {basic_auth, {User, Password}}) end, - case maybe_proxy(Transport, Host, Port, Options) of + case maybe_proxy(Scheme, Host, Port, Options) of {ok, Ref, AbsolutePath} -> Request = make_request(Method, URL, Headers, Body, Options, AbsolutePath), diff --git a/src/hackney_app.erl b/src/hackney_app.erl index 1c772c8c..7025ca21 100644 --- a/src/hackney_app.erl +++ b/src/hackney_app.erl @@ -51,3 +51,4 @@ get_app_env(Key, Default) -> {ok, Val} -> Val; undefined -> Default end. + diff --git a/src/hackney_server.erl b/src/hackney_server.erl new file mode 100644 index 00000000..8684d8c4 --- /dev/null +++ b/src/hackney_server.erl @@ -0,0 +1,84 @@ +-module(hackney_server). +-behaviour(gen_server). + + +-export([start_link/0]). +-export([get_pool/1, + register_pool/2]). + + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-include("hackney.hrl"). + +-define(SERVER, ?MODULE). +-record(state, {monitors=[]}). + +start_link() -> + _ = create_tabs(), + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + + + + +get_pool(Ref) -> + ets:lookup_element(?TAB, {pool, Ref}, 2). + +register_pool(Ref, Pid) -> + case ets:insert_new(?TAB, {{pool, Ref}, self()}) of + false -> false; + true -> + %% registering a pool is synchronous + call({monitor_pool, Ref, Pid}), + true + end. + + +call(Msg) -> + gen_server:call(?MODULE, Msg). + +create_tabs() -> + case ets:info(?TAB, name) of + undefined -> + ets:new(?TAB, [ordered_set, public, named_table, + {read_concurrency, true}, + {write_concurrency, true}]); + _ -> + ok + end. + + +init([]) -> + %% reste monitor for pools + Monitors = init_monitors(), + {ok, #state{monitors=Monitors}}. + +handle_call({monitor_pool, Ref, Pid}, _From, State) -> + #state{monitors=Monitors}=State, + MRef = erlang:monitor(process, Pid), + {reply, ok, State#state{monitors=[{{MRef, Pid}, Ref} | Monitors]}}; + +handle_call(_Msg, _From, State) -> + {reply, badarg, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', MRef, process, Pid}, #state{monitors=Monitors}=State) -> + {_, Ref} = lists:keyfind({MRef, Pid}, 1, Monitors), + true = ets:delete(?TAB, {pool, Ref}), + {noreply, State#state{monitors=lists:keydelete({MRef, Pid}, 1, Monitors)}}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +init_monitors() -> + [{{erlang:monitor(process, Pid), Pid}, Ref} || + [Ref, Pid] <- ets:match(?TAB, {{pool, '$1'}, '$2'})]. diff --git a/src/hackney_sup.erl b/src/hackney_sup.erl index 43115eb2..7c85e56a 100644 --- a/src/hackney_sup.erl +++ b/src/hackney_sup.erl @@ -16,6 +16,9 @@ %% Supervisor callbacks -export([init/1]). + +-include("hackney.hrl"). + %% Helper macro for declaring children of supervisor -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). @@ -25,11 +28,7 @@ start_link() -> {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []), - %% start the pool handler - PoolHandler = hackney_app:get_app_env(pool_handler, hackney_pool), - ok = PoolHandler:start(), - - %% finish to start the application + ok = init_pools(), {ok, Pid}. %% =================================================================== @@ -37,7 +36,42 @@ start_link() -> %% =================================================================== init([]) -> - Manager = ?CHILD(hackney_manager, worker), - {ok, { {one_for_one, 10, 1}, [Manager]}}. + %% server maintaing some meta data related to connections + Server = {hackney_server, + {hackney_server, start_link, []}, + permanent, 5000, worker, [hackney_server]}, + + %% cache used to store host lookup prefereneces + CacheSize = application:get_env(hackney, lookup_cache_size, ?DEFAULT_CACHE_SIZE), + Cache = {?LOOKUP_CACHE, + {lru, start_link, [{local, ?LOOKUP_CACHE}, CacheSize, []]}, + permanent, 5000, worker, [lru]}, + + + {ok, { {one_for_one, 10, 1}, [Manager, Server, Cache]}}. + + +default_pool() -> + IdleTimeout = hackney_util:get_env(idle_timeout, ?DEFAULT_IDLE_TIMEOUT), + GroupLimit = hackney_util:get_env(group_limit, ?DEFAULT_GROUP_LIMIT), + ProxyLimit = hackney_util:get_env(proxy_limit, ?DEFAULT_PROXY_LIMIT), + MaxConns = hackney_util:get_env(max_connections, ?DEFAULT_MAX_CONNS), + + [{idle_timeout, IdleTimeout}, + {group_limit, GroupLimit}, + {proxy_limit, ProxyLimit}, + {max_conns, MaxConns}]. + +init_pools() -> + Pools0 = hackney_util:get_env(pools, []), + DefaultPool = default_pool(), + Pools = case lists:member(default, Pools0) of + false -> [{default, DefaultPool} | Pools0]; + true -> Pools0 + end, + lists:foreach(fun({Name, Opts}) -> + {ok, _} = hackney:start_pool(Name, Opts) + end, Pools), + ok. diff --git a/src/hackney_util.erl b/src/hackney_util.erl index 611cd302..66b0b330 100644 --- a/src/hackney_util.erl +++ b/src/hackney_util.erl @@ -13,7 +13,8 @@ -export([privdir/0]). -export([mod_metrics/0]). -export([to_atom/1]). - +-export([get_env/1, get_env/2]). +-export([get_opt/2, get_opt/3]). -include("hackney.hrl"). @@ -127,3 +128,21 @@ to_atom(V) when is_binary(V) -> to_atom(binary_to_list(V)); to_atom(V) when is_atom(V) -> V. + +get_env(Key) -> + get_env(Key, undefined). + +get_env(Key, Default) -> + case application:get_env(hackney, Key) of + {ok, Value} -> Value; + _ -> Default + end. + +get_opt(Key, Opts) -> + get_opt(Key, Opts, undefined). + +get_opt(Key, Opts, Default) -> + case proplists:get_value(Key, Opts) of + undefined -> get_env(Key, Default); + Value -> Value + end. \ No newline at end of file diff --git a/src/http/hackney_url.erl b/src/http/hackney_url.erl index 2000c5cc..e0a687bf 100644 --- a/src/http/hackney_url.erl +++ b/src/http/hackney_url.erl @@ -12,7 +12,6 @@ -module(hackney_url). -export([parse_url/1, - transport_scheme/1, unparse_url/1, urldecode/1, urldecode/2, urlencode/1, urlencode/2, @@ -21,7 +20,8 @@ make_url/3, fix_path/1, pathencode/1, - normalize/1]). + normalize/1, + is_secure/1]). -include("hackney_lib.hrl"). @@ -37,14 +37,11 @@ parse_url(URL) when is_list(URL) -> parse_url(unicode:characters_to_binary(list_to_binary(URL))) end; parse_url(<<"http://", Rest/binary>>) -> - parse_url(Rest, #hackney_url{transport=hackney_tcp, - scheme=http}); + parse_url(Rest, #hackney_url{scheme=http}); parse_url(<<"https://", Rest/binary>>) -> - parse_url(Rest, #hackney_url{transport=hackney_ssl, - scheme=https}); + parse_url(Rest, #hackney_url{scheme=https}); parse_url(URL) -> - parse_url(URL, #hackney_url{transport=hackney_tcp, - scheme=http}). + parse_url(URL, #hackney_url{scheme=http}). parse_url(URL, S) -> {Addr, RawPath} = case binary:split(URL, <<"/">>) of @@ -67,6 +64,9 @@ parse_url(URL, S) -> fragment = Fragment}) end. +is_secure(#hackney_url{scheme=https}) -> true; +is_secure(_) -> false. + %% @doc Normalizes the encoding of a Url normalize(Url) when is_list(Url) orelse is_binary(Url) -> normalize(parse_url(Url)); @@ -99,10 +99,6 @@ normalize(#hackney_url{}=Url) -> Path1 = pathencode(Path), Url#hackney_url{host=Host, netloc=Netloc, path=Path1}. -transport_scheme(hackney_tcp) -> - http; -transport_scheme(hackney_ssl) -> - https. unparse_url(#hackney_url{}=Url) -> #hackney_url{scheme = Scheme, @@ -165,11 +161,11 @@ parse_addr(Addr, S) -> end. -parse_netloc(<<"[", Rest/binary>>, #hackney_url{transport=Transport}=S) -> +parse_netloc(<<"[", Rest/binary>>, #hackney_url{scheme=Scheme}=S) -> case binary:split(Rest, <<"]">>) of - [Host, <<>>] when Transport =:= hackney_tcp -> + [Host, <<>>] when Scheme =:= http -> S#hackney_url{host=binary_to_list(Host), port=80}; - [Host, <<>>] when Transport =:= hackney_ssl -> + [Host, <<>>] when Scheme =:= https -> S#hackney_url{host=binary_to_list(Host), port=443}; [Host, <<":", Port/binary>>] -> S#hackney_url{host=binary_to_list(Host), @@ -178,12 +174,12 @@ parse_netloc(<<"[", Rest/binary>>, #hackney_url{transport=Transport}=S) -> parse_netloc(Rest, S) end; -parse_netloc(Netloc, #hackney_url{transport=Transport}=S) -> +parse_netloc(Netloc, #hackney_url{scheme=Scheme}=S) -> case binary:split(Netloc, <<":">>) of - [Host] when Transport =:= hackney_tcp -> + [Host] when Scheme =:= http -> S#hackney_url{host=unicode:characters_to_list((Host)), port=80}; - [Host] when Transport =:= hackney_ssl -> + [Host] when Scheme =:= https -> S#hackney_url{host=unicode:characters_to_list(Host), port=443}; [Host, Port] -> diff --git a/src/socket/hackney_connect.erl b/src/socket/hackney_connect.erl deleted file mode 100644 index 4371aec6..00000000 --- a/src/socket/hackney_connect.erl +++ /dev/null @@ -1,364 +0,0 @@ -%%% -*- erlang -*- -%%% -%%% This file is part of hackney released under the Apache 2 license. -%%% See the NOTICE for more information. -%%% --module(hackney_connect). - --export([connect/3, connect/4, connect/5, - create_connection/4, create_connection/5, - maybe_connect/1, - reconnect/4, - set_sockopts/2, - ssl_opts/2, - check_or_close/1, - close/1, - is_pool/1]). - - --include("hackney.hrl"). --include_lib("../hackney_internal.hrl"). --include_lib("public_key/include/OTP-PUB-KEY.hrl"). - -connect(Transport, Host, Port) -> - connect(Transport, Host, Port, []). - -connect(Transport, Host, Port, Options) -> - connect(Transport, Host, Port, Options, false). - -connect(Transport, Host, Port, Options, Dynamic) when is_binary(Host) -> - connect(Transport, binary_to_list(Host), Port, Options, Dynamic); -connect(Transport, Host, Port, Options, Dynamic) -> - ?report_debug("connect", [{transport, Transport}, - {host, Host}, - {port, Port}, - {dynamic, Dynamic}]), - case create_connection(Transport, idna:utf8_to_ascii(Host), Port, - Options, Dynamic) of - {ok, #client{request_ref=Ref}} -> - {ok, Ref}; - Error -> - Error - end. - - -%% @doc create a connection and return a client state -create_connection(Transport, Host, Port, Options) -> - create_connection(Transport, Host, Port, Options, true). - -create_connection(Transport, Host, Port, Options, Dynamic) - when is_list(Options) -> - Netloc = case {Transport, Port} of - {hackney_tcp, 80} -> list_to_binary(Host); - {hackney_ssl, 443} -> list_to_binary(Host); - _ -> - iolist_to_binary([Host, ":", integer_to_list(Port)]) - end, - %% default timeout - Timeout = proplists:get_value(recv_timeout, Options, ?RECV_TIMEOUT), - FollowRedirect = proplists:get_value(follow_redirect, Options, false), - MaxRedirect = proplists:get_value(max_redirect, Options, 5), - ForceRedirect = proplists:get_value(force_redirect, Options, false), - Async = proplists:get_value(async, Options, false), - StreamTo = proplists:get_value(stream_to, Options, false), - WithBody = proplists:get_value(with_body, Options, false), - MaxBody = proplists:get_value(max_body, Options), - - %% get mod metrics - Mod = hackney_util:mod_metrics(), - - %% initial state - InitialState = #client{mod_metrics=Mod, - transport=Transport, - host=Host, - port=Port, - netloc=Netloc, - options=Options, - dynamic=Dynamic, - recv_timeout=Timeout, - follow_redirect=FollowRedirect, - max_redirect=MaxRedirect, - retries=MaxRedirect, - force_redirect=ForceRedirect, - async=Async, - with_body=WithBody, - max_body=MaxBody, - stream_to=StreamTo, - buffer = <<>>}, - %% if we use a pool then checkout the connection from the pool, else - %% connect the socket to the remote - %% - reconnect(Host, Port, Transport, InitialState). - - -%% @doc connect a socket and create a client state. -%% -maybe_connect(#client{state=closed, redirect=nil}=Client) -> - %% the socket has been closed, reconnect it. - #client{transport=Transport, - host=Host, - port=Port} = Client, - reconnect(Host, Port, Transport, Client); -maybe_connect(#client{state=closed, redirect=Redirect}=Client) -> - %% connection closed after a redirection, reinit the options and - %% reconnect it. - {Transport, Host, Port, Options} = Redirect, - Client1 = Client#client{options=Options, - redirect=nil}, - reconnect(Host, Port, Transport, Client1); -maybe_connect(#client{redirect=nil}=Client) -> - {ok, check_mod_metrics(Client)}; -maybe_connect(#client{redirect=Redirect}=Client) -> - %% reinit the options and reconnect the client - {Transport, Host, Port, Options} = Redirect, - reconnect(Host, Port, Transport, Client#client{options=Options, - redirect=nil}). - -check_or_close(#client{socket=nil}=Client) -> - Client; -check_or_close(Client) -> - case is_pool(Client) of - false -> - close(Client); - true -> - #client{socket=Socket, socket_ref=Ref, pool_handler=Handler}=Client, - _ = Handler:checkin(Ref, Socket), - Client#client{socket=nil, state=closed} - end. - - - -%% @doc add set sockets options in the client -set_sockopts(#client{transport=Transport, socket=Skt}, Options) -> - Transport:setopts(Skt, Options). - - -%% @doc close the client -%% -%% -close(#client{socket=nil}=Client) -> - Client#client{state = closed}; -close(#client{transport=Transport, socket=Skt}=Client) -> - Transport:close(Skt), - Client#client{state = closed, socket=nil}; -close(Ref) when is_reference(Ref) -> - hackney_manager:close_request(Ref). - - -%% @doc get current pool pid or name used by a client if needed -is_pool(#client{options=Opts}) -> - UseDefaultPool = use_default_pool(), - case proplists:get_value(pool, Opts) of - false -> - false; - undefined when UseDefaultPool =:= true -> - true; - undefined -> - false; - _ -> - true - end. - -reconnect(Host, Port, Transport, State) -> - %% if we use a pool then checkout the connection from the pool, else - %% connect the socket to the remote - case is_pool(State) of - false -> - %% the client won't use any pool - do_connect(Host, Port, Transport, check_mod_metrics(State)); - true -> - socket_from_pool(Host, Port, Transport, check_mod_metrics(State)) - end. - -%% -%% internal functions -%% - -socket_from_pool(Host, Port, Transport, Client0) -> - PoolHandler = hackney_app:get_app_env(pool_handler, hackney_pool), - PoolName = proplists:get_value(pool, Client0#client.options, default), - Mod = Client0#client.mod_metrics, - - %% new request - {_RequestRef, Client} = hackney_manager:new_request(Client0), - - case PoolHandler:checkout(Host, Port, Transport, Client) of - {ok, Ref, Skt} -> - ?report_debug("reuse a connection", [{pool, PoolName}]), - Mod:update_meter([hackney_pool, PoolName, take_rate], 1), - Mod:increment_counter([hackney_pool, Host, reuse_connection]), - Client1 = Client#client{socket=Skt, - socket_ref=Ref, - pool_handler=PoolHandler, - state = connected}, - - hackney_manager:update_state(Client1), - {ok, Client1}; - {error, no_socket, Ref} -> - ?report_trace("no socket in the pool", [{pool, PoolName}]), - - Mod:increment_counter([hackney_pool, PoolName, no_socket]), - do_connect(Host, Port, Transport, Client#client{socket_ref=Ref}, - pool); - Error -> - Error - end. - -do_connect(Host, Port, Transport, Client) -> - do_connect(Host, Port, Transport, Client, direct). - - - -do_connect(Host, Port, Transport, #client{mod_metrics=Mod, - options=Opts}=Client0, Type) -> - Begin = os:timestamp(), - {_RequestRef, Client} = case Type of - pool -> - {Client0#client.request_ref, Client0}; - direct -> - hackney_manager:new_request(Client0) - end, - - ConnectOpts0 = proplists:get_value(connect_options, Opts, []), - ConnectTimeout = proplists:get_value(connect_timeout, Opts, 8000), - - %% handle ipv6 - ConnectOpts1 = case lists:member(inet, ConnectOpts0) orelse - lists:member(inet6, ConnectOpts0) of - true -> - ConnectOpts0; - false -> - case hackney_util:is_ipv6(Host) of - true -> - [inet6 | ConnectOpts0]; - false -> - ConnectOpts0 - end - end, - - ConnectOpts = case Transport of - hackney_ssl -> - ConnectOpts1 ++ ssl_opts(Host, Opts); - _ -> - ConnectOpts1 - end, - case Transport:connect(Host, Port, ConnectOpts, ConnectTimeout) of - {ok, Skt} -> - ?report_trace("new connection", []), - ConnectTime = timer:now_diff(os:timestamp(), Begin)/1000, - Mod:update_histogram([hackney, Host, connect_time], ConnectTime), - Mod:increment_counter([hackney_pool, Host, new_connection]), - Client1 = Client#client{socket=Skt, - state = connected}, - hackney_manager:update_state(Client1), - {ok, Client1}; - {error, timeout} -> - ?report_trace("connect timeout", []), - Mod:increment_counter([hackney, Host, connect_timeout]), - hackney_manager:cancel_request(Client), - {error, connect_timeout}; - Error -> - ?report_trace("connect error", []), - Mod:increment_counter([hackney, Host, connect_error]), - hackney_manager:cancel_request(Client), - Error - end. - - -use_default_pool() -> - case application:get_env(hackney, use_default_pool) of - {ok, Val} -> - Val; - _ -> - true - end. - -check_mod_metrics(#client{mod_metrics=Mod}=State) - when Mod /= nil, Mod /= undefined -> - State; -check_mod_metrics(State) -> - State#client{mod_metrics=hackney_util:mod_metrics()}. - -ssl_opts(Host, Options) -> - case proplists:get_value(ssl_options, Options) of - undefined -> - Insecure = proplists:get_value(insecure, Options), - UseSecureSsl = check_ssl_version(), - CACerts = certifi:cacerts(), - - case {Insecure, UseSecureSsl} of - {true, _} -> - [{verify, verify_none}, - {reuse_sessions, true}]; - {_, true} -> - - VerifyFun = {fun ssl_verify_hostname:verify_fun/3, - [{check_hostname, Host}]}, - [{verify, verify_peer}, - {depth, 99}, - {cacerts, CACerts}, - {partial_chain, fun partial_chain/1}, - {verify_fun, VerifyFun}]; - {_, _} -> - [{cacerts, CACerts}, - {verify, verify_peer}, {depth, 2}] - end; - SSLOpts -> - SSLOpts - end. - -%% code from rebar3 undert BSD license -partial_chain(Certs) -> - Certs1 = lists:reverse([{Cert, public_key:pkix_decode_cert(Cert, otp)} || - Cert <- Certs]), - CACerts = certifi:cacerts(), - CACerts1 = [public_key:pkix_decode_cert(Cert, otp) || Cert <- CACerts], - - - case find(fun({_, Cert}) -> - check_cert(CACerts1, Cert) - end, Certs1) of - {ok, Trusted} -> - {trusted_ca, element(1, Trusted)}; - _ -> - unknown_ca - end. - -extract_public_key_info(Cert) -> - ((Cert#'OTPCertificate'.tbsCertificate)#'OTPTBSCertificate'.subjectPublicKeyInfo). - -check_cert(CACerts, Cert) -> - lists:any(fun(CACert) -> - extract_public_key_info(CACert) == extract_public_key_info(Cert) - end, CACerts). - -check_ssl_version() -> - case application:get_key(ssl, vsn) of - {ok, Vsn} -> - parse_vsn(Vsn) >= {5, 3, 6}; - _ -> - false - end. - -parse_vsn(Vsn) -> - version_pad(string:tokens(Vsn, ".")). - -version_pad([Major]) -> - {list_to_integer(Major), 0, 0}; -version_pad([Major, Minor]) -> - {list_to_integer(Major), list_to_integer(Minor), 0}; -version_pad([Major, Minor, Patch]) -> - {list_to_integer(Major), list_to_integer(Minor), list_to_integer(Patch)}; -version_pad([Major, Minor, Patch | _]) -> - {list_to_integer(Major), list_to_integer(Minor), list_to_integer(Patch)}. - --spec find(fun(), list()) -> {ok, term()} | error. -find(Fun, [Head|Tail]) when is_function(Fun) -> - case Fun(Head) of - true -> - {ok, Head}; - false -> - find(Fun, Tail) - end; -find(_Fun, []) -> - error. diff --git a/src/socket/hackney_connector.erl b/src/socket/hackney_connector.erl new file mode 100644 index 00000000..d8793a4d --- /dev/null +++ b/src/socket/hackney_connector.erl @@ -0,0 +1,101 @@ +-module(hackney_connector). + +-export([start_link/2, + init/2]). + +-include("hackney.hrl"). +-include("hackney_socket.hrl"). + +-record(state, {pool, + fallback_time}). + +-define(DEFAULT_LOOKUP_ORDER, [inet6, inet]). + +start_link(Pool, FallbackTime) -> + Pid = spawn_link(?MODULE, init, [Pool, FallbackTime]), + {ok, Pid}. + + +init(Pool, FallbackTime) -> + hackney_pool:register_connector(Pool, self()), + loop(#state{pool=Pool, fallback_time=FallbackTime}). + + +loop(State) -> + receive + {connect, Group, {Host, Port, Options, Timeout}} -> + Ref = make_ref(), + Req = {Group, Host, Port, Options, Timeout}, + LookupOrder = lru:get(?LOOKUP_CACHE, Host, ?DEFAULT_LOOKUP_ORDER), + handle_connect(Ref, Req, LookupOrder, State), + loop(State); + stop -> + exit(normal) + end. + + +handle_connect(Ref, Req, LookupOrder, State) -> + [F1, F2] = LookupOrder, + Pid = spawn_connection(Ref, Req, F1, State#state.pool), + TRef = erlang:send_after(State#state.fallback_time, self(), {Ref, fallback, F2}), + connect_loop(Ref, TRef, Req, Pid, nil, LookupOrder, State). + +connect_loop(Ref, TRef, {_Group, H, _, _, _} = Req, P1, P2, LookupOrder, State) -> + receive + {Ref, connected, P1} -> + maybe_kill_job(Ref, TRef, P2), + loop(State); + {Ref, connected, P2} -> + catch exit(P1, normal), + flush(Ref, P2), + %% lookup order is reversed, store it. + lru:add(?LOOKUP_CACHE, H, lists:reverse(LookupOrder)), + loop(State); + {Ref, fallback, Familly} -> + Pid = spawn_connection(Ref, Req, Familly, State#state.pool), + connect_loop(Ref, TRef, Req, P1, Pid, LookupOrder, State); + {Ref, 'DOWN', P1, _Error} -> + connect_loop(Ref, TRef, Req, P1, P2, LookupOrder, State); + {Ref, 'DOWN', P2, Error} -> + error_logger:error_msg( + "hackney connector: connection failure; " + "with reason: ~p~n", [Error]), + loop(State) + end. + +maybe_kill_job(Ref, TRef, Pid) -> + case is_pid(Pid) of + true -> catch exit(Pid, normal); + false -> erlang:cancel_timer(TRef) + end, + flush(Ref, Pid). + + +flush(Ref, Pid) -> + receive + {Ref, connected, Pid} -> ok; + {Ref, fallback, _Familly} -> ok; + _Else -> + ok + after 0 -> ok + end. + +spawn_connection(Ref, {Group, Host, Port, Opts0, Timeout}, Familly, Pool) -> + Opts = [Familly | Opts0], + Connector = self(), + spawn_link(fun() -> + case hackney_tcp:connect(Host, Port, Opts, Timeout) of + {ok, Sock} -> + HS = #hackney_socket{transport=hackney_tcp, + sock=Sock, + host=Host, + port=Port, + group=Group, + pool=Pool}, + + Connector ! {Ref, connected, self()}, + hackney_pool:release(HS); + Error -> + Connector ! {Ref, 'DOWN', self(), Error} + end + end). diff --git a/src/socket/hackney_connector_sup.erl b/src/socket/hackney_connector_sup.erl new file mode 100644 index 00000000..a8a8a537 --- /dev/null +++ b/src/socket/hackney_connector_sup.erl @@ -0,0 +1,17 @@ +-module(hackney_connector_sup). +-behaviour(supervisor). + +-export([start_link/3]). +-export([init/1]). + +start_link(Pool, NbConnectors, FallbackTime) -> + supervisor:start_link(?MODULE, [Pool, NbConnectors, FallbackTime]). + + +init([Ref, NbConnectors, FallbackTime]) -> + Pool = hackney_server:get_pool(Ref), + + Procs = [{{hackney_connector, self(), N}, + {hackney_connector, start_link, [Pool, FallbackTime]}, + permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbConnectors)], + {ok, {{one_for_one, 10, 10}, Procs}}. \ No newline at end of file diff --git a/src/socket/hackney_http_connect.erl b/src/socket/hackney_http_connect.erl deleted file mode 100644 index 15e31eb6..00000000 --- a/src/socket/hackney_http_connect.erl +++ /dev/null @@ -1,192 +0,0 @@ -%%% -*- erlang -*- -%%% -%%% This file is part of hackney released under the Apache 2 license. -%%% See the NOTICE for more information. -%%% -%%% Copyright (c) 2012-2014 Benoît Chesneau -%%% -%%% --module(hackney_http_connect). - --export([messages/1, - connect/3, connect/4, - recv/2, recv/3, - send/2, - setopts/2, - controlling_process/2, - peername/1, - close/1, - shutdown/2, - sockname/1]). - --define(TIMEOUT, infinity). - --type http_socket() :: {atom(), inet:socket()}. --export_type([http_socket/0]). - -%% @doc Atoms used to identify messages in {active, once | true} mode. -messages({hackney_ssl, _}) -> - {ssl, ssl_closed, ssl_error}; -messages({_, _}) -> - {tcp, tcp_closed, tcp_error}. - - -connect(ProxyHost, ProxyPort, Opts) -> - connect(ProxyHost, ProxyPort, Opts, infinity). - -connect(ProxyHost, ProxyPort, Opts, Timeout) - when is_list(ProxyHost), is_integer(ProxyPort), - (Timeout =:= infinity orelse is_integer(Timeout)) -> - - %% get the host and port to connect from the options - Host = proplists:get_value(connect_host, Opts), - Port = proplists:get_value(connect_port, Opts), - Transport = proplists:get_value(connect_transport, Opts), - - %% filter connection options - AcceptedOpts = [linger, nodelay, send_timeout, - send_timeout_close, raw], - BaseOpts = [binary, {active, false}, {packet, 0}, {keepalive, true}, - {nodelay, true}], - ConnectOpts = hackney_util:filter_options(Opts, AcceptedOpts, BaseOpts), - - %% connnect to the proxy, and upgrade the socket if needed. - case gen_tcp:connect(ProxyHost, ProxyPort, ConnectOpts) of - {ok, Socket} -> - case do_handshake(Socket, Host, Port, Opts) of - ok -> - %% if we are connecting to a remote https source, we - %% upgrade the connection socket to handle SSL. - case Transport of - hackney_ssl -> - SSLOpts = hackney_connect:ssl_opts(Host, Opts), - %% upgrade the tcp connection - case ssl:connect(Socket, SSLOpts) of - {ok, SslSocket} -> - {ok, {Transport, SslSocket}}; - Error -> - Error - end; - _ -> - {ok, {Transport, Socket}} - end; - Error -> - Error - end; - Error -> - Error - end. - -recv(Socket, Length) -> - recv(Socket, Length, infinity). - -%% @doc Receive a packet from a socket in passive mode. -%% @see gen_tcp:recv/3 --spec recv(http_socket(), non_neg_integer(), timeout()) - -> {ok, any()} | {error, closed | atom()}. -recv({Transport, Socket}, Length, Timeout) -> - Transport:recv(Socket, Length, Timeout). - - -%% @doc Send a packet on a socket. -%% @see gen_tcp:send/2 --spec send(http_socket(), iolist()) -> ok | {error, atom()}. -send({Transport, Socket}, Packet) -> - Transport:send(Socket, Packet). - -%% @doc Set one or more options for a socket. -%% @see inet:setopts/2 --spec setopts(http_socket(), list()) -> ok | {error, atom()}. -setopts({Transport, Socket}, Opts) -> - Transport:setopts(Socket, Opts). - -%% @doc Assign a new controlling process Pid to Socket. -%% @see gen_tcp:controlling_process/2 --spec controlling_process(http_socket(), pid()) - -> ok | {error, closed | not_owner | atom()}. -controlling_process({Transport, Socket}, Pid) -> - Transport:controlling_process(Socket, Pid). - -%% @doc Return the address and port for the other end of a connection. -%% @see inet:peername/1 --spec peername(http_socket()) - -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. -peername({Transport, Socket}) -> - Transport:peername(Socket). - -%% @doc Close a socks5 socket. -%% @see gen_tcp:close/1 --spec close(http_socket()) -> ok. -close({Transport, Socket}) -> - Transport:close(Socket). - -%% @doc Immediately close a socket in one or two directions. -%% @see gen_tcp:shutdown/2 --spec shutdown(http_socket(), read | write | read_write) -> ok. -shutdown({Transport, Socket}, How) -> - Transport:shutdown(Socket, How). - - -%% @doc Get the local address and port of a socket -%% @see inet:sockname/1 --spec sockname(http_socket()) - -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. -sockname({Transport, Socket}) -> - Transport:sockname(Socket). - -%% private functions -do_handshake(Socket, Host, Port, Options) -> - ProxyUser = proplists:get_value(connect_user, Options), - ProxyPass = proplists:get_value(connect_pass, Options, <<>>), - ProxyPort = proplists:get_value(connect_port, Options), - - %% set defaults headers - HostHdr = case ProxyPort of - 80 -> - list_to_binary(Host); - _ -> - iolist_to_binary([Host, ":", integer_to_list(Port)]) - end, - UA = hackney_request:default_ua(), - Headers0 = [<<"Host", HostHdr/binary>>, - <<"User-Agent: ", UA/binary >>], - - Headers = case ProxyUser of - undefined -> - Headers0; - _ -> - Credentials = base64:encode(<>), - Headers0 ++ [<< "Proxy-Authorization: Basic ", Credentials/binary >>] - end, - Path = iolist_to_binary([Host, ":", integer_to_list(Port)]), - - Payload = [<< "CONNECT ", Path/binary, " HTTP/1.1", "\r\n" >>, - hackney_bstr:join(lists:reverse(Headers), <<"\r\n">>), - <<"\r\n\r\n">>], - case gen_tcp:send(Socket, Payload) of - ok -> - check_response(Socket); - Error -> - Error - end. - -check_response(Socket) -> - case gen_tcp:recv(Socket, 0, ?TIMEOUT) of - {ok, Data} -> - check_status(Data); - Error -> - Error - end. - -check_status(<< "HTTP/1.1 200", _/bits >>) -> - ok; -check_status(<< "HTTP/1.1 201", _/bits >>) -> - ok; -check_status(<< "HTTP/1.0 200", _/bits >>) -> - ok; -check_status(<< "HTTP/1.0 201", _/bits >>) -> - ok; -check_status(Else) -> - error_logger:error_msg("proxy error: ~w~n", [Else]), - false. diff --git a/src/socket/hackney_pool.erl b/src/socket/hackney_pool.erl index bfefc168..126cbf9e 100644 --- a/src/socket/hackney_pool.erl +++ b/src/socket/hackney_pool.erl @@ -1,437 +1,286 @@ -%%% -*- erlang -*- -%%% -%%% This file is part of hackney released under the Apache 2 license. -%%% See the NOTICE for more information. -%%% -%%% Copyright (c) 2009, Erlang Training and Consulting Ltd. -%%% Copyright (c) 2012-2015, Benoît Chesneau - -%% @doc pool of sockets connections -%% -module(hackney_pool). --behaviour(gen_server). -%% PUBLIC API --export([start/0, - checkout/4, - checkin/2]). - --export([start_pool/2, - stop_pool/1, - find_pool/1, - notify/2]). - - --export([count/1, count/2, - max_connections/1, - set_max_connections/2, - timeout/1, - set_timeout/2, - child_spec/2]). +%% public API +-export([request/6, + release/1]). +%% hackney internal api +-export([register_connector/2]). -export([start_link/2]). -%% gen_server callbacks - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - code_change/3, terminate/2]). +%% pool internals +-export([init/3]). +-export([system_continue/3]). +-export([system_terminate/4]). +-export([system_code_change/4]). -include("hackney.hrl"). --include_lib("../hackney_internal.hrl"). +-include("hackney_socket.hrl"). -record(state, { - name, - mod_metrics, - max_connections, - timeout, - clients = dict:new(), - queues = dict:new(), % Dest => queue of Froms - connections = dict:new(), - sockets = dict:new(), - nb_waiters=0}). - - -start() -> - %% NB this is first called from hackney_sup:start_link - %% BEFORE the hackney_pool ETS table exists - ok. - -%% @doc fetch a socket from the pool -checkout(Host0, Port, Transport, #client{options=Opts}=Client) -> - Host = string:to_lower(Host0), - Pid = self(), - RequestRef = Client#client.request_ref, - Name = proplists:get_value(pool, Opts, default), - Pool = find_pool(Name, Opts), - case gen_server:call(Pool, {checkout, {Host, Port, Transport}, - Pid, RequestRef}, infinity) of - {ok, Socket, Owner} -> - CheckinReference = {Host, Port, Transport}, - {ok, {Name, RequestRef, CheckinReference, Owner, Transport}, Socket}; - {error, no_socket, Owner} -> - CheckinReference = {Host, Port, Transport}, - {error, no_socket, {Name, RequestRef, CheckinReference, Owner, - Transport}}; - - {error, Reason} -> - {error, Reason} - end. + name :: atom(), + parent :: pid(), + idle_timeout :: non_neg_integer(), + group_limit = 8, + proxy_limit = 20, + max_conns = 200, + refs, + sockets, + connectors=[], + next=0}). + +request(Pool, Group, Host, Port, Options, Timeout) when is_pid(Pool) -> + Expires = case Timeout of + infinity -> infinity; + Timeout -> now_in_ms() + (Timeout * 1000) + end, + Tag = erlang:monitor(process, Pool), + From = {self(), Tag}, + Req = {Host, Port, Options, Timeout}, + catch Pool ! {request, From, Group, Req, Expires}, + receive + {Tag, {ok, HS}} -> + {ok, HS}; + {'DOWN', Tag, _, _, Reason} -> + {error, Reason}; + {Tag, Error} -> + Error + after Timeout -> + {error, timeout} + end; +request(Pool, Group, Host, Port, Options, Timeout) -> + request(hackney_server:get_pool(Pool), Group, Host, Port, Options, Timeout). -%% @doc release a socket in the pool -checkin({_Name, Ref, Dest, Owner, Transport}, Socket) -> - Transport:setopts(Socket, [{active, false}]), - case sync_socket(Transport, Socket) of - true -> - case Transport:controlling_process(Socket, Owner) of - ok -> - gen_server:call(Owner, {checkin, Ref, Dest, Socket, Transport}, - infinity); - _Error -> - catch Transport:close(Socket), - ok - end; + +release(HS) -> + hackney_socket:setopts(HS, [{active, false}]), + case sync_socket(HS) of + true -> release1(HS); false -> - catch Transport:close(Socket), - ok + catch hackney_socket:close(HS), + {error, sync_error} end. -%% @doc start a pool -start_pool(Name, Options) -> - case find_pool(Name, Options) of - Pid when is_pid(Pid) -> +release1(#hackney_socket{pool=Pool}=HS) -> + hackney_socket:controlling_process(HS, Pool), + Tag = erlang:monitor(process, Pool), + Pool ! {release, {self(), Tag}, HS}, + %% we directly pass the socket control to a pending connection if any. if + %% the pool can't accept the socket we kill it. + receive + {Tag, ok} -> ok; - Error -> + {'DOWN', Tag, _, _, Reason} -> + hackney_socket:close(HS), + {error, Reason}; + {Tag, Error} -> + hackney_socket:close(HS), Error end. +register_connector(Pool, Pid) -> + Pool ! {register_connector, Pid}, + receive Pool -> ok end. -%% @doc stop a pool -stop_pool(Name) -> - case find_pool(Name) of - undefined -> - ok; - _Pid -> - case supervisor:terminate_child(hackney_sup, Name) of - ok -> - supervisor:delete_child(hackney_sup, Name), - ets:delete(hackney_pool, Name), - ok; - Error -> - Error - end - end. -notify(Pool, Msg) -> - case find_pool(Pool) of - undefined -> ok; - Pid -> Pid ! Msg - end. +start_link(Ref, Opts) -> + proc_lib:start_link(?MODULE, init, [self(), Ref, Opts]). +init(Parent, Ref, Opts) -> + true = hackney_server:register_pool(Ref, self()), -%% -%% util functions for this pool -%% + IdleTimeout = hackney_util:get_opt(idle_timeout, Opts, ?DEFAULT_IDLE_TIMEOUT), + GroupLimit = hackney_util:get_opt(group_limit, Opts, ?DEFAULT_GROUP_LIMIT), + ProxyLimit = hackney_util:get_opt(group_limit, Opts, ?DEFAULT_PROXY_LIMIT), + MaxConns = hackney_util:get_opt(max_connections, Opts, ?DEFAULT_MAX_CONNS), + %% init tables + Refs = ets:new(hackney_pool_refs, [bag]), + Sockets = ets:new(hackney_pool_sockets, [set]), -%% @doc return a child spec suitable for embeding your pool in the -%% supervisor -child_spec(Name, Options0) -> - Options = [{name, Name} | Options0], - {Name, {hackney_pool, start_link, [Name, Options]}, - permanent, 10000, worker, [hackney_pool]}. + ok = proc_lib:init_ack(Parent, {ok, self()}), + loop(#state{name = Ref, + parent = Parent, + idle_timeout = IdleTimeout, + group_limit = GroupLimit, + proxy_limit = ProxyLimit, + max_conns = MaxConns, + refs = Refs, + sockets = Sockets}). -%% @doc get the number of connections in the pool -count(Name) -> - gen_server:call(find_pool(Name), count). +loop(State=#state{parent=Parent}) -> + receive + {request, {Pid, Tag} = From, Group, CReq, Expires} -> + case reuse_connection(Group, State) of + {ok, HS} -> + hackney_socket:controlling_process(HS, Pid), + Pid ! {Tag, {ok, HS}}, + loop(State); + no_socket -> + %% insert pending request + ets:insert(State#state.refs, {{pending, Group}, {From, Expires}}), + State2 = request_socket(Group, CReq, State), + loop(State2) + end; + {preconnect, {Pid, Tag}, Group, CReq} -> + State2 = request_socket(Group, CReq, State), + Pid ! {Tag, ok}, + loop(State2); + {release, {Pid, Tag}, HS} -> + Reply = release_socket(HS, State), + Pid ! {Tag, Reply}, + loop(State); + {register_connector, Pid} -> + _ = erlang:monitor(process, Pid), + Pid ! self(), + loop(State#state{connectors=[Pid | State#state.connectors]}); + {timeout, Sock} -> + delete_socket(Sock, State), + loop(State); + {tcp, Sock, _} -> + delete_socket(Sock, State), + loop(State); + {tcp_closed, Sock} -> + delete_socket(Sock, State), + loop(State); + {tcp_error, Sock, _} -> + delete_socket(Sock, State), + loop(State); + {ssl, Sock, _} -> + delete_socket(Sock, State), + loop(State); + {ssl_closed, Sock} -> + delete_socket(Sock, State), + loop(State); + {ssl_error, Sock, _} -> + delete_socket(Sock, State), + loop(State); + {'DOWN', _, process, Pid, Reason} -> + error_logger:error_msg( + "hackney connector failure failure; " + "~p crashed with reason: ~p~n", [Pid, Reason]), + Connectors2 = State#state.connectors -- [Pid], + loop(State#state{connectors=Connectors2}); + {'EXIT', Parent, Reason} -> + exit(Reason); + {system, From, Request} -> + system:handle_system_msg(Request, From, Parent, ?MODULE, [], + State); + Msg -> + error_logger:error_msg( + "Hackney pool ~p received an unexped message ~p", + [State#state.name, Msg]) + end. -%% @doc get the number of connections in the pool for `{Host0, Port, Transport}' -count(Name, {Host0, Port, Transport}) -> - Host = string:to_lower(Host0), - gen_server:call(find_pool(Name), {count, {Host, Port, Transport}}). +system_continue(_, _, State) -> + loop(State). -%% @doc get max pool size -max_connections(Name) -> - gen_server:call(find_pool(Name), max_connections). +system_terminate(Reason, _, _, _State) -> + exit(Reason). -%% @doc change the pool size -set_max_connections(Name, NewSize) -> - gen_server:cast(find_pool(Name), {set_maxconn, NewSize}). +system_code_change(Misc, _, _, _) -> + {ok, Misc}. -%% @doc get timeout -timeout(Name) -> - gen_server:call(find_pool(Name), timeout). -%% @doc change the connection timeout -%% -set_timeout(Name, NewTimeout) -> - gen_server:cast(find_pool(Name), {set_timeout, NewTimeout}). +reuse_connection(Group, State) -> + Refs = ets:lookup(State#state.refs, {conns, Group}), + reuse_connection1(Refs, State). -%% @private -%% -%% -do_start_pool(Name, Options) -> - Spec = child_spec(Name, Options), - case supervisor:start_child(hackney_sup, Spec) of - {ok, Pid} -> - Pid; - {error, {already_started, _}} -> - find_pool(Name, Options) - end. +reuse_connection1([], _State) -> + no_socket; +reuse_connection1([{Group, #hackney_socket{sock=S}=HS} | Rest], State) -> + [{_, _, HS, T}] = ets:lookup(State#state.sockets, S), -find_pool(Name) -> - case ets:lookup(?MODULE, Name) of - [] -> - undefined; - [{_, Pid}] -> - Pid + ets:delete_object(State#state.refs, {Group, HS}), + ets:delete(State#state.sockets, S), + cancel_timer(T, S), + hackney_socket:setopts(HS, [{active, false}]), + case sync_socket(HS) of + true -> + {ok, HS}; + false -> + reuse_connection1(Rest, State) end. -find_pool(Name, Options) -> - case ets:lookup(?MODULE, Name) of - [] -> - do_start_pool(Name, Options); - [{_, Pid}] -> - Pid +request_socket(Group, Req, State) -> + %% we simply balance connections tasks between connectors using an RR algorithm + {Connector, State2} = pick_connector(State), + Connector ! {connect, Group, Req}, + State2. + + +pick_connector(State=#state{connectors=Connectors}) -> + [Connector | Rest] = Connectors, + {Connector, State#state{connectors = Rest ++ [Connector]}}. + + +release_socket(#hackney_socket{group=Group}=HS, State) -> + Pending = ets:lookup(State#state.refs, {pending, Group}), + case Pending of + [] -> cache_socket(HS, State); + _ -> dispatch_socket(Pending, HS, State) end. -start_link(Name, Options0) -> - Options = hackney_util:maybe_apply_defaults([max_connections, timeout], - Options0), - gen_server:start_link(?MODULE, [Name, Options], []). +cache_socket(HS=#hackney_socket{sock=Sock, group=Group}, State) -> + Conns = ets:lookup(State#state.refs, {conns, Group}), + TotalGroup = length(Conns), + TotalConns = ets:info(State#state.sockets, size), -init([Name, Options]) -> - process_flag(priority, high), - case lists:member({seed,1}, ssl:module_info(exports)) of - true -> - % Make sure that the ssl random number generator is seeded - % This was new in R13 (ssl-3.10.1 in R13B vs. ssl-3.10.0 in R12B-5) - apply(ssl, seed, [crypto:rand_bytes(255)]); - false -> - ok + Limit = case Group of + <<"proxy/">> -> State#state.proxy_limit; + _ -> State#state.group_limit end, - MaxConn = case proplists:get_value(pool_size, Options) of - undefined -> - proplists:get_value(max_connections, Options); - Size -> - Size - end, - Timeout = proplists:get_value(timeout, Options), - - %% register the module - ets:insert(?MODULE, {Name, self()}), - - %% initialize metrics - Mod = init_metrics(Name), - - {ok, #state{name=Name, mod_metrics=Mod, max_connections=MaxConn, - timeout=Timeout}}. - -handle_call(count, _From, #state{sockets=Sockets}=State) -> - {reply, dict:size(Sockets), State}; -handle_call(timeout, _From, #state{timeout=Timeout}=State) -> - {reply, Timeout, State}; -handle_call(max_connections, _From, #state{max_connections=MaxConn}=State) -> - {reply, MaxConn, State}; -handle_call({checkout, Dest, Pid, RequestRef}, From, State) -> - #state{name=PoolName, - mod_metrics = Mod, - max_connections=MaxConn, - clients=Clients, - queues = Queues, - nb_waiters = NbWaiters} = State, - - {Reply, State2} = find_connection(Dest, Pid, State), - case Reply of - {ok, _Socket, _Owner} -> - State3 = monitor_client(Dest, RequestRef, State2), - update_usage(State3), - {reply, Reply, State3}; - no_socket -> - case dict:size(Clients) >= MaxConn of - true -> - Queues2 = add_to_queue(Dest, From, RequestRef, Queues), - NbWaiters2 = NbWaiters + 1, - Mod:update_histogram([hackney_pool, PoolName, queue_count], - NbWaiters2), - {noreply, State2#state{queues = Queues2, - nb_waiters=NbWaiters2}}; - false -> - State3 = monitor_client(Dest, RequestRef, State2), - update_usage(State3), - {reply, {error, no_socket, self()}, State3} - end - end; -handle_call({checkin, Ref, Dest, Socket, Transport}, From, State) -> - gen_server:reply(From, ok), - Clients2 = case dict:find(Ref, State#state.clients) of - {ok, Dest} -> - dict:erase(Ref, State#state.clients); - error -> - State#state.clients - end, - State2 = case Transport:peername(Socket) of - {ok, {_Adress, _Port}} -> - %% socket is not closed, try to deliver it or store it - deliver_socket(Socket, Dest, State#state{clients=Clients2}); - Error -> - %% socket may be half-closed, close it and return - catch Transport:close(Socket), - ?report_trace("checkin: socket is not ok~n", [{socket, Socket}, {peername, Error}]), - State#state{clients=Clients2} - end, - update_usage(State2), - {noreply, State2}; - -handle_call({count, Key}, _From, #state{connections=Conns}=State) -> - Size = case dict:find(Key, Conns) of - {ok, Sockets} -> - length(Sockets); - error -> - 0 - end, - {reply, Size, State}. - -handle_cast({set_maxconn, MaxConn}, State) -> - {noreply, State#state{max_connections=MaxConn}}; -handle_cast({set_timeout, NewTimeout}, State) -> - {noreply, State#state{timeout=NewTimeout}}; - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({timeout, Socket}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({tcp, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({tcp_closed, Socket}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({ssl, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({ssl_closed, Socket}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({tcp_error, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({ssl_error, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({'DOWN', Ref, request, _Pid, _Reason}, State) -> - Mod = State#state.mod_metrics, - case dict:find(Ref, State#state.clients) of - {ok, Dest} -> - Clients2 = dict:erase(Ref, State#state.clients), - case queue_out(Dest, State#state.queues) of - empty -> - {noreply, State#state{clients = Clients2}}; - {ok, {From, Ref2}, Queues2} -> - NbWaiters = State#state.nb_waiters - 1, - Mod:update_histogram([hackney_pool, State#state.name, - queue_count], NbWaiters), - gen_server:reply(From, {error, no_socket, self()}), - State2 = State#state{queues = Queues2, clients = Clients2, - nb_waiters=NbWaiters}, - {noreply, monitor_client(Dest, Ref2, State2)} - end; - error -> - {noreply, State} - end; -handle_info(_, State) -> - {noreply, State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -terminate(_Reason, #state{name=PoolName, mod_metrics=Mod, sockets=Sockets}) -> - %% close any sockets in the pool - lists:foreach(fun({Socket, {{_, _, Transport}, Timer}}) -> - cancel_timer(Socket, Timer), - Transport:close(Socket) - end, dict:to_list(Sockets)), - - %% delete pool metrics - delete_metrics(Mod, PoolName), - ok. - -%% internals - -find_connection({_Host, _Port, Transport}=Dest, Pid, - #state{connections=Conns, sockets=Sockets}=State) -> - case dict:find(Dest, Conns) of - {ok, [S | Rest]} -> - Transport:setopts(S, [{active, false}]), - case sync_socket(Transport, S) of - true -> - case Transport:controlling_process(S, Pid) of - ok -> - {_, Timer} = dict:fetch(S, Sockets), - cancel_timer(S, Timer), - NewConns = update_connections(Rest, Dest, Conns), - NewSockets = dict:erase(S, Sockets), - NewState = State#state{connections=NewConns, - sockets=NewSockets}, - {{ok, S, self()}, NewState}; - {error, badarg} -> - %% something happened here normally the PID died, - %% but make sure we still have the control of the - %% process - catch Transport:controlling_process(S, self()), - %% and then close it - find_connection(Dest, Pid, - remove_socket(S, State)); - _Else -> - find_connection(Dest, Pid, remove_socket(S, State)) - end; - false -> - ?report_trace("checkout: socket unsynced~n", []), - find_connection(Dest, Pid, remove_socket(S, State)) - end; - _Else -> - {no_socket, State} + case {TotalConns < State#state.max_conns, TotalGroup < Limit} of + {true, true} -> + T = erlang:send_after(State#state.idle_timeout, self(), {timeout, Sock}), + ets:insert(State#state.sockets, {Sock, Group, HS, T}), + ets:insert(State#state.refs, {{conns, Group}, HS}), + ok; + {_, _} -> + {error, max_conns} end. -remove_socket(Socket, #state{connections=Conns, sockets=Sockets}=State) -> - Mod = State#state.mod_metrics, - - Mod:update_histogram([hackney, State#state.name, free_count], - dict:size(Sockets)), - case dict:find(Socket, Sockets) of - {ok, {{_Host, _Port, Transport}=Key, Timer}} -> - cancel_timer(Socket, Timer), - catch Transport:close(Socket), - ConnSockets = lists:delete(Socket, dict:fetch(Key, Conns)), - NewConns = update_connections(ConnSockets, Key, Conns), - NewSockets = dict:erase(Socket, Sockets), - State#state{connections=NewConns, sockets=NewSockets}; - error -> - State + +dispatch_socket([{_Key, {{Pid, Tag}, Expires}}=Obj | Rest], HS, State) -> + Now = now_in_ms(), + %% delete the pending connection from the list + ets:delete_object(State#state.refs, Obj), + %% if the pending connection expired, we ignore it, else + %% we try to give the socket to it. + if + Expires > Now -> + case (catch hackney_socket:controlling_process(HS, Pid)) of + ok -> + catch Pid ! {Tag, {ok, HS}}, + ok; + _Else -> + catch hackney_socket:close(HS), + dispatch_socket(Rest, HS, State) + end; + true -> + dispatch_socket(Rest, HS, State) end. -store_socket({_Host, _Port, Transport} = Dest, Socket, - #state{timeout=Timeout, connections=Conns, - sockets=Sockets}=State) -> - Timer = erlang:send_after(Timeout, self(), {timeout, Socket}), - %% make sure to close the socket if anything is received while we are in - %% the pool. - Transport:setopts(Socket, [{active, once}, {packet, 0}]), - ConnSockets = case dict:find(Dest, Conns) of - {ok, OldSockets} -> - [Socket | OldSockets]; - error -> [Socket] - end, - State#state{connections = dict:store(Dest, ConnSockets, Conns), - sockets = dict:store(Socket, {Dest, Timer}, Sockets)}. +delete_socket(Sock, State) -> + case ets:lookup(State#state.sockets, Sock) of + [] -> ok; + [{Sock, Group, HS, T}] -> + catch hackney_socket:close(HS), + cancel_timer(T, Sock), + ets:delete(State#state.sockets, Sock), + ets:delete_obj(State#state.refs, {{conns, Group}, HS}), + ok + end. -update_connections([], Key, Connections) -> - dict:erase(Key, Connections); -update_connections(Sockets, Key, Connections) -> - dict:store(Key, Sockets, Connections). -cancel_timer(Socket, Timer) -> +cancel_timer(Timer, Socket) -> case erlang:cancel_timer(Timer) of false -> receive @@ -442,106 +291,16 @@ cancel_timer(Socket, Timer) -> _ -> ok end. -%------------------------------------------------------------------------------ -%% @private -%%------------------------------------------------------------------------------ -add_to_queue({_Host, _Port, _Transport} = Dest, From, Ref, Queues) -> - case dict:find(Dest, Queues) of - error -> - dict:store(Dest, queue:in({From, Ref}, queue:new()), Queues); - {ok, Q} -> - dict:store(Dest, queue:in({From, Ref}, Q), Queues) - end. - -%------------------------------------------------------------------------------ -%% @private -%%------------------------------------------------------------------------------ -queue_out({_Host, _Port, _Transport} = Dest, Queues) -> - case dict:find(Dest, Queues) of - error -> - empty; - {ok, Q} -> - {{value, {From, Ref}}, Q2} = queue:out(Q), - Queues2 = case queue:is_empty(Q2) of - true -> - dict:erase(Dest, Queues); - false -> - dict:store(Dest, Q2, Queues) - end, - {ok, {From, Ref}, Queues2} - end. - -%------------------------------------------------------------------------------ -%% @private -%%------------------------------------------------------------------------------ -deliver_socket(Socket, {_, _, Transport} = Dest, State) -> - Mod = State#state.mod_metrics, - - case queue_out(Dest, State#state.queues) of - empty -> - store_socket(Dest, Socket, State); - {ok, {{PidWaiter, _} = FromWaiter, Ref}, Queues2} -> - NbWaiters = State#state.nb_waiters - 1, - Mod:update_histogram([hackney_pool, State#state.name, queue_count], - NbWaiters), - case Transport:controlling_process(Socket, PidWaiter) of - ok -> - gen_server:reply(FromWaiter, {ok, Socket, self()}), - monitor_client(Dest, Ref, - State#state{queues = Queues2, - nb_waiters=NbWaiters}); - _Error -> - % Something wrong, just remove the socket - catch Transport:close(Socket), - %% put the waiter back in the queue at the beginning - NewQueues = queue:in_r({FromWaiter, Ref}, Queues2), - State#state{queues = NewQueues, nb_waiters = NbWaiters + 1} - end - end. - %% check that no events from the sockets is received after setting it to %% passive. -sync_socket(Transport, Socket) -> - {Msg, MsgClosed, MsgError} = Transport:messages(Socket), +sync_socket(#hackney_socket{transport=Transport, sock=Sock} ) -> + {Msg, MsgClosed, MsgError} = Transport:messages(), receive - {Msg, Socket, _} -> false; - {MsgClosed, Socket} -> false; - {MsgError, Socket, _} -> false + {Msg, Sock, _} -> false; + {MsgClosed, Sock} -> false; + {MsgError, Sock, _} -> false after 0 -> true end. -%------------------------------------------------------------------------------ -%% @private -%%------------------------------------------------------------------------------ -monitor_client(Dest, Ref, State) -> - Clients2 = dict:store(Ref, Dest, State#state.clients), - State#state{clients = Clients2}. - - -init_metrics(PoolName) -> - %% get metrics module - Mod = hackney_util:mod_metrics(), - - %% initialise metrics - Mod:new(histogram, [hackney_pool, PoolName, take_rate]), - Mod:new(counter, [hackney_pool, PoolName, no_socket]), - Mod:new(histogram, [hackney_pool, PoolName, in_use_count]), - Mod:new(histogram, [hackney_pool, PoolName, free_count]), - Mod:new(histogram, [hackney_pool, PoolName, queue_counter]), - Mod. - -delete_metrics(Mod, PoolName) -> - Mod:delete([hackney_pool, PoolName, take_rate]), - Mod:delete([hackney_pool, PoolName, no_socket]), - Mod:delete([hackney_pool, PoolName, in_use_count]), - Mod:delete([hackney_pool, PoolName, free_count]), - Mod:delete([hackney_pool, PoolName, queue_counter]). - - -update_usage(#state{name=PoolName, mod_metrics=Mod, sockets=Sockets, - clients=Clients}) -> - Mod:update_histogram([hackney_pool, PoolName,in_use_count], - dict:size(Clients) - 1), - Mod:update_histogram([hackney_pool, PoolName, free_count], - dict:size(Sockets) - 1). +now_in_ms() -> timer:now_diff(os:timestamp(), {0, 0, 0}). \ No newline at end of file diff --git a/src/socket/hackney_pool_handler.erl b/src/socket/hackney_pool_handler.erl deleted file mode 100644 index cee54f12..00000000 --- a/src/socket/hackney_pool_handler.erl +++ /dev/null @@ -1,41 +0,0 @@ -%%% -*- erlang -*- -%%% -%%% This file is part of hackney released under the Apache 2 license. -%%% See the NOTICE for more information. -%%% --module(hackney_pool_handler). - --include("hackney.hrl"). - --type host() :: binary() | string(). --type client() :: #client{}. - --ifdef(no_callback_support). - --export([behaviour_info/1]). - --spec behaviour_info(atom()) -> [{atom(), arity()}] | undefined. -behaviour_info(callbacks) -> - [{start, 0}, - {checkout, 4}, - {checkin, 2}]; -behaviour_info(_) -> - undefined. - --else. - -%% start a bool handler --callback start() -> ok | {error, Reason :: any()}. - --callback checkout(Host::host(), Port::integer(),Transport::atom(), - Client::client()) -> - {ok, {Info::any(), CheckingReference::any(), Owner::pid(), - Transport::atom()}, Socket::inet:socket()} - | {error, Reason :: any()}. - --callback checkin({Info::any(), CheckingReference::any(), Owner::pid(), - Transport::atom()}, Socket::inet:socket()) -> - ok - | {error, Reason :: any()}. - --endif. diff --git a/src/socket/hackney_pool_sup.erl b/src/socket/hackney_pool_sup.erl new file mode 100644 index 00000000..10755b01 --- /dev/null +++ b/src/socket/hackney_pool_sup.erl @@ -0,0 +1,27 @@ +-module(hackney_pool_sup). +-behaviour(supervisor). + +-export([start_link/2]). +-export([init/1]). + +-include("hackney.hrl"). + +start_link(Ref, PoolOpts) -> + supervisor:start_link(?MODULE, {Ref, PoolOpts}). + + +init({Ref, PoolOpts}) -> + NbConnectors = hackney_util:get_opt(nb_connectors, PoolOpts, ?DEFAULT_NB_CONNECTORS), + FallbackTime = hackney_util:get_opt(fallback_time, PoolOpts, ?DEFAULT_FALLBACK_TIME), + + %% pool, caching sockets + Pool = {hackney_pool, + {hackney_pool, start_link, [Ref, PoolOpts]}, + permanent, brutal_kill, worker, [hackney_pool]}, + + %% connection supervisor + ConnectorSup = {hackney_connector_sup, + {hackney_connector_sup, start_link, [Ref, NbConnectors, FallbackTime]}, + permanent, infinity, supervisor, [hackney_connector_sup]}, + + {ok, {{rest_for_one, 10, 10}, [Pool, ConnectorSup]}}. diff --git a/src/socket/hackney_socket.erl b/src/socket/hackney_socket.erl new file mode 100644 index 00000000..4ede3e76 --- /dev/null +++ b/src/socket/hackney_socket.erl @@ -0,0 +1,235 @@ +-module(hackney_socket). + +-include("hackney_socket.hrl"). +-include_lib("public_key/include/OTP-PUB-KEY.hrl"). + +%% public api +-export([connect/3, connect/4, + release/1, close/1, + secure/2, secure/3, + recv/2, recv/3, + send/2, + setopts/2, + controlling_process/2, + info/1]). + +%% internal functions +-export([exit_if_closed/1]). +-export([groupname/1]). +-export([add_prefix/2]). + +-type hackney_socket() :: #hackney_socket{}. +-export_types([hackney_socket/0]). + + +%% @doc make a connection +connect(Host, Port, Options) -> + connect(Host, Port, Options, infinity). + +%% @doc make a connection +connect(Host, Port, Options, Timeout) -> + Pool = proplists:get_value(pool, Options, default), + Prefix = proplists:get_value(prefix, Options), + Secure = proplists:get_value(secure, Options, false), + + Parts0 = case Secure of + true -> ["ssl", netloc(Host,Port)]; + false -> [netloc(Host, Port)] + end, + Group = if + Prefix =:= undefined -> groupname(Parts0); + true -> groupname([Prefix | Parts0]) + end, + + Res = hackney_pool:request(Pool, Group, Host, Port, Options, Timeout), + case {Res, Secure} of + {{ok, HS}, true} -> secure(HS, Options); + {{ok, _HS}, _} -> Res; + {Error, _} -> Error + end. + +release(HS) -> + hackney_pool:release(HS). + +close(#hackney_socket{transport=T, sock=Sock}) -> + T:close(Sock). + +%% @doc Start a TLS client connection over a connected TCP client socket. +secure(HS, Options) -> + secure(HS, Options, infinity). + +%% @doc Start a TLS client connection over a connected TCP client socket. +secure(#hackney_socket{transport=hackney_ssl}=HS, _Options, _Timeout) -> + {ok, HS}; +secure(#hackney_socket{transport=hackney_tcp, sock=Sock}=HS, Options, Timeout) -> + SSLOpts = ssl_opts(HS#hackney_socket.host, Options), + %% Upgrades a gen_tcp, or equivalent, connected socket to an SSL socket + case ssl:connect(Sock, SSLOpts, Timeout) of + {ok, SslSock} -> + %% maybe edit the groupname + GroupParts = string:tokens(HS#hackney_socket.group, "/"), + Group = case lists:member("ssl", GroupParts) of + true -> HS#hackney_socket.group; + false -> + Len = length(GroupParts), + GroupParts2 = lists:sublist(GroupParts, Len - 1) ++ ["ssl", lists:last(GroupParts)], + groupname(GroupParts2) + end, + {ok, HS#hackney_socket{transport=hackney_ssl, + sock=SslSock, + group=Group, + secure=true}}; + Error -> + Error + end. + +%% @doc Receive a packet from a socket in passive mode. +-spec recv(hackney_socket(), non_neg_integer()) + -> {ok, any()} | {error, closed | atom()}. +recv(HS, Length) -> + recv(HS, Length, infinity). + +%% @doc Receive a packet from a socket in passive mode. +%% @see gen_tcp:recv/3 +-spec recv(hackney_socket(), non_neg_integer(), timeout()) + -> {ok, any()} | {error, closed | atom()}. +recv(#hackney_socket{transport=T, sock=S}, Length, Timeout) -> + T:recv(S, Length, Timeout). + +%% @doc Send a packet on a socket. +%% @see gen_tcp:send/2 +-spec send(hackney_socket(), iolist()) -> ok | {error, atom()}. +send(#hackney_socket{transport=T, sock=S}, Packet) -> + T:send(S, Packet). + +%% @doc Set one or more options for a socket. +%% @see inet:setopts/2 +-spec setopts(hackney_socket(), list()) -> ok | {error, atom()}. +setopts(#hackney_socket{transport=T, sock=S}, Opts) -> + T:setopts(S, Opts). + +%% @doc Assign a new controlling process Pid to Socket. +%% @see gen_tcp:controlling_process/2 +-spec controlling_process(hackney_socket(), pid()) + -> ok | {error, closed | not_owner | atom()}. +controlling_process(#hackney_socket{transport=T, sock=S}, Pid) -> + T:controlling_process(S, Pid). + + +info(#hackney_socket{transport=Transport, sock=Sock}=HS) -> + [{type=HS#hackney_socket.type, + connected=HS#hackney_socket.connected, + secure=HS#hackney_socket.secure, + peername=Transport:peername(Sock), + sockname=Transport:sockname(Sock), + tranport=Transport, + socket=Sock}]. + + +%% private functions + +%% @hidden +groupname(Parts) -> + string:join(Parts, "/"). + +netloc(Host, Port) when is_list(Host), is_integer(Port) -> + Host ++ ":" ++ integer_to_list(Port). + +%% @hidden +add_prefix(Prefix, Options) -> + case proplists:get_value(prefix, Options) of + undefined -> [{prefix, Prefix} | Options]; + OldPrefix -> + NewPrefix = groupname([Prefix, OldPrefix]), + lists:keyreplace(prefix, 1, Options, {prefix, NewPrefix}) + end. + +%% @hidden +exit_if_closed({error, closed}) -> exit({error, closed}); +exit_if_closed(Res) -> Res. + + + +ssl_opts(Host, Options) -> + case proplists:get_value(ssl_options, Options) of + undefined -> + Insecure = proplists:get_value(insecure, Options), + UseSecureSsl = check_ssl_version(), + CACerts = certifi:cacerts(), + + case {Insecure, UseSecureSsl} of + {true, _} -> + [{verify, verify_none}, + {reuse_sessions, true}]; + {_, true} -> + + VerifyFun = {fun ssl_verify_hostname:verify_fun/3, + [{check_hostname, Host}]}, + [{verify, verify_peer}, + {depth, 99}, + {cacerts, CACerts}, + {partial_chain, fun partial_chain/1}, + {verify_fun, VerifyFun}]; + {_, _} -> + [{cacerts, CACerts}, + {verify, verify_peer}, {depth, 2}] + end; + SSLOpts -> + SSLOpts + end. + +%% code from rebar3 undert BSD license +partial_chain(Certs) -> + Certs1 = lists:reverse([{Cert, public_key:pkix_decode_cert(Cert, otp)} || + Cert <- Certs]), + CACerts = certifi:cacerts(), + CACerts1 = [public_key:pkix_decode_cert(Cert, otp) || Cert <- CACerts], + + + case find(fun({_, Cert}) -> + check_cert(CACerts1, Cert) + end, Certs1) of + {ok, Trusted} -> + {trusted_ca, element(1, Trusted)}; + _ -> + unknown_ca + end. + +extract_public_key_info(Cert) -> + ((Cert#'OTPCertificate'.tbsCertificate)#'OTPTBSCertificate'.subjectPublicKeyInfo). + +check_cert(CACerts, Cert) -> + lists:any(fun(CACert) -> + extract_public_key_info(CACert) == extract_public_key_info(Cert) + end, CACerts). + +check_ssl_version() -> + case application:get_key(ssl, vsn) of + {ok, Vsn} -> + parse_vsn(Vsn) >= {5, 3, 6}; + _ -> + false + end. + +parse_vsn(Vsn) -> + version_pad(string:tokens(Vsn, ".")). + +version_pad([Major]) -> + {list_to_integer(Major), 0, 0}; +version_pad([Major, Minor]) -> + {list_to_integer(Major), list_to_integer(Minor), 0}; +version_pad([Major, Minor, Patch]) -> + {list_to_integer(Major), list_to_integer(Minor), list_to_integer(Patch)}; +version_pad([Major, Minor, Patch | _]) -> + {list_to_integer(Major), list_to_integer(Minor), list_to_integer(Patch)}. + +-spec find(fun(), list()) -> {ok, term()} | error. +find(Fun, [Head|Tail]) when is_function(Fun) -> + case Fun(Head) of + true -> + {ok, Head}; + false -> + find(Fun, Tail) + end; +find(_Fun, []) -> + error. \ No newline at end of file diff --git a/src/socket/hackney_socket.hrl b/src/socket/hackney_socket.hrl new file mode 100644 index 00000000..f70f1355 --- /dev/null +++ b/src/socket/hackney_socket.hrl @@ -0,0 +1,10 @@ +-record(hackney_socket, {transport, + sock, + host, + port, + group, + pool, + secure = false, + type = tcp, + connected = true, + count = 0}). diff --git a/src/socket/hackney_socks5.erl b/src/socket/hackney_socks5.erl deleted file mode 100644 index bcbdd4be..00000000 --- a/src/socket/hackney_socks5.erl +++ /dev/null @@ -1,223 +0,0 @@ -%%% -*- erlang -*- -%%% -%%% This file is part of hackney released under the Apache 2 license. -%%% See the NOTICE for more information. -%%% - -%% @doc socks 5 transport - --module(hackney_socks5). - --export([messages/1, - connect/3, connect/4, - recv/2, recv/3, - send/2, - setopts/2, - controlling_process/2, - peername/1, - close/1, - shutdown/2, - sockname/1]). - --define(TIMEOUT, infinity). - --type socks5_socket() :: {atom(), inet:socket()}. --export_type([socks5_socket/0]). - -%% @doc Atoms used to identify messages in {active, once | true} mode. -messages({hackney_ssl, _}) -> - {ssl, ssl_closed, ssl_error}; -messages({_, _}) -> - {tcp, tcp_closed, tcp_error}. - - -connect(Host, Port, Opts) -> - connect(Host, Port, Opts, infinity). - - -connect(Host, Port, Opts, Timeout) when is_list(Host), is_integer(Port), - (Timeout =:= infinity orelse is_integer(Timeout)) -> - %% get the proxy host and port from the options - ProxyHost = proplists:get_value(socks5_host, Opts), - ProxyPort = proplists:get_value(socks5_port, Opts), - Transport = proplists:get_value(socks5_transport, Opts), - - %% filter connection options - AcceptedOpts = [linger, nodelay, send_timeout, - send_timeout_close, raw], - BaseOpts = [binary, {active, false}, {packet, 0}, {keepalive, true}, - {nodelay, true}], - ConnectOpts = hackney_util:filter_options(Opts, AcceptedOpts, BaseOpts), - - %% connect to the socks 5 proxy - case gen_tcp:connect(ProxyHost, ProxyPort, ConnectOpts, Timeout) of - {ok, Socket} -> - case do_handshake(Socket, Host, Port, Opts) of - ok -> - case Transport of - hackney_ssl -> - SSlOpts = hackney_connect:ssl_opts(Host, Opts), - %% upgrade the tcp connection - case ssl:connect(Socket, SSlOpts) of - {ok, SslSocket} -> - {ok, {Transport, SslSocket}}; - Error -> - Error - end; - _ -> - {ok, {Transport, Socket}} - end; - Error -> - Error - end; - Error -> - Error - end. - - -recv(Socket, Length) -> - recv(Socket, Length, infinity). - -%% @doc Receive a packet from a socket in passive mode. -%% @see gen_tcp:recv/3 --spec recv(socks5_socket(), non_neg_integer(), timeout()) - -> {ok, any()} | {error, closed | atom()}. -recv({Transport, Socket}, Length, Timeout) -> - Transport:recv(Socket, Length, Timeout). - - -%% @doc Send a packet on a socket. -%% @see gen_tcp:send/2 --spec send(socks5_socket(), iolist()) -> ok | {error, atom()}. -send({Transport, Socket}, Packet) -> - Transport:send(Socket, Packet). - -%% @doc Set one or more options for a socket. -%% @see inet:setopts/2 --spec setopts(socks5_socket(), list()) -> ok | {error, atom()}. -setopts({Transport, Socket}, Opts) -> - Transport:setopts(Socket, Opts). - -%% @doc Assign a new controlling process Pid to Socket. -%% @see gen_tcp:controlling_process/2 --spec controlling_process(socks5_socket(), pid()) - -> ok | {error, closed | not_owner | atom()}. -controlling_process({Transport, Socket}, Pid) -> - Transport:controlling_process(Socket, Pid). - -%% @doc Return the address and port for the other end of a connection. -%% @see inet:peername/1 --spec peername(socks5_socket()) - -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. -peername({Transport, Socket}) -> - Transport:peername(Socket). - -%% @doc Close a socks5 socket. -%% @see gen_tcp:close/1 --spec close(socks5_socket()) -> ok. -close({Transport, Socket}) -> - Transport:close(Socket). - -%% @doc Immediately close a socket in one or two directions. -%% @see gen_tcp:shutdown/2 --spec shutdown(socks5_socket(), read | write | read_write) -> ok. -shutdown({Transport, Socket}, How) -> - Transport:shutdown(Socket, How). - -%% @doc Get the local address and port of a socket -%% @see inet:sockname/1 --spec sockname(socks5_socket()) - -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. -sockname({Transport, Socket}) -> - Transport:sockname(Socket). - -%% private functions -do_handshake(Socket, Host, Port, Options) -> - ProxyUser = proplists:get_value(socks5_user, Options), - ProxyPass = proplists:get_value(socks5_pass, Options, <<>>), - case ProxyUser of - undefined -> - %% no auth - ok = gen_tcp:send(Socket, << 5, 1, 0 >>), - case gen_tcp:recv(Socket, 2, ?TIMEOUT) of - {ok, << 5, 0 >>} -> - do_connection(Socket, Host, Port); - {ok, _Reply} -> - {error, unknown_reply}; - Error -> - Error - end; - _ -> - case do_authentication(Socket, ProxyUser, ProxyPass) of - ok -> - do_connection(Socket, Host, Port); - Error -> - Error - end - end. - -do_authentication(Socket, User, Pass) -> - ok = gen_tcp:send(Socket, << 5, 1, 2 >>), - case gen_tcp:recv(Socket, 2, ?TIMEOUT) of - {ok, <<5, 0>>} -> - ok; - {ok, <<5, 2>>} -> - UserLength = byte_size(User), - PassLength = byte_size(Pass), - Msg = iolist_to_binary([<< 1, UserLength >>, - User, << PassLength >>, - Pass]), - ok = gen_tcp:send(Socket, Msg), - case gen_tcp:recv(Socket, 2, ?TIMEOUT) of - {ok, <<1, 0>>} -> - ok; - _ -> - {error, not_authenticated} - end; - _ -> - {error, not_authenticated} - end. - - -do_connection(Socket, Host, Port) -> - Addr = case inet_parse:address(Host) of - {ok, {IP1, IP2, IP3, IP4}} -> - << 1, IP1, IP2, IP3, IP4, Port:16 >>; - {ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} -> - << 4, IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8, Port:16 >>; - _ -> - %% domain name - case inet:getaddr(Host, inet) of - {ok, {IP1, IP2, IP3, IP4}} -> - << 1, IP1, IP2, IP3, IP4, Port:16 >>; - _Else -> - case inet:getaddr(Host, inet6) of - {ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} -> - << 4, IP1, IP2, IP3, IP4, IP5, IP6, IP7, - IP8, Port:16 >>; - _ -> - Host1 = list_to_binary(Host), - HostLength = byte_size(Host1), - << 3, HostLength, Host1/binary, Port:16 >> - end - end - end, - ok = gen_tcp:send(Socket, << 5, 1, 0, Addr/binary >>), - case gen_tcp:recv(Socket, 10, ?TIMEOUT) of - {ok, << 5, 0, 0, BoundAddr/binary >>} -> - check_connection(BoundAddr); - {ok, _} -> - {error, badarg}; - Error -> - Error - end. - - -check_connection(<< 3, _DomainLen:8, _Domain/binary >>) -> - ok; -check_connection(<< 1, _Addr:32, _Port:16 >>) -> - ok; -check_connection(<< 4, _Addr:128, _Port:16 >>) -> - ok; -check_connection(_) -> - {error, no_connection}. diff --git a/src/socket/hackney_socks_connection.erl b/src/socket/hackney_socks_connection.erl new file mode 100644 index 00000000..8166b13a --- /dev/null +++ b/src/socket/hackney_socks_connection.erl @@ -0,0 +1,118 @@ +-module(hackney_socks_connection). +-export([open/5, open/6]). +-export([do_handshake/4]). + + +-define(TIMEOUT, 5000). + +open(Host, Port, TargetHost, TargetPort, Options) -> + open(Host, Port, TargetHost, TargetPort, Options, infinity). + +open(Host, Port, TargetHost, TargetPort, Options0, Timeout) -> + Options = hackney_socket:add_prefix("socks", Options0), + case hackney_socket:connect(Host, Port, Options, Timeout) of + {ok, Socket} -> + do_handshake(Socket, TargetHost, TargetPort, Options); + Error -> + Error + end. + +do_handshake(Socket, Host, Port, Options) -> + ProxyUser = proplists:get_value(user, Options), + ProxyPass = proplists:get_value(password, Options, <<>>), + Secure = proplists:get_value(secure, Options, false), + + Result = case ProxyUser of + undefined -> + %% no auth + ok = hackney_socket:send(Socket, << 5, 1, 0 >>), + case hackney_socket:recv(Socket, 2, ?TIMEOUT) of + {ok, << 5, 0 >>} -> + do_connection(Socket, Host, Port); + {ok, _Reply} -> + {error, unknown_reply}; + Error -> + Error + end; + _ -> + case do_authentication(Socket, ProxyUser, ProxyPass) of + ok -> + do_connection(Socket, Host, Port); + Error -> + Error + end + end, + + %% maybe upgrade the socket + case {Result, Secure} of + {ok, false} -> + {ok, Socket}; + {ok, true} -> + hackney_socket:secure(Socket, Options); + _ -> + %% error, close the underlying socket and return + catch hackney_socket:close(Socket), + Result + end. + +do_authentication(Socket, User, Pass) -> + ok = hackney_socket:send(Socket, << 5, 1, 2 >>), + case hackney_socket:recv(Socket, 2, ?TIMEOUT) of + {ok, <<5, 0>>} -> + ok; + {ok, <<5, 2>>} -> + UserLength = byte_size(User), + PassLength = byte_size(Pass), + Msg = iolist_to_binary([<< 1, UserLength >>, + User, << PassLength >>, + Pass]), + ok = hackney_socket:send(Socket, Msg), + case hackney_socket:recv(Socket, 2, ?TIMEOUT) of + {ok, <<1, 0>>} -> + ok; + _ -> + {error, {proxy_error, not_authenticated}} + end; + _ -> + {error, {proxy_error, not_authenticated}} + end. + + +do_connection(Socket, Host, Port) -> + Addr = case inet_parse:address(Host) of + {ok, {IP1, IP2, IP3, IP4}} -> + << 1, IP1, IP2, IP3, IP4, Port:16 >>; + {ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} -> + << 4, IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8, Port:16 >>; + _ -> + %% domain name + case inet:getaddr(Host, inet) of + {ok, {IP1, IP2, IP3, IP4}} -> + << 1, IP1, IP2, IP3, IP4, Port:16 >>; + _Else -> + case inet:getaddr(Host, inet6) of + {ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} -> + << 4, IP1, IP2, IP3, IP4, IP5, IP6, IP7, + IP8, Port:16 >>; + _ -> + Host1 = list_to_binary(Host), + HostLength = byte_size(Host1), + << 3, HostLength, Host1/binary, Port:16 >> + end + end + end, + ok = hackney_socket:send(Socket, << 5, 1, 0, Addr/binary >>), + case hackney_socket:recv(Socket, 10, ?TIMEOUT) of + {ok, << 5, 0, 0, BoundAddr/binary >>} -> + check_connection(BoundAddr); + {ok, _} -> + {error, {proxy_error, badarg}}; + Error -> + Error + end. + + +check_connection(<< 3, _DomainLen:8, _Domain/binary >>) -> ok; +check_connection(<< 1, _Addr:32, _Port:16 >>) -> ok; +check_connection(<< 4, _Addr:128, _Port:16 >>) -> ok; +check_connection(_) -> {error, {proxy_error, no_connection}}. diff --git a/src/socket/hackney_ssl.erl b/src/socket/hackney_ssl.erl index c4d9ab23..000e8a80 100644 --- a/src/socket/hackney_ssl.erl +++ b/src/socket/hackney_ssl.erl @@ -6,7 +6,7 @@ %%% Copyright (c) 2011-2012, Loïc Hoguin -module(hackney_ssl). --export([messages/1, +-export([messages/0, connect/3, connect/4, recv/3, recv/2, send/2, @@ -18,7 +18,7 @@ sockname/1]). %% @doc Atoms used to identify messages in {active, once | true} mode. -messages(_) -> {ssl, ssl_closed, ssl_error}. +messages() -> {ssl, ssl_closed, ssl_error}. connect(Host, Port, Opts) -> connect(Host, Port, Opts, infinity). diff --git a/src/socket/hackney_tcp.erl b/src/socket/hackney_tcp.erl index f2167ff8..8333096f 100644 --- a/src/socket/hackney_tcp.erl +++ b/src/socket/hackney_tcp.erl @@ -6,7 +6,7 @@ %%% Copyright (c) 2011-2012, Loïc Hoguin %%% -module(hackney_tcp). --export([messages/1, +-export([messages/0, connect/3, connect/4, recv/2, recv/3, send/2, @@ -18,7 +18,7 @@ sockname/1]). %% @doc Atoms used to identify messages in {active, once | true} mode. -messages(_) -> {tcp, tcp_closed, tcp_error}. +messages() -> {tcp, tcp_closed, tcp_error}. connect(Host, Port, Opts) -> connect(Host, Port, Opts, infinity). @@ -28,7 +28,7 @@ connect(Host, Port, Opts, Timeout) when is_list(Host), is_integer(Port), %% filter options AcceptedOpts = [linger, nodelay, keepalive, send_timeout, - send_timeout_close, raw, inet6, reuseaddr], + send_timeout_close, raw, inet6, reuseaddr, ip], BaseOpts = [binary, {active, false}, {packet, raw}], Opts1 = hackney_util:filter_options(Opts, AcceptedOpts, BaseOpts), diff --git a/src/socket/hackney_tunnel_connection.erl b/src/socket/hackney_tunnel_connection.erl new file mode 100644 index 00000000..2562da4e --- /dev/null +++ b/src/socket/hackney_tunnel_connection.erl @@ -0,0 +1,112 @@ +-module(hackney_tunnel_connection). + +-export([open/5, open/6]). +-export([do_handshake/4]). + +-define(RESPONSE_RECV_TIMEOUT, 300000). %% timeout waiting for response line +-define(HEADERS_RECV_TIMEOUT, 30000). %% timeout waiting for headers + +-define(MAX_HEADERS, 1000). + +open(Host, Port, TargetHost, TargetPort, Options) -> + open(Host, Port, TargetHost, TargetPort, Options, infinity). + +open(Host, Port, TargetHost, TargetPort, Options0, Timeout) -> + Options = hackney_socket:add_prefix("tunnel", Options0), + case hackney_socket:connect(Host, Port, [binary, {active, false}], Timeout) of + {ok, Socket} -> + do_handshake(Socket, TargetHost, TargetPort, Options); + Error -> + Error + end. + +do_handshake(Socket, Host, Port, Options) -> + ProxyUser = proplists:get_value(user, Options), + ProxyPass = proplists:get_value(password, Options, <<>>), + Secure = proplists:get_value(secure, Options, false), + + UA = hackney_request:default_ua(), + Path = iolist_to_binary([Host, ":", integer_to_list(Port)]), + HostHdr = case Port of + 80 -> list_to_binary(Host); + _ -> Path + end, + Headers = case ProxyUser of + undefined -> + [<<"Host: ", HostHdr/binary >>, + <<"User-Agent: ", UA/binary >>]; + _ -> + Credentials = base64:encode(<>), + [<<"Host: ", HostHdr/binary >>, + <<"User-Agent: ", UA/binary >>, + <<"Proxy-Authorization: Basic ", Credentials/binary >>] + end, + + Payload = [<<"CONNECT ", Path/binary, " HTTP/1.1\r\n" >>, + hackney_bstr:join(lists:reverse(Headers), <<"\r\n" >>), + <<"\r\n\r\n">>], + + Result = case hackney_socket:send(Socket, Payload) of + ok -> + try + wait_response(Socket) + catch + 'EXIT':Reason -> {error, Reason} + end; + Error -> + Error + end, + + case {Result, Secure} of + {ok, false} -> + {ok, Socket}; + {ok, true} -> + hackney_socket:secure(Socket, Options); + _ -> + catch hackney_socket:close(Socket), + Result + end. + + +wait_response(Socket) -> + ok = hackney_socket:exit_if_closed(hackney_socket:setopts(Socket, [{active, once}])), + receive + {http, _, {http_response, _, Status, Reason}} -> + case lists:member(Status, [200, 201]) of + true -> + ok = hackney_socket:exit_if_closed(hackney_socket:setopts(Socket, [{packet, http_hbin}])), + wait_headers(Socket, 0); + false -> + {error, {proxy_error, {Status, Reason}}} + end; + {http, _, {http_error, <<"\r\n">>}} -> + wait_response(Socket); + {http, _, {http_error, <<"\n">>}} -> + wait_response(Socket); + tcp_closed -> + {error, closed}; + {tcp_error, Reason} -> + {error, Reason} + after ?RESPONSE_RECV_TIMEOUT -> + {error, proxy_timeout} + end. + +wait_headers(Socket, ?MAX_HEADERS) -> + ok = hackney_socket:exit_if_closed(hackney_socket:setopts(Socket, [{packet, raw}])), + {error, max_headers}; +wait_headers(Socket, Count) -> + ok = hackney_socket:exit_if_closed(hackney_socket:setopts(Socket, [{active, once}])), + receive + {http, _, http_eoh} -> + hackney_socket:exit_if_closed(hackney_socket:setopts(Socket, [{packet, raw}])); + {http, _, {http_header, _, _, _, _}} -> + wait_headers(Socket, Count + 1); + tcp_closed -> + {error, closed}; + {tcp_error, Reason} -> + {error, Reason} + after + ?HEADERS_RECV_TIMEOUT -> + {error, proxy_timeout} + end. diff --git a/test/hackney_url_tests.erl b/test/hackney_url_tests.erl index b64e1708..5ad8ee2b 100644 --- a/test/hackney_url_tests.erl +++ b/test/hackney_url_tests.erl @@ -7,8 +7,7 @@ parse_and_unparse_url_test_() -> %% {Value, Result}. Tests = [ {<<"http://www.example.com/path?key=value#Section%205">>, - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com">>, raw_path = <<"/path?key=value#Section%205">>, path = <<"/path">>, @@ -20,8 +19,7 @@ parse_and_unparse_url_test_() -> password = <<"">>} }, {<<"http://www.example.com/">>, - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com">>, raw_path = <<"/">>, path = <<"/">>, @@ -33,8 +31,7 @@ parse_and_unparse_url_test_() -> password = <<"">>} }, {<<"http://[db8:0cec::99:123a]/">>, - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"[db8:0cec::99:123a]">>, raw_path = <<"/">>, path = <<"/">>, @@ -46,8 +43,7 @@ parse_and_unparse_url_test_() -> password = <<"">>} }, {<<"https://[db8:0cec::99:123a]/">>, - #hackney_url{transport =hackney_ssl, - scheme = https, + #hackney_url{scheme = https, netloc = <<"[db8:0cec::99:123a]">>, raw_path = <<"/">>, path = <<"/">>, @@ -59,8 +55,7 @@ parse_and_unparse_url_test_() -> password = <<"">>} }, {<<"https://[db8:0cec::99:123a]:8080/">>, - #hackney_url{transport =hackney_ssl, - scheme = https, + #hackney_url{scheme = https, netloc = <<"[db8:0cec::99:123a]:8080">>, raw_path = <<"/">>, path = <<"/">>, @@ -72,8 +67,7 @@ parse_and_unparse_url_test_() -> password = <<"">>} }, {<<"http://www.example.com/?key=value#Section%205">>, - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com">>, raw_path = <<"/?key=value#Section%205">>, path = <<"/">>, @@ -85,8 +79,7 @@ parse_and_unparse_url_test_() -> password = <<"">>} }, {<<"http://www.example.com:8080/path?key=value#Section%205">>, - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com:8080">>, raw_path = <<"/path?key=value#Section%205">>, path = <<"/path">>, @@ -98,8 +91,7 @@ parse_and_unparse_url_test_() -> password = <<"">>} }, {<<"https://user:passwd@www.example.com/path?key=value#Section%205">>, - #hackney_url{transport =hackney_ssl, - scheme = https, + #hackney_url{scheme = https, netloc = <<"www.example.com">>, raw_path = <<"/path?key=value#Section%205">>, path = <<"/path">>, @@ -111,8 +103,7 @@ parse_and_unparse_url_test_() -> password = <<"passwd">>} }, {<<"https://user@www.example.com/path?key=value#Section%205">>, - #hackney_url{transport =hackney_ssl, - scheme = https, + #hackney_url{scheme = https, netloc = <<"www.example.com">>, raw_path = <<"/path?key=value#Section%205">>, path = <<"/path">>, @@ -131,8 +122,7 @@ parse_url_test_() -> %% {Value, Result}. Tests = [ {"http://www.example.com/path?key=value#Section%205", % list as argument - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com">>, raw_path = <<"/path?key=value#Section%205">>, path = <<"/path">>, @@ -144,8 +134,7 @@ parse_url_test_() -> password = <<"">>} }, {<<"www.example.com/path?key=value#Section%205">>, % without http:// - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com">>, raw_path = <<"/path?key=value#Section%205">>, path = <<"/path">>, @@ -157,8 +146,7 @@ parse_url_test_() -> password = <<"">>} }, {<<"www.example.com?key=value#Section%205">>, - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com">>, raw_path = <<"/?key=value#Section%205">>, path = <<"/">>, @@ -170,8 +158,7 @@ parse_url_test_() -> password = <<"">>} }, {<<"http://www.example.com">>, - #hackney_url{transport =hackney_tcp, - scheme = http, + #hackney_url{scheme = http, netloc = <<"www.example.com">>, raw_path = <<"/">>, path = <<"/">>, @@ -185,13 +172,6 @@ parse_url_test_() -> ], [{V, fun() -> R = hackney_url:parse_url(V) end} || {V, R} <- Tests]. -transport_scheme_test_() -> - %% {Value, Result}. - Tests = [ - {hackney_tcp, http}, - {hackney_ssl, https} - ], - [{atom_to_list(V), fun() -> R = hackney_url:transport_scheme(V) end} || {V, R} <- Tests]. url_encode_and_decode_test_() -> %% {Value, Result}.