Skip to content

Commit

Permalink
fix: unsubscribe process when it shuts down
Browse files Browse the repository at this point in the history
  • Loading branch information
heywhy committed Apr 24, 2024
1 parent 5ab1d74 commit 32f8318
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions lib/elasticlunr/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ defmodule Elasticlunr.PubSub do

@impl true
def handle_call({:subscribe, stream, pid}, _from, state) do
# TODO: link process so that it can be unsubscribe automatically when it shuts down.
ref = Process.monitor(pid)

state
|> Map.get(stream, MapSet.new())
|> MapSet.put(pid)
|> MapSet.put({pid, ref})
|> then(&Map.put(state, stream, &1))
|> then(&{:reply, :ok, &1})
end
Expand All @@ -33,8 +34,21 @@ defmodule Elasticlunr.PubSub do
def handle_cast({:publish, stream, event, args}, state) do
state
|> Map.get(stream, MapSet.new())
|> Enum.map(&elem(&1, 0))
|> Enum.each(&send(&1, {event, args}))

{:noreply, state}
end

@impl true
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
state =
Enum.reduce(state, state, fn {stream, pids}, state ->
pids
|> MapSet.symmetric_difference(MapSet.new([{pid, ref}]))
|> then(&%{state | stream => &1})
end)

{:noreply, state}
end
end

0 comments on commit 32f8318

Please sign in to comment.