Skip to content

Commit

Permalink
Add block timestamps to logs (#8616)
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista authored Mar 14, 2023
1 parent c83584b commit 39fe8e3
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 72 deletions.
11 changes: 7 additions & 4 deletions core/chains/evm/client/simulated_backend_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"strings"
"testing"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
Expand Down Expand Up @@ -488,13 +489,15 @@ func (c *SimulatedBackendClient) BatchCallContext(ctx context.Context, b []rpc.B
switch v := elem.Result.(type) {
case *evmtypes.Head:
b[i].Result = &evmtypes.Head{
Number: header.Number.Int64(),
Hash: header.Hash(),
Number: header.Number.Int64(),
Hash: header.Hash(),
Timestamp: time.Unix(int64(header.Time), 0).UTC(),
}
case *evmtypes.Block:
b[i].Result = &evmtypes.Block{
Number: header.Number.Int64(),
Hash: header.Hash(),
Number: header.Number.Int64(),
Hash: header.Hash(),
Timestamp: time.Unix(int64(header.Time), 0),
}
default:
return errors.Errorf("SimulatedBackendClient Unexpected Type %T", v)
Expand Down
9 changes: 9 additions & 0 deletions core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/smartcontractkit/sqlx"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -101,3 +102,11 @@ func (lp *logPoller) Filter() ethereum.FilterQuery {
// SelectLogsByBlockRange exposes the unexported selectLogsByBlockRange
// to tests in external packages.
func (o *ORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) {
return o.selectLogsByBlockRange(start, end)
}

// ConvertLogs exposes the unexported convertLogs to tests, forwarding the
// poller's own logger and chain ID so callers only supply logs and blocks.
func (lp *logPoller) ConvertLogs(gethLogs []types.Log, blocks []LogPollerBlock) []Log {
return convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ChainID())
}

// BlocksFromLogs exposes the unexported blocksFromLogs to tests in
// external packages.
func (lp *logPoller) BlocksFromLogs(ctx context.Context, logs []types.Log) (blocks []LogPollerBlock, err error) {
return lp.blocksFromLogs(ctx, logs)
}
95 changes: 94 additions & 1 deletion core/chains/evm/logpoller/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/core/gethwrappers/generated/log_emitter"
"github.com/smartcontractkit/chainlink/core/internal/cltest/heavyweight"
"github.com/smartcontractkit/chainlink/core/internal/testutils"
Expand Down Expand Up @@ -83,7 +84,7 @@ func TestPopulateLoadedDB(t *testing.T) {
}()

// Confirm all the logs.
require.NoError(t, o.InsertBlock(common.HexToHash("0x10"), 1000000))
require.NoError(t, o.InsertBlock(common.HexToHash("0x10"), 1000000, time.Now()))
func() {
defer logRuntime(t, time.Now())
lgs, err := o.SelectDataWordRange(address1, event1, 0, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0)
Expand Down Expand Up @@ -295,3 +296,95 @@ func Test_BackupLogPoller(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, len(logs))
}

// TestLogPoller_BlockTimestamps verifies that logs converted by the poller
// carry the timestamp of the block they were emitted in, even when block
// times are irregular. It manipulates the simulated backend's clock with
// AdjustTime to create two uneven gaps, then checks the timestamps the
// poller attaches against the block times the chain reports.
func TestLogPoller_BlockTimestamps(t *testing.T) {
	t.Parallel()
	ctx := testutils.Context(t)
	th := logpoller.SetupTH(t, 2, 3, 2)

	addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2}
	topics := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}

	err := th.LogPoller.RegisterFilter(logpoller.Filter{"convertLogs", topics, addresses})
	require.NoError(t, err)

	blk, err := th.Client.BlockByNumber(ctx, nil)
	require.NoError(t, err)
	require.Equal(t, big.NewInt(1), blk.Number())
	start := blk.Time()

	// There is automatically a 10s delay between each block. To make sure it's including the correct block timestamps,
	// we introduce irregularities by inserting two additional block delays. We can't control the block times for
	// blocks produced by the log emitter, but we can adjust the time on empty blocks in between. Simulated time
	// sequence: [ #1 ] ..(10s + delay1).. [ #2 ] ..10s.. [ #3 (LOG1) ] ..(10s + delay2).. [ #4 ] ..10s.. [ #5 (LOG2) ]
	const delay1 = 589
	const delay2 = 643
	time1 := start + 20 + delay1
	time2 := time1 + 20 + delay2

	require.NoError(t, th.Client.AdjustTime(delay1*time.Second))
	hash := th.Client.Commit()

	// Block #2: empty block whose time was pushed forward by delay1.
	blk, err = th.Client.BlockByHash(ctx, hash)
	require.NoError(t, err)
	require.Equal(t, big.NewInt(2), blk.Number())
	assert.Equal(t, time1-10, blk.Time())

	// Block #3: carries LOG1, 10s after block #2.
	_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)})
	require.NoError(t, err)
	hash = th.Client.Commit()

	blk, err = th.Client.BlockByHash(ctx, hash)
	require.NoError(t, err)
	require.Equal(t, big.NewInt(3), blk.Number())
	assert.Equal(t, time1, blk.Time())

	// Block #4: empty block pushed forward by delay2; block #5 carries LOG2.
	require.NoError(t, th.Client.AdjustTime(delay2*time.Second))
	th.Client.Commit()
	_, err = th.Emitter2.EmitLog2(th.Owner, []*big.Int{big.NewInt(2)})
	require.NoError(t, err)
	hash = th.Client.Commit()

	blk, err = th.Client.BlockByHash(ctx, hash)
	require.NoError(t, err)
	require.Equal(t, big.NewInt(5), blk.Number())
	assert.Equal(t, time2, blk.Time())

	// Reuse the addresses slice declared above rather than rebuilding it.
	query := ethereum.FilterQuery{
		FromBlock: big.NewInt(2),
		ToBlock:   big.NewInt(5),
		Topics:    [][]common.Hash{topics},
		Addresses: addresses}

	gethLogs, err := th.Client.FilterLogs(ctx, query)
	require.NoError(t, err)
	require.Len(t, gethLogs, 2)

	blocks, err := th.LogPoller.BlocksFromLogs(ctx, gethLogs)
	require.NoError(t, err)
	require.Len(t, blocks, 2)

	logs := th.LogPoller.ConvertLogs(gethLogs, blocks)
	require.Len(t, logs, 2)

	// Round-trip each log's topics through the driver.Valuer / sql.Scanner
	// interfaces to recover them as a HashArray.
	val, err := logs[0].Topics.Value()
	require.NoError(t, err)
	s, ok := val.(string)
	require.True(t, ok)
	var topics0 evmtypes.HashArray
	require.NoError(t, topics0.Scan(s))

	val, err = logs[1].Topics.Value()
	require.NoError(t, err)
	s, ok = val.(string)
	require.True(t, ok)
	var topics1 evmtypes.HashArray
	require.NoError(t, topics1.Scan(s))

	// time.Unix(int64(timeN), 0) replaces the redundant round trip through
	// big.NewInt(...).Int64() in the original; the value is identical.
	assert.Equal(t, time.Unix(int64(time1), 0).UTC(), logs[0].BlockTimestamp)
	assert.Equal(t, addresses[0], logs[0].Address)
	assert.Equal(t, topics[0], topics0[0])
	assert.Equal(t, time.Unix(int64(time2), 0).UTC(), logs[1].BlockTimestamp)
	assert.Equal(t, addresses[1], logs[1].Address)
	assert.Equal(t, topics[1], topics1[0])
}
74 changes: 56 additions & 18 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,21 +521,39 @@ func (lp *logPoller) run() {
}
}

