Skip to content

Commit

Permalink
feat: multiple exit processors (#1573)
Browse files Browse the repository at this point in the history
* feat: start multiple exit processors

* chore: use snapshot support payment v2

... and run mix format

* fix: setup exit_games for test fixture

* fix: dispatch return in the format of {:ok, []}

* fix: make watcher integ test to work

* fix: unit test for EthEventAggregator

1. fix the unit tests, add address to log result
2. change how we bind contract address to log, use the pre-decoded address data

* fix: patch payment_exit_game back into contract_addr

* fix: wrong filter_events mapping

* fix: linter, dializer, remove tmp logging

* fix: use snapwhot with payment v2

Previous snapshot was wrong.
Using this one:
omisego-images/docker-elixir-omg#43 (comment)

* refactor: restrain from using ++

Use [a|b] as best practice

* chore: use Task.async_stream instad of sync looping

* refactor: move private functions to the end

* fix: map after Task.async_stream

Co-authored-by: Thibault Denizet <thibault@omisego.co>
  • Loading branch information
boolafish and Thibault Denizet committed Jun 22, 2020
1 parent 3062461 commit 9b9f7f4
Show file tree
Hide file tree
Showing 16 changed files with 687 additions and 398 deletions.
9 changes: 9 additions & 0 deletions apps/omg/lib/omg/wire_format_types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ defmodule OMG.WireFormatTypes do

@tx_type_values %{
tx_payment_v1: 1,
tx_payment_v2: 2,
tx_fee_token_claim: 3
}

@tx_type_modules %{
1 => OMG.State.Transaction.Payment,
2 => OMG.State.Transaction.Payment,
3 => OMG.State.Transaction.Fee
}

Expand All @@ -48,6 +50,11 @@ defmodule OMG.WireFormatTypes do
2 => OMG.Output
}

@exit_game_tx_types [
:tx_payment_v1,
:tx_payment_v2
]

