eth/filters: subscribe history logs #27439

Open: wants to merge 60 commits into base: master

Commits (60)
ea0d0b1
eth/filters: wip
jsvisa Jun 7, 2023
013f13a
eth/filters: do hist sync
jsvisa Jun 7, 2023
7258162
eth/filters: use a sole context
jsvisa Jun 8, 2023
fe9287d
eth/filters: restruct
jsvisa Jun 8, 2023
0b470f1
eth/filters: more accurate check of live or hist
jsvisa Jun 8, 2023
b83d6ff
eth/filters: handle toBlock
jsvisa Jun 8, 2023
054436f
eth/filters: notify with error check
jsvisa Jun 8, 2023
baf6f7a
eth/filters: local variable
jsvisa Jun 9, 2023
d4be045
eth/filters: wrap Notify, for unit test
jsvisa Jun 9, 2023
db89164
eth/filters: move inner function as private method
jsvisa Jun 11, 2023
bbf7323
eth/filters: add the first logs testcase
jsvisa Jun 11, 2023
f35525d
eth/filters: more testcase
jsvisa Jun 13, 2023
69d0ae7
eth/filters: header is nullable
jsvisa Jun 13, 2023
93620fe
eth/filters: check hist only mode
jsvisa Jun 13, 2023
a802250
eth/filters: go lint
jsvisa Jun 13, 2023
fc7d086
fix minor bug
s1na Jun 14, 2023
180bcbb
eth/filters: add notifier interface doc
jsvisa Jun 26, 2023
f652f2c
eth/filters: logs subscribe donot support to criteria
jsvisa Jun 26, 2023
a8b70b9
eth/filters: dry
jsvisa Jun 26, 2023
335d085
eth/filters: fix the issue of wrong check on from block
jsvisa Jun 26, 2023
616f4ff
eth/filters: more testcase
jsvisa Jun 26, 2023
10ce0e4
eth/filters: subscribe pending logs
jsvisa Jun 26, 2023
4dde82d
eth/filters: testing pending with latest
jsvisa Jun 26, 2023
e6394e4
stricter range validation
s1na Jun 29, 2023
4e9ad2f
minor
s1na Jun 29, 2023
03a282a
eth/filters: reorg logs
jsvisa Jul 4, 2023
ce56b7a
fix tests
s1na Jul 4, 2023
b92dc75
add a live log to the test
s1na Jul 4, 2023
48ea3e7
eth/filters: shadow copy && reset reorglog
jsvisa Jul 4, 2023
04f63d6
add comment
s1na Jul 4, 2023
298d7c8
add test case with reorg
s1na Jul 6, 2023
aeaa1ee
fix and simplify reorged logs handling
s1na Jul 6, 2023
593031f
minor fix
s1na Jul 7, 2023
7ef6c8e
fix live log in test
s1na Jul 7, 2023
533b71b
eth/filters: live && hist coworking
jsvisa Jul 24, 2023
192a983
eth/filters: pass reorg filter
jsvisa Jul 25, 2023
a6cd23d
eth/filters: don't print logs
jsvisa Jul 25, 2023
a19dbc5
eth/filters: add reason for check with balance instead of logs
jsvisa Jul 25, 2023
5092b51
eth/filters: almost working
jsvisa Jul 25, 2023
c33c3f0
eth/filters: deliver the same block logs if reorg occured
jsvisa Jul 25, 2023
582df42
eth/filters: dry
jsvisa Jul 25, 2023
5d90ff5
eth/filters: dry 2
jsvisa Jul 25, 2023
6897bd0
eth/filters: history logs should be requested > head
jsvisa Jul 25, 2023
ec81063
eth/filters: test with N logs in one block
jsvisa Jul 25, 2023
c82cf9d
eth/filters: check reorgBlock in idempotent
jsvisa Jul 25, 2023
2afe950
eth/filters: dry testcases
jsvisa Jul 25, 2023
eaaf9f3
eth/filters: more testcase
jsvisa Jul 25, 2023
ee71ca6
eth/filters: balanceDiffer as private function
jsvisa Jul 25, 2023
8ddb10d
eth/filters: reuse genesisAlloc
jsvisa Jul 25, 2023
4f576d0
eth/filters: ethclient set toBlock=nil to latest
jsvisa Aug 9, 2023
2f8becb
refactors
s1na Aug 10, 2023
2df25bc
refactor notifying logs
s1na Aug 10, 2023
83c0c7c
send logs of each block together
s1na Aug 11, 2023
205b696
fix fetch range err handling
s1na Aug 11, 2023
ae5b814
fix future reorged log edge case
s1na Aug 14, 2023
fc3e830
minor
s1na Aug 29, 2023
e195ee4
eth/filters: add some newlines
fjl Sep 14, 2023
f823768
rm reorg param, re-use liveOnly
s1na Sep 18, 2023
6a7706e
refactor
s1na Sep 18, 2023
97bbe05
track delivered blocks
s1na Sep 19, 2023
291 changes: 278 additions & 13 deletions eth/filters/api.go
@@ -28,14 +28,25 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
logger "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
errConnectDropped = errors.New("connection dropped")
errInvalidToBlock = errors.New("log subscription does not support history block range")
errInvalidFromBlock = errors.New("from block can be only a number, or \"safe\", or \"finalized\"")
errClientUnsubscribed = errors.New("client unsubscribed")
)

const (
// maxTrackedBlocks is the number of block hashes that will be tracked per subscription.
maxTrackedBlocks = 32 * 1024
)
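For illustration, the bounded tracking behaves like the standalone sketch below (the hash value and printed results are made up here; only the common/lru API already imported in this file is assumed):

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/lru"
)

func main() {
    // A bounded "seen blocks" guard, like the per-subscription one used further down.
    // Once more than the capacity of distinct hashes has been added, the oldest
    // entries are evicted and would no longer be recognised as already delivered.
    seen := lru.NewBasicLRU[common.Hash, struct{}](32 * 1024)

    h := common.HexToHash("0x01")
    fmt.Println(seen.Contains(h)) // false: block not delivered yet
    seen.Add(h, struct{}{})
    fmt.Println(seen.Contains(h)) // true: a second delivery would be skipped
}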

// filter is a helper struct that holds meta information over the filter type
@@ -246,31 +257,68 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
return rpcSub, nil
}

// notifier is used for broadcasting data (e.g. logs) to RPC receivers.
// It is abstracted as an interface so it can be stubbed out in unit tests.
type notifier interface {
Notify(id rpc.ID, data interface{}) error
Closed() <-chan interface{}
}
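Since the interface exists so the RPC layer can be stubbed out, a test double along the following lines is conceivable. The recordingNotifier name is hypothetical (not part of this change) and the sketch assumes the rpc import already present in this file:

// recordingNotifier is a hypothetical stub of the notifier interface above:
// it records every notified payload instead of writing to an RPC connection.
type recordingNotifier struct {
    payloads []interface{}
    closed   chan interface{}
}

func (n *recordingNotifier) Notify(id rpc.ID, data interface{}) error {
    n.payloads = append(n.payloads, data)
    return nil
}

func (n *recordingNotifier) Closed() <-chan interface{} {
    return n.closed
}

A test could then pass such a stub as the notifier argument of api.logs and inspect payloads afterwards.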

// Logs creates a subscription that fires for all historical
// and new logs that match the given filter criteria.
func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
err := api.logs(ctx, notifier, rpcSub, crit)
return rpcSub, err
}

// logs is the inner implementation of logs subscription.
// The following criteria are valid:
// * from: nil, to: nil -> yield live logs.
// * from: blockNum | safe | finalized, to: nil -> historical beginning at `from` to head, then live logs.
// * Every other case should fail with an error.
func (api *FilterAPI) logs(ctx context.Context, notifier notifier, rpcSub *rpc.Subscription, crit FilterCriteria) error {
if crit.ToBlock != nil {
return errInvalidToBlock
}
if crit.FromBlock == nil {
return api.liveLogs(notifier, rpcSub, crit)
Review comment on lines +288 to +289 (Contributor):

On input you take a ctx, but the ctx is lost; afaict you don't check for ctx timeouts or cancellation. Shouldn't you? What is the lifecycle here?

Reply (Contributor Author):

The ctx is canceled as soon as the subscription goroutine returns, so we instantiate a new background context to control the workflow.

The lifecycle is as follows:

  1. fetch and push the historical logs;
  2. push the live logs;
  3. terminate when the subscription is canceled or the push channel is broken.

}
from := rpc.BlockNumber(crit.FromBlock.Int64())
switch from {
case rpc.LatestBlockNumber, rpc.PendingBlockNumber:
return errInvalidFromBlock
case rpc.SafeBlockNumber, rpc.FinalizedBlockNumber:
header, err := api.sys.backend.HeaderByNumber(ctx, from)
if err != nil {
return err
}
from = rpc.BlockNumber(header.Number.Int64())
}
if from < 0 {
return errInvalidFromBlock
}
return api.histLogs(notifier, rpcSub, int64(from), crit)
}
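For context, a client could exercise this subscription through ethclient, which forwards FromBlock in the eth_subscribe("logs") parameters. A rough sketch, assuming a node running this change; the endpoint URL and the starting block number are placeholders:

package main

import (
    "context"
    "fmt"
    "log"
    "math/big"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
)

func main() {
    client, err := ethclient.Dial("ws://127.0.0.1:8546")
    if err != nil {
        log.Fatal(err)
    }
    logs := make(chan types.Log)
    // With this change, FromBlock selects the historical starting point; ToBlock
    // must stay unset, since the subscription always continues with live logs.
    q := ethereum.FilterQuery{FromBlock: big.NewInt(17_000_000)}
    sub, err := client.SubscribeFilterLogs(context.Background(), q, logs)
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()
    for {
        select {
        case err := <-sub.Err():
            log.Fatal(err)
        case l := <-logs:
            fmt.Println("block", l.BlockNumber, "removed:", l.Removed)
        }
    }
}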

// liveLogs only retrieves live logs.
func (api *FilterAPI) liveLogs(notifier notifier, rpcSub *rpc.Subscription, crit FilterCriteria) error {
matchedLogs := make(chan []*types.Log)
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
if err != nil {
return err
}

go func() {
for {
select {
case logs := <-matchedLogs:
notifyLogsIf(notifier, rpcSub.ID, logs, nil)

case <-rpcSub.Err(): // client sent an unsubscribe request
logsSub.Unsubscribe()
return
@@ -280,8 +328,225 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
}
}
}()
return nil
}

// histLogs retrieves logs older than the current header.
func (api *FilterAPI) histLogs(notifier notifier, rpcSub *rpc.Subscription, from int64, crit FilterCriteria) error {
Review comment on lines +334 to +335 (Contributor):

I think histLogs is a bit wonky; is the benefit of reducing historicalLogs really worth it?

// Subscribe to the live logs.
// If a chain reorg occurs, we will first receive the old chain's deleted logs in
// descending order, and then the new chain's added logs in descending order.
// See core/blockchain.go#reorg(oldHead *types.Header, newHead *types.Block) for more details.
// If a reorg happens between `from` and `to`, two scenarios need to be considered:
// 1. If the reorg occurs after the currently delivered block, it happens in the future and
//    has nothing to do with the current historical sync, so we can simply ignore it.
// 2. If the reorg occurs before the currently delivered block, we need to stop the historical
//    delivery and send all replaced logs instead.
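// A worked example of the two scenarios (block numbers purely illustrative):
// - History has been delivered up to block 120 (delivered = 120) and the head is 150.
// - If a reorg replaces blocks 140..150, it lies above the delivered history, so
//   scenario 1 applies: the removed/added live logs are ignored and only the block
//   hash of the first removed batch is remembered (reorgedBlockHash), so stale
//   history reaching that block can be detected later.
// - If a reorg replaces blocks 110..150 instead, it reaches at or below the delivered
//   block, so scenario 2 applies: historical delivery stops (liveMode = true) and the
//   removed and replacement logs from the chain subscription are delivered instead.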
var (
liveLogs = make(chan []*types.Log)
histLogs = make(chan []*types.Log)
histDone = make(chan error)
)
liveLogsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), liveLogs)
if err != nil {
return err
}

// The original request ctx will be canceled as soon as the parent goroutine
// returns a subscription. Use a new context instead.
ctx, cancel := context.WithCancel(context.Background())
go func() {
histDone <- api.doHistLogs(ctx, from, crit.Addresses, crit.Topics, histLogs)
}()

// Compose and notify the logs from liveLogs and histLogs
go func() {
defer func() {
liveLogsSub.Unsubscribe()
cancel()
}()
Review comment (Contributor):

defer notifier.Done()

var (
// delivered is the block number of the last historical log delivered.
delivered uint64

// liveMode is true when either:
// - all historical logs are delivered.
// - or, during history processing a reorg is detected.
liveMode bool

// reorgedBlockHash is the block hash of the reorg point. It is set when
// a reorg is detected in the future. It is used to detect if the history
// processor is sending stale logs.
reorgedBlockHash common.Hash

// hashes is used to track the hashes of the blocks that have been delivered.
// It is used as a guard to prevent duplicate logs as well as inaccurate "removed"
// logs being delivered during a reorg.
hashes = lru.NewBasicLRU[common.Hash, struct{}](maxTrackedBlocks)
)
for {
select {
case err := <-histDone:
if err != nil {
logger.Warn("History logs delivery failed", "err", err)
return
}
// Otherwise all historical logs have been delivered; switch to live mode.
logger.Info("History logs delivery finished, and now enter into live mode", "delivered", delivered)
// TODO: It's theoretically possible that we miss logs due to
// asynchrony between the history processor and the chain subscription.
liveMode = true
histLogs = nil

case logs := <-liveLogs:
if len(logs) == 0 {
continue
}
// TODO: further reorgs are possible during history processing.
if !liveMode && logs[0].BlockNumber <= delivered {
// History is being processed and a reorg is encountered.
// From this point we ignore historical logs coming in and
// only send logs from the chain subscription.
logger.Info("Reorg detected", "reorgBlock", logs[0].BlockNumber, "delivered", delivered)
liveMode = true
}
if !liveMode {
if logs[0].Removed && reorgedBlockHash == (common.Hash{}) {
// Reorg in future. Remember fork point.
reorgedBlockHash = logs[0].BlockHash
}
// Implicit cases:
// - there was a reorg in future and blockchain is sending logs from the new chain.
// - history is still being processed and blockchain sends logs from the tip.
continue
}
// Removed logs from reorged chain, replacing logs or logs from tip of the chain.
notifyLogsIf(notifier, rpcSub.ID, logs, &hashes)

case logs := <-histLogs:
if len(logs) == 0 {
continue
}
if liveMode {
continue
}
if logs[0].BlockHash == reorgedBlockHash {
// We have reached the fork point and the historical producer
// is emitting old logs because of delay. Restart the process
// from last delivered block.
logger.Info("Restarting historical logs delivery", "from", logs[0].BlockNumber, "delivered", delivered)
liveLogsSub.Unsubscribe()
// Stop hist logs fetcher
cancel()
if err := api.histLogs(notifier, rpcSub, int64(logs[0].BlockNumber), crit); err != nil {
logger.Warn("failed to restart historical logs delivery", "err", err)
}
return
}
notifyLogsIf(notifier, rpcSub.ID, logs, &hashes)
// Assuming batch = all logs of a single block
delivered = logs[0].BlockNumber

case <-rpcSub.Err(): // client sent an unsubscribe request
return
case <-notifier.Closed(): // connection dropped
return
}
}
}()

return nil
}

// doHistLogs retrieves the logs older than the current header and forwards them to the histLogs channel.
func (api *FilterAPI) doHistLogs(ctx context.Context, from int64, addrs []common.Address, topics [][]common.Hash, histLogs chan<- []*types.Log) error {
// Fetch logs from a range of blocks.
fetchRange := func(from, to int64) error {
f := api.sys.NewRangeFilter(from, to, addrs, topics)
logsCh, errChan := f.rangeLogsAsync(ctx)
for {
select {
case logs := <-logsCh:
select {
case histLogs <- logs:
case <-ctx.Done():
// Flush out all logs until the range filter voluntarily exits.
continue
}
case err := <-errChan:
return err
}
}
}

for {
// Get the latest block header.
header := api.sys.backend.CurrentHeader()
if header == nil {
return errors.New("unexpected error: no header block found")
}
head := header.Number.Int64()
if from > head {
logger.Info("Finish historical sync", "from", from, "head", head)
return nil
}
if err := fetchRange(from, head); err != nil {
if errors.Is(err, context.Canceled) {
logger.Info("Historical logs delivery canceled", "from", from, "to", head)
return nil
}
return err
}
// Move forward to the next batch
from = head + 1
}
}

// notifyLogsIf sends logs to the notifier if the condition is met.
// It assumes all logs of the same block are either all removed or all added.
func notifyLogsIf(notifier notifier, id rpc.ID, logs []*types.Log, hashes *lru.BasicLRU[common.Hash, struct{}]) {
// Iterate logs and batch them by block hash.
type batch struct {
start int
end int
hash common.Hash
removed bool
}
var (
batches = make([]batch, 0)
h common.Hash
)
for i, log := range logs {
if h == log.BlockHash {
// Skip logs of seen block
continue
}
if len(batches) > 0 {
batches[len(batches)-1].end = i
}
batches = append(batches, batch{start: i, hash: log.BlockHash, removed: log.Removed})
h = log.BlockHash
}
// Close off last batch.
if batches[len(batches)-1].end == 0 {
batches[len(batches)-1].end = len(logs)
}
for _, batch := range batches {
if hashes != nil {
// During reorgs it's possible that logs from the new chain have been delivered.
// Avoid sending removed logs from the old chain and duplicate logs from new chain.
if batch.removed && !hashes.Contains(batch.hash) {
continue
}
if !batch.removed && hashes.Contains(batch.hash) {
continue
}
hashes.Add(batch.hash, struct{}{})
}
for _, log := range logs[batch.start:batch.end] {
log := log
Review comment (Member):

Do we need to deref the log here to copy it? E.g. log := *log. Otherwise we just copy the pointer, if I understand correctly.

notifier.Notify(id, &log)
}
}
}
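To make the per-block batching and duplicate suppression concrete, a test in this package could look roughly like the sketch below. It relies on the hypothetical recordingNotifier stub sketched next to the notifier interface above and on the package's existing imports; only the number of notified logs is asserted:

func TestNotifyLogsIfBatching(t *testing.T) {
    var (
        hashA = common.HexToHash("0xaa")
        hashB = common.HexToHash("0xbb")
        logs  = []*types.Log{
            {BlockHash: hashA, BlockNumber: 1},
            {BlockHash: hashA, BlockNumber: 1},
            {BlockHash: hashB, BlockNumber: 2},
        }
        n = &recordingNotifier{}
    )
    // With hashes == nil there is no reorg bookkeeping and every log is delivered.
    notifyLogsIf(n, rpc.ID("test"), logs, nil)
    if got := len(n.payloads); got != 3 {
        t.Fatalf("want 3 notified logs, got %d", got)
    }

    // With an LRU, the first delivery records both block hashes, so re-sending the
    // same non-removed blocks is suppressed as duplicates.
    seen := lru.NewBasicLRU[common.Hash, struct{}](maxTrackedBlocks)
    n = &recordingNotifier{}
    notifyLogsIf(n, rpc.ID("test"), logs, &seen)
    notifyLogsIf(n, rpc.ID("test"), logs, &seen)
    if got := len(n.payloads); got != 3 {
        t.Fatalf("want duplicates to be skipped, got %d notifications", got)
    }
}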

// FilterCriteria represents a request to create a new filter.