Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2 from sumup-bank/features/support_timeout
Browse files Browse the repository at this point in the history
Add support to set timeout in run_async
  • Loading branch information
nuxlli authored Mar 1, 2020
2 parents eb374af + 8d41438 commit ba639ef
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 17 deletions.
37 changes: 20 additions & 17 deletions lib/journey.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ defmodule Journey do
end

def run(%__MODULE__{} = journey, spec, args \\ []) do
add_step(journey, spec, args, :sync)
add_step(journey, {spec, args}, :sync)
end

def run_async(%__MODULE__{} = journey, spec, args \\ []) do
add_step(journey, spec, args, :async)
def run_async(%__MODULE__{} = journey, spec, args \\ [], timeout \\ 5000) do
add_step(journey, {spec, args, timeout}, :async)
end

def await(%__MODULE__{steps: steps} = journey) do
Expand All @@ -25,11 +25,12 @@ defmodule Journey do
|> check_results()
end

def await(%Step{transaction: {func, %Task{} = task}} = step) do
def await(%Step{spec: {_, _, timeout}, transaction: {func, %Task{} = task}} = step) do
result =
case Task.yield(task) do
case Task.yield(task, timeout) do
{:ok, result} -> result
{:exit, error} -> {:error, error}
nil -> {:error, {:timeout, step}}
end

%Step{step | transaction: {func, result}}
Expand All @@ -45,28 +46,28 @@ defmodule Journey do
finally(journey, fn %{result: result} -> result end)
end

defp add_step(%__MODULE__{} = journey, spec, args, :sync = type) do
defp add_step(%__MODULE__{} = journey, spec, :sync = type) do
journey
|> await()
|> mk_step(spec, args, type)
|> mk_step(spec, type)
|> await()
end

defp add_step(%__MODULE__{} = journey, spec, args, type) do
mk_step(journey, spec, args, type)
defp add_step(%__MODULE__{} = journey, spec, type) do
mk_step(journey, spec, type)
end

defp mk_step(%__MODULE__{state: :failed} = journey, _, _, _), do: journey
defp mk_step(%__MODULE__{state: :failed} = journey, _, _), do: journey

defp mk_step(%__MODULE__{} = journey, spec, args, type) do
{transaction, compensation} = get_funcs(spec, args)
defp mk_step(%__MODULE__{} = journey, spec, type) do
{transaction, compensation} = get_funcs(spec)

update_steps(
journey,
journey.steps ++
[
%Step{
spec: {spec, args},
spec: spec,
compensation: {compensation, nil},
transaction: {transaction, call(transaction, journey, type)}
}
Expand All @@ -93,14 +94,16 @@ defmodule Journey do
end
end

defp get_funcs({module, function_name}, args) do
apply(module, function_name, args) |> extract_funcs()
end
defp get_funcs({spec, args}), do: get_funcs({spec, args, 0})

defp get_funcs(func, _) when is_function(func) do
defp get_funcs({func, _, _}) when is_function(func) do
extract_funcs(func.())
end

defp get_funcs({{module, function_name}, args, _}) do
extract_funcs(apply(module, function_name, args))
end

defp extract_funcs({transaction, compensation} = funcs)
when is_valid_function(transaction) and is_valid_function(compensation),
do: funcs
Expand Down
23 changes: 23 additions & 0 deletions test/journey_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,29 @@ defmodule JourneyTest do
}
] = steps
end

test "run compensation if await a async transaction ended with timeout", %{journey: journey} do
result =
journey
|> Journey.run_async({__MODULE__, :test_compensation}, [:ok])
|> Journey.run_async({__MODULE__, :test_compensation}, [fn -> :timer.sleep(500) end], 50)
# Step will not be added because previous step has failed due to timeout
|> Journey.run({__MODULE__, :test_compensation}, [:ok])

assert %Journey{result: error, state: :failed, steps: steps} = result
assert {:error, {:timeout, %Step{spec: {{__MODULE__, :test_compensation}, _, 50}}}} = error

assert [
%Step{
transaction: {_, :ok},
compensation: {_, :ok}
},
%Step{
transaction: {_, {:error, {:timeout, %Step{}}}},
compensation: {_, nil}
}
] = steps
end
end

def test_compensation(result) when is_function(result) do
Expand Down

0 comments on commit ba639ef

Please sign in to comment.