From eade2d64c626af48dfde354f72c5b3c181377b29 Mon Sep 17 00:00:00 2001 From: Martin Liu Date: Wed, 2 Oct 2024 18:42:26 +1000 Subject: [PATCH 1/3] Mercury transmitter can use different codecs for native & link price reports --- .changeset/cuddly-colts-speak.md | 5 ++ core/services/relay/evm/evm.go | 44 +++++++++++------ .../services/relay/evm/mercury/transmitter.go | 12 +++-- .../relay/evm/mercury/transmitter_test.go | 48 ++++++++++++------- 4 files changed, 73 insertions(+), 36 deletions(-) create mode 100644 .changeset/cuddly-colts-speak.md diff --git a/.changeset/cuddly-colts-speak.md b/.changeset/cuddly-colts-speak.md new file mode 100644 index 00000000000..ab16cc83123 --- /dev/null +++ b/.changeset/cuddly-colts-speak.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix for Mercury transmitter decoding reports of a different codec version #internal diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 880ad73ad21..6fb84bcc93d 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -379,7 +379,6 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty if relayConfig.FeedID == nil { return nil, pkgerrors.New("FeedID must be specified") } - feedID := mercuryutils.FeedID(*relayConfig.FeedID) if relayConfig.ChainID.String() != r.chain.ID().String() { return nil, fmt.Errorf("internal error: chain id in spec does not match this relayer's chain: have %s expected %s", relayConfig.ChainID.String(), r.chain.ID().String()) @@ -430,20 +429,37 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty reportCodecV3 := reportcodecv3.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV3")) reportCodecV4 := reportcodecv4.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV4")) - var transmitterCodec mercury.TransmitterReportDecoder - switch feedID.Version() { - case 1: - transmitterCodec = reportCodecV1 - case 2: - transmitterCodec = reportCodecV2 - case 3: - transmitterCodec = reportCodecV3 - case 4: - transmitterCodec = reportCodecV4 - default: - return nil, fmt.Errorf("invalid feed version %d", feedID.Version()) + getCodecForFeed := func(feedID mercuryutils.FeedID) (mercury.TransmitterReportDecoder, error) { + var transmitterCodec mercury.TransmitterReportDecoder + switch feedID.Version() { + case 1: + transmitterCodec = reportCodecV1 + case 2: + transmitterCodec = reportCodecV2 + case 3: + transmitterCodec = reportCodecV3 + case 4: + transmitterCodec = reportCodecV4 + default: + return nil, fmt.Errorf("invalid feed version %d", feedID.Version()) + } + return transmitterCodec, nil + } + + benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) { + benchmarkPriceCodec, err := getCodecForFeed(feedID) + if err != nil { + return nil, err + } + return benchmarkPriceCodec.BenchmarkPriceFromReport(report) + } + + transmitterCodec, err := getCodecForFeed(mercuryutils.FeedID(*relayConfig.FeedID)) + if err != nil { + return nil, err } - transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, r.triggerCapability) + + transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec, benchmarkPriceDecoder, r.triggerCapability) return NewMercuryProvider(cp, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, reportCodecV4, lggr), nil } diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index b914e67b453..c2cb89387a4 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -101,6 +101,8 @@ type TransmitterReportDecoder interface { ObservationTimestampFromReport(report ocrtypes.Report) (uint32, error) } +type BenchmarkPriceDecoder func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) + var _ Transmitter = (*mercuryTransmitter)(nil) type TransmitterConfig interface { @@ -116,8 +118,9 @@ type mercuryTransmitter struct { orm ORM servers map[string]*server - codec TransmitterReportDecoder - triggerCapability *triggers.MercuryTriggerService + codec TransmitterReportDecoder + benchmarkPriceDecoder BenchmarkPriceDecoder + triggerCapability *triggers.MercuryTriggerService feedID mercuryutils.FeedID jobID int32 @@ -301,7 +304,7 @@ func newServer(lggr logger.Logger, cfg TransmitterConfig, client wsrpc.Client, p } } -func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter { +func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, benchmarkPriceDecoder BenchmarkPriceDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter { sugared := logger.Sugared(lggr) feedIDHex := fmt.Sprintf("0x%x", feedID[:]) servers := make(map[string]*server, len(clients)) @@ -317,6 +320,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin orm, servers, codec, + benchmarkPriceDecoder, triggerCapability, feedID, jobID, @@ -513,7 +517,7 @@ func (mt *mercuryTransmitter) LatestPrice(ctx context.Context, feedID [32]byte) if !is { return nil, fmt.Errorf("expected report to be []byte, but it was %T", m["report"]) } - return mt.codec.BenchmarkPriceFromReport(report) + return mt.benchmarkPriceDecoder(feedID, report) } // LatestTimestamp will return -1, nil if the feed is missing diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index 4eedc0c24a8..31868881c03 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -9,11 +9,10 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" @@ -21,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" + mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" @@ -43,6 +43,9 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) codec := new(mockCodec) + benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) { + return codec.BenchmarkPriceFromReport(report) + } orm := NewORM(db) clients := map[string]wsrpc.Client{} @@ -51,7 +54,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV1Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -65,7 +68,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV2Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -79,7 +82,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV3Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -94,7 +97,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { c := &mocks.MockWSRPCClient{} clients[sURL] = c triggerService := triggers.NewMercuryTriggerService(0, lggr) - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, triggerService) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, triggerService) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -111,7 +114,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { clients[sURL2] = c clients[sURL3] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) mt.servers[sURL2].q.Init([]*Transmission{}) @@ -136,6 +139,9 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { db := pgtest.NewSqlxDB(t) var jobID int32 codec := new(mockCodec) + benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) { + return codec.BenchmarkPriceFromReport(report) + } orm := NewORM(db) clients := map[string]wsrpc.Client{} @@ -153,7 +159,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -169,7 +175,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -183,7 +189,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) _, err := mt.LatestTimestamp(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -213,7 +219,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -243,6 +249,9 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { var jobID int32 codec := new(mockCodec) + benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) { + return codec.BenchmarkPriceFromReport(report) + } orm := NewORM(db) clients := map[string]wsrpc.Client{} @@ -260,7 +269,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) { codec.val = originalPrice @@ -291,7 +300,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.NoError(t, err) @@ -305,7 +314,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) _, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -319,6 +328,9 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { db := pgtest.NewSqlxDB(t) var jobID int32 codec := new(mockCodec) + benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) { + return codec.BenchmarkPriceFromReport(report) + } orm := NewORM(db) clients := map[string]wsrpc.Client{} @@ -335,7 +347,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -351,7 +363,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -364,7 +376,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -382,7 +394,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, nil) + mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec, benchmarkPriceDecoder, nil) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x") From 4cc33369ddd9b0e2749394150a3ef3f24c40c687 Mon Sep 17 00:00:00 2001 From: Ivaylo Novakov Date: Wed, 2 Oct 2024 15:05:01 +0200 Subject: [PATCH 2/3] Fix lint. --- core/services/relay/evm/evm.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 6fb84bcc93d..8893959659e 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -447,7 +447,8 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty } benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) { - benchmarkPriceCodec, err := getCodecForFeed(feedID) + var benchmarkPriceCodec mercury.TransmitterReportDecoder + benchmarkPriceCodec, err = getCodecForFeed(feedID) if err != nil { return nil, err } From a9200e78079ebce2e63ec0839ca5d7af70e22709 Mon Sep 17 00:00:00 2001 From: Martin Liu Date: Thu, 3 Oct 2024 04:43:45 +1000 Subject: [PATCH 3/3] Don't shadow err var --- core/services/relay/evm/evm.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 8893959659e..6031d396936 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -447,10 +447,9 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty } benchmarkPriceDecoder := func(feedID mercuryutils.FeedID, report ocrtypes.Report) (*big.Int, error) { - var benchmarkPriceCodec mercury.TransmitterReportDecoder - benchmarkPriceCodec, err = getCodecForFeed(feedID) - if err != nil { - return nil, err + benchmarkPriceCodec, benchmarkPriceErr := getCodecForFeed(feedID) + if benchmarkPriceErr != nil { + return nil, benchmarkPriceErr } return benchmarkPriceCodec.BenchmarkPriceFromReport(report) }