Skip to content

Commit

Permalink
[4.3] Improve handling of slow websocket clients (#6606)
Browse files Browse the repository at this point in the history
* Improve handling of slow websocket clients

When a websocket client is having trouble reading messages off its
socket, backpressure will cause the mailbox of the
blackhole_socket_handler to start to fill up with {send_data, _}
tuples from blackhole_data_emitter. This is due to the sending of data
being stuck in prim_inet:send/3.

We add two mechanisms to try to avoid the server being harshly impacted
by slow clients:

1. Reduce the send_timeout blackhole uses to more quickly terminate
the socket in the event of slow sends
2. Check the mailbox of blackhole_socket_handler from
blackhole_data_emitter before sending the {send_data, _} tuple, opting
to shed the load while waiting.

* unused

* unused
  • Loading branch information
jamesaimonetti authored Jul 13, 2020
1 parent 6deeab2 commit 72c5328
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 68 deletions.
21 changes: 18 additions & 3 deletions applications/blackhole/src/blackhole_data_emitter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
-export([event/4]).
-export([reply/4]).

%% Upper bound on the session process's mailbox length. maybe_send/4 drops the
%% outgoing message when the receiver's message_queue_len is greater than this
%% value. Read from the ?CONFIG_CAT category, key "max_queued_messages",
%% defaulting to 50. Being a macro, the lookup is performed at every use site.
-define(MAX_QUEUED_MESSAGES, kapps_config:get_integer(?CONFIG_CAT, <<"max_queued_messages">>, 50)).

-spec event(map(), kz_term:ne_binary(), kz_term:ne_binary(), kz_json:object()) -> 'ok'.
event(Binding, RK, Name, Data) ->
#{subscribed_key := SubscribedKey
Expand All @@ -23,8 +25,7 @@ event(Binding, RK, Name, Data) ->
,{<<"routing_key">>, RK}
,{<<"data">>, Data}
],
SessionPid ! {'send_data', kz_json:from_list(Msg)},
'ok'.
maybe_send(SessionPid, kz_json:from_list(Msg)).

-spec reply(pid(), kz_term:ne_binary(), kz_term:ne_binary(), kz_json:object()) -> 'ok'.
reply(SessionPid, RequestId, Status, Data) ->
Expand All @@ -34,5 +35,19 @@ reply(SessionPid, RequestId, Status, Data) ->
,{<<"status">>, Status}
,{<<"data">>, Data}
],
SessionPid ! {'send_data', kz_json:from_list(Msg)},
maybe_send(SessionPid, kz_json:from_list(Msg)).

