diff --git a/CHANGELOG.md b/CHANGELOG.md index 2543c007eb..a106f692f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### FEATURES - [ibc-relayer] + - Add support for event based channel relaying ([#822]) - Graceful handling of packet events in the presence of multiple relayers ([#983]) ### IMPROVEMENTS @@ -22,10 +23,11 @@ - [ibc-relayer-cli] - Promote `start-multi` command to `start` ([#911]) -[#983]: https://github.com/informalsystems/ibc-rs/issues/983 +[#822]: https://github.com/informalsystems/ibc-rs/issues/822 [#871]: https://github.com/informalsystems/ibc-rs/issues/871 [#911]: https://github.com/informalsystems/ibc-rs/issues/911 [#972]: https://github.com/informalsystems/ibc-rs/issues/972 +[#983]: https://github.com/informalsystems/ibc-rs/issues/983 ## v0.3.2 *May 21st, 2021* diff --git a/ci/relayer.Dockerfile b/ci/relayer.Dockerfile index a672931858..58f055350b 100644 --- a/ci/relayer.Dockerfile +++ b/ci/relayer.Dockerfile @@ -7,7 +7,7 @@ LABEL maintainer="hello@informal.systems" ARG RELEASE # Add Python 3 -RUN apt-get update -y && apt-get install python3 -y +RUN apt-get update -y && apt-get install python3 -y && apt-get install python3-toml -y # Copy relayer executable COPY ./hermes /usr/bin/hermes diff --git a/ci/simple_config.toml b/ci/simple_config.toml index 5012fadfc7..3e0c4c4bb2 100644 --- a/ci/simple_config.toml +++ b/ci/simple_config.toml @@ -1,5 +1,5 @@ [global] -strategy = 'naive' +strategy = 'packets' log_level = 'info' [[chains]] diff --git a/config.toml b/config.toml index 2904ee82d5..b1c1078193 100644 --- a/config.toml +++ b/config.toml @@ -1,6 +1,9 @@ [global] -strategy = 'naive' -log_level = 'error' +# Valid strategies: +# 'packets': restricts relaying to packets only, or +# 'all': relay from all types of events +strategy = 'packets' +log_level = 'info' [[chains]] id = 'ibc-0' diff --git a/config_example.toml b/config_example.toml deleted file mode 100644 index 12f4dff667..0000000000 --- a/config_example.toml +++ /dev/null @@ -1,58 +0,0 @@ -[global] -strategy = 'naive' -log_level = 'error' - -[[chains]] -id = 'ibc-0' -rpc_addr = 'http://localhost:26657' -grpc_addr = 'http://localhost:9090' -websocket_addr = 'ws://localhost:26657/websocket' -rpc_timeout = '10s' -account_prefix = 'cosmos' -key_name = 'testkey' -store_prefix = 'ibc' -gas = 3000000 -fee_denom = 'stake' -fee_amount = 10 -clock_drift = '5s' -trusting_period = '14days' - -[chains.trust_threshold] -numerator = '1' -denominator = '3' - -[[chains]] -id = 'ibc-1' -rpc_addr = 'http://localhost:26557' -grpc_addr = 'http://localhost:9091' -websocket_addr = 'ws://localhost:26557/websocket' -rpc_timeout = '10s' -account_prefix = 'cosmos' -key_name = 'testkey' -store_prefix = 'ibc' -gas = 3000000 -fee_denom = 'stake' -fee_amount = 10 -clock_drift = '5s' -trusting_period = '14days' - -[chains.trust_threshold] -numerator = '1' -denominator = '3' - -[[connections]] -a_chain = 'ibc-0' -b_chain = 'ibc-1' -delay = '10s' - -[[connections.paths]] -a_port = 'transfer' -b_port = 'transfer' - -[[connections]] -a_chain = 'ibc-0' -b_chain = 'ibc-1' - -[[connections.paths]] -a_port = 'transfer' -b_port = 'transfer' \ No newline at end of file diff --git a/docs/architecture/adr-002-ibc-relayer.md b/docs/architecture/adr-002-ibc-relayer.md index 1df200c548..fd6b895870 100644 --- a/docs/architecture/adr-002-ibc-relayer.md +++ b/docs/architecture/adr-002-ibc-relayer.md @@ -119,7 +119,7 @@ Below is an example of a configuration file. ```toml [global] -strategy = "naive" +strategy = "packets" log_level = "error" [[chains]] @@ -152,31 +152,6 @@ log_level = "error" gas_price = "0.025stake" trusting_period = "336h" -[[connections]] - -[connections.src] - client_id = "clB1" - connection_id = "connAtoB" - -[connections.dest] - client_id = "clA1" - connection_id = "connBtoA" - -[[connections.paths]] - src_port = "portA1" - dest_port = "portB1" - direction = "unidirectional" - -[[connections.paths]] - src_port = "portA2" - dest_port = "portB2" - direction = "bidirectional" - -[[connections.paths]] - src_port = "portA3" - dest_port = "portB3" - src_channel = "chan3-on-A" - dest_channel = "chan3-on-B" ``` The main sections of the configuration file are: - `global`: @@ -200,7 +175,8 @@ pub struct Config { } pub enum Strategy { - Naive, + Packets, + HandshakeAndPackets, } pub struct GlobalConfig { diff --git a/docs/architecture/adr-006-hermes-v0.2-usecases.md b/docs/architecture/adr-006-hermes-v0.2-usecases.md index bdab7106cd..e9f50f567e 100644 --- a/docs/architecture/adr-006-hermes-v0.2-usecases.md +++ b/docs/architecture/adr-006-hermes-v0.2-usecases.md @@ -230,7 +230,7 @@ of the config file will look as follows: ```toml [global] -strategy = 'naive' +strategy = 'packets' log_level = 'error' log_json = 'false' ``` diff --git a/e2e/e2e/channel.py b/e2e/e2e/channel.py index 49a0716012..b75672907a 100644 --- a/e2e/e2e/channel.py +++ b/e2e/e2e/channel.py @@ -1,8 +1,10 @@ from typing import Optional, Tuple +import toml from .cmd import * from .common import * +import e2e.relayer as relayer @dataclass class TxChanOpenInitRes: @@ -418,18 +420,19 @@ def handshake( c: Config, side_a: ChainId, side_b: ChainId, conn_a: ConnectionId, conn_b: ConnectionId, + port_id: PortId ) -> Tuple[ChannelId, ChannelId]: a_chan_id = chan_open_init(c, dst=side_a, src=side_b, dst_conn=conn_a) split() b_chan_id = chan_open_try( - c, dst=side_b, src=side_a, dst_conn=conn_b, dst_port=PortId('transfer'), src_port=PortId('transfer'), + c, dst=side_b, src=side_a, dst_conn=conn_b, dst_port=port_id, src_port=port_id, src_chan=a_chan_id) split() - ack_res = chan_open_ack(c, dst=side_a, src=side_b, dst_port=PortId('transfer'), src_port=PortId('transfer'), + ack_res = chan_open_ack(c, dst=side_a, src=side_b, dst_port=port_id, src_port=port_id, dst_conn=conn_a, dst_chan=a_chan_id, src_chan=b_chan_id) if ack_res != a_chan_id: @@ -438,7 +441,7 @@ def handshake( exit(1) confirm_res = chan_open_confirm( - c, dst=side_b, src=side_a, dst_port=PortId('transfer'), src_port=PortId('transfer'), + c, dst=side_b, src=side_a, dst_port=port_id, src_port=port_id, dst_conn=conn_b, dst_chan=b_chan_id, src_chan=a_chan_id) if confirm_res != b_chan_id: @@ -448,13 +451,13 @@ def handshake( split() - a_chan_end = query_channel_end(c, side_a, PortId('transfer'), a_chan_id) + a_chan_end = query_channel_end(c, side_a, port_id, a_chan_id) if a_chan_end.state != 'Open': l.error( f'Channel end with id {a_chan_id} on chain {side_a} is not in Open state, got: {a_chan_end.state}') exit(1) - b_chan_end = query_channel_end(c, side_b, PortId('transfer'), b_chan_id) + b_chan_end = query_channel_end(c, side_b, port_id, b_chan_id) if b_chan_end.state != 'Open': l.error( f'Channel end with id {b_chan_id} on chain {side_b} is not in Open state, got: {b_chan_end.state}') @@ -475,3 +478,92 @@ def query_channel_end(c: Config, chain_id: ChainId, port: PortId, chan_id: Chann l.debug(f'Status of channel end {chan_id}: {res}') return res + + +# ============================================================================= +# Passive CHANNEL relayer tests +# ============================================================================= + +def verify_state(c: Config, + ibc1: ChainId, ibc0: ChainId, + ibc1_chan_id: ChannelId, port_id: PortId): + + strategy = toml.load(c.config_file)['global']['strategy'] + # verify channel state on both chains, should be 'Open' for 'all' strategy, 'Init' otherwise + + if strategy == 'all': + sleep(10.0) + for i in range(20): + sleep(2.0) + ibc1_chan_end = query_channel_end(c, ibc1, port_id, ibc1_chan_id) + ibc0_chan_id = ibc1_chan_end.remote.channel_id + ibc0_chan_end = query_channel_end(c, ibc0, port_id, ibc0_chan_id) + if ibc0_chan_end.state == 'Open' and ibc1_chan_end.state == 'Open': + break + else: + assert (ibc0_chan_end.state == 'Open'), (ibc0_chan_end, "state is not Open") + assert (ibc1_chan_end.state == 'Open'), (ibc1_chan_end, "state is not Open") + + elif strategy == 'packets': + sleep(5.0) + ibc1_chan_end = query_channel_end(c, ibc1, port_id, ibc1_chan_id) + assert (ibc1_chan_end.state == 'Init'), (ibc1_chan_end, "state is not Init") + + +def passive_channel_start_then_init(c: Config, + ibc1: ChainId, ibc0: ChainId, + ibc1_conn_id: ConnectionId, port_id: PortId): + + # 1. start hermes + proc = relayer.start(c) + sleep(2.0) + + # 2. create a channel in Init state + ibc1_chan_id = chan_open_init(c, dst=ibc1, src=ibc0, dst_conn=ibc1_conn_id) + + # 3. wait for channel handshake to finish and verify channel state on both chains + verify_state(c, ibc1, ibc0, ibc1_chan_id, port_id) + + # 4. All good, stop the relayer + proc.kill() + + +def passive_channel_init_then_start(c: Config, + ibc1: ChainId, ibc0: ChainId, + ibc1_conn_id: ConnectionId, port_id: PortId): + + # 1. create a channel in Init state + ibc1_chan_id = chan_open_init(c, dst=ibc1, src=ibc0, dst_conn=ibc1_conn_id) + sleep(2.0) + + # 2. start relaying + proc = relayer.start(c) + + # 3. wait for channel handshake to finish and verify channel state on both chains + verify_state(c, ibc1, ibc0, ibc1_chan_id, port_id) + + # 4. All good, stop the relayer + proc.kill() + + +def passive_channel_try_then_start(c: Config, + ibc1: ChainId, + ibc0: ChainId, + ibc1_conn_id: ConnectionId, + ibc0_conn_id: ConnectionId, + port_id: PortId): + + # 1. create a channel in Try state + ibc1_chan_id = chan_open_init(c, dst=ibc1, src=ibc0, dst_conn=ibc1_conn_id) + sleep(2.0) + ibc0_chan_id = chan_open_try(c, dst=ibc0, src=ibc1, dst_conn=ibc0_conn_id, src_port=port_id, dst_port=port_id, src_chan=ibc1_chan_id) + sleep(2.0) + + # 2. start relaying + proc = relayer.start(c) + + # 3. wait for channel handshake to finish and verify channel state on both chains + verify_state(c, ibc1, ibc0, ibc1_chan_id, port_id) + + # 4. All good, stop the relayer + proc.kill() diff --git a/e2e/run.py b/e2e/run.py index 2f3840ac88..d75fdc25bc 100755 --- a/e2e/run.py +++ b/e2e/run.py @@ -2,8 +2,9 @@ import argparse import logging as l - +from typing import Tuple from pathlib import Path +import toml import e2e.channel as channel import e2e.client as client @@ -14,42 +15,39 @@ from e2e.common import * -def loop(c: Config): - IBC_0 = ChainId('ibc-0') - IBC_1 = ChainId('ibc-1') - - TRANSFER = PortId('transfer') - IBC_0_CHANNEL = ChannelId('channel-0') - IBC_1_CHANNEL = ChannelId('channel-1') +def passive_packets( + c: Config, + ibc0: ChainId, ibc1: ChainId, port_id: PortId, + ibc0_channel_id: ChannelId, ibc1_channel_id: ChannelId): # 1. create some unreceived acks # hermes tx raw ft-transfer ibc-1 ibc-0 transfer channel-0 10000 1000 -n 2 - packet.packet_send(c, src=IBC_0, dst=IBC_1, src_port=TRANSFER, - src_channel=IBC_0_CHANNEL, amount=10000, height_offset=1000, number_msgs=2) + packet.packet_send(c, src=ibc0, dst=ibc1, src_port=port_id, + src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=2) # hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 2 - packet.packet_send(c, src=IBC_1, dst=IBC_0, src_port=TRANSFER, - src_channel=IBC_1_CHANNEL, amount=10000, height_offset=1000, number_msgs=2) + packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id, + src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=2) sleep(5.0) # hermes tx raw packet-recv ibc-1 ibc-0 transfer channel-0 - packet.packet_recv(c, src=IBC_0, dst=IBC_1, - src_port=TRANSFER, src_channel=IBC_0_CHANNEL) + packet.packet_recv(c, src=ibc0 , dst=ibc1, + src_port=port_id, src_channel=ibc0_channel_id) # hermes tx raw packet-recv ibc-0 ibc-1 transfer channel-1 - packet.packet_recv(c, src=IBC_1, dst=IBC_0, - src_port=TRANSFER, src_channel=IBC_1_CHANNEL) + packet.packet_recv(c, src=ibc1, dst=ibc0 , + src_port=port_id, src_channel=ibc1_channel_id) # 2. create some unreceived packets # hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 3 - packet.packet_send(c, src=IBC_1, dst=IBC_0, src_port=TRANSFER, - src_channel=IBC_1_CHANNEL, amount=10000, height_offset=1000, number_msgs=3) + packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id, + src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=3) # hermes tx raw ft-transfer ibc-1 ibc-0 transfer channel-0 10000 1000 -n 4 - packet.packet_send(c, src=IBC_0, dst=IBC_1, src_port=TRANSFER, - src_channel=IBC_0_CHANNEL, amount=10000, height_offset=1000, number_msgs=4) + packet.packet_send(c, src=ibc0 , dst=ibc1, src_port=port_id, + src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=4) sleep(10.0) @@ -57,121 +55,117 @@ def loop(c: Config): # hermes query packet unreceived-packets ibc-0 transfer channel-0 unreceived = packet.query_unreceived_packets( - c, chain=IBC_0, port=TRANSFER, channel=IBC_0_CHANNEL) + c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 3), (unreceived, "unreceived packet mismatch") # hermes query packet unreceived-acks ibc-1 transfer channel-1 unreceived = packet.query_unreceived_acks( - c, chain=IBC_1, port=TRANSFER, channel=IBC_1_CHANNEL) + c, chain=ibc1, port=port_id, channel=ibc1_channel_id) assert (len(unreceived) == 2), (unreceived, "unreceived packet mismatch") # hermes query packet unreceived-packets ibc-1 transfer channel-1 unreceived = packet.query_unreceived_packets( - c, chain=IBC_1, port=TRANSFER, channel=IBC_1_CHANNEL) + c, chain=ibc1, port=port_id, channel=ibc1_channel_id) assert (len(unreceived) == 4), (unreceived, "unreceived packet mismatch") # hermes query packet unreceived-acks ibc-0 transfer channel-0 unreceived = packet.query_unreceived_acks( - c, chain=IBC_0, port=TRANSFER, channel=IBC_0_CHANNEL) + c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 2), (unreceived, "unreceived packet mismatch") - sleep(5.0) - # 4. start relaying - it should clear the unreceived packets proc = relayer.start(c) - sleep(5.0) - - # 5. wait a bit and make sure there are no pending packets + # 5. wait for the relayer to initialize and pick up pending packets + sleep(10.0) + # 6. verify that there are no pending packets # hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 3 - packet.packet_send(c, src=IBC_1, dst=IBC_0, src_port=TRANSFER, - src_channel=IBC_1_CHANNEL, amount=10000, height_offset=1000, number_msgs=3) + packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id, + src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=3) # hermes tx raw ft-transfer ibc-1 ibc-0 transfer channel-0 10000 1000 -n 4 - packet.packet_send(c, src=IBC_0, dst=IBC_1, src_port=TRANSFER, - src_channel=IBC_0_CHANNEL, amount=10000, height_offset=1000, number_msgs=4) + packet.packet_send(c, src=ibc0, dst=ibc1, src_port=port_id, + src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=4) sleep(10.0) # hermes query packet unreceived-packets ibc-1 transfer channel-1 unreceived = packet.query_unreceived_packets( - c, chain=IBC_1, port=TRANSFER, channel=IBC_1_CHANNEL) + c, chain=ibc1, port=port_id, channel=ibc1_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived packets mismatch (expected 0)") # hermes query packet unreceived-acks ibc-1 transfer channel-1 unreceived = packet.query_unreceived_acks( - c, chain=IBC_1, port=TRANSFER, channel=IBC_1_CHANNEL) + c, chain=ibc1, port=port_id, channel=ibc1_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived acks mismatch (expected 0)") # hermes query packet unreceived-packets ibc-0 transfer channel-0 unreceived = packet.query_unreceived_packets( - c, chain=IBC_0, port=TRANSFER, channel=IBC_0_CHANNEL) + c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived packets mismatch (expected 0)") # hermes query packet unreceived-acks ibc-0 transfer channel-0 unreceived = packet.query_unreceived_acks( - c, chain=IBC_0, port=TRANSFER, channel=IBC_0_CHANNEL) + c, chain=ibc0 , port=port_id, channel=ibc0_channel_id) assert (len(unreceived) == 0), (unreceived, "unreceived acks mismatch (expected 0)") - # 6. All good, stop the relayer + # 7.Stop the relayer proc.kill() -def raw(c: Config): - IBC_0 = ChainId('ibc-0') - IBC_1 = ChainId('ibc-1') - - ibc0_client_id = client.create_update_query_client(c, IBC_0, IBC_1) +def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[ ClientId, ConnectionId, ChannelId, ClientId, ConnectionId, ChannelId]: + ibc0_client_id = client.create_update_query_client(c, ibc0, ibc1) # Allocate first IDs on ibc-1 - ibc1_client_id = client.create_update_query_client(c, IBC_1, IBC_0) + ibc1_client_id = client.create_update_query_client(c, ibc1, ibc0) ibc1_conn_id = connection.conn_init( - c, IBC_1, IBC_0, ibc1_client_id, ibc0_client_id) + c, ibc1, ibc0 , ibc1_client_id, ibc0_client_id) ibc1_chan_id = channel.chan_open_init( - c, dst=IBC_1, src=IBC_0, dst_conn=ibc1_conn_id) + c, dst=ibc1, src=ibc0 , dst_conn=ibc1_conn_id) - ibc1_client_id = client.create_update_query_client(c, IBC_1, IBC_0) + ibc1_client_id = client.create_update_query_client(c, ibc1, ibc0) split() ibc0_conn_id, ibc1_conn_id = connection.handshake( - c, IBC_0, IBC_1, ibc0_client_id, ibc1_client_id) + c, ibc0, ibc1, ibc0_client_id, ibc1_client_id) split() ibc0_chan_id, ibc1_chan_id = channel.handshake( - c, IBC_0, IBC_1, ibc0_conn_id, ibc1_conn_id) + c, ibc0, ibc1, ibc0_conn_id, ibc1_conn_id, port_id) split() - packet.ping_pong(c, IBC_0, IBC_1, ibc0_chan_id, ibc1_chan_id) + packet.ping_pong(c, ibc0, ibc1, ibc0_chan_id, ibc1_chan_id) split() sleep(5) - packet.timeout(c, IBC_0, IBC_1, ibc0_chan_id, ibc1_chan_id) + packet.timeout(c, ibc0, ibc1, ibc0_chan_id, ibc1_chan_id) split() # The ChannelCloseInit message is currently denied by Gaia, # and requires a patch to be accepted. - # channel.close(c, IBC_0, IBC_1, ibc0_conn_id, + # channel.close(c, ibc0 , ibc1, ibc0_conn_id, # ibc1_conn_id, ibc0_chan_id, ibc1_chan_id) + return ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id def main(): parser = argparse.ArgumentParser( @@ -210,9 +204,26 @@ def main(): format='%(asctime)s [%(levelname)8s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') - raw(config) - loop(config) + chains = toml.load(config.config_file)['chains'] + + ibc0 = chains[0]['id'] + ibc1 = chains[1]['id'] + port_id = PortId('transfer') + + ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id = raw(config, ibc0 , ibc1, port_id) + sleep(2.0) + + passive_packets(config, ibc0, ibc1, port_id, ibc0_chan_id, ibc1_chan_id) + sleep(2.0) + + channel.passive_channel_start_then_init(config, ibc1, ibc0, ibc1_conn_id, port_id) + sleep(2.0) + + channel.passive_channel_init_then_start(config, ibc1, ibc0, ibc1_conn_id, port_id) + sleep(2.0) + channel.passive_channel_try_then_start(config, ibc1, ibc0, ibc1_conn_id, ibc0_conn_id, port_id) + sleep(2.0) if __name__ == "__main__": main() diff --git a/guide/src/commands/queries/connection.md b/guide/src/commands/queries/connection.md index 8a35fbe497..4ed695f351 100644 --- a/guide/src/commands/queries/connection.md +++ b/guide/src/commands/queries/connection.md @@ -135,8 +135,21 @@ hermes query connection channels ibc-1 connection-1 ```rust Success: [ - ChannelId( - "channel-1", - ), + PortChannelId { + channel_id: ChannelId( + "channel-0", + ), + port_id: PortId( + "transfer", + ), + }, + PortChannelId { + channel_id: ChannelId( + "channel-1", + ), + port_id: PortId( + "transfer", + ), + }, ] ``` diff --git a/guide/src/config.md b/guide/src/config.md index ad32e7c2cf..77986ae300 100644 --- a/guide/src/config.md +++ b/guide/src/config.md @@ -36,7 +36,7 @@ Here is an example for the `global` section: ```toml [global] -strategy = 'naive' +strategy = 'packets' log_level = 'info' ``` @@ -101,7 +101,7 @@ Here is a full example of a configuration file with two chains configured: ```toml [global] -strategy = 'naive' +strategy = 'packets' log_level = 'error' [[chains]] diff --git a/guide/src/help.md b/guide/src/help.md index 142c8c2264..02468038a7 100644 --- a/guide/src/help.md +++ b/guide/src/help.md @@ -88,7 +88,7 @@ the `profiling` feature and the [log level][log-level] should be `info` level or #### Example output for `tx raw conn-init` command ``` -hermes -c config_example.toml tx raw conn-init ibc-0 ibc-1 07-tendermint-0 07-tendermint-0 +hermes -c config.toml tx raw conn-init ibc-0 ibc-1 07-tendermint-0 07-tendermint-0 ``` ``` @@ -156,7 +156,7 @@ Relevant snippet: ```toml [global] -strategy = 'naive' +strategy = 'packets' log_level = 'error' ``` diff --git a/guide/src/tutorials/local-chains/relay-paths/multiple-paths.md b/guide/src/tutorials/local-chains/relay-paths/multiple-paths.md index 98985ef883..2e7b5df1c0 100644 --- a/guide/src/tutorials/local-chains/relay-paths/multiple-paths.md +++ b/guide/src/tutorials/local-chains/relay-paths/multiple-paths.md @@ -11,7 +11,7 @@ the `start-multi` command. ```toml [global] - strategy = 'naive' + strategy = 'packets' log_level = 'info' [[chains]] diff --git a/modules/src/events.rs b/modules/src/events.rs index e3cbf69833..3277469251 100644 --- a/modules/src/events.rs +++ b/modules/src/events.rs @@ -7,6 +7,7 @@ use crate::ics02_client::events as ClientEvents; use crate::ics02_client::events::NewBlock; use crate::ics03_connection::events as ConnectionEvents; use crate::ics04_channel::events as ChannelEvents; +use crate::ics04_channel::events::Attributes as ChannelAttributes; use crate::Height; use prost::alloc::fmt::Formatter; use std::fmt; @@ -186,6 +187,16 @@ impl IbcEvent { _ => unimplemented!(), } } + + pub fn channel_attributes(&self) -> Option<&ChannelAttributes> { + match self { + IbcEvent::OpenInitChannel(ev) => Some(ev.attributes()), + IbcEvent::OpenTryChannel(ev) => Some(ev.attributes()), + IbcEvent::OpenAckChannel(ev) => Some(ev.attributes()), + IbcEvent::OpenConfirmChannel(ev) => Some(ev.attributes()), + _ => None, + } + } } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/modules/src/ics04_channel/channel.rs b/modules/src/ics04_channel/channel.rs index 2f723ddcad..c59cba5c6f 100644 --- a/modules/src/ics04_channel/channel.rs +++ b/modules/src/ics04_channel/channel.rs @@ -404,6 +404,9 @@ impl State { _ => fail!(error::Kind::UnknownState, s), } } + pub fn is_open(self) -> bool { + self == State::Open + } } /// Provides a `to_string` method. diff --git a/modules/src/ics04_channel/events.rs b/modules/src/ics04_channel/events.rs index 2f04df0ad2..5debf0d2c3 100644 --- a/modules/src/ics04_channel/events.rs +++ b/modules/src/ics04_channel/events.rs @@ -166,8 +166,8 @@ impl Attributes { pub fn port_id(&self) -> &PortId { &self.port_id } - pub fn channel_id(&self) -> &Option { - &self.channel_id + pub fn channel_id(&self) -> Option<&ChannelId> { + self.channel_id.as_ref() } } @@ -188,8 +188,14 @@ impl Default for Attributes { pub struct OpenInit(Attributes); impl OpenInit { - pub fn channel_id(&self) -> &Option { - &self.0.channel_id + pub fn attributes(&self) -> &Attributes { + &self.0 + } + pub fn channel_id(&self) -> Option<&ChannelId> { + self.0.channel_id.as_ref() + } + pub fn port_id(&self) -> &PortId { + &self.0.port_id } pub fn height(&self) -> Height { self.0.height @@ -232,8 +238,14 @@ impl From for IbcEvent { pub struct OpenTry(Attributes); impl OpenTry { - pub fn channel_id(&self) -> &Option { - &self.0.channel_id + pub fn attributes(&self) -> &Attributes { + &self.0 + } + pub fn channel_id(&self) -> Option<&ChannelId> { + self.0.channel_id.as_ref() + } + pub fn port_id(&self) -> &PortId { + &self.0.port_id } pub fn height(&self) -> Height { self.0.height @@ -279,8 +291,11 @@ impl OpenAck { pub fn attributes(&self) -> &Attributes { &self.0 } - pub fn channel_id(&self) -> &Option { - &self.0.channel_id + pub fn channel_id(&self) -> Option<&ChannelId> { + self.0.channel_id.as_ref() + } + pub fn port_id(&self) -> &PortId { + &self.0.port_id } pub fn height(&self) -> Height { self.0.height @@ -288,6 +303,10 @@ impl OpenAck { pub fn set_height(&mut self, height: Height) { self.0.height = height; } + + pub fn counterparty_channel_id(&self) -> Option<&ChannelId> { + self.0.counterparty_channel_id.as_ref() + } } impl From for OpenAck { @@ -326,8 +345,11 @@ impl OpenConfirm { pub fn attributes(&self) -> &Attributes { &self.0 } - pub fn channel_id(&self) -> &Option { - &self.0.channel_id + pub fn channel_id(&self) -> Option<&ChannelId> { + self.0.channel_id.as_ref() + } + pub fn port_id(&self) -> &PortId { + &self.0.port_id } pub fn height(&self) -> Height { self.0.height @@ -444,8 +466,8 @@ impl std::fmt::Display for CloseInit { pub struct CloseConfirm(Attributes); impl CloseConfirm { - pub fn channel_id(&self) -> &Option { - &self.0.channel_id + pub fn channel_id(&self) -> Option<&ChannelId> { + self.0.channel_id.as_ref() } pub fn height(&self) -> Height { self.0.height @@ -523,6 +545,18 @@ impl SendPacket { pub fn set_height(&mut self, height: Height) { self.height = height; } + pub fn src_port_id(&self) -> &PortId { + &self.packet.source_port + } + pub fn src_channel_id(&self) -> &ChannelId { + &self.packet.source_channel + } + pub fn dst_port_id(&self) -> &PortId { + &self.packet.destination_port + } + pub fn dst_channel_id(&self) -> &ChannelId { + &self.packet.destination_channel + } } impl TryFrom for SendPacket { @@ -561,6 +595,18 @@ impl ReceivePacket { pub fn set_height(&mut self, height: Height) { self.height = height; } + pub fn src_port_id(&self) -> &PortId { + &self.packet.source_port + } + pub fn src_channel_id(&self) -> &ChannelId { + &self.packet.source_channel + } + pub fn dst_port_id(&self) -> &PortId { + &self.packet.destination_port + } + pub fn dst_channel_id(&self) -> &ChannelId { + &self.packet.destination_channel + } } impl TryFrom for ReceivePacket { @@ -601,6 +647,18 @@ impl WriteAcknowledgement { pub fn set_height(&mut self, height: Height) { self.height = height; } + pub fn src_port_id(&self) -> &PortId { + &self.packet.source_port + } + pub fn src_channel_id(&self) -> &ChannelId { + &self.packet.source_channel + } + pub fn dst_port_id(&self) -> &PortId { + &self.packet.destination_port + } + pub fn dst_channel_id(&self) -> &ChannelId { + &self.packet.destination_channel + } } impl TryFrom for WriteAcknowledgement { @@ -648,6 +706,12 @@ impl AcknowledgePacket { pub fn set_height(&mut self, height: Height) { self.height = height; } + pub fn src_port_id(&self) -> &PortId { + &self.packet.source_port + } + pub fn src_channel_id(&self) -> &ChannelId { + &self.packet.source_channel + } } impl TryFrom for AcknowledgePacket { @@ -682,7 +746,7 @@ impl TimeoutPacket { self.height } pub fn set_height(&mut self, height: Height) { - self.height = height + self.height = height; } pub fn src_port_id(&self) -> &PortId { &self.packet.source_port @@ -690,6 +754,12 @@ impl TimeoutPacket { pub fn src_channel_id(&self) -> &ChannelId { &self.packet.source_channel } + pub fn dst_port_id(&self) -> &PortId { + &self.packet.destination_port + } + pub fn dst_channel_id(&self) -> &ChannelId { + &self.packet.destination_channel + } } impl TryFrom for TimeoutPacket { @@ -727,6 +797,18 @@ impl TimeoutOnClosePacket { pub fn set_height(&mut self, height: Height) { self.height = height; } + pub fn src_port_id(&self) -> &PortId { + &self.packet.source_port + } + pub fn src_channel_id(&self) -> &ChannelId { + &self.packet.source_channel + } + pub fn dst_port_id(&self) -> &PortId { + &self.packet.destination_port + } + pub fn dst_channel_id(&self) -> &ChannelId { + &self.packet.destination_channel + } } impl TryFrom for TimeoutOnClosePacket { diff --git a/relayer-cli/src/commands/query/channel.rs b/relayer-cli/src/commands/query/channel.rs index cf6a667207..74c53f41a9 100644 --- a/relayer-cli/src/commands/query/channel.rs +++ b/relayer-cli/src/commands/query/channel.rs @@ -9,6 +9,7 @@ use ibc_relayer::chain::{Chain, CosmosSdkChain}; use crate::conclude::Output; use crate::prelude::*; +use ibc::ics04_channel::channel::State; #[derive(Clone, Command, Debug, Options)] pub struct QueryChannelEndCmd { @@ -48,7 +49,17 @@ impl Runnable for QueryChannelEndCmd { let height = ibc::Height::new(chain.id().version(), self.height.unwrap_or(0_u64)); let res = chain.query_channel(&self.port_id, &self.channel_id, height); match res { - Ok(ce) => Output::success(ce).exit(), + Ok(channel_end) => { + if channel_end.state_matches(&State::Uninitialized) { + Output::error(format!( + "port '{}' & channel '{}' does not exist", + self.port_id, self.channel_id + )) + .exit() + } else { + Output::success(channel_end).exit() + } + } Err(e) => Output::error(format!("{}", e)).exit(), } } diff --git a/relayer-cli/src/commands/query/connection.rs b/relayer-cli/src/commands/query/connection.rs index cf427e337a..e549fa2f62 100644 --- a/relayer-cli/src/commands/query/connection.rs +++ b/relayer-cli/src/commands/query/connection.rs @@ -3,8 +3,11 @@ use std::sync::Arc; use abscissa_core::{Command, Options, Runnable}; use tokio::runtime::Runtime as TokioRuntime; -use ibc::ics24_host::identifier::ChainId; -use ibc::ics24_host::identifier::ConnectionId; +use ibc::{ + ics03_connection::connection::State, + ics24_host::identifier::ConnectionId, + ics24_host::identifier::{ChainId, PortChannelId}, +}; use ibc_proto::ibc::core::channel::v1::QueryConnectionChannelsRequest; use ibc_relayer::chain::{Chain, CosmosSdkChain}; @@ -48,7 +51,17 @@ impl Runnable for QueryConnectionEndCmd { let height = ibc::Height::new(chain.id().version(), self.height.unwrap_or(0_u64)); let res = chain.query_connection(&self.connection_id, height); match res { - Ok(ce) => Output::success(ce).exit(), + Ok(connection_end) => { + if connection_end.state_matches(&State::Uninitialized) { + Output::error(format!( + "connection '{}' does not exist", + self.connection_id + )) + .exit() + } else { + Output::success(connection_end).exit() + } + } Err(e) => Output::error(format!("{}", e)).exit(), } } @@ -96,7 +109,16 @@ impl Runnable for QueryConnectionChannelsCmd { .map_err(|e| Kind::Query.context(e).into()); match res { - Ok(cids) => Output::success(cids).exit(), + Ok(channels) => { + let ids: Vec = channels + .into_iter() + .map(|identified_channel| PortChannelId { + port_id: identified_channel.port_id, + channel_id: identified_channel.channel_id, + }) + .collect(); + Output::success(ids).exit() + } Err(e) => Output::error(format!("{}", e)).exit(), } } diff --git a/relayer-cli/src/commands/tx/channel.rs b/relayer-cli/src/commands/tx/channel.rs index 7a3d60fe1d..47476a45f6 100644 --- a/relayer-cli/src/commands/tx/channel.rs +++ b/relayer-cli/src/commands/tx/channel.rs @@ -80,14 +80,14 @@ impl Runnable for TxRawChanOpenInitCmd { ClientId::default(), ConnectionId::default(), self.src_port_id.clone(), - ChannelId::default(), + None, ), b_side: ChannelSide::new( chains.dst.clone(), dst_connection.client_id().clone(), self.dst_conn_id.clone(), self.dst_port_id.clone(), - ChannelId::default(), + None, ), version: None, } @@ -120,6 +120,13 @@ pub struct TxRawChanOpenTryCmd { meta = "ID" )] src_chan_id: ChannelId, + + #[options( + help = "identifier of the destination channel (optional)", + short = "d", + meta = "ID" + )] + dst_chan_id: Option, } impl Runnable for TxRawChanOpenTryCmd { @@ -137,14 +144,14 @@ impl Runnable for TxRawChanOpenTryCmd { ClientId::default(), ConnectionId::default(), self.src_port_id.clone(), - self.src_chan_id.clone(), + Some(self.src_chan_id.clone()), ), b_side: ChannelSide::new( chains.dst.clone(), dst_connection.client_id().clone(), self.dst_conn_id.clone(), self.dst_port_id.clone(), - ChannelId::default(), + self.dst_chan_id.clone(), ), version: None, } @@ -202,14 +209,14 @@ impl Runnable for TxRawChanOpenAckCmd { ClientId::default(), ConnectionId::default(), self.src_port_id.clone(), - self.src_chan_id.clone(), + Some(self.src_chan_id.clone()), ), b_side: ChannelSide::new( chains.dst.clone(), dst_connection.client_id().clone(), self.dst_conn_id.clone(), self.dst_port_id.clone(), - self.dst_chan_id.clone(), + Some(self.dst_chan_id.clone()), ), version: None, } @@ -267,14 +274,14 @@ impl Runnable for TxRawChanOpenConfirmCmd { ClientId::default(), ConnectionId::default(), self.src_port_id.clone(), - self.src_chan_id.clone(), + Some(self.src_chan_id.clone()), ), b_side: ChannelSide::new( chains.dst.clone(), dst_connection.client_id().clone(), self.dst_conn_id.clone(), self.dst_port_id.clone(), - self.dst_chan_id.clone(), + Some(self.dst_chan_id.clone()), ), version: None, } @@ -332,14 +339,14 @@ impl Runnable for TxRawChanCloseInitCmd { ClientId::default(), ConnectionId::default(), self.src_port_id.clone(), - self.src_chan_id.clone(), + Some(self.src_chan_id.clone()), ), b_side: ChannelSide::new( chains.dst.clone(), dst_connection.client_id().clone(), self.dst_conn_id.clone(), self.dst_port_id.clone(), - self.dst_chan_id.clone(), + Some(self.dst_chan_id.clone()), ), version: None, } @@ -397,14 +404,14 @@ impl Runnable for TxRawChanCloseConfirmCmd { ClientId::default(), ConnectionId::default(), self.src_port_id.clone(), - self.src_chan_id.clone(), + Some(self.src_chan_id.clone()), ), b_side: ChannelSide::new( chains.dst.clone(), dst_connection.client_id().clone(), self.dst_conn_id.clone(), self.dst_port_id.clone(), - self.dst_chan_id.clone(), + Some(self.dst_chan_id.clone()), ), version: None, } diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs index d4c859b0c8..9e20b3b77c 100644 --- a/relayer/src/chain.rs +++ b/relayer/src/chain.rs @@ -177,7 +177,7 @@ pub trait Chain: Sized { fn query_connection_channels( &self, request: QueryConnectionChannelsRequest, - ) -> Result, Error>; + ) -> Result, Error>; /// Performs a query to retrieve the identifiers of all channels. fn query_channels( diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 19499978a0..abd34b985c 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -26,10 +26,8 @@ use ibc::ics02_client::client_consensus::{ }; use ibc::ics02_client::client_state::{AnyClientState, IdentifiedAnyClientState}; use ibc::ics02_client::events as ClientEvents; -use ibc::ics03_connection::connection::{ConnectionEnd, State as ConnectionState}; -use ibc::ics04_channel::channel::{ - ChannelEnd, IdentifiedChannelEnd, QueryPacketEventDataRequest, State as ChannelState, -}; +use ibc::ics03_connection::connection::ConnectionEnd; +use ibc::ics04_channel::channel::{ChannelEnd, IdentifiedChannelEnd, QueryPacketEventDataRequest}; use ibc::ics04_channel::events as ChannelEvents; use ibc::ics04_channel::packet::{PacketMsgType, Sequence}; use ibc::ics07_tendermint::client_state::{AllowUpdate, ClientState}; @@ -747,20 +745,13 @@ impl Chain for CosmosSdkChain { let connection_end = ConnectionEnd::decode_vec(&res.value) .map_err(|e| Kind::Query(format!("connection '{}'", connection_id)).context(e))?; - match connection_end.state() { - ConnectionState::Uninitialized => { - Err(Kind::Query(format!("connection '{}'", connection_id)) - .context("connection does not exist") - .into()) - } - _ => Ok(connection_end), - } + Ok(connection_end) } fn query_connection_channels( &self, request: QueryConnectionChannelsRequest, - ) -> Result, Error> { + ) -> Result, Error> { crate::time!("query_connection_channels"); let mut client = self @@ -781,13 +772,12 @@ impl Chain for CosmosSdkChain { // TODO: add warnings for any identifiers that fail to parse (below). // https://github.com/informalsystems/ibc-rs/pull/506#discussion_r555945560 - let vec_ids = response + let channels = response .channels - .iter() - .filter_map(|ic| ChannelId::from_str(ic.channel_id.as_str()).ok()) + .into_iter() + .filter_map(|ch| IdentifiedChannelEnd::try_from(ch).ok()) .collect(); - - Ok(vec_ids) + Ok(channels) } fn query_channels( @@ -834,15 +824,7 @@ impl Chain for CosmosSdkChain { Kind::Query(format!("port '{}' & channel '{}'", port_id, channel_id)).context(e) })?; - match channel_end.state() { - ChannelState::Uninitialized => Err(Kind::Query(format!( - "port '{}' & channel '{}'", - port_id, channel_id - )) - .context("channel does not exist") - .into()), - _ => Ok(channel_end), - } + Ok(channel_end) } /// Queries the packet commitment hashes associated with a channel. diff --git a/relayer/src/chain/counterparty.rs b/relayer/src/chain/counterparty.rs index c1ed1e8dd4..958bc9b2ff 100644 --- a/relayer/src/chain/counterparty.rs +++ b/relayer/src/chain/counterparty.rs @@ -4,10 +4,12 @@ use tracing::trace; use ibc::{ ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, ics03_connection::connection::IdentifiedConnectionEnd, - ics04_channel::channel::IdentifiedChannelEnd, + ics04_channel::channel::{ChannelEnd, IdentifiedChannelEnd, State}, + ics24_host::identifier::ConnectionId, ics24_host::identifier::{ChainId, ChannelId, PortId}, Height, }; +use ibc_proto::ibc::core::channel::v1::QueryConnectionChannelsRequest; use crate::supervisor::Error; @@ -34,6 +36,8 @@ impl ChannelConnectionClient { } } +/// Returns the [`ChannelConnectionClient`] associated with the +/// provided port and channel id. pub fn channel_connection_client( chain: &dyn ChainHandle, port_id: &PortId, @@ -50,8 +54,8 @@ pub fn channel_connection_client( .query_channel(port_id, channel_id, Height::zero()) .map_err(|e| Error::QueryFailed(format!("{}", e)))?; - if !channel_end.is_open() { - return Err(Error::ChannelNotOpen(channel_id.clone(), chain.id())); + if channel_end.state_matches(&State::Uninitialized) { + return Err(Error::ChannelUninitialized(channel_id.clone(), chain.id())); } let connection_id = channel_end @@ -96,3 +100,61 @@ pub fn get_counterparty_chain( channel_connection_client(src_chain, src_port_id, src_channel_id) .map(|c| c.client.client_state.chain_id()) } + +fn channel_on_destination( + port_id: &PortId, + channel_id: &ChannelId, + counterparty_chain: &dyn ChainHandle, + remote_connection_id: &ConnectionId, +) -> Result, Error> { + let req = QueryConnectionChannelsRequest { + connection: remote_connection_id.to_string(), + pagination: ibc_proto::cosmos::base::query::pagination::all(), + }; + + let counterparty_channels = counterparty_chain + .query_connection_channels(req) + .map_err(|e| Error::QueryFailed(format!("{}", e)))?; + + for counterparty_channel in counterparty_channels.into_iter() { + let local_channel_end = &counterparty_channel.channel_end.remote; + if let Some(local_channel_id) = local_channel_end.channel_id() { + if local_channel_id == channel_id && local_channel_end.port_id() == port_id { + return Ok(Some(counterparty_channel.channel_end)); + } + } + } + Ok(None) +} + +pub fn channel_state_on_destination( + channel: IdentifiedChannelEnd, + connection: IdentifiedConnectionEnd, + counterparty_chain: &dyn ChainHandle, +) -> Result { + let counterparty_state = + if let Some(remote_channel_id) = channel.channel_end.remote.channel_id() { + counterparty_chain + .query_channel( + channel.channel_end.remote.port_id(), + remote_channel_id, + Height::zero(), + ) + .map_err(|e| Error::QueryFailed(format!("{}", e)))? + .state + } else if let Some(remote_connection_id) = connection.end().counterparty().connection_id() { + channel_on_destination( + &channel.port_id, + &channel.channel_id, + counterparty_chain, + remote_connection_id, + )? + .map_or_else( + || State::Uninitialized, + |remote_channel| remote_channel.state, + ) + } else { + State::Uninitialized + }; + Ok(counterparty_state) +} diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index a313fe5e6e..a8acf44fac 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -26,9 +26,9 @@ use ibc::{ Height, }; use ibc_proto::ibc::core::channel::v1::{ - PacketState, QueryChannelsRequest, QueryNextSequenceReceiveRequest, - QueryPacketAcknowledgementsRequest, QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, - QueryUnreceivedPacketsRequest, + PacketState, QueryChannelsRequest, QueryConnectionChannelsRequest, + QueryNextSequenceReceiveRequest, QueryPacketAcknowledgementsRequest, + QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, }; use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest; use ibc_proto::ibc::core::commitment::v1::MerkleProof; @@ -183,6 +183,11 @@ pub enum ChainRequest { reply_to: ReplyTo>, }, + QueryConnectionChannels { + request: QueryConnectionChannelsRequest, + reply_to: ReplyTo>, + }, + QueryChannel { port_id: PortId, channel_id: ChannelId, @@ -313,6 +318,11 @@ pub trait ChainHandle: DynClone + Send + Sync + Debug { height: Height, ) -> Result; + fn query_connection_channels( + &self, + request: QueryConnectionChannelsRequest, + ) -> Result, Error>; + fn query_next_sequence_receive( &self, request: QueryNextSequenceReceiveRequest, diff --git a/relayer/src/chain/handle/prod.rs b/relayer/src/chain/handle/prod.rs index c5bb7dc809..aad102bea8 100644 --- a/relayer/src/chain/handle/prod.rs +++ b/relayer/src/chain/handle/prod.rs @@ -24,9 +24,9 @@ use ibc::{ Height, }; use ibc_proto::ibc::core::channel::v1::{ - PacketState, QueryChannelsRequest, QueryNextSequenceReceiveRequest, - QueryPacketAcknowledgementsRequest, QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, - QueryUnreceivedPacketsRequest, + PacketState, QueryChannelsRequest, QueryConnectionChannelsRequest, + QueryNextSequenceReceiveRequest, QueryPacketAcknowledgementsRequest, + QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, }; use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest; use ibc_proto::ibc::core::commitment::v1::MerkleProof; @@ -188,6 +188,13 @@ impl ChainHandle for ProdChainHandle { self.send(|reply_to| ChainRequest::QueryChannels { request, reply_to }) } + fn query_connection_channels( + &self, + request: QueryConnectionChannelsRequest, + ) -> Result, Error> { + self.send(|reply_to| ChainRequest::QueryConnectionChannels { request, reply_to }) + } + fn query_channel( &self, port_id: &PortId, diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 486feebcb4..835c0d0ef0 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -177,7 +177,7 @@ impl Chain for MockChain { fn query_connection_channels( &self, _request: QueryConnectionChannelsRequest, - ) -> Result, Error> { + ) -> Result, Error> { unimplemented!() } diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index c3bdf31e52..eaf87ca84e 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -29,9 +29,9 @@ use ibc::{ use ibc_proto::ibc::core::{ channel::v1::{ - PacketState, QueryChannelsRequest, QueryNextSequenceReceiveRequest, - QueryPacketAcknowledgementsRequest, QueryPacketCommitmentsRequest, - QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, + PacketState, QueryChannelsRequest, QueryConnectionChannelsRequest, + QueryNextSequenceReceiveRequest, QueryPacketAcknowledgementsRequest, + QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, }, client::v1::QueryConsensusStatesRequest, commitment::v1::MerkleProof, @@ -264,6 +264,10 @@ impl ChainRuntime { self.query_channels(request, reply_to)? }, + Ok(ChainRequest::QueryConnectionChannels { request, reply_to }) => { + self.query_connection_channels(request, reply_to)? + }, + Ok(ChainRequest::QueryChannel { port_id, channel_id, height, reply_to }) => { self.query_channel(port_id, channel_id, height, reply_to)? }, @@ -582,6 +586,20 @@ impl ChainRuntime { Ok(()) } + fn query_connection_channels( + &self, + request: QueryConnectionChannelsRequest, + reply_to: ReplyTo>, + ) -> Result<(), Error> { + let result = self.chain.query_connection_channels(request); + + reply_to + .send(result) + .map_err(|e| Kind::Channel.context(e))?; + + Ok(()) + } + fn query_channels( &self, request: QueryChannelsRequest, diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 3646e41b94..e48552a3ff 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -1,14 +1,13 @@ #![allow(clippy::borrowed_box)] - -use std::time::Duration; - +use anomaly::BoxError; use prost_types::Any; use serde::Serialize; +use std::time::Duration; use thiserror::Error; -use tracing::error; +use tracing::{debug, error}; use ibc::events::IbcEvent; -use ibc::ics04_channel::channel::{ChannelEnd, Counterparty, Order, State}; +use ibc::ics04_channel::channel::{ChannelEnd, Counterparty, IdentifiedChannelEnd, Order, State}; use ibc::ics04_channel::msgs::chan_close_confirm::MsgChannelCloseConfirm; use ibc::ics04_channel::msgs::chan_close_init::MsgChannelCloseInit; use ibc::ics04_channel::msgs::chan_open_ack::MsgChannelOpenAck; @@ -23,14 +22,29 @@ use crate::chain::handle::ChainHandle; use crate::connection::Connection; use crate::error::Error; use crate::foreign_client::{ForeignClient, ForeignClientError}; +use crate::object::Channel as WorkerChannelObject; +use crate::supervisor::error::Error as WorkerChannelError; const MAX_RETRIES: usize = 5; +use crate::chain::counterparty::{channel_connection_client, channel_state_on_destination}; +use crate::util::retry::RetryResult; +use ibc_proto::ibc::core::channel::v1::QueryConnectionChannelsRequest; + #[derive(Debug, Error)] pub enum ChannelError { #[error("failed with underlying cause: {0}")] Failed(String), + #[error("failed due to missing local channel id")] + MissingLocalChannelId, + + #[error("failed due to missing counterparty channel id")] + MissingCounterpartyChannelId, + + #[error("failed due to missing counterparty connection")] + MissingCounterpartyConnection, + #[error("failed during an operation on client ({0}) hosted by chain ({1}) with error: {2}")] ClientOperation(ClientId, ChainId, ForeignClientError), @@ -49,7 +63,7 @@ pub struct ChannelSide { client_id: ClientId, connection_id: ConnectionId, port_id: PortId, - channel_id: ChannelId, + channel_id: Option, } impl ChannelSide { @@ -58,7 +72,7 @@ impl ChannelSide { client_id: ClientId, connection_id: ConnectionId, port_id: PortId, - channel_id: ChannelId, + channel_id: Option, ) -> ChannelSide { Self { chain, @@ -85,8 +99,8 @@ impl ChannelSide { &self.port_id } - pub fn channel_id(&self) -> &ChannelId { - &self.channel_id + pub fn channel_id(&self) -> Option<&ChannelId> { + self.channel_id.as_ref() } } @@ -141,6 +155,129 @@ impl Channel { Ok(channel) } + pub fn restore_from_event( + chain: Box, + counterparty_chain: Box, + channel_open_event: IbcEvent, + ) -> Result { + let channel_event_attributes = + channel_open_event.channel_attributes().ok_or_else(|| { + ChannelError::Failed( + "A channel object must be build only from a channel event ".to_string(), + ) + })?; + + let port_id = channel_event_attributes.port_id.clone(); + let channel_id = channel_event_attributes.channel_id.clone(); + + let version = counterparty_chain + .module_version(&port_id) + .map_err(|e| ChannelError::QueryError(counterparty_chain.id(), e))?; + + let connection_id = channel_event_attributes.connection_id.clone(); + let connection = chain.query_connection(&connection_id, Height::zero())?; + let connection_counterparty = connection.counterparty(); + + let counterparty_connection_id = connection_counterparty + .connection_id() + .ok_or(ChannelError::MissingCounterpartyConnection)?; + + Ok(Channel { + // The event does not include the channel ordering. + // The message handlers `build_chan_open..` determine the order included in the handshake + // message from channel query. + ordering: Default::default(), + a_side: ChannelSide::new( + chain.clone(), + connection.client_id().clone(), + connection_id, + port_id, + channel_id, + ), + b_side: ChannelSide::new( + counterparty_chain.clone(), + connection.counterparty().client_id().clone(), + counterparty_connection_id.clone(), + channel_event_attributes.counterparty_port_id.clone(), + channel_event_attributes.counterparty_channel_id.clone(), + ), + connection_delay: connection.delay_period(), + // The event does not include the version. + // The message handlers `build_chan_open..` determine the version from channel query. + version: Some(version), + }) + } + + /// Recreates a 'Channel' object from the worker's object built from chain state scanning. + /// The channel must exist on chain and its connection must be initialized on both chains. + pub fn restore_from_state( + chain: Box, + counterparty_chain: Box, + channel: WorkerChannelObject, + height: Height, + ) -> Result<(Channel, State), BoxError> { + let a_channel = + chain.query_channel(&channel.src_port_id, &channel.src_channel_id, height)?; + + let a_connection_id = a_channel.connection_hops().first().ok_or_else(|| { + WorkerChannelError::MissingConnectionHops(channel.src_channel_id.clone(), chain.id()) + })?; + + let a_connection = chain.query_connection(&a_connection_id, Height::zero())?; + let b_connection_id = a_connection + .counterparty() + .connection_id() + .cloned() + .ok_or_else(|| { + WorkerChannelError::ChannelConnectionUninitialized( + channel.src_channel_id.clone(), + chain.id(), + a_connection.counterparty(), + ) + })?; + + let mut handshake_channel = Channel { + ordering: *a_channel.ordering(), + a_side: ChannelSide::new( + chain.clone(), + a_connection.client_id().clone(), + a_connection_id.clone(), + channel.src_port_id.clone(), + Some(channel.src_channel_id.clone()), + ), + b_side: ChannelSide::new( + counterparty_chain.clone(), + a_connection.counterparty().client_id().clone(), + b_connection_id.clone(), + a_channel.remote.port_id.clone(), + a_channel.remote.channel_id.clone(), + ), + connection_delay: a_connection.delay_period(), + version: Some(a_channel.version.clone()), + }; + + if a_channel.state_matches(&State::Init) && a_channel.remote.channel_id.is_none() { + let req = QueryConnectionChannelsRequest { + connection: b_connection_id.to_string(), + pagination: ibc_proto::cosmos::base::query::pagination::all(), + }; + + let channels: Vec = + counterparty_chain.query_connection_channels(req)?; + + for chan in channels { + if let Some(remote_channel_id) = chan.channel_end.remote.channel_id() { + if remote_channel_id == &channel.src_channel_id { + handshake_channel.b_side.channel_id = Some(chan.channel_id); + break; + } + } + } + } + + Ok((handshake_channel, a_channel.state)) + } + pub fn src_chain(&self) -> &Box { &self.a_side.chain } @@ -173,12 +310,12 @@ impl Channel { &self.b_side.port_id } - pub fn src_channel_id(&self) -> &ChannelId { - &self.a_side.channel_id + pub fn src_channel_id(&self) -> Option<&ChannelId> { + self.a_side.channel_id() } - pub fn dst_channel_id(&self) -> &ChannelId { - &self.b_side.channel_id + pub fn dst_channel_id(&self) -> Option<&ChannelId> { + self.b_side.channel_id() } pub fn flipped(&self) -> Channel { @@ -209,7 +346,7 @@ impl Channel { continue; } Ok(event) => { - self.a_side.channel_id = extract_channel_id(&event)?.clone(); + self.a_side.channel_id = Some(extract_channel_id(&event)?.clone()); println!("{} {} => {:#?}\n", done, a_chain.id(), event); init_success = true; break; @@ -236,7 +373,7 @@ impl Channel { continue; } Ok(event) => { - self.b_side.channel_id = extract_channel_id(&event)?.clone(); + self.b_side.channel_id = Some(extract_channel_id(&event)?.clone()); println!("{} {} => {:#?}\n", done, b_chain.id(), event); try_success = true; break; @@ -255,14 +392,20 @@ impl Channel { while counter < MAX_RETRIES { counter += 1; + let src_channel_id = self + .src_channel_id() + .ok_or(ChannelError::MissingLocalChannelId)?; + let dst_channel_id = self + .dst_channel_id() + .ok_or(ChannelError::MissingCounterpartyChannelId)?; // Continue loop if query error let a_channel = - a_chain.query_channel(&self.src_port_id(), &self.src_channel_id(), Height::zero()); + a_chain.query_channel(&self.src_port_id(), &src_channel_id, Height::zero()); if a_channel.is_err() { continue; } let b_channel = - b_chain.query_channel(&self.dst_port_id(), &self.dst_channel_id(), Height::zero()); + b_chain.query_channel(&self.dst_port_id(), &dst_channel_id, Height::zero()); if b_channel.is_err() { continue; } @@ -306,6 +449,72 @@ impl Channel { ))) } + pub fn counterparty_state(&self) -> Result { + // Source channel ID must be specified + let channel_id = self + .src_channel_id() + .ok_or(ChannelError::MissingLocalChannelId)?; + + let channel_deps = + channel_connection_client(self.src_chain().as_ref(), self.src_port_id(), channel_id) + .map_err(|_| { + ChannelError::Failed(format!( + "failed to query the channel dependecies for {}", + channel_id + )) + })?; + + channel_state_on_destination( + channel_deps.channel.clone(), + channel_deps.connection, + self.dst_chain().as_ref(), + ) + .map_err(|_| { + ChannelError::Failed(format!( + "failed to query the channel state on destination for {}", + channel_id + )) + }) + } + + pub fn handshake_step(&mut self, state: State) -> Result, ChannelError> { + match (state, self.counterparty_state()?) { + (State::Init, State::Uninitialized) => Ok(vec![self.build_chan_open_try_and_send()?]), + (State::Init, State::Init) => Ok(vec![self.build_chan_open_try_and_send()?]), + (State::TryOpen, State::Init) => Ok(vec![self.build_chan_open_ack_and_send()?]), + (State::TryOpen, State::TryOpen) => Ok(vec![self.build_chan_open_ack_and_send()?]), + (State::Open, State::TryOpen) => Ok(vec![self.build_chan_open_confirm_and_send()?]), + _ => Ok(vec![]), + } + } + + pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<(), u64> { + let done = '🥳'; + + match self.handshake_step(state) { + Err(e) => { + error!("Failed {:?} with error {}", state, e); + RetryResult::Retry(index) + } + Ok(ev) => { + debug!("{} => {:#?}\n", done, ev); + RetryResult::Ok(()) + } + } + } + + pub fn step_event(&mut self, event: IbcEvent, index: u64) -> RetryResult<(), u64> { + let state = match event { + IbcEvent::OpenInitChannel(_) => State::Init, + IbcEvent::OpenTryChannel(_) => State::TryOpen, + IbcEvent::OpenAckChannel(_) => State::Open, + IbcEvent::OpenConfirmChannel(_) => State::Open, + _ => State::Uninitialized, + }; + + self.step_state(state, index) + } + pub fn build_update_client_on_dst(&self, height: Height) -> Result, ChannelError> { let client = ForeignClient::restore( self.dst_client_id().clone(), @@ -416,15 +625,19 @@ impl Channel { /// Retrieves the channel from destination and compares against the expected channel /// built from the message type (`msg_type`) and options (`opts`). /// If the expected and the destination channels are compatible, it returns the expected channel + /// Source and destination channel IDs must be specified. fn validated_expected_channel( &self, msg_type: ChannelMsgType, ) -> Result { + // Destination channel ID must be specified + let dst_channel_id = self + .dst_channel_id() + .ok_or(ChannelError::MissingCounterpartyChannelId)?; + // If there is a channel present on the destination chain, it should look like this: - let counterparty = Counterparty::new( - self.src_port_id().clone(), - Option::from(self.src_channel_id().clone()), - ); + let counterparty = + Counterparty::new(self.src_port_id().clone(), self.src_channel_id().cloned()); // The highest expected state, depends on the message type: let highest_state = match msg_type { @@ -442,22 +655,22 @@ impl Channel { self.dst_version()?, ); - // Retrieve existing channel if any + // Retrieve existing channel let dst_channel = self .dst_chain() - .query_channel(self.dst_port_id(), self.dst_channel_id(), Height::default()) + .query_channel(self.dst_port_id(), dst_channel_id, Height::zero()) .map_err(|e| ChannelError::QueryError(self.dst_chain().id(), e))?; - // Check if a connection is expected to exist on destination chain + // Check if a channel is expected to exist on destination chain // A channel must exist on destination chain for Ack and Confirm Tx-es to succeed if dst_channel.state_matches(&State::Uninitialized) { return Err(ChannelError::Failed( - "missing channel on source chain".to_string(), + "missing channel on destination chain".to_string(), )); } check_destination_channel_state( - self.dst_channel_id().clone(), + dst_channel_id.clone(), dst_channel, dst_expected_channel.clone(), )?; @@ -466,9 +679,15 @@ impl Channel { } pub fn build_chan_open_try(&self) -> Result, ChannelError> { + // Source channel ID must be specified + let src_channel_id = self + .src_channel_id() + .ok_or(ChannelError::MissingLocalChannelId)?; + + // Channel must exist on source let src_channel = self .src_chain() - .query_channel(self.src_port_id(), self.src_channel_id(), Height::default()) + .query_channel(self.src_port_id(), &src_channel_id, Height::zero()) .map_err(|e| ChannelError::QueryError(self.src_chain().id(), e))?; if src_channel.counterparty().port_id() != self.dst_port_id() { @@ -482,10 +701,9 @@ impl Channel { ))); } - // Retrieve the connection - let _dst_connection = self - .dst_chain() - .query_connection(self.dst_connection_id(), Height::default()) + // Connection must exist on destination + self.dst_chain() + .query_connection(self.dst_connection_id(), Height::zero()) .map_err(|e| ChannelError::QueryError(self.dst_chain().id(), e))?; let query_height = self @@ -495,16 +713,14 @@ impl Channel { let proofs = self .src_chain() - .build_channel_proofs(self.src_port_id(), self.src_channel_id(), query_height) + .build_channel_proofs(self.src_port_id(), src_channel_id, query_height) .map_err(|e| ChannelError::Failed(format!("failed to build channel proofs: {}", e)))?; // Build message(s) to update client on destination let mut msgs = self.build_update_client_on_dst(proofs.height())?; - let counterparty = Counterparty::new( - self.src_port_id().clone(), - Some(self.src_channel_id().clone()), - ); + let counterparty = + Counterparty::new(self.src_port_id().clone(), self.src_channel_id().cloned()); let channel = ChannelEnd::new( State::TryOpen, @@ -523,10 +739,16 @@ impl Channel { )) })?; + let previous_channel_id = if src_channel.counterparty().channel_id.is_none() { + self.b_side.channel_id.clone() + } else { + src_channel.counterparty().channel_id.clone() + }; + // Build the domain type message let new_msg = MsgChannelOpenTry { port_id: self.dst_port_id().clone(), - previous_channel_id: src_channel.counterparty().channel_id.clone(), + previous_channel_id, counterparty_version: self.src_version()?, channel, proofs, @@ -566,18 +788,25 @@ impl Channel { } pub fn build_chan_open_ack(&self) -> Result, ChannelError> { + // Source and destination channel IDs must be specified + let src_channel_id = self + .src_channel_id() + .ok_or(ChannelError::MissingLocalChannelId)?; + let dst_channel_id = self + .dst_channel_id() + .ok_or(ChannelError::MissingCounterpartyChannelId)?; + // Check that the destination chain will accept the message - let _dst_expected_channel = self.validated_expected_channel(ChannelMsgType::OpenAck)?; + self.validated_expected_channel(ChannelMsgType::OpenAck)?; - let _src_channel = self - .src_chain() - .query_channel(self.src_port_id(), self.src_channel_id(), Height::default()) + // Channel must exist on source + self.src_chain() + .query_channel(self.src_port_id(), src_channel_id, Height::zero()) .map_err(|e| ChannelError::QueryError(self.src_chain().id(), e))?; - // Retrieve the connection - let _dst_connection = self - .dst_chain() - .query_connection(self.dst_connection_id(), Height::default()) + // Connection must exist on destination + self.dst_chain() + .query_connection(self.dst_connection_id(), Height::zero()) .map_err(|e| ChannelError::QueryError(self.dst_chain().id(), e))?; let query_height = self @@ -587,7 +816,7 @@ impl Channel { let proofs = self .src_chain() - .build_channel_proofs(self.src_port_id(), self.src_channel_id(), query_height) + .build_channel_proofs(self.src_port_id(), src_channel_id, query_height) .map_err(|e| { ChannelError::Failed(format!( "failed while building the channel proofs at ACK step with error: {}", @@ -610,8 +839,8 @@ impl Channel { // Build the domain type message let new_msg = MsgChannelOpenAck { port_id: self.dst_port_id().clone(), - channel_id: self.dst_channel_id().clone(), - counterparty_channel_id: self.src_channel_id().clone(), + channel_id: dst_channel_id.clone(), + counterparty_channel_id: src_channel_id.clone(), counterparty_version: self.src_version()?, proofs, signer, @@ -650,18 +879,25 @@ impl Channel { } pub fn build_chan_open_confirm(&self) -> Result, ChannelError> { + // Source and destination channel IDs must be specified + let src_channel_id = self + .src_channel_id() + .ok_or(ChannelError::MissingLocalChannelId)?; + let dst_channel_id = self + .dst_channel_id() + .ok_or(ChannelError::MissingCounterpartyChannelId)?; + // Check that the destination chain will accept the message - let _dst_expected_channel = self.validated_expected_channel(ChannelMsgType::OpenConfirm)?; + self.validated_expected_channel(ChannelMsgType::OpenConfirm)?; - let _src_channel = self - .src_chain() - .query_channel(self.src_port_id(), self.src_channel_id(), Height::default()) + // Channel must exist on source + self.src_chain() + .query_channel(self.src_port_id(), src_channel_id, Height::zero()) .map_err(|e| ChannelError::QueryError(self.src_chain().id(), e))?; - // Retrieve the connection - let _dst_connection = self - .dst_chain() - .query_connection(self.dst_connection_id(), Height::default()) + // Connection must exist on destination + self.dst_chain() + .query_connection(self.dst_connection_id(), Height::zero()) .map_err(|e| ChannelError::QueryError(self.dst_chain().id(), e))?; let query_height = self @@ -671,7 +907,7 @@ impl Channel { let proofs = self .src_chain() - .build_channel_proofs(self.src_port_id(), self.src_channel_id(), query_height) + .build_channel_proofs(self.src_port_id(), src_channel_id, query_height) .map_err(|e| ChannelError::Failed(format!("failed to build channel proofs: {}", e)))?; // Build message(s) to update client on destination @@ -689,7 +925,7 @@ impl Channel { // Build the domain type message let new_msg = MsgChannelOpenConfirm { port_id: self.dst_port_id().clone(), - channel_id: self.dst_channel_id().clone(), + channel_id: dst_channel_id.clone(), proofs, signer, }; @@ -727,9 +963,14 @@ impl Channel { } pub fn build_chan_close_init(&self) -> Result, ChannelError> { - let _channel = self - .dst_chain() - .query_channel(self.dst_port_id(), self.dst_channel_id(), Height::default()) + // Destination channel ID must be specified + let dst_channel_id = self + .dst_channel_id() + .ok_or(ChannelError::MissingCounterpartyChannelId)?; + + // Channel must exist on destination + self.dst_chain() + .query_channel(self.dst_port_id(), dst_channel_id, Height::zero()) .map_err(|e| ChannelError::QueryError(self.dst_chain().id(), e))?; let signer = self.dst_chain().get_signer().map_err(|e| { @@ -743,7 +984,7 @@ impl Channel { // Build the domain type message let new_msg = MsgChannelCloseInit { port_id: self.dst_port_id().clone(), - channel_id: self.dst_channel_id().clone(), + channel_id: dst_channel_id.clone(), signer, }; @@ -780,19 +1021,25 @@ impl Channel { } pub fn build_chan_close_confirm(&self) -> Result, ChannelError> { + // Source and destination channel IDs must be specified + let src_channel_id = self + .src_channel_id() + .ok_or(ChannelError::MissingLocalChannelId)?; + let dst_channel_id = self + .dst_channel_id() + .ok_or(ChannelError::MissingCounterpartyChannelId)?; + // Check that the destination chain will accept the message - let _dst_expected_channel = - self.validated_expected_channel(ChannelMsgType::CloseConfirm)?; + self.validated_expected_channel(ChannelMsgType::CloseConfirm)?; - let _src_channel = self - .src_chain() - .query_channel(self.src_port_id(), self.src_channel_id(), Height::default()) + // Channel must exist on source + self.src_chain() + .query_channel(self.src_port_id(), src_channel_id, Height::zero()) .map_err(|e| ChannelError::QueryError(self.src_chain().id(), e))?; - // Retrieve the connection - let _dst_connection = self - .dst_chain() - .query_connection(self.dst_connection_id(), Height::default()) + // Connection must exist on destination + self.dst_chain() + .query_connection(self.dst_connection_id(), Height::zero()) .map_err(|e| ChannelError::QueryError(self.dst_chain().id(), e))?; let query_height = self @@ -802,7 +1049,7 @@ impl Channel { let proofs = self .src_chain() - .build_channel_proofs(self.src_port_id(), self.src_channel_id(), query_height) + .build_channel_proofs(self.src_port_id(), src_channel_id, query_height) .map_err(|e| ChannelError::Failed(format!("failed to build channel proofs: {}", e)))?; // Build message(s) to update client on destination @@ -820,7 +1067,7 @@ impl Channel { // Build the domain type message let new_msg = MsgChannelCloseConfirm { port_id: self.dst_port_id().clone(), - channel_id: self.dst_channel_id().clone(), + channel_id: dst_channel_id.clone(), proofs, signer, }; @@ -858,12 +1105,12 @@ impl Channel { } } -fn extract_channel_id(event: &IbcEvent) -> Result<&ChannelId, ChannelError> { +pub fn extract_channel_id(event: &IbcEvent) -> Result<&ChannelId, ChannelError> { match event { - IbcEvent::OpenInitChannel(ev) => ev.channel_id().as_ref(), - IbcEvent::OpenTryChannel(ev) => ev.channel_id().as_ref(), - IbcEvent::OpenAckChannel(ev) => ev.channel_id().as_ref(), - IbcEvent::OpenConfirmChannel(ev) => ev.channel_id().as_ref(), + IbcEvent::OpenInitChannel(ev) => ev.channel_id(), + IbcEvent::OpenTryChannel(ev) => ev.channel_id(), + IbcEvent::OpenAckChannel(ev) => ev.channel_id(), + IbcEvent::OpenConfirmChannel(ev) => ev.channel_id(), _ => None, } .ok_or_else(|| ChannelError::Failed("cannot extract channel_id from result".to_string())) @@ -888,13 +1135,15 @@ fn check_destination_channel_state( // TODO: Refactor into a method let good_state = *existing_channel.state() as u32 <= *expected_channel.state() as u32; - let good_channel_ids = existing_channel.counterparty().channel_id().is_none() + let good_channel_port_ids = existing_channel.counterparty().channel_id().is_none() || existing_channel.counterparty().channel_id() - == expected_channel.counterparty().channel_id(); + == expected_channel.counterparty().channel_id() + && existing_channel.counterparty().port_id() + == expected_channel.counterparty().port_id(); // TODO: Check versions - if good_state && good_connection_hops && good_channel_ids { + if good_state && good_connection_hops && good_channel_port_ids { Ok(()) } else { Err(ChannelError::Failed(format!( diff --git a/relayer/src/config.rs b/relayer/src/config.rs index 7e0a212d8f..b269ae252b 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -68,15 +68,18 @@ impl Config { } } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub enum Strategy { - #[serde(rename = "naive")] - Naive, + #[serde(rename = "packets")] + Packets, + + #[serde(rename = "all")] + HandshakeAndPackets, } impl Default for Strategy { fn default() -> Self { - Self::Naive + Self::Packets } } diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 909485e4a5..9b164ac97e 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -245,12 +245,22 @@ impl RelayPath { &self.channel.dst_port_id() } - pub fn src_channel_id(&self) -> &ChannelId { - &self.channel.src_channel_id() + pub fn src_channel_id(&self) -> Result<&ChannelId, LinkError> { + self.channel.src_channel_id().ok_or_else(|| { + LinkError::Failed(format!( + "channel_id on source chain '{}' is 'None'", + self.src_chain().id() + )) + }) } - pub fn dst_channel_id(&self) -> &ChannelId { - &self.channel.dst_channel_id() + pub fn dst_channel_id(&self) -> Result<&ChannelId, LinkError> { + self.channel.dst_channel_id().ok_or_else(|| { + LinkError::Failed(format!( + "channel_id on destination chain '{}' is 'None'", + self.dst_chain().id() + )) + }) } pub fn channel(&self) -> &Channel { @@ -260,14 +270,14 @@ impl RelayPath { fn src_channel(&self, height: Height) -> Result { Ok(self .src_chain() - .query_channel(self.src_port_id(), self.src_channel_id(), height) + .query_channel(self.src_port_id(), self.src_channel_id()?, height) .map_err(|e| ChannelError::QueryError(self.src_chain().id(), e))?) } fn dst_channel(&self, height: Height) -> Result { Ok(self .dst_chain() - .query_channel(self.dst_port_id(), self.dst_channel_id(), height) + .query_channel(self.dst_port_id(), self.dst_channel_id()?, height) .map_err(|e| ChannelError::QueryError(self.src_chain().id(), e))?) } @@ -320,15 +330,16 @@ impl RelayPath { } fn build_chan_close_confirm_from_event(&self, event: &IbcEvent) -> Result { + let src_channel_id = self.src_channel_id()?; let proofs = self .src_chain() - .build_channel_proofs(self.src_port_id(), self.src_channel_id(), event.height()) + .build_channel_proofs(self.src_port_id(), src_channel_id, event.height()) .map_err(|e| ChannelError::Failed(format!("failed to build channel proofs: {}", e)))?; // Build the domain type message let new_msg = MsgChannelCloseConfirm { port_id: self.dst_port_id().clone(), - channel_id: self.dst_channel_id().clone(), + channel_id: src_channel_id.clone(), proofs, signer: self.dst_signer()?, }; @@ -341,31 +352,37 @@ impl RelayPath { fn filter_events(&self, events: &[IbcEvent]) -> Vec { let mut result = vec![]; + let src_channel_id = if let Ok(some_id) = self.src_channel_id() { + some_id + } else { + return vec![]; + }; + for event in events.iter() { match event { IbcEvent::SendPacket(send_packet_ev) => { - if self.src_channel_id() == &send_packet_ev.packet.source_channel - && self.src_port_id() == &send_packet_ev.packet.source_port + if src_channel_id == send_packet_ev.src_channel_id() + && self.src_port_id() == send_packet_ev.src_port_id() { result.push(event.clone()); } } IbcEvent::WriteAcknowledgement(write_ack_ev) => { - if self.channel.src_channel_id() == &write_ack_ev.packet.destination_channel - && self.channel.src_port_id() == &write_ack_ev.packet.destination_port + if src_channel_id == write_ack_ev.dst_channel_id() + && self.src_port_id() == write_ack_ev.dst_port_id() { result.push(event.clone()); } } IbcEvent::CloseInitChannel(chan_close_ev) => { - if self.channel.src_channel_id() == chan_close_ev.channel_id() - && self.channel.src_port_id() == chan_close_ev.port_id() + if src_channel_id == chan_close_ev.channel_id() + && self.src_port_id() == chan_close_ev.port_id() { result.push(event.clone()); } } IbcEvent::TimeoutPacket(timeout_ev) => { - if self.channel.src_channel_id() == timeout_ev.src_channel_id() + if src_channel_id == timeout_ev.src_channel_id() && self.channel.src_port_id() == timeout_ev.src_port_id() { result.push(event.clone()); @@ -728,7 +745,7 @@ impl RelayPath { self.dst_chain() .query_unreceived_packets(QueryUnreceivedPacketsRequest { port_id: self.dst_port_id().to_string(), - channel_id: self.dst_channel_id().to_string(), + channel_id: self.dst_channel_id()?.to_string(), packet_commitment_sequences: vec![packet.sequence.into()], })?; @@ -741,7 +758,7 @@ impl RelayPath { let (bytes, _) = self.src_chain().build_packet_proofs( PacketMsgType::Recv, self.src_port_id(), - self.src_channel_id(), + self.src_channel_id()?, packet.sequence, Height::zero(), )?; @@ -763,7 +780,7 @@ impl RelayPath { self.dst_chain() .query_unreceived_acknowledgement(QueryUnreceivedAcksRequest { port_id: self.dst_port_id().to_string(), - channel_id: self.dst_channel_id().to_string(), + channel_id: self.dst_channel_id()?.to_string(), packet_ack_sequences: vec![packet.sequence.into()], })?; @@ -882,10 +899,12 @@ impl RelayPath { ) -> Result<(Vec, Height), LinkError> { let mut events_result = vec![]; + let src_channel_id = self.src_channel_id()?; + // Query packet commitments on source chain that have not been acknowledged let pc_request = QueryPacketCommitmentsRequest { port_id: self.src_port_id().to_string(), - channel_id: self.src_channel_id().to_string(), + channel_id: src_channel_id.to_string(), pagination: ibc_proto::cosmos::base::query::pagination::all(), }; let (packet_commitments, src_response_height) = @@ -907,7 +926,7 @@ impl RelayPath { // Get the packets that have not been received on destination chain let request = QueryUnreceivedPacketsRequest { port_id: self.dst_port_id().to_string(), - channel_id: self.dst_channel_id().to_string(), + channel_id: self.dst_channel_id()?.to_string(), packet_commitment_sequences: commit_sequences, }; @@ -933,9 +952,9 @@ impl RelayPath { let query = QueryTxRequest::Packet(QueryPacketEventDataRequest { event_id: IbcEventType::SendPacket, source_port_id: self.src_port_id().clone(), - source_channel_id: self.src_channel_id().clone(), + source_channel_id: src_channel_id.clone(), destination_port_id: self.dst_port_id().clone(), - destination_channel_id: self.dst_channel_id().clone(), + destination_channel_id: self.dst_channel_id()?.clone(), sequences, height: query_height, }); @@ -961,10 +980,13 @@ impl RelayPath { ) -> Result<(Vec, Height), LinkError> { let mut events_result = vec![]; + let src_channel_id = self.src_channel_id()?; + let dst_channel_id = self.dst_channel_id()?; + // Get the sequences of packets that have been acknowledged on source let pc_request = QueryPacketAcknowledgementsRequest { port_id: self.src_port_id().to_string(), - channel_id: self.src_channel_id().to_string(), + channel_id: src_channel_id.to_string(), pagination: ibc_proto::cosmos::base::query::pagination::all(), }; let (acks_on_source, src_response_height) = self @@ -988,7 +1010,7 @@ impl RelayPath { let request = QueryUnreceivedAcksRequest { port_id: self.dst_port_id().to_string(), - channel_id: self.dst_channel_id().to_string(), + channel_id: dst_channel_id.to_string(), packet_ack_sequences: acked_sequences, }; @@ -1016,9 +1038,9 @@ impl RelayPath { .query_txs(QueryTxRequest::Packet(QueryPacketEventDataRequest { event_id: IbcEventType::WriteAck, source_port_id: self.dst_port_id().clone(), - source_channel_id: self.dst_channel_id().clone(), + source_channel_id: dst_channel_id.clone(), destination_port_id: self.src_port_id().clone(), - destination_channel_id: self.src_channel_id().clone(), + destination_channel_id: src_channel_id.clone(), sequences, height: query_height, })) @@ -1149,12 +1171,14 @@ impl RelayPath { packet: &Packet, height: Height, ) -> Result, LinkError> { + let dst_channel_id = self.dst_channel_id()?; + let (packet_type, next_sequence_received) = if self.ordered_channel() { let next_seq = self .dst_chain() .query_next_sequence_receive(QueryNextSequenceReceiveRequest { port_id: self.dst_port_id().to_string(), - channel_id: self.dst_channel_id().to_string(), + channel_id: dst_channel_id.to_string(), }) .map_err(|e| ChannelError::QueryError(self.dst_chain().id(), e))?; (PacketMsgType::TimeoutOrdered, next_seq) @@ -1495,12 +1519,17 @@ impl RelayPath { impl fmt::Display for RelayPath { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let channel_id = self + .src_channel_id() + .map(ToString::to_string) + .unwrap_or_else(|_| "None".to_string()); + write!( f, "{}:{}/{} -> {}", self.src_chain().id(), self.src_port_id(), - self.src_channel_id(), + channel_id, self.dst_chain().id() ) } @@ -1527,35 +1556,31 @@ impl Link { } pub fn is_closed(&self) -> Result { + let a_channel_id = self.a_to_b.src_channel_id()?; + let a_channel = self .a_to_b .src_chain() - .query_channel( - self.a_to_b.src_port_id(), - self.a_to_b.src_channel_id(), - Height::default(), - ) + .query_channel(self.a_to_b.src_port_id(), a_channel_id, Height::default()) .map_err(|e| { LinkError::Failed(format!( "channel {} does not exist on chain {}; context={}", - self.a_to_b.src_channel_id(), + a_channel_id, self.a_to_b.src_chain().id(), e )) })?; + let b_channel_id = self.a_to_b.dst_channel_id()?; + let b_channel = self .a_to_b .dst_chain() - .query_channel( - self.a_to_b.dst_port_id(), - self.a_to_b.dst_channel_id(), - Height::default(), - ) + .query_channel(self.a_to_b.dst_port_id(), b_channel_id, Height::default()) .map_err(|e| { LinkError::Failed(format!( "channel {} does not exist on chain {}; context={}", - self.a_to_b.dst_channel_id(), + b_channel_id, self.a_to_b.dst_chain().id(), e )) @@ -1631,14 +1656,14 @@ impl Link { a_connection.client_id().clone(), a_connection_id, opts.src_port_id.clone(), - opts.src_channel_id.clone(), + Some(opts.src_channel_id.clone()), ), b_side: ChannelSide::new( b_chain, a_connection.counterparty().client_id().clone(), a_connection.counterparty().connection_id().unwrap().clone(), a_channel.counterparty().port_id.clone(), - b_channel_id, + Some(b_channel_id), ), connection_delay: a_connection.delay_period(), version: None, diff --git a/relayer/src/object.rs b/relayer/src/object.rs index 7462af1eb6..087473936b 100644 --- a/relayer/src/object.rs +++ b/relayer/src/object.rs @@ -38,6 +38,31 @@ impl Client { } } +/// Channel +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct Channel { + /// Destination chain identifier. + pub dst_chain_id: ChainId, + + /// Source chain identifier. + pub src_chain_id: ChainId, + + /// Source channel identifier. + pub src_channel_id: ChannelId, + + /// Source port identifier. + pub src_port_id: PortId, +} + +impl Channel { + pub fn short_name(&self) -> String { + format!( + "{}/{}:{} -> {}", + self.src_channel_id, self.src_port_id, self.src_chain_id, self.dst_chain_id, + ) + } +} + /// A unidirectional path from a source chain, channel and port. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct UnidirectionalChannelPath { @@ -73,6 +98,8 @@ impl UnidirectionalChannelPath { pub enum Object { /// See [`Client`]. Client(Client), + /// See [`Channel`]. + Channel(Channel), /// See [`UnidirectionalChannelPath`]. UnidirectionalChannelPath(UnidirectionalChannelPath), } @@ -83,8 +110,9 @@ impl Object { /// Returns `false` otherwise. pub fn notify_new_block(&self, src_chain_id: &ChainId) -> bool { match self { - Object::UnidirectionalChannelPath(p) => p.src_chain_id == *src_chain_id, Object::Client(_) => false, + Object::Channel(c) => c.src_chain_id == *src_chain_id, + Object::UnidirectionalChannelPath(p) => p.src_chain_id == *src_chain_id, } } } @@ -95,6 +123,12 @@ impl From for Object { } } +impl From for Object { + fn from(c: Channel) -> Self { + Self::Channel(c) + } +} + impl From for Object { fn from(p: UnidirectionalChannelPath) -> Self { Self::UnidirectionalChannelPath(p) @@ -106,6 +140,7 @@ impl Object { match self { Self::Client(ref client) => &client.src_chain_id, Self::UnidirectionalChannelPath(ref path) => &path.src_chain_id, + Self::Channel(ref channel) => &channel.src_chain_id, } } @@ -113,6 +148,7 @@ impl Object { match self { Self::Client(ref client) => &client.dst_chain_id, Self::UnidirectionalChannelPath(ref path) => &path.dst_chain_id, + Self::Channel(ref channel) => &channel.dst_chain_id, } } @@ -120,6 +156,7 @@ impl Object { match self { Self::Client(ref client) => client.short_name(), Self::UnidirectionalChannelPath(ref path) => path.short_name(), + Self::Channel(ref channel) => channel.short_name(), } } @@ -149,14 +186,13 @@ impl Object { } /// Build the client object associated with the given channel event attributes. - pub fn for_chan_open_events( + pub fn client_from_chan_open_events( e: &Attributes, // The attributes of the emitted event chain: &dyn ChainHandle, // The chain which emitted the event ) -> Result { let channel_id = e .channel_id() - .as_ref() - .ok_or_else(|| format!("channel_id missing in OpenAck event '{:?}'", e))?; + .ok_or_else(|| format!("channel_id missing in channel open event '{:?}'", e))?; let client = channel_connection_client(chain, e.port_id(), channel_id)?.client; if client.client_state.refresh_period().is_none() { @@ -176,6 +212,27 @@ impl Object { .into()) } + /// Build the Channel object associated with the given [`Open`] channel event. + pub fn channel_from_chan_open_events( + e: &Attributes, + src_chain: &dyn ChainHandle, + ) -> Result { + let channel_id = e + .channel_id() + .ok_or_else(|| format!("channel_id missing in OpenInit event '{:?}'", e))?; + + let dst_chain_id = get_counterparty_chain(src_chain, channel_id, &e.port_id()) + .map_err(|_| "dest chain missing in init".to_string())?; + + Ok(Channel { + dst_chain_id, + src_chain_id: src_chain.id(), + src_channel_id: channel_id.clone(), + src_port_id: e.port_id().clone(), + } + .into()) + } + /// Build the object associated with the given [`SendPacket`] event. pub fn for_send_packet(e: &SendPacket, src_chain: &dyn ChainHandle) -> Result { let dst_chain_id = diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 6910dec96d..f0fb7e8d9c 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -7,11 +7,8 @@ use itertools::Itertools; use tracing::{debug, error, trace, warn}; use ibc::{ - events::{IbcEvent, VecIbcEvents}, - ics02_client::client_state::ClientState, - ics04_channel::channel::IdentifiedChannelEnd, - ics24_host::identifier::ChainId, - Height, + events::IbcEvent, ics02_client::client_state::ClientState, + ics04_channel::channel::IdentifiedChannelEnd, ics24_host::identifier::ChainId, Height, }; use ibc_proto::ibc::core::channel::v1::QueryChannelsRequest; @@ -23,13 +20,15 @@ use crate::{ self, monitor::{EventBatch, UnwrapOrClone}, }, - object::{Client, Object, UnidirectionalChannelPath}, + object::{Channel, Client, Object, UnidirectionalChannelPath}, registry::Registry, util::try_recv_multiple, worker::{WorkerMap, WorkerMsg}, }; -mod error; +pub mod error; +use crate::chain::counterparty::channel_state_on_destination; +use crate::config::Strategy; pub use error::Error; /// The supervisor listens for events on multiple pairs of chains, @@ -56,6 +55,10 @@ impl Supervisor { }) } + fn handshake_enabled(&self) -> bool { + self.config.global.strategy == Strategy::HandshakeAndPackets + } + /// Collect the events we are interested in from an [`EventBatch`], /// and maps each [`IbcEvent`] to their corresponding [`Object`]. pub fn collect_events( @@ -78,20 +81,58 @@ impl Supervisor { } } } + + IbcEvent::OpenInitChannel(..) | IbcEvent::OpenTryChannel(..) => { + if !self.handshake_enabled() { + continue; + } + + let object = event + .channel_attributes() + .map(|attr| Object::channel_from_chan_open_events(attr, src_chain)); + + if let Some(Ok(object)) = object { + collected.per_object.entry(object).or_default().push(event); + } + } + IbcEvent::OpenAckChannel(ref open_ack) => { // Create client worker here as channel end must be opened if let Ok(object) = - Object::for_chan_open_events(open_ack.attributes(), src_chain) + Object::client_from_chan_open_events(open_ack.attributes(), src_chain) { - collected.per_object.entry(object).or_default().push(event); + collected + .per_object + .entry(object) + .or_default() + .push(event.clone()); + } + + if !self.handshake_enabled() { + continue; + } + + if let Ok(channel_object) = Object::channel_from_chan_open_events( + event.clone().channel_attributes().unwrap(), + src_chain, + ) { + collected + .per_object + .entry(channel_object) + .or_default() + .push(event); } } IbcEvent::OpenConfirmChannel(ref open_confirm) => { // Create client worker here as channel end must be opened if let Ok(object) = - Object::for_chan_open_events(open_confirm.attributes(), src_chain) + Object::client_from_chan_open_events(open_confirm.attributes(), src_chain) { - collected.per_object.entry(object).or_default().push(event); + collected + .per_object + .entry(object) + .or_default() + .push(event.clone()); } } IbcEvent::SendPacket(ref packet) => { @@ -183,14 +224,22 @@ impl Supervisor { let client_res = channel_connection_client(chain.as_ref(), &channel.port_id, &channel.channel_id); - let client = match client_res { - Ok(conn_client) => conn_client.client, - Err(Error::ConnectionNotOpen(..)) | Err(Error::ChannelNotOpen(..)) => { + let (client, connection, channel) = match client_res { + Ok(conn_client) => { + trace!("channel, connection, client {:?}", conn_client); + ( + conn_client.client, + conn_client.connection, + conn_client.channel, + ) + } + Err(Error::ConnectionNotOpen(..)) | Err(Error::ChannelUninitialized(..)) => { // These errors are silent. // Simply ignore the channel and return without spawning the workers. warn!( - "ignoring channel {} because it is not open (or its connection is not open)", - channel.channel_id + " client and packet relay workers not spawned for channel/chain pair '{}'/'{}' because channel is not open", + channel.channel_id, + chain.id(), ); return Ok(()); @@ -207,8 +256,6 @@ impl Supervisor { } }; - trace!("Obtained client id {:?}", client.client_id); - if self .config .find_chain(&client.client_state.chain_id()) @@ -222,29 +269,54 @@ impl Supervisor { .registry .get_or_spawn(&client.client_state.chain_id())?; - // create the client object and spawn worker - let client_object = Object::Client(Client { - dst_client_id: client.client_id.clone(), - dst_chain_id: chain.id(), - src_chain_id: client.client_state.chain_id(), - }); - - self.workers - .get_or_spawn(client_object, counterparty_chain.clone(), chain.clone()); - - // TODO: Only start the Uni worker if there are outstanding packets or ACKs. - // https://github.com/informalsystems/ibc-rs/issues/901 - // create the path object and spawn worker - let path_object = Object::UnidirectionalChannelPath(UnidirectionalChannelPath { - dst_chain_id: counterparty_chain.id(), - src_chain_id: chain.id(), - src_channel_id: channel.channel_id.clone(), - src_port_id: channel.port_id, - }); - - self.workers - .get_or_spawn(path_object, chain.clone(), counterparty_chain.clone()); - + let chan_state_src = channel.channel_end.state; + let chan_state_dst = + channel_state_on_destination(channel.clone(), connection, counterparty_chain.as_ref())?; + + debug!( + "channel {} on chain {} is: {}; state on dest. chain ({}) is: {}", + channel.channel_id, + chain.id(), + chan_state_src, + counterparty_chain.id(), + chan_state_dst + ); + if chan_state_src.is_open() && chan_state_dst.is_open() { + // create the client object and spawn worker + let client_object = Object::Client(Client { + dst_client_id: client.client_id.clone(), + dst_chain_id: chain.id(), + src_chain_id: client.client_state.chain_id(), + }); + self.workers + .get_or_spawn(client_object, counterparty_chain.clone(), chain.clone()); + + // TODO: Only start the Uni worker if there are outstanding packets or ACKs. + // https://github.com/informalsystems/ibc-rs/issues/901 + // create the path object and spawn worker + let path_object = Object::UnidirectionalChannelPath(UnidirectionalChannelPath { + dst_chain_id: counterparty_chain.clone().id(), + src_chain_id: chain.id(), + src_channel_id: channel.channel_id.clone(), + src_port_id: channel.port_id, + }); + self.workers + .get_or_spawn(path_object, chain.clone(), counterparty_chain.clone()); + } else if !chan_state_dst.is_open() + && chan_state_dst as u32 <= chan_state_src as u32 + && self.handshake_enabled() + { + // create worker for channel handshake that will advance the remote state + let channel_object = Object::Channel(Channel { + dst_chain_id: counterparty_chain.clone().id(), + src_chain_id: chain.id(), + src_channel_id: channel.channel_id.clone(), + src_port_id: channel.port_id, + }); + + self.workers + .get_or_spawn(channel_object, chain.clone(), counterparty_chain.clone()); + } Ok(()) } @@ -342,13 +414,6 @@ impl Supervisor { continue; } - debug!( - "chain {} sent {} for object {:?}", - chain_id, - VecIbcEvents(events.clone()), - object, - ); - let src = self.registry.get_or_spawn(object.src_chain_id())?; let dst = self.registry.get_or_spawn(object.dst_chain_id())?; diff --git a/relayer/src/supervisor/error.rs b/relayer/src/supervisor/error.rs index e397235976..89d664a32e 100644 --- a/relayer/src/supervisor/error.rs +++ b/relayer/src/supervisor/error.rs @@ -1,11 +1,15 @@ use thiserror::Error; +use ibc::ics03_connection::connection::Counterparty; use ibc::ics24_host::identifier::{ChainId, ChannelId, ConnectionId}; #[derive(Clone, Debug, Error, PartialEq, Eq)] pub enum Error { #[error("channel {0} on chain {1} is not open")] - ChannelNotOpen(ChannelId, ChainId), + ChannelUninitialized(ChannelId, ChainId), + + #[error("channel {0} on chain {1} has a connection with uninitialized counterparty {:2}")] + ChannelConnectionUninitialized(ChannelId, ChainId, Counterparty), #[error("connection {0} (underlying channel {1}) on chain {2} is not open")] ConnectionNotOpen(ConnectionId, ChannelId, ChainId), diff --git a/relayer/src/worker.rs b/relayer/src/worker.rs index 813c111bf2..a2ec1c95b0 100644 --- a/relayer/src/worker.rs +++ b/relayer/src/worker.rs @@ -13,11 +13,14 @@ pub use handle::WorkerHandle; mod cmd; pub use cmd::WorkerCmd; +mod map; +pub use map::WorkerMap; + mod client; pub use client::ClientWorker; -mod map; -pub use map::WorkerMap; +mod channel; +pub use channel::ChannelWorker; mod uni_chan_path; pub use uni_chan_path::UniChanPathWorker; @@ -30,6 +33,7 @@ pub enum WorkerMsg { /// A worker processes batches of events associated with a given [`Object`]. pub enum Worker { Client(ClientWorker), + Channel(ChannelWorker), UniChanPath(UniChanPathWorker), } @@ -58,6 +62,7 @@ impl Worker { let worker = match object { Object::Client(client) => Self::Client(ClientWorker::new(client, chains, cmd_rx)), + Object::Channel(channel) => Self::Channel(ChannelWorker::new(channel, chains, cmd_rx)), Object::UnidirectionalChannelPath(path) => { Self::UniChanPath(UniChanPathWorker::new(path, chains, cmd_rx)) } @@ -73,6 +78,7 @@ impl Worker { let result = match self { Self::Client(w) => w.run(), + Self::Channel(w) => w.run(), Self::UniChanPath(w) => w.run(), }; @@ -90,6 +96,7 @@ impl Worker { fn chains(&self) -> &ChainHandlePair { match self { Self::Client(w) => &w.chains(), + Self::Channel(w) => w.chains(), Self::UniChanPath(w) => w.chains(), } } @@ -97,6 +104,7 @@ impl Worker { fn object(&self) -> Object { match self { Worker::Client(w) => w.object().clone().into(), + Worker::Channel(w) => w.object().clone().into(), Worker::UniChanPath(w) => w.object().clone().into(), } } diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs new file mode 100644 index 0000000000..3714845a53 --- /dev/null +++ b/relayer/src/worker/channel.rs @@ -0,0 +1,114 @@ +use std::{thread, time::Duration}; + +use anomaly::BoxError; +use crossbeam_channel::Receiver; +use tracing::{debug, warn}; + +use crate::channel::Channel as RelayChannel; +use crate::{ + chain::handle::ChainHandlePair, object::Channel, util::retry::retry_with_index, + worker::retry_strategy, +}; + +use super::WorkerCmd; + +pub struct ChannelWorker { + channel: Channel, + chains: ChainHandlePair, + cmd_rx: Receiver, +} + +impl ChannelWorker { + pub fn new(channel: Channel, chains: ChainHandlePair, cmd_rx: Receiver) -> Self { + Self { + channel, + chains, + cmd_rx, + } + } + + /// Run the event loop for events associated with a [`Channel`]. + pub(crate) fn run(self) -> Result<(), BoxError> { + let a_chain = self.chains.a.clone(); + let b_chain = self.chains.b.clone(); + + let mut handshake_channel; + + // Flag that indicates if the worker should actively resume handshake. + // Set on start or when event based handshake fails. + let mut resume_handshake = true; + + loop { + thread::sleep(Duration::from_millis(200)); + + if let Ok(cmd) = self.cmd_rx.try_recv() { + let result = match cmd { + WorkerCmd::IbcEvents { batch } => { + // there can be up to two event for this channel, e.g. init and try. + // process the last event, the one with highest "rank". + let last_event = batch.events.last(); + debug!("channel worker starts processing {:#?}", last_event); + match last_event { + Some(event) => { + handshake_channel = RelayChannel::restore_from_event( + a_chain.clone(), + b_chain.clone(), + event.clone(), + )?; + retry_with_index( + retry_strategy::worker_default_strategy(), + |index| handshake_channel.step_event(event.clone(), index), + ) + } + None => Ok(()), + } + } + WorkerCmd::NewBlock { + height: current_height, + new_block: _, + } => { + if !resume_handshake { + continue; + } + debug!( + "channel worker starts processing block event at {:#?}", + current_height + ); + + let height = current_height.decrement()?; + + let (mut handshake_channel, state) = RelayChannel::restore_from_state( + a_chain.clone(), + b_chain.clone(), + self.channel.clone(), + height, + )?; + + retry_with_index(retry_strategy::worker_default_strategy(), |index| { + handshake_channel.step_state(state, index) + }) + } + }; + + if let Err(retries) = result { + warn!("Channel worker failed after {} retries", retries); + + // Resume handshake on next iteration. + resume_handshake = true; + } else { + resume_handshake = false; + } + } + } + } + + /// Get a reference to the uni chan path worker's chains. + pub fn chains(&self) -> &ChainHandlePair { + &self.chains + } + + /// Get a reference to the client worker's object. + pub fn object(&self) -> &Channel { + &self.channel + } +} diff --git a/relayer/src/worker/handle.rs b/relayer/src/worker/handle.rs index 5546989ade..4d95a33c53 100644 --- a/relayer/src/worker/handle.rs +++ b/relayer/src/worker/handle.rs @@ -32,7 +32,7 @@ impl WorkerHandle { Self { tx, thread_handle } } - /// Send a batch of packet events to the worker. + /// Send a batch of events to the worker. pub fn send_events( &self, height: Height, diff --git a/relayer/src/worker/retry_strategy.rs b/relayer/src/worker/retry_strategy.rs index 56f56b10de..be71a6f15e 100644 --- a/relayer/src/worker/retry_strategy.rs +++ b/relayer/src/worker/retry_strategy.rs @@ -6,7 +6,7 @@ const DELAY_INCR: Duration = Duration::from_millis(100); const INITIAL_DELAY: Duration = Duration::from_millis(200); const MAX_RETRY_DURATION: Duration = Duration::from_secs(2); -pub fn uni_chan_path() -> impl Iterator { +pub fn worker_default_strategy() -> impl Iterator { let strategy = ConstantGrowth::new(INITIAL_DELAY, DELAY_INCR); clamp_total(strategy, MAX_DELAY, MAX_RETRY_DURATION) } diff --git a/relayer/src/worker/uni_chan_path.rs b/relayer/src/worker/uni_chan_path.rs index 89d7a11028..6a5243edf2 100644 --- a/relayer/src/worker/uni_chan_path.rs +++ b/relayer/src/worker/uni_chan_path.rs @@ -55,7 +55,7 @@ impl UniChanPathWorker { loop { thread::sleep(Duration::from_millis(200)); - let result = retry_with_index(retry_strategy::uni_chan_path(), |index| { + let result = retry_with_index(retry_strategy::worker_default_strategy(), |index| { Self::step(rx.try_recv().ok(), &mut link, index) }); diff --git a/relayer/tests/config/fixtures/relayer_conf_example.toml b/relayer/tests/config/fixtures/relayer_conf_example.toml index 234c133427..fbb92571ce 100644 --- a/relayer/tests/config/fixtures/relayer_conf_example.toml +++ b/relayer/tests/config/fixtures/relayer_conf_example.toml @@ -1,5 +1,5 @@ [global] -strategy = 'naive' +strategy = 'packets' log_level = 'error' [[chains]] diff --git a/relayer/tests/config/fixtures/simple_config.toml b/relayer/tests/config/fixtures/simple_config.toml index bd0bd6a8b2..ee61033cbd 100644 --- a/relayer/tests/config/fixtures/simple_config.toml +++ b/relayer/tests/config/fixtures/simple_config.toml @@ -3,7 +3,7 @@ title = "IBC Relayer Config Example" [global] -strategy = "naive" +strategy = "packets" [[chains]] id = "ibc-test"