Skip to content

Commit

Permalink
non-blocking context registry
Browse files Browse the repository at this point in the history
The processing of registrations is moved from the single process
registration server into the caller. This means that a registration
does no longer block while waiting for an overloaded registry server
to catch up.
  • Loading branch information
RoadRunnr committed Jul 22, 2021
1 parent 3aeaeb8 commit 57ffd41
Showing 1 changed file with 73 additions and 77 deletions.
150 changes: 73 additions & 77 deletions apps/ergw_core/src/gtp_context_reg.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%% Copyright 2015, Travelping GmbH <info@travelping.com>
%% Copyright 2021, Travelping GmbH <info@travelping.com>

%% This program is free software; you can redistribute it and/or
%% modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -29,7 +29,6 @@
-include("include/ergw.hrl").

-define(SERVER, ?MODULE).
-record(state, {pids, await_unreg}).

%%%===================================================================
%%% API
Expand Down Expand Up @@ -59,19 +58,28 @@ select(Key) ->

register(Keys, Handler, Pid)
when is_list(Keys), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {register, Keys, Handler, Pid}).
register(fun ets:insert/2, Keys, Handler, Pid).

register_new(Keys, Handler, Pid)
when is_list(Keys), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {register_new, Keys, Handler, Pid}).
register(fun ets:insert_new/2, Keys, Handler, Pid).

update(Delete, Insert, Handler, Pid)
when is_list(Delete), is_list(Insert), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {update, Delete, Insert, Handler, Pid}).
RegV = {Handler, Pid},
[global_del_key(DKey, RegV) || DKey <- Delete],
[global_add_key(IKey, RegV) || IKey <- Insert],

ets_delete_objects(?SERVER, mk_reg_objects(Delete, Handler, Pid)),
add_keys(fun ets:insert/2, Insert, Handler, Pid).

unregister(Keys, Handler, Pid)
when is_list(Keys), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {unregister, Keys, Handler, Pid}).
RegV = {Handler, Pid},
[global_del_key(Key, RegV) || Key <- Keys],

ets_delete_objects(?SERVER, mk_reg_objects(Keys, Handler, Pid)),
ok.

all() ->
ets:tab2list(?SERVER).
Expand All @@ -86,46 +94,35 @@ await_unreg(Key) ->
init([]) ->
process_flag(trap_exit, true),

ets:new(?SERVER, [ordered_set, named_table, public, {keypos, 1}]),
State = #state{
pids = #{},
await_unreg = #{}
},
{ok, State}.

handle_call({register, Keys, Handler, Pid}, _From, State) ->
handle_add_keys(fun ets:insert/2, Keys, Handler, Pid, State);

handle_call({register_new, Keys, Handler, Pid}, _From, State) ->
handle_add_keys(fun ets:insert_new/2, Keys, Handler, Pid, State);

handle_call({update, Delete, Insert, Handler, Pid}, _From, State) ->
lists:foreach(fun(Key) -> delete_key(Key, Pid) end, Delete),
NKeys = ordsets:union(ordsets:subtract(get_pid(Pid, State), Delete), Insert),
handle_add_keys(fun ets:insert/2, Insert, Handler, Pid, update_pid(Pid, NKeys, State));
ets:new(?SERVER, [bag, named_table, public, {keypos, 1},
{decentralized_counters, true},
{write_concurrency, true}, {read_concurrency, true}]),
{ok, #{}}.

handle_call({unregister, Keys, _Handler, Pid}, _From, State0) ->
State = delete_keys(Keys, Pid, State0),
{reply, ok, State};

handle_call({await_unreg, Pid}, From, #state{pids = Pids, await_unreg = AWait} = State0)
handle_call({await_unreg, Pid}, From, State0)
when is_pid(Pid) ->
case maps:is_key(Pid, Pids) of
{links, Links} = process_info(self(), links),
case lists:member(Pid, Links) of
true ->
State = State0#state{
await_unreg =
maps:update_with(Pid, fun(V) -> [From|V] end, [From], AWait)},
State = maps:update_with(Pid, fun(V) -> [From|V] end, [From], State0),
{noreply, State};
_ ->
{reply, ok, State0}
end.

handle_cast({link, Pid}, State) ->
link(Pid),
{noreply, State};

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'EXIT', Pid, _Reason}, State0) ->
Keys = get_pid(Pid, State0),
State = delete_keys(Keys, Pid, State0),
Objs = ets:lookup(?SERVER, Pid),
[global_del_key(Key, {Handler, Pid}) || {_, {Handler, Key}} <- Objs],
ets_delete_objects(?SERVER, Objs ++ mk_reg_pids(Objs)),

