Skip to content

Commit

Permalink
Merge PR cosmos#574: rly start fix
Browse files Browse the repository at this point in the history
* add retries in RelayPackets

* get main StartRelayer loop running properly + small logging fixes

Co-authored-by: Jack Zampolin <jack.zampolin@gmail.com>
  • Loading branch information
jtieri and jackzampolin authored Feb 8, 2022
1 parent 5bb5a67 commit 0d314ac
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 32 deletions.
4 changes: 2 additions & 2 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName)),

done, err := relayer.StartRelayer(c[src], c[dst], maxTxSize, maxMsgLength)
if err != nil {
return err
c[src].Log(fmt.Sprintf("relayer start error. Err: %v", err))
}

thresholdTime := viper.GetDuration(flagThresholdTime)
Expand All @@ -95,7 +95,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName)),
}
})
if err = eg.Wait(); err != nil {
return err
c[src].Log(fmt.Sprintf("update clients error. Err: %v", err))
}

trapSignal(done)
Expand Down
49 changes: 41 additions & 8 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,25 +390,58 @@ func RelayPackets(src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength u

// Prepend non-empty msg lists with UpdateClient
if len(msgs.Dst) != 0 {
srcHeader, err := src.ChainProvider.GetIBCUpdateHeader(srch, dst.ChainProvider, dst.PathEnd.ClientID)
if err != nil {
var (
srcHeader ibcexported.Header
updateMsg provider.RelayerMessage
)

if err = retry.Do(func() error {
srcHeader, err = src.ChainProvider.GetIBCUpdateHeader(srch, dst.ChainProvider, dst.PathEnd.ClientID)
return err
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, _, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}
updateMsg, err := dst.ChainProvider.UpdateClient(dst.PathEnd.ClientID, srcHeader)
if err != nil {

if err = retry.Do(func() error {
updateMsg, err = dst.ChainProvider.UpdateClient(dst.PathEnd.ClientID, srcHeader)
return nil
}, RtyAtt, RtyDel, RtyErr); err != nil {
return err
}

msgs.Dst = append([]provider.RelayerMessage{updateMsg}, msgs.Dst...)
}

if len(msgs.Src) != 0 {
dstHeader, err := dst.ChainProvider.GetIBCUpdateHeader(dsth, src.ChainProvider, src.PathEnd.ClientID)
if err != nil {
//dstHeader, err := dst.ChainProvider.GetIBCUpdateHeader(dsth, src.ChainProvider, src.PathEnd.ClientID)
//if err != nil {
// return err
//}
//updateMsg, err := src.ChainProvider.UpdateClient(src.PathEnd.ClientID, dstHeader)
//if err != nil {
// return err
//}

var (
dstHeader ibcexported.Header
updateMsg provider.RelayerMessage
)

if err = retry.Do(func() error {
dstHeader, err = dst.ChainProvider.GetIBCUpdateHeader(dsth, src.ChainProvider, src.PathEnd.ClientID)
return err
}, RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
_, dsth, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}
updateMsg, err := src.ChainProvider.UpdateClient(src.PathEnd.ClientID, dstHeader)
if err != nil {

if err = retry.Do(func() error {
updateMsg, err = src.ChainProvider.UpdateClient(src.PathEnd.ClientID, dstHeader)
return nil
}, RtyAtt, RtyDel, RtyErr); err != nil {
return err
}

Expand Down
46 changes: 24 additions & 22 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package relayer

import (
"fmt"
"time"
)

// StartRelayer starts the main relaying loop
Expand All @@ -18,36 +17,39 @@ func StartRelayer(src, dst *Chain, maxTxSize, maxMsgLength uint64) (func(), erro
sp, err := UnrelayedSequences(src, dst)
if err != nil {
src.Log(fmt.Sprintf("unrelayed sequences error: %s", err))
}
if len(sp.Src) > 0 && src.debug {
src.Log(fmt.Sprintf("[%s] unrelayed-packets-> %v", src.ChainID(), sp.Src))
}
if len(sp.Dst) > 0 && dst.debug {
dst.Log(fmt.Sprintf("[%s] unrelayed-packets-> %v", dst.ChainID(), sp.Dst))
}
if !sp.Empty() {
if err = RelayPackets(src, dst, sp, maxTxSize, maxMsgLength); err != nil {
src.Log(fmt.Sprintf("relay packets error: %s", err))
} else {
if len(sp.Src) > 0 && src.debug {
src.Log(fmt.Sprintf("[%s] unrelayed-packets-> %v", src.ChainID(), sp.Src))
}
if len(sp.Dst) > 0 && dst.debug {
dst.Log(fmt.Sprintf("[%s] unrelayed-packets-> %v", dst.ChainID(), sp.Dst))
}
if !sp.Empty() {
if err = RelayPackets(src, dst, sp, maxTxSize, maxMsgLength); err != nil {
src.Log(fmt.Sprintf("relay packets error: %s", err))
}
}
}

// Fetch any unrelayed acks depending on the channel order
ap, err := UnrelayedAcknowledgements(src, dst)
if err != nil {
src.Log(fmt.Sprintf("unrelayed acks error: %s", err))
}
if len(ap.Src) > 0 && src.debug {
src.Log(fmt.Sprintf("[%s] unrelayed-acks-> %v", src.ChainID(), ap.Src))
}
if len(ap.Dst) > 0 && dst.debug {
dst.Log(fmt.Sprintf("[%s] unrelayed-acks-> %v", dst.ChainID(), ap.Dst))
}
if !ap.Empty() {
if err = RelayAcknowledgements(src, dst, ap, maxTxSize, maxMsgLength); err != nil && src.debug {
src.Log(fmt.Sprintf("relay acks error: %s", err))
} else {
if len(ap.Src) > 0 && src.debug {
src.Log(fmt.Sprintf("[%s] unrelayed-acks-> %v", src.ChainID(), ap.Src))
}
if len(ap.Dst) > 0 && dst.debug {
dst.Log(fmt.Sprintf("[%s] unrelayed-acks-> %v", dst.ChainID(), ap.Dst))
}
if !ap.Empty() {
if err = RelayAcknowledgements(src, dst, ap, maxTxSize, maxMsgLength); err != nil && src.debug {
src.Log(fmt.Sprintf("relay acks error: %s", err))
}
}
}
time.Sleep(100 * time.Millisecond)

//time.Sleep(100 * time.Millisecond)
}
}
}()
Expand Down

0 comments on commit 0d314ac

Please sign in to comment.