diff --git a/cmd/query.go b/cmd/query.go index 4ea1564af76..23aee31472e 100644 --- a/cmd/query.go +++ b/cmd/query.go @@ -634,12 +634,12 @@ func queryUnrelayed() *cobra.Command { return err } - hs, err := relayer.UpdatesWithHeaders(c[src], c[dst]) + sh, err := relayer.NewSyncHeaders(c[src], c[dst]) if err != nil { return err } - sp, err := relayer.UnrelayedSequences(c[src], c[dst], hs[src].Height, hs[dst].Height) + sp, err := relayer.UnrelayedSequences(c[src], c[dst], sh) if err != nil { return err } diff --git a/cmd/tx.go b/cmd/tx.go index 379992f3c5a..54ba1a4c458 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -188,7 +188,17 @@ func relayMsgsCmd() *cobra.Command { return err } - if err = relayer.RelayUnRelayedPacketsOrderedChan(c[src], c[dst]); err != nil { + sh, err := relayer.NewSyncHeaders(c[src], c[dst]) + if err != nil { + return err + } + + sp, err := relayer.UnrelayedSequences(c[src], c[dst], sh) + if err != nil { + return err + } + + if err = relayer.RelayPacketsOrderedChan(c[src], c[dst], sh, sp); err != nil { return err } diff --git a/go.mod b/go.mod index 81302fb84ac..e4ab99e42a6 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/btcsuite/btcd v0.0.0-20190807005414-4063feeff79a // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/containerd/continuity v0.0.0-20200228182428-0f16d7a0959c // indirect - github.com/cosmos/cosmos-sdk v0.34.4-0.20200417201027-11528d39594c + github.com/cosmos/cosmos-sdk v0.34.4-0.20200419154345-84774907316c github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d github.com/gorilla/mux v1.7.4 github.com/ory/dockertest/v3 v3.5.5 diff --git a/go.sum b/go.sum index fa5382be6e0..cd3a7cf9d55 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cosmos/cosmos-sdk v0.34.4-0.20200417201027-11528d39594c h1:Yd7DVqfImC4YZi6oRtGaTMaufgzdaYgD2ydkMhz3/qQ= github.com/cosmos/cosmos-sdk v0.34.4-0.20200417201027-11528d39594c/go.mod h1:jngw1LJfwEAU+xc/tZwUGIXBAJHapckYGtXycsq8D+U= +github.com/cosmos/cosmos-sdk v0.34.4-0.20200419154345-84774907316c h1:QZgeN76JbDNEOko3EoI8FeN2IwPS7dPiap18E7O5Q+U= +github.com/cosmos/cosmos-sdk v0.34.4-0.20200419154345-84774907316c/go.mod h1:jngw1LJfwEAU+xc/tZwUGIXBAJHapckYGtXycsq8D+U= github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d h1:49RLWk1j44Xu4fjHb6JFYmeUnDORVwHNkDxaQ0ctCVU= github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y= github.com/cosmos/ledger-cosmos-go v0.11.1 h1:9JIYsGnXP613pb2vPjFeMMjBI5lEDsEaF6oYorTy6J4= diff --git a/relayer/channel-tx.go b/relayer/channel-tx.go index 9233837a7eb..01c624c665c 100644 --- a/relayer/channel-tx.go +++ b/relayer/channel-tx.go @@ -19,6 +19,7 @@ func (src *Chain) CreateChannel(dst *Chain, ordered bool, to time.Duration) erro } ticker := time.NewTicker(to) + failures := 0 for ; true; <-ticker.C { chanSteps, err := src.CreateChannelStep(dst, order) if err != nil { @@ -29,7 +30,12 @@ func (src *Chain) CreateChannel(dst *Chain, ordered bool, to time.Duration) erro break } - if chanSteps.Send(src, dst); chanSteps.success && chanSteps.last { + chanSteps.Send(src, dst) + + switch { + // In the case of success and this being the last transaction + // debug logging, log created connection and break + case chanSteps.success && chanSteps.last: chans, err := QueryChannelPair(src, dst, 0, 0) if err != nil { return err @@ -41,6 +47,19 @@ func (src *Chain) CreateChannel(dst *Chain, ordered bool, to time.Duration) erro src.ChainID, src.PathEnd.ChannelID, src.PathEnd.PortID, dst.ChainID, dst.PathEnd.ChannelID, dst.PathEnd.PortID)) break + // In the case of success, reset the failures counter + case chanSteps.success: + failures = 0 + continue + // In the case of failure, increment the failures counter and exit if this is the 3rd failure + case !chanSteps.success: + failures++ + if failures > 2 { + src.Error(fmt.Errorf("! Channel failed: [%s]chan{%s}port{%s} -> [%s]chan{%s}port{%s}", + src.ChainID, src.PathEnd.ChannelID, src.PathEnd.PortID, + dst.ChainID, dst.PathEnd.ChannelID, dst.PathEnd.PortID)) + break + } } } diff --git a/relayer/connection-tx.go b/relayer/connection-tx.go index 2dd01789376..ae79c8eec9b 100644 --- a/relayer/connection-tx.go +++ b/relayer/connection-tx.go @@ -12,6 +12,7 @@ import ( // TODO: add max retries or something to this function func (src *Chain) CreateConnection(dst *Chain, to time.Duration) error { ticker := time.NewTicker(to) + failed := 0 for ; true; <-ticker.C { connSteps, err := src.CreateConnectionStep(dst) if err != nil { @@ -22,13 +23,17 @@ func (src *Chain) CreateConnection(dst *Chain, to time.Duration) error { break } - if connSteps.Send(src, dst); connSteps.success && connSteps.last { - conns, err := QueryConnectionPair(src, dst, 0, 0) - if err != nil { - return err - } + connSteps.Send(src, dst) + switch { + // In the case of success and this being the last transaction + // debug logging, log created connection and break + case connSteps.success && connSteps.last: if src.debug { + conns, err := QueryConnectionPair(src, dst, 0, 0) + if err != nil { + return err + } logConnectionStates(src, dst, conns) } @@ -36,6 +41,19 @@ func (src *Chain) CreateConnection(dst *Chain, to time.Duration) error { src.ChainID, src.PathEnd.ClientID, src.PathEnd.ConnectionID, dst.ChainID, dst.PathEnd.ClientID, dst.PathEnd.ConnectionID)) break + // In the case of success, reset the failures counter + case connSteps.success: + failed = 0 + continue + // In the case of failure, increment the failures counter and exit if this is the 3rd failure + case !connSteps.success: + failed++ + if failed > 2 { + src.Error(fmt.Errorf("! Connection failed: [%s]client{%s}conn{%s} -> [%s]client{%s}conn{%s}", + src.ChainID, src.PathEnd.ClientID, src.PathEnd.ConnectionID, + dst.ChainID, dst.PathEnd.ClientID, dst.PathEnd.ConnectionID)) + break + } } } diff --git a/relayer/contextual.go b/relayer/contextual.go index 44124b76d5a..797ec922929 100644 --- a/relayer/contextual.go +++ b/relayer/contextual.go @@ -1,14 +1,10 @@ package relayer import ( - "sync" - "github.com/cosmos/cosmos-sdk/codec" stdcodec "github.com/cosmos/cosmos-sdk/codec/std" ) -var globalMutex sync.Mutex - type contextualStdCodec struct { *stdcodec.Codec useContext func() func() diff --git a/relayer/headers.go b/relayer/headers.go new file mode 100644 index 00000000000..9ce7aeee148 --- /dev/null +++ b/relayer/headers.go @@ -0,0 +1,51 @@ +package relayer + +import ( + "sync" + + tmclient "github.com/cosmos/cosmos-sdk/x/ibc/07-tendermint/types" +) + +// NewSyncHeaders returns a new instance of map[string]*tmclient.Header that can be easily +// kept "reasonably up to date" +func NewSyncHeaders(chains ...*Chain) (*SyncHeaders, error) { + mp, err := UpdatesWithHeaders(chains...) + if err != nil { + return nil, err + } + return &SyncHeaders{hds: mp}, nil +} + +// SyncHeaders is an instance of map[string]*tmclient.Header +// that can be kept "reasonably up to date" using it's Update method +type SyncHeaders struct { + sync.Mutex + + hds map[string]*tmclient.Header +} + +// Update the header for a given chain +func (uh *SyncHeaders) Update(c *Chain) error { + hd, err := c.UpdateLiteWithHeader() + if err != nil { + return err + } + uh.Lock() + defer uh.Unlock() + uh.hds[c.ChainID] = hd + return nil +} + +// GetHeader returns the latest header for a given chainID +func (uh *SyncHeaders) GetHeader(chainID string) *tmclient.Header { + uh.Lock() + defer uh.Unlock() + return uh.hds[chainID] +} + +// GetHeight returns the latest height for a given chainID +func (uh *SyncHeaders) GetHeight(chainID string) uint64 { + uh.Lock() + defer uh.Unlock() + return uh.hds[chainID].GetHeight() +} diff --git a/relayer/log-tx.go b/relayer/log-tx.go index 958d758dc6e..adda9c26cad 100644 --- a/relayer/log-tx.go +++ b/relayer/log-tx.go @@ -2,6 +2,8 @@ package relayer import ( "fmt" + "strconv" + "strings" sdk "github.com/cosmos/cosmos-sdk/types" connTypes "github.com/cosmos/cosmos-sdk/x/ibc/03-connection/types" @@ -75,8 +77,24 @@ func (c *Chain) logCreateClient(dst *Chain, dstH uint64) { func (c *Chain) logTx(events map[string][]string) { c.Log(fmt.Sprintf("• [%s]@{%d} - actions(%s) hash(%s)", c.ChainID, - getEventHeight(events), - actions(events["message.action"]), + getTxEventHeight(events), + getTxActions(events["message.action"]), events["tx.hash"][0]), ) } + +func getTxEventHeight(events map[string][]string) int64 { + if val, ok := events["tx.height"]; ok { + out, _ := strconv.ParseInt(val[0], 10, 64) + return out + } + return -1 +} + +func getTxActions(act []string) string { + out := "" + for i, a := range act { + out += fmt.Sprintf("%d:%s,", i, a) + } + return strings.TrimSuffix(out, ",") +} diff --git a/relayer/packet-tx.go b/relayer/packet-tx.go index a0ce681ddff..65501d1d660 100644 --- a/relayer/packet-tx.go +++ b/relayer/packet-tx.go @@ -20,43 +20,38 @@ var ( defaultUnbondingTime = time.Hour * 504 // 3 weeks in hours defaultPacketTimeout = 1000 defaultPacketQuery = "send_packet.packet_src_channel=%s&send_packet.packet_sequence=%d" + // defaultPacketAckQuery = "recv_packet.packet_src_channel=%s&recv_packet.packet_sequence=%d" ) -// RelayUnRelayedPacketsOrderedChan creates transactions to clear both queues -func RelayUnRelayedPacketsOrderedChan(src, dst *Chain) error { - // Update lite clients, headers to be used later - hs, err := UpdatesWithHeaders(src, dst) - if err != nil { - return err - } - - // find any unrelayed packets - sp, err := UnrelayedSequences(src, dst, hs[src.ChainID].Height-1, hs[dst.ChainID].Height-1) - if err != nil { - return err - } +// RelayPacketsOrderedChan creates transactions to clear both queues +// CONTRACT: the SyncHeaders passed in here must be up to date or being kept updated +func RelayPacketsOrderedChan(src, dst *Chain, sh *SyncHeaders, sp *RelaySequences) error { // create the appropriate update client messages msgs := &RelayMsgs{Src: []sdk.Msg{}, Dst: []sdk.Msg{}} if len(sp.Src) > 0 { - msgs.Dst = append(msgs.Dst, dst.PathEnd.UpdateClient(hs[src.ChainID], dst.MustGetAddress())) + msgs.Dst = append(msgs.Dst, dst.PathEnd.UpdateClient(sh.GetHeader(src.ChainID), dst.MustGetAddress())) } if len(sp.Dst) > 0 { - msgs.Src = append(msgs.Src, src.PathEnd.UpdateClient(hs[dst.ChainID], src.MustGetAddress())) + msgs.Src = append(msgs.Src, src.PathEnd.UpdateClient(sh.GetHeader(dst.ChainID), src.MustGetAddress())) } // add messages for src -> dst for _, seq := range sp.Src { - if err = addPacketMsg(src, dst, hs[src.ChainID], hs[dst.ChainID], seq, msgs, true); err != nil { + msg, err := packetMsgFromTxQuery(src, dst, sh, seq) + if err != nil { return err } + msgs.Dst = append(msgs.Dst, msg) } // add messages for dst -> src for _, seq := range sp.Dst { - if err = addPacketMsg(dst, src, hs[dst.ChainID], hs[src.ChainID], seq, msgs, false); err != nil { + msg, err := packetMsgFromTxQuery(dst, src, sh, seq) + if err != nil { return err } + msgs.Src = append(msgs.Src, msg) } if !msgs.Ready() { @@ -66,15 +61,12 @@ func RelayUnRelayedPacketsOrderedChan(src, dst *Chain) error { // TODO: increase the amount of gas as the number of messages increases // notify the user of that - if msgs.Send(src, dst); msgs.success { - src.Log(fmt.Sprintf("★ Clients updated: [%s]client(%s) and [%s]client(%s)", - src.ChainID, src.PathEnd.ClientID, dst.ChainID, dst.PathEnd.ClientID)) if len(msgs.Dst) > 1 { - src.logPacketsRelayed(dst, len(msgs.Dst)-1) + dst.logPacketsRelayed(src, len(msgs.Dst)-1) } if len(msgs.Src) > 1 { - dst.logPacketsRelayed(src, len(msgs.Src)-1) + src.logPacketsRelayed(dst, len(msgs.Src)-1) } } @@ -181,19 +173,8 @@ func (src *Chain) SendTransferBothSides(dst *Chain, amount sdk.Coin, dstAddr sdk seqRecv.NextSequenceRecv, timeoutHeight, xferPacket, - chanTypes.NewPacketResponse( - src.PathEnd.PortID, - src.PathEnd.ChannelID, - seqSend-1, - src.PathEnd.NewPacket( - dst.PathEnd, - seqSend-1, - xferPacket, - timeoutHeight, - ), - srcCommitRes.Proof.Proof, - int64(srcCommitRes.ProofHeight), - ), + srcCommitRes.Proof, + srcCommitRes.ProofHeight, dst.MustGetAddress(), ), }, @@ -237,84 +218,76 @@ func (src *Chain) SendTransferMsg(dst *Chain, amount sdk.Coin, dstAddr sdk.AccAd return nil } -func addPacketMsg(src, dst *Chain, srcH, dstH *tmclient.Header, seq uint64, msgs *RelayMsgs, source bool) error { +// packetMsgFromTxQuery returns a sdk.Msg to relay a packet with a given seq on src +func packetMsgFromTxQuery(src, dst *Chain, sh *SyncHeaders, seq uint64) (sdk.Msg, error) { eve, err := ParseEvents(fmt.Sprintf(defaultPacketQuery, src.PathEnd.ChannelID, seq)) if err != nil { - return err + return nil, err } - tx, err := src.QueryTxs(srcH.GetHeight(), 1, 1000, eve) + tx, err := src.QueryTxs(sh.GetHeight(src.ChainID), 1, 1000, eve) switch { case err != nil: - return err + return nil, err case tx.Count == 0: - return fmt.Errorf("no transactions returned with query") + return nil, fmt.Errorf("no transactions returned with query") case tx.Count > 1: - return fmt.Errorf("more than one transaction returned with query") + return nil, fmt.Errorf("more than one transaction returned with query") } - pd, to, qSeq, err := src.packetDataAndTimeoutFromQueryResponse(src, tx.Txs[0]) - if err != nil { - return err + rlyPackets, err := relayPacketFromQueryResponse(tx.Txs[0]) + switch { + case err != nil: + return nil, err + case len(rlyPackets) == 0: + return nil, fmt.Errorf("no relay msgs created from query response") + case len(rlyPackets) > 1: + return nil, fmt.Errorf("more than one relay msg found in tx query") } - if seq != qSeq { - return fmt.Errorf("Different sequence number from query (%d vs %d)", seq, qSeq) + // sanity check the sequence number against the one we are querying for + // TODO: move this into relayPacketFromQueryResponse? + if seq != rlyPackets[0].Seq() { + return nil, fmt.Errorf("Different sequence number from query (%d vs %d)", seq, rlyPackets[0].Seq()) } - var ( - srcCommitRes CommitmentResponse - ) - - if err = retry.Do(func() error { - srcCommitRes, err = src.QueryPacketCommitment(srcH.Height-1, int64(seq)) - if err != nil { - return err - } else if srcCommitRes.Proof.Proof == nil { - return fmt.Errorf("[%s]@{%d} - Packet Commitment Proof is nil seq(%d)", src.ChainID, srcH.Height-1, seq) - } - return nil - }); err != nil { - return err + // fetch the proof from the sending chain + if err = rlyPackets[0].FetchCommitResponse(dst, src, sh); err != nil { + return nil, err } - msg, err := dst.PacketMsg(src, pd, to, int64(seq), srcCommitRes) - if err != nil { - return err - } - - if source { - msgs.Dst = append(msgs.Dst, msg) - } else { - msgs.Src = append(msgs.Src, msg) - } - return nil + // return the sending msg + return rlyPackets[0].Msg(dst, src), nil } -func (src *Chain) packetDataAndTimeoutFromQueryResponse(dst *Chain, res sdk.TxResponse) (packetData []byte, timeout uint64, seq uint64, err error) { - // Set sdk config to use custom Bech32 account prefix - defer dst.UseSDKContext()() - +// relayPacketFromQueryResponse looks through the events in a sdk.Response +// and returns relayPackets with the appropriate data +func relayPacketFromQueryResponse(res sdk.TxResponse) (rlyPackets []relayPacket, err error) { for _, l := range res.Logs { for _, e := range l.Events { if e.Type == "send_packet" { + rp := &relayMsgRecvPacket{} for _, p := range e.Attributes { if p.Key == "packet_data" { - packetData = []byte(p.Value) + rp.packetData = []byte(p.Value) } if p.Key == "packet_timeout" { - timeout, _ = strconv.ParseUint(p.Value, 10, 64) + timeout, _ := strconv.ParseUint(p.Value, 10, 64) + rp.timeout = timeout } if p.Key == "packet_sequence" { - seq, _ = strconv.ParseUint(p.Value, 10, 64) - + seq, _ := strconv.ParseUint(p.Value, 10, 64) + rp.seq = seq } } - if packetData != nil && timeout != 0 { - return - } + rlyPackets = append(rlyPackets, rp) } } } - return nil, 0, 0, fmt.Errorf("no packet data found") + + if len(rlyPackets) > 0 { + return + } + + return nil, fmt.Errorf("no packet data found") } diff --git a/relayer/pathEnd.go b/relayer/pathEnd.go index c3bbbe2ca93..4d59d268a11 100644 --- a/relayer/pathEnd.go +++ b/relayer/pathEnd.go @@ -10,6 +10,7 @@ import ( chanTypes "github.com/cosmos/cosmos-sdk/x/ibc/04-channel/types" tmclient "github.com/cosmos/cosmos-sdk/x/ibc/07-tendermint/types" xferTypes "github.com/cosmos/cosmos-sdk/x/ibc/20-transfer/types" + commitmenttypes "github.com/cosmos/cosmos-sdk/x/ibc/23-commitment/types" ) // TODO: add Order chanTypes.Order as a property and wire it up in validation @@ -179,7 +180,7 @@ func (src *PathEnd) ChanCloseConfirm(dstChanState chanTypes.ChannelResponse, sig } // MsgRecvPacket creates a MsgPacket -func (src *PathEnd) MsgRecvPacket(dst *PathEnd, sequence, timeoutHeight uint64, packetData []byte, proof chanTypes.PacketResponse, signer sdk.AccAddress) sdk.Msg { +func (src *PathEnd) MsgRecvPacket(dst *PathEnd, sequence, timeoutHeight uint64, packetData []byte, proof commitmenttypes.MerkleProof, proofHeight uint64, signer sdk.AccAddress) sdk.Msg { return chanTypes.NewMsgPacket( dst.NewPacket( src, @@ -187,8 +188,8 @@ func (src *PathEnd) MsgRecvPacket(dst *PathEnd, sequence, timeoutHeight uint64, packetData, timeoutHeight, ), - proof.Proof, - proof.ProofHeight+1, + proof, + proofHeight+1, signer, ) } @@ -205,12 +206,17 @@ func (src *PathEnd) MsgTimeout(packet chanTypes.Packet, seq uint64, proof chanTy } // MsgAck creates MsgAck -func (src *PathEnd) MsgAck(packet chanTypes.Packet, ack []byte, proof chanTypes.PacketResponse, signer sdk.AccAddress) sdk.Msg { +func (src *PathEnd) MsgAck(dst *PathEnd, sequence, timeoutHeight uint64, ack []byte, proof commitmenttypes.MerkleProof, proofHeight uint64, signer sdk.AccAddress) sdk.Msg { return chanTypes.NewMsgAcknowledgement( - packet, + dst.NewPacket( + src, + sequence, + ack, + timeoutHeight, + ), ack, - proof.Proof, - proof.ProofHeight+1, + proof, + proofHeight+1, signer, ) } @@ -250,25 +256,14 @@ func (src *PathEnd) XferPacket(amount sdk.Coins, sender, reciever string) []byte } // PacketMsg returns a new MsgPacket for forwarding packets from one chain to another -func (src *Chain) PacketMsg(dst *Chain, xferPacket []byte, timeout uint64, seq int64, dstCommitRes CommitmentResponse) (sdk.Msg, error) { +func (src *Chain) PacketMsg(dst *Chain, xferPacket []byte, timeout uint64, seq int64, dstCommitRes CommitmentResponse) sdk.Msg { return src.PathEnd.MsgRecvPacket( dst.PathEnd, uint64(seq), timeout, xferPacket, - chanTypes.NewPacketResponse( - dst.PathEnd.PortID, - dst.PathEnd.ChannelID, - uint64(seq), - dst.PathEnd.NewPacket( - src.PathEnd, - uint64(seq), - xferPacket, - timeout, - ), - dstCommitRes.Proof.Proof, - int64(dstCommitRes.ProofHeight), - ), + dstCommitRes.Proof, + dstCommitRes.ProofHeight, src.MustGetAddress(), - ), nil + ) } diff --git a/relayer/query.go b/relayer/query.go index 66accc94f72..4ff908b1808 100644 --- a/relayer/query.go +++ b/relayer/query.go @@ -654,8 +654,8 @@ func newRlySeq(start, end uint64) []uint64 { } // UnrelayedSequences returns the unrelayed sequence numbers between two chains -func UnrelayedSequences(src, dst *Chain, srcH, dstH int64) (*RelaySequences, error) { - seqP, err := QueryNextSeqPairs(src, dst, srcH, dstH) +func UnrelayedSequences(src, dst *Chain, sh *SyncHeaders) (*RelaySequences, error) { + seqP, err := QueryNextSeqPairs(src, dst, sh) if err != nil { return nil, err } @@ -663,14 +663,14 @@ func UnrelayedSequences(src, dst *Chain, srcH, dstH int64) (*RelaySequences, err } // QueryNextSeqPairs returns a pair of chain's next sequences for the configured channel -func QueryNextSeqPairs(src, dst *Chain, srcH, dstH int64) (*SeqPairs, error) { +func QueryNextSeqPairs(src, dst *Chain, sh *SyncHeaders) (*SeqPairs, error) { sps := &SeqPairs{Src: &SeqPair{}, Dst: &SeqPair{}, errs: errs{}} var wg sync.WaitGroup wg.Add(4) - go src.queryNextSendWG(sps, srcH, &wg, true) - go src.queryNextRecvWG(sps, srcH, &wg, true) - go dst.queryNextSendWG(sps, dstH, &wg, false) - go dst.queryNextRecvWG(sps, dstH, &wg, false) + go src.queryNextSendWG(sps, int64(sh.GetHeight(src.ChainID)), &wg, true) + go src.queryNextRecvWG(sps, int64(sh.GetHeight(src.ChainID)), &wg, true) + go dst.queryNextSendWG(sps, int64(sh.GetHeight(dst.ChainID)), &wg, false) + go dst.queryNextRecvWG(sps, int64(sh.GetHeight(dst.ChainID)), &wg, false) wg.Wait() return sps, sps.errs.err() } @@ -854,14 +854,14 @@ type ChannelStatus struct { func QueryPathStatus(src, dst *Chain, path *Path) (stat *PathStatus, err error) { stat = &PathStatus{ Chains: map[string]*ChainStatus{ - src.ChainID: &ChainStatus{ + src.ChainID: { Reachable: false, Height: -1, Client: &ClientStatus{}, Connection: &ConnectionStatus{}, Channel: &ChannelStatus{}, }, - dst.ChainID: &ChainStatus{ + dst.ChainID: { Reachable: false, Height: -1, Client: &ClientStatus{}, @@ -881,18 +881,15 @@ func QueryPathStatus(src, dst *Chain, path *Path) (stat *PathStatus, err error) return } - srch, err := src.QueryLatestHeight() + sh, err := NewSyncHeaders(src, dst) if err != nil { return } - stat.Chains[src.ChainID].Height = srch + + stat.Chains[src.ChainID].Height = int64(sh.GetHeight(src.ChainID)) stat.Chains[src.ChainID].Reachable = true - dsth, err := dst.QueryLatestHeight() - if err != nil { - return - } - stat.Chains[dst.ChainID].Height = dsth + stat.Chains[dst.ChainID].Height = int64(sh.GetHeight(dst.ChainID)) stat.Chains[dst.ChainID].Reachable = true srcCs, err := src.QueryClientState() @@ -909,21 +906,21 @@ func QueryPathStatus(src, dst *Chain, path *Path) (stat *PathStatus, err error) stat.Chains[dst.ChainID].Client.ID = dstCs.ClientState.GetID() stat.Chains[dst.ChainID].Client.Height = dstCs.ClientState.GetLatestHeight() - srcConn, err := src.QueryConnection(srch) + srcConn, err := src.QueryConnection(int64(sh.GetHeight(src.ChainID))) if err != nil { return } stat.Chains[src.ChainID].Connection.ID = srcConn.Connection.Identifier stat.Chains[src.ChainID].Connection.State = srcConn.Connection.Connection.GetState().String() - dstConn, err := dst.QueryConnection(dsth) + dstConn, err := dst.QueryConnection(int64(sh.GetHeight(dst.ChainID))) if err != nil { return } stat.Chains[dst.ChainID].Connection.ID = dstConn.Connection.Identifier stat.Chains[dst.ChainID].Connection.State = dstConn.Connection.Connection.GetState().String() - srcChan, err := src.QueryChannel(srch) + srcChan, err := src.QueryChannel(int64(sh.GetHeight(src.ChainID))) if err != nil { return } @@ -932,7 +929,7 @@ func QueryPathStatus(src, dst *Chain, path *Path) (stat *PathStatus, err error) stat.Chains[src.ChainID].Channel.State = srcChan.Channel.Channel.GetState().String() stat.Chains[src.ChainID].Channel.Order = srcChan.Channel.Channel.GetOrdering().String() - dstChan, err := dst.QueryChannel(dsth) + dstChan, err := dst.QueryChannel(int64(sh.GetHeight(dst.ChainID))) if err != nil { return } @@ -941,7 +938,7 @@ func QueryPathStatus(src, dst *Chain, path *Path) (stat *PathStatus, err error) stat.Chains[dst.ChainID].Channel.State = dstChan.Channel.Channel.GetState().String() stat.Chains[dst.ChainID].Channel.Order = dstChan.Channel.Channel.GetOrdering().String() - unrelayed, err := UnrelayedSequences(src, dst, srch, dsth) + unrelayed, err := UnrelayedSequences(src, dst, sh) if err != nil { return } diff --git a/relayer/relayPackets.go b/relayer/relayPackets.go new file mode 100644 index 00000000000..afe7b311207 --- /dev/null +++ b/relayer/relayPackets.go @@ -0,0 +1,195 @@ +package relayer + +import ( + "fmt" + "strconv" + + retry "github.com/avast/retry-go" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +func relayPacketsFromEventListener(events map[string][]string) (rlyPkts []relayPacket, err error) { + // check for send packets + if pdval, ok := events["send_packet.packet_data"]; ok { + for i, pd := range pdval { + rp := &relayMsgRecvPacket{packetData: []byte(pd)} + // next, get and parse the sequence + if sval, ok := events["send_packet.packet_sequence"]; ok { + seq, err := strconv.ParseUint(sval[i], 10, 64) + if err != nil { + return nil, err + } + rp.seq = seq + } + + // finally, get and parse the timeout + if sval, ok := events["send_packet.packet_timeout"]; ok { + timeout, err := strconv.ParseUint(sval[i], 10, 64) + if err != nil { + return nil, err + } + rp.timeout = timeout + } + rlyPkts = append(rlyPkts, rp) + } + } + + // // then, check for packet acks + // if pdval, ok := events["recv_packet.packet_data"]; ok { + // for i, pd := range pdval { + // rp := &relayMsgPacketAck{ack: []byte(pd)} + // // next, get and parse the sequence + // if sval, ok := events["recv_packet.packet_sequence"]; ok { + // seq, err := strconv.ParseUint(sval[i], 10, 64) + // if err != nil { + // return nil, err + // } + // rp.seq = seq + // } + + // // finally, get and parse the timeout + // if sval, ok := events["recv_packet.packet_timeout"]; ok { + // timeout, err := strconv.ParseUint(sval[i], 10, 64) + // if err != nil { + // return nil, err + // } + // rp.timeout = timeout + // } + + // rlyPkts = append(rlyPkts, rp) + // } + // } + return +} + +func sendTxFromEventPackets(src, dst *Chain, rlyPackets []relayPacket, sh *SyncHeaders) { + // fetch the proofs for the relayPackets + for _, rp := range rlyPackets { + if err := rp.FetchCommitResponse(src, dst, sh); err != nil { + // we don't expect many errors here because of the retry + // in FetchCommitResponse + src.Error(err) + } + } + + // instantiate the RelayMsgs with the appropriate update client + txs := &RelayMsgs{ + Src: []sdk.Msg{ + src.PathEnd.UpdateClient(sh.GetHeader(dst.ChainID), src.MustGetAddress()), + }, + Dst: []sdk.Msg{}, + } + + // add the packet msgs to RelayPackets + for _, rp := range rlyPackets { + txs.Src = append(txs.Src, rp.Msg(src, dst)) + } + + // send the transaction, maybe retry here if not successful + if txs.Send(src, dst); !txs.success { + src.Error(fmt.Errorf("failed to send packets, maybe we should add a retry here")) + } +} + +type relayPacket interface { + Msg(src, dst *Chain) sdk.Msg + FetchCommitResponse(src, dst *Chain, sh *SyncHeaders) error + Data() []byte + Seq() uint64 + Timeout() uint64 +} + +type relayMsgRecvPacket struct { + packetData []byte + seq uint64 + timeout uint64 + dstComRes *CommitmentResponse +} + +func (rp *relayMsgRecvPacket) Data() []byte { + return rp.packetData +} + +func (rp *relayMsgRecvPacket) Seq() uint64 { + return rp.seq +} + +func (rp *relayMsgRecvPacket) Timeout() uint64 { + return rp.timeout +} + +func (rp *relayMsgRecvPacket) FetchCommitResponse(src, dst *Chain, sh *SyncHeaders) (err error) { + var dstCommitRes CommitmentResponse + + // retry getting commit response until it succeeds + if err = retry.Do(func() error { + dstCommitRes, err = dst.QueryPacketCommitment(int64(sh.GetHeight(dst.ChainID)-1), int64(rp.seq)) + if err != nil { + return err + } else if dstCommitRes.Proof.Proof == nil { + return fmt.Errorf("- [%s]@{%d} - Packet Commitment Proof is nil seq(%d)", dst.ChainID, int64(sh.GetHeight(dst.ChainID)-1), rp.seq) + } + return nil + }); err != nil { + dst.Error(err) + return + } + + rp.dstComRes = &dstCommitRes + return +} + +func (rp *relayMsgRecvPacket) Msg(src, dst *Chain) sdk.Msg { + if rp.dstComRes == nil { + return nil + } + return src.PacketMsg(dst, rp.packetData, rp.timeout, int64(rp.seq), *rp.dstComRes) +} + +// nolint +type relayMsgPacketAck struct { + ack []byte + seq uint64 + timeout uint64 + dstComRes *CommitmentResponse +} + +func (rp *relayMsgPacketAck) Data() []byte { + return rp.ack +} +func (rp *relayMsgPacketAck) Seq() uint64 { + return rp.seq +} +func (rp *relayMsgPacketAck) Timeout() uint64 { + return rp.timeout +} + +func (rp *relayMsgPacketAck) Msg(src, dst *Chain) sdk.Msg { + return src.PathEnd.MsgAck( + dst.PathEnd, + rp.seq, + rp.timeout, + rp.ack, + rp.dstComRes.Proof, + rp.dstComRes.ProofHeight, + src.MustGetAddress(), + ) +} + +func (rp *relayMsgPacketAck) FetchCommitResponse(src, dst *Chain, sh *SyncHeaders) (err error) { + var dstCommitRes CommitmentResponse + if err = retry.Do(func() error { + dstCommitRes, err = dst.QueryPacketAck(int64(sh.GetHeight(dst.ChainID)-1), int64(rp.seq)) + if err != nil { + return err + } else if dstCommitRes.Proof.Proof == nil { + return fmt.Errorf("- [%s]@{%d} - Packet Ack Proof is nil seq(%d)", dst.ChainID, int64(sh.GetHeight(dst.ChainID)-1), rp.seq) + } + return nil + }); err != nil { + dst.Error(err) + return + } + rp.dstComRes = &dstCommitRes + return nil +} diff --git a/relayer/strategies.go b/relayer/strategies.go index 1975d37e866..4b2b62af360 100644 --- a/relayer/strategies.go +++ b/relayer/strategies.go @@ -1,14 +1,10 @@ package relayer import ( + "context" "fmt" - "strconv" - "strings" - retry "github.com/avast/retry-go" - sdk "github.com/cosmos/cosmos-sdk/types" - chanTypes "github.com/cosmos/cosmos-sdk/x/ibc/04-channel/types" - tmclient "github.com/cosmos/cosmos-sdk/x/ibc/07-tendermint/types" + ctypes "github.com/tendermint/tendermint/rpc/core/types" ) var ( @@ -106,35 +102,51 @@ func (nrs NaiveStrategy) GetConstraints() map[string]string { func (nrs NaiveStrategy) Run(src, dst *Chain) (func(), error) { doneChan := make(chan struct{}) + sh, err := NewSyncHeaders(src, dst) + if err != nil { + return nil, err + } + + go nrsLoop(src, dst, doneChan, sh) + + sp, err := UnrelayedSequences(src, dst, sh) + if err != nil { + return nil, err + } + // first, we want to ensure that there are no packets remaining to be relayed - if err := RelayUnRelayedPacketsOrderedChan(src, dst); err != nil { + if err := RelayPacketsOrderedChan(src, dst, sh, sp); err != nil { // TODO: some errors may leak here when there are no packets to be relayed // be on the lookout for that return nil, err } - go nrsLoop(src, dst, doneChan) - return func() { doneChan <- struct{}{} }, nil } -func nrsLoop(src, dst *Chain, doneChan chan struct{}) { - // Subscribe to source chain - if err := src.Start(); err != nil { +func nrsLoop(src, dst *Chain, doneChan chan struct{}, sh *SyncHeaders) { + var ( + srcTxEvents, srcBlockEvents, dstTxEvents, dstBlockEvents <-chan ctypes.ResultEvent + srcTxCancel, srcBlockCancel, dstTxCancel, dstBlockCancel context.CancelFunc + err error + ) + + // Start client for source chain + if err = src.Start(); err != nil { src.Error(err) return } - srcTxEvents, srcTxCancel, err := src.Subscribe(txEvents) - if err != nil { + // Subscibe to txEvents from the source chain + if srcTxEvents, srcTxCancel, err = src.Subscribe(txEvents); err != nil { src.Error(err) return } defer srcTxCancel() src.Log(fmt.Sprintf("- listening to tx events from %s...", src.ChainID)) - srcBlockEvents, srcBlockCancel, err := src.Subscribe(blEvents) - if err != nil { + // Subscibe to blockEvents from the source chain + if srcBlockEvents, srcBlockCancel, err = src.Subscribe(blEvents); err != nil { src.Error(err) return } @@ -142,21 +154,21 @@ func nrsLoop(src, dst *Chain, doneChan chan struct{}) { src.Log(fmt.Sprintf("- listening to block events from %s...", src.ChainID)) // Subscribe to destination chain - if err := dst.Start(); err != nil { + if err = dst.Start(); err != nil { dst.Error(err) return } - dstTxEvents, dstTxCancel, err := dst.Subscribe(txEvents) - if err != nil { + // Subscibe to txEvents from the destination chain + if dstTxEvents, dstTxCancel, err = dst.Subscribe(txEvents); err != nil { dst.Error(err) return } defer dstTxCancel() dst.Log(fmt.Sprintf("- listening to tx events from %s...", dst.ChainID)) - dstBlockEvents, dstBlockCancel, err := dst.Subscribe(blEvents) - if err != nil { + // Subscibe to blockEvents from the destination chain + if dstBlockEvents, dstBlockCancel, err = dst.Subscribe(blEvents); err != nil { src.Error(err) return } @@ -168,14 +180,22 @@ func nrsLoop(src, dst *Chain, doneChan chan struct{}) { select { case srcMsg := <-srcTxEvents: src.logTx(srcMsg.Events) - go dst.handlePacket(src, srcMsg.Events) + go dst.handleEvents(src, srcMsg.Events, sh) case dstMsg := <-dstTxEvents: dst.logTx(dstMsg.Events) - go src.handlePacket(dst, dstMsg.Events) + go src.handleEvents(dst, dstMsg.Events, sh) case srcMsg := <-srcBlockEvents: - go dst.handlePacket(src, srcMsg.Events) + // TODO: Add debug block logging here + if err = sh.Update(src); err != nil { + src.Error(err) + } + go dst.handleEvents(src, srcMsg.Events, sh) case dstMsg := <-dstBlockEvents: - go src.handlePacket(dst, dstMsg.Events) + // TODO: Add debug block logging here + if err = sh.Update(dst); err != nil { + dst.Error(err) + } + go src.handleEvents(dst, dstMsg.Events, sh) case <-doneChan: src.Log(fmt.Sprintf("- [%s]:{%s} <-> [%s]:{%s} relayer shutting down", src.ChainID, src.PathEnd.PortID, dst.ChainID, dst.PathEnd.PortID)) @@ -185,115 +205,9 @@ func nrsLoop(src, dst *Chain, doneChan chan struct{}) { } } -func (src *Chain) handlePacket(dst *Chain, events map[string][]string) { - byt, seq, timeout, err := src.packetDataAndTimeoutFromEvent(dst, events) - if byt != nil && seq != 0 && err == nil { - src.sendPacketFromEvent(dst, byt, seq, timeout) - return - } else if err != nil { - src.Error(err) - } -} - -// TODO: rewrite this function to take a relayPacket -func (src *Chain) sendPacketFromEvent(dst *Chain, xferPacket []byte, seq int64, timeout uint64) { - var ( - err error - dstH *tmclient.Header - dstCommitRes CommitmentResponse - ) - - if err = retry.Do(func() error { - dstH, err = dst.UpdateLiteWithHeader() - if err != nil { - return err - } - dstCommitRes, err = dst.QueryPacketCommitment(dstH.Height-1, int64(seq)) - if err != nil { - return err - } else if dstCommitRes.Proof.Proof == nil { - return fmt.Errorf("- [%s]@{%d} - Packet Commitment Proof is nil seq(%d)", dst.ChainID, dstH.Height-1, seq) - } - return nil - }); err != nil { - dst.Error(err) - return - } - - txs := &RelayMsgs{ - Src: []sdk.Msg{ - src.PathEnd.UpdateClient(dstH, src.MustGetAddress()), - src.PathEnd.MsgRecvPacket( - dst.PathEnd, - uint64(seq), - timeout, - xferPacket, - chanTypes.NewPacketResponse( - dst.PathEnd.PortID, - dst.PathEnd.ChannelID, - uint64(seq), - dst.PathEnd.NewPacket( - src.PathEnd, - uint64(seq), - xferPacket, - timeout, - ), - dstCommitRes.Proof.Proof, - int64(dstCommitRes.ProofHeight), - ), - src.MustGetAddress(), - ), - }, - Dst: []sdk.Msg{}, - } - txs.Send(src, dst) -} - -type relayPacket struct { - packetData string - seq int64 - timeout uint64 -} - -// TODO: rewrite this function to return a relayPacket and also return relay packets for acks and multiple msg trasactions -func (src *Chain) packetDataAndTimeoutFromEvent(dst *Chain, events map[string][]string) (packetData []byte, seq int64, timeout uint64, err error) { - - // then, get packet data and parse - if pdval, ok := events["send_packet.packet_data"]; ok { - packetData = []byte(pdval[0]) - } - - // next, get and parse the sequence - if sval, ok := events["send_packet.packet_sequence"]; ok { - seq, err = strconv.ParseInt(sval[0], 10, 64) - if err != nil { - return nil, 0, 0, err - } - } - - // finally, get and parse the timeout - if sval, ok := events["send_packet.packet_timeout"]; ok { - timeout, err = strconv.ParseUint(sval[0], 10, 64) - if err != nil { - return nil, 0, 0, err - } - } - - return -} - -func getEventHeight(events map[string][]string) int64 { - if val, ok := events["tx.height"]; ok { - out, _ := strconv.ParseInt(val[0], 10, 64) - return out - } - return -1 -} - -func actions(act []string) string { - out := "" - for i, a := range act { - out += fmt.Sprintf("%d:%s,", i, a) +func (src *Chain) handleEvents(dst *Chain, events map[string][]string, sh *SyncHeaders) { + rlyPackets, err := relayPacketsFromEventListener(events) + if len(rlyPackets) > 0 && err == nil { + sendTxFromEventPackets(src, dst, rlyPackets, sh) } - return strings.TrimSuffix(out, ",") } diff --git a/test/relayer_gaia_test.go b/test/relayer_gaia_test.go index eb983517011..9eaec1d93ad 100644 --- a/test/relayer_gaia_test.go +++ b/test/relayer_gaia_test.go @@ -62,7 +62,7 @@ func TestGaiaToGaiaStreamingRelayer(t *testing.T) { require.NoError(t, dst.SendTransferMsg(src, twoTestCoin, src.MustGetAddress(), false)) // wait for packet processing - require.NoError(t, dst.WaitForNBlocks(4)) + require.NoError(t, dst.WaitForNBlocks(6)) // kill relayer routine rlyDone() diff --git a/test/test_chains.go b/test/test_chains.go index 1fafe450217..b8a5f839648 100644 --- a/test/test_chains.go +++ b/test/test_chains.go @@ -8,9 +8,10 @@ import ( codecstd "github.com/cosmos/cosmos-sdk/codec/std" "github.com/cosmos/cosmos-sdk/server" "github.com/cosmos/cosmos-sdk/simapp" - . "github.com/iqlusioninc/relayer/relayer" "github.com/stretchr/testify/require" "github.com/tendermint/go-amino" + + . "github.com/iqlusioninc/relayer/relayer" ) var ( diff --git a/test/test_queries.go b/test/test_queries.go index 62f60d43bcd..4a4dd9e0a23 100644 --- a/test/test_queries.go +++ b/test/test_queries.go @@ -3,8 +3,9 @@ package test import ( "testing" - . "github.com/iqlusioninc/relayer/relayer" "github.com/stretchr/testify/require" + + . "github.com/iqlusioninc/relayer/relayer" ) // testClientPair tests that the client for src on dst and dst on src are the only clients on those chains diff --git a/test/test_setup.go b/test/test_setup.go index c48e2ff3292..1be1c005f09 100644 --- a/test/test_setup.go +++ b/test/test_setup.go @@ -9,11 +9,8 @@ import ( "sync" "testing" - // TODO: replace this codec with the gaia codec - "github.com/ory/dockertest/v3" dc "github.com/ory/dockertest/v3/docker" - "github.com/stretchr/testify/require" . "github.com/iqlusioninc/relayer/relayer"