From f5f4fc4af16520a34e805e8f16c50e0de4902815 Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Sun, 12 Mar 2023 19:06:33 -0700 Subject: [PATCH] feat(relayer): MessageStatusChanged events (#13272) Co-authored-by: David --- packages/relayer/.golangci.yml | 6 +- packages/relayer/bridge.go | 9 ++ packages/relayer/event.go | 11 ++- .../relayer/indexer/filter_then_subscribe.go | 25 ++++-- .../indexer/filter_then_subscribe_test.go | 9 +- packages/relayer/indexer/handle_event.go | 4 +- .../save_message_status_changed_events.go | 74 ++++++++++++++++ packages/relayer/indexer/subscribe.go | 59 ++++++++++++- packages/relayer/indexer/subscribe_test.go | 5 +- packages/relayer/message/process_message.go | 54 +++++++++++- .../relayer/message/process_message_test.go | 1 + .../1666650599_create_events_table.sql | 4 +- packages/relayer/mock/bridge.go | 46 +++++++++- packages/relayer/mock/event_repository.go | 27 ++++-- packages/relayer/repo/event.go | 25 +++++- packages/relayer/repo/event_test.go | 84 +++++++++++++++++++ 16 files changed, 415 insertions(+), 28 deletions(-) create mode 100644 packages/relayer/indexer/save_message_status_changed_events.go diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index 25aa24eb39..71d65badac 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -28,10 +28,10 @@ linters: linters-settings: funlen: - lines: 130 - statements: 52 + lines: 132 + statements: 54 gocognit: - min-complexity: 40 + min-complexity: 41 issues: exclude-rules: diff --git a/packages/relayer/bridge.go b/packages/relayer/bridge.go index 0ab984735c..506a6eb282 100644 --- a/packages/relayer/bridge.go +++ b/packages/relayer/bridge.go @@ -19,4 +19,13 @@ type Bridge interface { GetMessageStatus(opts *bind.CallOpts, msgHash [32]byte) (uint8, error) ProcessMessage(opts *bind.TransactOpts, message bridge.IBridgeMessage, proof []byte) (*types.Transaction, error) IsMessageReceived(opts *bind.CallOpts, msgHash [32]byte, srcChainId *big.Int, proof []byte) (bool, error) // nolint + FilterMessageStatusChanged( + opts *bind.FilterOpts, + msgHash [][32]byte, + ) (*bridge.BridgeMessageStatusChangedIterator, error) + WatchMessageStatusChanged( + opts *bind.WatchOpts, + sink chan<- *bridge.BridgeMessageStatusChanged, + msgHash [][32]byte, + ) (event.Subscription, error) } diff --git a/packages/relayer/event.go b/packages/relayer/event.go index 8b2737aef6..a598c0215d 100644 --- a/packages/relayer/event.go +++ b/packages/relayer/event.go @@ -9,7 +9,8 @@ import ( ) var ( - EventNameMessageSent = "MessageSent" + EventNameMessageSent = "MessageSent" + EventNameMessageStatusChanged = "MessageStatusChanged" ) // EventStatus is used to indicate whether processing has been attempted @@ -54,6 +55,8 @@ type Event struct { CanonicalTokenName string `json:"canonicalTokenName"` CanonicalTokenDecimals uint8 `json:"canonicalTokenDecimals"` Amount string `json:"amount"` + MsgHash string `json:"msgHash"` + MessageOwner string `json:"messageOwner"` } // SaveEventOpts @@ -68,6 +71,8 @@ type SaveEventOpts struct { CanonicalTokenName string CanonicalTokenDecimals uint8 Amount string + MsgHash string + MessageOwner string } // EventRepository is used to interact with events in the store @@ -83,4 +88,8 @@ type EventRepository interface { ctx context.Context, address common.Address, ) ([]*Event, error) + FindAllByMsgHash( + ctx context.Context, + msgHash string, + ) ([]*Event, error) } diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 0d949fb087..875496b379 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -61,16 +61,31 @@ func (svc *Service) FilterThenSubscribe( end = header.Number.Uint64() } - events, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{ + filterOpts := &bind.FilterOpts{ Start: svc.processingBlockHeight, End: &end, Context: ctx, - }, nil) + } + + messageStatusChangedEvents, err := svc.bridge.FilterMessageStatusChanged(filterOpts, nil) + if err != nil { + return errors.Wrap(err, "bridge.FilterMessageStatusChanged") + } + + // we dont need to do anything with msgStatus events except save them to the DB. + // we dont need to process them. they are for exposing via the API. + + err = svc.saveMessageStatusChangedEvents(ctx, chainID, messageStatusChangedEvents) + if err != nil { + return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents") + } + + messageSentEvents, err := svc.bridge.FilterMessageSent(filterOpts, nil) if err != nil { return errors.Wrap(err, "bridge.FilterMessageSent") } - if !events.Next() || events.Event == nil { + if !messageSentEvents.Next() || messageSentEvents.Event == nil { if err := svc.handleNoEventsInBatch(ctx, chainID, int64(end)); err != nil { return errors.Wrap(err, "svc.handleNoEventsInBatch") } @@ -83,7 +98,7 @@ func (svc *Service) FilterThenSubscribe( group.SetLimit(svc.numGoroutines) for { - event := events.Event + event := messageSentEvents.Event group.Go(func() error { err := svc.handleEvent(groupCtx, chainID, event) @@ -97,7 +112,7 @@ func (svc *Service) FilterThenSubscribe( }) // if there are no more events - if !events.Next() { + if !messageSentEvents.Next() { // wait for the last of the goroutines to finish if err := group.Wait(); err != nil { return errors.Wrap(err, "group.Wait") diff --git a/packages/relayer/indexer/filter_then_subscribe_test.go b/packages/relayer/indexer/filter_then_subscribe_test.go index 2bede242ca..3d6841d272 100644 --- a/packages/relayer/indexer/filter_then_subscribe_test.go +++ b/packages/relayer/indexer/filter_then_subscribe_test.go @@ -27,7 +27,8 @@ func Test_FilterThenSubscribe(t *testing.T) { <-time.After(6 * time.Second) assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, b.MessageStatusesChanged, 1) + assert.Equal(t, b.ErrorsSent, 2) } func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) { @@ -45,7 +46,8 @@ func Test_FilterThenSubscribe_subscribeWatchMode(t *testing.T) { <-time.After(6 * time.Second) assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, b.MessageStatusesChanged, 1) + assert.Equal(t, b.ErrorsSent, 2) } func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) { @@ -65,5 +67,6 @@ func Test_FilterThenSubscribe_alreadyCaughtUp(t *testing.T) { <-time.After(6 * time.Second) assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, b.MessageStatusesChanged, 1) + assert.Equal(t, b.ErrorsSent, 2) } diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index a3714ae87f..08aaac249d 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -52,7 +52,7 @@ func (svc *Service) handleEvent( } e, err := svc.eventRepo.Save(ctx, relayer.SaveEventOpts{ - Name: eventName, + Name: relayer.EventNameMessageSent, Data: string(marshaled), ChainID: chainID, Status: eventStatus, @@ -62,6 +62,8 @@ func (svc *Service) handleEvent( CanonicalTokenName: canonicalToken.Name, CanonicalTokenDecimals: canonicalToken.Decimals, Amount: amount.String(), + MsgHash: common.Hash(event.MsgHash).Hex(), + MessageOwner: event.Message.Owner.Hex(), }) if err != nil { return errors.Wrap(err, "svc.eventRepo.Save") diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/save_message_status_changed_events.go new file mode 100644 index 0000000000..ce44f1a483 --- /dev/null +++ b/packages/relayer/indexer/save_message_status_changed_events.go @@ -0,0 +1,74 @@ +package indexer + +import ( + "context" + "encoding/json" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/taikoxyz/taiko-mono/packages/relayer" + "github.com/taikoxyz/taiko-mono/packages/relayer/contracts/bridge" +) + +func (svc *Service) saveMessageStatusChangedEvents( + ctx context.Context, + chainID *big.Int, + events *bridge.BridgeMessageStatusChangedIterator, +) error { + if !events.Next() { + log.Infof("no messageStatusChanged events") + return nil + } + + for { + event := events.Event + log.Infof("messageStatusChanged: %v", common.Hash(event.MsgHash).Hex()) + + if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil { + return errors.Wrap(err, "svc.saveMessageStatusChangedEvent") + } + + if !events.Next() { + return nil + } + } +} + +func (svc *Service) saveMessageStatusChangedEvent( + ctx context.Context, + chainID *big.Int, + event *bridge.BridgeMessageStatusChanged, +) error { + marshaled, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "json.Marshal(event)") + } + + // get the previous MessageSent event or other message status changed events, + // so we can find out the previous owner of this msg hash, + // to save to the db. + previousEvents, err := svc.eventRepo.FindAllByMsgHash(ctx, common.Hash(event.MsgHash).Hex()) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash") + } + + if len(previousEvents) == 0 { + return errors.Wrap(err, "svc.eventRepo.FindAllByMsgHash") + } + + _, err = svc.eventRepo.Save(ctx, relayer.SaveEventOpts{ + Name: relayer.EventNameMessageStatusChanged, + Data: string(marshaled), + ChainID: chainID, + Status: relayer.EventStatus(event.Status), + MessageOwner: previousEvents[0].MessageOwner, + MsgHash: common.Hash(event.MsgHash).Hex(), + }) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Save") + } + + return nil +} diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index 50cab3ad18..46b8f82002 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -5,6 +5,7 @@ import ( "math/big" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -16,6 +17,25 @@ import ( func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { log.Info("subscribing to new events") + errChan := make(chan error) + + go svc.subscribeMessageSent(ctx, chainID, errChan) + + go svc.subscribeMessageStatusChanged(ctx, chainID, errChan) + + // nolint: gosimple + for { + select { + case <-ctx.Done(): + log.Info("context finished") + return nil + case err := <-errChan: + return errors.Wrap(err, "errChan") + } + } +} + +func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int, errChan chan error) { sink := make(chan *bridge.BridgeMessageSent) sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) { @@ -32,11 +52,16 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { for { select { + case <-ctx.Done(): + log.Info("context finished") + return case err := <-sub.Err(): - return errors.Wrap(err, "sub.Err()") + errChan <- errors.Wrap(err, "sub.Err()") case event := <-sink: go func() { + log.Infof("new message sent event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String()) err := svc.handleEvent(ctx, chainID, event) + if err != nil { log.Errorf("svc.subscribe, svc.handleEvent: %v", err) } @@ -63,3 +88,35 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { } } } + +func (svc *Service) subscribeMessageStatusChanged(ctx context.Context, chainID *big.Int, errChan chan error) { + sink := make(chan *bridge.BridgeMessageStatusChanged) + + sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + log.Errorf("svc.bridge.WatchMessageStatusChanged: %v", err) + } + + return svc.bridge.WatchMessageStatusChanged(&bind.WatchOpts{ + Context: ctx, + }, sink, nil) + }) + + defer sub.Unsubscribe() + + for { + select { + case <-ctx.Done(): + log.Info("context finished") + return + case err := <-sub.Err(): + errChan <- errors.Wrap(err, "sub.Err()") + case event := <-sink: + log.Infof("new message status changed event %v from chainID %v", common.Hash(event.MsgHash).Hex(), chainID.String()) + + if err := svc.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil { + log.Errorf("svc.subscribe, svc.saveMessageStatusChangedEvent: %v", err) + } + } + } +} diff --git a/packages/relayer/indexer/subscribe_test.go b/packages/relayer/indexer/subscribe_test.go index f899386d35..a606fe53cd 100644 --- a/packages/relayer/indexer/subscribe_test.go +++ b/packages/relayer/indexer/subscribe_test.go @@ -20,6 +20,7 @@ func Test_subscribe(t *testing.T) { b := bridge.(*mock.Bridge) - assert.Equal(t, b.MessagesSent, 1) - assert.Equal(t, b.ErrorsSent, 1) + assert.Equal(t, 1, b.MessagesSent) + assert.Equal(t, 1, b.MessageStatusesChanged) + assert.Equal(t, 2, b.ErrorsSent) } diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 6336a1e070..cc479b5edd 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -3,7 +3,10 @@ package message import ( "context" "encoding/hex" + "fmt" + "strings" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -88,11 +91,15 @@ func (p *Processor) ProcessMessage( log.Infof("waiting for tx hash %v", hex.EncodeToString(tx.Hash().Bytes())) - _, err = relayer.WaitReceipt(ctx, p.destEthClient, tx.Hash()) + receipt, err := relayer.WaitReceipt(ctx, p.destEthClient, tx.Hash()) if err != nil { return errors.Wrap(err, "relayer.WaitReceipt") } + if err := p.saveMessageStatusChangedEvent(ctx, receipt, e, event); err != nil { + return errors.Wrap(err, "p.saveMEssageStatusChangedEvent") + } + log.Infof("Mined tx %s", hex.EncodeToString(tx.Hash().Bytes())) messageStatus, err := p.destBridge.GetMessageStatus(&bind.CallOpts{}, event.MsgHash) @@ -165,3 +172,48 @@ func (p *Processor) sendProcessMessageCall( func (p *Processor) setLatestNonce(nonce uint64) { p.destNonce = nonce } + +func (p *Processor) saveMessageStatusChangedEvent( + ctx context.Context, + receipt *types.Receipt, + e *relayer.Event, + event *bridge.BridgeMessageSent, +) error { + bridgeAbi, err := abi.JSON(strings.NewReader(bridge.BridgeABI)) + if err != nil { + return errors.Wrap(err, "abi.JSON") + } + + m := make(map[string]interface{}) + + for _, log := range receipt.Logs { + topic := log.Topics[0] + if topic == bridgeAbi.Events["MessageStatusChanged"].ID { + err = bridgeAbi.UnpackIntoMap(m, "MessageStatusChanged", log.Data) + if err != nil { + return errors.Wrap(err, "abi.UnpackIntoInterface") + } + + break + } + } + + if m["status"] != nil { + // keep same format as other raw events + data := fmt.Sprintf(`{"Raw":{"transactionHash": "%v"}}`, receipt.TxHash.Hex()) + + _, err = p.eventRepo.Save(ctx, relayer.SaveEventOpts{ + Name: relayer.EventNameMessageStatusChanged, + Data: data, + ChainID: event.Message.DestChainId, + Status: relayer.EventStatus(m["status"].(uint8)), + MsgHash: e.MsgHash, + MessageOwner: e.MessageOwner, + }) + if err != nil { + return errors.Wrap(err, "svc.eventRepo.Save") + } + } + + return nil +} diff --git a/packages/relayer/message/process_message_test.go b/packages/relayer/message/process_message_test.go index 116fc17e2f..71ed8629ba 100644 --- a/packages/relayer/message/process_message_test.go +++ b/packages/relayer/message/process_message_test.go @@ -67,6 +67,7 @@ func Test_ProcessMessage(t *testing.T) { GasLimit: big.NewInt(1), DestChainId: mock.MockChainID, ProcessingFee: big.NewInt(1000000000), + SrcChainId: mock.MockChainID, }, MsgHash: mock.SuccessMsgHash, }, &relayer.Event{}) diff --git a/packages/relayer/migrations/1666650599_create_events_table.sql b/packages/relayer/migrations/1666650599_create_events_table.sql index 827d459573..2cc73556c3 100644 --- a/packages/relayer/migrations/1666650599_create_events_table.sql +++ b/packages/relayer/migrations/1666650599_create_events_table.sql @@ -11,7 +11,9 @@ CREATE TABLE IF NOT EXISTS events ( canonical_token_symbol VARCHAR(10) DEFAULT "", canonical_token_name VARCHAR(255) DEFAULT "", canonical_token_decimals int DEFAULT 0, - amount VARCHAR(255) NOT NULL, + amount VARCHAR(255) NOT NULL DEFAULT 0, + msg_hash VARCHAR(255) NOT NULL, + message_owner VARCHAR(255) NOT NULL DEFAULT "", created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP , updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); diff --git a/packages/relayer/mock/bridge.go b/packages/relayer/mock/bridge.go index 10b54fc13c..c8e87c0abf 100644 --- a/packages/relayer/mock/bridge.go +++ b/packages/relayer/mock/bridge.go @@ -30,8 +30,9 @@ var ProcessMessageTx = types.NewTransaction( ) type Bridge struct { - MessagesSent int - ErrorsSent int + MessagesSent int + MessageStatusesChanged int + ErrorsSent int } type Subscription struct { @@ -57,7 +58,11 @@ func (b *Bridge) WatchMessageSent( go func(sink chan<- *bridge.BridgeMessageSent) { <-time.After(2 * time.Second) - sink <- &bridge.BridgeMessageSent{} + sink <- &bridge.BridgeMessageSent{ + Message: bridge.IBridgeMessage{ + SrcChainId: big.NewInt(1), + }, + } b.MessagesSent++ }(sink) @@ -80,6 +85,41 @@ func (b *Bridge) FilterMessageSent( return &bridge.BridgeMessageSentIterator{}, nil } +func (b *Bridge) WatchMessageStatusChanged( + opts *bind.WatchOpts, + sink chan<- *bridge.BridgeMessageStatusChanged, + msgHash [][32]byte, +) (event.Subscription, error) { + s := &Subscription{ + errChan: make(chan error), + } + + go func(sink chan<- *bridge.BridgeMessageStatusChanged) { + <-time.After(2 * time.Second) + + sink <- &bridge.BridgeMessageStatusChanged{} + b.MessageStatusesChanged++ + }(sink) + + go func(errChan chan error) { + <-time.After(5 * time.Second) + + errChan <- errors.New("fail") + + s.done = true + b.ErrorsSent++ + }(s.errChan) + + return s, nil +} + +func (b *Bridge) FilterMessageStatusChanged( + opts *bind.FilterOpts, + signal [][32]byte, +) (*bridge.BridgeMessageStatusChangedIterator, error) { + return &bridge.BridgeMessageStatusChangedIterator{}, nil +} + func (b *Bridge) GetMessageStatus(opts *bind.CallOpts, msgHash [32]byte) (uint8, error) { if msgHash == SuccessMsgHash { return uint8(relayer.EventStatusNew), nil diff --git a/packages/relayer/mock/event_repository.go b/packages/relayer/mock/event_repository.go index b0dd7aa416..d9f8f921ec 100644 --- a/packages/relayer/mock/event_repository.go +++ b/packages/relayer/mock/event_repository.go @@ -22,11 +22,13 @@ func NewEventRepository() *EventRepository { } func (r *EventRepository) Save(ctx context.Context, opts relayer.SaveEventOpts) (*relayer.Event, error) { r.events = append(r.events, &relayer.Event{ - ID: rand.Int(), // nolint: gosec - Data: datatypes.JSON(opts.Data), - Status: opts.Status, - ChainID: opts.ChainID.Int64(), - Name: opts.Name, + ID: rand.Int(), // nolint: gosec + Data: datatypes.JSON(opts.Data), + Status: opts.Status, + ChainID: opts.ChainID.Int64(), + Name: opts.Name, + MessageOwner: opts.MessageOwner, + MsgHash: opts.MsgHash, }) return nil, nil @@ -121,3 +123,18 @@ func (r *EventRepository) FindAllByAddress( return events, nil } + +func (r *EventRepository) FindAllByMsgHash( + ctx context.Context, + msgHash string, +) ([]*relayer.Event, error) { + events := make([]*relayer.Event, 0) + + for _, e := range r.events { + if e.MsgHash == msgHash { + events = append(events, e) + } + } + + return events, nil +} diff --git a/packages/relayer/repo/event.go b/packages/relayer/repo/event.go index c0dd7d7bce..5220692d31 100644 --- a/packages/relayer/repo/event.go +++ b/packages/relayer/repo/event.go @@ -37,6 +37,8 @@ func (r *EventRepository) Save(ctx context.Context, opts relayer.SaveEventOpts) CanonicalTokenName: opts.CanonicalTokenName, CanonicalTokenDecimals: opts.CanonicalTokenDecimals, Amount: opts.Amount, + MsgHash: opts.MsgHash, + MessageOwner: opts.MessageOwner, } if err := r.db.GormDB().Create(e).Error; err != nil { return nil, errors.Wrap(err, "r.db.Create") @@ -59,18 +61,37 @@ func (r *EventRepository) UpdateStatus(ctx context.Context, id int, status relay return nil } +func (r *EventRepository) FindAllByMsgHash( + ctx context.Context, + msgHash string, +) ([]*relayer.Event, error) { + e := make([]*relayer.Event, 0) + // find all message sent events + if err := r.db.GormDB().Where("msg_hash = ?", msgHash). + Find(&e).Error; err != nil { + return nil, errors.Wrap(err, "r.db.Find") + } + + // find all message status changed events + + return e, nil +} + func (r *EventRepository) FindAllByAddressAndChainID( ctx context.Context, chainID *big.Int, address common.Address, ) ([]*relayer.Event, error) { e := make([]*relayer.Event, 0) + // find all message sent events if err := r.db.GormDB().Where("chain_id = ?", chainID.Int64()). - Find(&e, datatypes.JSONQuery("data"). - Equals(strings.ToLower(address.Hex()), "Message", "Owner")).Error; err != nil { + Where("message_owner = ?", strings.ToLower(address.Hex())). + Find(&e).Error; err != nil { return nil, errors.Wrap(err, "r.db.Find") } + // find all message status changed events + return e, nil } diff --git a/packages/relayer/repo/event_test.go b/packages/relayer/repo/event_test.go index 172efa1290..2a8e6f67d9 100644 --- a/packages/relayer/repo/event_test.go +++ b/packages/relayer/repo/event_test.go @@ -65,6 +65,8 @@ func TestIntegration_Event_Save(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: "0x1", }, nil, }, @@ -121,6 +123,8 @@ func TestIntegration_Event_UpdateStatus(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: "0x1", }, ) assert.Equal(t, nil, err) @@ -152,6 +156,8 @@ func TestIntegration_Event_FindAllByAddressAndChainID(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }) assert.Equal(t, nil, err) tests := []struct { @@ -179,6 +185,8 @@ func TestIntegration_Event_FindAllByAddressAndChainID(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }, }, nil, @@ -230,6 +238,8 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }) assert.Equal(t, nil, err) tests := []struct { @@ -255,6 +265,8 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { CanonicalTokenName: "Ethereum", CanonicalTokenDecimals: 18, Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), }, }, nil, @@ -275,3 +287,75 @@ func TestIntegration_Event_FindAllByAddress(t *testing.T) { }) } } + +func TestIntegration_Event_FindAllByMsgHash(t *testing.T) { + db, close, err := testMysql(t) + assert.Equal(t, nil, err) + + defer close() + + eventRepo, err := NewEventRepository(db) + assert.Equal(t, nil, err) + + addr := common.HexToAddress("0x71C7656EC7ab88b098defB751B7401B5f6d8976F") + + _, err = eventRepo.Save(context.Background(), relayer.SaveEventOpts{ + Name: "name", + Data: fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())), + ChainID: big.NewInt(1), + Status: relayer.EventStatusDone, + EventType: relayer.EventTypeSendETH, + CanonicalTokenAddress: "0x1", + CanonicalTokenSymbol: "ETH", + CanonicalTokenName: "Ethereum", + CanonicalTokenDecimals: 18, + Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), + }) + assert.Equal(t, nil, err) + tests := []struct { + name string + msgHash string + wantResp []*relayer.Event + wantErr error + }{ + { + "success", + "0x1", + []*relayer.Event{ + { + ID: 1, + Name: "name", + // nolint lll + Data: datatypes.JSON([]byte(fmt.Sprintf(`{"Message": {"Owner": "%s"}}`, strings.ToLower(addr.Hex())))), + ChainID: 1, + Status: relayer.EventStatusDone, + EventType: relayer.EventTypeSendETH, + CanonicalTokenAddress: "0x1", + CanonicalTokenSymbol: "ETH", + CanonicalTokenName: "Ethereum", + CanonicalTokenDecimals: 18, + Amount: "1", + MsgHash: "0x1", + MessageOwner: addr.Hex(), + }, + }, + nil, + }, + { + "noneByMgHash", + "0xfake", + []*relayer.Event{}, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resp, err := eventRepo.FindAllByMsgHash(context.Background(), tt.msgHash) + assert.Equal(t, tt.wantResp, resp) + assert.Equal(t, tt.wantErr, err) + }) + } +}