Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate token price reporting schedule #1278

Merged
merged 6 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 148 additions & 101 deletions core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,28 @@ type PriceService interface {
var _ PriceService = (*priceService)(nil)

const (
// Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin.
// 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while
// Gas prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time.
gasPriceUpdateInterval = 1 * time.Minute
// Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show
// their prices are stable, 10-minute resolution is accurate enough.
tokenPriceUpdateInterval = 10 * time.Minute

// Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin.
// 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while
// surfacing price update outages quickly enough.
priceExpireSec = 600
// Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price.
// 10 minutes should result in 40 rows being cleaned up per job, it is not a heavy load on DB, so there is no need
// to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireSec`.
priceCleanupInterval = 600 * time.Second
priceExpireThreshold = 25 * time.Minute

// Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time.
priceUpdateInterval = 60 * time.Second
// Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price.
// 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need
// to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`.
priceCleanupInterval = 10 * time.Minute
)

type priceService struct {
priceExpireSec int
cleanupInterval time.Duration
updateInterval time.Duration
priceExpireThreshold time.Duration
cleanupInterval time.Duration
gasUpdateInterval time.Duration
tokenUpdateInterval time.Duration

lggr logger.Logger
orm cciporm.ORM
Expand Down Expand Up @@ -93,9 +98,10 @@ func NewPriceService(
ctx, cancel := context.WithCancel(context.Background())

pw := &priceService{
priceExpireSec: priceExpireSec,
cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time
updateInterval: utils.WithJitter(priceUpdateInterval),
priceExpireThreshold: priceExpireThreshold,
cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time
gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval),
tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval),

lggr: lggr,
orm: orm,
Expand Down Expand Up @@ -135,10 +141,14 @@ func (p *priceService) Close() error {

func (p *priceService) run() {
cleanupTicker := time.NewTicker(p.cleanupInterval)
updateTicker := time.NewTicker(p.updateInterval)
gasUpdateTicker := time.NewTicker(p.gasUpdateInterval)
tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval)
Comment on lines +144 to +145
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these tickers closed anywhere? defer ticker.Close() is the common pattern


go func() {
defer p.wg.Done()
defer cleanupTicker.Stop()
defer gasUpdateTicker.Stop()
defer tokenUpdateTicker.Stop()

for {
select {
Expand All @@ -149,10 +159,15 @@ func (p *priceService) run() {
if err != nil {
p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err)
}
case <-updateTicker.C:
err := p.runUpdate(p.backgroundCtx)
case <-gasUpdateTicker.C:
err := p.runGasPriceUpdate(p.backgroundCtx)
if err != nil {
p.lggr.Errorw("Error when updating prices in the background", "err", err)
p.lggr.Errorw("Error when updating gas prices in the background", "err", err)
}
case <-tokenUpdateTicker.C:
err := p.runTokenPriceUpdate(p.backgroundCtx)
if err != nil {
p.lggr.Errorw("Error when updating token prices in the background", "err", err)
}
}
}
Expand All @@ -167,8 +182,11 @@ func (p *priceService) UpdateDynamicConfig(ctx context.Context, gasPriceEstimato

// Config update may substantially change the prices, refresh the prices immediately, this also makes testing easier
// for not having to wait to the full update interval.
if err := p.runUpdate(ctx); err != nil {
p.lggr.Errorw("Error when updating prices after dynamic config update", "err", err)
if err := p.runGasPriceUpdate(ctx); err != nil {
p.lggr.Errorw("Error when updating gas prices after dynamic config update", "err", err)
}
if err := p.runTokenPriceUpdate(ctx); err != nil {
p.lggr.Errorw("Error when updating token prices after dynamic config update", "err", err)
}

return nil
Expand Down Expand Up @@ -224,15 +242,15 @@ func (p *priceService) runCleanup(ctx context.Context) error {
eg := new(errgroup.Group)

eg.Go(func() error {
err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec)
err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds()))
if err != nil {
return fmt.Errorf("error clearing gas prices: %w", err)
}
return nil
})

eg.Go(func() error {
err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec)
err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds()))
if err != nil {
return fmt.Errorf("error clearing token prices: %w", err)
}
Expand All @@ -242,153 +260,182 @@ func (p *priceService) runCleanup(ctx context.Context) error {
return eg.Wait()
}

func (p *priceService) runUpdate(ctx context.Context) error {
func (p *priceService) runGasPriceUpdate(ctx context.Context) error {
// Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader`
// Price updates happen infrequently - once every `priceUpdateInterval` seconds.
// Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds.
// It does not happen on any code path that is performance sensitive.
// We can afford to have non-performant unlocks here that is simple and safe.
p.dynamicConfigMu.RLock()
defer p.dynamicConfigMu.RUnlock()

// There may be a period of time between service is started and dynamic config is updated
if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil {
p.lggr.Info("Skipping price update due to gasPriceEstimator and/or destPriceRegistry not ready")
if p.gasPriceEstimator == nil {
p.lggr.Info("Skipping gas price update due to gasPriceEstimator not ready")
return nil
}

sourceGasPriceUSD, err := p.observeGasPriceUpdates(ctx, p.lggr)
if err != nil {
return fmt.Errorf("failed to observe gas price updates: %w", err)
}

err = p.writeGasPricesToDB(ctx, sourceGasPriceUSD)
if err != nil {
return fmt.Errorf("failed to write gas prices to db: %w", err)
}

return nil
}

func (p *priceService) runTokenPriceUpdate(ctx context.Context) error {
// Protect against concurrent updates of `tokenPriceEstimator` and `destPriceRegistryReader`
// Price updates happen infrequently - once every `tokenPriceUpdateInterval` seconds.
p.dynamicConfigMu.RLock()
defer p.dynamicConfigMu.RUnlock()

// There may be a period of time between service is started and dynamic config is updated
if p.destPriceRegistryReader == nil {
p.lggr.Info("Skipping token price update due to destPriceRegistry not ready")
return nil
}

sourceGasPriceUSD, tokenPricesUSD, err := p.observePriceUpdates(ctx, p.lggr)
tokenPricesUSD, err := p.observeTokenPriceUpdates(ctx, p.lggr)
if err != nil {
return fmt.Errorf("failed to observe price updates: %w", err)
return fmt.Errorf("failed to observe token price updates: %w", err)
}

err = p.writePricesToDB(ctx, sourceGasPriceUSD, tokenPricesUSD)
err = p.writeTokenPricesToDB(ctx, tokenPricesUSD)
if err != nil {
return fmt.Errorf("failed to write prices to db: %w", err)
return fmt.Errorf("failed to write token prices to db: %w", err)
}

return nil
}

func (p *priceService) observePriceUpdates(
func (p *priceService) observeGasPriceUpdates(
ctx context.Context,
lggr logger.Logger,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil {
return nil, nil, fmt.Errorf("gasPriceEstimator and/or destPriceRegistry is not set yet")
) (sourceGasPriceUSD *big.Int, err error) {
if p.gasPriceEstimator == nil {
return nil, fmt.Errorf("gasPriceEstimator is not set yet")
}

sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter)
// Include wrapped native to identify the source native USD price, notice USD is in 1e18 scale, i.e. $1 = 1e18
rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, []cciptypes.Address{p.sourceNative})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm currently working on a change to only rely on the jobspec for token price reporting. In this change, the TokenPricesUSD won't filter the tokens and spit all prices of tokens configured in the price config.

Hence the 2 calls, we are doing here to get the exactly prices for the srcNative token and the destTokens are not possible. We will simply get all the prices and filter out after. I guess this is fine as this is what TokenPricesUSD is doing internally. We would just have to do it in the observe*PriceUpdates methods here. Moreover, the goal of the PR is to optimize for writes and not read, so we are still aligned with the goal.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this compatible with today's expectation? The PriceGetter has historically never promised to return exact price data, we always query for the price and filter the result

if err != nil {
return nil, fmt.Errorf("failed to fetch source native price (%s): %w", p.sourceNative, err)
}

lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens)
sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative]
if !exists {
return nil, fmt.Errorf("missing source native (%s) price", p.sourceNative)
}

sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx)
if err != nil {
return nil, err
}
if sourceGasPrice == nil {
return nil, fmt.Errorf("missing gas price")
}
sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD)
if err != nil {
return nil, nil, fmt.Errorf("get destination tokens: %w", err)
return nil, err
}

return p.generatePriceUpdates(ctx, lggr, sortedLaneTokens)
lggr.Infow("PriceService observed latest gas price",
"sourceChainSelector", p.sourceChainSelector,
"destChainSelector", p.destChainSelector,
"sourceNative", p.sourceNative,
"gasPriceWei", sourceGasPrice,
"sourceNativePriceUSD", sourceNativePriceUSD,
"sourceGasPriceUSD", sourceGasPriceUSD,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would p.sourceNative be useful as a field in this log?

)
return sourceGasPriceUSD, nil
}

// All prices are USD ($1=1e18) denominated. All prices must be not nil.
// Return token prices should contain the exact same tokens as in tokenDecimals.
func (p *priceService) generatePriceUpdates(
func (p *priceService) observeTokenPriceUpdates(
ctx context.Context,
lggr logger.Logger,
sortedLaneTokens []cciptypes.Address,
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
// Include wrapped native in our token query as way to identify the source native USD price.
// notice USD is in 1e18 scale, i.e. $1 = 1e18
queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{p.sourceNative}, sortedLaneTokens)
) (tokenPricesUSD map[cciptypes.Address]*big.Int, err error) {
if p.destPriceRegistryReader == nil {
return nil, fmt.Errorf("destPriceRegistry is not set yet")
}

sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter)
if err != nil {
return nil, fmt.Errorf("get destination tokens: %w", err)
}
Comment on lines +368 to +370
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move error check before log? filteredLaneTokens would be just nil assuming err != nil


lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens)

