Skip to content

Commit

Permalink
VRF-367 Fix use of getRoundRobinAddress in VRF listener (#8653)
Browse files Browse the repository at this point in the history
* VRF-367 Fix use of getRoundRobinAddress in VRF listener

* Move round robin call to inside SQLX callback

* Add empty slice check

* Add comments

* Revert "Move round robin call to inside SQLX callback"

This reverts commit 8a21518.

* Move fromAddress validation to validate.go

* Update job spec tests

* Revert "Move fromAddress validation to validate.go"

This reverts commit 28d0279.

* Remove specs test changes

* Fix nit

* Fix integration tst

* Revert "Revert "Move fromAddress validation to validate.go""

This reverts commit b8b2a22.

* Adjust validation

* Fix integration test model

* Fix other integration test

* Adjust integration test

* Fix lint
  • Loading branch information
vreff authored Mar 8, 2023
1 parent 1e63bed commit d003932
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 24 deletions.
2 changes: 2 additions & 0 deletions core/services/vrf/integration_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1910,9 +1910,11 @@ func TestMaliciousConsumer(t *testing.T) {
s := testspecs.GenerateVRFSpec(testspecs.VRFSpecParams{
JobID: jid.String(),
Name: "vrf-primary",
FromAddresses: []string{key.Address.String()},
CoordinatorAddress: uni.rootContractAddress.String(),
BatchCoordinatorAddress: uni.batchCoordinatorContractAddress.String(),
MinIncomingConfirmations: incomingConfs,
GasLanePrice: assets.GWei(1),
PublicKey: vrfkey.PublicKey.String(),
V2: true,
}).Toml()
Expand Down
29 changes: 12 additions & 17 deletions core/services/vrf/listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,13 +570,9 @@ func (lsn *listenerV2) processRequestsPerSubBatch(
}
}

// All fromAddresses passed to the VRFv2 job have the same KeySpecificMaxGasPriceWei value.
fromAddresses := lsn.fromAddresses()
fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...)
if err != nil {
l.Errorw("Couldn't get next from address", "err", err)
continue
}
maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddress)
maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddresses[0])

// Cases:
// 1. Never simulated: in this case, we want to observe the time until simulated
Expand All @@ -593,7 +589,6 @@ func (lsn *listenerV2) processRequestsPerSubBatch(
ll := l.With("reqID", p.req.req.RequestId.String(),
"txHash", p.req.req.Raw.TxHash,
"maxGasPrice", maxGasPriceWei.String(),
"fromAddress", fromAddress,
"juelsNeeded", p.juelsNeeded.String(),
"maxLink", p.maxLink.String(),
"gasLimit", p.gasLimit,
Expand Down Expand Up @@ -647,7 +642,7 @@ func (lsn *listenerV2) processRequestsPerSubBatch(
var processedRequestIDs []string
for _, batch := range batches.fulfillments {
l.Debugw("Processing batch", "batchSize", len(batch.proofs))
p := lsn.processBatch(l, subID, fromAddress, startBalanceNoReserveLink, batchMaxGas, batch)
p := lsn.processBatch(l, subID, startBalanceNoReserveLink, batchMaxGas, batch)
processedRequestIDs = append(processedRequestIDs, p...)
}

Expand Down Expand Up @@ -752,22 +747,15 @@ func (lsn *listenerV2) processRequestsPerSub(
}
}

// All fromAddresses passed to the VRFv2 job have the same KeySpecificMaxGasPriceWei value.
fromAddresses := lsn.fromAddresses()
fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...)
if err != nil {
l.Errorw("Couldn't get next from address", "err", err)
continue
}
maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddress)

maxGasPriceWei := lsn.cfg.KeySpecificMaxGasPriceWei(fromAddresses[0])
observeRequestSimDuration(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v2, unfulfilled)

