Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make storage layout more efficient for all jobs query #263

Merged
merged 6 commits into from
Jul 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ and this project adheres to

- Upgrade Rust to 1.69.0 and workspace-optimizer to 0.13.0.
- proxy: make `test_mode` optional in `InstantiateMsg`.
- gateway: Use new storage layout to allow an efficient query of all jobs.
([#263])
- gateway: do not emit `jobs_left` attribute on a when beacon is added since
this cannot efficiently be queried. ([#263])

[#263]: https://github.com/noislabs/nois-contracts/pull/263

## [0.13.6] - 2023-06-17

Expand Down
20 changes: 0 additions & 20 deletions contracts/nois-gateway/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,8 @@ fn execute_add_verified_round(
let NewDrand {
msgs,
jobs_processed,
jobs_left,
} = router.new_drand(deps, env, round, &randomness, is_verifying_tx)?;
attributes.push(Attribute::new("jobs_processed", jobs_processed.to_string()));
attributes.push(Attribute::new("jobs_left", jobs_left.to_string()));

Ok(Response::new()
.add_messages(msgs)
Expand Down Expand Up @@ -1059,8 +1057,6 @@ mod tests {
assert_eq!(res.messages.len(), 0);
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "0");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "0");

// Process one job
let msg = make_add_verified_round_msg(ROUND2, true);
Expand All @@ -1073,8 +1069,6 @@ mod tests {
));
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "1");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "0");

// Create 2 job
for i in 0..2 {
Expand Down Expand Up @@ -1105,8 +1099,6 @@ mod tests {
));
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "2");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "0");

// Create 21 job
for i in 0..21 {
Expand All @@ -1128,53 +1120,41 @@ mod tests {
assert_eq!(res.messages.len(), 2);
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "2");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "19");

// Process next 2 jobs
let msg = make_add_verified_round_msg(ROUND4, true);
let res = execute(deps.as_mut(), mock_env(), mock_info(DRAND, &[]), msg).unwrap();
assert_eq!(res.messages.len(), 2);
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "2");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "17");

// Process next 2 jobs
let msg = make_add_verified_round_msg(ROUND4, true);
let res = execute(deps.as_mut(), mock_env(), mock_info(DRAND, &[]), msg).unwrap();
assert_eq!(res.messages.len(), 2);
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "2");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "15");

// Process next 14 jobs
let msg = make_add_verified_round_msg(ROUND4, false);
let res = execute(deps.as_mut(), mock_env(), mock_info(DRAND, &[]), msg).unwrap();
assert_eq!(res.messages.len(), 14);
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "14");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "1");

// Process last 1 jobs
let msg = make_add_verified_round_msg(ROUND4, false);
let res = execute(deps.as_mut(), mock_env(), mock_info(DRAND, &[]), msg).unwrap();
assert_eq!(res.messages.len(), 1);
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "1");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "0");

// No jobs left for later submissions
let msg = make_add_verified_round_msg(ROUND4, true);
let res = execute(deps.as_mut(), mock_env(), mock_info(DRAND, &[]), msg).unwrap();
assert_eq!(res.messages.len(), 0);
let jobs_processed = first_attr(&res.attributes, "jobs_processed").unwrap();
assert_eq!(jobs_processed, "0");
let jobs_left = first_attr(&res.attributes, "jobs_left").unwrap();
assert_eq!(jobs_left, "0");
}

//
Expand Down
5 changes: 1 addition & 4 deletions contracts/nois-gateway/src/request_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
drand_archive::{archive_lookup, archive_store},
state::{
increment_processed_drand_jobs, unprocessed_drand_jobs_dequeue,
unprocessed_drand_jobs_enqueue, unprocessed_drand_jobs_len, Job,
unprocessed_drand_jobs_enqueue, Job,
},
};