@known_tx_types Map.keys(@tx_type_values)
@known_input_pointer_types Map.keys(@input_pointer_type_values)
@known_output_types Map.keys(@output_type_values)
Expand Down Expand Up @@ -88,4 +95,6 @@ defmodule OMG.WireFormatTypes do
"""
@spec output_type_modules() :: tx_type_to_module_map()
def output_type_modules(), do: @output_type_modules

def exit_game_tx_types(), do: @exit_game_tx_types
end
2 changes: 2 additions & 0 deletions apps/omg_eth/lib/omg_eth/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ defmodule OMG.Eth.Application do
|> Map.put(:authority_addr, Configuration.authority_address())
|> :erlang.phash2()

# TODO: Make this smarter. Adding new exit game contracts
# should be allowed, changing them, not so much
case DB.get_single_value(:omg_eth_contracts) do
result when result == :not_found or result == {:ok, 0} ->
multi_update = [{:put, :omg_eth_contracts, contracts_hash}]
Expand Down
5 changes: 5 additions & 0 deletions apps/omg_eth/lib/omg_eth/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ defmodule OMG.Eth.Configuration do
Application.fetch_env!(@app, :contract_addr)
end

@spec exit_games() :: no_return | map()
def exit_games() do
Application.fetch_env!(@app, :exit_games)
end

@spec txhash_contract() :: no_return | binary()
def txhash_contract() do
Application.fetch_env!(@app, :txhash_contract)
Expand Down
108 changes: 69 additions & 39 deletions apps/omg_eth/lib/omg_eth/release_tasks/set_contract.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,71 +72,76 @@ defmodule OMG.Eth.ReleaseTasks.SetContract do
end

# get all the data from external sources
{payment_exit_game, eth_vault, erc20_vault, min_exit_period_seconds, contract_semver, child_block_interval} =
{exit_games, eth_vault, erc20_vault, min_exit_period_seconds, contract_semver, child_block_interval} =
get_external_data(plasma_framework, rpc_api)

# Okay, so I am not sure if we want to keep payment_exit_game under contract_addr.
# However, this is more backward competible with existing code. If we want to move away
# for all the places we should refactor this.
contract_addresses = %{
plasma_framework: plasma_framework,
eth_vault: eth_vault,
erc20_vault: erc20_vault,
payment_exit_game: payment_exit_game
payment_exit_game: exit_games.tx_payment_v1,
payment_v2_exit_game: exit_games.tx_payment_v2
}

merge_configuration(
config,
txhash_contract,
authority_address,
contract_addresses,
min_exit_period_seconds,
contract_semver,
network,
child_block_interval
)
extra_config = %{
txhash_contract: txhash_contract,
authority_address: authority_address,
contract_addresses: contract_addresses,
exit_games: exit_games,
min_exit_period_seconds: min_exit_period_seconds,
contract_semver: contract_semver,
network: network,
child_block_interval: child_block_interval
}

{:ok, []} = valid_extra_config?(extra_config)
merge_configuration(config, extra_config)
end

defp get_external_data(plasma_framework, rpc_api) do
min_exit_period_seconds = get_min_exit_period(plasma_framework, rpc_api)

payment_exit_game =
plasma_framework |> exit_game_contract_address(ExPlasma.payment_v1(), rpc_api) |> Encoding.to_hex()
# TODO: get the list of types from ex_plasma?
exit_games =
Enum.into(OMG.WireFormatTypes.exit_game_tx_types(), %{}, fn type ->
{type,
plasma_framework
|> exit_game_contract_address(OMG.WireFormatTypes.tx_type_for(type), rpc_api)
|> Encoding.to_hex()}
end)

eth_vault = plasma_framework |> get_vault(@ether_vault_id, rpc_api) |> Encoding.to_hex()
erc20_vault = plasma_framework |> get_vault(@erc20_vault_id, rpc_api) |> Encoding.to_hex()
contract_semver = get_contract_semver(plasma_framework, rpc_api)
child_block_interval = get_child_block_interval(plasma_framework, rpc_api)
{payment_exit_game, eth_vault, erc20_vault, min_exit_period_seconds, contract_semver, child_block_interval}
end

defp merge_configuration(
config,
txhash_contract,
authority_address,
contract_addresses,
min_exit_period_seconds,
contract_semver,
network,
child_block_interval
)
when is_binary(txhash_contract) and
is_binary(authority_address) and is_map(contract_addresses) and is_integer(min_exit_period_seconds) and
is_binary(contract_semver) and is_binary(network) do
contract_addresses = Enum.into(contract_addresses, %{}, fn {name, addr} -> {name, String.downcase(addr)} end)
{exit_games, eth_vault, erc20_vault, min_exit_period_seconds, contract_semver, child_block_interval}
end

defp merge_configuration(config, extra_config) do
contract_addresses =
Enum.into(
extra_config.contract_addresses,
%{},
fn {name, addr} -> {name, String.downcase(addr)} end
)

Config.Reader.merge(config,
omg_eth: [
txhash_contract: String.downcase(txhash_contract),
authority_address: String.downcase(authority_address),
txhash_contract: String.downcase(extra_config.txhash_contract),
authority_address: String.downcase(extra_config.authority_address),
contract_addr: contract_addresses,
min_exit_period_seconds: min_exit_period_seconds,
contract_semver: contract_semver,
network: network,
child_block_interval: child_block_interval
exit_games: extra_config.exit_games,
min_exit_period_seconds: extra_config.min_exit_period_seconds,
contract_semver: extra_config.contract_semver,
network: extra_config.network,
child_block_interval: extra_config.child_block_interval
]
)
end

defp merge_configuration(_, _, _, _, _, _, _, _), do: exit(@error)

defp get_min_exit_period(plasma_framework_contract, rpc_api) do
signature = "minExitPeriod()"
{:ok, data} = call(plasma_framework_contract, signature, [], rpc_api)
Expand Down Expand Up @@ -210,4 +215,29 @@ defmodule OMG.Eth.ReleaseTasks.SetContract do
{:ok, _} = Application.ensure_all_started(:logger)
{:ok, _} = Application.ensure_all_started(:ethereumex)
end

defp valid_extra_config?(extra_config) do
{_, validation_result} =
{extra_config, {:ok, []}}
|> valid_field?(:txhash_contract, &is_binary/1)
|> valid_field?(:authority_address, &is_binary/1)
|> valid_field?(:contract_addresses, &is_map/1)
|> valid_field?(:exit_games, &is_map/1)
|> valid_field?(:min_exit_period_seconds, &is_integer/1)
|> valid_field?(:contract_semver, &is_binary/1)
|> valid_field?(:network, &is_binary/1)
|> valid_field?(:child_block_interval, &is_integer/1)

validation_result
end

defp valid_field?(validation_state, field, validation_function) do
{extra_config, {status, invalid_fields}} = validation_state
field_data = extra_config[field]

case field_data != nil && validation_function.(field_data) do
true -> {extra_config, {status, invalid_fields}}
false -> {extra_config, {:invalid_extra_config, [field | invalid_fields]}}
end
end
end
34 changes: 31 additions & 3 deletions apps/omg_eth/test/fixtures.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ defmodule OMG.Eth.Fixtures do

alias OMG.Eth.Configuration
alias OMG.Eth.Encoding
alias OMG.Eth.RootChain.Abi, as: RootChainABI
alias OMG.WireFormatTypes
alias Support.DevHelper
alias Support.DevNode
alias Support.RootChainHelper
Expand All @@ -41,6 +43,8 @@ defmodule OMG.Eth.Fixtures do
deffixture contract(eth_node) do
:ok = eth_node

:ok = setup_exit_games()

{:ok, true} =
Ethereumex.HttpClient.request("personal_unlockAccount", ["0x6de4b3b9c28e9c3e84c2b2d3a875c947a84de68d", "", 0], [])

Expand All @@ -64,16 +68,33 @@ defmodule OMG.Eth.Fixtures do
token_addr
end

# inject the exit games into :omg_eth
# test fixture does not rely on the release task so it would need this setup
defp setup_exit_games() do
contracts = SnapshotContracts.parse_contracts()
plasma_framework = contracts["CONTRACT_ADDRESS_PLASMA_FRAMEWORK"]

exit_games =
Enum.into(WireFormatTypes.exit_game_tx_types(), %{}, fn type ->
{type,
plasma_framework
|> exit_game_contract_address(WireFormatTypes.tx_type_for(type))
|> Encoding.to_hex()}
end)

Application.put_env(:omg_eth, :exit_games, exit_games)
end

defp has_exit_queue(vault_id, token) do
plasma_framework = Configuration.contracts().plasma_framework
token = Encoding.from_hex(token)
call_contract(plasma_framework, "hasExitQueue(uint256,address)", [vault_id, token], [:bool])
{:ok, return} = call_contract(plasma_framework, "hasExitQueue(uint256,address)", [vault_id, token])
decode_answer(return, [:bool])
end

defp call_contract(contract, signature, args, return_types) do
defp call_contract(contract, signature, args) do
data = ABI.encode(signature, args)
{:ok, return} = Ethereumex.HttpClient.eth_call(%{to: contract, data: Encoding.to_hex(data)})
decode_answer(return, return_types)
end

defp decode_answer(enc_return, return_types) do
Expand All @@ -85,4 +106,11 @@ defmodule OMG.Eth.Fixtures do

{:ok, single_return}
end

defp exit_game_contract_address(plasma_framework_contract, tx_type) do
signature = "exitGames(uint256)"
{:ok, data} = call_contract(plasma_framework_contract, signature, [tx_type])
%{"exit_game_address" => exit_game_address} = RootChainABI.decode_function(data, signature)
exit_game_address
end
end
10 changes: 5 additions & 5 deletions apps/omg_watcher/lib/omg_watcher/api/in_flight_exit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule OMG.Watcher.API.InFlightExit do
alias OMG.State.Transaction
alias OMG.Utxo
alias OMG.Watcher.API
alias OMG.Watcher.ExitProcessor
alias OMG.Watcher.ExitProcessorDispatcher

require Utxo

Expand Down Expand Up @@ -58,7 +58,7 @@ defmodule OMG.Watcher.API.InFlightExit do
This delegates directly to `OMG.Watcher.ExitProcessor` see there for details
"""
def get_competitor(txbytes) do
ExitProcessor.get_competitor_for_ife(txbytes)
ExitProcessorDispatcher.get_competitor_for_ife(txbytes)
end

