Skip to content

Commit

Permalink
Delete all light client code in filter system (#1256)
Browse files Browse the repository at this point in the history
  • Loading branch information
anusha-ctrl authored Apr 20, 2023
1 parent cb1b65f commit 845aaa4
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 111 deletions.
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)
backend.events = filters.NewEventSystem(backend.filterSystem)

backend.rollback(blockchain.CurrentBlock())
return backend
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (s *Ethereum) APIs() []rpc.API {
Name: "eth",
}, {
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, false /* isLightClient */),
Service: filters.NewFilterAPI(filterSystem),
Name: "eth-filter",
}, {
Namespace: "admin",
Expand Down
4 changes: 2 additions & 2 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ type FilterAPI struct {
}

// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
func NewFilterAPI(system *FilterSystem) *FilterAPI {
api := &FilterAPI{
sys: system,
events: NewEventSystem(system, lightMode),
events: NewEventSystem(system),
filters: make(map[rpc.ID]*filter),
timeout: system.cfg.Timeout,
}
Expand Down
101 changes: 3 additions & 98 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/bloombits"
"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/core/vm"
"github.com/ava-labs/coreth/ethdb"
Expand Down Expand Up @@ -169,10 +168,8 @@ type subscription struct {
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
backend Backend
sys *FilterSystem
lightMode bool
lastHead *types.Header
backend Backend
sys *FilterSystem

// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
Expand Down Expand Up @@ -203,11 +200,10 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
func NewEventSystem(sys *FilterSystem) *EventSystem {
m := &EventSystem{
sys: sys,
backend: sys.backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
Expand Down Expand Up @@ -544,103 +540,12 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
}

func (es *EventSystem) handleChainAcceptedEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[AcceptedBlocksSubscription] {
f.headers <- ev.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
}

func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
if oldh == nil {
return
}
newh := newHeader
// find common ancestor, create list of rolled back and new block hashes
var oldHeaders, newHeaders []*types.Header
for oldh.Hash() != newh.Hash() {
if oldh.Number.Uint64() >= newh.Number.Uint64() {
oldHeaders = append(oldHeaders, oldh)
oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
}
if oldh.Number.Uint64() < newh.Number.Uint64() {
newHeaders = append(newHeaders, newh)
newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
if newh == nil {
// happens when CHT syncing, nothing to do
newh = oldh
}
}
}
// roll back old blocks
for _, h := range oldHeaders {
callBack(h, true)
}
// check new blocks (array is in reverse order)
for i := len(newHeaders) - 1; i >= 0; i-- {
callBack(newHeaders[i], false)
}
}

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if bloomFilter(header.Bloom, addresses, topics) {
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
logsList, err := es.sys.getLogs(ctx, header.Hash(), header.Number.Uint64())
if err != nil {
return nil
}
var unfiltered []*types.Log
for _, logs := range logsList {
for _, log := range logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
// We have matching but non-derived logs
receipts, err := es.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil
}
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
for _, log := range receipt.Logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
}
logs = filterLogs(unfiltered, nil, nil, addresses, topics)
}
return logs
}
return nil
}

// eventLoop (un)installs filters and processes mux events.
Expand Down
18 changes: 9 additions & 9 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestBlockSubscription(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
genesis = &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.ApricotPhase4MinBaseFee),
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestPendingTxFilter(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)

transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestLogFilterCreation(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)

testCases = []struct {
crit FilterCriteria
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
)

// different situations where log filter creation should fail.
Expand All @@ -377,7 +377,7 @@ func TestInvalidGetLogsRequest(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
)

Expand All @@ -402,7 +402,7 @@ func TestLogFilter(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)

firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestPendingLogsSubscription(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)

firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
Expand Down Expand Up @@ -700,7 +700,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout})
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
done = make(chan struct{})
)

Expand Down Expand Up @@ -769,7 +769,7 @@ func TestGetLogsRegression(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newSectionedTestFilterSystem(t, db, Config{}, 4096)
api = NewFilterAPI(sys, false)
api = NewFilterAPI(sys)
genesis = &core.Genesis{
Config: params.TestChainConfig,
}
Expand Down

0 comments on commit 845aaa4

Please sign in to comment.