Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logs checking #11229

Merged
merged 43 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
eea467a
added logs to receipts
Jul 3, 2024
9ff21c9
casted to temporal
Jul 3, 2024
35a3f1f
save
Jul 3, 2024
f05ba60
added semaphore
Jul 3, 2024
28274e1
Merge branch 'refs/heads/main' into p2p-receipts
Jul 4, 2024
c917eb0
reorg unlocked
Jul 4, 2024
cf7fc0b
fix
Jul 5, 2024
132d101
fix
Jul 5, 2024
ddc0aae
new receipt func
Jul 6, 2024
7cf43ac
save
Jul 9, 2024
61ea076
save
Jul 9, 2024
894855e
Merge branch 'refs/heads/main' into p2p-receipts
Jul 9, 2024
c965d3f
save
Jul 10, 2024
f5740fe
save
Jul 10, 2024
9f1bb35
save
Jul 11, 2024
e879bd8
save
Jul 11, 2024
6072d54
save
Jul 11, 2024
8604911
save
Jul 11, 2024
490c1a8
save
Jul 11, 2024
0d23573
save
Jul 11, 2024
625a62c
fixes + debug logs
Jul 11, 2024
d7c5fff
fixes
Jul 11, 2024
e25c4ec
fixes
Jul 11, 2024
ba044f6
find out how to receive a message
Jul 12, 2024
74ab518
added tests for empty blockchain
Jul 12, 2024
2107f89
added test for generated blockchain
Jul 12, 2024
6d3b7da
save
Jul 13, 2024
b632521
save
Jul 13, 2024
68b09d5
save
Jul 13, 2024
b2c0f15
Merge branch 'main' into p2p-receipts
AskAlexSharov Jul 14, 2024
0620327
save
Jul 15, 2024
c569a98
save
Jul 15, 2024
7bd5c20
Merge branch 'refs/heads/main' into p2p-receipts
Jul 16, 2024
7626b76
save
Jul 16, 2024
4cb1fc3
Merge branch 'refs/heads/p2p-receipts' into logs-checking
Jul 16, 2024
e62b317
Merge branch 'refs/heads/main' into logs-checking
Jul 17, 2024
b621f45
Merge branch 'refs/heads/main' into logs-checking
Jul 17, 2024
942820b
save
Jul 18, 2024
64320b3
Merge branch 'refs/heads/main' into logs-checking
Jul 18, 2024
787b0a8
save
Jul 18, 2024
6a5c474
Merge branch 'main' into logs-checking
AskAlexSharov Jul 19, 2024
226173f
save
AskAlexSharov Jul 19, 2024
999c570
save
AskAlexSharov Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions erigon-lib/txpool/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
}
return err
}

var req *sentry.InboundMessage
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
Expand Down
243 changes: 81 additions & 162 deletions turbo/jsonrpc/erigon_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,22 @@
package jsonrpc

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"github.com/erigontech/erigon-lib/kv/order"
"github.com/erigontech/erigon-lib/kv/rawdbv3"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cmd/state/exec3"

"github.com/RoaringBitmap/roaring"
bortypes "github.com/erigontech/erigon/polygon/bor/types"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/bitmapdb"
"github.com/erigontech/erigon/core/rawdb"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/eth/ethutils"
"github.com/erigontech/erigon/eth/filters"
"github.com/erigontech/erigon/ethdb/cbor"
"github.com/erigontech/erigon/rpc"
"github.com/erigontech/erigon/turbo/rpchelper"
)
Expand Down Expand Up @@ -114,99 +113,8 @@ func (api *ErigonImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria)
if end > roaring.MaxUint32 {
return nil, fmt.Errorf("end (%d) > MaxUint32", end)
}
blockNumbers := bitmapdb.NewBitmap()
defer bitmapdb.ReturnToPool(blockNumbers)
if err := applyFilters(blockNumbers, tx, begin, end, crit); err != nil {
return nil, err
}
if blockNumbers.IsEmpty() {
return erigonLogs, nil
}

addrMap := make(map[common.Address]struct{}, len(crit.Addresses))
for _, v := range crit.Addresses {
addrMap[v] = struct{}{}
}
iter := blockNumbers.Iterator()
for iter.HasNext() {
if err := ctx.Err(); err != nil {
return nil, err
}

blockNumber := uint64(iter.Next())
var logIndex uint
var txIndex uint
var blockLogs []*types.Log
it, err := tx.Prefix(kv.Log, hexutility.EncodeTs(blockNumber))
if err != nil {
return nil, err
}
for it.HasNext() {
k, v, err := it.Next()
if err != nil {
return erigonLogs, err
}
var logs types.Logs
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
return erigonLogs, fmt.Errorf("receipt unmarshal failed: %w", err)
}
for _, log := range logs {
log.Index = logIndex
logIndex++
}
filtered := logs.Filter(addrMap, crit.Topics, 0)
if len(filtered) == 0 {
continue
}
txIndex = uint(binary.BigEndian.Uint32(k[8:]))
for _, log := range filtered {
log.TxIndex = txIndex
}
blockLogs = append(blockLogs, filtered...)
}
it.Close()
if len(blockLogs) == 0 {
continue
}

