diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index d35c4f4b28..7563e5f064 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -173,8 +173,7 @@ defmodule Absinthe.Subscription do pubsub |> registry_name |> Registry.lookup(key) - |> Enum.map(fn match -> - {_, {doc_id, doc}} = match + |> Map.new(fn {_, {doc_id, doc}} -> doc = Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1) {doc_id, doc} diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index 50272b5c7b..d6fd6d42b2 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -49,7 +49,7 @@ defmodule Absinthe.Execution.SubscriptionTest do @behaviour Absinthe.Subscription.Pubsub def start_link() do - Registry.start_link(keys: :unique, name: __MODULE__) + Registry.start_link(keys: :duplicate, name: __MODULE__) end def node_name() do @@ -678,6 +678,43 @@ defmodule Absinthe.Execution.SubscriptionTest do :telemetry.detach(context.test) end + @query """ + subscription { + otherUser { id } + } + """ + test "de-duplicates pushes to the same context" do + documents = + Enum.map(1..5, fn _index -> + {:ok, doc} = run_subscription(@query, Schema, context: %{context_id: "global"}) + doc + end) + + # assert that all documents are the same + assert [document] = Enum.dedup(documents) + + Absinthe.Subscription.publish( + PubSub, + %{id: "global_user_id"}, + other_user: "*" + ) + + topic_id = document["subscribed"] + + for _i <- 1..5 do + assert_receive( + {:broadcast, + %{ + event: "subscription:data", + result: %{data: %{"otherUser" => %{"id" => "global_user_id"}}}, + topic: ^topic_id + }} + ) + end + + refute_receive({:broadcast, _}) + end + defp run_subscription(query, schema, opts \\ []) do opts = Keyword.update(opts, :context, %{pubsub: PubSub}, &Map.put(&1, :pubsub, PubSub))