From 7183e217c089fd013f0691b01a3d206ffb7eeaf8 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal <93arpit@gmail.com> Date: Mon, 6 Apr 2020 11:40:14 +0530 Subject: [PATCH] fix: ethClient.SubscribeNewDeposit should accept StateData channel (#44) * ethClient.SubscribeNewDeposit should accept StateData channel * remove redundant function * Fix compilation error --- accounts/abi/bind/backends/simulated.go | 3 +++ consensus/bor/bor.go | 2 -- core/blockchain.go | 8 +++++++- core/types/transaction.go | 4 ---- eth/filters/api.go | 3 ++- eth/filters/filter_system.go | 5 ++--- ethclient/ethclient.go | 7 +++---- 7 files changed, 17 insertions(+), 15 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 58c2c4a44d95..a161fb0c9501 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -510,6 +510,9 @@ func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEve func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return fb.bc.SubscribeLogsEvent(ch) } +func (fb *filterBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + return fb.bc.SubscribeStateEvent(ch) +} func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 } func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) { diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 85f24f5b78b7..bd55a745e7c9 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -1161,7 +1161,6 @@ func (c *Bor) CommitStates( header *types.Header, chain core.ChainContext, ) error { - fmt.Println("comminting state") // get pending state proposals stateIds, err := c.GetPendingStateProposals(header.Number.Uint64() - 1) if err != nil { @@ -1240,7 +1239,6 @@ func (c *Bor) CommitStates( return nil } - // SubscribeStateEvent registers a subscription of ChainSideEvent. func (c *Bor) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { return c.scope.Track(c.stateDataFeed.Subscribe(ch)) diff --git a/core/blockchain.go b/core/blockchain.go index 34bbd400efde..e90ffbb30eac 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -27,6 +27,7 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/golang-lru" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/mclock" "github.com/maticnetwork/bor/common/prque" @@ -42,7 +43,6 @@ import ( "github.com/maticnetwork/bor/params" "github.com/maticnetwork/bor/rlp" "github.com/maticnetwork/bor/trie" - "github.com/hashicorp/golang-lru" ) var ( @@ -142,6 +142,7 @@ type BlockChain struct { chainHeadFeed event.Feed logsFeed event.Feed blockProcFeed event.Feed + stateDataFeed event.Feed scope event.SubscriptionScope genesisBlock *types.Block @@ -2177,6 +2178,11 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return bc.scope.Track(bc.logsFeed.Subscribe(ch)) } +// SubscribeStateEvent registers a subscription of ChainSideEvent. +func (bc *BlockChain) SubscribeStateEvent(ch chan<- NewStateChangeEvent) event.Subscription { + return bc.scope.Track(bc.stateDataFeed.Subscribe(ch)) +} + // SubscribeBlockProcessingEvent registers a subscription of bool where true means // block processing has started while false means it has stopped. func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { diff --git a/core/types/transaction.go b/core/types/transaction.go index f44c3ece369f..1481229b5ca2 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -79,10 +79,6 @@ type txdataMarshaling struct { S *hexutil.Big } -func (sd *StateData) StateData() *StateData { - return sd -} - func NewTransaction(nonce uint64, to common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction { return newTransaction(nonce, &to, amount, gasLimit, gasPrice, data) } diff --git a/eth/filters/api.go b/eth/filters/api.go index 294be1cec380..d7d382d9e8a8 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -17,6 +17,7 @@ package filters import ( + "bytes" "context" "encoding/json" "errors" @@ -247,7 +248,7 @@ func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.Filte for { select { case h := <-stateData: - if crit.Did == h.Did || crit.Contract == h.Contract || + if crit.Did == h.Did || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 || (crit.Did == 0 && crit.Contract == common.Address{}) { notifier.Notify(rpcSub.ID, h) } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 35efa4c8e7b9..deff4faea078 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -311,8 +311,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti return es.subscribe(sub) } -// SubscribeNewHeads creates a subscription that writes the header of a block that is -// imported in the chain. +// SubscribeNewDeposits creates a subscription that writes details about the new state sync events (from mainchain to Bor) func (es *EventSystem) SubscribeNewDeposits(stateData chan *types.StateData) *Subscription { sub := &subscription{ id: rpc.NewID(), @@ -387,7 +386,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } case core.NewStateChangeEvent: for _, f := range filters[StateSubscription] { - f.stateData <- e.StateData.StateData() + f.stateData <- e.StateData } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index ea55e9e7c7eb..5d349a289b86 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -324,10 +324,9 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) return ec.c.EthSubscribe(ctx, ch, "newHeads") } -// SubscribeNewHead subscribes to notifications about the current blockchain head -// on the given channel. -func (ec *Client) SubscribeNewDeposit(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { - return ec.c.EthSubscribe(ctx, ch, "newDeposits") +// SubscribeNewDeposit subscribes to new state sync events +func (ec *Client) SubscribeNewDeposit(ctx context.Context, ch chan<- *types.StateData) (ethereum.Subscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newDeposits", nil) } // State Access