@doc """
Expand All @@ -67,7 +67,7 @@ defmodule OMG.Watcher.API.InFlightExit do
This delegates directly to `OMG.Watcher.ExitProcessor` see there for details
"""
def prove_canonical(txbytes) do
ExitProcessor.prove_canonical_for_ife(txbytes)
ExitProcessorDispatcher.prove_canonical_for_ife(txbytes)
end

@doc """
Expand All @@ -76,7 +76,7 @@ defmodule OMG.Watcher.API.InFlightExit do
This delegates directly to `OMG.Watcher.ExitProcessor` see there for details
"""
def get_input_challenge_data(txbytes, input_index) do
ExitProcessor.get_input_challenge_data(txbytes, input_index)
ExitProcessorDispatcher.get_input_challenge_data(txbytes, input_index)
end

@doc """
Expand All @@ -85,7 +85,7 @@ defmodule OMG.Watcher.API.InFlightExit do
This delegates directly to `OMG.Watcher.ExitProcessor` see there for details
"""
def get_output_challenge_data(txbytes, output_index) do
ExitProcessor.get_output_challenge_data(txbytes, output_index)
ExitProcessorDispatcher.get_output_challenge_data(txbytes, output_index)
end

defp find_input_data(tx) do
Expand Down
5 changes: 3 additions & 2 deletions apps/omg_watcher/lib/omg_watcher/api/status_cache/external.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule OMG.Watcher.API.StatusCache.External do
alias OMG.Watcher.BlockGetter
alias OMG.Watcher.Event
alias OMG.Watcher.ExitProcessor
alias OMG.Watcher.ExitProcessorDispatcher