State = notify_unregister(Pid, State0),
{noreply, State}.

terminate(_Reason, _State) ->
Expand All @@ -138,52 +135,51 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

get_pid(Pid, #state{pids = Pids}) ->
maps:get(Pid, Pids, []).

update_pid(Pid, Keys, #state{pids = Pids} = State) ->
State#state{pids = Pids#{Pid => Keys}}.

delete_pid(Pid, #state{pids = Pids} = State) ->
notify_unregister(Pid, State#state{pids = maps:remove(Pid, Pids)}).

notify_unregister(Pid, #state{await_unreg = AWait} = State) ->
Reply = maps:get(Pid, AWait, []),
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Reply),
State#state{await_unreg = maps:remove(Pid, AWait)}.

handle_add_keys(Fun, Keys, Handler, Pid, State) ->
register(InsFun, Keys, Handler, Pid) ->
RegV = {Handler, Pid},
case Fun(?SERVER, [{Key, RegV} || Key <- Keys]) of
true ->
link(Pid),
[global_add_key(Key, RegV) || Key <- Keys],
NKeys = ordsets:union(Keys, get_pid(Pid, State)),
{reply, ok, update_pid(Pid, NKeys, State)};
_ ->
{reply, {error, duplicate}, State}
end.

delete_keys(Keys, Pid, State) ->
lists:foreach(fun(Key) -> delete_key(Key, Pid) end, Keys),
case ordsets:subtract(get_pid(Pid, State), Keys) of
[] ->
unlink(Pid),
delete_pid(Pid, State);
Rest ->
update_pid(Pid, Rest, State)
[global_add_key(Key, RegV) || Key <- Keys],
Return = add_keys(InsFun, Keys, Handler, Pid),

%% if we are registering for a different Pid (not self()), then the
%% process could have crashed and existed already when we arrive here.
%% By doing the link this late, we make sure that a `EXIT` signal will
%% be delivered to the monitor in any case.
case self() of
Pid -> link(whereis(?SERVER));
_ -> gen_server:cast(?SERVER, {link, Pid})
end,

Return.

mk_reg_keys(Keys, Handler, Pid) ->
[{Key, {Handler, Pid}} || Key <- Keys].

mk_reg_objects([], _Handler, _Pid) ->
[];
mk_reg_objects([Key|T], Handler, Pid) ->
[{Key, {Handler, Pid}}, {Pid, {Handler, Key}} | mk_reg_objects(T, Handler, Pid)].

mk_reg_pids(Objs) ->
[{Pid, {Handler, Key}} || {Key, {Handler, Pid}} <- Objs].

ets_delete_objects(Tab, Objects) ->
[ets:delete_object(Tab, Obj) || Obj <-Objects].

add_keys(InsFun, Keys, Handler, Pid) ->
Insert = mk_reg_keys(Keys, Handler, Pid),
ets:insert(?SERVER, mk_reg_pids(Insert)),
case InsFun(?SERVER, Insert) of
true ->
ok;
false ->
ets_delete_objects(?SERVER, mk_reg_pids(Insert)),
{error, duplicate}
end.

%% this is not the same a ets:take, the object will only
%% be delete if Key and Pid match.....
delete_key(Key, Pid) ->
case ets:lookup(?SERVER, Key) of
[{Key, {_, Pid} = Value}] ->
global_del_key(Key, Value),
ets:take(?SERVER, Key);
Other ->
Other
end.
notify_unregister(Pid, State) ->
Reply = maps:get(Pid, State, []),
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Reply),
maps:remove(Pid, State).

global_add_key(#context_key{id = {Type, _, _}} = Key, RegV)
when Type == imei; Type == imsi ->
Expand Down

0 comments on commit 57ffd41

Please sign in to comment.