Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #241 from heroku/ypaq-logplex-shard-retries
Browse files Browse the repository at this point in the history
Add retry to establishing a shard pool connection
  • Loading branch information
cyx authored Mar 30, 2018
2 parents 8fd6350 + 3281f48 commit d57372d
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions src/logplex_shard.erl
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,25 @@ register_worker(WorkerType, Url, Pid) ->

async_add_pool(ReadMap, Url) ->
WorkerFun = fun () ->
ok = register_worker(ReadMap, Url, add_pool(Url))
{ok, Pool} = add_pool_with_retry(Url, 10),
ok = register_worker(ReadMap, Url, Pool)
end,
{async, spawn(WorkerFun)}.

add_pool(Url) ->
add_pool_with_retry(Url, 0) ->
Host = proplists:get_value(host, parse_redis_uri(Url)),
?ERR("error=failed_to_add_pool reason=out_of_retries redis_host=~p", [Host]),
{error, out_of_retries};
add_pool_with_retry(Url, N) ->
Opts = parse_redis_uri(Url),
case redo:start_link(undefined, Opts) of
{ok, Pid} when is_pid(Pid) -> Pid;
{error, {error, econnrefused}} -> undefined
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, Reason} ->
Host = proplists:get_value(host, Opts),
?WARN("warn=failed_to_add_pool reason=~p redis_host=~p retry_attempt=~p", [Reason, Host, N]),
timer:sleep(timer:seconds(5)),
add_pool_with_retry(Url, N-1)
end.

async_add_buffer(WriteMap, Url) ->
Expand Down Expand Up @@ -321,7 +331,7 @@ redis_buffer_opts(Url) ->
handle_child_death(Pid) ->
case logplex_shard_info:pid_info(Pid) of
{logplex_read_pool_map, {{Shard, {Url, Pid}}, Map, V}} ->
NewPid = add_pool(Url),
NewPid = add_pool_with_retry(Url, 10),
NewMap = dict:store(Shard, {Url, NewPid}, Map),
logplex_shard_info:save(logplex_read_pool_map, NewMap, V),
?INFO("at=read_pool_restart oldpid=~p newpid=~p",
Expand Down

0 comments on commit d57372d

Please sign in to comment.