@type t() :: %{
last_validated_child_block_number: non_neg_integer(),
Expand Down Expand Up @@ -63,8 +64,8 @@ defmodule OMG.Watcher.API.StatusCache.External do
{_, mined_child_block_timestamp} = RootChain.blocks(mined_child_block_number)
{_, validated_child_block_timestamp} = RootChain.blocks(validated_child_block_number)
{:ok, services_synced_heights} = RootChainCoordinator.get_ethereum_heights()
{_, events_processor} = ExitProcessor.check_validity()
{:ok, in_flight_exits} = ExitProcessor.get_active_in_flight_exits()
{_, events_processor} = ExitProcessorDispatcher.check_validity()
{:ok, in_flight_exits} = ExitProcessorDispatcher.get_active_in_flight_exits()
{:ok, {_, events_block_getter}} = BlockGetter.get_events()

status = %{
Expand Down
3 changes: 2 additions & 1 deletion apps/omg_watcher/lib/omg_watcher/api/utxo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule OMG.Watcher.API.Utxo do
alias OMG.Eth.Configuration
alias OMG.Utxo
alias OMG.Watcher.ExitProcessor
alias OMG.Watcher.ExitProcessorDispatcher
alias OMG.Watcher.UtxoExit.Core

require Utxo
Expand All @@ -42,7 +43,7 @@ defmodule OMG.Watcher.API.Utxo do
@spec create_challenge(Utxo.Position.t()) ::
{:ok, ExitProcessor.StandardExit.Challenge.t()} | {:error, :utxo_not_spent} | {:error, :exit_not_found}
def create_challenge(utxo) do
ExitProcessor.create_challenge(utxo)
ExitProcessorDispatcher.create_challenge(utxo)
end

@spec compose_utxo_exit(Utxo.Position.t()) ::
Expand Down
12 changes: 7 additions & 5 deletions apps/omg_watcher/lib/omg_watcher/block_getter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule OMG.Watcher.BlockGetter do
alias OMG.Watcher.BlockGetter.BlockApplication
alias OMG.Watcher.BlockGetter.Core
alias OMG.Watcher.BlockGetter.Status
alias OMG.Watcher.ExitProcessor
alias OMG.Watcher.ExitProcessorDispatcher
alias OMG.Watcher.HttpRPC.Client

@doc """
Expand Down Expand Up @@ -81,7 +81,7 @@ defmodule OMG.Watcher.BlockGetter do
# TODO rethink posible solutions see issue #724
# if we do not wait here, `ExitProcessor.check_validity()` may timeouts,
# which causes State and BlockGetter to reboot, fetches entire UTXO set again, and then timeout...
exit_processor_initial_results = ExitProcessor.check_validity(10 * 60_000)
exit_processor_initial_results = ExitProcessorDispatcher.check_validity(10 * 60_000)
# State treats current as the next block to be executed or a block that is being executed
# while top block number is a block that has been formed (they differ by the interval)
{current_block_height, state_at_block_beginning} = State.get_status()
Expand Down Expand Up @@ -187,9 +187,11 @@ defmodule OMG.Watcher.BlockGetter do
Updates its view of validity of the chain.
"""
def handle_continue({:apply_block_step, :check_validity}, state) do
exit_processor_results = ExitProcessor.check_validity()
state = Core.consider_exits(state, exit_processor_results)
:ok = update_status(state)
:ok =
state
|> Core.consider_exits(ExitProcessorDispatcher.check_validity())
|> update_status()

{:noreply, state}
end

Expand Down
6 changes: 6 additions & 0 deletions apps/omg_watcher/lib/omg_watcher/block_getter/core.ex
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,12 @@ defmodule OMG.Watcher.BlockGetter.Core do
Takes results from `ExitProcessor.check_validity` into account, to potentially stop getting blocks
"""
@spec consider_exits(t(), ExitProcessor.Core.check_validity_result_t()) :: t()
def consider_exits(%__MODULE__{} = state, exit_processor_results) when is_list(exit_processor_results) do
Enum.reduce(exit_processor_results, state, fn results, state ->
consider_exits(state, results)
end)
end

def consider_exits(%__MODULE__{} = state, {:ok, _}), do: state

def consider_exits(%__MODULE__{} = state, {{:error, :unchallenged_exit} = error, _}) do
Expand Down
Loading

0 comments on commit 9b9f7f4

Please sign in to comment.