Skip to content

Commit

Permalink
NONEVM-745 LogPoller db models (#921)
Browse files Browse the repository at this point in the history
* logpoller db models

Signed-off-by: Dmytro Haidashenko <dmytro.haidashenko@smartcontract.com>

* Replace solana types with custom to support db read/write

* remove redundant file

Signed-off-by: Dmytro Haidashenko <dmytro.haidashenko@smartcontract.com>

* improve tests coverage & ensure subkey naming is consistent

Signed-off-by: Dmytro Haidashenko <dmytro.haidashenko@smartcontract.com>

* drop redundant constraint

* linter fixes

* updata chainlink-common

* gomodtidy

---------

Signed-off-by: Dmytro Haidashenko <dmytro.haidashenko@smartcontract.com>
  • Loading branch information
dhaidashenko authored Dec 23, 2024
1 parent eac4f15 commit ed1b2f6
Show file tree
Hide file tree
Showing 8 changed files with 693 additions and 1 deletion.
11 changes: 10 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ require (
github.com/go-viper/mapstructure/v2 v2.1.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.6.2
github.com/jackc/pgx/v4 v4.18.3
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.10.9
github.com/pelletier/go-toml/v2 v2.2.0
github.com/prometheus/client_golang v1.17.0
github.com/smartcontractkit/chainlink-common v0.4.1-0.20241223143929-db7919d60550
Expand Down Expand Up @@ -60,14 +62,20 @@ require (
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jmoiron/sqlx v1.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/linkedin/goavro/v2 v2.12.0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -94,6 +102,7 @@ require (
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 // indirect
github.com/scylladb/go-reflectx v1.0.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect
github.com/streamingfast/logging v0.0.0-20220405224725-2755dab2ce75 // indirect
Expand Down
99 changes: 99 additions & 0 deletions go.sum

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions pkg/solana/logpoller/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package logpoller

import (
"time"

"github.com/lib/pq"
)

type Filter struct {
ID int64
Name string
Address PublicKey
EventName string
EventSig []byte
StartingBlock int64
EventIDL string
SubkeyPaths SubkeyPaths
Retention time.Duration
MaxLogsKept int64
}

type Log struct {
ID int64
FilterID int64
ChainID string
LogIndex int64
BlockHash Hash
BlockNumber int64
BlockTimestamp time.Time
Address PublicKey
EventSig []byte
SubkeyValues pq.ByteaArray
TxHash Signature
Data []byte
CreatedAt time.Time
ExpiresAt *time.Time
SequenceNum int64
}
158 changes: 158 additions & 0 deletions pkg/solana/logpoller/orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package logpoller

import (
"context"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

type DSORM struct {
chainID string
ds sqlutil.DataSource
lggr logger.Logger
}

// NewORM creates an DSORM scoped to chainID.
func NewORM(chainID string, ds sqlutil.DataSource, lggr logger.Logger) *DSORM {
return &DSORM{
chainID: chainID,
ds: ds,
lggr: lggr,
}
}

func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by ds.
func (o *DSORM) new(ds sqlutil.DataSource) *DSORM { return NewORM(o.chainID, ds, o.lggr) }

// InsertFilter is idempotent.
//
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
// Returns ID for updated or newly inserted filter.
func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err error) {
args, err := newQueryArgs(o.chainID).
withField("name", filter.Name).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withName(filter.Name).
withAddress(filter.Address).
withEventName(filter.EventName).
withEventSig(filter.EventSig).
withStartingBlock(filter.StartingBlock).
withEventIDL(filter.EventIDL).
withSubkeyPaths(filter.SubkeyPaths).
toArgs()
if err != nil {
return 0, err
}

// '::' has to be escaped in the query string
// https://github.com/jmoiron/sqlx/issues/91, https://github.com/jmoiron/sqlx/issues/428
query := `
INSERT INTO solana.log_poller_filters
(chain_id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept)
VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :subkey_paths, :retention, :max_logs_kept)
RETURNING id;`

query, sqlArgs, err := o.ds.BindNamed(query, args)
if err != nil {
return 0, err
}
if err = o.ds.GetContext(ctx, &id, query, sqlArgs...); err != nil {
return 0, err
}
return id, nil
}

// GetFilterByID returns filter by ID
func (o *DSORM) GetFilterByID(ctx context.Context, id int64) (Filter, error) {
query := `SELECT id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept
FROM solana.log_poller_filters WHERE id = $1`
var result Filter
err := o.ds.GetContext(ctx, &result, query, id)
return result, err
}

// InsertLogs is idempotent to support replays.
func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
if err := o.validateLogs(logs); err != nil {
return err
}
return o.Transact(ctx, func(orm *DSORM) error {
return orm.insertLogsWithinTx(ctx, logs, orm.ds)
})
}

func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(logs) {
end = len(logs)
}

query := `INSERT INTO solana.logs
(filter_id, chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, subkey_values, tx_hash, data, created_at, expires_at, sequence_num)
VALUES
(:filter_id, :chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :subkey_values, :tx_hash, :data, NOW(), :expires_at, :sequence_num)
ON CONFLICT DO NOTHING`

_, err := tx.NamedExecContext(ctx, query, logs[start:end])
if err != nil {
if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
// In case of DB timeouts, try to insert again with a smaller batch upto a limit
batchInsertSize /= 2
i -= batchInsertSize // counteract +=batchInsertSize on next loop iteration
continue
}
return err
}
}
return nil
}

