Skip to content

Commit

Permalink
Implement packing pipeline when storing in the registry
Browse files Browse the repository at this point in the history
  • Loading branch information
zoldar committed Dec 28, 2020
1 parent d91bd35 commit 187e655
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 17 deletions.
9 changes: 1 addition & 8 deletions lib/absinthe/phase/subscription/subscribe_self.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,7 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
subscription_id = get_subscription_id(config, blueprint, options)

for field_key <- field_keys,
do:
Absinthe.Subscription.subscribe(
pubsub,
field_key,
subscription_id,
blueprint,
options
)
do: Absinthe.Subscription.subscribe(pubsub, field_key, subscription_id, blueprint)

{:replace, blueprint,
[
Expand Down
17 changes: 11 additions & 6 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ defmodule Absinthe.Subscription do
require Logger
alias __MODULE__

alias Absinthe.Subscription.PipelineSerializer

@doc """
Add Absinthe.Subscription to your process tree.
"""
Expand Down Expand Up @@ -106,15 +108,14 @@ defmodule Absinthe.Subscription do
defp fetch_fields(_, _), do: []

@doc false
def subscribe(pubsub, field_key, doc_id, doc, options) do
def subscribe(pubsub, field_key, doc_id, doc) do
registry = pubsub |> registry_name

doc_value = {
doc_id,
%{
schema: doc.schema,
source: doc.source,
options: options
initial_phases: PipelineSerializer.pack(doc.initial_phases),
source: doc.source
}
}

Expand All @@ -140,8 +141,12 @@ defmodule Absinthe.Subscription do
pubsub
|> registry_name
|> Registry.lookup(key)
|> Enum.map(&elem(&1, 1))
|> Map.new()
|> Enum.map(fn match ->
{_, {name, doc}} = match
doc = Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1)

{name, doc}
end)
end

@doc false
Expand Down
4 changes: 1 addition & 3 deletions lib/absinthe/subscription/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ defmodule Absinthe.Subscription.Local do
defp run_docset(pubsub, docs_and_topics, mutation_result) do
for {topic, key_strategy, doc} <- docs_and_topics do
try do
pipeline = Absinthe.Pipeline.for_document(doc.schema, doc.options)

pipeline =
pipeline
doc.initial_phases
|> Pipeline.replace(
Phase.Telemetry,
{Phase.Telemetry, event: [:subscription, :publish, :start]}
Expand Down
68 changes: 68 additions & 0 deletions lib/absinthe/subscription/phase_serializer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule Absinthe.Subscription.PipelineSerializer do
@moduledoc """
Serializer responsible for packing and unpacking pipeline stored in the Elixir registry.
The purpose of this logic is saving memory by deduplicating repeating options - (ETS
backed registry stores them flat in the memory).
"""

alias Absinthe.{Phase, Pipeline}

@type options_label :: {:options, non_neg_integer()}

@type packed_phase_config :: Phase.t() | {Phase.t(), options_label()}

@type options_map :: %{options_label() => Keyword.t()}

@type packed_pipeline :: {:packed, [packed_phase_config()], options_map()}

@spec pack(Pipeline.t()) :: packed_pipeline()
def pack(pipeline) do
{reverse_pipeline, options_reverse_map} =
pipeline
|> List.flatten()
|> Enum.reduce({[], %{}}, fn phase, {pipeline, options_reverse_map} ->
{phase, options_reverse_map} = maybe_pack_phase(phase, options_reverse_map)

{[phase | pipeline], options_reverse_map}
end)

pipeline = Enum.reverse(reverse_pipeline)
options_map = Map.new(options_reverse_map, fn {options, label} -> {label, options} end)

{:packed, pipeline, options_map}
end

@spec unpack(Pipeline.t() | packed_pipeline()) :: Pipeline.t()
def unpack({:packed, pipeline, options_map}) do
Enum.map(pipeline, fn
{phase, {:options, _n} = options_label} ->
{phase, Map.fetch!(options_map, options_label)}

phase ->
phase
end)
end

def unpack([_ | _] = pipeline) do
pipeline
end

defp maybe_pack_phase({phase, options}, options_reverse_map) do
if Map.has_key?(options_reverse_map, options) do
options_label = options_reverse_map[options]

{{phase, options_label}, options_reverse_map}
else
new_index = map_size(options_reverse_map)
options_label = {:options, new_index}
options_reverse_map = Map.put(options_reverse_map, options, options_label)

{{phase, options_label}, options_reverse_map}
end
end

defp maybe_pack_phase(phase, options_reverse_map) do
{phase, options_reverse_map}
end
end

0 comments on commit 187e655

Please sign in to comment.