
eth/filters: subscribe history logs #27439

Open: wants to merge 60 commits into master from the eth-filter-subscribe-history branch

Conversation

@jsvisa (Contributor) commented Jun 8, 2023

This is the second part of #15063

@jsvisa marked this pull request as draft June 8, 2023 09:01
@jsvisa force-pushed the eth-filter-subscribe-history branch from 9ec20ce to ea6c486 on June 9, 2023 07:39
@jsvisa marked this pull request as ready for review June 13, 2023 10:17
@s1na (Contributor) commented Jun 15, 2023

I have some misgivings about how we handle block ranges right now. IMO block ranges are not necessary for the live mode: when you subscribe, you get the logs for all new blocks anyway. To make my case, web3.js only lets you specify FromBlock (no ToBlock) and ethers.js doesn't support either parameter, and these are two of the biggest web3 libraries.

My suggestion is that we can simplify this greatly by only allowing FromBlock, and only for specifying historical blocks, i.e. FromBlock should be either a number below the head block, or safe, or finalized. Later on we can allow ToBlock (for historical range queries).
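
For illustration, here is a hedged client-side sketch of the shape described above, using go-ethereum's ethclient. The endpoint URL and block number are placeholders, and whether the server accepts a historical FromBlock for a subscription is precisely what this PR adds:

package main

import (
    "context"
    "log"
    "math/big"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
)

func main() {
    client, err := ethclient.Dial("ws://localhost:8546") // placeholder endpoint
    if err != nil {
        log.Fatal(err)
    }
    // Only FromBlock is set: start from a historical block, no ToBlock.
    query := ethereum.FilterQuery{FromBlock: big.NewInt(17000000)} // placeholder block
    logs := make(chan types.Log)
    sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()
    for {
        select {
        case err := <-sub.Err():
            log.Fatal(err)
        case l := <-logs:
            log.Printf("log in block %d, tx %s", l.BlockNumber, l.TxHash)
        }
    }
}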

@jsvisa (Contributor, Author) commented Jun 19, 2023

@s1na lgtm. As we discussed offline, it seems we have more work to do; I'll post the items here (in case I forget):

  • support block range subscriptions;
  • support the server actively closing the connection;

@fjl (Contributor) commented Jun 22, 2023

I like @s1na's idea to not allow toBlock for subscription queries. The problem with using our subscription model is that there is no way for the server to signal the end of a subscription. We can fix that by changing how subscriptions work, but I think that will be a lot of work for everyone to support. Let's try the simple solution for now.

logChan, errChan := f.rangeLogsAsync(cctx)

// subscribe rmLogs
query := ethereum.FilterQuery{FromBlock: big.NewInt(n), ToBlock: big.NewInt(head), Addresses: crit.Addresses, Topics: crit.Topics}
Contributor:

Took me a minute to get this, but smart trick I love it!

Contributor:

Reality check: it's more complicated.

  • If the reorg happens in a future block that rangeLogsAsync hasn't processed yet, we don't really care about the event, since when rangeLogsAsync reaches that point it will follow the right chain by itself.
  • We can keep track of how far we are in delivery by checking the block number of incoming logs.
  • However, it's harder to say where the point of reorg is, because removed logs are sent in reverse order (from the most recent block backwards).
  • What we can do is compare removed logs as they come in to see whether they reach into the blocks for which we've already delivered logs.
  • At that point we have to stop rangeLogsAsync immediately. Though there's still the question of what should be done if we are mid-block (with some logs of that block remaining).
  • Then the subscription that sends removed logs will also send the replacement logs of the new chain, so we can just tune into that. (A rough sketch of this bookkeeping follows below.)
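
To make the bullet points above concrete, here is a minimal sketch of that bookkeeping. The function and channel names are illustrative, not the PR's actual code:

package filters

import "github.com/ethereum/go-ethereum/core/types"

// deliverWithReorgs is an illustrative sketch: forward historical logs and
// track a delivery watermark; once removed logs reach blocks that were
// already delivered, stop historical delivery and forward the removals.
func deliverWithReorgs(logChan <-chan *types.Log, reorgLogsCh <-chan []*types.Log, notify func(*types.Log)) {
    var (
        delivered uint64 // highest block whose logs were sent to the client
        reorged   bool   // set once a reorg touches the delivered range
    )
    for {
        select {
        case l, ok := <-logChan: // historical logs from the range query
            if !ok {
                return
            }
            if !reorged {
                notify(l)
                delivered = l.BlockNumber
            }
        case removed := <-reorgLogsCh: // removed logs arrive newest-first
            for _, l := range removed {
                if l.BlockNumber <= delivered {
                    reorged = true // the client already saw this block
                    notify(l)      // l.Removed is true, so the client can roll back
                }
                // Removals above the watermark need no action: the range
                // query will read the canonical chain when it gets there.
            }
        }
    }
}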

// We transmit all data that meets the following conditions:
// 1. no reorg has happened
// 2. a reorg has happened, but the log is in the remainder of the currently delivered block
if !reorged || log.BlockNumber < reorgBlock || log.BlockNumber <= delivered {
Contributor:

I don't understand this condition: log.BlockNumber <= delivered. Why did you add this?

Contributor (Author):

This ensures that all logs of the same block are sent out, to avoid delivering only a subset of a block's logs.

if len(logs) == 0 {
    continue
}
if reorgBlock == 0 {
Contributor (Author):

A reorg may have happened more than once between from and to, so I think we need to do this check for each <-reorgLogsCh message.

Contributor (Author):

I was wondering if we should distinguish between two different reorgs and one reorg delivered as a sequence of messages (the received logs may not be contiguous, due to the query filter) 🤔

Contributor (Author):

I came up with a scenario:

  1. from, to is 1 -> 10;
  2. delivered is 6;
  3. a reorg occurred between blocks 5-9; assume the logs are huge and get split into the following messages:
    i. Removed 5-7
    ii. Removed 8-9
    iii. Replaced 5-7
    iv. Replaced 8-9
  4. we receive a message from <-reorgLogsCh, set reorgBlock to 5, and send all removed logs between 5-6 to the subscriber;
  5. then we receive a log from <-logChan; because a reorg was detected, no logs are sent to the subscriber;
  6. next we may receive from <-reorgLogsCh or errChan: if the former comes first (messages ii, iii, iv above), everything works fine; but if the latter comes first, we miss the new logs that follow (Replaced 5-7 and 8-9).

Contributor:

Yes, you're right that considering two different reorgs adds another layer of complexity. I have not accounted for that in the changes I made.

Regarding the scenario: yes, I think we should stop errChan from breaking out of the loop while we're in the middle of processing a reorg (sketched below). But I don't know how to detect the end of a reorg yet either.
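
A hedged sketch of the guard being discussed (names are illustrative): the end-of-range error must not break the loop while removed logs are still being rolled back, otherwise the replacement logs of the new chain are lost. Detecting when the reorg has actually finished remains the open question.

package filters

import "github.com/ethereum/go-ethereum/core/types"

// historyLoop is an illustrative sketch, not the PR's code: when errChan
// fires mid-reorg, the case is disabled instead of returning, so that
// reorgLogsCh can still deliver the replacement logs.
func historyLoop(logChan <-chan *types.Log, reorgLogsCh <-chan []*types.Log, errChan <-chan error, notify func(*types.Log)) error {
    var (
        delivered uint64
        reorged   bool
    )
    for {
        select {
        case l, ok := <-logChan:
            if !ok {
                logChan = nil // range query finished; errChan carries the result
                continue
            }
            if !reorged {
                notify(l)
                delivered = l.BlockNumber
            }
        case logs := <-reorgLogsCh:
            for _, l := range logs {
                if l.BlockNumber <= delivered {
                    reorged = true
                    notify(l) // removed or replacement log for a delivered block
                }
            }
        case err := <-errChan:
            if reorged {
                // The historical range ended mid-reorg: ignore this signal
                // for now and keep draining reorgLogsCh. How to terminate
                // cleanly afterwards is exactly the open question above.
                errChan = nil
                continue
            }
            return err
        }
    }
}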

@jsvisa force-pushed the eth-filter-subscribe-history branch 2 times, most recently from 275897a to ca0c080 on July 25, 2023 13:56
Comment on lines +327 to +335
// histLogs retrieves logs older than current header.
func (api *FilterAPI) histLogs(notifier notifier, rpcSub *rpc.Subscription, from int64, crit FilterCriteria) error {
Contributor:

I think histLogs is a bit wonky. Is the benefit of reducing historicalLogs really worth it?

Comment on lines +282 to +289
if crit.FromBlock == nil {
return api.liveLogs(notifier, rpcSub, crit)
Contributor:

On input you take a ctx, but the ctx is lost: afaict you don't check for ctx timeouts or cancellation. Shouldn't you? What is the lifecycle here?

Contributor (Author):

The ctx will be canceled when the subscription goroutine returns, so we instantiate a new background context to control the workflow (see the sketch after this list).

The lifecycle is as follows:

  1. fetch and push the historical logs;
  2. push the live logs;
  3. terminate when the subscription is canceled or the push channel is broken.
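
A minimal sketch of that pattern, assuming a method shape similar to this PR's (the name subscribeHistoryThenLive is hypothetical): the per-call RPC context is replaced by a detached one whose lifetime is bound to the subscription itself.

package filters

import (
    "context"

    "github.com/ethereum/go-ethereum/rpc"
)

// subscribeHistoryThenLive is an illustrative sketch, not the PR's method:
// the per-call ctx dies when eth_subscribe returns, so a detached context
// drives the long-running work and is cancelled on unsubscribe/disconnect.
func (api *FilterAPI) subscribeHistoryThenLive(rpcSub *rpc.Subscription, crit FilterCriteria) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        <-rpcSub.Err() // closed when the client unsubscribes or disconnects
        cancel()
    }()

    // 1. fetch and push historical logs, driven by ctx
    // 2. switch to pushing live logs
    // 3. return once ctx is cancelled or the push channel breaks
    <-ctx.Done()
}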

reorgBlock := logs[0].BlockNumber
if !reorged && reorgBlock <= delivered {
    logger.Info("Reorg detected", "reorgBlock", reorgBlock, "delivered", delivered)
    reorged = true
Contributor:

This seems a bit strange. Once reorged is set, it never becomes unset again, and it discards all histLogs while in that mode. It seems to me that this big switch has three different modes:

  • reorged mode, where it discards historic logs
  • liveOnly mode
  • "normal" non-reorged mode

That makes it a bit complicated to follow, IMO.

Contributor (Author):

Once reorged is set, it never becomes unset again

Here we only need to handle the case where our historical delivery process is catching up with the live blocks.

liveOnly mode

Yeah, we did have another switch, liveOnly, which was set after all historical logs were delivered: https://github.com/ethereum/go-ethereum/blob/633c7ac67a6de35e000bdc72d51f1054f4e743b5/eth/filters/api.go#L373-L375

Contributor:

Seems to me that this big switch has three different modes:

Actually I managed to bring it down to 2 modes (674a888); now only the liveOnly mode is set. It will be set:

  • either when a reorg happens, to discard historical logs
  • or when historical log delivery has finished

But my patch has a problem. In the "reorg" mode we only send removed-log notifications up to the delivered point. So if we finish historical processing (say head = 10, so delivered = 10) and then, after emitting block 11, we get a reorg from block 8, we will only deliver the removed logs up to block 10.

@s1na (Contributor) commented Sep 18, 2023

Ok, I pushed another commit which should fix the problem above, and IMO the code is more readable now.

That said, I noticed the tests fail every once in a while. I believe the edge case they're catching is the following:

  • The blockchain first writes the new (reorged) blocks to the db and only then sends the removed/new logs. This can cause the history processor to accidentally continue onto the new chain before we realize a reorg is happening.
  • So we return oldBlock1, oldBlock2, newBlock3, removeOldBlock3, newBlock3, newBlock4, i.e. the removed logs don't match the ones we emitted for that height.

If my hunch about the issue is right, it will be a nasty one to fix.

Contributor:

I've pushed a fix for this. The subscription now tracks the block hashes it has delivered logs for. This map is used to avoid sending removed logs for blocks we haven't delivered, as well as to avoid sending duplicate logs.
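
A minimal sketch of that bookkeeping (the type and method names here are illustrative, not necessarily what the PR uses): a block hash is recorded once its logs have been pushed, duplicates are skipped, and removed-log notifications are only forwarded for recorded hashes.

package filters

import "github.com/ethereum/go-ethereum/common"

// deliveredSet is an illustrative sketch of the per-subscription map of
// block hashes for which logs have already been delivered.
type deliveredSet map[common.Hash]struct{}

// markDelivered records a block after its logs have been pushed and reports
// whether it was new; a false result means the block was already delivered
// and its logs should not be sent again.
func (d deliveredSet) markDelivered(hash common.Hash) bool {
    if _, seen := d[hash]; seen {
        return false
    }
    d[hash] = struct{}{}
    return true
}

// shouldRemove reports whether removed-log notifications for this block need
// forwarding: only blocks the client actually received logs for.
func (d deliveredSet) shouldRemove(hash common.Hash) bool {
    _, seen := d[hash]
    return seen
}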

jsvisa and others added 22 commits September 27, 2023 03:59
    hashes.Add(batch.hash, struct{}{})
}
for _, log := range logs[batch.start:batch.end] {
    log := log
Member:

Do we need to dereference the log here to copy it, e.g. log := *log?
Otherwise we just copy the pointer, if I understand correctly.
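
For context, a small hedged illustration of the difference (not part of the PR): log := log re-declares the loop variable, which matters for closures before Go 1.22, but both names still point at the same underlying struct; log := *log makes an independent value copy.

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/core/types"
)

func main() {
    original := &types.Log{BlockNumber: 1}

    ptrCopy := original  // copies only the pointer
    valCopy := *original // copies the struct value

    original.BlockNumber = 2

    fmt.Println(ptrCopy.BlockNumber) // 2: still shares the underlying struct
    fmt.Println(valCopy.BlockNumber) // 1: an independent copy
}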

}

func (n *mockNotifier) Notify(id rpc.ID, data interface{}) error {
    n.c <- data
Contributor:

log := data.(*types.Log)
n.callback(log)
n.logs = append(n.logs, log)

func (n *mockNotifier) Closed() <-chan interface{} {
    return nil
}

Contributor:

func (n *mockNotifier) Done() {
   close(n.done)
}

and call Done from FilterAPI.
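
Pulling the test-helper pieces above together, a hedged sketch of what the suggested mockNotifier could look like (field and constructor names are illustrative):

package filters

import (
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/rpc"
)

// mockNotifier is an illustrative consolidation of the helper discussed in
// this thread: it records delivered logs and exposes a done channel that
// the API closes through Done() when the subscription terminates.
type mockNotifier struct {
    logs []*types.Log
    done chan struct{}
}

func newMockNotifier() *mockNotifier {
    return &mockNotifier{done: make(chan struct{})}
}

func (n *mockNotifier) Notify(id rpc.ID, data interface{}) error {
    n.logs = append(n.logs, data.(*types.Log))
    return nil
}

func (n *mockNotifier) Closed() <-chan interface{} { return nil }

// Done lets the test wait for the subscription to finish via <-n.done.
func (n *mockNotifier) Done() { close(n.done) }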

defer func() {
    liveLogsSub.Unsubscribe()
    cancel()
}()
Contributor:

defer notifier.Done()

@cong08 commented Feb 1, 2024

Why not merge it?

@cong08 commented Feb 20, 2024

Why not merge it?

@karalabe @rjl493456442