func (o *DSORM) validateLogs(logs []Log) error {
for _, log := range logs {
if o.chainID != log.ChainID {
return fmt.Errorf("invalid chainID in log got %v want %v", log.ChainID, o.chainID)
}
}
return nil
}

// SelectLogs finds the logs in a given block range.
func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig []byte) ([]Log, error) {
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withStartBlock(start).
withEndBlock(end).
toArgs()
if err != nil {
return nil, err
}

query := logsQuery(`
WHERE chain_id = :chain_id
AND address = :address
AND event_sig = :event_sig
AND block_number >= :start_block
AND block_number <= :end_block
ORDER BY block_number, log_index`)

var logs []Log
query, sqlArgs, err := o.ds.BindNamed(query, args)
if err != nil {
return nil, err
}

err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...)
if err != nil {
return nil, err
}
return logs, nil
}
159 changes: 159 additions & 0 deletions pkg/solana/logpoller/orm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
//go:build db_tests

package logpoller

import (
"testing"
"time"

"github.com/gagliardetto/solana-go"
"github.com/google/uuid"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/lib/pq"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

// NOTE: at the moment it's not possible to run all db tests at once. This issue will be addressed separately

func TestLogPollerFilters(t *testing.T) {
lggr := logger.Test(t)
chainID := uuid.NewString()
dbx := pg.NewTestDB(t, pg.TestURL(t))
orm := NewORM(chainID, dbx, lggr)

privateKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
pubKey := privateKey.PublicKey()
t.Run("Ensure all fields are readable/writable", func(t *testing.T) {
filters := []Filter{
{
Name: "happy path",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubkeyPaths: SubkeyPaths([][]string{{"a", "b"}, {"c"}}),
Retention: 1000,
MaxLogsKept: 3,
},
{
Name: "empty sub key paths",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubkeyPaths: SubkeyPaths([][]string{}),
Retention: 1000,
MaxLogsKept: 3,
},
{
Name: "nil sub key paths",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubkeyPaths: nil,
Retention: 1000,
MaxLogsKept: 3,
},
}

for _, filter := range filters {
t.Run("Read/write filter: "+filter.Name, func(t *testing.T) {
ctx := tests.Context(t)
id, err := orm.InsertFilter(ctx, filter)
require.NoError(t, err)
filter.ID = id
dbFilter, err := orm.GetFilterByID(ctx, id)
require.NoError(t, err)
require.Equal(t, filter, dbFilter)
})
}
})
t.Run("Returns and error if name is not unique", func(t *testing.T) {
filter := newRandomFilter(t)
ctx := tests.Context(t)
_, err = orm.InsertFilter(ctx, filter)
require.NoError(t, err)
filter.EventSig = []byte(uuid.NewString())
_, err = orm.InsertFilter(ctx, filter)
require.EqualError(t, err, `ERROR: duplicate key value violates unique constraint "solana_log_poller_filter_name" (SQLSTATE 23505)`)
})
}

func newRandomFilter(t *testing.T) Filter {
privateKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
pubKey := privateKey.PublicKey()
return Filter{
Name: uuid.NewString(),
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubkeyPaths: [][]string{{"a", "b"}, {"c"}},
Retention: 1000,
MaxLogsKept: 3,
}
}

func TestLogPollerLogs(t *testing.T) {
lggr := logger.Test(t)
chainID := uuid.NewString()
dbx := pg.NewTestDB(t, pg.TestURL(t))
orm := NewORM(chainID, dbx, lggr)

privateKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
pubKey := privateKey.PublicKey()

ctx := tests.Context(t)
// create filter as it's required for a log
filterID, err := orm.InsertFilter(ctx, Filter{
Name: "awesome filter",
Address: PublicKey(pubKey),
EventName: "event",
EventSig: []byte{1, 2, 3},
StartingBlock: 1,
EventIDL: "{}",
SubkeyPaths: [][]string{{"a", "b"}, {"c"}},
Retention: 1000,
MaxLogsKept: 3,
})
require.NoError(t, err)
data := []byte("solana is fun")
signature, err := privateKey.Sign(data)
require.NoError(t, err)
log := Log{
FilterID: filterID,
ChainID: chainID,
LogIndex: 1,
BlockHash: Hash(pubKey),
BlockNumber: 10,
BlockTimestamp: time.Unix(1731590113, 0),
Address: PublicKey(pubKey),
EventSig: []byte{3, 2, 1},
SubkeyValues: pq.ByteaArray([][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}),
TxHash: Signature(signature),
Data: data,
}
err = orm.InsertLogs(ctx, []Log{log})
require.NoError(t, err)
// insert of the same Log should not produce two instances
err = orm.InsertLogs(ctx, []Log{log})
require.NoError(t, err)
dbLogs, err := orm.SelectLogs(ctx, 0, 100, log.Address, log.EventSig)
require.NoError(t, err)
require.Len(t, dbLogs, 1)
log.ID = dbLogs[0].ID
log.CreatedAt = dbLogs[0].CreatedAt
require.Equal(t, log, dbLogs[0])
}
Loading

0 comments on commit ed1b2f6

Please sign in to comment.