Skip to content

Commit

Permalink
Replace ets by gproc for gwmp protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
macpie committed Jul 30, 2024
1 parent 91c2c6f commit 249573e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 53 deletions.
57 changes: 8 additions & 49 deletions src/protocols/gwmp/hpr_gwmp_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
-export([
start_link/0,
init/1,
maybe_start_worker/2,
lookup_worker/1
maybe_start_worker/1
]).

-define(UDP_WORKER, hpr_gwmp_worker).
-define(ETS, hpr_gwmp_sup_ets).

-define(WORKER(I), #{
id => I,
Expand All @@ -40,61 +38,22 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec maybe_start_worker(WorkerKey :: binary(), Args :: map()) -> {ok, pid()} | {error, any()}.
maybe_start_worker(WorkerKey, Args) ->
case ets:lookup(?ETS, WorkerKey) of
[] ->
start_worker(WorkerKey, Args);
[{WorkerKey, Pid}] ->
case erlang:is_process_alive(Pid) of
true ->
{ok, Pid};
false ->
_ = ets:delete(?ETS, WorkerKey),
start_worker(WorkerKey, Args)
end
end.

-spec lookup_worker(PubKeyBin :: binary()) -> {ok, pid()} | {error, not_found}.
lookup_worker(WorkerKey) ->
case ets:lookup(?ETS, WorkerKey) of
[] ->
{error, not_found};
[{WorkerKey, Pid}] ->
case erlang:is_process_alive(Pid) of
true -> {ok, Pid};
false -> {error, not_found}
end
-spec maybe_start_worker(Args :: map()) -> {ok, pid()} | {error, any()}.
maybe_start_worker(#{key := Key} = Args) ->
case gproc:lookup_local_name(Key) of
Pid when is_pid(Pid) ->
{ok, Pid};
undefined ->
supervisor:start_child(?MODULE, [Args])
end.

%%====================================================================
%% Supervisor callbacks
%%====================================================================

init([]) ->
?ETS = ets:new(?ETS, [public, named_table, set]),
{ok, {?FLAGS, [?WORKER(?UDP_WORKER)]}}.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

-spec start_worker(PubKeyBin :: binary(), Args :: map()) ->
{ok, pid()} | {error, any()}.
start_worker(PubKeyBin, Args) ->
ChildArgs = maps:merge(
#{pubkeybin => PubKeyBin},
Args
),
case supervisor:start_child(?MODULE, [ChildArgs]) of
{error, Err} ->
{error, Err};
{ok, Pid} = OK ->
case ets:insert_new(?ETS, {PubKeyBin, Pid}) of
true ->
OK;
false ->
supervisor:terminate_child(?UDP_WORKER, Pid),
maybe_start_worker(PubKeyBin, Args)
end
end.
4 changes: 3 additions & 1 deletion src/protocols/gwmp/hpr_gwmp_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ push_data(WorkerPid, PacketUp, SocketDest, Timestamp, GatewayLocation) ->
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init(#{pubkeybin := PubKeyBin} = Args) ->
init(#{key := Key, pubkeybin := PubKeyBin} = Args) ->
process_flag(trap_exit, true),
lager:info("~p init with ~p", [?SERVER, Args]),

true = gproc:add_local_name(Key),

PullDataTimer = maps:get(pull_data_timer, Args, ?PULL_DATA_TIMER),

lager:md([
Expand Down
3 changes: 2 additions & 1 deletion src/protocols/hpr_protocol_gwmp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
) -> ok | {error, any()}.
send(PacketUp, Route, Timestamp, GatewayLocation) ->
Gateway = hpr_packet_up:gateway(PacketUp),
case hpr_gwmp_sup:maybe_start_worker(Gateway, #{}) of
Key = {?MODULE, Gateway},
case hpr_gwmp_sup:maybe_start_worker(#{key => Key, pubkeybin => Gateway}) of
{error, Reason} ->
{error, {gwmp_sup_err, Reason}};
{ok, Pid} ->
Expand Down
13 changes: 11 additions & 2 deletions test/hpr_protocol_gwmp_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ pull_ack_test(_Config) ->
?assert(erlang:is_binary(Token)),

%% There is an outstanding pull_data
{ok, WorkerPid} = hpr_gwmp_sup:lookup_worker(PubKeyBin),
{ok, WorkerPid} = lookup_worker(PubKeyBin),
?assertEqual(
1,
maps:size(element(5, sys:get_state(WorkerPid))),
Expand Down Expand Up @@ -508,7 +508,7 @@ pull_ack_hostname_test(_Config) ->
?assert(erlang:is_binary(Token)),

%% There is an outstanding pull_data
{ok, WorkerPid} = hpr_gwmp_sup:lookup_worker(PubKeyBin),
{ok, WorkerPid} = lookup_worker(PubKeyBin),
?assertEqual(
1,
maps:size(element(5, sys:get_state(WorkerPid))),
Expand Down Expand Up @@ -636,6 +636,15 @@ gateway_disconnect_test(_Config) ->
%% Helpers
%% ===================================================================

-spec lookup_worker(Key :: binary()) -> {ok, pid()} | {error, not_found}.
lookup_worker(Key) ->
case gproc:lookup_local_name({hpr_protocol_gwmp, Key}) of
Pid when is_pid(Pid) ->
{ok, Pid};
undefined ->
{error, not_found}
end.

test_route(Port) ->
test_route("127.0.0.1", Port).

Expand Down

0 comments on commit 249573e

Please sign in to comment.