Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

non-blocking context registry #418

Merged
merged 1 commit into from
Jul 26, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 73 additions & 77 deletions apps/ergw_core/src/gtp_context_reg.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%% Copyright 2015, Travelping GmbH <info@travelping.com>
%% Copyright 2021, Travelping GmbH <info@travelping.com>

%% This program is free software; you can redistribute it and/or
%% modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -29,7 +29,6 @@
-include("include/ergw.hrl").

-define(SERVER, ?MODULE).
-record(state, {pids, await_unreg}).

%%%===================================================================
%%% API
Expand Down Expand Up @@ -59,19 +58,28 @@ select(Key) ->

register(Keys, Handler, Pid)
when is_list(Keys), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {register, Keys, Handler, Pid}).
register(fun ets:insert/2, Keys, Handler, Pid).

register_new(Keys, Handler, Pid)
when is_list(Keys), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {register_new, Keys, Handler, Pid}).
register(fun ets:insert_new/2, Keys, Handler, Pid).

update(Delete, Insert, Handler, Pid)
when is_list(Delete), is_list(Insert), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {update, Delete, Insert, Handler, Pid}).
RegV = {Handler, Pid},
[global_del_key(DKey, RegV) || DKey <- Delete],
[global_add_key(IKey, RegV) || IKey <- Insert],

ets_delete_objects(?SERVER, mk_reg_objects(Delete, Handler, Pid)),
add_keys(fun ets:insert/2, Insert, Handler, Pid).

unregister(Keys, Handler, Pid)
when is_list(Keys), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {unregister, Keys, Handler, Pid}).
RegV = {Handler, Pid},
[global_del_key(Key, RegV) || Key <- Keys],

ets_delete_objects(?SERVER, mk_reg_objects(Keys, Handler, Pid)),
ok.

all() ->
ets:tab2list(?SERVER).
Expand All @@ -86,46 +94,35 @@ await_unreg(Key) ->
init([]) ->
process_flag(trap_exit, true),

ets:new(?SERVER, [ordered_set, named_table, public, {keypos, 1}]),
State = #state{
pids = #{},
await_unreg = #{}
},
{ok, State}.

handle_call({register, Keys, Handler, Pid}, _From, State) ->
handle_add_keys(fun ets:insert/2, Keys, Handler, Pid, State);

handle_call({register_new, Keys, Handler, Pid}, _From, State) ->
handle_add_keys(fun ets:insert_new/2, Keys, Handler, Pid, State);

handle_call({update, Delete, Insert, Handler, Pid}, _From, State) ->
lists:foreach(fun(Key) -> delete_key(Key, Pid) end, Delete),
NKeys = ordsets:union(ordsets:subtract(get_pid(Pid, State), Delete), Insert),
handle_add_keys(fun ets:insert/2, Insert, Handler, Pid, update_pid(Pid, NKeys, State));
ets:new(?SERVER, [bag, named_table, public, {keypos, 1},
{decentralized_counters, true},
{write_concurrency, true}, {read_concurrency, true}]),
{ok, #{}}.

handle_call({unregister, Keys, _Handler, Pid}, _From, State0) ->
State = delete_keys(Keys, Pid, State0),
{reply, ok, State};

handle_call({await_unreg, Pid}, From, #state{pids = Pids, await_unreg = AWait} = State0)
handle_call({await_unreg, Pid}, From, State0)
when is_pid(Pid) ->
case maps:is_key(Pid, Pids) of
{links, Links} = process_info(self(), links),
case lists:member(Pid, Links) of
true ->
State = State0#state{
await_unreg =
maps:update_with(Pid, fun(V) -> [From|V] end, [From], AWait)},
State = maps:update_with(Pid, fun(V) -> [From|V] end, [From], State0),
{noreply, State};
_ ->
{reply, ok, State0}
end.

handle_cast({link, Pid}, State) ->
link(Pid),
{noreply, State};

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'EXIT', Pid, _Reason}, State0) ->
Keys = get_pid(Pid, State0),
State = delete_keys(Keys, Pid, State0),
Objs = ets:lookup(?SERVER, Pid),
[global_del_key(Key, {Handler, Pid}) || {_, {Handler, Key}} <- Objs],
ets_delete_objects(?SERVER, Objs ++ mk_reg_pids(Objs)),

