Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.3] acdc - PISTON-55: acdc distributed member_connect_win #6558

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 94 additions & 57 deletions applications/acdc/src/acdc_agent_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-export([start_link/2, start_link/3, start_link/4, start_link/5
,call_event/4
,member_connect_req/2
,member_connect_win/2
,member_connect_win/3
,agent_timeout/2
,originate_ready/2
,originate_resp/2, originate_started/2, originate_uuid/2
Expand Down Expand Up @@ -134,14 +134,17 @@ member_connect_req(ServerRef, JObj) ->
gen_statem:cast(ServerRef, {'member_connect_req', JObj}).

%%------------------------------------------------------------------------------
%% @doc When a queue receives a call and needs an agent, it will send a
%% `member_connect_req'. The agent will respond (if possible) with a
%% `member_connect_resp' payload or ignore the request
%% @doc When an agent has been selected to handle the queue call, each process
%% for the agent will receive a `member_connect_win' event. The event will
%% include a flag of whether the winner is on the current node - if true, the
%% agent process will handle call control. Otherwise, the agent process will
%% just follow along through state transitions.
%% @end
%%------------------------------------------------------------------------------
-spec member_connect_win(pid(), kz_json:object()) -> 'ok'.
member_connect_win(ServerRef, JObj) ->
gen_statem:cast(ServerRef, {'member_connect_win', JObj}).
-type member_connect_win_node() :: 'same_node' | 'different_node'.
-spec member_connect_win(pid(), kz_json:object(), member_connect_win_node()) -> 'ok'.
member_connect_win(ServerRef, JObj, Node) ->
gen_statem:cast(ServerRef, {'member_connect_win', JObj, Node}).

-spec agent_timeout(pid(), kz_json:object()) -> 'ok'.
agent_timeout(ServerRef, JObj) ->
Expand Down Expand Up @@ -561,13 +564,12 @@ ready('cast', {'sync_req', JObj}, #state{agent_listener=AgentListener}=State) ->
{'next_state', 'ready', State};
ready('cast', {'sync_resp', _}, State) ->
{'next_state', 'ready', State};
ready('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener
,endpoints=OrigEPs
,agent_listener_id=MyId
,account_id=AccountId
,agent_id=AgentId
,connect_failures=CF
}=State) ->
ready('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener
,endpoints=OrigEPs
,account_id=AccountId
,agent_id=AgentId
,connect_failures=CF
}=State) ->
Call = kapps_call:from_json(kz_json:get_value(<<"Call">>, JObj)),
CallId = kapps_call:call_id(Call),

Expand All @@ -580,51 +582,80 @@ ready('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener
CDRUrl = cdr_url(JObj),
RecordingUrl = recording_url(JObj),

case kz_json:get_value(<<"Agent-Process-ID">>, JObj) of
MyId ->
lager:debug("trying to ring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),

case get_endpoints(OrigEPs, AgentListener, Call, AgentId, QueueId) of
{'error', 'no_endpoints'} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
acdc_agent_stats:agent_logged_out(AccountId, AgentId),
agent_logout(self()),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'paused', State};
{'error', _E} ->
lager:debug("can't take the call, skip me: ~p", [_E]),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'ready', State#state{connect_failures=CF+1}};
{'ok', UpdatedEPs} ->
acdc_agent_listener:bridge_to_member(AgentListener, Call, JObj, UpdatedEPs, CDRUrl, RecordingUrl),

CIDName = kapps_call:caller_id_name(Call),
CIDNum = kapps_call:caller_id_number(Call),

acdc_agent_stats:agent_connecting(AccountId, AgentId, CallId, CIDName, CIDNum, QueueId),
lager:info("trying to ring agent endpoints(~p)", [length(UpdatedEPs)]),
lager:debug("notifications for the queue: ~p", [kz_json:get_value(<<"Notifications">>, JObj)]),
{'next_state', 'ringing', State#state{wrapup_timeout=WrapupTimer
,member_call=Call
,member_call_id=CallId
,member_call_start=kz_time:now()
,member_call_queue_id=QueueId
,caller_exit_key=CallerExitKey
,endpoints=UpdatedEPs
,queue_notifications=kz_json:get_value(<<"Notifications">>, JObj)
}}
end;
_OtherId ->
lager:debug("monitoring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),
lager:debug("trying to ring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),

acdc_agent_listener:monitor_call(AgentListener, Call, CDRUrl, RecordingUrl),
case get_endpoints(OrigEPs, AgentListener, Call, AgentId, QueueId) of
{'error', 'no_endpoints'} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
acdc_agent_stats:agent_logged_out(AccountId, AgentId),
agent_logout(self()),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'paused', State};
{'error', _E} ->
lager:debug("can't take the call, skip me: ~p", [_E]),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'ready', State#state{connect_failures=CF+1}};
{'ok', UpdatedEPs} ->
acdc_util:bind_to_call_events(Call, AgentListener),

acdc_agent_listener:bridge_to_member(AgentListener, Call, JObj, UpdatedEPs, CDRUrl, RecordingUrl),

CIDName = kapps_call:caller_id_name(Call),
CIDNum = kapps_call:caller_id_number(Call),

acdc_agent_stats:agent_connecting(AccountId, AgentId, CallId, CIDName, CIDNum, QueueId),
lager:info("trying to ring agent endpoints(~p)", [length(UpdatedEPs)]),
lager:debug("notifications for the queue: ~p", [kz_json:get_value(<<"Notifications">>, JObj)]),
{'next_state', 'ringing', State#state{wrapup_timeout=WrapupTimer
,member_call=Call
,member_call_id=CallId
,member_call_start=kz_time:now()
,member_call_queue_id=QueueId
,caller_exit_key=CallerExitKey
,endpoints=UpdatedEPs
,queue_notifications=kz_json:get_value(<<"Notifications">>, JObj)
}}
end;
ready('cast', {'member_connect_win', JObj, 'different_node'}, #state{agent_listener=AgentListener
,endpoints=OrigEPs
,agent_id=AgentId
,connect_failures=CF
}=State) ->
Call = kapps_call:from_json(kz_json:get_value(<<"Call">>, JObj)),
CallId = kapps_call:call_id(Call),

kz_util:put_callid(CallId),

WrapupTimer = kz_json:get_integer_value(<<"Wrapup-Timeout">>, JObj, 0),
CallerExitKey = kz_json:get_value(<<"Caller-Exit-Key">>, JObj, <<"#">>),
QueueId = kz_json:get_value(<<"Queue-ID">>, JObj),

CDRUrl = cdr_url(JObj),
RecordingUrl = recording_url(JObj),

%% Only start monitoring if the agent can actually take the call
case get_endpoints(OrigEPs, AgentListener, Call, AgentId, QueueId) of
{'error', 'no_endpoints'} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
{'next_state', 'paused', State};
{'error', _E} ->
lager:debug("can't take the call, skip me: ~p", [_E]),
{'next_state', 'ready', State#state{connect_failures=CF+1}};
{'ok', UpdatedEPs} ->
acdc_util:bind_to_call_events(Call, AgentListener),

acdc_agent_listener:monitor_call(AgentListener, Call, CDRUrl, RecordingUrl),
NextState = 'ringing',

lager:debug("monitoring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),
{'next_state', NextState, State#state{wrapup_timeout=WrapupTimer
,member_call=Call
,member_call_id=CallId
,member_call_start=kz_time:now()
,member_call_queue_id=QueueId
,caller_exit_key=CallerExitKey
,agent_call_id='undefined'
,endpoints=UpdatedEPs
,queue_notifications=kz_json:get_value(<<"Notifications">>, JObj)
}}
end;
ready('cast', {'member_connect_req', _}, #state{max_connect_failures=Max
Expand Down Expand Up @@ -700,7 +731,7 @@ ready('info', Evt, State) ->
-spec ringing(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
ringing('cast', {'member_connect_req', _}, State) ->
{'next_state', 'ringing', State};
ringing('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
ringing('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (already ringing)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

Expand Down Expand Up @@ -963,7 +994,7 @@ ringing('info', Evt, State) ->
-spec answered(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
answered('cast', {'member_connect_req', _}, State) ->
{'next_state', 'answered', State};
answered('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
answered('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (on the phone with someone)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

Expand Down Expand Up @@ -1142,10 +1173,13 @@ wrapup('cast', {'pause', Timeout}, #state{account_id=AccountId
{'next_state', 'paused', State#state{pause_ref=Ref}};
wrapup('cast', {'member_connect_req', _}, State) ->
{'next_state', 'wrapup', State#state{wrapup_timeout=0}};
wrapup('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
wrapup('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (in wrapup)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

{'next_state', 'wrapup', State#state{wrapup_timeout=0}};
wrapup('cast', {'member_connect_win', _, 'different_node'}, State) ->
lager:debug("received member_connect_win for different node (wrapup)"),
{'next_state', 'wrapup', State#state{wrapup_timeout=0}};
wrapup('cast', {'sync_req', JObj}, #state{agent_listener=AgentListener
,wrapup_ref=Ref
Expand Down Expand Up @@ -1208,7 +1242,7 @@ paused('cast', {'sync_resp', _}, State) ->
{'next_state', 'paused', State};
paused('cast', {'member_connect_req', _}, State) ->
{'next_state', 'paused', State};
paused('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
paused('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

Expand Down Expand Up @@ -1255,7 +1289,7 @@ paused('info', Evt, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec outbound(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
outbound('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
outbound('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (on outbound call)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'outbound', State};
Expand Down Expand Up @@ -1422,6 +1456,9 @@ handle_event(Event, StateName, State) ->
-spec handle_info(any(), atom(), state()) -> kz_types:handle_fsm_ret(state()).
handle_info({'timeout', _Ref, ?SYNC_RESPONSE_MESSAGE}, StateName, State) ->
{'next_state', StateName, State};
handle_info({'member_connect_win', _, 'different_node'}, StateName, State) ->
lager:debug("received member_connect_win for different node (~s)", [StateName]),
{'next_state', StateName, State};
handle_info({'endpoint_edited', EP}, StateName, #state{endpoints=EPs
,account_id=AccountId
,agent_id=AgentId
Expand Down
9 changes: 7 additions & 2 deletions applications/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,13 @@ handle_member_message(JObj, Props, <<"connect_req">>) ->
'true' = kapi_acdc_queue:member_connect_req_v(JObj),
acdc_agent_fsm:member_connect_req(props:get_value('fsm_pid', Props), JObj);
handle_member_message(JObj, Props, <<"connect_win">>) ->
'true' = kapi_acdc_queue:member_connect_win_v(JObj),
acdc_agent_fsm:member_connect_win(props:get_value('fsm_pid', Props), JObj);
'true' = kapi_acdc_agent:member_connect_win_v(JObj),
FSMPid = props:get_value('fsm_pid', Props),
MyId = acdc_util:proc_id(FSMPid),
case kz_json:get_value(<<"Agent-Process-ID">>, JObj) of
MyId -> acdc_agent_fsm:member_connect_win(FSMPid, JObj, 'same_node');
_ -> acdc_agent_fsm:member_connect_win(FSMPid, JObj, 'different_node')
end;
handle_member_message(_, _, EvtName) ->
lager:debug("not handling member event ~s", [EvtName]).

Expand Down
2 changes: 1 addition & 1 deletion applications/acdc/src/acdc_agent_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
-define(BINDINGS(AcctId, AgentId), [{'self', []}
,{'acdc_agent', [{'account_id', AcctId}
,{'agent_id', AgentId}
,{'restrict_to', ['sync', 'stats_req']}
,{'restrict_to', ['member_connect_win', 'sync', 'stats_req']}
]}
,{'conf', [{'action', <<"*">>}
,{'db', kz_util:format_account_id(AcctId, 'encoded')}
Expand Down
4 changes: 2 additions & 2 deletions applications/acdc/src/acdc_queue_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,15 @@ send_member_connect_req(CallId, AccountId, QueueId, MyQ, MyId) ->
-spec send_member_connect_win(kz_json:object(), kapps_call:call(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:proplist()) -> 'ok'.
send_member_connect_win(RespJObj, Call, QueueId, MyQ, MyId, QueueOpts) ->
CallJSON = kapps_call:to_json(Call),
Q = kz_json:get_value(<<"Server-ID">>, RespJObj),
Win = props:filter_undefined(
[{<<"Call">>, CallJSON}
,{<<"Process-ID">>, MyId}
,{<<"Agent-Process-ID">>, kz_json:get_value(<<"Agent-Process-ID">>, RespJObj)}
,{<<"Queue-ID">>, QueueId}
,{<<"Agent-ID">>, kz_json:get_value(<<"Agent-ID">>, RespJObj)}
| QueueOpts ++ kz_api:default_headers(MyQ, ?APP_NAME, ?APP_VERSION)
]),
publish(Q, Win, fun kapi_acdc_queue:publish_member_connect_win/2).
publish(Win, fun kapi_acdc_agent:publish_member_connect_win/1).

-spec send_agent_timeout(kz_json:object(), kapps_call:call(), kz_term:ne_binary()) -> 'ok'.
send_agent_timeout(RespJObj, Call, QueueId) ->
Expand Down
Loading