Skip to content

Commit

Permalink
[storage] support pagination by adding prefix_state_value_iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
lightmark committed Oct 31, 2022
1 parent 214a98c commit 71efff2
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 170 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"message": "Resource not found by Address(0xf), Struct tag(0x1::account::Account) and Ledger version(0)",
"error_code": "resource_not_found",
"message": "Account not found by Address(0xf) and Ledger version(0)",
"error_code": "account_not_found",
"vm_error_code": null
}
138 changes: 82 additions & 56 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use aptos_api_types::{
};
use aptos_types::access_path::AccessPath;
use aptos_types::account_config::AccountResource;
use aptos_types::account_state::AccountState;
use aptos_types::event::EventHandle;
use aptos_types::event::EventKey;
use aptos_types::state_store::state_key::StateKey;
Expand All @@ -31,6 +30,7 @@ use poem_openapi::{param::Path, OpenApi};
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Arc;
use storage_interface::MAX_REQUEST_LIMIT;

/// API for accounts, their associated resources, and modules
pub struct AccountsApi {
Expand Down Expand Up @@ -170,29 +170,7 @@ impl Account {
/// * BCS: Return a BCS encoded version of [`AccountData`]
pub fn account(self, accept_type: &AcceptType) -> BasicResultWith404<AccountData> {
// Retrieve the Account resource and convert it accordingly
let state_key = StateKey::AccessPath(AccessPath::resource_access_path(ResourceKey::new(
self.address.into(),
AccountResource::struct_tag(),
)));

let state_value = self.context.get_state_value_poem(
&state_key,
self.ledger_version,
&self.latest_ledger_info,
)?;

let state_value = match state_value {
Some(state_value) => state_value,
None => {
// If there's no account info, then it's not found
return Err(resource_not_found(
self.address,
&AccountResource::struct_tag(),
self.ledger_version,
&self.latest_ledger_info,
));
}
};
let state_value = self.get_account_resource()?;

// Convert the AccountResource into the summary object AccountData
let account_resource: AccountResource = bcs::from_bytes(&state_value)
Expand Down Expand Up @@ -220,21 +198,55 @@ impl Account {
}
}

pub fn get_account_resource(&self) -> Result<Vec<u8>, BasicErrorWith404> {
let state_key = StateKey::AccessPath(AccessPath::resource_access_path(ResourceKey::new(
self.address.into(),
AccountResource::struct_tag(),
)));

let state_value = self.context.get_state_value_poem(
&state_key,
self.ledger_version,
&self.latest_ledger_info,
)?;

state_value.ok_or_else(|| {
account_not_found(self.address, self.ledger_version, &self.latest_ledger_info)
})
}

/// Retrieves the move resources associated with the account
///
/// * JSON: Return a JSON encoded version of [`Vec<MoveResource>`]
/// * BCS: Return a sorted BCS encoded version of BCS encoded resources [`BTreeMap<StructTag, Vec<u8>>`]
pub fn resources(self, accept_type: &AcceptType) -> BasicResultWith404<Vec<MoveResource>> {
let account_state = self.account_state()?;
let resources = account_state.get_resources();
// check account exists
self.get_account_resource()?;
let (resources, next_state_key) = self
.context
.get_resources_by_pagination(
self.address.into(),
None,
self.ledger_version,
MAX_REQUEST_LIMIT,
)
.context("Failed to get resources from storage")
.map_err(|err| {
BasicErrorWith404::internal_with_code(
err,
AptosErrorCode::InternalError,
&self.latest_ledger_info,
)
})?;
assert_eq!(next_state_key, None);

match accept_type {
AcceptType::Json => {
// Resolve the BCS encoded versions into `MoveResource`s
let move_resolver = self.context.move_resolver_poem(&self.latest_ledger_info)?;
let converted_resources = move_resolver
.as_converter(self.context.db.clone())
.try_into_resources(resources)
.try_into_resources(resources.iter().map(|(k, v)| (k.clone(), v.as_slice())))
.context("Failed to build move resource response from data in DB")
.map_err(|err| {
BasicErrorWith404::internal_with_code(
Expand All @@ -252,9 +264,7 @@ impl Account {
}
AcceptType::Bcs => {
// Put resources in a BTreeMap to ensure they're ordered the same every time
let resources: BTreeMap<StructTag, Vec<u8>> = resources
.map(|(key, value)| (key, value.to_vec()))
.collect();
let resources: BTreeMap<StructTag, Vec<u8>> = resources.into_iter().collect();
BasicResponse::try_from_bcs((
resources,
&self.latest_ledger_info,
Expand All @@ -269,14 +279,33 @@ impl Account {
/// * JSON: Return a JSON encoded version of [`Vec<MoveModuleBytecode>`] with parsed ABIs
/// * BCS: Return a sorted BCS encoded version of bytecode [`BTreeMap<MoveModuleId, Vec<u8>>`]
pub fn modules(self, accept_type: &AcceptType) -> BasicResultWith404<Vec<MoveModuleBytecode>> {
let modules = self.account_state()?.into_modules();
// check account exists
self.get_account_resource()?;
let (modules, next_state_key) = self
.context
.get_modules_by_pagination(
self.address.into(),
None,
self.ledger_version,
MAX_REQUEST_LIMIT,
)
.context("Failed to get modules from storage")
.map_err(|err| {
BasicErrorWith404::internal_with_code(
err,
AptosErrorCode::InternalError,
&self.latest_ledger_info,
)
})?;
assert_eq!(next_state_key, None);

match accept_type {
AcceptType::Json => {
// Read bytecode and parse ABIs for output
let mut converted_modules = Vec::new();
for (_, module) in modules {
converted_modules.push(
MoveModuleBytecode::new(module)
MoveModuleBytecode::new(module.clone())
.try_parse_abi()
.context("Failed to parse move module ABI")
.map_err(|err| {
Expand All @@ -297,7 +326,8 @@ impl Account {
AcceptType::Bcs => {
// Sort modules by name
let modules: BTreeMap<MoveModuleId, Vec<u8>> = modules
.map(|(key, value)| (key.into(), value.to_vec()))
.into_iter()
.map(|(key, value)| (key.into(), value))
.collect();
BasicResponse::try_from_bcs((
modules,
Expand All @@ -308,23 +338,6 @@ impl Account {
}
}

// Helpers for processing account state.

/// Retrieves the account state
pub fn account_state(&self) -> Result<AccountState, BasicErrorWith404> {
self.context
.get_account_state(
self.address.into(),
self.ledger_version,
&self.latest_ledger_info,
)?
.ok_or_else(|| {
account_not_found(self.address, self.ledger_version, &self.latest_ledger_info)
})
}

// Events specific stuff.

/// Retrieves an event key from a [`MoveStructTag`] and a [`Identifier`] field name
///
/// e.g. If there's the `CoinStore` module, it has a field named `withdraw_events` for
Expand Down Expand Up @@ -392,24 +405,37 @@ impl Account {
&self,
struct_tag: &StructTag,
) -> Result<Vec<(Identifier, MoveValue)>, BasicErrorWith404> {
let account_state = self.account_state()?;
let (typ, data) = account_state
.get_resources()
.find(|(tag, _data)| tag == struct_tag)
let state_key = StateKey::AccessPath(AccessPath::resource_access_path(ResourceKey::new(
self.address.into(),
struct_tag.clone(),
)));
let state_value_bytes = self
.context
.db
.get_state_value_by_version(&state_key, self.ledger_version)
.context("Failed to read state value from db")
.map_err(|err| {
BasicErrorWith404::internal_with_code(
err,
AptosErrorCode::InternalError,
&self.latest_ledger_info,
)
})?
.ok_or_else(|| {
resource_not_found(
self.address,
struct_tag,
self.ledger_version,
&self.latest_ledger_info,
)
})?;
})?
.into_bytes();

// Convert to fields in move struct
let move_resolver = self.context.move_resolver_poem(&self.latest_ledger_info)?;
move_resolver
.as_converter(self.context.db.clone())
.move_struct_fields(&typ, data)
.move_struct_fields(struct_tag, state_value_bytes.as_slice())
.context("Failed to convert move structs from storage")
.map_err(|err| {
BasicErrorWith404::internal_with_code(
Expand Down
112 changes: 108 additions & 4 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ use crate::response::{
block_pruned_by_height, json_api_disabled, version_not_found, version_pruned, ForbiddenError,
InternalError, NotFoundError, ServiceUnavailableError, StdApiError,
};
use anyhow::{ensure, format_err, Context as AnyhowContext, Result};
use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result};
use aptos_api_types::{
AptosErrorCode, AsConverter, BcsBlock, GasEstimation, LedgerInfo, TransactionOnChainData,
};
use aptos_config::config::{NodeConfig, RoleType};
use aptos_crypto::HashValue;
use aptos_gas::{AptosGasParameters, FromOnChainGasSchedule};
use aptos_logger::error;
use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus};
use aptos_state_view::StateView;
use aptos_types::access_path::{AccessPath, Path};
use aptos_types::account_config::NewBlockEvent;
use aptos_types::account_view::AccountView;
use aptos_types::on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig};
Expand All @@ -33,11 +35,12 @@ use aptos_types::{
use aptos_vm::data_cache::{IntoMoveResolver, StorageAdapter, StorageAdapterOwned};
use futures::{channel::oneshot, SinkExt};
use itertools::Itertools;
use move_core_types::language_storage::{ModuleId, StructTag};
use std::sync::RwLock;
use std::{collections::HashMap, sync::Arc};
use storage_interface::{
state_view::{DbStateView, DbStateViewAtVersion, LatestDbStateCheckpointView},
DbReader, Order,
DbReader, Order, MAX_REQUEST_LIMIT,
};

// Context holds application scope context
Expand Down Expand Up @@ -236,10 +239,111 @@ impl Context {
address: AccountAddress,
version: u64,
) -> Result<HashMap<StateKey, StateValue>> {
self.db
.get_state_values_by_key_prefix(&StateKeyPrefix::from(address), version)
let mut iter = self.db.get_prefixed_state_value_iterator(
&StateKeyPrefix::from(address),
None,
version,
)?;
let kvs = iter
.by_ref()
.take(MAX_REQUEST_LIMIT as usize)
.collect::<Result<_>>()?;
if iter.next().transpose()?.is_some() {
bail!("Too many state items under account ({:?}).", address);
}
Ok(kvs)
}

pub fn get_resources_by_pagination(
&self,
address: AccountAddress,
prev_state_key: Option<&StateKey>,
version: u64,
limit: u64,
) -> Result<(Vec<(StructTag, Vec<u8>)>, Option<StateKey>)> {
let account_iter = self.db.get_prefixed_state_value_iterator(
&StateKeyPrefix::from(address),
prev_state_key,
version,
)?;
let mut resource_iter = account_iter
.filter_map(|res| match res {
Ok((k, v)) => match k {
StateKey::AccessPath(AccessPath { address: _, path }) => {
match Path::try_from(path.as_slice()) {
Ok(Path::Resource(struct_tag)) => {
Some(Ok((struct_tag, v.into_bytes())))
}
Ok(Path::Code(_)) => None,
Err(e) => Some(Err(anyhow::Error::from(e))),
}
}
_ => {
error!("storage prefix scan return inconsistent key ({:?}) with expected key prefix ({:?}).", k, StateKeyPrefix::from(address));
Some(Err(format_err!( "storage prefix scan return inconsistent key ({:?})", k )))
}
},
Err(e) => Some(Err(e)),
})
.take(limit as usize + 1);
let kvs = resource_iter
.by_ref()
.take(limit as usize)
.collect::<Result<_>>()?;
let next_key = resource_iter.next().transpose()?.map(|(struct_tag, _v)| {
StateKey::AccessPath(AccessPath::new(
address,
AccessPath::resource_path_vec(struct_tag),
))
});
Ok((kvs, next_key))
}

pub fn get_modules_by_pagination(
&self,
address: AccountAddress,
prev_state_key: Option<&StateKey>,
version: u64,
limit: u64,
) -> Result<(Vec<(ModuleId, Vec<u8>)>, Option<StateKey>)> {
let account_iter = self.db.get_prefixed_state_value_iterator(
&StateKeyPrefix::from(address),
prev_state_key,
version,
)?;
let mut module_iter = account_iter
.filter_map(|res| match res {
Ok((k, v)) => match k {
StateKey::AccessPath(AccessPath { address: _, path }) => {
match Path::try_from(path.as_slice()) {
Ok(Path::Code(module_id)) => Some(Ok((module_id, v.into_bytes()))),
Ok(Path::Resource(_)) => None,
Err(e) => Some(Err(anyhow::Error::from(e))),
}
}
_ => {
error!("storage prefix scan return inconsistent key ({:?}) with expected key prefix ({:?}).", k, StateKeyPrefix::from(address));
Some(Err(format_err!( "storage prefix scan return inconsistent key ({:?})", k )))
}
},
Err(e) => Some(Err(e)),
})
.take(limit as usize + 1);
let kvs = module_iter
.by_ref()
.take(limit as usize)
.collect::<Result<_>>()?;
let next_key = module_iter.next().transpose()?.map(|(module_id, _v)| {
StateKey::AccessPath(AccessPath::new(
address,
AccessPath::code_path_vec(module_id),
))
});
Ok((kvs, next_key))
}

// This function should be deprecated. DO NOT USE it.
// Instead, call either `get_modules_by_pagination` or `get_modules_by_pagination`.
pub fn get_account_state<E: InternalError>(
&self,
address: AccountAddress,
Expand Down
2 changes: 1 addition & 1 deletion api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl EventsApi {

// Ensure that account exists
let account = Account::new(self.context.clone(), address.0, None)?;
account.account_state()?;
account.get_account_resource()?;
self.list(
account.latest_ledger_info,
accept_type,
Expand Down
Loading

0 comments on commit 71efff2

Please sign in to comment.