queryTokens := ccipcommon.FlattenUniqueSlice(sortedLaneTokens)
rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, queryTokens)
if err != nil {
return nil, nil, err
return nil, fmt.Errorf("failed to fetch token prices (%v): %w", queryTokens, err)
}
lggr.Infow("Raw token prices", "rawTokenPrices", rawTokenPricesUSD)

// make sure that we got prices for all the tokens of our query
for _, token := range queryTokens {
if rawTokenPricesUSD[token] == nil {
return nil, nil, fmt.Errorf("missing token price: %+v", token)
return nil, fmt.Errorf("missing token price: %+v", token)
}
}

sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative]
if !exists {
return nil, nil, fmt.Errorf("missing source native (%s) price", p.sourceNative)
}

destTokensDecimals, err := p.destPriceRegistryReader.GetTokensDecimals(ctx, sortedLaneTokens)
if err != nil {
return nil, nil, fmt.Errorf("get tokens decimals: %w", err)
return nil, fmt.Errorf("get tokens decimals: %w", err)
}

tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD))
for i, token := range sortedLaneTokens {
tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i])
}

sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx)
if err != nil {
return nil, nil, err
}
if sourceGasPrice == nil {
return nil, nil, fmt.Errorf("missing gas price")
}
sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD)
if err != nil {
return nil, nil, err
}

