eth/filters: subscribe history logs #27439
base: master
Conversation
Force-pushed from 9ec20ce to ea6c486.
I have some misgivings about how we handle block ranges right now. IMO block ranges are not necessary for the live mode specifically, i.e. when you subscribe you get the logs for all new blocks. To make my case: web3.js only lets you specify FromBlock (no ToBlock), and ethers.js doesn't support either of those parameters. These are two of the biggest web3 libraries. My suggestion is that we can simplify this greatly by only allowing …
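For illustration only, a hedged sketch of what that web3.js-style usage looks like from Go's ethclient: only FromBlock is given, and the node backfills history before streaming live logs. The endpoint and block number are placeholders, and serving a past FromBlock over a subscription is exactly the behaviour this PR proposes.

package main

import (
	"context"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	// Placeholder endpoint; any websocket-enabled node would do.
	client, err := ethclient.Dial("ws://localhost:8546")
	if err != nil {
		log.Fatal(err)
	}
	logsCh := make(chan types.Log)
	// Only FromBlock is set, mirroring web3.js: backfill from this block,
	// then keep streaming live logs.
	query := ethereum.FilterQuery{FromBlock: big.NewInt(17_000_000)}
	sub, err := client.SubscribeFilterLogs(context.Background(), query, logsCh)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()
	for l := range logsCh {
		log.Printf("log from block %d", l.BlockNumber)
	}
}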
@s1na lgtm, and as we discussed offline it seems we have more work to do; I'll post the items here (in case I forget).
I like @s1na's idea to not allow …
eth/filters/api.go
Outdated
logChan, errChan := f.rangeLogsAsync(cctx)

// subscribe rmLogs
query := ethereum.FilterQuery{FromBlock: big.NewInt(n), ToBlock: big.NewInt(head), Addresses: crit.Addresses, Topics: crit.Topics}
Took me a minute to get this, but it's a smart trick. I love it!
Reality check: it's more complicated.
- Reorg happens in a future block that rangeLogsAsync hasn't processed yet. In that case we don't really care about the event, since when rangeLogsAsync reaches that point it will go onto the right chain by itself.
- We can keep track of how far we are in delivery by checking the block number of incoming logs.
- However, it's harder to say where the point of reorg is, because removed logs are sent in reverse order (from the most recent block backwards).
- What we can do is compare them as they come in, to see whether they pass the blocks for which we've already delivered logs (see the sketch after this list).
- At that point we have to stop rangeLogsAsync immediately. Or hmm, there's the question of what should be done if it is mid-block (with some logs remaining in that block).
- Then the new subscription which sends removed logs will also send the replacement logs of the new chain, so we can just tune into that.
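A minimal sketch of that comparison, assuming a per-log channel of removed logs and a delivered watermark (both names are illustrative, not from the patch):

package sketch

import "github.com/ethereum/go-ethereum/core/types"

// firstOverlap drains removed-log events and reports the first one that
// reaches into history we have already delivered. delivered is the highest
// block number pushed to the subscriber so far; anything at or below it
// means rangeLogsAsync must be stopped and the live subscription takes
// over for the replacement logs.
func firstOverlap(removedLogs <-chan *types.Log, delivered uint64) (uint64, bool) {
	for l := range removedLogs {
		if l.Removed && l.BlockNumber <= delivered {
			return l.BlockNumber, true
		}
	}
	return 0, false // no overlap with delivered history
}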
eth/filters/api.go
Outdated
// We transmit all data that meets the following conditions:
// 1. reorg has not happened
// 2. reorg happened, but the log is in the remainder of the currently delivered block
if !reorged || log.BlockNumber < reorgBlock || log.BlockNumber <= delivered {
I don't understand this condition: log.BlockNumber <= delivered. Why did you add this?
It ensures that all logs of the same block are sent out, to avoid delivering only a subset of a block's logs. For example, if delivered is 6 and a reorg is detected at block 5 while some of block 6's logs are still queued, this clause lets the rest of block 6 go out, so the subscriber never sees a partially delivered block.
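For concreteness, a tiny runnable check of the predicate with delivered = 6 and a reorg detected at block 5:

package main

import "fmt"

// send replicates the delivery predicate from the diff above.
func send(blockNumber, reorgBlock, delivered uint64, reorged bool) bool {
	return !reorged || blockNumber < reorgBlock || blockNumber <= delivered
}

func main() {
	// With delivered = 6 and a reorg detected at block 5:
	fmt.Println(send(6, 5, 6, true)) // true: the rest of block 6 still goes out
	fmt.Println(send(7, 5, 6, true)) // false: block 7 is held back
}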
eth/filters/api.go
Outdated
if len(logs) == 0 {
	continue
}
if reorgBlock == 0 {
A reorg may have happened more than once between from and to, so I think we need to check each <-reorgLogsCh message.
I was wondering if we should distinguish between two different reorgs and one reorg delivered as a sequence of messages (where the received logs may not be contiguous, due to the query filter) 🤔
I came up with a scenario:
1. from, to is 1 -> 10; delivered is 6.
2. A reorg occurred between 5-9. Assume the logs are huge and get split into the following messages:
   i. Removed 5-7
   ii. Removed 8-9
   iii. Replaced 5-7
   iv. Replaced 8-9
3. We receive a log from <-reorgLogsCh; here we set reorgBlock to 5 and send all removed logs between 5-6 to the subscriber.
4. Then we receive a log from <-logChan; because a reorg was detected, no logs are sent to the subscriber.
5. Next we may receive from <-reorgLogsCh or errChan. If the former comes first (the remaining ii, iii, iv), everything works fine; but if the latter comes first, we miss the following new logs (Replaced 5-7 and 8-9).
Yes, you're right that considering two different reorgs adds another layer of complexity. I have not considered that in the changes I made.
Regarding the scenario: yes, I think we should disable errChan breaking out of the loop if we're in the middle of processing a reorg. But I also don't know how to detect the end of a reorg yet. A sketch of that idea follows.
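A minimal sketch of that idea, reusing the channel names from the discussion. Nilling a channel is the standard Go trick for disabling a select case (a receive on a nil channel blocks forever); the unresolved part, when to re-arm termination after the reorg drains, is left as a comment.

package sketch

import "github.com/ethereum/go-ethereum/core/types"

// deliverLoop sketches the proposed fix: once reorg processing has started,
// the errChan case is disabled so replacement logs still arriving on
// reorgLogsCh are not missed.
func deliverLoop(logChan, reorgLogsCh <-chan *types.Log, errChan <-chan error, deliver func(*types.Log)) error {
	reorging := false
	for {
		select {
		case l := <-logChan:
			if !reorging {
				deliver(l)
			}
		case l := <-reorgLogsCh:
			reorging = true
			deliver(l)
		case err := <-errChan:
			if reorging {
				// Historical fetch ended mid-reorg: keep draining reorg
				// logs. Open question (see above): when to stop after the
				// reorg has fully drained.
				errChan = nil
				continue
			}
			return err
		}
	}
}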
Force-pushed from 275897a to ca0c080.
// histLogs retrieves logs older than the current header.
func (api *FilterAPI) histLogs(notifier notifier, rpcSub *rpc.Subscription, from int64, crit FilterCriteria) error {
I think histLogs is a bit wonky as a name; is the benefit of shortening historicalLogs really worth it?
if crit.FromBlock == nil {
	return api.liveLogs(notifier, rpcSub, crit)
On input, you take a ctx, but the ctx is lost: afaict you don't check for ctx timeouts or cancellation. Shouldn't you? What is the lifecycle here?
Since the ctx will be canceled when the subscription goroutine returns, we instantiate a new background context to control the workflow.
The lifecycle is as below (a sketch follows the list):
- do the historical logs fetch and push;
- do the live logs push;
- terminate if the subscription was canceled or the push channel was broken.
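A minimal sketch of that lifecycle; fetchAndPush and pushLive are hypothetical stand-ins for the two stages, not functions from the patch.

package sketch

import "context"

// run sketches the subscription lifecycle described above. The RPC ctx dies
// with the subscription goroutine, so the workflow runs under a detached
// background context that is canceled explicitly when the loop exits.
func run(unsubscribed <-chan struct{}, fetchAndPush, pushLive func(context.Context) error) error {
	cctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errc := make(chan error, 1)
	go func() {
		if err := fetchAndPush(cctx); err != nil { // 1. historical fetch and push
			errc <- err
			return
		}
		errc <- pushLive(cctx) // 2. live push
	}()

	select {
	case err := <-errc:
		return err // 3a. push channel broken (or fetch failed)
	case <-unsubscribed:
		return nil // 3b. subscription canceled
	}
}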
eth/filters/api.go
Outdated
reorgBlock := logs[0].BlockNumber
if !reorged && reorgBlock <= delivered {
	logger.Info("Reorg detected", "reorgBlock", reorgBlock, "delivered", delivered)
	reorged = true
This seems a bit strange. Once reorged is set, it never becomes unset again, and it will discard all histLogs while in that mode. Seems to me that this big switch has three different modes:
- reorged mode, where it discards historic logs
- liveOnly mode
- "normal" non-reorged mode
It makes it a bit complicated to follow, IMO.
Once reorged is set, it never becomes unset again
Here we only need to handle the case where our historical delivery process is catching up with the live blocks.
liveOnly mode
Yeah, we did have another switch, liveOnly, which was set after all historical logs were delivered: https://github.com/ethereum/go-ethereum/blob/633c7ac67a6de35e000bdc72d51f1054f4e743b5/eth/filters/api.go#L373-L375
Seems to me that this big switch has three different modes:
Actually, I managed to bring it down to 2 modes (674a888), now only setting the liveOnly mode. This will be set:
- either when a reorg happens, to discard historical logs
- or when historical logs have finished
But my patch has a problem. In the "reorg" mode, we only send removed-log notifications up to the delivered point. So if we finish historical processing (say head = 10, so delivered = 10) and then, after emitting block 11, we get a reorg from block 8, we will only deliver the removed logs up to 10.
Ok, I pushed another commit which should fix the problem above, and IMO the code is more readable now.
That said, I noticed the tests fail every once in a while. I believe the edge case they're catching is the following:
- The blockchain will first write the new (reorged) blocks to the db and only then send removed/new logs. This can cause the history processor to accidentally wander onto the new chain before we realize a reorg is happening.
- So we return oldBlock1, oldBlock2, newBlock3, removeOldBlock3, newBlock3, newBlock4, i.e. the removed logs don't match the ones we emitted for that height.
If my hunch about the issue is right, it will be a nasty one to fix.
I've pushed a fix for this. The subscription now tracks the block hashes it has delivered logs for. This map is used both to avoid sending removed logs for a block we haven't delivered and to avoid sending duplicate logs. A sketch of the idea follows.
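A hedged sketch of that bookkeeping (the type and method names are illustrative, not from the patch), called once per block batch and keyed by block hash:

package sketch

import "github.com/ethereum/go-ethereum/common"

// deliveryTracker remembers which blocks we have delivered logs for.
type deliveryTracker struct {
	delivered map[common.Hash]struct{}
}

func newDeliveryTracker() *deliveryTracker {
	return &deliveryTracker{delivered: make(map[common.Hash]struct{})}
}

// shouldSend is called once per block batch. Removed logs are forwarded
// only for blocks we actually delivered; normal logs are dropped when the
// block was already delivered, preventing duplicates.
func (t *deliveryTracker) shouldSend(block common.Hash, removed bool) bool {
	_, seen := t.delivered[block]
	if removed {
		delete(t.delivered, block)
		return seen // never announce removal of a block we never delivered
	}
	if seen {
		return false // already delivered this block's logs
	}
	t.delivered[block] = struct{}{}
	return true
}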
Force-pushed from d83f528 to 97bbe05.
hashes.Add(batch.hash, struct{}{})
}
for _, log := range logs[batch.start:batch.end] {
	log := log
Do we need to deref the log here to copy it, e.g. log := *log? Otherwise we just copy the pointer, if I understand correctly. (A small demo follows.)
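A self-contained demonstration of the difference, with a stand-in Log type:

package main

import "fmt"

type Log struct{ BlockNumber uint64 }

func main() {
	logs := []*Log{{BlockNumber: 1}}

	ptr := logs[0]  // log := log  - copies only the pointer
	val := *logs[0] // log := *log - snapshots the struct value

	logs[0].BlockNumber = 99
	fmt.Println(ptr.BlockNumber) // 99: the alias sees the later mutation
	fmt.Println(val.BlockNumber) // 1: the value copy is unaffected
}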
}

func (n *mockNotifier) Notify(id rpc.ID, data interface{}) error {
	n.c <- data
log := data.(*types.Log)
n.callback(log)
n.logs = append(n.logs, log)
func (n *mockNotifier) Closed() <-chan interface{} {
	return nil
}
func (n *mockNotifier) Done() {
close(n.done)
}
and call Done from FilterAPI.
defer func() {
	liveLogsSub.Unsubscribe()
	cancel()
}()
defer notifier.Done()
Why not merge it?
This is the second part of #15063