From 2713e22a6225cbb901ae76ee82c6e2e99d514a82 Mon Sep 17 00:00:00 2001 From: Chris C <104409744+vreff@users.noreply.github.com> Date: Wed, 8 Mar 2023 18:44:44 -0500 Subject: [PATCH 1/2] VRF-367 Fix use of getRoundRobinAddress in VRF listener (#8653) * VRF-367 Fix use of getRoundRobinAddress in VRF listener * Move round robin call to inside SQLX callback * Add empty slice check * Add comments * Revert "Move round robin call to inside SQLX callback" This reverts commit 8a215183680d3541d7a6d0b7f9fb5c52e1c3252f. * Move fromAddress validation to validate.go * Update job spec tests * Revert "Move fromAddress validation to validate.go" This reverts commit 28d0279467cf11acf127d9c6d1ab90609016088f. * Remove specs test changes * Fix nit * Fix integration tst * Revert "Revert "Move fromAddress validation to validate.go"" This reverts commit b8b2a2251028ef62f1b21dcb4160b4d030b884ba. * Adjust validation * Fix integration test model * Fix other integration test * Adjust integration test * Fix lint --- core/services/vrf/integration_v2_test.go | 2 ++ core/services/vrf/listener_v2.go | 29 +++++++--------- core/services/vrf/listener_v2_types.go | 10 +++++- core/services/vrf/validate.go | 6 ++++ core/services/vrf/validate_test.go | 36 ++++++++++++++++++++ core/testdata/testspecs/v2_specs.go | 1 + integration-tests/actions/vrfv2_helpers.go | 2 +- integration-tests/client/chainlink_models.go | 4 +-- integration-tests/smoke/vrfv2_test.go | 6 ++-- 9 files changed, 73 insertions(+), 23 deletions(-) diff --git a/core/services/vrf/integration_v2_test.go b/core/services/vrf/integration_v2_test.go index 6beb05564b1..62288d0e5d7 100644 --- a/core/services/vrf/integration_v2_test.go +++ b/core/services/vrf/integration_v2_test.go @@ -1989,9 +1989,11 @@ func TestMaliciousConsumer(t *testing.T) { s := testspecs.GenerateVRFSpec(testspecs.VRFSpecParams{ JobID: jid.String(), Name: "vrf-primary", + FromAddresses: []string{key.Address.String()}, CoordinatorAddress: uni.rootContractAddress.String(), BatchCoordinatorAddress: uni.batchCoordinatorContractAddress.String(), MinIncomingConfirmations: incomingConfs, + GasLanePrice: assets.GWei(1), PublicKey: vrfkey.PublicKey.String(), V2: true, }).Toml() diff --git a/core/services/vrf/listener_v2.go b/core/services/vrf/listener_v2.go index ced1552b8cd..1692e9e873b 100644 --- a/core/services/vrf/listener_v2.go +++ b/core/services/vrf/listener_v2.go @@ -570,13 +570,9 @@ func (lsn *listenerV2) processRequestsPerSubBatch( } } + // All fromAddresses passed to the VRFv2 job have the same KeySpecificMaxGasPriceWei value. fromAddresses := lsn.fromAddresses() - fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...) - if err != nil { - l.Errorw("Couldn't get next from address", "err", err) - continue - } - maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddress) + maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddresses[0]) // Cases: // 1. Never simulated: in this case, we want to observe the time until simulated @@ -593,7 +589,6 @@ func (lsn *listenerV2) processRequestsPerSubBatch( ll := l.With("reqID", p.req.req.RequestId.String(), "txHash", p.req.req.Raw.TxHash, "maxGasPrice", maxGasPriceWei.String(), - "fromAddress", fromAddress, "juelsNeeded", p.juelsNeeded.String(), "maxLink", p.maxLink.String(), "gasLimit", p.gasLimit, @@ -647,7 +642,7 @@ func (lsn *listenerV2) processRequestsPerSubBatch( var processedRequestIDs []string for _, batch := range batches.fulfillments { l.Debugw("Processing batch", "batchSize", len(batch.proofs)) - p := lsn.processBatch(l, subID, fromAddress, startBalanceNoReserveLink, batchMaxGas, batch) + p := lsn.processBatch(l, subID, startBalanceNoReserveLink, batchMaxGas, batch) processedRequestIDs = append(processedRequestIDs, p...) } @@ -752,22 +747,15 @@ func (lsn *listenerV2) processRequestsPerSub( } } + // All fromAddresses passed to the VRFv2 job have the same KeySpecificMaxGasPriceWei value. fromAddresses := lsn.fromAddresses() - fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...) - if err != nil { - l.Errorw("Couldn't get next from address", "err", err) - continue - } - maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddress) - + maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddresses[0]) observeRequestSimDuration(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v2, unfulfilled) - pipelines := lsn.runPipelines(ctx, l, maxGasPriceWei, unfulfilled) for _, p := range pipelines { ll := l.With("reqID", p.req.req.RequestId.String(), "txHash", p.req.req.Raw.TxHash, "maxGasPrice", maxGasPriceWei.String(), - "fromAddress", fromAddress, "juelsNeeded", p.juelsNeeded.String(), "maxLink", p.maxLink.String(), "gasLimit", p.gasLimit, @@ -809,6 +797,13 @@ func (lsn *listenerV2) processRequestsPerSub( return processed } + fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...) + if err != nil { + l.Errorw("Couldn't get next from address", "err", err) + continue + } + ll = ll.With("fromAddress", fromAddress) + ll.Infow("Enqueuing fulfillment") var ethTX txmgr.EthTx err = lsn.q.Transaction(func(tx pg.Queryer) error { diff --git a/core/services/vrf/listener_v2_types.go b/core/services/vrf/listener_v2_types.go index 5afa8d71771..78e7334dd69 100644 --- a/core/services/vrf/listener_v2_types.go +++ b/core/services/vrf/listener_v2_types.go @@ -101,7 +101,6 @@ func (b *batchFulfillments) addRun(result vrfPipelineResult) { func (lsn *listenerV2) processBatch( l logger.Logger, subID uint64, - fromAddress common.Address, startBalanceNoReserveLink *big.Int, maxCallbackGasLimit uint32, batch *batchFulfillment, @@ -125,9 +124,18 @@ func (lsn *listenerV2) processBatch( maxCallbackGasLimit, float64(lsn.job.VRFSpec.BatchFulfillmentGasMultiplier), ) + + fromAddresses := lsn.fromAddresses() + fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...) + if err != nil { + l.Errorw("Couldn't get next from address", "err", err) + return + } + ll := l.With("numRequestsInBatch", len(batch.reqIDs), "requestIDs", batch.reqIDs, "batchSumGasLimit", batch.totalGasLimit, + "fromAddress", fromAddress, "linkBalance", startBalanceNoReserveLink, "totalGasLimitBumped", totalGasLimitBumped, "gasMultiplier", lsn.job.VRFSpec.BatchFulfillmentGasMultiplier, diff --git a/core/services/vrf/validate.go b/core/services/vrf/validate.go index ed8311c4456..8ea0a909b98 100644 --- a/core/services/vrf/validate.go +++ b/core/services/vrf/validate.go @@ -86,6 +86,12 @@ func ValidatedVRFSpec(tomlString string) (job.Job, error) { if t.Type() == pipeline.TaskTypeVRF || t.Type() == pipeline.TaskTypeVRFV2 { foundVRFTask = true } + + if t.Type() == pipeline.TaskTypeVRFV2 { + if len(spec.FromAddresses) == 0 { + return jb, errors.Wrap(ErrKeyNotSet, "fromAddreses needs to have a non-zero length") + } + } } if !foundVRFTask { return jb, errors.Wrapf(ErrKeyNotSet, "invalid pipeline, expected a vrf task") diff --git a/core/services/vrf/validate_test.go b/core/services/vrf/validate_test.go index 2c737bd6d8e..8c7241ce63d 100644 --- a/core/services/vrf/validate_test.go +++ b/core/services/vrf/validate_test.go @@ -92,6 +92,42 @@ decode_log->vrf->encode_tx->submit_tx require.True(t, errors.Is(ErrKeyNotSet, errors.Cause(err))) }, }, + { + name: "missing fromAddresses", + toml: ` +type = "vrf" +schemaVersion = 1 +minIncomingConfirmations = 10 +publicKey = "0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F8179800" +coordinatorAddress = "0xB3b7874F13387D44a3398D298B075B7A3505D8d4" +requestTimeout = "168h" # 7 days +chunkSize = 25 +backoffInitialDelay = "1m" +backoffMaxDelay = "2h" +observationSource = """ +decode_log [type=ethabidecodelog + abi="RandomnessRequest(bytes32 keyHash,uint256 seed,bytes32 indexed jobID,address sender,uint256 fee,bytes32 requestID)" + data="$(jobRun.logData)" + topics="$(jobRun.logTopics)"] +vrf [type=vrfv2 + publicKey="$(jobSpec.publicKey)" + requestBlockHash="$(jobRun.logBlockHash)" + requestBlockNumber="$(jobRun.logBlockNumber)" + topics="$(jobRun.logTopics)"] +encode_tx [type=ethabiencode + abi="fulfillRandomnessRequest(bytes proof)" + data="{\\"proof\\": $(vrf)}"] +submit_tx [type=ethtx to="%s" + data="$(encode_tx)" + txMeta="{\\"requestTxHash\\": $(jobRun.logTxHash),\\"requestID\\": $(decode_log.requestID),\\"jobID\\": $(jobSpec.databaseID)}"] +decode_log->vrf->encode_tx->submit_tx +""" + `, + assertion: func(t *testing.T, s job.Job, err error) { + require.Error(t, err) + require.True(t, errors.Is(ErrKeyNotSet, errors.Cause(err))) + }, + }, { name: "missing coordinator address", toml: ` diff --git a/core/testdata/testspecs/v2_specs.go b/core/testdata/testspecs/v2_specs.go index 3f591837ce8..6d377f9ab8e 100644 --- a/core/testdata/testspecs/v2_specs.go +++ b/core/testdata/testspecs/v2_specs.go @@ -264,6 +264,7 @@ func GenerateVRFSpec(params VRFSpecParams) VRFSpec { if params.BatchCoordinatorAddress != "" { batchCoordinatorAddress = params.BatchCoordinatorAddress } + batchFulfillmentGasMultiplier := 1.0 if params.BatchFulfillmentGasMultiplier >= 1.0 { batchFulfillmentGasMultiplier = params.BatchFulfillmentGasMultiplier diff --git a/integration-tests/actions/vrfv2_helpers.go b/integration-tests/actions/vrfv2_helpers.go index 9dc705f0554..18a19978051 100644 --- a/integration-tests/actions/vrfv2_helpers.go +++ b/integration-tests/actions/vrfv2_helpers.go @@ -69,7 +69,7 @@ func CreateVRFV2Jobs( job, err := n.MustCreateJob(&client.VRFV2JobSpec{ Name: fmt.Sprintf("vrf-%s", jobUUID), CoordinatorAddress: coordinator.Address(), - FromAddress: oracleAddr, + FromAddresses: []string{oracleAddr}, EVMChainID: c.GetChainID().String(), MinIncomingConfirmations: minIncomingConfirmations, PublicKey: pubKeyCompressed, diff --git a/integration-tests/client/chainlink_models.go b/integration-tests/client/chainlink_models.go index f56c69cce00..57511c14e2a 100644 --- a/integration-tests/client/chainlink_models.go +++ b/integration-tests/client/chainlink_models.go @@ -1024,7 +1024,7 @@ type VRFV2JobSpec struct { ExternalJobID string `toml:"externalJobID"` ObservationSource string `toml:"observationSource"` // List of commands for the Chainlink node MinIncomingConfirmations int `toml:"minIncomingConfirmations"` - FromAddress string `toml:"fromAddress"` + FromAddresses []string `toml:"fromAddresses"` EVMChainID string `toml:"evmChainID"` BatchFulfillmentEnabled bool `toml:"batchFulfillmentEnabled"` BackOffInitialDelay time.Duration `toml:"backOffInitialDelay"` @@ -1041,7 +1041,7 @@ type = "vrf" schemaVersion = 1 name = "{{.Name}}" coordinatorAddress = "{{.CoordinatorAddress}}" -fromAddress = "{{.FromAddress}}" +fromAddresses = [{{range .FromAddresses}}"{{.}}",{{end}}] evmChainID = "{{.EVMChainID}}" minIncomingConfirmations = {{.MinIncomingConfirmations}} publicKey = "{{.PublicKey}}" diff --git a/integration-tests/smoke/vrfv2_test.go b/integration-tests/smoke/vrfv2_test.go index dc947e99fed..23eac87aa36 100644 --- a/integration-tests/smoke/vrfv2_test.go +++ b/integration-tests/smoke/vrfv2_test.go @@ -8,15 +8,17 @@ import ( "testing" "time" + "github.com/rs/zerolog" "github.com/smartcontractkit/chainlink-testing-framework/utils" "go.uber.org/zap/zapcore" "github.com/onsi/gomega" + "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-env/environment" "github.com/smartcontractkit/chainlink-env/pkg/helm/chainlink" "github.com/smartcontractkit/chainlink-testing-framework/blockchain" "github.com/smartcontractkit/chainlink-testing-framework/contracts/ethereum" - "github.com/stretchr/testify/require" eth "github.com/smartcontractkit/chainlink-env/pkg/helm/ethereum" @@ -117,7 +119,7 @@ func TestVRFv2Basic(t *testing.T) { job, err = n.MustCreateJob(&client.VRFV2JobSpec{ Name: fmt.Sprintf("vrf-%s", jobUUID), CoordinatorAddress: coordinator.Address(), - FromAddress: oracleAddr, + FromAddresses: []string{oracleAddr}, EVMChainID: fmt.Sprint(chainClient.GetNetworkConfig().ChainID), MinIncomingConfirmations: minimumConfirmations, PublicKey: pubKeyCompressed, From db002182a2cb386eb8d1b182c6bd59940b5b4577 Mon Sep 17 00:00:00 2001 From: Makram Date: Thu, 9 Mar 2023 12:55:18 +0200 Subject: [PATCH 2/2] Fix integration test build (#8662) --- integration-tests/smoke/vrfv2_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/smoke/vrfv2_test.go b/integration-tests/smoke/vrfv2_test.go index 23eac87aa36..c8e505696f0 100644 --- a/integration-tests/smoke/vrfv2_test.go +++ b/integration-tests/smoke/vrfv2_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/rs/zerolog" "github.com/smartcontractkit/chainlink-testing-framework/utils" "go.uber.org/zap/zapcore"