From b584c2620ce292386bc95e8c00346bccfd9860f5 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 5 Jun 2024 12:12:17 -0400 Subject: [PATCH] dex: augment price indexes with position data --- crates/bin/pd/src/migrate.rs | 6 +- crates/bin/pd/src/migrate/testnet78.rs | 89 +++++++++++++------ .../src/component/circuit_breaker/value.rs | 2 +- .../dex/src/component/position_manager.rs | 39 ++++---- .../position_manager/base_liquidity_index.rs | 2 +- .../src/component/position_manager/counter.rs | 1 - .../position_manager/inventory_index.rs | 2 +- .../component/position_manager/price_index.rs | 13 ++- .../dex/src/component/router/fill_route.rs | 29 ++---- .../dex/src/component/router/path.rs | 11 ++- .../dex/src/component/router/tests.rs | 12 +-- .../core/component/dex/src/component/rpc.rs | 50 +++++------ crates/core/component/dex/src/state_key.rs | 1 + 13 files changed, 144 insertions(+), 113 deletions(-) diff --git a/crates/bin/pd/src/migrate.rs b/crates/bin/pd/src/migrate.rs index 71c123911d..c793dc2c37 100644 --- a/crates/bin/pd/src/migrate.rs +++ b/crates/bin/pd/src/migrate.rs @@ -50,6 +50,7 @@ pub enum Migration { Testnet77, /// Testnet-78 migration: /// - Truncate various user-supplied `String` fields to a maximum length. + /// - Populate the DEX NV price idnexes with position data Testnet78, } @@ -90,11 +91,12 @@ impl Migration { Migration::Testnet78 => { testnet78::migrate(storage, pd_home.clone(), genesis_start).await? } - _ => unreachable!(), + // We keep historical migrations around for now, this will help inform an abstracted + // design. Feel free to remove it if it's causing you trouble. + _ => unimplemented!("the specified migration is unimplemented"), } if let Some(comet_home) = comet_home { - // TODO avoid this when refactoring to clean up migrations let genesis_path = pd_home.join("genesis.json"); migrate_comet_data(comet_home, genesis_path).await?; } diff --git a/crates/bin/pd/src/migrate/testnet78.rs b/crates/bin/pd/src/migrate/testnet78.rs index 32c8ea5d9d..642cbbb5f6 100644 --- a/crates/bin/pd/src/migrate/testnet78.rs +++ b/crates/bin/pd/src/migrate/testnet78.rs @@ -1,17 +1,20 @@ //! Contains functions related to the migration script of Testnet78. -use anyhow::Context; -use cnidarium::{Snapshot, StateDelta, Storage}; +use cnidarium::{Snapshot, StateDelta, StateWrite, Storage}; use futures::TryStreamExt as _; +use futures::{pin_mut, StreamExt}; use jmt::RootHash; use penumbra_app::app::StateReadExt as _; +use penumbra_dex::component::PositionManager; +use penumbra_dex::lp::position; +use penumbra_dex::lp::position::Position; use penumbra_governance::proposal_state::State as ProposalState; use penumbra_governance::Proposal; use penumbra_governance::StateReadExt as _; -use penumbra_governance::StateWriteExt; +use penumbra_governance::StateWriteExt as _; use penumbra_proto::core::component::governance::v1 as pb_governance; -use penumbra_proto::{StateReadProto as _, StateWriteProto as _}; +use penumbra_proto::{StateReadProto, StateWriteProto}; use penumbra_sct::component::clock::EpochManager; -use penumbra_sct::component::clock::EpochRead as _; +use penumbra_sct::component::clock::EpochRead; use penumbra_stake::validator::Validator; use std::path::PathBuf; use tracing::instrument; @@ -41,32 +44,36 @@ use crate::testnet::generate::TestnetConfig; /// - `client_id` (128 bytes) /// * Governance Signaling Proposals: /// - `commit hash` (255 bytes) +/// - Close and re-open all *open* positions so that they are re-indexed. #[instrument] pub async fn migrate( storage: Storage, pd_home: PathBuf, genesis_start: Option, ) -> anyhow::Result<()> { - // Setup: + /* `Migration::prepare`: collect basic migration data, logging, initialize alt-storage if needed */ let initial_state = storage.latest_snapshot(); + let chain_id = initial_state.get_chain_id().await?; let root_hash = initial_state .root_hash() .await .expect("chain state has a root hash"); - let pre_upgrade_root_hash: RootHash = root_hash.into(); + let pre_upgrade_height = initial_state .get_block_height() .await .expect("chain state has a block height"); let post_upgrade_height = pre_upgrade_height.wrapping_add(1); - // We initialize a `StateDelta` and start by reaching into the JMT for all entries matching the - // swap execution prefix. Then, we write each entry to the nv-storage. + let pre_upgrade_root_hash: RootHash = root_hash.into(); + + /* `Migration::migrate`: reach into the chain state and perform an offline state transition */ let mut delta = StateDelta::new(initial_state); - tracing::info!("beginning migration steps"); + let (migration_duration, post_upgrade_root_hash) = { let start_time = std::time::SystemTime::now(); + // Adjust the length of `Validator` fields. truncate_validator_fields(&mut delta).await?; @@ -76,30 +83,26 @@ pub async fn migrate( // Adjust the length of governance proposal outcome fields. truncate_proposal_outcome_fields(&mut delta).await?; + // Re-index all open positions. + reindex_dex_positions(&mut delta).await?; + + // Reset the application height and halt flag. + delta.ready_to_start(); + delta.put_block_height(0u64); + + // Finally, commit the changes to the chain state. let post_upgrade_root_hash = storage.commit_in_place(delta).await?; tracing::info!(?post_upgrade_root_hash, "post-migration root hash"); ( - start_time.elapsed().expect("start time not set"), + start_time.elapsed().expect("start is set"), post_upgrade_root_hash, ) }; - tracing::info!("completed migration steps"); - - // Set halt bit to 0, so chain can start again. - let migrated_state = storage.latest_snapshot(); - let mut delta = StateDelta::new(migrated_state); - delta.ready_to_start(); - delta.put_block_height(0u64); - let _ = storage - .commit_in_place(delta) - .await - .context("failed to reset halt bit")?; - storage.release().await; - // The migration is complete, now we need to generate a genesis file. To do this, we need - // to lookup a validator view from the chain, and specify the post-upgrade app hash and - // initial height. + tracing::info!("migration completed, generating genesis and signing state..."); + + /* `Migration::complete`: the state transition has been performed, we prepare the checkpointed genesis and signing state */ let app_state = penumbra_app::genesis::Content { chain_id, ..Default::default() @@ -110,22 +113,26 @@ pub async fn migrate( .to_vec() .try_into() .expect("infaillible conversion"); + genesis.initial_height = post_upgrade_height as i64; genesis.genesis_time = genesis_start.unwrap_or_else(|| { let now = tendermint::time::Time::now(); tracing::info!(%now, "no genesis time provided, detecting a testing setup"); now }); + + tracing::info!("generating checkpointed genesis"); let checkpoint = post_upgrade_root_hash.0.to_vec(); let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint)); + tracing::info!("writing genesis to disk"); let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis"); tracing::info!("genesis: {}", genesis_json); let genesis_path = pd_home.join("genesis.json"); std::fs::write(genesis_path, genesis_json).expect("can write genesis"); + tracing::info!("updating signing state"); let validator_state_path = pd_home.join("priv_validator_state.json"); - let fresh_validator_state = crate::testnet::generate::TestnetValidator::initial_state(); std::fs::write(validator_state_path, fresh_validator_state).expect("can write validator state"); @@ -135,12 +142,38 @@ pub async fn migrate( ?pre_upgrade_root_hash, ?post_upgrade_root_hash, duration = migration_duration.as_secs(), - "successful migration!" + "migration fully complete" ); Ok(()) } +async fn reindex_dex_positions(delta: &mut StateDelta) -> anyhow::Result<()> { + tracing::info!("running dex re-indexing migration"); + let prefix_key_lp = penumbra_dex::state_key::all_positions(); + let stream_all_lp = delta.prefix::(&prefix_key_lp); + let stream_open_lp = stream_all_lp.filter_map(|entry| async { + match entry { + Ok((_, lp)) if lp.state == position::State::Opened => Some(lp), + _ => None, + } + }); + pin_mut!(stream_open_lp); + + while let Some(lp) = stream_open_lp.next().await { + // Re-hash the position, since the key is a bech32 string. + let id = lp.id(); + // Close the position, adjusting all its index entries. + delta.close_position_by_id(&id).await?; + // Erase the position from the state, so that we circumvent the `update_position` guard. + delta.delete(penumbra_dex::state_key::position_by_id(&id)); + // Open a position with the adjusted indexing logic. + delta.open_position(lp).await?; + } + tracing::info!("completed dex migration"); + Ok(()) +} + /// * Validators: /// - `name` (140 bytes) /// - `website` (70 bytes) diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 5b75517ab0..77c03e99c0 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -241,7 +241,7 @@ mod tests { let position = buy_1; state_tx - .update_position_by_price_index(&None, &position, &position.id()) + .update_position_by_price_index(&position.id(), &None, &position) .expect("can update price index"); state_tx.put(state_key::position_by_id(&id), position); diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 86e76d0d7a..7ac0f16da3 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -62,14 +62,15 @@ pub trait PositionRead: StateRead { fn positions_by_price( &self, pair: &DirectedTradingPair, - ) -> Pin> + Send + 'static>> { + ) -> Pin> + Send + 'static>> + { let prefix = engine::price_index::prefix(pair); tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price"); - self.nonverifiable_prefix_raw(&prefix) + self.nonverifiable_prefix(&prefix) .map(|entry| match entry { - Ok((k, _)) => { + Ok((k, lp)) => { let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned(); - Ok(position::Id(raw_id)) + Ok((position::Id(raw_id), lp)) } Err(e) => Err(e), }) @@ -90,12 +91,9 @@ pub trait PositionRead: StateRead { async fn best_position( &self, pair: &DirectedTradingPair, - ) -> Result> { + ) -> Result> { let mut positions_by_price = self.positions_by_price(pair); - match positions_by_price.next().await.transpose()? { - Some(id) => self.position_by_id(&id).await, - None => Ok(None), - } + positions_by_price.next().await.transpose() } /// Fetch the list of pending position closures. @@ -205,7 +203,8 @@ pub trait PositionManager: StateWrite + PositionRead { new_state }; - self.update_position(Some(prev_state), new_state).await?; + self.update_position(id, Some(prev_state), new_state) + .await?; Ok(()) } @@ -280,7 +279,7 @@ pub trait PositionManager: StateWrite + PositionRead { // Finally, record the new position state. self.record_proto(event::position_open(&position)); - self.update_position(None, position).await?; + self.update_position(&id, None, position).await?; Ok(()) } @@ -374,7 +373,8 @@ pub trait PositionManager: StateWrite + PositionRead { .map_err(|e| tracing::warn!(?e, "failed to record position execution")) .ok(); - self.update_position(Some(prev_state), new_state).await + self.update_position(&position_id, Some(prev_state), new_state) + .await } /// Withdraw from a closed position, incrementing its sequence number. @@ -450,7 +450,8 @@ pub trait PositionManager: StateWrite + PositionRead { new_state }; - self.update_position(Some(prev_state), new_state).await?; + self.update_position(&position_id, Some(prev_state), new_state) + .await?; Ok(reserves) } @@ -462,15 +463,15 @@ impl PositionManager for T {} trait Inner: StateWrite { /// Writes a position to the state, updating all necessary indexes. /// - /// This should be the SOLE ENTRYPOINT for writing positions to the state. + /// This should be the **SOLE ENTRYPOINT** for writing positions to the state. /// All other position changes exposed by the `PositionManager` should run through here. #[instrument(level = "debug", skip_all)] async fn update_position( &mut self, + id: &position::Id, prev_state: Option, new_state: Position, ) -> Result { - let id = new_state.id(); tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state"); tracing::trace!(?id, ?prev_state, ?new_state, "updating position state"); @@ -478,12 +479,12 @@ trait Inner: StateWrite { Self::guard_invalid_transitions(&prev_state, &new_state, &id)?; // Update the DEX engine indices: - self.update_position_by_price_index(&prev_state, &new_state, &id)?; - self.update_position_by_inventory_index(&prev_state, &new_state, &id)?; - self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id) + self.update_position_by_inventory_index(&id, &prev_state, &new_state)?; + self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state) .await?; - self.update_trading_pair_position_counter(&prev_state, &new_state, &id) + self.update_trading_pair_position_counter(&prev_state, &new_state) .await?; + self.update_position_by_price_index(&id, &prev_state, &new_state)?; self.put(state_key::position_by_id(&id), new_state.clone()); Ok(new_state) diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs index ce6b16d7a5..5e8b8ffaa3 100644 --- a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -76,9 +76,9 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite { /// │ └──┘ async fn update_asset_by_base_liquidity_index( &mut self, + id: &position::Id, prev_state: &Option, new_state: &Position, - id: &position::Id, ) -> Result<()> { // We need to reconstruct the position's previous contribution and compute // its new contribution to the index. We do this for each asset in the pair diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index 023494f0dd..67ffad31af 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -41,7 +41,6 @@ pub(crate) trait PositionCounter: StateWrite { &mut self, prev_state: &Option, new_state: &Position, - _id: &position::Id, ) -> Result<()> { use position::State::*; let trading_pair = new_state.phi.pair; diff --git a/crates/core/component/dex/src/component/position_manager/inventory_index.rs b/crates/core/component/dex/src/component/position_manager/inventory_index.rs index 24a60ab7d0..dc9c4efb2e 100644 --- a/crates/core/component/dex/src/component/position_manager/inventory_index.rs +++ b/crates/core/component/dex/src/component/position_manager/inventory_index.rs @@ -13,9 +13,9 @@ use position::State::*; pub(super) trait PositionByInventoryIndex: StateWrite { fn update_position_by_inventory_index( &mut self, + position_id: &position::Id, prev_state: &Option, new_state: &Position, - position_id: &position::Id, ) -> Result<()> { // Clear an existing record of the position, since changes to the // reserves or the position state might have invalidated it. diff --git a/crates/core/component/dex/src/component/position_manager/price_index.rs b/crates/core/component/dex/src/component/position_manager/price_index.rs index 26d6be2b48..ac373e9851 100644 --- a/crates/core/component/dex/src/component/position_manager/price_index.rs +++ b/crates/core/component/dex/src/component/position_manager/price_index.rs @@ -1,4 +1,5 @@ use cnidarium::StateWrite; +use penumbra_proto::StateWriteProto; use crate::{ lp::position::{self, Position}, @@ -12,9 +13,9 @@ use position::State::*; pub(crate) trait PositionByPriceIndex: StateWrite { fn update_position_by_price_index( &mut self, + position_id: &position::Id, prev_state: &Option, new_state: &Position, - position_id: &position::Id, ) -> Result<()> { // Clear an existing record for the position, since changes to the // reserves or the position state might have invalidated it. @@ -57,7 +58,10 @@ trait Inner: StateWrite { end: pair.asset_2(), }; let phi12 = phi.component.clone(); - self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]); + self.nonverifiable_put( + engine::price_index::key(&pair12, &phi12, &id), + position.clone(), + ); tracing::debug!("indexing position for 1=>2 trades"); } @@ -68,7 +72,10 @@ trait Inner: StateWrite { end: pair.asset_1(), }; let phi21 = phi.component.flip(); - self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]); + self.nonverifiable_put( + engine::price_index::key(&pair21, &phi21, &id), + position.clone(), + ); tracing::debug!("indexing position for 2=>1 trades"); } } diff --git a/crates/core/component/dex/src/component/router/fill_route.rs b/crates/core/component/dex/src/component/router/fill_route.rs index eb618b0968..4e3c24af0a 100644 --- a/crates/core/component/dex/src/component/router/fill_route.rs +++ b/crates/core/component/dex/src/component/router/fill_route.rs @@ -286,8 +286,10 @@ fn breakdown_route(route: &[asset::Id]) -> Result, Fill } } -type PositionsByPrice = - BTreeMap> + Send>>>; +type PositionsByPrice = BTreeMap< + DirectedTradingPair, + Pin> + Send>>, +>; /// A frontier of least-priced positions along a route. struct Frontier { @@ -368,7 +370,7 @@ impl Frontier { for pair in &pairs { 'next_position: loop { - let id = positions_by_price + let (id, position) = positions_by_price .get_mut(pair) .expect("positions_by_price should have an entry for each pair") .as_mut() @@ -379,14 +381,6 @@ impl Frontier { // Check that the position is not already part of the frontier. if !position_ids.contains(&id) { - // TODO: fold positions into position_by_id stream - // so separate state lookup not necessary - let position = state - .position_by_id(&id) - .await - .expect("stream doesn't error") - .ok_or(FillError::MissingFrontierPosition(id))?; - position_ids.insert(id); positions.push(position); @@ -508,7 +502,7 @@ impl Frontier { loop { let pair = &self.pairs[index]; - let next_position_id = match self + let (next_position_id, next_position) = match self .positions_by_price .get_mut(pair) .expect("positions_by_price should have an entry for each pair") @@ -525,7 +519,9 @@ impl Frontier { } // Otherwise, we need to check that the position is not already // part of the current frontier. - Some(position_id) if !self.position_ids.contains(&position_id) => position_id, + Some((position_id, lp)) if !self.position_ids.contains(&position_id) => { + (position_id, lp) + } // Otherwise, continue to the next position in the stream. Some(position_id) => { tracing::debug!(?position_id, "skipping position already in frontier"); @@ -533,13 +529,6 @@ impl Frontier { } }; - let next_position = self - .state - .position_by_id(&next_position_id) - .await - .expect("stream doesn't error") - .expect("indexed position should exist"); - tracing::debug!( ?next_position_id, ?next_position, diff --git a/crates/core/component/dex/src/component/router/path.rs b/crates/core/component/dex/src/component/router/path.rs index 4addb03b68..dde86045aa 100644 --- a/crates/core/component/dex/src/component/router/path.rs +++ b/crates/core/component/dex/src/component/router/path.rs @@ -67,7 +67,10 @@ impl Path { async fn extend_to_inner(mut self, new_end: asset::Id) -> Result>> { let target_pair = DirectedTradingPair::new(*self.end(), new_end); - let Some(best_price_position) = self.state.best_position(&target_pair).await? else { + // Pulls the (id, position) that have the best effective price for this hop. + let Some((best_price_lp_id, best_price_lp)) = + self.state.best_position(&target_pair).await? + else { tracing::trace!("no best position, failing to extend path"); return Ok(None); }; @@ -75,10 +78,10 @@ impl Path { // ensuring we don't double-count liquidity while traversing cycles. use crate::component::position_manager::price_index::PositionByPriceIndex; self.state - .deindex_position_by_price(&best_price_position, &best_price_position.id()); + .deindex_position_by_price(&best_price_lp, &best_price_lp_id); // Compute the effective price of a trade in the direction self.end()=>new_end - let hop_price = best_price_position + let hop_price = best_price_lp .phi .orient_end(new_end) .expect("position should be contain the end asset") @@ -87,7 +90,7 @@ impl Path { match self.price * hop_price { Ok(path_price) => { // Update and return the path. - tracing::debug!(%path_price, %hop_price, id = ?best_price_position.id(), "extended path"); + tracing::debug!(%path_price, %hop_price, ?best_price_lp_id, "extended path"); self.price = path_price; self.nodes.push(new_end); // Create a new span for the extension. Note: this is a child of diff --git a/crates/core/component/dex/src/component/router/tests.rs b/crates/core/component/dex/src/component/router/tests.rs index 4f94abcbd1..2ebb5dc8f5 100644 --- a/crates/core/component/dex/src/component/router/tests.rs +++ b/crates/core/component/dex/src/component/router/tests.rs @@ -446,8 +446,8 @@ async fn position_get_best_price() -> anyhow::Result<()> { let positions = state_tx .positions_by_price(&pair) .then(|result| async { - let id = result.unwrap(); - state_tx.position_by_id(&id).await.unwrap().unwrap() + let (_, lp) = result.unwrap(); + lp }) .collect::>() .await; @@ -463,8 +463,8 @@ async fn position_get_best_price() -> anyhow::Result<()> { let positions = state_tx .positions_by_price(&pair) .then(|result| async { - let id = result.unwrap(); - state_tx.position_by_id(&id).await.unwrap().unwrap() + let (_, lp) = result.unwrap(); + lp }) .collect::>() .await; @@ -505,7 +505,7 @@ async fn test_multiple_similar_position() -> anyhow::Result<()> { // that we make up a context here. let context = pair_1.into_directed_trading_pair(); - let mut p_1 = state_tx + let (_, mut p_1) = state_tx .best_position(&pair_1.into_directed_trading_pair()) .await .unwrap() @@ -514,7 +514,7 @@ async fn test_multiple_similar_position() -> anyhow::Result<()> { p_1.reserves = p_1.reserves.flip(); state_tx.position_execution(p_1, context).await.unwrap(); - let mut p_2 = state_tx + let (_, mut p_2) = state_tx .best_position(&pair_1.into_directed_trading_pair()) .await .unwrap() diff --git a/crates/core/component/dex/src/component/rpc.rs b/crates/core/component/dex/src/component/rpc.rs index d195f3c737..1dd595b154 100644 --- a/crates/core/component/dex/src/component/rpc.rs +++ b/crates/core/component/dex/src/component/rpc.rs @@ -382,18 +382,26 @@ impl QueryService for Server { start: pair.asset_2(), end: pair.asset_1(), }; - let best_1_to_2_position = state.best_position(&pair12).await.map_err(|e| { - tonic::Status::internal(format!( - "error finding best position for {:?}: {:#}", - pair12, e - )) - })?; - let best_2_to_1_position = state.best_position(&pair12).await.map_err(|e| { - tonic::Status::internal(format!( - "error finding best position for {:?}: {:#}", - pair21, e - )) - })?; + let best_1_to_2_position = state + .best_position(&pair12) + .await + .map_err(|e| { + tonic::Status::internal(format!( + "error finding best position for {:?}: {:#}", + pair12, e + )) + })? + .map(|(_, p)| p); + let best_2_to_1_position = state + .best_position(&pair12) + .await + .map_err(|e| { + tonic::Status::internal(format!( + "error finding best position for {:?}: {:#}", + pair21, e + )) + })? + .map(|(_, p)| p); let approx_effective_price_1_to_2 = best_1_to_2_position .as_ref() @@ -453,21 +461,9 @@ impl QueryService for Server { let s = state .positions_by_price(&pair) .take(limit) - .and_then(move |id| { - let state2 = state.clone(); - async move { - let position = state2.position_by_id(&id).await?.ok_or_else(|| { - anyhow::anyhow!("indexed position not found in state: {}", id) - })?; - anyhow::Ok(position) - } - }) - .map_ok(|position| { - let id = position.id(); - LiquidityPositionsByPriceResponse { - data: Some(position.into()), - id: Some(id.into()), - } + .map_ok(|(id, position)| LiquidityPositionsByPriceResponse { + data: Some(position.into()), + id: Some(id.into()), }) .map_err(|e: anyhow::Error| { tonic::Status::internal(format!("error retrieving positions: {:#}", e)) diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 6f0747d8f8..01e653de27 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -19,6 +19,7 @@ pub fn positions(trading_pair: &TradingPair, position_id: &str) -> String { } /// Looks up a `Position` by its ID +// This should only ever be called by `position_manager::Inner::update_position`. pub fn position_by_id(id: &position::Id) -> String { format!("dex/position/{id}") }