Skip to content

Commit

Permalink
PISTON-1177: new approach to determine initial availability for agent…
Browse files Browse the repository at this point in the history
…s of queue (#6688)

- when acdc_queue_manager boots, do not immediately assume all member
agents are available
- when manager proc is ready, pub msg so that agents send an
availability update
- any subsequent agent starts/availability changes will already be
picked up by existing bindings
  • Loading branch information
danielfinke authored and jamesaimonetti committed Feb 1, 2021
1 parent f7c41c3 commit 1634056
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 104 deletions.
21 changes: 15 additions & 6 deletions applications/acdc/src/acdc_agent_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
,end_wrapup/1

,add_acdc_queue/2, rm_acdc_queue/2
,send_availability_update/2
,update_presence/3
,agent_logout/1
,refresh/2
Expand Down Expand Up @@ -308,6 +309,14 @@ add_acdc_queue(ServerRef, QueueId) ->
rm_acdc_queue(ServerRef, QueueId) ->
gen_statem:cast(ServerRef, {'rm_acdc_queue', QueueId}).

%%------------------------------------------------------------------------------
%% @doc Send an availability update
%% @end
%%------------------------------------------------------------------------------
-spec send_availability_update(kz_types:server_ref(), kz_term:ne_binary()) -> 'ok'.
send_availability_update(ServerRef, QueueId) ->
gen_statem:cast(ServerRef, {'send_availability_update', QueueId}).

%%------------------------------------------------------------------------------
%% @doc
%% @end
Expand Down Expand Up @@ -1398,6 +1407,9 @@ handle_event({'add_acdc_queue', QueueId}, StateName, #state{agent_listener=Agent
handle_event({'rm_acdc_queue', QueueId}, StateName, #state{agent_listener=AgentListener}=State) ->
acdc_agent_listener:rm_acdc_queue(AgentListener, QueueId),
{'next_state', StateName, State};
handle_event({'send_availability_update', QueueId}, StateName, #state{agent_listener=AgentListener}=State) ->
acdc_agent_listener:send_availability_update(AgentListener, StateName, QueueId),
{'next_state', StateName, State};
handle_event({'update_presence', PresenceId, PresenceState}, 'ready', State) ->
handle_presence_update(PresenceId, PresenceState, State),
{'next_state', 'ready', State};
Expand Down Expand Up @@ -1967,14 +1979,11 @@ apply_state_updates_fold({_, StateName, #state{account_id=AccountId
,pause_alias=Alias
}}=Acc, []) ->
lager:debug("resulting agent state ~s", [StateName]),
acdc_agent_listener:send_availability_update(AgentListener, StateName),
case StateName of
'ready' ->
acdc_agent_listener:send_agent_available(AgentListener),
acdc_agent_stats:agent_ready(AccountId, AgentId);
'ready' -> acdc_agent_stats:agent_ready(AccountId, AgentId);
'wrapup' -> acdc_agent_stats:agent_wrapup(AccountId, AgentId, time_left(WRef));
'paused' ->
acdc_agent_listener:send_agent_busy(AgentListener),
acdc_agent_stats:agent_paused(AccountId, AgentId, time_left(PRef), Alias)
'paused' -> acdc_agent_stats:agent_paused(AccountId, AgentId, time_left(PRef), Alias)
end,
Acc;
apply_state_updates_fold({_, _, State}, [{'pause', Timeout, Alias}|Updates]) ->
Expand Down
14 changes: 14 additions & 0 deletions applications/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
,handle_agent_message/2
,handle_config_change/2
,handle_presence_probe/2
,handle_queue_started_notif/2
]).

-include("acdc.hrl").
Expand Down Expand Up @@ -490,3 +491,16 @@ maybe_update_presence(Sup, JObj, PresenceState) ->
APid = acdc_agent_sup:listener(Sup),
acdc_agent_listener:maybe_update_presence_id(APid, presence_id(JObj)),
acdc_agent_listener:presence_update(APid, presence_state(JObj, PresenceState)).

%%------------------------------------------------------------------------------
%% @doc When a queue this agent is bound to (a member of) is started, handle the
%% notif by sending an availability update to the queue so it knows the
%% availability state of this agent for taking calls.
%% @end
%%------------------------------------------------------------------------------
-spec handle_queue_started_notif(kz_json:object(), kz_term:proplist()) -> 'ok'.
handle_queue_started_notif(JObj, Props) ->
'true' = kapi_acdc_queue:started_notif_v(JObj),
FSM = props:get_value('fsm_pid', Props),
QueueId = kz_json:get_ne_binary_value(<<"Queue-ID">>, JObj),
acdc_agent_fsm:send_availability_update(FSM, QueueId).
72 changes: 34 additions & 38 deletions applications/acdc/src/acdc_agent_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
,originate_execute/2
,originate_uuid/3
,outbound_call/2
,send_agent_available/1
,send_agent_busy/1
,send_availability_update/2, send_availability_update/3
,send_sync_req/1
,send_sync_resp/3, send_sync_resp/4
,config/1, refresh_config/3
Expand Down Expand Up @@ -154,6 +153,9 @@
,{{'acdc_agent_handler', 'handle_config_change'}
,[{<<"configuration">>, <<"*">>}]
}
,{{'acdc_agent_handler', 'handle_queue_started_notif'}
,[{<<"queue">>, <<"started_notif">>}]
}
]).

