-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Separate token price reporting schedule #1278
Changes from all commits
7c72d02
9520a63
b185ffe
2132099
ab0d639
3af3383
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,23 +42,28 @@ type PriceService interface { | |
var _ PriceService = (*priceService)(nil) | ||
|
||
const ( | ||
// Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin. | ||
// 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while | ||
// Gas prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. | ||
gasPriceUpdateInterval = 1 * time.Minute | ||
// Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show | ||
// their prices are stable, 10-minute resolution is accurate enough. | ||
tokenPriceUpdateInterval = 10 * time.Minute | ||
|
||
// Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin. | ||
// 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while | ||
// surfacing price update outages quickly enough. | ||
priceExpireSec = 600 | ||
// Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. | ||
// 10 minutes should result in 40 rows being cleaned up per job, it is not a heavy load on DB, so there is no need | ||
// to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireSec`. | ||
priceCleanupInterval = 600 * time.Second | ||
priceExpireThreshold = 25 * time.Minute | ||
|
||
// Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. | ||
priceUpdateInterval = 60 * time.Second | ||
// Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. | ||
// 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need | ||
// to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`. | ||
priceCleanupInterval = 10 * time.Minute | ||
) | ||
|
||
type priceService struct { | ||
priceExpireSec int | ||
cleanupInterval time.Duration | ||
updateInterval time.Duration | ||
priceExpireThreshold time.Duration | ||
cleanupInterval time.Duration | ||
gasUpdateInterval time.Duration | ||
tokenUpdateInterval time.Duration | ||
|
||
lggr logger.Logger | ||
orm cciporm.ORM | ||
|
@@ -93,9 +98,10 @@ func NewPriceService( | |
ctx, cancel := context.WithCancel(context.Background()) | ||
|
||
pw := &priceService{ | ||
priceExpireSec: priceExpireSec, | ||
cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time | ||
updateInterval: utils.WithJitter(priceUpdateInterval), | ||
priceExpireThreshold: priceExpireThreshold, | ||
cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time | ||
gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval), | ||
tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval), | ||
|
||
lggr: lggr, | ||
orm: orm, | ||
|
@@ -135,10 +141,14 @@ func (p *priceService) Close() error { | |
|
||
func (p *priceService) run() { | ||
cleanupTicker := time.NewTicker(p.cleanupInterval) | ||
updateTicker := time.NewTicker(p.updateInterval) | ||
gasUpdateTicker := time.NewTicker(p.gasUpdateInterval) | ||
tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval) | ||
|
||
go func() { | ||
defer p.wg.Done() | ||
defer cleanupTicker.Stop() | ||
defer gasUpdateTicker.Stop() | ||
defer tokenUpdateTicker.Stop() | ||
|
||
for { | ||
select { | ||
|
@@ -149,10 +159,15 @@ func (p *priceService) run() { | |
if err != nil { | ||
p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err) | ||
} | ||
case <-updateTicker.C: | ||
err := p.runUpdate(p.backgroundCtx) | ||
case <-gasUpdateTicker.C: | ||
err := p.runGasPriceUpdate(p.backgroundCtx) | ||
if err != nil { | ||
p.lggr.Errorw("Error when updating prices in the background", "err", err) | ||
p.lggr.Errorw("Error when updating gas prices in the background", "err", err) | ||
} | ||
case <-tokenUpdateTicker.C: | ||
err := p.runTokenPriceUpdate(p.backgroundCtx) | ||
if err != nil { | ||
p.lggr.Errorw("Error when updating token prices in the background", "err", err) | ||
} | ||
} | ||
} | ||
|
@@ -167,8 +182,11 @@ func (p *priceService) UpdateDynamicConfig(ctx context.Context, gasPriceEstimato | |
|
||
// Config update may substantially change the prices, refresh the prices immediately, this also makes testing easier | ||
// for not having to wait to the full update interval. | ||
if err := p.runUpdate(ctx); err != nil { | ||
p.lggr.Errorw("Error when updating prices after dynamic config update", "err", err) | ||
if err := p.runGasPriceUpdate(ctx); err != nil { | ||
p.lggr.Errorw("Error when updating gas prices after dynamic config update", "err", err) | ||
} | ||
if err := p.runTokenPriceUpdate(ctx); err != nil { | ||
p.lggr.Errorw("Error when updating token prices after dynamic config update", "err", err) | ||
} | ||
|
||
return nil | ||
|
@@ -224,15 +242,15 @@ func (p *priceService) runCleanup(ctx context.Context) error { | |
eg := new(errgroup.Group) | ||
|
||
eg.Go(func() error { | ||
err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) | ||
err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) | ||
if err != nil { | ||
return fmt.Errorf("error clearing gas prices: %w", err) | ||
} | ||
return nil | ||
}) | ||
|
||
eg.Go(func() error { | ||
err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) | ||
err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) | ||
if err != nil { | ||
return fmt.Errorf("error clearing token prices: %w", err) | ||
} | ||
|
@@ -242,153 +260,182 @@ func (p *priceService) runCleanup(ctx context.Context) error { | |
return eg.Wait() | ||
} | ||
|
||
func (p *priceService) runUpdate(ctx context.Context) error { | ||
func (p *priceService) runGasPriceUpdate(ctx context.Context) error { | ||
// Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader` | ||
// Price updates happen infrequently - once every `priceUpdateInterval` seconds. | ||
// Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds. | ||
// It does not happen on any code path that is performance sensitive. | ||
// We can afford to have non-performant unlocks here that is simple and safe. | ||
p.dynamicConfigMu.RLock() | ||
defer p.dynamicConfigMu.RUnlock() | ||
|
||
// There may be a period of time between service is started and dynamic config is updated | ||
if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { | ||
p.lggr.Info("Skipping price update due to gasPriceEstimator and/or destPriceRegistry not ready") | ||
if p.gasPriceEstimator == nil { | ||
p.lggr.Info("Skipping gas price update due to gasPriceEstimator not ready") | ||
return nil | ||
} | ||
|
||
sourceGasPriceUSD, err := p.observeGasPriceUpdates(ctx, p.lggr) | ||
if err != nil { | ||
return fmt.Errorf("failed to observe gas price updates: %w", err) | ||
} | ||
|
||
err = p.writeGasPricesToDB(ctx, sourceGasPriceUSD) | ||
if err != nil { | ||
return fmt.Errorf("failed to write gas prices to db: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (p *priceService) runTokenPriceUpdate(ctx context.Context) error { | ||
// Protect against concurrent updates of `tokenPriceEstimator` and `destPriceRegistryReader` | ||
// Price updates happen infrequently - once every `tokenPriceUpdateInterval` seconds. | ||
p.dynamicConfigMu.RLock() | ||
defer p.dynamicConfigMu.RUnlock() | ||
|
||
// There may be a period of time between service is started and dynamic config is updated | ||
if p.destPriceRegistryReader == nil { | ||
p.lggr.Info("Skipping token price update due to destPriceRegistry not ready") | ||
return nil | ||
} | ||
|
||
sourceGasPriceUSD, tokenPricesUSD, err := p.observePriceUpdates(ctx, p.lggr) | ||
tokenPricesUSD, err := p.observeTokenPriceUpdates(ctx, p.lggr) | ||
if err != nil { | ||
return fmt.Errorf("failed to observe price updates: %w", err) | ||
return fmt.Errorf("failed to observe token price updates: %w", err) | ||
} | ||
|
||
err = p.writePricesToDB(ctx, sourceGasPriceUSD, tokenPricesUSD) | ||
err = p.writeTokenPricesToDB(ctx, tokenPricesUSD) | ||
if err != nil { | ||
return fmt.Errorf("failed to write prices to db: %w", err) | ||
return fmt.Errorf("failed to write token prices to db: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (p *priceService) observePriceUpdates( | ||
func (p *priceService) observeGasPriceUpdates( | ||
ctx context.Context, | ||
lggr logger.Logger, | ||
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { | ||
if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { | ||
return nil, nil, fmt.Errorf("gasPriceEstimator and/or destPriceRegistry is not set yet") | ||
) (sourceGasPriceUSD *big.Int, err error) { | ||
if p.gasPriceEstimator == nil { | ||
return nil, fmt.Errorf("gasPriceEstimator is not set yet") | ||
} | ||
|
||
sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) | ||
// Include wrapped native to identify the source native USD price, notice USD is in 1e18 scale, i.e. $1 = 1e18 | ||
rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, []cciptypes.Address{p.sourceNative}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm currently working on a change to only rely on the jobspec for token price reporting. In this change, the Hence the 2 calls, we are doing here to get the exactly prices for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't this compatible with today's expectation? The PriceGetter has historically never promised to return exact price data, we always query for the price and filter the result |
||
if err != nil { | ||
return nil, fmt.Errorf("failed to fetch source native price (%s): %w", p.sourceNative, err) | ||
} | ||
|
||
lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) | ||
sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] | ||
if !exists { | ||
return nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) | ||
} | ||
|
||
sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if sourceGasPrice == nil { | ||
return nil, fmt.Errorf("missing gas price") | ||
} | ||
sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("get destination tokens: %w", err) | ||
return nil, err | ||
} | ||
|
||
return p.generatePriceUpdates(ctx, lggr, sortedLaneTokens) | ||
lggr.Infow("PriceService observed latest gas price", | ||
"sourceChainSelector", p.sourceChainSelector, | ||
"destChainSelector", p.destChainSelector, | ||
"sourceNative", p.sourceNative, | ||
"gasPriceWei", sourceGasPrice, | ||
"sourceNativePriceUSD", sourceNativePriceUSD, | ||
"sourceGasPriceUSD", sourceGasPriceUSD, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would |
||
) | ||
return sourceGasPriceUSD, nil | ||
} | ||
|
||
// All prices are USD ($1=1e18) denominated. All prices must be not nil. | ||
// Return token prices should contain the exact same tokens as in tokenDecimals. | ||
func (p *priceService) generatePriceUpdates( | ||
func (p *priceService) observeTokenPriceUpdates( | ||
ctx context.Context, | ||
lggr logger.Logger, | ||
sortedLaneTokens []cciptypes.Address, | ||
) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { | ||
// Include wrapped native in our token query as way to identify the source native USD price. | ||
// notice USD is in 1e18 scale, i.e. $1 = 1e18 | ||
queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{p.sourceNative}, sortedLaneTokens) | ||
) (tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { | ||
if p.destPriceRegistryReader == nil { | ||
return nil, fmt.Errorf("destPriceRegistry is not set yet") | ||
} | ||
|
||
sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) | ||
if err != nil { | ||
return nil, fmt.Errorf("get destination tokens: %w", err) | ||
} | ||
Comment on lines
+368
to
+370
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move error check before log? |
||
|
||
lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) | ||
|
||
queryTokens := ccipcommon.FlattenUniqueSlice(sortedLaneTokens) | ||
rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, queryTokens) | ||
if err != nil { | ||
return nil, nil, err | ||
return nil, fmt.Errorf("failed to fetch token prices (%v): %w", queryTokens, err) | ||
} | ||
lggr.Infow("Raw token prices", "rawTokenPrices", rawTokenPricesUSD) | ||
|
||
// make sure that we got prices for all the tokens of our query | ||
for _, token := range queryTokens { | ||
if rawTokenPricesUSD[token] == nil { | ||
return nil, nil, fmt.Errorf("missing token price: %+v", token) | ||
return nil, fmt.Errorf("missing token price: %+v", token) | ||
} | ||
} | ||
|
||
sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] | ||
if !exists { | ||
return nil, nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) | ||
} | ||
|
||
destTokensDecimals, err := p.destPriceRegistryReader.GetTokensDecimals(ctx, sortedLaneTokens) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("get tokens decimals: %w", err) | ||
return nil, fmt.Errorf("get tokens decimals: %w", err) | ||
} | ||
|
||
tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD)) | ||
for i, token := range sortedLaneTokens { | ||
tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i]) | ||
} | ||
|
||
sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
if sourceGasPrice == nil { | ||
return nil, nil, fmt.Errorf("missing gas price") | ||
} | ||
sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
lggr.Infow("PriceService observed latest price", | ||
lggr.Infow("PriceService observed latest token prices", | ||
"sourceChainSelector", p.sourceChainSelector, | ||
"destChainSelector", p.destChainSelector, | ||
"gasPriceWei", sourceGasPrice, | ||
"sourceNativePriceUSD", sourceNativePriceUSD, | ||
"sourceGasPriceUSD", sourceGasPriceUSD, | ||
"tokenPricesUSD", tokenPricesUSD, | ||
) | ||
return sourceGasPriceUSD, tokenPricesUSD, nil | ||
return tokenPricesUSD, nil | ||
} | ||
|
||
func (p *priceService) writePricesToDB( | ||
ctx context.Context, | ||
sourceGasPriceUSD *big.Int, | ||
tokenPricesUSD map[cciptypes.Address]*big.Int, | ||
) (err error) { | ||
eg := new(errgroup.Group) | ||
|
||
if sourceGasPriceUSD != nil { | ||
eg.Go(func() error { | ||
return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ | ||
{ | ||
SourceChainSelector: p.sourceChainSelector, | ||
GasPrice: assets.NewWei(sourceGasPriceUSD), | ||
}, | ||
}) | ||
}) | ||
func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) { | ||
if sourceGasPriceUSD == nil { | ||
return nil | ||
} | ||
|
||
if tokenPricesUSD != nil { | ||
var tokenPrices []cciporm.TokenPriceUpdate | ||
return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ | ||
{ | ||
SourceChainSelector: p.sourceChainSelector, | ||
GasPrice: assets.NewWei(sourceGasPriceUSD), | ||
}, | ||
}) | ||
} | ||
|
||
for token, price := range tokenPricesUSD { | ||
tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ | ||
TokenAddr: string(token), | ||
TokenPrice: assets.NewWei(price), | ||
}) | ||
} | ||
func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) { | ||
if tokenPricesUSD == nil { | ||
return nil | ||
} | ||
|
||
// Sort token by addr to make price updates ordering deterministic, easier to testing and debugging | ||
sort.Slice(tokenPrices, func(i, j int) bool { | ||
return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr | ||
}) | ||
var tokenPrices []cciporm.TokenPriceUpdate | ||
|
||
eg.Go(func() error { | ||
return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) | ||
for token, price := range tokenPricesUSD { | ||
tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ | ||
TokenAddr: string(token), | ||
TokenPrice: assets.NewWei(price), | ||
}) | ||
} | ||
|
||
return eg.Wait() | ||
// Sort token by addr to make price updates ordering deterministic, easier for testing and debugging | ||
sort.Slice(tokenPrices, func(i, j int) bool { | ||
return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr | ||
}) | ||
|
||
return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) | ||
} | ||
|
||
// Input price is USD per full token, with 18 decimal precision | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these tickers closed anywhere?
defer ticker.Close()
is the common pattern