eth/filters: subscribe history logs #27439
base: master
@@ -28,14 +28,25 @@ import (
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethereum/go-ethereum/common/lru"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/internal/ethapi"
+	logger "github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/rpc"
 )
 
 var (
-	errInvalidTopic   = errors.New("invalid topic(s)")
-	errFilterNotFound = errors.New("filter not found")
+	errInvalidTopic       = errors.New("invalid topic(s)")
+	errFilterNotFound     = errors.New("filter not found")
+	errConnectDropped     = errors.New("connection dropped")
+	errInvalidToBlock     = errors.New("log subscription does not support history block range")
+	errInvalidFromBlock   = errors.New("from block can be only a number, or \"safe\", or \"finalized\"")
+	errClientUnsubscribed = errors.New("client unsubscribed")
 )
 
+const (
+	// maxTrackedBlocks is the number of block hashes that will be tracked by a subscription.
+	maxTrackedBlocks = 32 * 1024
+)
 
 // filter is a helper struct that holds meta information over the filter type
@@ -246,31 +257,68 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
 	return rpcSub, nil
 }
 
-// Logs creates a subscription that fires for all new log that match the given filter criteria.
+// notifier is used for broadcasting data (e.g. logs) to rpc receivers;
+// it is an interface so that it can be mocked in unit tests.
+type notifier interface {
+	Notify(id rpc.ID, data interface{}) error
+	Closed() <-chan interface{}
+}
+
+// Logs creates a subscription that fires for all historical
+// and new logs that match the given filter criteria.
 func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
 	notifier, supported := rpc.NotifierFromContext(ctx)
 	if !supported {
 		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
 	}
+	rpcSub := notifier.CreateSubscription()
+	err := api.logs(ctx, notifier, rpcSub, crit)
+	return rpcSub, err
+}
 
-	var (
-		rpcSub      = notifier.CreateSubscription()
-		matchedLogs = make(chan []*types.Log)
-	)
+// logs is the inner implementation of the logs subscription.
+// The following criteria are valid:
+//   - from: nil, to: nil -> yield live logs.
+//   - from: blockNum | safe | finalized, to: nil -> historical logs beginning at `from` up to head, then live logs.
+//   - every other case fails with an error.
+func (api *FilterAPI) logs(ctx context.Context, notifier notifier, rpcSub *rpc.Subscription, crit FilterCriteria) error {
+	if crit.ToBlock != nil {
+		return errInvalidToBlock
+	}
+	if crit.FromBlock == nil {
+		return api.liveLogs(notifier, rpcSub, crit)
+	}
+	from := rpc.BlockNumber(crit.FromBlock.Int64())
+	switch from {
+	case rpc.LatestBlockNumber, rpc.PendingBlockNumber:
+		return errInvalidFromBlock
+	case rpc.SafeBlockNumber, rpc.FinalizedBlockNumber:
+		header, err := api.sys.backend.HeaderByNumber(ctx, from)
+		if err != nil {
+			return err
+		}
+		from = rpc.BlockNumber(header.Number.Int64())
+	}
+	if from < 0 {
+		return errInvalidFromBlock
+	}
+	return api.histLogs(notifier, rpcSub, int64(from), crit)
+}
 
+// liveLogs only retrieves live logs.
+func (api *FilterAPI) liveLogs(notifier notifier, rpcSub *rpc.Subscription, crit FilterCriteria) error {
+	matchedLogs := make(chan []*types.Log)
 	logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	go func() {
 		for {
 			select {
 			case logs := <-matchedLogs:
-				for _, log := range logs {
-					log := log
-					notifier.Notify(rpcSub.ID, &log)
-				}
+				notifyLogsIf(notifier, rpcSub.ID, logs, nil)
 
 			case <-rpcSub.Err(): // client sent an unsubscribe request
 				logsSub.Unsubscribe()
 				return
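From the client side, the new semantics are exercised through a regular logs subscription whose fromBlock points into the past. The following is a minimal editorial sketch (not part of the PR), assuming a node running this branch with websocket RPC at ws://localhost:8546 and an arbitrary start block; it uses the raw rpc client so that no toBlock is sent at all:

package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Assumption: a go-ethereum node built from this PR, serving websocket RPC locally.
	client, err := rpc.Dial("ws://localhost:8546")
	if err != nil {
		panic(err)
	}
	logs := make(chan types.Log)
	// fromBlock may be a number, "safe", or "finalized"; toBlock must be absent.
	// 0x1036640 (17,000,000) is an arbitrary example start block.
	sub, err := client.EthSubscribe(context.Background(), logs, "logs",
		map[string]interface{}{"fromBlock": "0x1036640"})
	if err != nil {
		panic(err)
	}
	defer sub.Unsubscribe()
	for {
		select {
		case err := <-sub.Err():
			panic(err)
		case l := <-logs:
			// Historical logs stream first; the subscription then switches to live mode.
			fmt.Println("block", l.BlockNumber, "removed:", l.Removed)
		}
	}
}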
@@ -280,8 +328,225 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
 			}
 		}
 	}()
+	return nil
+}
 
-	return rpcSub, nil
+// histLogs retrieves logs older than the current header.
+func (api *FilterAPI) histLogs(notifier notifier, rpcSub *rpc.Subscription, from int64, crit FilterCriteria) error {
+	// Subscribe to live logs.
+	// If a chain reorg occurs, we will first receive the old chain's deleted logs in
+	// descending order, and then the new chain's added logs in descending order.
+	// See core/blockchain.go#reorg(oldHead *types.Header, newHead *types.Block) for details.
+	// If a reorg happens between `from` and `to`, the following scenarios must be considered:
+	//  1. The reorg occurs after the currently delivered block. Since it happens in the
+	//     future, it has nothing to do with the current historical sync and can be ignored.
+	//  2. The reorg occurs before the currently delivered block. Historical delivery must
+	//     be stopped and all replaced logs sent instead.
+	var (
+		liveLogs = make(chan []*types.Log)
+		histLogs = make(chan []*types.Log)
+		histDone = make(chan error)
+	)
+	liveLogsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), liveLogs)
+	if err != nil {
+		return err
+	}
 
+	// The original request ctx will be canceled as soon as the parent goroutine
+	// returns a subscription. Use a new context instead.
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		histDone <- api.doHistLogs(ctx, from, crit.Addresses, crit.Topics, histLogs)
+	}()
 
+	// Compose and notify the logs from liveLogs and histLogs.
+	go func() {
+		defer func() {
+			liveLogsSub.Unsubscribe()
+			cancel()
+		}()
+		var (
+			// delivered is the block number of the last historical log delivered.
+			delivered uint64
+
+			// liveMode is true when either:
+			// - all historical logs have been delivered, or
+			// - a reorg was detected during history processing.
+			liveMode bool
+
+			// reorgedBlockHash is the block hash of the reorg point. It is set when
+			// a reorg is detected in the future. It is used to detect whether the
+			// history processor is sending stale logs.
+			reorgedBlockHash common.Hash
+
+			// hashes tracks the hashes of the blocks that have been delivered.
+			// It is used as a guard against duplicate logs as well as inaccurate
+			// "removed" logs being delivered during a reorg.
+			hashes = lru.NewBasicLRU[common.Hash, struct{}](maxTrackedBlocks)
+		)
+		for {
+			select {
+			case err := <-histDone:
+				if err != nil {
+					logger.Warn("History logs delivery failed", "err", err)
+					return
+				}
+				// Otherwise all historical logs have been delivered; switch to live mode.
+				logger.Info("History logs delivery finished, entering live mode", "delivered", delivered)
+				// TODO: It's theoretically possible that we miss logs due to
+				// asynchrony between the history processor and the chain subscription.
+				liveMode = true
+				histLogs = nil
+
+			case logs := <-liveLogs:
+				if len(logs) == 0 {
+					continue
+				}
+				// TODO: further reorgs are possible during history processing.
+				if !liveMode && logs[0].BlockNumber <= delivered {
+					// History is being processed and a reorg is encountered.
+					// From this point on we ignore incoming historical logs and
+					// only send logs from the chain subscription.
+					logger.Info("Reorg detected", "reorgBlock", logs[0].BlockNumber, "delivered", delivered)
+					liveMode = true
+				}
+				if !liveMode {
+					if logs[0].Removed && reorgedBlockHash == (common.Hash{}) {
+						// Reorg in the future. Remember the fork point.
+						reorgedBlockHash = logs[0].BlockHash
+					}
+					// Implicit cases:
+					// - there was a reorg in the future and the blockchain is sending logs from the new chain.
+					// - history is still being processed and the blockchain sends logs from the tip.
+					continue
+				}
+				// Removed logs from the reorged chain, replacement logs, or logs from the tip of the chain.
+				notifyLogsIf(notifier, rpcSub.ID, logs, &hashes)
+
+			case logs := <-histLogs:
+				if len(logs) == 0 {
+					continue
+				}
+				if liveMode {
+					continue
+				}
+				if logs[0].BlockHash == reorgedBlockHash {
+					// We have reached the fork point and the historical producer
+					// is emitting old logs because of delay. Restart the process
+					// from the last delivered block.
+					logger.Info("Restarting historical logs delivery", "from", logs[0].BlockNumber, "delivered", delivered)
+					liveLogsSub.Unsubscribe()
+					// Stop the hist logs fetcher.
+					cancel()
+					if err := api.histLogs(notifier, rpcSub, int64(logs[0].BlockNumber), crit); err != nil {
+						logger.Warn("Failed to restart historical logs delivery", "err", err)
+					}
+					return
+				}
+				notifyLogsIf(notifier, rpcSub.ID, logs, &hashes)
+				// Assuming a batch contains all logs of a single block.
+				delivered = logs[0].BlockNumber
+
+			case <-rpcSub.Err(): // client sent an unsubscribe request
+				return
+			case <-notifier.Closed(): // connection dropped
+				return
+			}
+		}
+	}()
 
+	return nil
+}
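To make the reorg rule above concrete, here is a hedged, standalone toy model (editorial illustration, not code from the PR) of the decision the compose loop takes for each batch arriving on the live channel while history is still streaming:

package main

import "fmt"

// liveBatchAction is a toy model of the compose loop's decision in histLogs.
// delivered is the last historical block delivered; batchBlock and removed
// describe the first log of the incoming live batch.
func liveBatchAction(liveMode bool, delivered, batchBlock uint64, removed bool) string {
	if liveMode {
		return "notify" // already live: forward everything
	}
	if batchBlock <= delivered {
		// Scenario 2: reorg at or before the last delivered block; history is
		// now stale, so switch to live mode and forward the replacement logs.
		return "switch to live mode and notify"
	}
	if removed {
		// Scenario 1: reorg strictly in the future; remember the fork point
		// and keep streaming history.
		return "record fork point, ignore"
	}
	return "ignore" // tip-of-chain logs while history is still streaming
}

func main() {
	fmt.Println(liveBatchAction(false, 100, 105, true)) // record fork point, ignore
	fmt.Println(liveBatchAction(false, 100, 99, true))  // switch to live mode and notify
	fmt.Println(liveBatchAction(true, 100, 105, false)) // notify
}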
 
+// doHistLogs retrieves the logs older than the current header and forwards them to the histLogs channel.
+func (api *FilterAPI) doHistLogs(ctx context.Context, from int64, addrs []common.Address, topics [][]common.Hash, histLogs chan<- []*types.Log) error {
+	// Fetch logs from a range of blocks.
+	fetchRange := func(from, to int64) error {
+		f := api.sys.NewRangeFilter(from, to, addrs, topics)
+		logsCh, errChan := f.rangeLogsAsync(ctx)
+		for {
+			select {
+			case logs := <-logsCh:
+				select {
+				case histLogs <- logs:
+				case <-ctx.Done():
+					// Flush out all logs until the range filter voluntarily exits.
+					continue
+				}
+			case err := <-errChan:
+				return err
+			}
+		}
+	}
 
+	for {
+		// Get the latest block header.
+		header := api.sys.backend.CurrentHeader()
+		if header == nil {
+			return errors.New("unexpected error: no header block found")
+		}
+		head := header.Number.Int64()
+		if from > head {
+			logger.Info("Finished historical sync", "from", from, "head", head)
+			return nil
+		}
+		if err := fetchRange(from, head); err != nil {
+			if errors.Is(err, context.Canceled) {
+				logger.Info("Historical logs delivery canceled", "from", from, "to", head)
+				return nil
+			}
+			return err
+		}
+		// Move forward to the next batch.
+		from = head + 1
+	}
+}
 
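The inner select in fetchRange implements a drain pattern: after cancellation the consumer keeps receiving from logsCh so the producer is never blocked on a send and can exit voluntarily through errChan. A hedged, standalone sketch of the same pattern, with illustrative names (not code from the PR):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	logsCh := make(chan int)
	errCh := make(chan error, 1)

	// Producer: stands in for rangeLogsAsync; sends until canceled.
	go func() {
		for i := 0; ; i++ {
			select {
			case logsCh <- i:
			case <-ctx.Done():
				errCh <- ctx.Err()
				return
			}
		}
	}()

	time.AfterFunc(time.Millisecond, cancel) // simulate an unsubscribe

	// Consumer: after cancel, keep draining logsCh (instead of returning
	// immediately) so the producer can always reach its ctx.Done branch
	// and exit through errCh.
	for {
		select {
		case <-logsCh:
			// Normally forwarded downstream; once ctx is done we just drop.
		case err := <-errCh:
			fmt.Println("producer exited:", err)
			return
		}
	}
}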
+// notifyLogsIf sends logs to the notifier if the condition is met.
+// It assumes all logs of the same block are either all removed or all added.
+func notifyLogsIf(notifier notifier, id rpc.ID, logs []*types.Log, hashes *lru.BasicLRU[common.Hash, struct{}]) {
+	// Guard against empty input: the close-off below indexes the last batch.
+	if len(logs) == 0 {
+		return
+	}
+	// Iterate logs and batch them by block hash.
+	type batch struct {
+		start   int
+		end     int
+		hash    common.Hash
+		removed bool
+	}
+	var (
+		batches = make([]batch, 0)
+		h       common.Hash
+	)
+	for i, log := range logs {
+		if h == log.BlockHash {
+			// Skip logs of an already seen block.
+			continue
+		}
+		if len(batches) > 0 {
+			batches[len(batches)-1].end = i
+		}
+		batches = append(batches, batch{start: i, hash: log.BlockHash, removed: log.Removed})
+		h = log.BlockHash
+	}
+	// Close off the last batch.
+	if batches[len(batches)-1].end == 0 {
+		batches[len(batches)-1].end = len(logs)
+	}
+	for _, batch := range batches {
+		if hashes != nil {
+			// During reorgs it's possible that logs from the new chain have already been delivered.
+			// Avoid sending removed logs from the old chain and duplicate logs from the new chain.
+			if batch.removed && !hashes.Contains(batch.hash) {
+				continue
+			}
+			if !batch.removed && hashes.Contains(batch.hash) {
+				continue
+			}
+			hashes.Add(batch.hash, struct{}{})
+		}
+		for _, log := range logs[batch.start:batch.end] {
+			log := log
+			notifier.Notify(id, &log)
+		}
+	}
+}

Review comment on the "log := log" copy: Do we need to deref the log here to copy it?
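The batching rule can be hard to read from the index bookkeeping alone. Below is a self-contained toy version (illustrative types, not PR code) showing how consecutive logs sharing a block hash collapse into half-open [start, end) batches:

package main

import "fmt"

type toyLog struct {
	blockHash string
	removed   bool
}

// batchByBlock mirrors notifyLogsIf's grouping: consecutive logs with the
// same block hash form one half-open [start, end) batch.
func batchByBlock(logs []toyLog) [][2]int {
	var (
		batches [][2]int
		last    string
	)
	for i, l := range logs {
		if l.blockHash == last {
			continue // same block as the previous log
		}
		if n := len(batches); n > 0 {
			batches[n-1][1] = i // close the previous batch
		}
		batches = append(batches, [2]int{i, 0})
		last = l.blockHash
	}
	// Close off the last batch.
	if n := len(batches); n > 0 && batches[n-1][1] == 0 {
		batches[n-1][1] = len(logs)
	}
	return batches
}

func main() {
	logs := []toyLog{{"a", false}, {"a", false}, {"b", false}, {"b", false}, {"b", false}}
	fmt.Println(batchByBlock(logs)) // [[0 2] [2 5]]
}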
 
 // FilterCriteria represents a request to create a new filter.
Review comment: On input you take a ctx, but the ctx is lost; afaict you don't check for ctx timeouts or cancel. Shouldn't you? What is the lifecycle here?

Reply: Since the ctx will be canceled when the subscription goroutine returns, we instantiated a new background context to control the workflow. The lifecycle is as follows:
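A minimal editorial sketch of that lifecycle, with stand-in names (not part of the original thread): the request ctx dies with the RPC call, so the history worker gets a fresh background context whose cancel is deferred in the compose goroutine.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// The request ctx is canceled as soon as the RPC method returns, so the
	// worker must not inherit it; derive a fresh background context instead.
	ctx, cancel := context.WithCancel(context.Background())

	workerDone := make(chan struct{})
	go func() { // stands in for doHistLogs
		<-ctx.Done() // runs until canceled (or finished)
		fmt.Println("history worker stopped:", ctx.Err())
		close(workerDone)
	}()

	go func() { // stands in for the compose goroutine
		defer cancel() // the deferred cancel stops the worker on exit
		// Exits on unsubscribe, dropped connection, or history failure;
		// the sleep stands in for that event.
		time.Sleep(10 * time.Millisecond)
	}()

	<-workerDone
}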