pipelines := lsn.runPipelines(ctx, l, maxGasPriceWei, unfulfilled)
for _, p := range pipelines {
ll := l.With("reqID", p.req.req.RequestId.String(),
"txHash", p.req.req.Raw.TxHash,
"maxGasPrice", maxGasPriceWei.String(),
"fromAddress", fromAddress,
"juelsNeeded", p.juelsNeeded.String(),
"maxLink", p.maxLink.String(),
"gasLimit", p.gasLimit,
Expand Down Expand Up @@ -809,6 +797,13 @@ func (lsn *listenerV2) processRequestsPerSub(
return processed
}

fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...)
if err != nil {
l.Errorw("Couldn't get next from address", "err", err)
continue
}
ll = ll.With("fromAddress", fromAddress)

ll.Infow("Enqueuing fulfillment")
var ethTX txmgr.EthTx
err = lsn.q.Transaction(func(tx pg.Queryer) error {
Expand Down
10 changes: 9 additions & 1 deletion core/services/vrf/listener_v2_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (b *batchFulfillments) addRun(result vrfPipelineResult) {
func (lsn *listenerV2) processBatch(
l logger.Logger,
subID uint64,
fromAddress common.Address,
startBalanceNoReserveLink *big.Int,
maxCallbackGasLimit uint32,
batch *batchFulfillment,
Expand All @@ -125,9 +124,18 @@ func (lsn *listenerV2) processBatch(
maxCallbackGasLimit,
float64(lsn.job.VRFSpec.BatchFulfillmentGasMultiplier),
)

fromAddresses := lsn.fromAddresses()
fromAddress, err := lsn.gethks.GetRoundRobinAddress(lsn.chainID, fromAddresses...)
if err != nil {
l.Errorw("Couldn't get next from address", "err", err)
return
}

ll := l.With("numRequestsInBatch", len(batch.reqIDs),
"requestIDs", batch.reqIDs,
"batchSumGasLimit", batch.totalGasLimit,
"fromAddress", fromAddress,
"linkBalance", startBalanceNoReserveLink,
"totalGasLimitBumped", totalGasLimitBumped,
"gasMultiplier", lsn.job.VRFSpec.BatchFulfillmentGasMultiplier,
Expand Down
6 changes: 6 additions & 0 deletions core/services/vrf/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func ValidatedVRFSpec(tomlString string) (job.Job, error) {
if t.Type() == pipeline.TaskTypeVRF || t.Type() == pipeline.TaskTypeVRFV2 {
foundVRFTask = true
}

if t.Type() == pipeline.TaskTypeVRFV2 {
if len(spec.FromAddresses) == 0 {
return jb, errors.Wrap(ErrKeyNotSet, "fromAddreses needs to have a non-zero length")
}
}
}
if !foundVRFTask {
return jb, errors.Wrapf(ErrKeyNotSet, "invalid pipeline, expected a vrf task")
Expand Down
36 changes: 36 additions & 0 deletions core/services/vrf/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,42 @@ decode_log->vrf->encode_tx->submit_tx
require.True(t, errors.Is(ErrKeyNotSet, errors.Cause(err)))
},
},
{
name: "missing fromAddresses",
toml: `
type = "vrf"
schemaVersion = 1
minIncomingConfirmations = 10
publicKey = "0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F8179800"
coordinatorAddress = "0xB3b7874F13387D44a3398D298B075B7A3505D8d4"
requestTimeout = "168h" # 7 days
chunkSize = 25
backoffInitialDelay = "1m"
backoffMaxDelay = "2h"
observationSource = """
decode_log [type=ethabidecodelog
abi="RandomnessRequest(bytes32 keyHash,uint256 seed,bytes32 indexed jobID,address sender,uint256 fee,bytes32 requestID)"
data="$(jobRun.logData)"
topics="$(jobRun.logTopics)"]
vrf [type=vrfv2
publicKey="$(jobSpec.publicKey)"
requestBlockHash="$(jobRun.logBlockHash)"
requestBlockNumber="$(jobRun.logBlockNumber)"
topics="$(jobRun.logTopics)"]
encode_tx [type=ethabiencode
abi="fulfillRandomnessRequest(bytes proof)"
data="{\\"proof\\": $(vrf)}"]
submit_tx [type=ethtx to="%s"
data="$(encode_tx)"
txMeta="{\\"requestTxHash\\": $(jobRun.logTxHash),\\"requestID\\": $(decode_log.requestID),\\"jobID\\": $(jobSpec.databaseID)}"]
decode_log->vrf->encode_tx->submit_tx
"""
`,
assertion: func(t *testing.T, s job.Job, err error) {
require.Error(t, err)
require.True(t, errors.Is(ErrKeyNotSet, errors.Cause(err)))
},
},
{
name: "missing coordinator address",
toml: `
Expand Down
1 change: 1 addition & 0 deletions core/testdata/testspecs/v2_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func GenerateVRFSpec(params VRFSpecParams) VRFSpec {
if params.BatchCoordinatorAddress != "" {
batchCoordinatorAddress = params.BatchCoordinatorAddress
}

batchFulfillmentGasMultiplier := 1.0
if params.BatchFulfillmentGasMultiplier >= 1.0 {
batchFulfillmentGasMultiplier = params.BatchFulfillmentGasMultiplier
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/actions/vrfv2_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func CreateVRFV2Jobs(
job, err := n.MustCreateJob(&client.VRFV2JobSpec{
Name: fmt.Sprintf("vrf-%s", jobUUID),
CoordinatorAddress: coordinator.Address(),
FromAddress: oracleAddr,
FromAddresses: []string{oracleAddr},
EVMChainID: c.GetChainID().String(),
MinIncomingConfirmations: minIncomingConfirmations,
PublicKey: pubKeyCompressed,
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/client/chainlink_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ type VRFV2JobSpec struct {
ExternalJobID string `toml:"externalJobID"`
ObservationSource string `toml:"observationSource"` // List of commands for the Chainlink node
MinIncomingConfirmations int `toml:"minIncomingConfirmations"`
FromAddress string `toml:"fromAddress"`
FromAddresses []string `toml:"fromAddresses"`
EVMChainID string `toml:"evmChainID"`
BatchFulfillmentEnabled bool `toml:"batchFulfillmentEnabled"`
BackOffInitialDelay time.Duration `toml:"backOffInitialDelay"`
Expand All @@ -1041,7 +1041,7 @@ type = "vrf"
schemaVersion = 1
name = "{{.Name}}"
coordinatorAddress = "{{.CoordinatorAddress}}"
fromAddress = "{{.FromAddress}}"
fromAddresses = [{{range .FromAddresses}}"{{.}}",{{end}}]
evmChainID = "{{.EVMChainID}}"
minIncomingConfirmations = {{.MinIncomingConfirmations}}
publicKey = "{{.PublicKey}}"
Expand Down
8 changes: 5 additions & 3 deletions integration-tests/smoke/vrfv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import (
"time"

"github.com/rs/zerolog"
"github.com/smartcontractkit/chainlink-testing-framework/utils"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-testing-framework/utils"

"github.com/onsi/gomega"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-env/environment"
"github.com/smartcontractkit/chainlink-env/pkg/helm/chainlink"
"github.com/smartcontractkit/chainlink-testing-framework/blockchain"
"github.com/smartcontractkit/chainlink-testing-framework/contracts/ethereum"
"github.com/stretchr/testify/require"

eth "github.com/smartcontractkit/chainlink-env/pkg/helm/ethereum"

Expand Down Expand Up @@ -118,7 +120,7 @@ func TestVRFv2Basic(t *testing.T) {
job, err = n.MustCreateJob(&client.VRFV2JobSpec{
Name: fmt.Sprintf("vrf-%s", jobUUID),
CoordinatorAddress: coordinator.Address(),
FromAddress: oracleAddr,
FromAddresses: []string{oracleAddr},
EVMChainID: fmt.Sprint(chainClient.GetNetworkConfig().ChainID),
MinIncomingConfirmations: minimumConfirmations,
PublicKey: pubKeyCompressed,
Expand Down

0 comments on commit d003932

Please sign in to comment.