Skip to content

Commit

Permalink
dex: fold execution budget in DexParameters (#4620)
Browse files Browse the repository at this point in the history
## Describe your changes
Close #4602, and simplifies the execution breaker as well, fixing an
edge case that would cause only successful executions to be counted.

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > This is state breaking, and contains a migration.

---------

Co-authored-by: Tal Derei <70081547+TalDerei@users.noreply.github.com>
  • Loading branch information
erwanor and TalDerei authored Jun 15, 2024
1 parent 16750b4 commit 4debdd9
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 59 deletions.
18 changes: 17 additions & 1 deletion crates/bin/pd/src/migrate/testnet78.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::TryStreamExt as _;
use futures::{pin_mut, StreamExt};
use jmt::RootHash;
use penumbra_app::app::StateReadExt as _;
use penumbra_dex::component::PositionManager;
use penumbra_dex::component::{PositionManager, StateReadExt, StateWriteExt};
use penumbra_dex::lp::position;
use penumbra_dex::lp::position::Position;
use penumbra_governance::proposal_state::State as ProposalState;
Expand Down Expand Up @@ -86,6 +86,9 @@ pub async fn migrate(
// Re-index all open positions.
reindex_dex_positions(&mut delta).await?;

// Write the new dex parameters (with the execution budget field) to the state.
update_dex_params(&mut delta).await?;

// Reset the application height and halt flag.
delta.ready_to_start();
delta.put_block_height(0u64);
Expand Down Expand Up @@ -148,6 +151,19 @@ pub async fn migrate(
Ok(())
}

/// Write the updated dex parameters to the chain state.
async fn update_dex_params(delta: &mut StateDelta<Snapshot>) -> anyhow::Result<()> {
let mut dex_params = delta
.get_dex_params()
.await
.expect("chain state is initialized");
dex_params.max_execution_budget = 64;
delta.put_dex_params(dex_params);

Ok(())
}

/// Reindex opened liquidity positions to augment the price indexes.
async fn reindex_dex_positions(delta: &mut StateDelta<Snapshot>) -> anyhow::Result<()> {
tracing::info!("running dex re-indexing migration");
let prefix_key_lp = penumbra_dex::state_key::all_positions();
Expand Down
3 changes: 2 additions & 1 deletion crates/core/app/src/params/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl AppParameters {
fixed_candidates: _,
max_hops: _,
max_positions_per_pair: _,
max_execution_budget: _,
},
// IMPORTANT: Don't use `..` here! We want to ensure every single field is verified!
} = self;
Expand Down Expand Up @@ -203,6 +204,7 @@ impl AppParameters {
fixed_candidates: _,
max_hops: _,
max_positions_per_pair: _,
max_execution_budget: _,
},
// IMPORTANT: Don't use `..` here! We want to ensure every single field is verified!
} = self;
Expand Down Expand Up @@ -278,7 +280,6 @@ impl AppParameters {
*min_validator_stake >= 1_000_000u128.into(),
"the minimum validator stake must be at least 1penumbra",
),
// TODO(erwan): add a `max_positions_per_pair` check
])
}
}
Expand Down
5 changes: 3 additions & 2 deletions crates/core/component/dex/src/component/arb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use crate::component::metrics;
use crate::component::{metrics, StateReadExt};
use anyhow::Result;
use async_trait::async_trait;
use cnidarium::{StateDelta, StateWrite};
Expand Down Expand Up @@ -43,7 +43,8 @@ pub trait Arbitrage: StateWrite + Sized {
amount: u64::MAX.into(),
};

