Skip to content

Commit

Permalink
Merge branch 'roman/sqs-poc-v20' of github.com:osmosis-labs/osmosis i…
Browse files Browse the repository at this point in the history
…nto roman/sqs-poc-v20
  • Loading branch information
p0mvn committed Nov 30, 2023
2 parents 0480b12 + d7ccd64 commit 12db782
Show file tree
Hide file tree
Showing 37 changed files with 342 additions and 305 deletions.
39 changes: 23 additions & 16 deletions .github/workflows/check-state-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ jobs:
should_i_run: ${{ steps.compare_versions.outputs.should_i_run }}
mainnet_major_version: ${{ steps.mainnet_version.outputs.mainnet_major_version }}
steps:
- name: Get mainnet major version
-
name: Get mainnet major version
id: mainnet_version
run: |
# Find current major version via rpc.osmosis.zone/abci_info
Expand All @@ -64,7 +65,8 @@ jobs:
echo "MAINNET_MAJOR_VERSION=$MAINNET_MAJOR_VERSION" >> $GITHUB_ENV
echo "mainnet_major_version=$MAINNET_MAJOR_VERSION" >> $GITHUB_OUTPUT
- name: Get GitHub branch major version
-
name: Get GitHub branch major version
id: compare_versions
run: |
CURRENT_BRANCH_MAJOR_VERSION=$(echo ${{ github.event.pull_request.base.ref }} | tr -dc '0-9')
Expand All @@ -86,20 +88,23 @@ jobs:
needs: compare_versions
runs-on: self-hosted
steps:
- name: Checkout branch
-
name: Checkout branch
uses: actions/checkout@v4
with:
fetch-depth: 0
-
-
name: 🐿 Setup Golang
uses: actions/setup-go@v4
with:
go-version: '^1.20'
- name: 🔨 Build the osmosisd binary
go-version-file: go.mod
-
name: 🔨 Build the osmosisd binary
run: |
make build
build/osmosisd version
- name: 🧪 Initialize Osmosis Node
-
name: 🧪 Initialize Osmosis Node
run: |
rm -rf $HOME/.osmosisd/ || true
build/osmosisd init runner -o
Expand All @@ -112,9 +117,10 @@ jobs:
# Copy genesis to config folder
cp /mnt/data/genesis/osmosis-1/genesis.json $HOME/.osmosisd/config/genesis.json
- name: ⏬ Download last pre-epoch snapshot
-
name: ⏬ Download last pre-epoch snapshot
run: |
REPO_MAJOR_VERSION=v19
REPO_MAJOR_VERSION=$(echo ${{ github.base_ref }} | sed 's/\.x//')
SNAPSHOT_INFO_URL=${{ env.SNAPSHOT_BUCKET }}/$REPO_MAJOR_VERSION/snapshots.json
# Get the latest pre-epoch snapshot information from bucket
Expand All @@ -132,12 +138,11 @@ jobs:
# Copy snapshot in Data folder
cp -R /mnt/data/snapshots/$REPO_MAJOR_VERSION/$SNAPSHOT_ID/* $HOME/.osmosisd/
- name: 🧪 Configure Osmosis Node
-
name: 🧪 Configure Osmosis Node
run: |
CONFIG_FOLDER=$HOME/.osmosisd/config
# Find last epoch block comparing repo version to current chain version
REPO_MAJOR_VERSION=19
REPO_MAJOR_VERSION=$(echo ${{ github.base_ref }} | tr -dc '0-9')
if [ $REPO_MAJOR_VERSION == $MAINNET_MAJOR_VERSION ]; then
# I'm in the latest major, fetch the epoch info from the lcd endpoint
Expand Down Expand Up @@ -170,8 +175,10 @@ jobs:
# Download addrbook
wget -q -O $CONFIG_FOLDER/addrbook.json ${{ env.ADDRBOOK_URL }}
- name: 🧪 Start Osmosis Node
-
name: 🧪 Start Osmosis Node
run: build/osmosisd start
- name: 🧹 Clean up Osmosis Home
-
name: 🧹 Clean up Osmosis Home
if: always()
run: rm -rf $HOME/.osmosisd/ || true
run: rm -rf $HOME/.osmosisd/ || true
25 changes: 23 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,29 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v20.5.1

### API

* [#6939](https://github.com/osmosis-labs/osmosis/pull/6939) Fix taker fee GRPC gateway query path in poolmanager.

## v20.4.0

### Bug Fixes

* [#6906](https://github.com/osmosis-labs/osmosis/pull/6906) Fix issue with the affiliate swap contract mempool check.

### Misc Improvements

* [#6863](https://github.com/osmosis-labs/osmosis/pull/6863) GetPoolDenoms method on PoolI interface in poolmanager
* [#6900](https://github.com/osmosis-labs/osmosis/pull/6900) Update EIP-1559 constants to smoothen resets.

## v20.3.0

### Configuration Changes

* [#6897](https://github.com/osmosis-labs/osmosis/pull/6897) Enable 1559 mempool by default.

## v20.2.2

### Features
Expand All @@ -48,14 +71,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Misc Improvements

* [#6788](https://github.com/osmosis-labs/osmosis/pull/6788) Improve error message when CL LP fails due to slippage bound hit.
* [#6898](https://github.com/osmosis-labs/osmosis/pull/6898) Enable 1559 mempool by default.

### API Breaks

* [#6805](https://github.com/osmosis-labs/osmosis/pull/6805) return bucket index of the current tick from LiquidityPerTickRange query
* [#6863](https://github.com/osmosis-labs/osmosis/pull/6863) GetPoolDenoms method on PoolI interface in poolmanager


## v20.0.0

### Features
Expand Down
5 changes: 3 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func NewOsmosisApp(
)

isIngestManagerEnabled := os.Getenv(ENV_NAME_INGEST_TYPE) == ENV_VALUE_INGESTER_SQS
// Initialize the ingest manager for propagating data to external sinks.
app.IngestManager = ingest.NewIngestManager()
if isIngestManagerEnabled {
dbHost := os.Getenv(ENV_NAME_INGEST_SQS_DBHOST)
Expand Down Expand Up @@ -327,7 +328,7 @@ func NewOsmosisApp(
sqsIngester := sqs.NewSidecarQueryServerIngester(poolsIngester, chainInfoingester, txManager)

// Set the sidecar query server ingester to the ingest manager.
app.IngestManager.SetIngester(sqsIngester)
app.IngestManager.RegisterIngester(sqsIngester)
}

// TODO: There is a bug here, where we register the govRouter routes in InitNormalKeepers and then
Expand Down Expand Up @@ -453,8 +454,8 @@ func (app *OsmosisApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock)

// EndBlocker application updates every end block.
func (app *OsmosisApp) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) abci.ResponseEndBlock {
// Process the block and ingest data into various sinks.
app.IngestManager.ProcessBlock(ctx)

return app.mm.EndBlock(ctx, req)
}

Expand Down
3 changes: 2 additions & 1 deletion app/keepers/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
icq "github.com/cosmos/ibc-apps/modules/async-icq/v4"
icqtypes "github.com/cosmos/ibc-apps/modules/async-icq/v4/types"

"github.com/osmosis-labs/osmosis/v20/ingest"
"github.com/osmosis-labs/osmosis/v20/x/cosmwasmpool"
cosmwasmpooltypes "github.com/osmosis-labs/osmosis/v20/x/cosmwasmpool/types"
downtimedetector "github.com/osmosis-labs/osmosis/v20/x/downtime-detector"
Expand Down Expand Up @@ -67,6 +66,8 @@ import (
packetforwardkeeper "github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v4/router/keeper"
packetforwardtypes "github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v4/router/types"

"github.com/osmosis-labs/osmosis/v20/ingest"

// IBC Transfer: Defines the "transfer" IBC port
transfer "github.com/cosmos/ibc-go/v4/modules/apps/transfer"

Expand Down
8 changes: 8 additions & 0 deletions ingest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Ingest

This is a package that is responsible for ingesting end-of-block data into various
sinks. It is designed to be extensible. A user can add a new sink by implementing
an `Ingester` interface and then calling `RegisterIngester` in `app.go`.

Note that to avoid causing a chain halt, any error or panic occuring during ingestion
is logged and silently ignored.
44 changes: 16 additions & 28 deletions ingest/ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ package ingest

import (
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/osmosis-labs/osmosis/v20/ingest/sqs/domain/mvc"
"github.com/osmosis-labs/osmosis/v20/ingest/sqs/log"
)

// IngestManager is an interface that defines the methods for the ingest manager.
// Ingest manager handles the processing of blocks and ingesting data into various sinks
// tha are defined by the Ingester interface.
type IngestManager interface {
// RegisterIngester registers an ingester
// to propagate data into at the end of each block.
RegisterIngester(ingester Ingester)

// ProcessBlock processes the block and ingests data into various sinks.
// Must never panic. If panic occurs, it is silently logged and ignored.
// If the ingester returns an error, it is silently logged and ignored.
ProcessBlock(ctx sdk.Context)
// SetIngester sets the ingester.
// Note: In the future, we may expand this to support multiple ingesters.
SetIngester(ingester Ingester)
}

// Ingester is an interface that defines the methods for the ingester.
Expand All @@ -26,34 +24,29 @@ type Ingester interface {
// ProcessBlock processes the block and ingests data into a sink.
// Returns error if the ingester fails to ingest data.
ProcessBlock(ctx sdk.Context) error
}

// AtomicIngester is an interface that defines the methods for the atomic ingester.
// It processes a block by writing data into a transaction.
// The caller must call Exec on the transaction to flush data to sink.
type AtomicIngester interface {
// ProcessBlock processes the block by writing data into a transaction.
// Returns error if fails to process.
// It does not flush data to sink. The caller must call Exec on the transaction
ProcessBlock(ctx sdk.Context, tx mvc.Tx) error

SetLogger(log.Logger)
GetName() string
}

// ingesterImpl is an implementation of IngesterManager.
type ingestManagerImpl struct {
ingester Ingester
ingesters []Ingester
}

var _ IngestManager = &ingestManagerImpl{}

// NewIngestManager creates a new IngestManager.
func NewIngestManager() IngestManager {
return &ingestManagerImpl{
ingester: nil,
ingesters: []Ingester{},
}
}

// RegisterIngester implements IngestManager.
func (im *ingestManagerImpl) RegisterIngester(ingester Ingester) {
im.ingesters = append(im.ingesters, ingester)
}

// ProcessBlock implements IngestManager.
func (im *ingestManagerImpl) ProcessBlock(ctx sdk.Context) {
defer func() {
Expand All @@ -63,16 +56,11 @@ func (im *ingestManagerImpl) ProcessBlock(ctx sdk.Context) {
}
}()

// Ingester must be set in the app. If not, we do nothing.
if im.ingester != nil {
if err := im.ingester.ProcessBlock(ctx); err != nil {
// Ingesters must be set in the app. If not, we do nothing.
for _, ingester := range im.ingesters {
if err := ingester.ProcessBlock(ctx); err != nil {
// The error is silently logged and ignored.
ctx.Logger().Error("error processing block during ingest", "err", err)
ctx.Logger().Error("error processing block during ingest", "err", err, "ingester", ingester.GetName())
}
}
}

// SetIngester implements IngestManager.
func (im *ingestManagerImpl) SetIngester(ingester Ingester) {
im.ingester = ingester
}
5 changes: 2 additions & 3 deletions ingest/sqs/chain_info/ingester/redis/chain_info_ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"go.uber.org/zap"

"github.com/osmosis-labs/osmosis/v20/ingest"
"github.com/osmosis-labs/osmosis/v20/ingest/sqs/domain/mvc"
"github.com/osmosis-labs/osmosis/v20/ingest/sqs/log"
)
Expand All @@ -19,7 +18,7 @@ type chainInfoIngester struct {
}

// NewChainInfoIngester returns a new chain information ingester.
func NewChainInfoIngester(chainInfoRepo mvc.ChainInfoRepository, repositoryManager mvc.TxManager) ingest.AtomicIngester {
func NewChainInfoIngester(chainInfoRepo mvc.ChainInfoRepository, repositoryManager mvc.TxManager) mvc.AtomicIngester {
return &chainInfoIngester{
chainInfoRepo: chainInfoRepo,
repositoryManager: repositoryManager,
Expand Down Expand Up @@ -47,4 +46,4 @@ func (ci *chainInfoIngester) SetLogger(logger log.Logger) {
ci.logger = logger
}

var _ ingest.AtomicIngester = &chainInfoIngester{}
var _ mvc.AtomicIngester = &chainInfoIngester{}
4 changes: 2 additions & 2 deletions ingest/sqs/domain/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func (e ConcentratedNotEnoughLiquidityToCompleteSwapError) Error() string {
return fmt.Sprintf("not enough liquidity to complete swap in pool (%d) with amount in (%s)", e.PoolId, e.AmountIn)
}

type ConcentratedTickModelNotSet struct {
type ConcentratedTickModelNotSetError struct {
PoolId uint64
}

func (e ConcentratedTickModelNotSet) Error() string {
func (e ConcentratedTickModelNotSetError) Error() string {
return fmt.Sprintf("tick model is not set on pool (%d)", e.PoolId)
}

Expand Down
7 changes: 3 additions & 4 deletions ingest/sqs/domain/mocks/pool_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,14 @@ func (mp *MockRoutablePool) GetSQSPoolModel() domain.SQSPool {
}

// CalculateTokenOutByTokenIn implements routerusecase.RoutablePool.
func (r *MockRoutablePool) CalculateTokenOutByTokenIn(tokenIn sdk.Coin) (sdk.Coin, error) {

func (mp *MockRoutablePool) CalculateTokenOutByTokenIn(tokenIn sdk.Coin) (sdk.Coin, error) {
// Cast to balancer
balancerPool, ok := r.ChainPoolModel.(*balancer.Pool)
balancerPool, ok := mp.ChainPoolModel.(*balancer.Pool)
if !ok {
panic("not a balancer pool")
}

return balancerPool.CalcOutAmtGivenIn(sdk.Context{}, sdk.NewCoins(tokenIn), r.TokenOutDenom, r.SpreadFactor)
return balancerPool.CalcOutAmtGivenIn(sdk.Context{}, sdk.NewCoins(tokenIn), mp.TokenOutDenom, mp.SpreadFactor)
}

// String implements domain.RoutablePool.
Expand Down
5 changes: 2 additions & 3 deletions ingest/sqs/domain/mocks/pools_usecase_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ type PoolsUsecaseMock struct {
// GetRoutesFromCandidates implements mvc.PoolsUsecase.
// Note that taker fee are ignored and not set
// Note that tick models are not set
func (r *PoolsUsecaseMock) GetRoutesFromCandidates(ctx context.Context, candidateRoutes route.CandidateRoutes, takerFeeMap domain.TakerFeeMap, tokenInDenom string, tokenOutDenom string) ([]route.RouteImpl, error) {
func (pm *PoolsUsecaseMock) GetRoutesFromCandidates(ctx context.Context, candidateRoutes route.CandidateRoutes, takerFeeMap domain.TakerFeeMap, tokenInDenom string, tokenOutDenom string) ([]route.RouteImpl, error) {
finalRoutes := make([]route.RouteImpl, 0, len(candidateRoutes.Routes))
for _, candidateRoute := range candidateRoutes.Routes {

routablePools := make([]domain.RoutablePool, 0, len(candidateRoute.Pools))
for _, candidatePool := range candidateRoute.Pools {
// Get the pool data for routing
var foundPool domain.PoolI
for _, pool := range r.Pools {
for _, pool := range pm.Pools {
if pool.GetId() == candidatePool.ID {
foundPool = pool
}
Expand Down
2 changes: 1 addition & 1 deletion ingest/sqs/domain/mvc/doc.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Encapsulates the Model-View-Controller abstraction domain.
// Separated from the rest of the domain to avoid import cycles.
package mvc
package mvc
15 changes: 15 additions & 0 deletions ingest/sqs/domain/mvc/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"context"
"errors"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/redis/go-redis/v9"

"github.com/osmosis-labs/osmosis/v20/ingest/sqs/log"
)

// Tx defines an interface for atomic transaction.
Expand All @@ -24,6 +27,18 @@ type Tx interface {
ClearAll(ctx context.Context) error
}

// AtomicIngester is an interface that defines the methods for the atomic ingester.
// It processes a block by writing data into a transaction.
// The caller must call Exec on the transaction to flush data to sink.
type AtomicIngester interface {
// ProcessBlock processes the block by writing data into a transaction.
// Returns error if fails to process.
// It does not flush data to sink. The caller must call Exec on the transaction
ProcessBlock(ctx sdk.Context, tx Tx) error

SetLogger(log.Logger)
}

// RedisTx is a redis transaction.
type RedisTx struct {
pipeliner redis.Pipeliner
Expand Down
Loading

0 comments on commit 12db782

Please sign in to comment.