header, err := api._blockReader.HeaderByNumber(ctx, tx, blockNumber)
if err != nil {
return nil, err
}
if header == nil {
return nil, fmt.Errorf("block header not found: %d", blockNumber)
}
timestamp := header.Time

blockHash := header.Hash()
body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber)
if err != nil {
return nil, err
}
if body == nil {
return nil, fmt.Errorf("block not found %d", blockNumber)
}
for _, log := range blockLogs {
erigonLog := &types.ErigonLog{}
erigonLog.BlockNumber = blockNumber
erigonLog.BlockHash = blockHash
if log.TxIndex == uint(len(body.Transactions)) {
erigonLog.TxHash = bortypes.ComputeBorTxHash(blockNumber, blockHash)
} else {
erigonLog.TxHash = body.Transactions[log.TxIndex].Hash()
}
erigonLog.Timestamp = timestamp
erigonLog.Address = log.Address
erigonLog.Topics = log.Topics
erigonLog.Data = log.Data
erigonLog.Index = log.Index
erigonLog.Removed = log.Removed
erigonLog.TxIndex = log.TxIndex
erigonLogs = append(erigonLogs, erigonLog)
}
}

return erigonLogs, nil
return api.getLogsV3(ctx, tx.(kv.TemporalTx), begin, end, crit)
}

// GetLatestLogs implements erigon_getLatestLogs.
Expand All @@ -228,11 +136,14 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri
logOptions = filters.DefaultLogFilterOptions()
}
erigonLogs := types.ErigonLogs{}
tx, beginErr := api.db.BeginRo(ctx)
db, beginErr := api.db.BeginRo(ctx)
if beginErr != nil {
return erigonLogs, beginErr
}
defer tx.Rollback()
defer db.Rollback()

tx := db.(kv.TemporalTx)

var err error
var begin, end uint64 // Filter range: begin-end(from-to). Two limits are included in the filter

Expand Down Expand Up @@ -274,13 +185,15 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri
return nil, fmt.Errorf("end (%d) < begin (%d)", end, begin)
}

