Skip to content

Commit

Permalink
feat: twap key refactor (#7472)
Browse files Browse the repository at this point in the history
* twap key refactor

* [create-pull-request] automated change

* use rc1 of v23

* use alpine

* Update config.go

* delete all time indexed keys in twap store

* add changelog

* respect prune limit

* add upgrade test for pruning old time indexed keys

* comment clean up

* Update CHANGELOG.md

Co-authored-by: Dev Ojha <ValarDragon@users.noreply.github.com>

* remove outdated comment

* update readme

* readme

* add key sep to end

---------

Co-authored-by: Dev Ojha <ValarDragon@users.noreply.github.com>
  • Loading branch information
czarcas7ic and ValarDragon authored Feb 17, 2024
1 parent 5aedf82 commit 49cd790
Show file tree
Hide file tree
Showing 18 changed files with 379 additions and 227 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### State Breaking

* [#7250](https://github.com/osmosis-labs/osmosis/pull/7250) Further filter spam gauges from epoch distribution.
* [#7472](https://github.com/osmosis-labs/osmosis/pull/7472) Refactor TWAP keys to only require a single key format. Significantly lowers TWAP-caused writes
* [#7499](https://github.com/osmosis-labs/osmosis/pull/7499) Slight speed/gas improvements to CL CreatePosition and AddToPosition

## v23.0.0
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ go-mock-update:
mockgen -source=x/poolmanager/types/pool.go -destination=tests/mocks/pool.go -package=mocks
mockgen -source=x/gamm/types/pool.go -destination=tests/mocks/cfmm_pool.go -package=mocks
mockgen -source=x/concentrated-liquidity/types/cl_pool_extensionI.go -destination=tests/mocks/cl_pool.go -package=mocks
mockgen -source=ingest/sqs/domain/pools.go -destination=tests/mocks/sqs_pool.go -package=mocks -mock_names=PoolI=MockSQSPoolI

###############################################################################
### Release ###
Expand Down
16 changes: 0 additions & 16 deletions app/upgrades/v17/upgrades_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,6 @@ func (s *UpgradeTestSuite) TestUpgrade() {
clPool2TwapRecordHistoricalPoolIndexPreUpgrade, err := keepers.TwapKeeper.GetAllHistoricalPoolIndexedTWAPsForPoolId(s.Ctx, lastPoolIdMinusOne)
s.Require().NoError(err)

clPoolsTwapRecordHistoricalTimeIndexPreUpgrade, err := keepers.TwapKeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
s.Require().NoError(err)

// Run upgrade handler.
dummyUpgrade(s)
s.Require().NotPanics(func() {
Expand All @@ -239,15 +236,11 @@ func (s *UpgradeTestSuite) TestUpgrade() {
clPool2TwapRecordHistoricalPoolIndexPostUpgrade, err := keepers.TwapKeeper.GetAllHistoricalPoolIndexedTWAPsForPoolId(s.Ctx, lastPoolIdMinusOne)
s.Require().NoError(err)

clPoolsTwapRecordHistoricalTimeIndexPostUpgrade, err := keepers.TwapKeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
s.Require().NoError(err)

// check that all TWAP records aren't empty
s.Require().NotEmpty(clPool1TwapRecordPostUpgrade)
s.Require().NotEmpty(clPool1TwapRecordHistoricalPoolIndexPostUpgrade)
s.Require().NotEmpty(clPool2TwapRecordPostUpgrade)
s.Require().NotEmpty(clPool2TwapRecordHistoricalPoolIndexPostUpgrade)
s.Require().NotEmpty(clPoolsTwapRecordHistoricalTimeIndexPostUpgrade)

for _, data := range []struct {
pre, post []types.TwapRecord
Expand All @@ -262,15 +255,6 @@ func (s *UpgradeTestSuite) TestUpgrade() {
}
}

for i := range clPoolsTwapRecordHistoricalTimeIndexPostUpgrade {
record := clPoolsTwapRecordHistoricalTimeIndexPostUpgrade[i]
if record.PoolId == lastPoolIdMinusOne || record.PoolId == lastPoolIdMinusTwo {
assertTwapFlipped(s, clPoolsTwapRecordHistoricalTimeIndexPreUpgrade[i], record)
} else if record.PoolId == lastPoolID {
assertEqual(s, clPoolsTwapRecordHistoricalTimeIndexPreUpgrade[i], record)
}
}

// Retrieve the community pool balance (and the feePool balance) after the upgrade
communityPoolBalancePost := s.App.BankKeeper.GetAllBalances(s.Ctx, communityPoolAddress)
feePoolCommunityPoolPost := s.App.DistrKeeper.GetFeePool(s.Ctx).CommunityPool
Expand Down
4 changes: 4 additions & 0 deletions app/upgrades/v24/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func CreateUpgradeHandler(
return nil, err
}

// Now that the TWAP keys are refactored, we can delete all time indexed TWAPs
// since we only need the pool indexed TWAPs.
keepers.TwapKeeper.DeleteAllHistoricalTimeIndexedTWAPs(ctx)

return migrations, nil
}
}
104 changes: 104 additions & 0 deletions app/upgrades/v24/upgrades_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package v24_test

import (
"bytes"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/suite"

upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"

abci "github.com/cometbft/cometbft/abci/types"

"github.com/osmosis-labs/osmosis/osmomath"
"github.com/osmosis-labs/osmosis/osmoutils"
"github.com/osmosis-labs/osmosis/v23/app/apptesting"

"github.com/osmosis-labs/osmosis/v23/x/twap/types"
twaptypes "github.com/osmosis-labs/osmosis/v23/x/twap/types"
)

const (
v24UpgradeHeight = int64(10)
HistoricalTWAPTimeIndexPrefix = "historical_time_index"
KeySeparator = "|"
)

type UpgradeTestSuite struct {
apptesting.KeeperTestHelper
}

func TestUpgradeTestSuite(t *testing.T) {
suite.Run(t, new(UpgradeTestSuite))
}

func (s *UpgradeTestSuite) TestUpgrade() {
s.Setup()

// Manually set up TWAP records indexed by both pool ID and time.
twapStoreKey := s.App.GetKey(twaptypes.ModuleName)
store := s.Ctx.KVStore(twapStoreKey)
twap := twaptypes.TwapRecord{
PoolId: 1,
Asset0Denom: "foo",
Asset1Denom: "bar",
Height: 1,
Time: time.Date(2023, 0o2, 1, 0, 0, 0, 0, time.UTC),
P0LastSpotPrice: osmomath.OneDec(),
P1LastSpotPrice: osmomath.OneDec(),
P0ArithmeticTwapAccumulator: osmomath.ZeroDec(),
P1ArithmeticTwapAccumulator: osmomath.ZeroDec(),
GeometricTwapAccumulator: osmomath.ZeroDec(),
LastErrorTime: time.Time{}, // no previous error
}
poolIndexKey := types.FormatHistoricalPoolIndexTWAPKey(twap.PoolId, twap.Asset0Denom, twap.Asset1Denom, twap.Time)
osmoutils.MustSet(store, poolIndexKey, &twap)

// The time index key is a bit manual since we removed the old code that did this programmatically.
var buffer bytes.Buffer
timeS := osmoutils.FormatTimeString(twap.Time)
fmt.Fprintf(&buffer, "%s%d%s%s%s%s%s%s", HistoricalTWAPTimeIndexPrefix, twap.PoolId, KeySeparator, twap.Asset0Denom, KeySeparator, twap.Asset1Denom, KeySeparator, timeS)
timeIndexKey := buffer.Bytes()
osmoutils.MustSet(store, timeIndexKey, &twap)

// TWAP records indexed by time should exist
twapRecords, err := osmoutils.GatherValuesFromStorePrefix(store, []byte(HistoricalTWAPTimeIndexPrefix), types.ParseTwapFromBz)
s.Require().NoError(err)
s.Require().Len(twapRecords, 1)
s.Require().Equal(twap, twapRecords[0])

// TWAP records indexed by pool ID should exist.
twapRecords, err = s.App.TwapKeeper.GetAllHistoricalPoolIndexedTWAPsForPoolId(s.Ctx, twap.PoolId)
s.Require().NoError(err)
s.Require().Len(twapRecords, 1)
s.Require().Equal(twap, twapRecords[0])

dummyUpgrade(s)
s.Require().NotPanics(func() {
s.App.BeginBlocker(s.Ctx, abci.RequestBeginBlock{})
})

// TWAP records indexed by time should be completely removed.
twapRecords, err = osmoutils.GatherValuesFromStorePrefix(store, []byte(HistoricalTWAPTimeIndexPrefix), types.ParseTwapFromBz)
s.Require().NoError(err)
s.Require().Len(twapRecords, 0)

// TWAP records indexed by pool ID should be untouched.
twapRecords, err = s.App.TwapKeeper.GetAllHistoricalPoolIndexedTWAPsForPoolId(s.Ctx, twap.PoolId)
s.Require().NoError(err)
s.Require().Len(twapRecords, 1)
s.Require().Equal(twap, twapRecords[0])
}

func dummyUpgrade(s *UpgradeTestSuite) {
s.Ctx = s.Ctx.WithBlockHeight(v24UpgradeHeight - 1)
plan := upgradetypes.Plan{Name: "v24", Height: v24UpgradeHeight}
err := s.App.UpgradeKeeper.ScheduleUpgrade(s.Ctx, plan)
s.Require().NoError(err)
_, exists := s.App.UpgradeKeeper.GetUpgradePlan(s.Ctx)
s.Require().True(exists)

s.Ctx = s.Ctx.WithBlockHeight(v24UpgradeHeight)
}
13 changes: 7 additions & 6 deletions proto/osmosis/twap/v1beta1/twap_record.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ syntax = "proto3";
package osmosis.twap.v1beta1;

import "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "cosmos_proto/cosmos.proto";
import "cosmos/base/v1beta1/coin.proto";
import "google/protobuf/timestamp.proto";

option go_package = "github.com/osmosis-labs/osmosis/v23/x/twap/types";
Expand Down Expand Up @@ -90,7 +87,11 @@ message PruningState {
(gogoproto.stdtime) = true,
(gogoproto.moretags) = "yaml:\"last_kept_time\""
];
// last_key_seen is the last key of the TWAP records that were pruned
// before reaching the block's prune limit
bytes last_key_seen = 3;
// Deprecated: This field is deprecated.
bytes last_key_seen = 3 [ deprecated = true ];
// last_seen_pool_id is the pool_id that we will begin pruning in the next
// block. This value starts at the highest pool_id at time of epoch, and
// decreases until it reaches 1. When it reaches 1, the pruning
// process is complete.
uint64 last_seen_pool_id = 4;
}
39 changes: 18 additions & 21 deletions x/twap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ A time weighted average price is a function that takes a sequence of `(time, pri
Using the arithmetic mean, the TWAP of a sequence `(t_i, p_i)`, from `t_0` to `t_n`, indexed by time in ascending order, is: $$\frac{1}{t_n - t_0}\sum_{i=0}^{n-1} p_i (t_{i+1} - t_i)$$
Notice that the latest price `p_n` isn't used, as it has lasted for a time interval of `0` seconds in this range!

To illustrate with an example, given the sequence: `(0s, $1), (4s, $6), (5s, $1)`, the arithmetic mean TWAP is:
To illustrate with an example, given the sequence: `(0s, $1), (4s, $6), (5s, $1)`, the arithmetic mean TWAP is:
$$\frac{\$1 * (4s - 0s) + \$6 * (5s - 4s)}{5s - 0s} = \frac{\$10}{5} = \$2$$

## Geometric mean TWAP
Expand Down Expand Up @@ -48,7 +48,7 @@ We also maintain within each accumulator record in state, the latest spot price.
This allows us to interpolate accumulation records between times.
Namely, if I want the twap from `t=10s` to `t=15s`, but the time records are at `9s, 13s, 17s`, this is fine.
Using the latest spot price in each record, we create the accumulator value for `t=10` by computing
`a_10 = a_9 + a_9_latest_spot_price * (10s - 9s)`, and `a_15 = a_13 + a_13_latest_spot_price * (15s - 13s)`.
`a_10 = a_9 + a_9_latest_spot_price * (10s - 9s)`, and `a_15 = a_13 + a_13_latest_spot_price * (15s - 13s)`.
Given these interpolated accumulation values, we can compute the TWAP as before.

## Module API
Expand Down Expand Up @@ -79,7 +79,7 @@ and have a similar cosmwasm binding.
// * endTime in the future
// * startTime older than 48 hours OR pool creation
// * pool with id poolId does not exist, or does not contain quoteAssetDenom, baseAssetDenom
// * there were some computational errors during computing arithmetic twap within the time range of
// * there were some computational errors during computing arithmetic twap within the time range of
// startRecord, endRecord - including the exact record times, which indicates that the result returned could be faulty

// N.B. If there is a notable use case, the state machine could maintain more historical records, e.g. at one per hour.
Expand All @@ -106,32 +106,29 @@ computation of the TWAP, which is done via the geometric mean.
- types/* - Implement TwapRecord, GenesisState. Define AMM interface, and methods to format keys.
- twapmodule/module.go - SDK AppModule interface implementation.
- api.go - Public API, that other users / modules can/should depend on
- listeners.go - Defines hooks & calls to logic.go, for triggering actions on
- listeners.go - Defines hooks & calls to logic.go, for triggering actions on
- keeper.go - generic SDK boilerplate (defining a wrapper for store keys + params)
- logic.go - Implements all TWAP module 'logic'. (Arithmetic, defining what to get/set where, etc.)
- store.go - Managing logic for getting and setting things to underlying stores

## Store layout

We maintain TWAP accumulation records for every AMM pool on Osmosis.
We maintain TWAP accumulation records for every AMM pool on Osmosis.

Because Osmosis supports multi-asset pools, a complicating factor is that we have to store a record for every asset pair in the pool.
For every pool, at a given point in time, we make one twap record entry per unique pair of denoms in the pool. If a pool has `k` denoms, the number of unique pairs is `k * (k - 1) / 2`.
All public API's for the module will sort the input denoms to the canonical representation, so the caller does not need to worry about this. (The canonical representation is the denoms in lexicographical order)

Example of historical TWAP time index records for a pool containing 3 assets.
Example of historical TWAP pool indexed records for a pool containing 3 assets.
* Number of records per time: `3 * (3 - 1) / 2 = 3`
* Records are in a format:
HistoricalTWAPTimeIndexPrefix | time | pool id | denom1 | denom2
HistoricalTWAPPoolIndexPrefix | pool id | denom1 | denom2 | time

For our pool with Id = 1 and 3 assets: denomA, denomB and denomC:

historical_time_index|2009-11-10T23:00:00.000000000|1|denomA|denomB
historical_time_index|2009-11-10T23:00:00.000000000|1|denomA|denomC
historical_time_index|2009-11-10T23:00:00.000000000|1|denomB|denomC



historical_pool_index|1|denomA|denomB|2009-11-10T23:00:00.000000000
historical_pool_index|1|denomA|denomC|2009-11-10T23:00:00.000000000
historical_pool_index|1|denomB|denomC|2009-11-10T23:00:00.000000000

Each twap record stores [(source)](../../proto/osmosis/twap/v1beta1/twap_record.proto):

Expand All @@ -140,10 +137,10 @@ Each twap record stores [(source)](../../proto/osmosis/twap/v1beta1/twap_record.
* Accumulation value of base asset A in terms of quote asset B
* Accumulation value of base asset B in terms of quote asset A

important for calculation of arthmetic twap.
important for calculation of arthmetic twap.

Besides those values, TWAP records currently hold: poolId, Asset0Denom, Asset1Denom, Height (for debugging purposes), Time and
Last error time - time in which the last spot price error occurred. This will allert the caller if they are getting a potentially erroneous TWAP.
Besides those values, TWAP records currently hold: poolId, Asset0Denom, Asset1Denom, Height (for debugging purposes), Time and
Last error time - time in which the last spot price error occurred. This will alert the caller if they are getting a potentially erroneous TWAP.

All TWAP records are indexed in state by the time of write.

Expand All @@ -161,8 +158,8 @@ During `EndBlock`, new records are created, with:

In the event that a pool is created, and has a swap in the same block, the record entries are over written with the end block price.

Error handling during records creation/updating:
* If there are issues with creating a record after pool creation, the creation of a pool will be aborted.
Error handling during records creation/updating:
* If there are issues with creating a record after pool creation, the creation of a pool will be aborted.
* Whereas, if there is an issue with updating records for a pool with potentially price changing events, existing errors will be ignored and the records will not be updated.

### Tracking spot-price changing events in a block
Expand All @@ -182,7 +179,7 @@ and then clears on the block committing. This is done to save on gas (and I/O fo

To avoid infinite growth of the state with the TWAP records, we attempt to delete some old records after every epoch.
Essentially, records older than a configurable parameter `RecordHistoryKeepPeriod` are pruned away. Currently, this parameter is set to 48 hours.
Therefore, at the end of an epoch, records older than 48 hours before the current block time are pruned away.
Therefore, at the end of an epoch, records older than 48 hours before the current block time are pruned away.
This could potentially leave the store with only one record - or no records at all within the "keep" period, so the pruning mechanism keeps the newest record that is older than the pruning time. This record is necessary to enable us interpolating from and getting TWAPs from the "keep" period.
Such record is preserved for each pool.

Expand All @@ -192,7 +189,7 @@ Post-TWAP launch, new pool types were introduced, one such example
being the concentrated liquidity pool. In the context of `x/twap`, there are subtle
differences in terms of when the spot price updates for a concentrated liquidity pool. As a result,
the need for their twap state updates are delivered by distinct listeners that implement a
`concentratedliquiditytypes.ConcentratedLiquidityListener` interface.
`concentratedliquiditytypes.ConcentratedLiquidityListener` interface.

See `x/concentrated-liquidity/README.md` for the details about these differences.

Expand Down Expand Up @@ -235,7 +232,7 @@ The pre-release testing methodology planned for the twap module is:
- The osmosis simulator, simulates building up complex state machine states, in random ways not seen before. We plan on, in a property check, maintaining expected TWAPs for short time ranges, and seeing that the keeper query will return the same value as what we get off of the raw price history for short history intervals.
- Not currently deemed release blocking, but planned: Integration for gas tracking, to ensure gas of reads/writes does not grow with time.
- [ ] Mutation testing usage
- integration of the TWAP module into [go mutation testing](https://github.com/osmosis-labs/go-mutesting):
- integration of the TWAP module into [go mutation testing](https://github.com/osmosis-labs/go-mutesting):
- We've seen with the `tokenfactory` module that it succeeds at surfacing behavior for untested logic.
e.g. if you delete a line, or change the direction of a conditional, mutation tests show if regular Go tests catch it.
- We expect to get this to a state, where after mutation testing is ran, the only items it mutates, that is not caught in a test, is: Deleting `return err`, or `panic` lines, in the situation where that error return or panic isn't reachable.
4 changes: 1 addition & 3 deletions x/twap/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func (k Keeper) InitGenesis(ctx sdk.Context, genState *types.GenesisState) {

// ExportGenesis returns the twap module's exported genesis.
func (k Keeper) ExportGenesis(ctx sdk.Context) *types.GenesisState {
// These are ordered in increasing order, guaranteed by the iterator
// that is prefixed by time.
twapRecords, err := k.GetAllHistoricalTimeIndexedTWAPs(ctx)
twapRecords, err := k.getAllHistoricalPoolIndexedTWAPs(ctx)
if err != nil {
panic(err)
}
Expand Down
6 changes: 0 additions & 6 deletions x/twap/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,6 @@ func (s *TestSuite) getAllHistoricalRecordsForPool(poolId uint64) []types.TwapRe
func (s *TestSuite) validateExpectedRecords(expectedRecords []types.TwapRecord) {
twapKeeper := s.twapkeeper

// validate that the time indexed TWAPs are cleared.
timeIndexedTwaps, err := twapKeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
s.Require().NoError(err)
s.Require().Len(timeIndexedTwaps, len(expectedRecords))
s.Require().Equal(timeIndexedTwaps, expectedRecords)

// validate that the pool indexed TWAPs are cleared.
poolIndexedTwaps, err := twapKeeper.GetAllHistoricalPoolIndexedTWAPs(s.Ctx)
s.Require().NoError(err)
Expand Down
13 changes: 8 additions & 5 deletions x/twap/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ func (k Keeper) EpochHooks() epochtypes.EpochHooks {
func (hook *epochhook) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
if epochIdentifier == hook.k.PruneEpochIdentifier(ctx) {
lastKeptTime := ctx.BlockTime().Add(-hook.k.RecordHistoryKeepPeriod(ctx))
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
LastKeySeen: types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""),
})
poolIdToStartFrom := hook.k.poolmanagerKeeper.GetNextPoolId(ctx) - 1
if poolIdToStartFrom > 0 {
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
LastSeenPoolId: poolIdToStartFrom,
})
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion x/twap/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (s *TestSuite) TestAfterEpochEnd() {

s.twapkeeper.StoreNewRecord(s.Ctx, newestRecord)

twapsBeforeEpoch, err := s.twapkeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
twapsBeforeEpoch, err := s.twapkeeper.GetAllHistoricalPoolIndexedTWAPs(s.Ctx)
s.Require().NoError(err)
s.Require().Equal(2, len(twapsBeforeEpoch))

Expand Down
Loading

0 comments on commit 49cd790

Please sign in to comment.