%%%=============================================================================
Expand Down Expand Up @@ -254,13 +256,13 @@ originate_uuid(Srv, UUID, CtlQ) ->
outbound_call(Srv, CallId) ->
gen_listener:cast(Srv, {'outbound_call', CallId}).

-spec send_agent_available(pid()) -> 'ok'.
send_agent_available(Srv) ->
gen_listener:cast(Srv, 'send_agent_available').
-spec send_availability_update(pid(), fsm_state_name()) -> 'ok'.
send_availability_update(Srv, StateName) ->
gen_listener:cast(Srv, {'send_availability_update', StateName}).

-spec send_agent_busy(pid()) -> 'ok'.
send_agent_busy(Srv) ->
gen_listener:cast(Srv, 'send_agent_busy').
-spec send_availability_update(pid(), fsm_state_name(), kz_term:ne_binary()) -> 'ok'.
send_availability_update(Srv, StateName, QueueId) ->
gen_listener:cast(Srv, {'send_availability_update', StateName, QueueId}).

-spec send_sync_req(pid()) -> 'ok'.
send_sync_req(Srv) -> gen_listener:cast(Srv, {'send_sync_req'}).
Expand Down Expand Up @@ -410,16 +412,14 @@ handle_cast({'fsm_started', FSMPid}, State) ->
handle_cast({'gen_listener', {'created_queue', Q}}, State) ->
{'noreply', State#state{my_q=Q}, 'hibernate'};

handle_cast({'add_acdc_queue', Q, StateName}, #state{agent_queues=Qs
,acct_id=AcctId
,agent_id=AgentId
}=State) when is_binary(Q) ->
handle_cast({'add_acdc_queue', Q, StateName}, #state{agent_queues=Qs}=State) when is_binary(Q) ->
case lists:member(Q, Qs) of
'true' ->
lager:debug("queue ~s already added", [Q]),
do_send_availability_update(Q, StateName, State),
{'noreply', State};
'false' ->
add_queue_binding(AcctId, AgentId, Q, StateName),
add_queue_binding(Q, StateName, State),
{'noreply', State#state{agent_queues=[Q|Qs]}}
end;

Expand All @@ -445,11 +445,8 @@ handle_cast({'rm_acdc_queue', Q}, #state{agent_queues=Qs
{'noreply', State}
end;

handle_cast('bind_to_member_reqs', #state{agent_queues=Qs
,acct_id=AcctId
,agent_id=AgentId
}=State) ->
_ = [add_queue_binding(AcctId, AgentId, Q, 'ready') || Q <- Qs],
handle_cast('bind_to_member_reqs', #state{agent_queues=Qs}=State) ->
_ = [add_queue_binding(Q, 'ready', State) || Q <- Qs],
{'noreply', State};

handle_cast({'rebind_events', OldCallId, NewCallId}, State) ->
Expand Down Expand Up @@ -739,18 +736,12 @@ handle_cast({'outbound_call', CallId}, #state{agent_id=AgentId
lager:debug("bound to agent's outbound call ~s", [CallId]),
{'noreply', State#state{call=kapps_call:set_call_id(CallId, kapps_call:new())}, 'hibernate'};

handle_cast('send_agent_available', #state{agent_id=AgentId
,acct_id=AcctId
,agent_queues=Qs
}=State) ->
[send_agent_available(AcctId, AgentId, QueueId) || QueueId <- Qs],
handle_cast({'send_availability_update', StateName}, #state{agent_queues=Qs}=State) ->
[do_send_availability_update(QueueId, StateName, State) || QueueId <- Qs],
{'noreply', State};

handle_cast('send_agent_busy', #state{agent_id=AgentId
,acct_id=AcctId
,agent_queues=Qs
}=State) ->
[send_agent_busy(AcctId, AgentId, QueueId) || QueueId <- Qs],
handle_cast({'send_availability_update', StateName, QueueId}, State) ->
do_send_availability_update(QueueId, StateName, State),
{'noreply', State};

handle_cast({'send_sync_req'}, #state{my_id=MyId
Expand Down Expand Up @@ -1093,9 +1084,11 @@ outbound_call_id(CallId, AgentId) when is_binary(CallId) ->
outbound_call_id(Call, AgentId) ->
outbound_call_id(kapps_call:call_id(Call), AgentId).

-spec add_queue_binding(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), fsm_state_name()) ->
-spec add_queue_binding(kz_term:ne_binary(), fsm_state_name(), state()) ->
'ok'.
add_queue_binding(AcctId, AgentId, QueueId, StateName) ->
add_queue_binding(QueueId, StateName, #state{agent_id=AgentId
,acct_id=AcctId
}=State) ->
lager:debug("adding queue binding for ~s", [QueueId]),
Body = kz_json:from_list([{<<"agent_id">>, AgentId}
,{<<"queue_id">>, QueueId}
Expand All @@ -1104,11 +1097,11 @@ add_queue_binding(AcctId, AgentId, QueueId, StateName) ->
kz_edr:event(?APP_NAME, ?APP_VERSION, 'ok', 'info', Body, AcctId),
gen_listener:add_binding(self()
,'acdc_queue'
,[{'restrict_to', ['member_connect_req']}
,[{'restrict_to', ['member_connect_req', 'started_notif']}
,{'queue_id', QueueId}
,{'account_id', AcctId}
]),
send_availability_update(AcctId, AgentId, QueueId, StateName).
do_send_availability_update(QueueId, StateName, State).

-spec rm_queue_binding(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'.
rm_queue_binding(AcctId, AgentId, QueueId) ->
Expand All @@ -1120,17 +1113,20 @@ rm_queue_binding(AcctId, AgentId, QueueId) ->
kz_edr:event(?APP_NAME, ?APP_VERSION, 'ok', 'info', Body, AcctId),
gen_listener:rm_binding(self()
,'acdc_queue'
,[{'restrict_to', ['member_connect_req']}
,[{'restrict_to', ['member_connect_req', 'started_notif']}
,{'queue_id', QueueId}
,{'account_id', AcctId}
]),
send_agent_unavailable(AcctId, AgentId, QueueId).

-spec send_availability_update(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), fsm_state_name()) ->
'ok'.
send_availability_update(AcctId, AgentId, QueueId, 'ready') ->
-spec do_send_availability_update(kz_term:ne_binary(), fsm_state_name(), state()) -> 'ok'.
do_send_availability_update(QueueId, 'ready', #state{agent_id=AgentId
,acct_id=AcctId
}) ->
send_agent_available(AcctId, AgentId, QueueId);
send_availability_update(AcctId, AgentId, QueueId, _) ->
do_send_availability_update(QueueId, _, #state{agent_id=AgentId
,acct_id=AcctId
}) ->
send_agent_busy(AcctId, AgentId, QueueId).

-spec send_agent_available(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'.
Expand Down Expand Up @@ -1238,7 +1234,7 @@ stop_agent_leg(ACallId, ACtrlQ) ->
Command = [{<<"Application-Name">>, <<"hangup">>}
,{<<"Insert-At">>, <<"now">>}
,{<<"Call-ID">>, ACallId}
| kz_api:default_headers(<<>>, <<"call">>, <<"command">>, ?APP_NAME, ?APP_VERSION)
| kz_api:default_headers(<<"call">>, <<"command">>, ?APP_NAME, ?APP_VERSION)
],
lager:debug("sending hangup to ~s: ~s", [ACallId, ACtrlQ]),
kapi_dialplan:publish_command(ACtrlQ, Command).
Expand Down
76 changes: 18 additions & 58 deletions applications/acdc/src/acdc_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ init(Super, AccountId, QueueId, QueueJObj) ->

gen_listener:cast(self(), {'start_workers'}),
Strategy = get_strategy(kz_json:get_value(<<"strategy">>, QueueJObj)),
StrategyState = create_strategy_state(Strategy, AccountDb, QueueId),

_ = update_strategy_state(self(), Strategy, StrategyState),
StrategyState = create_strategy_state(Strategy),

lager:debug("queue mgr started for ~s", [QueueId]),
{'ok', update_properties(QueueJObj, #state{account_id=AccountId
Expand Down Expand Up @@ -524,15 +522,18 @@ handle_cast({'reject_member_call', Call, JObj}, #state{account_id=AccountId
publish_member_call_failure(Q, AccountId, QueueId, kapps_call:call_id(Call), <<"no agents">>),
{'noreply', State};

handle_cast({'sync_with_agent', A}, #state{account_id=AccountId}=State) ->
{'ok', Status} = acdc_agent_util:most_recent_status(AccountId, A),
case acdc_agent_util:status_should_auto_start(Status) of
'true' -> 'ok';
'false' -> gen_listener:cast(self(), {'agent_unavailable', A})
end,
handle_cast({'gen_listener', {'created_queue', ?SECONDARY_QUEUE_NAME(QueueId)}}, #state{queue_id=QueueId}=State) ->
{'noreply', State};

handle_cast({'gen_listener', {'created_queue', _}}, State) ->
handle_cast({'gen_listener', {'created_queue', _}}, #state{account_id=AccountId
,queue_id=QueueId
}=State) ->
kapi_acdc_queue:publish_started_notif(
kz_json:from_list([{<<"Account-ID">>, AccountId}
,{<<"Queue-ID">>, QueueId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
])
),
{'noreply', State};

handle_cast({'refresh', QueueJObj}, State) ->
Expand Down Expand Up @@ -855,55 +856,14 @@ get_strategy(<<"round_robin">>) -> 'rr';
get_strategy(<<"most_idle">>) -> 'mi';
get_strategy(_) -> 'rr'.

-spec create_strategy_state(queue_strategy(), kz_term:ne_binary(), kz_term:ne_binary()) -> strategy_state().
create_strategy_state(Strategy, AcctDb, QueueId) ->
create_strategy_state(Strategy, #strategy_state{}, AcctDb, QueueId).

-spec create_strategy_state(queue_strategy(), strategy_state(), kz_term:ne_binary(), kz_term:ne_binary()) -> strategy_state().
create_strategy_state('rr', #strategy_state{agents='undefined'}=SS, AcctDb, QueueId) ->
create_strategy_state('rr', SS#strategy_state{agents=queue:new()}, AcctDb, QueueId);
create_strategy_state('rr', #strategy_state{agents=AgentQ}=SS, AcctDb, QueueId) ->
case acdc_util:agents_in_queue(AcctDb, QueueId) of
[] -> lager:debug("no agents around"), SS;
JObjs ->
Q = queue:from_list([Id || JObj <- JObjs,
not queue:member((Id = kz_doc:id(JObj)), AgentQ)
]),
Details = lists:foldl(fun(JObj, Acc) ->
dict:store(kz_doc:id(JObj), {1, 'undefined'}, Acc)
end, dict:new(), JObjs),
SS#strategy_state{agents=queue:join(AgentQ, Q)
,details=Details
}
end;
create_strategy_state('mi', #strategy_state{agents='undefined'}=SS, AcctDb, QueueId) ->
create_strategy_state('mi', SS#strategy_state{agents=[]}, AcctDb, QueueId);
create_strategy_state('mi', #strategy_state{agents=AgentL}=SS, AcctDb, QueueId) ->
case acdc_util:agents_in_queue(AcctDb, QueueId) of
[] -> lager:debug("no agents around"), SS;
JObjs ->
AgentL1 = lists:foldl(fun(JObj, Acc) ->
Id = kz_doc:id(JObj),
case lists:member(Id, Acc) of
'true' -> Acc;
'false' -> [Id | Acc]
end
end, AgentL, JObjs),
Details = lists:foldl(fun(JObj, Acc) ->
dict:store(kz_doc:id(JObj), {1, 'undefined'}, Acc)
end, dict:new(), JObjs),
SS#strategy_state{agents=AgentL1
,details=Details
}
end.
-spec create_strategy_state(queue_strategy()) -> strategy_state().
create_strategy_state(Strategy) ->
Agents = create_ss_agents(Strategy),
#strategy_state{agents=Agents}.

update_strategy_state(Srv, 'rr', #strategy_state{agents=AgentQueue}) ->
L = queue:to_list(AgentQueue),
update_strategy_state(Srv, L);
update_strategy_state(Srv, 'mi', #strategy_state{agents=AgentL}) ->
update_strategy_state(Srv, AgentL).
update_strategy_state(Srv, L) ->
[gen_listener:cast(Srv, {'sync_with_agent', A}) || A <- L].
-spec create_ss_agents(queue_strategy()) -> queue_strategy_state().
create_ss_agents('rr') -> queue:new();
create_ss_agents('mi') -> [].

-spec call_position(kz_term:ne_binary(), [kapps_call:call()]) -> kz_term:api_integer().
call_position(CallId, Calls) ->
Expand Down
Loading

0 comments on commit 1634056

Please sign in to comment.