From ed1ae1b26d7dba31cdf6eac551eed22a99e9bf52 Mon Sep 17 00:00:00 2001 From: Mateusz Jasiuk Date: Mon, 26 Aug 2024 12:50:49 +0200 Subject: [PATCH] bug: insert balances in chunks --- chain/src/main.rs | 2 +- chain/src/repository/balance.rs | 80 ++++++++++++++++++++++++++++++++- chain/src/test_db.rs | 19 ++++++++ orm/src/balances.rs | 11 +---- 4 files changed, 101 insertions(+), 11 deletions(-) diff --git a/chain/src/main.rs b/chain/src/main.rs index c5973abe..d049d996 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -362,7 +362,7 @@ async fn initial_query( conn.build_transaction() .read_write() .run(|transaction_conn| { - repository::balance::insert_balance( + repository::balance::insert_balance_in_chunks( transaction_conn, balances, )?; diff --git a/chain/src/repository/balance.rs b/chain/src/repository/balance.rs index b137ab08..b6b9efbc 100644 --- a/chain/src/repository/balance.rs +++ b/chain/src/repository/balance.rs @@ -1,8 +1,18 @@ +use diesel::sql_types::{BigInt, Integer}; use diesel::upsert::excluded; -use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; +use diesel::{ + sql_query, ExpressionMethods, PgConnection, QueryableByName, RunQueryDsl, +}; use orm::balances::BalancesInsertDb; use orm::schema::balances; use shared::balance::Balances; +pub const MAX_PARAM_SIZE: u16 = u16::MAX; + +#[derive(QueryableByName)] +struct BalanceCount { + #[diesel(sql_type = BigInt)] + count: i64, +} pub fn insert_balance( transaction_conn: &mut PgConnection, @@ -27,6 +37,32 @@ pub fn insert_balance( anyhow::Ok(()) } +pub fn insert_balance_in_chunks( + transaction_conn: &mut PgConnection, + balances: Balances, +) -> anyhow::Result<()> { + let balances_col_count = sql_query( + "SELECT COUNT(*) + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'balances';", + ) + .get_result::(transaction_conn)?; + + for chunk in balances + // We have to devide MAX_PARAM_SIZE by the number of columns in the balances table to get + // the correct number of parameters in the chunk. + .chunks((MAX_PARAM_SIZE as i64 / balances_col_count.count) as usize) + { + transaction_conn + .build_transaction() + .read_write() + .run(|conn| insert_balance(conn, chunk.to_vec()))?; + } + + anyhow::Ok(()) +} + #[cfg(test)] mod tests { @@ -303,6 +339,48 @@ mod tests { .expect("Failed to run test"); } + /// Test that we can insert more than u16::MAX balances + #[tokio::test] + async fn test_insert_balance_in_chunks_with_max_param_size_plus_one() { + let config = TestConfig::parse(); + let db = TestDb::new(&config); + + db.run_test_without_transaction(|conn| { + let mps = MAX_PARAM_SIZE as u32; + + let balances = + (0..mps + 1).map(|_| Balance::fake()).collect::>(); + + let res = insert_balance_in_chunks(conn, balances)?; + + assert_eq!(res, ()); + + anyhow::Ok(()) + }) + .await + .expect("Failed to run test"); + } + + /// Test that we can insert less than u16::MAX balances using chunks + #[tokio::test] + async fn test_insert_balance_in_chunks_with_1000_params() { + let config = TestConfig::parse(); + let db = TestDb::new(&config); + + db.run_test_without_transaction(|conn| { + let balances = + (0..1000).map(|_| Balance::fake()).collect::>(); + + let res = insert_balance_in_chunks(conn, balances)?; + + assert_eq!(res, ()); + + anyhow::Ok(()) + }) + .await + .expect("Failed to run test"); + } + fn seed_balance( conn: &mut PgConnection, balances: Vec, diff --git a/chain/src/test_db.rs b/chain/src/test_db.rs index 1e2ff6cb..b1c2c033 100644 --- a/chain/src/test_db.rs +++ b/chain/src/test_db.rs @@ -74,6 +74,25 @@ impl TestDb { anyhow::Ok(()) } + + pub async fn run_test_without_transaction( + &self, + test: impl Fn(&mut PgConnection) -> anyhow::Result<()> + + namada_sdk::MaybeSend + + 'static, + ) -> anyhow::Result<()> { + let conn = &mut self.pool.get().await.unwrap(); + + run_migrations(conn).await.unwrap(); + + conn.interact(move |conn| test(conn)) + .await + .context_db_interact_error() + .and_then(identity) + .into_db_error()?; + + anyhow::Ok(()) + } } impl Drop for TestDb { diff --git a/orm/src/balances.rs b/orm/src/balances.rs index 432805e4..b1c1540e 100644 --- a/orm/src/balances.rs +++ b/orm/src/balances.rs @@ -19,18 +19,11 @@ pub type BalanceDb = BalancesInsertDb; impl BalancesInsertDb { pub fn from_balance(balance: Balance) -> Self { - let asd = Self { + Self { owner: balance.owner.to_string(), token: balance.token.to_string(), raw_amount: BigDecimal::from_str(&balance.amount.to_string()) .expect("Invalid amount"), - }; - - println!( - "Balance_test: {:?}", - BigDecimal::from_str(&balance.amount.to_string()) - ); - - asd + } } }