blockNumbers := bitmapdb.NewBitmap()
defer bitmapdb.ReturnToPool(blockNumbers)
if err := applyFilters(blockNumbers, tx, begin, end, crit); err != nil {
return erigonLogs, err
chainConfig, err := api.chainConfig(ctx, tx)
if err != nil {
return nil, err
}
if blockNumbers.IsEmpty() {
return erigonLogs, nil
exec := exec3.NewTraceWorker(tx, chainConfig, api.engine(), api._blockReader, nil)

txNumbers, err := applyFiltersV3(tx, begin, end, crit)
if err != nil {
return erigonLogs, err
}

addrMap := make(map[common.Address]struct{}, len(crit.Addresses))
Expand All @@ -295,91 +208,97 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri
}

// latest logs that match the filter crit
iter := blockNumbers.ReverseIterator()
var logCount, blockCount uint64
for iter.HasNext() {
it := rawdbv3.TxNums2BlockNums(tx, txNumbers, order.Asc)
defer it.Close()

var blockHash common.Hash
var header *types.Header
var logCount, blockCount, timestamp uint64

for it.HasNext() {
if err = ctx.Err(); err != nil {
return nil, err
}

blockNumber := uint64(iter.Next())
var logIndex uint
var txIndex uint
var blockLogs []*types.Log
it, err := tx.Prefix(kv.Log, hexutility.EncodeTs(blockNumber))
txNum, blockNum, txIndex, isFinalTxn, blockNumChanged, err := it.Next()
if err != nil {
return nil, err
}
for it.HasNext() {
k, v, err := it.Next()
if err != nil {
return erigonLogs, err
}
var logs types.Logs
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
return erigonLogs, fmt.Errorf("receipt unmarshal failed: %w", err)
}
for _, log := range logs {
log.Index = logIndex
logIndex++
}
var filtered types.Logs
var maxLogCount uint64
maxLogCount = 0
if logOptions.LogCount != 0 {
maxLogCount = logOptions.LogCount - logCount
}
if isFinalTxn {
continue
}

if logOptions.IgnoreTopicsOrder {
filtered = logs.CointainTopics(addrMap, topicsMap, maxLogCount)
} else {
filtered = logs.Filter(addrMap, crit.Topics, maxLogCount)
// if block number changed, calculate all related field
if blockNumChanged {
if header, err = api._blockReader.HeaderByNumber(ctx, tx, blockNum); err != nil {
return nil, err
}
if len(filtered) == 0 {
if header == nil {
log.Warn("[rpc] header is nil", "blockNum", blockNum)
continue
}
txIndex = uint(binary.BigEndian.Uint32(k[8:]))
for i := range filtered {
filtered[i].TxIndex = txIndex
}
for i := len(filtered) - 1; i >= 0; i-- {
blockLogs = append(blockLogs, filtered[i])
logCount++
}
if logOptions.LogCount != 0 && logOptions.LogCount <= logCount {
break
}
blockHash = header.Hash()
exec.ChangeBlock(header)
timestamp = header.Time
}
it.Close()
blockCount++
if len(blockLogs) == 0 {
var logIndex uint
var blockLogs types.Logs

txn, err := api._txnReader.TxnByIdxInBlock(ctx, tx, blockNum, txIndex)
if err != nil {
return nil, err
}
if txn == nil {
continue
}

header, err := api._blockReader.HeaderByNumber(ctx, tx, blockNumber)
_, err = exec.ExecTxn(txNum, txIndex, txn)
if err != nil {
return nil, err
}
if header == nil {
return nil, fmt.Errorf("block header not found: %d", blockNumber)
blockLogs = exec.GetLogs(txIndex, txn)
for _, log := range blockLogs {
log.Index = logIndex
logIndex++
}
var filtered types.Logs
var maxLogCount uint64
maxLogCount = 0
if logOptions.LogCount != 0 {
maxLogCount = logOptions.LogCount - logCount
}
if logOptions.IgnoreTopicsOrder {
filtered = blockLogs.CointainTopics(addrMap, topicsMap, maxLogCount)
Copy link
Collaborator

@AskAlexSharov AskAlexSharov Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create separated github issue - to investigate: can we remove this checks or not. because in E3 indices are txn-level-granularity - it means all filtering already done by applyFiltersV3(tx, begin, end, crit). Filter can't remove - because it check order.

} else {
filtered = blockLogs.Filter(addrMap, crit.Topics, maxLogCount)
}
if len(filtered) == 0 {
continue
}
for i := range filtered {
filtered[i].TxIndex = uint(txIndex)
logCount++
}
timestamp := header.Time

blockHash := header.Hash()
blockCount++

if len(blockLogs) == 0 {
continue
}

body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber)
body, err := api._blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNum)
if err != nil {
return nil, err
}
if body == nil {
return nil, fmt.Errorf("block not found %d", blockNumber)
return nil, fmt.Errorf("block not found %d", blockNum)
}
for _, log := range blockLogs {
for _, log := range filtered {
println("append to blocklogs")
erigonLog := &types.ErigonLog{}
erigonLog.BlockNumber = blockNumber
erigonLog.BlockNumber = blockNum
erigonLog.BlockHash = blockHash
if log.TxIndex == uint(len(body.Transactions)) {
erigonLog.TxHash = bortypes.ComputeBorTxHash(blockNumber, blockHash)
erigonLog.TxHash = bortypes.ComputeBorTxHash(blockNum, blockHash)
} else {
erigonLog.TxHash = body.Transactions[log.TxIndex].Hash()
}
Expand Down
20 changes: 18 additions & 2 deletions turbo/jsonrpc/erigon_receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package jsonrpc
import (
"context"
"encoding/json"
"fmt"
"math/big"
"testing"

Expand Down Expand Up @@ -80,7 +81,7 @@ func TestErigonGetLatestLogs(t *testing.T) {
api := NewErigonAPI(newBaseApiForTest(m), db, nil)
expectedLogs, _ := api.GetLogs(m.Ctx, filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())})

expectedErigonLogs := make([]*types.ErigonLog, 0)
expectedErigonLogs := make(types.ErigonLogs, 0)
for i := len(expectedLogs) - 1; i >= 0; i-- {
expectedErigonLogs = append(expectedErigonLogs, &types.ErigonLog{
Address: expectedLogs[i].Address,
Expand All @@ -95,14 +96,29 @@ func TestErigonGetLatestLogs(t *testing.T) {
Timestamp: expectedLogs[i].Timestamp,
})
}
actual, err := api.GetLatestLogs(m.Ctx, filters.FilterCriteria{}, filters.LogFilterOptions{
actual, err := api.GetLatestLogs(m.Ctx, filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, filters.LogFilterOptions{
LogCount: uint64(len(expectedLogs)),
})
if err != nil {
t.Errorf("calling erigon_getLatestLogs: %v", err)
}
require.NotNil(t, actual)
assert.EqualValues(expectedErigonLogs, actual)

expectedLog := &types.ErigonLog{
Address: libcommon.HexToAddress("0x3CB5b6E26e0f37F2514D45641F15Bd6fEC2E0c4c"),
Topics: []libcommon.Hash{libcommon.HexToHash("0x68f6a0f063c25c6678c443b9a484086f15ba8f91f60218695d32a5251f2050eb")},
Data: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 151, 160, 176, 241, 203, 220, 75, 75, 222, 127, 170, 33, 171, 34, 107, 143, 20, 185, 234, 201},
BlockNumber: 10,
TxHash: libcommon.HexToHash("0xb6449d8e167a8826d050afe4c9f07095236ff769a985f02649b1023c2ded2059"),
TxIndex: 0,
BlockHash: libcommon.HexToHash("0x6804117de2f3e6ee32953e78ced1db7b20214e0d8c745a03b8fecf7cc8ee76ef"),
Index: 0,
Removed: false,
Timestamp: 100,
}
assert.Equal(expectedLog, actual[0])
println(fmt.Sprintf("%+v\n%+v\n", expectedLog, actual[0]))
}

func TestErigonGetLatestLogsIgnoreTopics(t *testing.T) {
Expand Down
Loading
Loading