Expand All @@ -32,7 +32,6 @@ pub struct RoutingReceipt {
pub struct NewDrand {
pub msgs: Vec<CosmosMsg>,
pub jobs_processed: u32,
pub jobs_left: u32,
}

pub struct RequestRouter {}
Expand Down Expand Up @@ -140,11 +139,9 @@ impl RequestRouter {
break;
}
}
let jobs_left = unprocessed_drand_jobs_len(deps.storage, round)?;
Ok(NewDrand {
msgs,
jobs_processed,
jobs_left,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use cosmwasm_schema::cw_serde;
use cosmwasm_std::{from_slice, Binary, Order, StdResult, Storage};
#[cfg(test)]
use cosmwasm_std::{from_slice, Order};
use cosmwasm_std::{Binary, StdResult, Storage};
use cw_storage_plus::Deque;

const UNPROCESSED_DRAND_JOBS_KEY_LEN: u16 = 24;

/// This is the length of the storage key of a meta field
/// of the Deque storage type. The 2 is the length-prefixed encoding.
/// The 1 is the "h" or "t".
#[cfg(test)]
const DEQUE_META_FIELD_LEN: usize = 2 + (UNPROCESSED_DRAND_JOBS_KEY_LEN as usize) + 1;

#[cw_serde]
Expand All @@ -28,6 +31,7 @@ fn unprocessed_drand_jobs_key(round: u64) -> String {
}

/// Add an element to the unprocessed drand jobs queue of this round
#[cfg(test)]
pub fn unprocessed_drand_jobs_enqueue(
storage: &mut dyn Storage,
round: u64,
Expand All @@ -52,6 +56,7 @@ pub fn unprocessed_drand_jobs_len(storage: &dyn Storage, round: u64) -> StdResul
Deque::<Job>::new(&prefix).len(storage)
}

#[cfg(test)]
pub fn all_unprocessed_drand_jobs(
storage: &dyn Storage,
order: Order,
Expand Down
132 changes: 132 additions & 0 deletions contracts/nois-gateway/src/state/drand_jobs/drand_jobs2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use cosmwasm_std::{Order, StdResult, Storage};
use cw_storage_plus::Map;

use super::Job;

/// A map from (round, job ID) here job ID is a round specific auto incrementing ID
const JOBS: Map<(u32, u16), Job> = Map::new("djobs");
const LAST_JOB_ID: Map<u32, u16> = Map::new("djids");

/// Add an element to the unprocessed drand jobs queue of this round
pub fn unprocessed_drand_jobs_enqueue(
storage: &mut dyn Storage,
round: u64,
value: &Job,
) -> StdResult<()> {
let round: u32 = round.try_into().expect("round must not exceed u32 range");
let new_id = LAST_JOB_ID.may_load(storage, round)?.unwrap_or_default() + 1;
JOBS.save(storage, (round, new_id), value)?;
LAST_JOB_ID.save(storage, round, &new_id)?;
Ok(())
}

/// Remove an element from the unprocessed drand jobs queue of this round
pub fn unprocessed_drand_jobs_dequeue(
storage: &mut dyn Storage,
round: u64,
) -> StdResult<Option<Job>> {
let round: u32 = round.try_into().expect("round must not exceed u32 range");
let first = JOBS
.prefix(round)
.range(storage, None, None, Order::Ascending)
.next();
let Some(found) = first else {
return Ok(None);
};
let (id, job) = found?;
JOBS.remove(storage, (round, id));
Ok(Some(job))
}

/// Gets the number of unprocessed drand jobs queue of this round.
/// This is inefficient for many jobs in a single round.
pub fn unprocessed_drand_jobs_len(storage: &dyn Storage, round: u64) -> StdResult<u32> {
let round: u32 = round.try_into().expect("round must not exceed u32 range");
let count = JOBS
.prefix(round)
.keys_raw(storage, None, None, Order::Ascending)
.count();
Ok(count as u32)
}

pub fn all_unprocessed_drand_jobs(
storage: &dyn Storage,
order: Order,
offset: usize,
limit: usize,
) -> StdResult<Vec<Job>> {
JOBS.range_raw(storage, None, None, order)
.skip(offset)
.take(limit)
.map(|res| res.map(|ok| ok.1))
.collect::<StdResult<Vec<_>>>()
}

#[cfg(test)]
mod tests {
use cosmwasm_std::{testing::MockStorage, Binary};

use super::*;

#[test]
fn all_unprocessed_drand_jobs_works() {
let mut storage = MockStorage::default();

let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap();
assert_eq!(jobs, []);

let job1 = Job {
channel: "chan-123".to_string(),
source_id: "drannd:foo:bar".to_string(),
origin: Binary::from([1, 2, 1, 2]),
};
unprocessed_drand_jobs_enqueue(&mut storage, 3, &job1).unwrap();

let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap();
assert_eq!(jobs, &[job1.clone()]);

let job2 = Job {
channel: "chan-123".to_string(),
source_id: "drannd:foo:baz".to_string(),
origin: Binary::from([17, 4]),
};
unprocessed_drand_jobs_enqueue(&mut storage, 3, &job2).unwrap();

let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap();
assert_eq!(jobs, &[job1.clone(), job2.clone()]);
let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 1).unwrap();
assert_eq!(jobs, &[job1.clone()]);
let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 1, 100).unwrap();
assert_eq!(jobs, &[job2.clone()]);
let jobs = all_unprocessed_drand_jobs(&storage, Order::Descending, 0, 100).unwrap();
assert_eq!(jobs, &[job2.clone(), job1.clone()]);
let jobs = all_unprocessed_drand_jobs(&storage, Order::Descending, 0, 1).unwrap();
assert_eq!(jobs, &[job2.clone()]);
let jobs = all_unprocessed_drand_jobs(&storage, Order::Descending, 1, 100).unwrap();
assert_eq!(jobs, &[job1.clone()]);

let _ = unprocessed_drand_jobs_dequeue(&mut storage, 3).unwrap();

let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap();
assert_eq!(jobs, &[job2.clone()]);

// new job in higher round
let job3 = Job {
channel: "chan-123".to_string(),
source_id: "drannd:foo:test".to_string(),
origin: Binary::from([42, 42]),
};
unprocessed_drand_jobs_enqueue(&mut storage, 4, &job3).unwrap();

// new job in lower round
let job4 = Job {
channel: "chan-123".to_string(),
source_id: "drannd:foo:test".to_string(),
origin: Binary::from([12, 21]),
};
unprocessed_drand_jobs_enqueue(&mut storage, 2, &job4).unwrap();

let jobs = all_unprocessed_drand_jobs(&storage, Order::Ascending, 0, 100).unwrap();
assert_eq!(jobs, &[job4, job2, job3]);
}
}
Loading