func convertLogs(chainID *big.Int, logs []types.Log) []Log {
// convertLogs converts an array of geth logs ([]type.Log) to an array of logpoller logs ([]Log)
//
// Block timestamps are extracted from blocks param. If len(blocks) == 1, the same timestamp from this block
// will be used for all logs. If len(blocks) == len(logs) then the block number of each block is used for the
// corresponding log. Any other length for blocks is invalid.
func convertLogs(logs []types.Log, blocks []LogPollerBlock, lggr logger.Logger, chainID *big.Int) []Log {
var lgs []Log
for _, l := range logs {
blockTimestamp := time.Now()
if len(logs) == 0 {
return lgs
}
if len(blocks) != 1 && len(blocks) != len(logs) {
lggr.Errorf("AssumptionViolation: invalid params passed to convertLogs, length of blocks must either be 1 or match length of logs")
return lgs
}

for i, l := range logs {
if i == 0 || len(blocks) == len(logs) {
blockTimestamp = blocks[i].BlockTimestamp
}
lgs = append(lgs, Log{
EvmChainId: utils.NewBig(chainID),
LogIndex: int64(l.Index),
BlockHash: l.BlockHash,
// We assume block numbers fit in int64
// in many places.
BlockNumber: int64(l.BlockNumber),
EventSig: l.Topics[0], // First topic is always event signature.
Topics: convertTopics(l.Topics),
Address: l.Address,
TxHash: l.TxHash,
Data: l.Data,
BlockNumber: int64(l.BlockNumber),
BlockTimestamp: blockTimestamp,
EventSig: l.Topics[0], // First topic is always event signature.
Topics: convertTopics(l.Topics),
Address: l.Address,
TxHash: l.TxHash,
Data: l.Data,
})
}
return lgs
Expand All @@ -549,24 +567,38 @@ func convertTopics(topics []common.Hash) [][]byte {
return topicsForDB
}

// blocksFromLogs fetches the LogPollerBlock record for every block referenced
// by the given logs (one entry per log, in log order), delegating the lookup
// to GetBlocksRange. Callers use the result to attach block metadata such as
// timestamps to each log.
func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log) (blocks []LogPollerBlock, err error) {
	// Pre-size: the final length is known, so avoid repeated append growth.
	// "l" instead of "log" also avoids shadowing the stdlib package name.
	numbers := make([]uint64, 0, len(logs))
	for _, l := range logs {
		numbers = append(numbers, l.BlockNumber)
	}
	return lp.GetBlocksRange(ctx, numbers)
}

// backfill will query FilterLogs in batches for logs in the
// block range [start, end] and save them to the db.
// Retries until ctx cancelled. Will return an error if cancelled
// or if there is an error backfilling.
func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
for from := start; from <= end; from += lp.backfillBatchSize {
to := mathutil.Min(from+lp.backfillBatchSize-1, end)
logs, err := lp.ec.FilterLogs(ctx, lp.filter(big.NewInt(from), big.NewInt(to), nil))
gethLogs, err := lp.ec.FilterLogs(ctx, lp.filter(big.NewInt(from), big.NewInt(to), nil))
if err != nil {
lp.lggr.Warnw("Unable query for logs, retrying", "err", err, "from", from, "to", to)
return err
}
if len(logs) == 0 {
if len(gethLogs) == 0 {
continue
}
lp.lggr.Infow("Backfill found logs", "from", from, "to", to, "logs", len(logs))
blocks, err := lp.blocksFromLogs(ctx, gethLogs)
if err != nil {
return err
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs))
err = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
return lp.orm.InsertLogs(convertLogs(lp.ec.ChainID(), logs), pg.WithQueryer(tx))
return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ChainID()), pg.WithQueryer(tx))
})
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
Expand Down Expand Up @@ -734,13 +766,18 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash)
err = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
if err2 := lp.orm.InsertBlock(h, currentBlockNumber, pg.WithQueryer(tx)); err2 != nil {
if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, pg.WithQueryer(tx)); err2 != nil {
return err2
}
if len(logs) == 0 {
return nil
}
return lp.orm.InsertLogs(convertLogs(lp.ec.ChainID(), logs), pg.WithQueryer(tx))
return lp.orm.InsertLogs(convertLogs(logs,
[]LogPollerBlock{{BlockNumber: currentBlockNumber,
BlockTimestamp: currentBlock.Timestamp}},
lp.lggr,
lp.ec.ChainID(),
), pg.WithQueryer(tx))
})
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
Expand Down Expand Up @@ -990,10 +1027,11 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
return nil, errors.Errorf("expected block number to be >= to 0, got %d", block.Number)
}
blocksFoundFromRPC[uint64(block.Number)] = LogPollerBlock{
EvmChainId: block.EVMChainID,
BlockHash: block.Hash,
BlockNumber: block.Number,
CreatedAt: block.Timestamp,
EvmChainId: block.EVMChainID,
BlockHash: block.Hash,
BlockNumber: block.Number,
BlockTimestamp: block.Timestamp,
CreatedAt: block.Timestamp,
}
}

Expand Down
Loading

0 comments on commit 39fe8e3

Please sign in to comment.