diff --git a/CHANGELOG.md b/CHANGELOG.md index aacaae9f..661413dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ and this project adheres to - Upgrade Rust to 1.69.0 and workspace-optimizer to 0.13.0. - proxy: make `test_mode` optional in `InstantiateMsg`. +- gateway: Use new storage layout to allow an efficient query of all jobs. + ([#263]) +- gateway: do not emit `jobs_left` attribute on a when beacon is added since + this cannot efficiently be queried. ([#263]) + +[#263]: https://github.com/noislabs/nois-contracts/pull/263 ## [0.13.6] - 2023-06-17 diff --git a/contracts/nois-gateway/src/contract.rs b/contracts/nois-gateway/src/contract.rs index df115e02..d7da3de8 100644 --- a/contracts/nois-gateway/src/contract.rs +++ b/contracts/nois-gateway/src/contract.rs @@ -497,10 +497,8 @@ fn execute_add_verified_round( let NewDrand { msgs, jobs_processed, - jobs_left, } = router.new_drand(deps, env, round, &randomness, is_verifying_tx)?; attributes.push(Attribute::new("jobs_processed", jobs_processed.to_string())); - attributes.push(Attribute::new("jobs_left", jobs_left.to_string())); Ok(Response::new() .add_messages(msgs) @@ -1059,8 +1057,6 @@ mod tests { assert_eq!(res.messages.len(), 0); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "0"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "0"); // Process one job let msg = make_add_verified_round_msg(ROUND2, true); @@ -1073,8 +1069,6 @@ mod tests { )); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "1"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "0"); // Create 2 job for i in 0..2 { @@ -1105,8 +1099,6 @@ mod tests { )); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "2"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "0"); // Create 21 job for i in 0..21 { @@ -1128,8 +1120,6 @@ mod tests { assert_eq!(res.messages.len(), 2); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "2"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "19"); // Process next 2 jobs let msg = make_add_verified_round_msg(ROUND4, true); @@ -1137,8 +1127,6 @@ mod tests { assert_eq!(res.messages.len(), 2); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "2"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "17"); // Process next 2 jobs let msg = make_add_verified_round_msg(ROUND4, true); @@ -1146,8 +1134,6 @@ mod tests { assert_eq!(res.messages.len(), 2); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "2"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "15"); // Process next 14 jobs let msg = make_add_verified_round_msg(ROUND4, false); @@ -1155,8 +1141,6 @@ mod tests { assert_eq!(res.messages.len(), 14); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "14"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "1"); // Process last 1 jobs let msg = make_add_verified_round_msg(ROUND4, false); @@ -1164,8 +1148,6 @@ mod tests { assert_eq!(res.messages.len(), 1); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "1"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "0"); // No jobs left for later submissions let msg = make_add_verified_round_msg(ROUND4, true); @@ -1173,8 +1155,6 @@ mod tests { assert_eq!(res.messages.len(), 0); let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap(); assert_eq!(jobs_processed, "0"); - let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap(); - assert_eq!(jobs_left, "0"); } // diff --git a/contracts/nois-gateway/src/request_router.rs b/contracts/nois-gateway/src/request_router.rs index fe93332f..4520ef1a 100644 --- a/contracts/nois-gateway/src/request_router.rs +++ b/contracts/nois-gateway/src/request_router.rs @@ -10,7 +10,7 @@ use crate::{ drand_archive::{archive_lookup, archive_store}, state::{ increment_processed_drand_jobs, unprocessed_drand_jobs_dequeue, - unprocessed_drand_jobs_enqueue, unprocessed_drand_jobs_len, Job, + unprocessed_drand_jobs_enqueue, Job, }, }; @@ -32,7 +32,6 @@ pub struct RoutingReceipt { pub struct NewDrand { pub msgs: Vec, pub jobs_processed: u32, - pub jobs_left: u32, } pub struct RequestRouter {} @@ -140,11 +139,9 @@ impl RequestRouter { break; } } - let jobs_left = unprocessed_drand_jobs_len(deps.storage, round)?; Ok(NewDrand { msgs, jobs_processed, - jobs_left, }) } } diff --git a/contracts/nois-gateway/src/state/jobs.rs b/contracts/nois-gateway/src/state/drand_jobs/drand_jobs1.rs similarity index 97% rename from contracts/nois-gateway/src/state/jobs.rs rename to contracts/nois-gateway/src/state/drand_jobs/drand_jobs1.rs index fd616229..77ff23f8 100644 --- a/contracts/nois-gateway/src/state/jobs.rs +++ b/contracts/nois-gateway/src/state/drand_jobs/drand_jobs1.rs @@ -1,5 +1,7 @@ use cosmwasm_schema::cw_serde; -use cosmwasm_std::{from_slice, Binary, Order, StdResult, Storage}; +#[cfg(test)] +use cosmwasm_std::{from_slice, Order}; +use cosmwasm_std::{Binary, StdResult, Storage}; use cw_storage_plus::Deque; const UNPROCESSED_DRAND_JOBS_KEY_LEN: u16 = 24; @@ -7,6 +9,7 @@ const UNPROCESSED_DRAND_JOBS_KEY_LEN: u16 = 24; /// This is the length of the storage key of a meta field /// of the Deque storage type. The 2 is the length-prefixed encoding. /// The 1 is the "h" or "t". +#[cfg(test)] const DEQUE_META_FIELD_LEN: usize = 2 + (UNPROCESSED_DRAND_JOBS_KEY_LEN as usize) + 1; #[cw_serde] @@ -28,6 +31,7 @@ fn unprocessed_drand_jobs_key(round: u64) -> String { } /// Add an element to the unprocessed drand jobs queue of this round +#[cfg(test)] pub fn unprocessed_drand_jobs_enqueue( storage: &mut dyn Storage, round: u64, @@ -52,6 +56,7 @@ pub fn unprocessed_drand_jobs_len(storage: &dyn Storage, round: u64) -> StdResul Deque::::new(&prefix).len(storage) } +#[cfg(test)] pub fn all_unprocessed_drand_jobs( storage: &dyn Storage, order: Order, diff --git a/contracts/nois-gateway/src/state/drand_jobs/drand_jobs2.rs b/contracts/nois-gateway/src/state/drand_jobs/drand_jobs2.rs new file mode 100644 index 00000000..7514a794 --- /dev/null +++ b/contracts/nois-gateway/src/state/drand_jobs/drand_jobs2.rs @@ -0,0 +1,132 @@ +use cosmwasm_std::{Order, StdResult, Storage}; +use cw_storage_plus::Map; + +use super::Job; + +/// A map from (round, job ID) here job ID is a round specific auto incrementing ID +const JOBS: Map<(u32, u16), Job> = Map::new("djobs"); +const LAST_JOB_ID: Map = Map::new("djids"); + +/// Add an element to the unprocessed drand jobs queue of this round +pub fn unprocessed_drand_jobs_enqueue( + storage: &mut dyn Storage, + round: u64, + value: &Job, +) -> StdResult<()> { + let round: u32 = round.try_into().expect("round must not exceed u32 range"); + let new_id = LAST_JOB_ID.may_load(storage, round)?.unwrap_or_default() + 1; + JOBS.save(storage, (round, new_id), value)?; + LAST_JOB_ID.save(storage, round, &new_id)?; + Ok(()) +} + +/// Remove an element from the unprocessed drand jobs queue of this round +pub fn unprocessed_drand_jobs_dequeue( + storage: &mut dyn Storage, + round: u64, +) -> StdResult> { + let round: u32 = round.try_into().expect("round must not exceed u32 range"); + let first = JOBS + .prefix(round) + .range(storage, None, None, Order::Ascending) + .next(); + let Some(found) = first else { + return Ok(None); + }; + let (id, job) = found?; + JOBS.remove(storage, (round, id)); + Ok(Some(job)) +} + +/// Gets the number of unprocessed drand jobs queue of this round. +/// This is inefficient for many jobs in a single round. +pub fn unprocessed_drand_jobs_len(storage: &dyn Storage, round: u64) -> StdResult { + let round: u32 = round.try_into().expect("round must not exceed u32 range"); + let count = JOBS + .prefix(round) + .keys_raw(storage, None, None, Order::Ascending) + .count(); + Ok(count as u32) +} + +pub fn all_unprocessed_drand_jobs( + storage: &dyn Storage, + order: Order, + offset: usize, + limit: usize, +) -> StdResult> { + JOBS.range_raw(storage, None, None, order) + .skip(offset) + .take(limit) + .map(|res| res.map(|ok| ok.1)) + .collect::>>() +} + +#[cfg(test)] +mod tests { + use cosmwasm_std::{testing::MockStorage, Binary}; + + use super::*; + + #[test] + fn all_unprocessed_drand_jobs_works() { + let mut storage = MockStorage::default(); + + let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap(); + assert_eq!(jobs, []); + + let job1 = Job { + channel: "chan-123".to_string(), + source_id: "drannd:foo:bar".to_string(), + origin: Binary::from([1, 2, 1, 2]), + }; + unprocessed_drand_jobs_enqueue(&mut storage, 3, &job1).unwrap(); + + let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap(); + assert_eq!(jobs, &[job1.clone()]); + + let job2 = Job { + channel: "chan-123".to_string(), + source_id: "drannd:foo:baz".to_string(), + origin: Binary::from([17, 4]), + }; + unprocessed_drand_jobs_enqueue(&mut storage, 3, &job2).unwrap(); + + let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap(); + assert_eq!(jobs, &[job1.clone(), job2.clone()]); + let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 1).unwrap(); + assert_eq!(jobs, &[job1.clone()]); + let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 1, 100).unwrap(); + assert_eq!(jobs, &[job2.clone()]); + let jobs = all_unprocessed_drand_jobs(&storage, Order::Descending, 0, 100).unwrap(); + assert_eq!(jobs, &[job2.clone(), job1.clone()]); + let jobs = all_unprocessed_drand_jobs(&storage, Order::Descending, 0, 1).unwrap(); + assert_eq!(jobs, &[job2.clone()]); + let jobs = all_unprocessed_drand_jobs(&storage, Order::Descending, 1, 100).unwrap(); + assert_eq!(jobs, &[job1.clone()]); + + let _ = unprocessed_drand_jobs_dequeue(&mut storage, 3).unwrap(); + + let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap(); + assert_eq!(jobs, &[job2.clone()]); + + // new job in higher round + let job3 = Job { + channel: "chan-123".to_string(), + source_id: "drannd:foo:test".to_string(), + origin: Binary::from([42, 42]), + }; + unprocessed_drand_jobs_enqueue(&mut storage, 4, &job3).unwrap(); + + // new job in lower round + let job4 = Job { + channel: "chan-123".to_string(), + source_id: "drannd:foo:test".to_string(), + origin: Binary::from([12, 21]), + }; + unprocessed_drand_jobs_enqueue(&mut storage, 2, &job4).unwrap(); + + let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap(); + assert_eq!(jobs, &[job4, job2, job3]); + } +} diff --git a/contracts/nois-gateway/src/state/drand_jobs/mod.rs b/contracts/nois-gateway/src/state/drand_jobs/mod.rs new file mode 100644 index 00000000..aa4e12bc --- /dev/null +++ b/contracts/nois-gateway/src/state/drand_jobs/mod.rs @@ -0,0 +1,111 @@ +mod drand_jobs1; +mod drand_jobs2; + +use cosmwasm_std::{StdResult, Storage}; +pub use drand_jobs1::Job; + +// This is too inefficient for drand_jobs1 and only used in an informational query +pub use drand_jobs2::all_unprocessed_drand_jobs; + +// New jobs always go to v2 +pub use drand_jobs2::unprocessed_drand_jobs_enqueue; + +/// Gets the number of unprocessed drand jobs queue of this round. +/// This is inefficient for many jobs in a single round. +pub fn unprocessed_drand_jobs_len(storage: &dyn Storage, round: u64) -> StdResult { + let l1 = drand_jobs1::unprocessed_drand_jobs_len(storage, round)?; + let l2 = drand_jobs2::unprocessed_drand_jobs_len(storage, round)?; + Ok(l1 + l2) +} + +pub fn unprocessed_drand_jobs_dequeue( + storage: &mut dyn Storage, + round: u64, +) -> StdResult> { + if let Some(job_v1) = drand_jobs1::unprocessed_drand_jobs_dequeue(storage, round)? { + return Ok(Some(job_v1)); + } + if let Some(job_v2) = drand_jobs2::unprocessed_drand_jobs_dequeue(storage, round)? { + return Ok(Some(job_v2)); + } + Ok(None) +} + +#[cfg(test)] +mod tests { + use cosmwasm_std::{testing::MockStorage, Binary}; + + use super::*; + + fn make_job(id: u32) -> Job { + Job { + channel: "chan-123".to_string(), + source_id: "drannd:foo:bar".to_string(), + origin: Binary::from(id.to_be_bytes()), + } + } + + #[test] + fn unprocessed_drand_jobs_dequeue_works() { + let mut storage = MockStorage::default(); + + let round = 3; + + let job = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(job, None); + + let job1 = make_job(1); + let job2 = make_job(2); + let job3 = make_job(3); + let job4 = make_job(4); + + drand_jobs1::unprocessed_drand_jobs_enqueue(&mut storage, round, &job1).unwrap(); + drand_jobs1::unprocessed_drand_jobs_enqueue(&mut storage, round, &job2).unwrap(); + drand_jobs2::unprocessed_drand_jobs_enqueue(&mut storage, round, &job3).unwrap(); + drand_jobs2::unprocessed_drand_jobs_enqueue(&mut storage, round, &job4).unwrap(); + + let job = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(job, Some(job1)); + let job = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(job, Some(job2)); + let job = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(job, Some(job3)); + let job = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(job, Some(job4)); + let job = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(job, None); + } + + #[test] + fn unprocessed_drand_jobs_len_works() { + let mut storage = MockStorage::default(); + + let round = 3; + + let job1 = make_job(1); + let job2 = make_job(2); + let job3 = make_job(3); + let job4 = make_job(4); + + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 0); + drand_jobs1::unprocessed_drand_jobs_enqueue(&mut storage, round, &job1).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 1); + drand_jobs1::unprocessed_drand_jobs_enqueue(&mut storage, round, &job2).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 2); + drand_jobs2::unprocessed_drand_jobs_enqueue(&mut storage, round, &job3).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 3); + drand_jobs2::unprocessed_drand_jobs_enqueue(&mut storage, round, &job4).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 4); + + let _ = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 3); + let _ = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 2); + let _ = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 1); + let _ = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 0); + let _ = unprocessed_drand_jobs_dequeue(&mut storage, round).unwrap(); + assert_eq!(unprocessed_drand_jobs_len(&storage, round).unwrap(), 0); + } +} diff --git a/contracts/nois-gateway/src/state/mod.rs b/contracts/nois-gateway/src/state/mod.rs index 83be2f4f..d7e3b29f 100644 --- a/contracts/nois-gateway/src/state/mod.rs +++ b/contracts/nois-gateway/src/state/mod.rs @@ -1,12 +1,12 @@ mod config; mod customers; -mod jobs; +mod drand_jobs; mod requests_log; mod stats; pub use config::{Config, CONFIG}; pub use customers::{Customer, CUSTOMERS}; -pub use jobs::{ +pub use drand_jobs::{ all_unprocessed_drand_jobs, unprocessed_drand_jobs_dequeue, unprocessed_drand_jobs_enqueue, unprocessed_drand_jobs_len, Job, };