Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mercury transmitter can use different codecs for native & link price reports #14631

Merged
merged 4 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cuddly-colts-speak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix for Mercury transmitter decoding reports of a different codec version #internal
44 changes: 30 additions & 14 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
if relayConfig.FeedID == nil {
return nil, pkgerrors.New("FeedID must be specified")
}
feedID := mercuryutils.FeedID(*relayConfig.FeedID)

if relayConfig.ChainID.String() != r.chain.ID().String() {
return nil, fmt.Errorf("internal error: chain id in spec does not match this relayer's chain: have %s expected %s", relayConfig.ChainID.String(), r.chain.ID().String())
Expand Down Expand Up @@ -430,20 +429,37 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty
reportCodecV3 := reportcodecv3.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV3"))
reportCodecV4 := reportcodecv4.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV4"))

var transmitterCodec mercury.TransmitterReportDecoder
switch feedID.Version() {
case 1:
transmitterCodec = reportCodecV1
case 2:
transmitterCodec = reportCodecV2
case 3:
transmitterCodec = reportCodecV3
case 4:
transmitterCodec = reportCodecV4
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
getCodecForFeed := func(feedID mercuryutils.FeedID) (mercury.TransmitterReportDecoder, error) {
var transmitterCodec mercury.TransmitterReportDecoder
switch feedID.Version() {
case 1:
transmitterCodec = reportCodecV1
case 2:
transmitterCodec = reportCodecV2
case 3:
transmitterCodec = reportCodecV3
case 4:
transmitterCodec = reportCodecV4
default:
return nil, fmt.Errorf("invalid feed version %d", feedID.Version())
}
return transmitterCodec, nil
}

benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
benchmarkPriceCodec, benchmarkPriceErr := getCodecForFeed(feedID)
if benchmarkPriceErr != nil {
return nil, benchmarkPriceErr
}
return benchmarkPriceCodec.BenchmarkPriceFromReport(report)
}
transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, r.triggerCapability)

transmitterCodec, err := getCodecForFeed(mercuryutils.FeedID(*relayConfig.FeedID))
if err != nil {
return nil, err
}

transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability)

return NewMercuryProvider(cp, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, reportCodecV4, lggr), nil
}
Expand Down
12 changes: 8 additions & 4 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type TransmitterReportDecoder interface {
ObservationTimestampFromReport(report ocrtypes.Report) (uint32, error)
}

type BenchmarkPriceDecoder func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error)

var _ Transmitter = (*mercuryTransmitter)(nil)

type TransmitterConfig interface {
Expand All @@ -116,8 +118,9 @@ type mercuryTransmitter struct {
orm ORM
servers map[string]*server

codec TransmitterReportDecoder
triggerCapability *triggers.MercuryTriggerService
codec TransmitterReportDecoder
benchmarkPriceDecoder BenchmarkPriceDecoder
triggerCapability *triggers.MercuryTriggerService

feedID mercuryutils.FeedID
jobID int32
Expand Down Expand Up @@ -301,7 +304,7 @@ func newServer(lggr logger.Logger, cfg TransmitterConfig, client wsrpc.Client, p
}
}

func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter {
func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, benchmarkPriceDecoder BenchmarkPriceDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter {
sugared := logger.Sugared(lggr)
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
Expand All @@ -317,6 +320,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin
orm,
servers,
codec,
benchmarkPriceDecoder,
triggerCapability,
feedID,
jobID,
Expand Down Expand Up @@ -513,7 +517,7 @@ func (mt *mercuryTransmitter) LatestPrice(ctx context.Context, feedID [32]byte)
if !is {
return nil, fmt.Errorf("expected report to be []byte, but it was %T", m["report"])
}
return mt.codec.BenchmarkPriceFromReport(report)
return mt.benchmarkPriceDecoder(feedID, report)
}

// LatestTimestamp will return -1, nil if the feed is missing
Expand Down
48 changes: 30 additions & 18 deletions core/services/relay/evm/mercury/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
Expand All @@ -43,6 +43,9 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}
orm := NewORM(db)
clients := map[string]wsrpc.Client{}

Expand All @@ -51,7 +54,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV1Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -65,7 +68,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV2Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -79,7 +82,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
report := sampleV3Report
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -94,7 +97,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
c := &mocks.MockWSRPCClient{}
clients[sURL] = c
triggerService := triggers.NewMercuryTriggerService(0, lggr)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, triggerService)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, triggerService)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs)
Expand All @@ -111,7 +114,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) {
clients[sURL2] = c
clients[sURL3] = c

mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
// init the queue since we skipped starting transmitter
mt.servers[sURL].q.Init([]*Transmission{})
mt.servers[sURL2].q.Init([]*Transmission{})
Expand All @@ -136,6 +139,9 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
db := pgtest.NewSqlxDB(t)
var jobID int32
codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}

orm := NewORM(db)
clients := map[string]wsrpc.Client{}
Expand All @@ -153,7 +159,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -169,7 +175,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -183,7 +189,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.LatestTimestamp(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand Down Expand Up @@ -213,7 +219,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) {
return out, nil
},
}
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
ts, err := mt.LatestTimestamp(testutils.Context(t))
require.NoError(t, err)

Expand Down Expand Up @@ -243,6 +249,9 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
var jobID int32

codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}
orm := NewORM(db)
clients := map[string]wsrpc.Client{}

Expand All @@ -260,7 +269,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)

t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) {
codec.val = originalPrice
Expand Down Expand Up @@ -291,7 +300,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID)
require.NoError(t, err)

Expand All @@ -305,7 +314,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.LatestPrice(testutils.Context(t), sampleFeedID)
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand All @@ -319,6 +328,9 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
db := pgtest.NewSqlxDB(t)
var jobID int32
codec := new(mockCodec)
benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) {
return codec.BenchmarkPriceFromReport(report)
}
orm := NewORM(db)
clients := map[string]wsrpc.Client{}

Expand All @@ -335,7 +347,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -351,7 +363,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.NoError(t, err)

Expand All @@ -364,7 +376,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "something exploded")
Expand All @@ -382,7 +394,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) {
},
}
clients[sURL] = c
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil)
mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil)
_, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t))
require.Error(t, err)
assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x")
Expand Down
Loading