diff --git a/acorus.go b/acorus.go index 0ebbd04..5622428 100644 --- a/acorus.go +++ b/acorus.go @@ -255,7 +255,7 @@ func (as *Acorus) initSynchronizer(config *config.Config) error { rpcItem := config.RPCs[i] cfg := synchronizer.Config{ LoopIntervalMsec: 5, - HeaderBufferSize: 500, + HeaderBufferSize: 50, ConfirmationDepth: big.NewInt(int64(1)), StartHeight: big.NewInt(int64(rpcItem.StartBlock)), ChainId: uint(rpcItem.ChainId), diff --git a/acorus.yaml b/acorus.yaml index 6290cc5..90e8b19 100644 --- a/acorus.yaml +++ b/acorus.yaml @@ -1,64 +1,38 @@ slave_db_enable: false server: - host: 127.0.0.1 + host: 0.0.0.0 port: 8080 metrics: - host: 127.0.0.1 + host: 0.0.0.0 port: 8081 - rpcs: - rpc_url: 'https://eth-sepolia.g.alchemy.com/v2/o0zYyGdlny_0bvOGS8qtNdMG4REyM0I_' - chain_id: 11155111 - start_block: 5218234 + chain_id: 1 + start_block: 5237021 contracts: event_contracts: - # - rpc_url: 'https://rpc.ankr.com/optimism/7f3ae11204e03961b67c557c4996244f0a53222b23c31a7baf9ae91c6bd89702' - # chain_id: 10 - # start_block: 114216017 - # contracts: - # event_contracts: + - rpc_url: 'https://opt-sepolia.g.alchemy.com/v2/FzuTJBqbf8-w1E6AAcPkWO1a2OGFX2DL' + chain_id: 10 + start_block: 7746768 + contracts: + event_contracts: -# - rpc_url: "https://polygonzkevm-mainnet.g.alchemy.com/v2/XJph3lRc3_xJN3kHX7Gy0PkvkPbJD_GD" -# chain_id: 1101 -# start_block: 9634600 -# l1_event_unpack_block: 19147000 -# contracts: -# - "0x2a3DD3EB832aF982ec71669E178424b10Dca2EDe" -# event_contracts: -# - "0x2a3DD3EB832aF982ec71669E178424b10Dca2EDe" -# - rpc_url: 'https://rpc.ankr.com/scroll/7f3ae11204e03961b67c557c4996244f0a53222b23c31a7baf9ae91c6bd89702' -# chain_id: 534352 -# start_block: 2950000 -# l1_event_unpack_block: 19126200 -# contracts: -# - "0xF8B1378579659D8F7EE5f3C929c2f3E332E41Fd6" -# - "0x6774Bcbd5ceCeF1336b5300fb5186a12DDD8b367" -# - "0x7F2b8C31F88B6006c382775eea88297Ec1e3E905" -# - "0x7AC440cAe8EB6328de4fA621163a792c1EA9D4fE" -# - "0xD8A791fE2bE73eb6E6cF1eb0cb3F36adC9B3F8f9" -# - "0xb2b10a289A229415a124EFDeF310C10cb004B6ff" -# - "0x6260aF48e8948617b8FA17F4e5CEa2d21D21554B" -# - "0xb94f7F6ABcb811c5Ac709dE14E37590fcCd975B6" -# - "0x37ba659D6CC380D12Fb96567CC52FC8e1DF4E334" -# - "0x892dDB2899325aBBA1fD00FDA8249B40Cbbc33F9" -# - "0xD8dD7787f89c7E6243AD32E0d0cCf460243C8130" -# event_contracts: -# - "0x4C0926FF5252A435FD19e10ED15e5a249Ba19d79" -# - "0x781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC" -# - "0x6EA73e05AdC79974B931123675ea8F78FfdacDF0" -# - "0x7003E7B7186f0E6601203b99F7B8DECBfA391cf9" -# - "0xE2b4795039517653c5Ae8C2A9BFdd783b48f447A" -# - "0x64CCBE37c9A82D85A1F2E74649b7A42923067988" -# - "0x7bC08E1c04fb41d75F1410363F0c5746Eae80582" -# - "0x62597Cc19703aF10B58feF87B0d5D29eFE263bcc" -# - "0x987e300fDfb06093859358522a79098848C33852" -# - "0xE9c5C9f67ec7B773fC76440845751F657bb953FF" -# - "0xC5034eB8F682b73F93C9246aa95A8eBbF82793aA" -# - "0x987e300fDfb06093859358522a79098848C33852" + - rpc_url: "https://polygonzkevm-testnet.g.alchemy.com/v2/dmeCZ-Yf1YjIrgExOXbfrTF_Z_51GyBx" + chain_id: 1101 + start_block: 3961436 + l1_event_unpack_block: 5237021 + contracts: + event_contracts: + - rpc_url: 'https://rpc.ankr.com/scroll_sepolia_testnet/7f3ae11204e03961b67c557c4996244f0a53222b23c31a7baf9ae91c6bd89702' + chain_id: 534352 + start_block: 2945654 + l1_event_unpack_block: 5237021 + contracts: + event_contracts: master_db: db_host: "postgresql" @@ -78,12 +52,24 @@ slave_db: bridge_grpc_url: "selaginella-app:8080" relayers: - - chain_id: 11155111 + - chain_id: 1 eventStartBlock: 5218234 contracts: - "0x44c8EDB79089Fe99cfb79F8284D9f9CD83B8Fbc5" - "0xAC13457009a2aE49820Bd7517818436dE2202549" -# - chain_id: 1101 -# eventStartBlock: 9634600 -# contracts: -# - "0x2a3DD3EB832aF982ec71669E178424b10Dca2EDe" + - chain_id: 1101 + eventStartBlock: 5237021 + contracts: + - "0x8fEE59c274eA82fAc9280FeB04B7364A80c63BA6" + - "0x03C43a778A4157c8781aA1bc645F2703B224b9d8" + - chain_id: 534352 + eventStartBlock: 2945654 + contracts: + - "0x82Ec3E6DBf43b6D6751b0645f1Ff06AEaD426dd1" + - "0xf6f3Ef5419F5B3e7655a6D81bad5B33dad26A4AE" + - chain_id: 10 + eventStartBlock: 7746768 + contracts: + - "0x82Ec3E6DBf43b6D6751b0645f1Ff06AEaD426dd1" + - "0xf6f3Ef5419F5B3e7655a6D81bad5B33dad26A4AE" + diff --git a/acorus_prod.yaml b/acorus_prod.yaml new file mode 100644 index 0000000..be11ad2 --- /dev/null +++ b/acorus_prod.yaml @@ -0,0 +1,94 @@ +slave_db_enable: false + +server: + host: 127.0.0.1 + port: 8080 + +metrics: + host: 127.0.0.1 + port: 8081 + +rpcs: + - rpc_url: 'https://eth-sepolia.g.alchemy.com/v2/o0zYyGdlny_0bvOGS8qtNdMG4REyM0I_' + chain_id: 11155111 + start_block: 5232909 + contracts: + event_contracts: + + - rpc_url: 'https://opt-sepolia.g.alchemy.com/v2/FzuTJBqbf8-w1E6AAcPkWO1a2OGFX2DL' + chain_id: 10 + start_block: 114216017 + contracts: + event_contracts: + + - rpc_url: "https://polygonzkevm-testnet.g.alchemy.com/v2/dmeCZ-Yf1YjIrgExOXbfrTF_Z_51GyBx" + chain_id: 1101 + start_block: 9634600 + l1_event_unpack_block: 19147000 + contracts: + - "0x2a3DD3EB832aF982ec71669E178424b10Dca2EDe" + event_contracts: + - "0x2a3DD3EB832aF982ec71669E178424b10Dca2EDe" + - + - rpc_url: 'https://rpc.ankr.com/scroll_sepolia_testnet' + chain_id: 534351 + start_block: 2950000 + l1_event_unpack_block: 19126200 + contracts: + - "0xF8B1378579659D8F7EE5f3C929c2f3E332E41Fd6" + - "0x6774Bcbd5ceCeF1336b5300fb5186a12DDD8b367" + - "0x7F2b8C31F88B6006c382775eea88297Ec1e3E905" + - "0x7AC440cAe8EB6328de4fA621163a792c1EA9D4fE" + - "0xD8A791fE2bE73eb6E6cF1eb0cb3F36adC9B3F8f9" + - "0xb2b10a289A229415a124EFDeF310C10cb004B6ff" + - "0x6260aF48e8948617b8FA17F4e5CEa2d21D21554B" + - "0xb94f7F6ABcb811c5Ac709dE14E37590fcCd975B6" + - "0x37ba659D6CC380D12Fb96567CC52FC8e1DF4E334" + - "0x892dDB2899325aBBA1fD00FDA8249B40Cbbc33F9" + - "0xD8dD7787f89c7E6243AD32E0d0cCf460243C8130" + event_contracts: + - "0x4C0926FF5252A435FD19e10ED15e5a249Ba19d79" + - "0x781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC" + - "0x6EA73e05AdC79974B931123675ea8F78FfdacDF0" + - "0x7003E7B7186f0E6601203b99F7B8DECBfA391cf9" + - "0xE2b4795039517653c5Ae8C2A9BFdd783b48f447A" + - "0x64CCBE37c9A82D85A1F2E74649b7A42923067988" + - "0x7bC08E1c04fb41d75F1410363F0c5746Eae80582" + - "0x62597Cc19703aF10B58feF87B0d5D29eFE263bcc" + - "0x987e300fDfb06093859358522a79098848C33852" + - "0xE9c5C9f67ec7B773fC76440845751F657bb953FF" + - "0xC5034eB8F682b73F93C9246aa95A8eBbF82793aA" + - "0x987e300fDfb06093859358522a79098848C33852" + +master_db: + db_host: "postgresql" + db_port: 5432 + db_user: "postgres" + db_password: "vVb9NhLYui" + db_name: "acorus" + +slave_db: + db_host: "127.0.0.1" + db_port: 5432 + db_user: "graphnode" + db_password: "" + db_name: "acorus" + + +bridge_grpc_url: "selaginella-app:8080" + +relayers: + - chain_id: 11155111 + eventStartBlock: 5218234 + contracts: + - "0x44c8EDB79089Fe99cfb79F8284D9f9CD83B8Fbc5" + - "0xAC13457009a2aE49820Bd7517818436dE2202549" + - chain_id: 1101 + eventStartBlock: 9634600 + contracts: + - "0x2a3DD3EB832aF982ec71669E178424b10Dca2EDe" + - chain_id: 534351 + eventStartBlock: 9634600 + contracts: + - "0x2a3DD3EB832aF982ec71669E178424b10Dca2EDe" + diff --git a/common/global_const/global_const.go b/common/global_const/global_const.go index a14bfa0..160ef0d 100644 --- a/common/global_const/global_const.go +++ b/common/global_const/global_const.go @@ -8,6 +8,7 @@ const ( GormInfoFmt = "%s\n[%.3fms] [rows:%v] %s" ZeroAddress = "0x0000000000000000000000000000000000000000" WEthAddress = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" + SepoliaWETH = "" LogTimeFormat = "2006-01-02" LayerTypeOne = 1 LayerTypeTwo = 2 diff --git a/database/relayer/bridge_msg_sent.go b/database/relayer/bridge_msg_sent.go index fa87aef..cc3275e 100644 --- a/database/relayer/bridge_msg_sent.go +++ b/database/relayer/bridge_msg_sent.go @@ -7,19 +7,21 @@ import ( ) type BridgeMsgSent struct { - GUID string `json:"guid" gorm:"primaryKey;DEFAULT replace(uuid_generate_v4()::text,'-','')"` - TxHash common.Hash `json:"tx_hash" gorm:"serializer:bytes"` - MsgHash common.Hash `json:"msg_hash" gorm:"serializer:bytes"` - DestHash common.Hash `json:"dest_hash" gorm:"serializer:bytes"` - DestBlockNumber *big.Int `json:"dest_block_number" gorm:"serializer:u256"` - DestTimestamp uint64 `json:"det_timestamp"` - DestToken common.Address `json:"dest_token" gorm:"serializer:bytes"` - Fee *big.Int `json:"fee" gorm:"serializer:u256"` - MsgNonce *big.Int `json:"msg_nonce" gorm:"serializer:u256"` - MsgHashRelation bool `json:"msg_hash_relation"` - BridgeRelation bool `json:"bridge_relation"` - ToBridgeRecord bool `json:"to_bridge_record"` - Data string `json:"data"` + GUID string `json:"guid" gorm:"primaryKey;DEFAULT replace(uuid_generate_v4()::text,'-','')"` + TxHash common.Hash `json:"tx_hash" gorm:"serializer:bytes"` + MsgHash common.Hash `json:"msg_hash" gorm:"serializer:bytes"` + DestHash common.Hash `json:"dest_hash" gorm:"serializer:bytes"` + DestBlockNumber *big.Int `json:"dest_block_number" gorm:"serializer:u256"` + DestTimestamp uint64 `json:"det_timestamp"` + DestToken common.Address `json:"dest_token" gorm:"serializer:bytes"` + Fee *big.Int `json:"fee" gorm:"serializer:u256"` + MsgNonce *big.Int `json:"msg_nonce" gorm:"serializer:u256"` + MsgHashRelation bool `json:"msg_hash_relation"` + BridgeRelation bool `json:"bridge_relation"` + ToChangeTransStatus bool `json:"to_change_trans_status"` + ToCrossStatus bool `json:"to_cross_status"` + ToBridgeRecord bool `json:"to_bridge_record"` + Data string `json:"data"` } func (BridgeMsgSent) TableName() string { @@ -35,10 +37,14 @@ type BridgeMsgSentDB interface { StoreBridgeMsgSent(msgSent BridgeMsgSent) error StoreBridgeMsgSents(msgSentList []BridgeMsgSent) error CleanMsgSent() error + UpdateChangeStatus(txHash string) error + UpdateCrossStatus(txHash string) error } type BridgeMsgSentDBView interface { GetCanSaveDecodeList() (mList []BridgeMsgSent, err error) + GetCanCrossDataList() []BridgeMsgSent + GetCanChangeTransStatusList() []BridgeMsgSent } func NewBridgeMsgSentDB(db *gorm.DB) BridgeMsgSentDB { @@ -61,7 +67,7 @@ func (db bridgeMsgSentDB) GetCanSaveDecodeList() (mList []BridgeMsgSent, err err var msgSentList []BridgeMsgSent selectSql := ` SELECT DISTINCT ON ("a"."tx_hash") * FROM bridge_msg_sent "a" WHERE "a"."msg_hash_relation" = true - AND "a"."bridge_relation" = true AND "a"."to_bridge_record" = false order by msg_sent_timestamp desc LIMIT 1000 + AND "a"."bridge_relation" = true AND "a"."to_bridge_record" = false LIMIT 1000 ` result := db.gorm.Raw(selectSql).Find(&msgSentList) if result.Error != nil { @@ -79,3 +85,42 @@ func (db bridgeMsgSentDB) CleanMsgSent() error { err := db.gorm.Exec(cleanSql).Error return err } + +func (db bridgeMsgSentDB) GetCanCrossDataList() []BridgeMsgSent { + getSql := ` + select DISTINCT ON ("a"."tx_hash") * from bridge_msg_sent a where a.bridge_relation=false + and a.to_bridge_record = false and a.to_cross_status = false and a.msg_hash_relation = true LIMIT 20 + ` + var bridgeMsgSent []BridgeMsgSent + result := db.gorm.Raw(getSql).Find(&bridgeMsgSent) + err := result.Error + if err != nil { + return nil + } + return bridgeMsgSent +} + +func (db bridgeMsgSentDB) GetCanChangeTransStatusList() []BridgeMsgSent { + getSql := ` + select DISTINCT ON ("a"."tx_hash") * from bridge_msg_sent a where a.bridge_relation=true + and a.to_change_trans_status = false and a.msg_hash_relation = true LIMIT 20 + ` + var bridgeMsgSent []BridgeMsgSent + result := db.gorm.Raw(getSql).Find(&bridgeMsgSent) + err := result.Error + if err != nil { + return nil + } + return bridgeMsgSent +} + +func (db bridgeMsgSentDB) UpdateChangeStatus(txHash string) error { + msgSentRecord := new(BridgeMsgSent) + result := db.gorm.Table(msgSentRecord.TableName()).Where("tx_hash = ?", txHash).Updates(map[string]interface{}{"to_change_trans_status": true}) + return result.Error +} +func (db bridgeMsgSentDB) UpdateCrossStatus(txHash string) error { + msgSentRecord := new(BridgeMsgSent) + result := db.gorm.Table(msgSentRecord.TableName()).Where("tx_hash = ?", txHash).Updates(map[string]interface{}{"to_cross_status": true}) + return result.Error +} diff --git a/database/relayer/bridge_record.go b/database/relayer/bridge_record.go index 9afbdb1..fb8789a 100644 --- a/database/relayer/bridge_record.go +++ b/database/relayer/bridge_record.go @@ -28,7 +28,6 @@ type BridgeRecords struct { Amount *big.Int `json:"amount" gorm:"serializer:u256"` Nonce *big.Int `json:"nonce" gorm:"serializer:u256"` Fee *big.Int `json:"fee" gorm:"serializer:u256"` - OperaType int `json:"opera_type"` AssetType int `json:"asset_type"` MsgSentTimestamp uint64 `json:"msg_sent_timestamp"` ClaimTimestamp uint64 `json:"claim_timestamp"` diff --git a/migrations/00001_create_schema.sql b/migrations/00001_create_schema.sql index aedb02d..78529cc 100644 --- a/migrations/00001_create_schema.sql +++ b/migrations/00001_create_schema.sql @@ -316,7 +316,6 @@ CREATE TABLE IF NOT EXISTS bridge_record nonce UINT256, fee UINT256, asset_type SMALLINT NOT NULL, - opera_type smallint not null, msg_sent_timestamp INTEGER, claim_timestamp INTEGER ); @@ -351,6 +350,8 @@ create table if not exists bridge_msg_sent msg_hash_relation boolean default false, bridge_relation boolean default false, to_bridge_record boolean default false, + to_change_trans_status boolean default false, + to_cross_status boolean default false, data varchar ); diff --git a/relayer/relayer.go b/relayer/relayer.go index a795826..2b1efbb 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -84,8 +84,6 @@ func (rl *RelayerListener) Start() error { }) } else { relayerEventOn2 := time.NewTicker(rl.loopInterval) - tickerBridgeRel := time.NewTicker(rl.loopInterval) - rl.tasks.Go(func() error { for range relayerEventOn2.C { err := rl.onL2Data() @@ -96,19 +94,43 @@ func (rl *RelayerListener) Start() error { } return nil }) + } + tickerBridgeRel := time.NewTicker(rl.loopInterval) + // start relation worker + rl.tasks.Go(func() error { + for range tickerBridgeRel.C { + err := rl.relationBridge() + if err != nil { + log.Println("shutting down relationBridge") + return err + } + } + return nil + }) - // start relation worker - rl.tasks.Go(func() error { - for range tickerBridgeRel.C { - err := rl.relationBridge() - if err != nil { - log.Println("shutting down relationBridge") - return err - } + tickerCross := time.NewTicker(10 * time.Second) + rl.tasks.Go(func() error { + for range tickerCross.C { + err := rl.CrossChainTransfer() + if err != nil { + log.Println("shutting down relationBridge") + return err } - return nil - }) - } + } + return nil + }) + + tickerTrans := time.NewTicker(10 * time.Second) + rl.tasks.Go(func() error { + for range tickerTrans.C { + err := rl.ChangeTransferStatus() + if err != nil { + log.Println("shutting down relationBridge") + return err + } + } + return nil + }) return nil } @@ -326,18 +348,21 @@ func (rl *RelayerListener) eventUnpack(event event.ContractEvent) error { func (rl *RelayerListener) relationBridge() error { if err := rl.db.Transaction(func(tx *database.DB) error { // step 1 + log.Println("RelationClaim") err := rl.db.BridgeFinalize.RelationClaim() if err != nil { log.Println("relationBridge failed", "err", err) return err } // step 2 + log.Println("RelationMsgHash") err = rl.db.BridgeMsgHash.RelationMsgHash() if err != nil { log.Println("relationBridge failed", "err", err) return err } // step 3 + log.Println("RelationMsgSent") err = rl.db.BridgeClaim.RelationMsgSent() if err != nil { log.Println("RelationMsgSent failed", "err", err) @@ -364,11 +389,13 @@ func (rl *RelayerListener) relationBridge() error { bridgeRecordSaveList = append(bridgeRecordSaveList, bridgeRecord) } if len(bridgeRecordSaveList) > 0 { + log.Println("StoreBridgeRecords") err = rl.db.BridgeRecord.StoreBridgeRecords(bridgeRecordSaveList) if err != nil { return err } } + log.Println("CleanMsgSent") err = rl.db.BridgeMsgSent.CleanMsgSent() if err != nil { return err @@ -379,3 +406,59 @@ func (rl *RelayerListener) relationBridge() error { } return nil } + +func (rl *RelayerListener) CrossChainTransfer() error { + list := rl.db.BridgeMsgSent.GetCanCrossDataList() + if len(list) > 0 { + for _, v := range list { + data := v.Data + var bridgeRecord relayer.BridgeRecords + if unMarErr := json.Unmarshal([]byte(data), &bridgeRecord); unMarErr != nil { + continue + } + sourceChainId := bridgeRecord.SourceChainId + destChainId := bridgeRecord.DestChainId + amount := bridgeRecord.Amount.String() + fee := bridgeRecord.Fee.String() + nonce := bridgeRecord.Nonce.String() + to := bridgeRecord.To.String() + tokenAddress := bridgeRecord.SourceTokenAddress.String() + transfer, err := rl.bridgeRpcService.CrossChainTransfer(sourceChainId, destChainId, amount, to, tokenAddress, fee, nonce) + if err != nil { + continue + } + log.Println("CrossChainTransfer", "transfer", transfer.Success) + // todo if call rpc fail ,need add retry times + if transfer.Success { + rl.db.BridgeMsgSent.UpdateCrossStatus(v.TxHash.String()) + } + } + } + return nil +} + +func (rl *RelayerListener) ChangeTransferStatus() error { + list := rl.db.BridgeMsgSent.GetCanChangeTransStatusList() + if len(list) > 0 { + for _, v := range list { + data := v.Data + var bridgeRecord relayer.BridgeRecords + if unMarErr := json.Unmarshal([]byte(data), &bridgeRecord); unMarErr != nil { + continue + } + sourceChainId := bridgeRecord.SourceChainId + destChainId := bridgeRecord.DestChainId + hash := v.DestHash.String() + changeStatus, err := rl.bridgeRpcService.ChangeTransferStatus(sourceChainId, destChainId, hash) + if err != nil { + continue + } + log.Println("ChangeTransferStatus", "changeStatus", changeStatus.Success) + // todo if call rpc fail ,need add retry times + if changeStatus.Success { + rl.db.BridgeMsgSent.UpdateChangeStatus(v.TxHash.String()) + } + } + } + return nil +} diff --git a/relayer/unpack/log_unpack.go b/relayer/unpack/log_unpack.go index 4417323..0179014 100644 --- a/relayer/unpack/log_unpack.go +++ b/relayer/unpack/log_unpack.go @@ -66,7 +66,6 @@ func InitiateETH(event event.ContractEvent, db *database.DB) error { MsgHash: common.Hash{}, MsgSentTimestamp: event.Timestamp, Status: 0, - OperaType: global_const.BridgeOperaInitType, SourceTxHash: rlpLog.TxHash, SourceBlockNumber: big.NewInt(int64(rlpLog.BlockNumber)), } @@ -101,7 +100,6 @@ func InitiateWETH(event event.ContractEvent, db *database.DB) error { MsgHash: common.Hash{}, MsgSentTimestamp: event.Timestamp, Status: 0, - OperaType: global_const.BridgeOperaInitType, SourceTxHash: rlpLog.TxHash, SourceBlockNumber: big.NewInt(int64(rlpLog.BlockNumber)), } @@ -134,7 +132,6 @@ func InitiateERC20(event event.ContractEvent, db *database.DB) error { DestTokenAddress: common.Address{}, MsgHash: common.Hash{}, MsgSentTimestamp: event.Timestamp, - OperaType: global_const.BridgeOperaInitType, Status: 0, SourceTxHash: rlpLog.TxHash, SourceBlockNumber: big.NewInt(int64(rlpLog.BlockNumber)),