lggr.Infow("PriceService observed latest price",
lggr.Infow("PriceService observed latest token prices",
"sourceChainSelector", p.sourceChainSelector,
"destChainSelector", p.destChainSelector,
"gasPriceWei", sourceGasPrice,
"sourceNativePriceUSD", sourceNativePriceUSD,
"sourceGasPriceUSD", sourceGasPriceUSD,
"tokenPricesUSD", tokenPricesUSD,
)
return sourceGasPriceUSD, tokenPricesUSD, nil
return tokenPricesUSD, nil
}

func (p *priceService) writePricesToDB(
ctx context.Context,
sourceGasPriceUSD *big.Int,
tokenPricesUSD map[cciptypes.Address]*big.Int,
) (err error) {
eg := new(errgroup.Group)

if sourceGasPriceUSD != nil {
eg.Go(func() error {
return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{
{
SourceChainSelector: p.sourceChainSelector,
GasPrice: assets.NewWei(sourceGasPriceUSD),
},
})
})
func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) {
if sourceGasPriceUSD == nil {
return nil
}

if tokenPricesUSD != nil {
var tokenPrices []cciporm.TokenPriceUpdate
return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{
{
SourceChainSelector: p.sourceChainSelector,
GasPrice: assets.NewWei(sourceGasPriceUSD),
},
})
}

for token, price := range tokenPricesUSD {
tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{
TokenAddr: string(token),
TokenPrice: assets.NewWei(price),
})
}
func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) {
if tokenPricesUSD == nil {
return nil
}

// Sort token by addr to make price updates ordering deterministic, easier to testing and debugging
sort.Slice(tokenPrices, func(i, j int) bool {
return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr
})
var tokenPrices []cciporm.TokenPriceUpdate

eg.Go(func() error {
return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices)
for token, price := range tokenPricesUSD {
tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{
TokenAddr: string(token),
TokenPrice: assets.NewWei(price),
})
}

return eg.Wait()
// Sort token by addr to make price updates ordering deterministic, easier for testing and debugging
sort.Slice(tokenPrices, func(i, j int) bool {
return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr
})

return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices)
}

// Input price is USD per full token, with 18 decimal precision
Expand Down
Loading
Loading