%%------------------------------------------------------------------------------
%% @doc Send `Data' to the websocket session process, shedding load when the
%% session cannot keep up.
%%
%% The session's mailbox length is sampled with process_info/2 first:
%% <ul>
%%   <li>session process not alive: the data is dropped;</li>
%%   <li>mailbox longer than ?MAX_QUEUED_MESSAGES: the data is dropped;</li>
%%   <li>otherwise `{send_data, Data}' is sent to the session.</li>
%% </ul>
%%
%% Always returns `ok', whether or not the data was delivered.
%% @end
%%------------------------------------------------------------------------------
-spec maybe_send(pid(), kz_json:object()) -> 'ok'.
maybe_send(SessionPid, Data) ->
    maybe_send(SessionPid, Data, process_info(SessionPid, ['message_queue_len'])).

-spec maybe_send(pid(), kz_json:object(), [{'message_queue_len', non_neg_integer()}] | 'undefined') -> 'ok'.
maybe_send(SessionPid, Data, [{'message_queue_len', QueueLen}]) ->
    maybe_send(SessionPid, Data, QueueLen, ?MAX_QUEUED_MESSAGES);
maybe_send(SessionPid, _Data, 'undefined') ->
    %% process_info/2 returns 'undefined' when the process is not alive.
    %% The pid must be passed as a format argument, otherwise the "~p" in the
    %% format string is never filled in.
    lager:info("failed to find session ~p, dropping data", [SessionPid]),
    'ok'.

-spec maybe_send(pid(), kz_json:object(), non_neg_integer(), integer()) -> 'ok'.
maybe_send(SessionPid, _Data, QueueLen, MaxLen) when QueueLen > MaxLen ->
    %% the receiver is backed up: shed this message rather than growing its mailbox
    lager:error("~p queue length ~p (max: ~p), dropping event", [SessionPid, QueueLen, MaxLen]),
    'ok';
maybe_send(SessionPid, Data, QueueLen, MaxLen) ->
    lager:info("~p queue length ~p (max: ~p)", [SessionPid, QueueLen, MaxLen]),
    SessionPid ! {'send_data', Data},
    'ok'.
26 changes: 16 additions & 10 deletions applications/blackhole/src/blackhole_init.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
-define(SOCKET_PORT, kapps_config:get_integer(?APP_NAME, <<"port">>, 5555)).
-define(SOCKET_ACCEPTORS, kapps_config:get_integer(?APP_NAME, <<"acceptors">>, 100)).

%% Transport options common to the listeners started with cowboy:start_clear
%% and cowboy:start_tls in this module: the bind address, the number of
%% acceptors, and a 'send_timeout' read from the ?CONFIG_CAT category (key
%% "send_timeout_ms", default 5 * ?MILLISECONDS_IN_SECOND). Callers add their
%% own listener-specific options (port, SSL options) to this list.
-define(BASE_TRANSPORT_OPTIONS(IP, Workers)
,[{'ip', IP}
,{'num_acceptors', Workers}
,{'send_timeout', kapps_config:get_integer(?CONFIG_CAT, <<"send_timeout_ms">>, 5 * ?MILLISECONDS_IN_SECOND)}
]
).
%% Build the cowboy routing table: a single entry pairing the '_' host
%% pattern (cowboy's match-any host) with the list returned by paths_list/0.
-spec blackhole_routes() -> cowboy_router:routes().
blackhole_routes() -> [{'_', paths_list()}].

Expand Down Expand Up @@ -60,10 +66,7 @@ maybe_start_plaintext(Dispatch, IP) ->
try
lager:info("trying to bind to address ~s port ~b", [inet:ntoa(IP), Port]),
cowboy:start_clear('blackhole_socket_handler'
,[{'ip', IP}
,{'port', Port}
,{'num_acceptors', Workers}
]
,[{'port', Port} | ?BASE_TRANSPORT_OPTIONS(IP, Workers)]
,#{'env' => #{'dispatch' => Dispatch
,'timeout' => ReqTimeout
}
Expand Down Expand Up @@ -105,10 +108,7 @@ start_ssl(Dispatch, IP) ->
]
),
cowboy:start_tls('blackhole_socket_handler_ssl'
,[{'ip', IP}
,{'num_acceptors', Workers}
| SSLOpts
]
,?BASE_TRANSPORT_OPTIONS(IP, Workers) ++ SSLOpts
,#{'env' => #{'dispatch' => Dispatch
,'timeout' => ReqTimeout
}
Expand Down Expand Up @@ -147,11 +147,17 @@ base_ssl_opts(RootDir) ->
,{'certfile', find_file(kapps_config:get_string(?CONFIG_CAT
,<<"ssl_cert">>
,filename:join([RootDir, <<"priv/ssl/blackhole.crt">>])
), RootDir)}
)
,RootDir
)
}
,{'keyfile', find_file(kapps_config:get_string(?CONFIG_CAT
,<<"ssl_key">>
,filename:join([RootDir, <<"priv/ssl/blackhole.key">>])
), RootDir)}
)
,RootDir
)
}
,{'password', kapps_config:get_string(?CONFIG_CAT, <<"ssl_password">>, <<>>)}
].

Expand Down
8 changes: 5 additions & 3 deletions applications/blackhole/src/blackhole_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ start_link() ->
-spec handle_amqp_event(kz_json:object(), kz_term:proplist(), gen_listener:basic_deliver() | kz_term:ne_binary()) -> 'ok'.
handle_amqp_event(EventJObj, _Props, ?MODULE_REQ_ROUTING_KEY) ->
handle_module_req(EventJObj);
handle_amqp_event(EventJObj, _Props, <<_/binary>> = RoutingKey) ->
handle_amqp_event(EventJObj, _Props, <<RoutingKey/binary>>) ->
Evt = kz_util:get_event_type(EventJObj),
lager:debug("recv event ~p (~s)", [Evt, RoutingKey]),
RK = <<"blackhole.event.", RoutingKey/binary>>,
{Time, Res} = timer:tc(blackhole_bindings, pmap, [RK, [RoutingKey, EventJObj]]),

StartTime = kz_time:start_time(),
Res = blackhole_bindings:pmap(RK, [RoutingKey, EventJObj]),
lager:debug("delivered the event ~p (~s) to ~b subscriptions in ~b ms"
,[Evt, RoutingKey, length(Res), Time div 1000]
,[Evt, RoutingKey, length(Res), kz_time:elapsed_ms(StartTime)]
);
handle_amqp_event(EventJObj, Props, BasicDeliver) ->
handle_amqp_event(EventJObj, Props, gen_listener:routing_key_used(BasicDeliver)).
Expand Down
10 changes: 10 additions & 0 deletions applications/crossbar/priv/api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -29584,6 +29584,11 @@
"description": "blackhole max_connections_per_ip",
"type": "integer"
},
"max_queued_messages": {
"default": 50,
"description": "Max_queued_messages to be sent on a websocket before terminating the socket (slow client)",
"type": "integer"
},
"port": {
"default": 5555,
"description": "blackhole port",
Expand All @@ -29594,6 +29599,11 @@
"description": "blackhole request_timeout_ms",
"type": "integer"
},
"send_timeout_ms": {
"default": 5000,
"description": "How long to wait for a packet to be confirmed sent",
"type": "integer"
},
"ssl_ca_cert": {
"description": "blackhole ssl_ca_cert",
"type": "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
"description": "blackhole max_connections_per_ip",
"type": "integer"
},
"max_queued_messages": {
"default": 50,
"description": "Max_queued_messages to be sent on a websocket before terminating the socket (slow client)",
"type": "integer"
},
"port": {
"default": 5555,
"description": "blackhole port",
Expand All @@ -60,6 +65,11 @@
"description": "blackhole request_timeout_ms",
"type": "integer"
},
"send_timeout_ms": {
"default": 5000,
"description": "How long to wait for a packet to be confirmed sent",
"type": "integer"
},
"ssl_ca_cert": {
"description": "blackhole ssl_ca_cert",
"type": "string"
Expand Down
3 changes: 2 additions & 1 deletion core/kazoo_amqp/src/kz_amqp_connections.erl
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ federated_brokers() ->
},
[{'andalso',
{'=/=', '$1', 'local'},
{'=:=', '$3', 'false'}}
{'=:=', '$3', 'false'}
}
],
['$2']
}
Expand Down
153 changes: 104 additions & 49 deletions core/kazoo_proper/src/pqc_blackhole.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
%%%-----------------------------------------------------------------------------
-module(pqc_blackhole).

-export([seq/0, seq_api/0
-export([seq/0
,seq_api/0
,cleanup/0
]).

Expand Down Expand Up @@ -96,7 +97,7 @@ seq_max_conn() ->
-spec seq_api() -> 'ok'.
seq_api() ->
Model = initial_state(),
#{'auth_token' := AuthToken} = API = pqc_kazoo_model:api(Model),
API = pqc_kazoo_model:api(Model),

AccountResp = pqc_cb_accounts:create_account(API, hd(?ACCOUNT_NAMES)),
lager:info("created account: ~s", [AccountResp]),
Expand All @@ -107,13 +108,65 @@ seq_api() ->
lager:info("available: ~s", [AvailableBindings]),
'true' = ([] =/= kz_json:is_json_object([<<"data">>, <<"call">>], kz_json:decode(AvailableBindings))),

EmptySockets = pqc_cb_websockets:summary(API, AccountId),
lager:info("empty: ~s", [EmptySockets]),
[] = kz_json:get_list_value(<<"data">>, kz_json:decode(EmptySockets)),
test_empty_active_connections(API, AccountId),

WSConn = pqc_ws_client:connect("localhost", ?PORT),
lager:info("connected to websocket: ~p", [WSConn]),

%% test pinging the websocket
_ = test_ws_ping(API, WSConn),

Binding = <<"object.*.user">>,

%% test using the API to list ws connections
{SocketId, BindReq} = test_ws_api_listing(API, WSConn, AccountId, Binding),

%% test receiving events over the ws
_ = test_crud_user_events(WSConn, API, AccountId),

%% test unbinding for events
_ = test_ws_unbind(API, WSConn, AccountId, SocketId, BindReq, Binding),

pqc_ws_client:close(WSConn),

test_empty_active_connections(API, AccountId),

cleanup(API),
lager:info("FINISHED API SEQ").

%% Prepare the system under test, authenticate against the API and wrap the
%% resulting API handle in a fresh pqc_kazoo_model.
-spec initial_state() -> pqc_kazoo_model:model().
initial_state() ->
    _ = init_system(),
    pqc_kazoo_model:new(pqc_cb_api:authenticate()).

%% Tag this test run with a random call-id, clear any data traces, then start
%% the applications and the crossbar module the websocket tests rely on.
init_system() ->
    kz_util:put_callid(kz_binary:rand_hex(5)),

    _ = kz_data_tracing:clear_all_traces(),
    lists:foreach(fun kapps_controller:start_app/1, ['crossbar', 'blackhole']),
    lists:foreach(fun crossbar_maintenance:start_module/1, ['cb_websockets']),
    lager:info("INIT FINISHED").

%% Exported teardown entry point: asks pqc_cb_accounts to clean up the accounts
%% named in ?ACCOUNT_NAMES (result ignored), then runs cleanup_system/0.
-spec cleanup() -> 'ok'.
cleanup() ->
_ = pqc_cb_accounts:cleanup_accounts(?ACCOUNT_NAMES),
cleanup_system().

%% Teardown used at the end of a test sequence that holds an API handle:
%% cleans up the ?ACCOUNT_NAMES accounts through that handle, releases the
%% handle via pqc_cb_api:cleanup/1, then runs cleanup_system/0. The results of
%% the first two steps are ignored.
cleanup(API) ->
lager:info("CLEANUP TIME, EVERYBODY HELPS"),
_ = pqc_cb_accounts:cleanup_accounts(API, ?ACCOUNT_NAMES),
_ = pqc_cb_api:cleanup(API),
cleanup_system().

%% System-level teardown hook; currently performs no work and returns 'ok'.
cleanup_system() -> 'ok'.

test_ws_ping(#{auth_token := AuthToken}, WSConn) ->
PingReqId = kz_binary:rand_hex(4),
Ping = kz_json:from_list([{<<"request_id">>, PingReqId}
,{<<"action">>, <<"ping">>}
Expand All @@ -123,15 +176,15 @@ seq_api() ->
{'json', ReplyJObj} = pqc_ws_client:recv(WSConn, 1000),
lager:info("pong: ~p", [ReplyJObj]),
PingReqId = kz_json:get_ne_binary_value(<<"request_id">>, ReplyJObj),
<<"success">> = kz_json:get_ne_binary_value(<<"status">>, ReplyJObj),
<<"success">> = kz_json:get_ne_binary_value(<<"status">>, ReplyJObj).

test_ws_api_listing(#{auth_token := AuthToken}=API, WSConn, AccountId, Binding) ->
WithSocket = pqc_cb_websockets:summary(API, AccountId),
lager:info("with socket: ~s", [WithSocket]),
[SocketDetails] = kz_json:get_list_value(<<"data">>, kz_json:decode(WithSocket)),
SocketId = kz_json:get_ne_binary_value(<<"websocket_session_id">>, SocketDetails),
[] = kz_json:get_list_value(<<"bindings">>, SocketDetails),
SocketId = kz_json:get_ne_binary_value(<<"websocket_session_id">>, SocketDetails),

Binding = <<"object.doc_created.user">>,
BindReqId = kz_binary:rand_hex(4),
BindReq = kz_json:from_list([{<<"action">>, <<"subscribe">>}
,{<<"auth_token">>, AuthToken}
Expand All @@ -157,6 +210,44 @@ seq_api() ->
<<"127.0.0.1">> = kz_json:get_ne_binary_value([<<"data">>, <<"source">>], DetailsJObj),
SocketId = kz_json:get_ne_binary_value([<<"data">>, <<"websocket_session_id">>], DetailsJObj),

{SocketId, BindReq}.

%% Create and then delete a user through the API, asserting after each step
%% that the matching event arrives on WSConn. Every check is an assertive
%% match, so any mismatch crashes with badmatch. Returns the user's id.
test_crud_user_events(WSConn, API, AccountId) ->
    CreateResp = pqc_cb_users:create(API, AccountId, pqc_cb_users:user_doc()),
    lager:info("created user ~s", [CreateResp]),
    UserId = kz_json:get_value([<<"data">>, <<"id">>], kz_json:decode(CreateResp)),

    {'json', Created} = pqc_ws_client:recv(WSConn, 1000),
    lager:info("create event: ~p", [Created]),
    CreatedDoc = assert_user_event(<<"doc_created">>, Created),
    AccountId = kz_json:get_ne_binary_value(<<"account_id">>, CreatedDoc),
    UserId = kz_json:get_ne_binary_value(<<"id">>, CreatedDoc),

    DeleteResp = pqc_cb_users:delete(API, AccountId, UserId),
    lager:info("deleted user ~s", [DeleteResp]),

    {'json', Deleted} = pqc_ws_client:recv(WSConn, 1000),
    lager:info("delete event: ~p", [Deleted]),
    DeletedDoc = assert_user_event(<<"doc_deleted">>, Deleted),
    'true' = kz_json:is_true(<<"is_soft_deleted">>, DeletedDoc),
    AccountId = kz_json:get_ne_binary_value(<<"account_id">>, DeletedDoc),
    UserId = kz_json:get_ne_binary_value(<<"id">>, DeletedDoc).

%% Assert the envelope shared by both user events (action, subscribed key,
%% event name, document type) and hand back the event's "data" object so the
%% caller can run its event-specific checks.
assert_user_event(ExpectedName, Event) ->
    <<"event">> = kz_json:get_ne_binary_value(<<"action">>, Event),
    <<"object.*.user">> = kz_json:get_ne_binary_value(<<"subscribed_key">>, Event),
    ExpectedName = kz_json:get_ne_binary_value(<<"name">>, Event),

    Doc = kz_json:get_json_value(<<"data">>, Event),
    <<"user">> = kz_json:get_ne_binary_value(<<"type">>, Doc),
    Doc.

test_ws_unbind(API, WSConn, AccountId, SocketId, BindReq, Binding) ->

UnbindReqId = kz_binary:rand_hex(4),
UnbindReq = kz_json:set_values([{<<"action">>, <<"unsubscribe">>}
,{<<"request_id">>, UnbindReqId}
Expand All @@ -177,45 +268,9 @@ seq_api() ->
NoDetailsJObj = kz_json:decode(NoDetailsResp),
[] = kz_json:get_list_value([<<"data">>, <<"bindings">>], NoDetailsJObj),
<<"127.0.0.1">> = kz_json:get_ne_binary_value([<<"data">>, <<"source">>], NoDetailsJObj),
SocketId = kz_json:get_ne_binary_value([<<"data">>, <<"websocket_session_id">>], NoDetailsJObj),

pqc_ws_client:close(WSConn),

EmptyAgain = pqc_cb_websockets:summary(API, AccountId),
lager:info("empty again: ~s", [EmptyAgain]),
[] = kz_json:get_list_value(<<"data">>, kz_json:decode(EmptyAgain)),
SocketId = kz_json:get_ne_binary_value([<<"data">>, <<"websocket_session_id">>], NoDetailsJObj).

cleanup(API),
lager:info("FINISHED API SEQ").

-spec initial_state() -> pqc_kazoo_model:model().
initial_state() ->
_ = init_system(),
API = pqc_cb_api:authenticate(),
pqc_kazoo_model:new(API).

init_system() ->
TestId = kz_binary:rand_hex(5),
kz_util:put_callid(TestId),

_ = kz_data_tracing:clear_all_traces(),
_ = [kapps_controller:start_app(App) ||
App <- ['crossbar', 'blackhole']
],
_ = [crossbar_maintenance:start_module(Mod) ||
Mod <- ['cb_websockets']
],
lager:info("INIT FINISHED").

-spec cleanup() -> 'ok'.
cleanup() ->
_ = pqc_cb_accounts:cleanup_accounts(?ACCOUNT_NAMES),
cleanup_system().

cleanup(API) ->
lager:info("CLEANUP TIME, EVERYBODY HELPS"),
_ = pqc_cb_accounts:cleanup_accounts(API, ?ACCOUNT_NAMES),
_ = pqc_cb_api:cleanup(API),
cleanup_system().

cleanup_system() -> 'ok'.
%% Assert that the websockets summary for AccountId lists no connections:
%% the "data" list of the decoded response must be empty (badmatch otherwise).
test_empty_active_connections(API, AccountId) ->
    Summary = pqc_cb_websockets:summary(API, AccountId),
    lager:info("empty: ~s", [Summary]),
    SummaryJObj = kz_json:decode(Summary),
    [] = kz_json:get_list_value(<<"data">>, SummaryJObj).
Loading

0 comments on commit 72c5328

Please sign in to comment.