State = notify_unregister(Pid, State0),
{noreply, State}.

terminate(_Reason, _State) ->
Expand All @@ -138,52 +135,51 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

get_pid(Pid, #state{pids = Pids}) ->
maps:get(Pid, Pids, []).

update_pid(Pid, Keys, #state{pids = Pids} = State) ->
State#state{pids = Pids#{Pid => Keys}}.

delete_pid(Pid, #state{pids = Pids} = State) ->
notify_unregister(Pid, State#state{pids = maps:remove(Pid, Pids)}).

notify_unregister(Pid, #state{await_unreg = AWait} = State) ->
Reply = maps:get(Pid, AWait, []),
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Reply),
State#state{await_unreg = maps:remove(Pid, AWait)}.

handle_add_keys(Fun, Keys, Handler, Pid, State) ->
register(InsFun, Keys, Handler, Pid) ->
RegV = {Handler, Pid},
case Fun(?SERVER, [{Key, RegV} || Key <- Keys]) of
true ->
link(Pid),
[global_add_key(Key, RegV) || Key <- Keys],
NKeys = ordsets:union(Keys, get_pid(Pid, State)),
{reply, ok, update_pid(Pid, NKeys, State)};
_ ->
{reply, {error, duplicate}, State}
end.

delete_keys(Keys, Pid, State) ->
lists:foreach(fun(Key) -> delete_key(Key, Pid) end, Keys),
case ordsets:subtract(get_pid(Pid, State), Keys) of
[] ->
unlink(Pid),
delete_pid(Pid, State);
Rest ->
update_pid(Pid, Rest, State)
[global_add_key(Key, RegV) || Key <- Keys],
Return = add_keys(InsFun, Keys, Handler, Pid),

%% if we are registering for a different Pid (not self()), then the
%% process could have crashed and existed already when we arrive here.
%% By doing the link this late, we make sure that a `EXIT` signal will
%% be delivered to the monitor in any case.
case self() of
Pid -> link(whereis(?SERVER));
_ -> gen_server:cast(?SERVER, {link, Pid})
end,

Return.

mk_reg_keys(Keys, Handler, Pid) ->
[{Key, {Handler, Pid}} || Key <- Keys].

mk_reg_objects([], _Handler, _Pid) ->
[];
mk_reg_objects([Key|T], Handler, Pid) ->
[{Key, {Handler, Pid}}, {Pid, {Handler, Key}} | mk_reg_objects(T, Handler, Pid)].

mk_reg_pids(Objs) ->
[{Pid, {Handler, Key}} || {Key, {Handler, Pid}} <- Objs].

ets_delete_objects(Tab, Objects) ->
[ets:delete_object(Tab, Obj) || Obj <-Objects].

add_keys(InsFun, Keys, Handler, Pid) ->
Insert = mk_reg_keys(Keys, Handler, Pid),
ets:insert(?SERVER, mk_reg_pids(Insert)),
case InsFun(?SERVER, Insert) of
true ->
ok;
false ->
ets_delete_objects(?SERVER, mk_reg_pids(Insert)),
{error, duplicate}
end.

%% this is not the same a ets:take, the object will only
%% be delete if Key and Pid match.....
delete_key(Key, Pid) ->
case ets:lookup(?SERVER, Key) of
[{Key, {_, Pid} = Value}] ->
global_del_key(Key, Value),
ets:take(?SERVER, Key);
Other ->
Other
end.
notify_unregister(Pid, State) ->
Reply = maps:get(Pid, State, []),
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Reply),
maps:remove(Pid, State).

global_add_key(#context_key{id = {Type, _, _}} = Key, RegV)
when Type == imei; Type == imsi ->
Expand Down