next seq ack handling #1244

Merged · 15 commits · Jul 31, 2023
Changes from all commits
2 changes: 2 additions & 0 deletions .gitignore
@@ -21,3 +21,5 @@ dist/

# Don't commit the vendor directory if anyone runs 'go mod vendor'.
/vendor

go.work.sum
1,238 changes: 0 additions & 1,238 deletions go.work.sum

This file was deleted.

16 changes: 9 additions & 7 deletions relayer/chains/cosmos/cosmos_chain_processor.go
@@ -71,6 +71,7 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider, metrics

const (
queryTimeout = 5 * time.Second
queryStateTimeout = 60 * time.Second
blockResultsQueryTimeout = 2 * time.Minute
latestHeightQueryRetryDelay = 1 * time.Second
latestHeightQueryRetries = 5
@@ -279,7 +280,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui

// initializeConnectionState will bootstrap the connectionStateCache with the open connection state.
func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
defer cancel()
connections, err := ccp.chainProvider.QueryConnections(ctx)
if err != nil {
@@ -299,7 +300,7 @@ func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context)

// initializeChannelState will bootstrap the channelStateCache with the open channel state.
func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
defer cancel()
channels, err := ccp.chainProvider.QueryChannels(ctx)
if err != nil {
@@ -315,12 +316,13 @@ func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) err
continue
}
ccp.channelConnections[ch.ChannelId] = ch.ConnectionHops[0]
ccp.channelStateCache[processor.ChannelKey{
k := processor.ChannelKey{
ChannelID: ch.ChannelId,
PortID: ch.PortId,
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN
}
ccp.channelStateCache.SetOpen(k, ch.State == chantypes.OPEN, ch.Ordering)
}
return nil
}
@@ -402,11 +404,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
})

if err := eg.Wait(); err != nil {
ccp.log.Warn(
"Could not query block data. Consider checking if your RPC node is online, and that transaction indexing is enabled.",
ccp.log.Debug(
"Error querying block data",
zap.Int64("height", i),
zap.Error(err),
)
ccp.log.Debug("Error querying block data", zap.Error(err))

persistence.retriesAtLatestQueriedBlock++
if persistence.retriesAtLatestQueriedBlock >= blockMaxRetries {
10 changes: 5 additions & 5 deletions relayer/chains/cosmos/message_handlers.go
@@ -40,7 +40,7 @@ func (ccp *CosmosChainProcessor) handlePacketMessage(eventType string, pi provid
}

if eventType == chantypes.EventTypeTimeoutPacket && pi.ChannelOrder == chantypes.ORDERED.String() {
ccp.channelStateCache[k] = false
ccp.channelStateCache.SetOpen(k, false, chantypes.ORDERED)
}

if !c.PacketFlow.ShouldRetainSequence(ccp.pathProcessors, k, ccp.chainProvider.ChainId(), eventType, pi.Sequence) {
@@ -78,19 +78,19 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi
}
}
if !found {
ccp.channelStateCache[channelKey] = false
ccp.channelStateCache.SetOpen(channelKey, false, ci.Order)
}
} else {
switch eventType {
case chantypes.EventTypeChannelOpenTry:
ccp.channelStateCache[channelKey] = false
ccp.channelStateCache.SetOpen(channelKey, false, ci.Order)
case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
ccp.channelStateCache[channelKey] = true
ccp.channelStateCache.SetOpen(channelKey, true, ci.Order)
ccp.logChannelOpenMessage(eventType, ci)
case chantypes.EventTypeChannelCloseConfirm:
for k := range ccp.channelStateCache {
if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
ccp.channelStateCache[k] = false
ccp.channelStateCache.SetOpen(channelKey, false, ci.Order)
break
}
}
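Note for reviewers: this diff replaces direct boolean writes such as ccp.channelStateCache[k] = true with a SetOpen helper that also records the channel ordering, and call sites like channelStateCache[k].Open together with the map[ChannelKey]ChannelState changes in path_processor.go imply the cache now stores a small state struct. The actual definitions live in the processor package and are not part of this diff; the sketch below is only an inference from the call sites, and the NONE-preserving behavior in SetOpen is a guess, not confirmed by the PR.

```go
package processor

import (
	chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types"
)

// ChannelState pairs the open flag with the channel ordering so that ordered
// channels can be treated specially downstream (batching, refusing to relay
// on closed ordered channels, etc.). Field names follow the call sites in
// this diff; anything beyond that is assumed.
type ChannelState struct {
	Open  bool
	Order chantypes.Order
}

// ChannelStateCache tracks channel state keyed by the existing ChannelKey type.
type ChannelStateCache map[ChannelKey]ChannelState

// SetOpen records the open flag for a channel. As a guess at the real
// semantics, it keeps a previously observed ordering when the caller only
// knows chantypes.NONE (as the tests and mock processor do).
func (c ChannelStateCache) SetOpen(k ChannelKey, open bool, order chantypes.Order) {
	if s, ok := c[k]; ok {
		s.Open = open
		if order != chantypes.NONE {
			s.Order = order
		}
		c[k] = s
		return
	}
	c[k] = ChannelState{Open: open, Order: order}
}
```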
10 changes: 5 additions & 5 deletions relayer/chains/cosmos/message_handlers_test.go
@@ -128,7 +128,7 @@ func TestChannelStateCache(t *testing.T) {

// The channel state is not open, but the entry should exist in the channelStateCache.
// MsgInitKey returns the ChannelKey with an empty counterparty channel ID.
require.False(t, ccp.channelStateCache[k.MsgInitKey()])
require.False(t, ccp.channelStateCache[k.MsgInitKey()].Open)

// Observe MsgChannelOpenAck, which does have counterparty channel ID.
ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)
@@ -139,7 +139,7 @@ func TestChannelStateCache(t *testing.T) {

// The fully populated ChannelKey should now be the only entry for this channel.
// The channel is now open.
require.True(t, ccp.channelStateCache[k])
require.True(t, ccp.channelStateCache[k].Open)
})

t.Run("handshake already occurred", func(t *testing.T) {
@@ -156,7 +156,7 @@ func TestChannelStateCache(t *testing.T) {

// Initialize channelStateCache with populated channel ID and counterparty channel ID.
// This emulates initializeChannelState after a recent channel handshake has completed
ccp.channelStateCache[k] = true
ccp.channelStateCache.SetOpen(k, true, chantypes.NONE)

// Observe MsgChannelOpenInit, which does not have counterparty channel ID.
ccp.handleChannelMessage(chantypes.EventTypeChannelOpenInit, msgOpenInit, c)
@@ -166,7 +166,7 @@ func TestChannelStateCache(t *testing.T) {

// The fully populated ChannelKey should still be the only entry for this channel.
// The channel is still marked open since it was open during initializeChannelState.
require.True(t, ccp.channelStateCache[k])
require.True(t, ccp.channelStateCache[k].Open)

// Observe MsgChannelOpenAck, which does have counterparty channel ID.
ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)
@@ -175,6 +175,6 @@ func TestChannelStateCache(t *testing.T) {
require.Len(t, ccp.channelStateCache, 1)

// The fully populated ChannelKey should still be the only entry for this channel.
require.True(t, ccp.channelStateCache[k])
require.True(t, ccp.channelStateCache[k].Open)
})
}
23 changes: 23 additions & 0 deletions relayer/chains/cosmos/query.go
@@ -1096,6 +1096,29 @@ func (cc *CosmosProvider) QueryNextSeqRecv(ctx context.Context, height int64, ch
}, nil
}

// QueryNextSeqAck returns the next seqAck for a configured channel
func (cc *CosmosProvider) QueryNextSeqAck(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) {
key := host.NextSequenceAckKey(portid, channelid)

value, proofBz, proofHeight, err := cc.QueryTendermintProof(ctx, height, key)
if err != nil {
return nil, err
}

// check if next sequence receive exists
if len(value) == 0 {
return nil, sdkerrors.Wrapf(chantypes.ErrChannelNotFound, "portID (%s), channelID (%s)", portid, channelid)
}

sequence := binary.BigEndian.Uint64(value)

return &chantypes.QueryNextSequenceReceiveResponse{
NextSequenceReceive: sequence,
Proof: proofBz,
ProofHeight: proofHeight,
}, nil
}

// QueryPacketCommitment returns the packet commitment proof at a given height
func (cc *CosmosProvider) QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) {
key := host.PacketCommitmentKey(portid, channelid, seq)
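Context on the new QueryNextSeqAck: the value stored under host.NextSequenceAckKey is an 8-byte big-endian counter, which is why it is decoded with binary.BigEndian.Uint64, and the result is packed into the existing QueryNextSequenceReceiveResponse type rather than a dedicated ack response. A minimal standalone illustration of the decode (the value is made up):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// nextSequenceAck is persisted as an 8-byte big-endian uint64; pretend the
	// proof query returned this raw store value.
	raw := make([]byte, 8)
	binary.BigEndian.PutUint64(raw, 42)

	fmt.Println(binary.BigEndian.Uint64(raw)) // 42
}
```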
23 changes: 4 additions & 19 deletions relayer/chains/cosmos/tx.go
@@ -55,7 +55,7 @@ var (
rtyAtt = retry.Attempts(rtyAttNum)
rtyDel = retry.Delay(time.Millisecond * 400)
rtyErr = retry.LastErrorOnly(true)
numRegex = regexp.MustCompile("[0-9]+")
accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")
defaultBroadcastWaitTimeout = 10 * time.Minute
errUnknown = "unknown"
)
@@ -660,32 +660,17 @@ func (cc *CosmosProvider) handleAccountSequenceMismatchError(sequenceGuard *Wall
panic("sequence guard not configured")
}

sequences := numRegex.FindAllString(err.Error(), -1)
if len(sequences) != 2 {
matches := accountSeqRegex.FindStringSubmatch(err.Error())
if len(matches) == 0 {
return
}
nextSeq, err := strconv.ParseUint(sequences[0], 10, 64)
nextSeq, err := strconv.ParseUint(matches[1], 10, 64)
if err != nil {
return
}
sequenceGuard.NextAccountSequence = nextSeq
}

// handleAccountSequenceMismatchError will parse the error string, e.g.:
// "account sequence mismatch, expected 10, got 9: incorrect account sequence"
// and update the next account sequence with the expected value.
// func (cc *CosmosProvider) handleAccountSequenceMismatchError(err error) {
// sequences := numRegex.FindAllString(err.Error(), -1)
// if len(sequences) != 2 {
// return
// }
// nextSeq, err := strconv.ParseUint(sequences[0], 10, 64)
// if err != nil {
// return
// }
// cc.nextAccountSeq = nextSeq
// }

// MsgCreateClient creates an sdk.Msg to update the client on src with consensus state from dst
func (cc *CosmosProvider) MsgCreateClient(
clientState ibcexported.ClientState,
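On the regex change: the old numRegex pulled every number out of the error string and only proceeded when exactly two were found, so extra digits elsewhere in a wrapped error (heights, codes) would defeat the parse. The anchored submatch pattern targets the account-sequence-mismatch message directly and names which capture is which. A quick standalone check against the example string from the comment removed in this diff:

```go
package main

import (
	"fmt"
	"regexp"
)

var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")

func main() {
	// Example error text taken from the removed comment.
	errMsg := "account sequence mismatch, expected 10, got 9: incorrect account sequence"

	matches := accountSeqRegex.FindStringSubmatch(errMsg)
	if len(matches) == 0 {
		fmt.Println("not an account sequence mismatch")
		return
	}

	// matches[1] is the sequence the node expected, which the wallet guard
	// adopts as NextAccountSequence; matches[2] is the sequence actually used.
	fmt.Println(matches[1], matches[2]) // 10 9
}
```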
2 changes: 1 addition & 1 deletion relayer/chains/mock/mock_chain_processor.go
@@ -170,7 +170,7 @@ func (mcp *MockChainProcessor) queryCycle(ctx context.Context, persistence *quer

// mocking all channels open
for channelKey := range ibcMessagesCache.PacketFlow {
channelStateCache[channelKey] = true
channelStateCache.SetOpen(channelKey, true, chantypes.NONE)
}

// now pass foundMessages to the path processors
8 changes: 4 additions & 4 deletions relayer/chains/penumbra/message_handlers.go
@@ -63,18 +63,18 @@ func (pcp *PenumbraChainProcessor) handleChannelMessage(eventType string, ci pro
}
}
if !found {
pcp.channelStateCache[channelKey] = false
pcp.channelStateCache.SetOpen(channelKey, false, ci.Order)
}
} else {
switch eventType {
case chantypes.EventTypeChannelOpenTry:
pcp.channelStateCache[channelKey] = false
pcp.channelStateCache.SetOpen(channelKey, false, ci.Order)
case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
pcp.channelStateCache[channelKey] = true
pcp.channelStateCache.SetOpen(channelKey, true, ci.Order)
case chantypes.EventTypeChannelCloseConfirm:
for k := range pcp.channelStateCache {
if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
pcp.channelStateCache[k] = false
pcp.channelStateCache.SetOpen(channelKey, false, ci.Order)
break
}
}
5 changes: 3 additions & 2 deletions relayer/chains/penumbra/penumbra_chain_processor.go
@@ -257,12 +257,13 @@ func (pcp *PenumbraChainProcessor) initializeChannelState(ctx context.Context) e
continue
}
pcp.channelConnections[ch.ChannelId] = ch.ConnectionHops[0]
pcp.channelStateCache[processor.ChannelKey{
k := processor.ChannelKey{
ChannelID: ch.ChannelId,
PortID: ch.PortId,
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
}] = ch.State == chantypes.OPEN
}
pcp.channelStateCache.SetOpen(k, ch.State == chantypes.OPEN, ch.Ordering)
}
return nil
}
23 changes: 23 additions & 0 deletions relayer/chains/penumbra/query.go
@@ -702,6 +702,29 @@ func (cc *PenumbraProvider) QueryNextSeqRecv(ctx context.Context, height int64,
}, nil
}

// QueryNextSeqAck returns the next seqAck for a configured channel
func (cc *PenumbraProvider) QueryNextSeqAck(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) {
key := host.NextSequenceAckKey(portid, channelid)

value, proofBz, proofHeight, err := cc.QueryTendermintProof(ctx, height, key)
if err != nil {
return nil, err
}

// check if next sequence receive exists
if len(value) == 0 {
return nil, sdkerrors.Wrapf(chantypes.ErrChannelNotFound, "portID (%s), channelID (%s)", portid, channelid)
}

sequence := binary.BigEndian.Uint64(value)

return &chantypes.QueryNextSequenceReceiveResponse{
NextSequenceReceive: sequence,
Proof: proofBz,
ProofHeight: proofHeight,
}, nil
}

// QueryPacketCommitment returns the packet commitment proof at a given height
func (cc *PenumbraProvider) QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) {
key := host.PacketCommitmentKey(portid, channelid, seq)
9 changes: 8 additions & 1 deletion relayer/processor/message_processor.go
@@ -313,11 +313,18 @@ func (mp *messageProcessor) trackAndSendMessages(
var batch []messageToTrack

for _, t := range mp.trackers() {

retries := dst.trackProcessingMessage(t)
if t.assembledMsg() == nil {
continue
}
if broadcastBatch && retries == 0 {

ordered := false
if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() {
ordered = true
}

if broadcastBatch && (retries == 0 || ordered) {
batch = append(batch, t)
continue
}
2 changes: 1 addition & 1 deletion relayer/processor/path_end_runtime.go
@@ -442,7 +442,7 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage,
)
return false
}
if !pathEnd.channelStateCache[k] {
if !pathEnd.channelStateCache[k].Open {
// channel is not open, do not send
pathEnd.log.Warn("Refusing to relay packet message because channel is not open",
zap.String("event_type", eventType),
16 changes: 8 additions & 8 deletions relayer/processor/path_processor.go
@@ -30,7 +30,7 @@ const (
interchainQueryTimeout = 60 * time.Second

// Amount of time between flushes if the previous flush failed.
flushFailureRetry = 15 * time.Second
flushFailureRetry = 5 * time.Second

// If message assembly fails from either proof query failure on the source
// or assembling the message for the destination, how many blocks should pass
@@ -186,12 +186,12 @@ func (pp *PathProcessor) OnConnectionMessage(chainID string, eventType string, o

func (pp *PathProcessor) channelPairs() []channelPair {
// Channel keys are from pathEnd1's perspective
channels := make(map[ChannelKey]bool)
for k, open := range pp.pathEnd1.channelStateCache {
channels[k] = open
channels := make(map[ChannelKey]ChannelState)
for k, cs := range pp.pathEnd1.channelStateCache {
channels[k] = cs
}
for k, open := range pp.pathEnd2.channelStateCache {
channels[k.Counterparty()] = open
for k, cs := range pp.pathEnd2.channelStateCache {
channels[k.Counterparty()] = cs
}
pairs := make([]channelPair, len(channels))
i := 0
@@ -457,8 +457,8 @@ func (pp *PathProcessor) handleLocalhostData(cacheData ChainProcessorCacheData)
}
}

channelStateCache1 := make(map[ChannelKey]bool)
channelStateCache2 := make(map[ChannelKey]bool)
channelStateCache1 := make(map[ChannelKey]ChannelState)
channelStateCache2 := make(map[ChannelKey]ChannelState)

// split up data and send lower channel-id data to pathEnd2 and higher channel-id data to pathEnd1.
for k, v := range cacheData.ChannelStateCache {