let execution_circuit_breaker = ExecutionCircuitBreaker::default();
let execution_budget = self.get_dex_params().await?.max_execution_budget;
let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
let swap_execution = this
.route_and_fill(
arb_token,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,28 @@
const MAX_PATH_SEARCHES: u32 = 64;
const MAX_EXECUTIONS: u32 = 64;

/// Holds the state of the execution circuit breaker.
/// Responsible for managing the conditions of halting execution of
/// a single batch swap. All execution circuit breaker triggers are
/// non-fatal and will allow the swap to be partially fulfilled up
/// to the search and execution limits managed by the circuit breaker.
/// The execution circuit breaker meters the number of operations (search and execution)
/// performed to fulfill a batch swap.
///
/// The Dex component MUST call `CircuitBreaker::exceed_limits` before an execution round.
///
/// The circuit breaker ensures the swap will not use unbounded time complexity.
/// Note that in practice, this means that a batch swap can result in a partial fill
/// even if there were enough liquidity to fulfill all of it.
#[derive(Debug, Clone)]
pub(crate) struct ExecutionCircuitBreaker {
/// The maximum number of times to perform path searches before stopping.
pub max_path_searches: u32,
/// The number of times path searches have been performed.
pub current_path_searches: u32,
/// The maximum number of times to execute against liquidity positions before stopping.
pub max_executions: u32,
/// The number of times liquidity positions have been executed against.
pub current_executions: u32,
/// The current number of operations performed.
pub counter: u32,
/// The maximum number of operations allowed.
pub max: u32,
}

impl ExecutionCircuitBreaker {
#[allow(dead_code)]
pub fn new(max_path_searches: u32, max_executions: u32) -> Self {
Self {
max_path_searches,
current_path_searches: 0,
max_executions,
current_executions: 0,
}
pub fn new(max: u32) -> Self {
Self { max, counter: 0 }
}

pub fn exceeded_limits(&self) -> bool {
self.current_path_searches > self.max_path_searches
|| self.current_executions > self.max_executions
pub fn increment(&mut self) {
self.counter += 1;
}
}

impl Default for ExecutionCircuitBreaker {
fn default() -> Self {
Self {
max_path_searches: MAX_PATH_SEARCHES,
current_path_searches: 0,
max_executions: MAX_EXECUTIONS,
current_executions: 0,
}
pub fn exceeded_limits(&self) -> bool {
self.counter >= self.max
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,10 @@ mod tests {
state_tx.apply();

let routing_params = state.routing_params().await.unwrap();
let max_execution = state.get_dex_params().await.unwrap().max_execution_budget;
// This call should panic due to the outflow of gn not being covered by the circuit breaker.
state
.handle_batch_swaps(trading_pair, swap_flow, 0, routing_params)
.handle_batch_swaps(trading_pair, swap_flow, 0, routing_params, max_execution)
.await
.expect("unable to process batch swaps");
}
Expand Down
6 changes: 6 additions & 0 deletions crates/core/component/dex/src/component/dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ impl Component for Dex {

// 2. For each batch swap during the block, calculate clearing prices and set in the JMT.
let routing_params = state.routing_params().await.expect("dex params are set");
let execution_budget = state
.get_dex_params()
.await
.expect("dex params are set")
.max_execution_budget;

for (trading_pair, swap_flows) in state.swap_flows() {
let batch_start = std::time::Instant::now();
Expand All @@ -72,6 +77,7 @@ impl Component for Dex {
routing_params
.clone()
.with_extra_candidates([trading_pair.asset_1(), trading_pair.asset_2()]),
execution_budget,
)
.await
.expect("handling batch swaps is infaillible");
Expand Down
22 changes: 11 additions & 11 deletions crates/core/component/dex/src/component/router/route_and_fill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ pub trait HandleBatchSwaps: StateWrite + Sized {
self: &mut Arc<Self>,
trading_pair: TradingPair,
batch_data: SwapFlow,
// This will be read from the ABCI request
block_height: u64,
params: RoutingParams,
execution_budget: u32,
) -> Result<()>
where
Self: 'static,
{
let (delta_1, delta_2) = (batch_data.0, batch_data.1);

tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps");

let execution_circuit_breaker = ExecutionCircuitBreaker::default();
// We initialize a circuit breaker for this batch swap. This will limit the number of frontier
// executions up to the specified `execution_budget` parameter.
let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);

let swap_execution_1_for_2 = if delta_1.value() > 0 {
Some(
Expand Down Expand Up @@ -172,16 +173,19 @@ pub trait RouteAndFill: StateWrite + Sized {
let max_delta_1: Amount = MAX_RESERVE_AMOUNT.into();

// Termination conditions:
// 1. We have no more delta_1 remaining
// 1. We have no more `delta_1` remaining
// 2. A path can no longer be found
// 3. We have reached the `RoutingParams` specified price limit
// 4. The execution circuit breaker has been triggered based on the number of path searches and executions

loop {
// Check if we have exceeded the execution circuit breaker limits.
if execution_circuit_breaker.exceeded_limits() {
tracing::debug!("execution circuit breaker triggered, exiting route_and_fill");
break;
} else {
// This should be done ahead of doing any path search or execution, so that we never
// have to reason about the specific control flow of our batch swap logic.
execution_circuit_breaker.increment();
}

// Find the best route between the two assets in the trading pair.
Expand All @@ -200,9 +204,8 @@ pub trait RouteAndFill: StateWrite + Sized {
break;
}

// Increment the execution circuit breaker path search counter.
execution_circuit_breaker.current_path_searches += 1;

// We split off the entire batch swap into smaller chunks to avoid causing
// a series of overflow in the DEX.
let delta_1 = Value {
amount: total_unfilled_1.min(max_delta_1),
asset_id: asset_1,
Expand Down Expand Up @@ -259,9 +262,6 @@ pub trait RouteAndFill: StateWrite + Sized {
)
};

// Increment the execution circuit breaker execution counter.
execution_circuit_breaker.current_executions += 1;

if total_unfilled_1.value() == 0 {
tracing::debug!("filled all input, exiting route_and_fill");
break;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/component/dex/src/component/router/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ async fn best_position_route_and_fill() -> anyhow::Result<()> {
.unwrap();
let routing_params = state.routing_params().await.unwrap();
state
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), routing_params)
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), routing_params, 64)
.await
.expect("unable to process batch swaps");

Expand Down Expand Up @@ -1170,7 +1170,7 @@ async fn multi_hop_route_and_fill() -> anyhow::Result<()> {
.unwrap();
let routing_params = state.routing_params().await.unwrap();
state
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), routing_params)
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), routing_params, 64)
.await
.expect("unable to process batch swaps");

Expand Down
13 changes: 11 additions & 2 deletions crates/core/component/dex/src/component/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,10 @@ impl SimulationService for Server {
let start_time = std::time::Instant::now();
let state = self.storage.latest_snapshot();

let mut routing_params = state.routing_params().await.expect("routing params unset");
let mut routing_params = state
.routing_params()
.await
.expect("dex routing params are set");
match routing_strategy {
Setting::SingleHop(_) => {
routing_params.max_hops = 1;
Expand All @@ -631,8 +634,14 @@ impl SimulationService for Server {
}
}

let execution_budget = state
.get_dex_params()
.await
.expect("dex parameters are set")
.max_execution_budget;

let mut state_tx = Arc::new(StateDelta::new(state));
let execution_circuit_breaker = ExecutionCircuitBreaker::default();
let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
let swap_execution = state_tx
.route_and_fill(
input.asset_id,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/component/dex/src/component/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ async fn swap_execution_tests() -> anyhow::Result<()> {
.unwrap();
let routing_params = state.routing_params().await.unwrap();
state
.handle_batch_swaps(trading_pair, swap_flow, 0, routing_params)
.handle_batch_swaps(trading_pair, swap_flow, 0, routing_params, 64)
.await
.expect("unable to process batch swaps");

Expand Down Expand Up @@ -741,7 +741,7 @@ async fn swap_execution_tests() -> anyhow::Result<()> {
.unwrap();
let routing_params = state.routing_params().await.unwrap();
state
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), routing_params)
.handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), routing_params, 64)
.await
.expect("unable to process batch swaps");

Expand Down
4 changes: 4 additions & 0 deletions crates/core/component/dex/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct DexParameters {
pub fixed_candidates: Vec<asset::Id>,
pub max_hops: u32,
pub max_positions_per_pair: u32,
pub max_execution_budget: u32,
}

impl DomainType for DexParameters {
Expand All @@ -30,6 +31,7 @@ impl TryFrom<pb::DexParameters> for DexParameters {
.collect::<Result<_, _>>()?,
max_hops: msg.max_hops,
max_positions_per_pair: msg.max_positions_per_pair,
max_execution_budget: msg.max_execution_budget,
})
}
}
Expand All @@ -45,6 +47,7 @@ impl From<DexParameters> for pb::DexParameters {
.collect(),
max_hops: params.max_hops,
max_positions_per_pair: params.max_positions_per_pair,
max_execution_budget: params.max_execution_budget,
}
}
}
Expand All @@ -68,6 +71,7 @@ impl Default for DexParameters {
],
max_hops: 4,
max_positions_per_pair: 1_000,
max_execution_budget: 64,
}
}
}
4 changes: 4 additions & 0 deletions crates/proto/src/gen/penumbra.core.component.dex.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,10 @@ pub struct DexParameters {
/// inventory get evicted from the DEX.
#[prost(uint32, tag = "4")]
pub max_positions_per_pair: u32,
/// The maximum number of routing and execution steps to be performed
/// for a single pair
#[prost(uint32, tag = "5")]
pub max_execution_budget: u32,
}
impl ::prost::Name for DexParameters {
const NAME: &'static str = "DexParameters";
Expand Down
Loading

0 comments on commit 4debdd9

Please sign in to comment.