From 68a9f76c9f23c85b45ce8a7d5aa329d4bbe4d9a5 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 18 Feb 2022 16:26:22 +0400 Subject: [PATCH] feat(validator-node): committee proposes genesis block w/ instructions (#3844) Description --- - committee proposes genesis block/node with initial instructions as per template id - state db is aware of which asset it is processing - minor cleanup of some generics - error handling Motivation and Context --- The committee can propose a genesis block with init instructions and agree on a genesis block and genesis merkle root How Has This Been Tested? --- Manually --- .../web-app/package-lock.json | 38 ++- .../tari_collectibles/web-app/src/App.js | 50 ++-- .../tari_validator_node/src/dan_node.rs | 12 +- .../src/grpc/validator_node_grpc_server.rs | 6 +- .../src/p2p/rpc/service_impl.rs | 27 +- .../services/inbound_connection_service.rs | 29 +- .../services/outbound_connection_service.rs | 15 +- .../configuration/validator_node_config.rs | 9 + dan_layer/core/src/digital_assets_error.rs | 8 + dan_layer/core/src/fixed_hash.rs | 2 +- .../core/src/models/hot_stuff_tree_node.rs | 4 +- dan_layer/core/src/models/instruction_set.rs | 41 ++- dan_layer/core/src/models/view_id.rs | 18 +- .../core/src/services/asset_processor.rs | 134 +++------ .../inbound_connection_service.rs | 9 +- .../infrastructure_services/mocks/mod.rs | 10 +- .../outbound_service.rs | 17 +- dan_layer/core/src/services/mocks/mod.rs | 28 +- .../core/src/services/payload_processor.rs | 19 +- .../core/src/services/payload_provider.rs | 23 +- .../src/services/service_specification.rs | 7 +- .../storage/chain/chain_db_unit_of_work.rs | 10 + dan_layer/core/src/storage/mocks/mod.rs | 4 +- dan_layer/core/src/storage/state/state_db.rs | 20 +- .../storage/state/state_db_unit_of_work.rs | 58 ++-- .../core/src/templates/tip002_template.rs | 58 ++-- .../core/src/templates/tip004_template.rs | 34 ++- .../core/src/templates/tip721_template.rs | 32 ++- .../core/src/workers/consensus_worker.rs | 15 +- .../core/src/workers/states/commit_state.rs | 67 +++-- .../core/src/workers/states/decide_state.rs | 104 +++---- .../core/src/workers/states/next_view.rs | 57 ++-- .../src/workers/states/pre_commit_state.rs | 82 +++--- dan_layer/core/src/workers/states/prepare.rs | 265 ++++++++++-------- dan_layer/core/src/workers/states/starting.rs | 60 +--- .../src/sqlite_chain_backend_adapter.rs | 8 +- .../storage_sqlite/src/sqlite_db_factory.rs | 10 +- 37 files changed, 745 insertions(+), 645 deletions(-) diff --git a/applications/tari_collectibles/web-app/package-lock.json b/applications/tari_collectibles/web-app/package-lock.json index f67cdad75c..a79f8ef0ac 100644 --- a/applications/tari_collectibles/web-app/package-lock.json +++ b/applications/tari_collectibles/web-app/package-lock.json @@ -12,7 +12,7 @@ "@emotion/styled": "^11.3.0", "@mui/icons-material": "^5.0.3", "@mui/material": "^5.0.3", - "@tauri-apps/api": "^1.0.0-beta.8", + "@tauri-apps/api": "^1.0.0-rc.1", "@testing-library/jest-dom": "^5.14.1", "@testing-library/react": "^11.2.7", "@testing-library/user-event": "^12.8.3", @@ -3501,9 +3501,12 @@ } }, "node_modules/@tauri-apps/api": { - "version": "1.0.0-beta.8", - "resolved": "https://registry.npmjs.org/@tauri-apps/api/-/api-1.0.0-beta.8.tgz", - "integrity": "sha512-a56lXB7XvQ4+fKtT0pxpkjTSKhyrQ1Vmjyvt2ox3mT9xw3l7s8IOKHJ1WuqW6TA6xdoy3Cyja3Z3prw8hflS7g==", + "version": "1.0.0-rc.1", + "resolved": "https://registry.npmjs.org/@tauri-apps/api/-/api-1.0.0-rc.1.tgz", + "integrity": "sha512-VBUOmfT8ea02JB/Qr+FHeaLnug5BRA7ro2pX47q0GZCbZsU9b+iPnOXl0ShJwT0melR7B6iamyhDwkHStMVfQA==", + "dependencies": { + "type-fest": "2.11.2" + }, "engines": { "node": ">= 12.13.0", "npm": ">= 6.6.0", @@ -3514,6 +3517,17 @@ "url": "https://opencollective.com/tauri" } }, + "node_modules/@tauri-apps/api/node_modules/type-fest": { + "version": "2.11.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-2.11.2.tgz", + "integrity": "sha512-reW2Y2Mpn0QNA/5fvtm5doROLwDPu2zOm5RtY7xQQS05Q7xgC8MOZ3yPNaP9m/s/sNjjFQtHo7VCNqYW2iI+Ig==", + "engines": { + "node": ">=12.20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/@testing-library/dom": { "version": "7.31.2", "resolved": "https://registry.npmjs.org/@testing-library/dom/-/dom-7.31.2.tgz", @@ -24302,9 +24316,19 @@ } }, "@tauri-apps/api": { - "version": "1.0.0-beta.8", - "resolved": "https://registry.npmjs.org/@tauri-apps/api/-/api-1.0.0-beta.8.tgz", - "integrity": "sha512-a56lXB7XvQ4+fKtT0pxpkjTSKhyrQ1Vmjyvt2ox3mT9xw3l7s8IOKHJ1WuqW6TA6xdoy3Cyja3Z3prw8hflS7g==" + "version": "1.0.0-rc.1", + "resolved": "https://registry.npmjs.org/@tauri-apps/api/-/api-1.0.0-rc.1.tgz", + "integrity": "sha512-VBUOmfT8ea02JB/Qr+FHeaLnug5BRA7ro2pX47q0GZCbZsU9b+iPnOXl0ShJwT0melR7B6iamyhDwkHStMVfQA==", + "requires": { + "type-fest": "2.11.2" + }, + "dependencies": { + "type-fest": { + "version": "2.11.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-2.11.2.tgz", + "integrity": "sha512-reW2Y2Mpn0QNA/5fvtm5doROLwDPu2zOm5RtY7xQQS05Q7xgC8MOZ3yPNaP9m/s/sNjjFQtHo7VCNqYW2iI+Ig==" + } + } }, "@testing-library/dom": { "version": "7.31.2", diff --git a/applications/tari_collectibles/web-app/src/App.js b/applications/tari_collectibles/web-app/src/App.js index e10288d406..20a74022a3 100644 --- a/applications/tari_collectibles/web-app/src/App.js +++ b/applications/tari_collectibles/web-app/src/App.js @@ -122,9 +122,23 @@ const AccountsMenu = (props) => { const [accounts, setAccounts] = useState([]); const [error, setError] = useState(""); - useEffect(() => { - async function inner() { - console.log("refreshing accounts"); + useEffect(async () => { + console.log("refreshing accounts"); + setError(""); + await binding + .command_asset_wallets_list() + .then((accounts) => { + console.log("accounts", accounts); + setAccounts(accounts); + }) + .catch((e) => { + // todo error handling + console.error("accounts_list error:", e); + setError(e.message); + }); + + await listen("asset_wallets::updated", (event) => { + console.log("accounts have changed"); setError(""); binding .command_asset_wallets_list() @@ -137,24 +151,7 @@ const AccountsMenu = (props) => { console.error("accounts_list error:", e); setError(e.message); }); - - await listen("asset_wallets::updated", (event) => { - console.log("accounts have changed"); - setError(""); - binding - .command_asset_wallets_list() - .then((accounts) => { - console.log("accounts", accounts); - setAccounts(accounts); - }) - .catch((e) => { - // todo error handling - console.error("accounts_list error:", e); - setError(e.message); - }); - }); - } - inner(); + }); }, [props.walletId]); // todo: hide accounts when not authenticated @@ -211,7 +208,8 @@ ProtectedRoute.propTypes = { }; function App() { - const [loading, setLoading] = useState(true); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); const [authenticated, setAuthenticated] = useState(false); const [walletId, setWalletId] = useState(""); const setPassword = useState("")[1]; @@ -223,9 +221,13 @@ function App() { binding .command_create_db() .then((r) => setLoading(false)) - .catch((e) => console.error(e)); + .catch((e) => { + setLoading(false); + setError(e); + }); }, []); if (loading) return ; + if (error) return {error.toString()}; return (
@@ -246,7 +248,7 @@ function App() { to="/dashboard" icon={} /> - + Issued Assets rpc::validator_node_ .get_state_db(&asset_public_key) .map_err(|e| Status::internal(format!("Could not create state db: {}", e)))? { - let mut state_db_reader = state.reader(); + let state_db_reader = state.reader(); + let instruction = Instruction::new(template_id, request.method, request.args); let response_bytes = self .asset_processor - .invoke_read_method(template_id, request.method, &request.args, &mut state_db_reader) + .invoke_read_method(&instruction, &state_db_reader) .map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?; Ok(Response::new(rpc::InvokeReadMethodResponse { result: response_bytes.unwrap_or_default(), diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index 9a7d4947bf..b6a1ceba8c 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -20,7 +20,7 @@ // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR // OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH // DAMAGE. -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use log::*; use tari_common_types::types::PublicKey; @@ -30,7 +30,7 @@ use tari_comms::{ }; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ - models::{Instruction, TemplateId, TreeNodeHash}, + models::{Instruction, TreeNodeHash}, services::{AssetProcessor, MempoolService}, storage::{state::StateDbUnitOfWorkReader, DbFactory}, }; @@ -92,15 +92,19 @@ where .map_err(|e| RpcStatus::general(format!("Could not create state db: {}", e)))? .ok_or_else(|| RpcStatus::not_found("This node does not process this asset".to_string()))?; - let mut unit_of_work = state.reader(); + let unit_of_work = state.reader(); + + let instruction = Instruction::new( + request + .template_id + .try_into() + .map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, + request.method, + request.args, + ); let response_bytes = self .asset_processor - .invoke_read_method( - TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, - request.method, - &request.args, - &mut unit_of_work, - ) + .invoke_read_method(&instruction, &unit_of_work) .map_err(|e| RpcStatus::general(format!("Could not invoke read method: {}", e)))?; Ok(Response::new(proto::InvokeReadMethodResponse { @@ -115,7 +119,10 @@ where dbg!(&request); let request = request.into_message(); let instruction = Instruction::new( - TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, + request + .template_id + .try_into() + .map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, request.method.clone(), request.args.clone(), /* TokenId(request.token_id.clone()), diff --git a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs index ba610ac9ce..0919ca9fd9 100644 --- a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs @@ -156,25 +156,29 @@ impl TariCommsInboundConnectionService { } => { // Check for already received messages let mut indexes_to_remove = vec![]; - let now = Instant::now(); let mut result_message = None; - for (index, message) in self.buffered_messages.iter().enumerate() { - if now - message.2 > self.expiry_time { - warn!(target: LOG_TARGET, "Message has expired: {:?}", message); + for (index, (from_pk, message, msg_time)) in self.buffered_messages.iter().enumerate() { + if msg_time.elapsed() > self.expiry_time { + warn!( + target: LOG_TARGET, + "Message has expired: ({:.2?}) {:?}", + msg_time.elapsed(), + message + ); indexes_to_remove.push(index); } else { match wait_for_type { WaitForMessageType::Message => { - if message.1.message_type() == message_type && message.1.view_number() == view_number { - result_message = Some((message.0.clone(), message.1.clone())); + if message.message_type() == message_type && message.view_number() == view_number { + result_message = Some((from_pk.clone(), message.clone())); indexes_to_remove.push(index); break; } }, WaitForMessageType::QuorumCertificate => { - if let Some(qc) = message.1.justify() { + if let Some(qc) = message.justify() { if qc.message_type() == message_type && qc.view_number() == view_number { - result_message = Some((message.0.clone(), message.1.clone())); + result_message = Some((from_pk.clone(), message.clone())); indexes_to_remove.push(index); break; } @@ -256,10 +260,10 @@ impl TariCommsInboundConnectionService { "Found waiter for this message, waking task... {:?}", message.message_type() ); - if let Some(w) = self.waiters.swap_remove_back(index) { + if let Some((_, _, _, reply)) = self.waiters.swap_remove_back(index) { // The receiver on the other end of this channel may have dropped naturally // as it moves out of scope and is not longer interested in receiving the message - if w.3.send((from.clone(), message.clone())).is_ok() { + if reply.send((from.clone(), message.clone())).is_ok() { return Ok(()); } } @@ -291,7 +295,10 @@ impl TariCommsInboundReceiverHandle { } #[async_trait] -impl InboundConnectionService for TariCommsInboundReceiverHandle { +impl InboundConnectionService for TariCommsInboundReceiverHandle { + type Addr = CommsPublicKey; + type Payload = TariDanPayload; + async fn wait_for_message( &self, message_type: HotStuffMessageType, diff --git a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs index f90935f3b1..99ca325722 100644 --- a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs @@ -37,6 +37,8 @@ use tokio::sync::mpsc::Sender; use crate::p2p::proto; +const LOG_TARGET: &str = "tari::validator_node::messages::outbound::validator_node"; + pub struct TariCommsOutboundService { outbound_message_requester: OutboundMessageRequester, loopback_service: Sender<(CommsPublicKey, HotStuffMessage)>, @@ -61,16 +63,25 @@ impl TariCommsOutboundService { } #[async_trait] -impl OutboundService for TariCommsOutboundService { +impl OutboundService for TariCommsOutboundService { + type Addr = CommsPublicKey; + type Payload = TariDanPayload; + async fn send( &mut self, from: CommsPublicKey, to: CommsPublicKey, message: HotStuffMessage, ) -> Result<(), DigitalAssetError> { - debug!(target: "messages::outbound::validator_node", "Outbound message to be sent:{} {:?}", to, message); + debug!(target: LOG_TARGET, "Outbound message to be sent:{} {:?}", to, message); // Tari comms does allow sending to itself if from == to && message.asset_public_key() == &self.asset_public_key { + debug!( + target: LOG_TARGET, + "Sending {:?} to self for asset {}", + message.message_type(), + message.asset_public_key() + ); self.loopback_service.send((from, message)).await.unwrap(); return Ok(()); } diff --git a/common/src/configuration/validator_node_config.rs b/common/src/configuration/validator_node_config.rs index 6ca54043b8..cf3895c196 100644 --- a/common/src/configuration/validator_node_config.rs +++ b/common/src/configuration/validator_node_config.rs @@ -41,11 +41,20 @@ pub struct ValidatorNodeConfig { pub base_node_grpc_address: SocketAddr, #[serde(default = "default_wallet_grpc_address")] pub wallet_grpc_address: SocketAddr, + #[serde(default = "default_true")] pub scan_for_assets: bool, + #[serde(default = "default_asset_scanning_interval")] pub new_asset_scanning_interval: u64, pub assets_allow_list: Option>, } +fn default_true() -> bool { + true +} +fn default_asset_scanning_interval() -> u64 { + 10 +} + fn default_asset_config_directory() -> PathBuf { PathBuf::from("assets") } diff --git a/dan_layer/core/src/digital_assets_error.rs b/dan_layer/core/src/digital_assets_error.rs index 65fcf6d072..e697c1a823 100644 --- a/dan_layer/core/src/digital_assets_error.rs +++ b/dan_layer/core/src/digital_assets_error.rs @@ -76,6 +76,14 @@ pub enum DigitalAssetError { StateSyncError(#[from] StateSyncError), #[error("Validator node client error: {0}")] ValidatorNodeClientError(#[from] ValidatorNodeClientError), + #[error("Peer did not send a quorum certificate in prepare phase")] + PreparePhaseNoQuorumCertificate, + #[error("Quorum certificate does not extend node")] + PreparePhaseCertificateDoesNotExtendNode, + #[error("Node not safe")] + PreparePhaseNodeNotSafe, + #[error("Unsupported template method {name}")] + TemplateUnsupportedMethod { name: String }, } impl From for DigitalAssetError { diff --git a/dan_layer/core/src/fixed_hash.rs b/dan_layer/core/src/fixed_hash.rs index ea642e2c3f..400bb5d6c2 100644 --- a/dan_layer/core/src/fixed_hash.rs +++ b/dan_layer/core/src/fixed_hash.rs @@ -31,7 +31,7 @@ const ZERO_HASH: [u8; FixedHash::byte_size()] = [0u8; FixedHash::byte_size()]; #[error("Invalid size")] pub struct FixedHashSizeError; -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] pub struct FixedHash([u8; FixedHash::byte_size()]); impl FixedHash { diff --git a/dan_layer/core/src/models/hot_stuff_tree_node.rs b/dan_layer/core/src/models/hot_stuff_tree_node.rs index 19d319ad16..9478168c22 100644 --- a/dan_layer/core/src/models/hot_stuff_tree_node.rs +++ b/dan_layer/core/src/models/hot_stuff_tree_node.rs @@ -47,12 +47,12 @@ impl HotStuffTreeNode { s } - pub fn genesis(payload: TPayload) -> HotStuffTreeNode { + pub fn genesis(payload: TPayload, state_root: StateRoot) -> HotStuffTreeNode { let mut s = Self { parent: TreeNodeHash::zero(), payload, hash: TreeNodeHash::zero(), - state_root: StateRoot::initial(), + state_root, height: 0, }; s.hash = s.calculate_hash(); diff --git a/dan_layer/core/src/models/instruction_set.rs b/dan_layer/core/src/models/instruction_set.rs index 1e9d6a7cd0..0ef6a50b22 100644 --- a/dan_layer/core/src/models/instruction_set.rs +++ b/dan_layer/core/src/models/instruction_set.rs @@ -20,15 +20,24 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{hash::Hash, iter::FromIterator}; +use std::{convert::TryFrom, hash::Hash, iter::FromIterator}; use tari_crypto::common::Blake256; use tari_mmr::MerkleMountainRange; -use crate::models::{ConsensusHash, Instruction}; +use crate::{ + fixed_hash::FixedHash, + models::{ConsensusHash, Instruction}, +}; #[derive(PartialEq, Clone, Debug, Hash)] -pub struct InstructionSetHash(Vec); +pub struct InstructionSetHash(FixedHash); + +impl InstructionSetHash { + pub fn zero() -> InstructionSetHash { + Self(FixedHash::zero()) + } +} impl InstructionSetHash { pub fn as_bytes(&self) -> &[u8] { @@ -36,6 +45,12 @@ impl InstructionSetHash { } } +impl From for InstructionSetHash { + fn from(hash: FixedHash) -> Self { + Self(hash) + } +} + // TODO: Implement hash properly #[allow(clippy::derive_hash_xor_eq)] #[derive(Clone, Debug)] @@ -52,7 +67,7 @@ impl InstructionSet { pub fn from_vec(instructions: Vec) -> Self { let mut result = Self { instructions, - hash: InstructionSetHash(vec![]), + hash: InstructionSetHash::zero(), }; result.hash = result.calculate_hash(); result @@ -65,7 +80,7 @@ impl InstructionSet { mmr.push(instruction.calculate_hash().to_vec()).unwrap(); } - InstructionSetHash(mmr.get_merkle_root().unwrap()) + FixedHash::try_from(mmr.get_merkle_root().unwrap()).unwrap().into() } pub fn instructions(&self) -> &[Instruction] { @@ -91,3 +106,19 @@ impl ConsensusHash for InstructionSet { self.hash.as_bytes() } } + +impl IntoIterator for InstructionSet { + type IntoIter = as IntoIterator>::IntoIter; + type Item = Instruction; + + fn into_iter(self) -> Self::IntoIter { + self.instructions.into_iter() + } +} + +impl Extend for InstructionSet { + fn extend>(&mut self, iter: T) { + self.instructions.extend(iter); + self.hash = self.calculate_hash(); + } +} diff --git a/dan_layer/core/src/models/view_id.rs b/dan_layer/core/src/models/view_id.rs index bc60c079ed..0f9459e3c3 100644 --- a/dan_layer/core/src/models/view_id.rs +++ b/dan_layer/core/src/models/view_id.rs @@ -23,7 +23,7 @@ use std::{ cmp::Ordering, fmt::{self, Display}, - ops::Sub, + ops::{Add, Sub}, }; #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -34,6 +34,10 @@ impl ViewId { (self.0 % committee_size as u64) as usize } + pub fn is_genesis(&self) -> bool { + self.0 == 0 + } + pub fn next(&self) -> ViewId { ViewId(self.0 + 1) } @@ -41,6 +45,10 @@ impl ViewId { pub fn as_u64(&self) -> u64 { self.0 } + + pub fn saturating_sub(self, other: ViewId) -> ViewId { + self.0.saturating_sub(other.0).into() + } } impl PartialOrd for ViewId { @@ -61,6 +69,14 @@ impl Display for ViewId { } } +impl Add for ViewId { + type Output = ViewId; + + fn add(self, rhs: Self) -> Self::Output { + ViewId(self.0 + rhs.0) + } +} + impl Sub for ViewId { type Output = ViewId; diff --git a/dan_layer/core/src/services/asset_processor.rs b/dan_layer/core/src/services/asset_processor.rs index 8d15ebd3ca..7015e28c97 100644 --- a/dan_layer/core/src/services/asset_processor.rs +++ b/dan_layer/core/src/services/asset_processor.rs @@ -26,20 +26,13 @@ use tari_core::transactions::transaction_components::TemplateParameter; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, Instruction, TemplateId}, + models::{Instruction, InstructionSet, TemplateId}, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, template_command::ExecutionResult, templates::{tip002_template, tip004_template, tip721_template}, }; pub trait AssetProcessor: Sync + Send + 'static { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError>; - // purposefully made sync, because instructions should be run in order, and complete before the // next one starts. There may be a better way to enforce this though... fn execute_instruction( @@ -50,10 +43,8 @@ pub trait AssetProcessor: Sync + Send + 'static { fn invoke_read_method( &self, - template_id: TemplateId, - method: String, - args: &[u8], - state_db: &mut TUnitOfWorkReader, + instruction: &Instruction, + state_db: &TUnitOfWorkReader, ) -> Result>, DigitalAssetError>; } @@ -63,108 +54,75 @@ pub struct ConcreteAssetProcessor { } impl AssetProcessor for ConcreteAssetProcessor { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - self.template_factory - .init(template_parameter, asset_definition, state_db) - } - fn execute_instruction( &self, instruction: &Instruction, - db: &mut TUnitOfWork, + state_db: &mut TUnitOfWork, ) -> Result<(), DigitalAssetError> { - self.execute( - instruction.template_id(), - instruction.method().to_owned(), - instruction.args().into(), - // InstructionCaller { - // owner_token_id: instruction.from_owner().to_owned(), - // }, - db, - ) + self.template_factory.invoke_write_method(instruction, state_db) } fn invoke_read_method( &self, - template_id: TemplateId, - method: String, - args: &[u8], - state_db: &mut TUnitOfWork, + instruction: &Instruction, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { + self.template_factory.invoke_read_method(instruction, state_db) + } +} + +#[derive(Default, Clone)] +pub struct TemplateFactory {} + +impl TemplateFactory { + pub fn initial_instructions(&self, template_param: &TemplateParameter) -> InstructionSet { + use TemplateId::*; + // TODO: We may want to use the TemplateId type, so that we know it is known/valid + let template_id = template_param.template_id.try_into().unwrap(); match template_id { - TemplateId::Tip002 => tip002_template::invoke_read_method(method, args, state_db), - TemplateId::Tip004 => tip004_template::invoke_read_method(method, args, state_db), - TemplateId::Tip721 => tip721_template::invoke_read_method(method, args, state_db), - _ => { + Tip002 => tip002_template::initial_instructions(template_param), + Tip003 => todo!(), + Tip004 => tip004_template::initial_instructions(template_param), + Tip721 => tip721_template::initial_instructions(template_param), + EditableMetadata => { todo!() }, } } -} -impl ConcreteAssetProcessor { - pub fn execute( + pub fn invoke_read_method( &self, - template_id: TemplateId, - method: String, - args: Vec, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - match template_id { - TemplateId::Tip002 => { - tip002_template::invoke_method(method, &args, state_db)?; - }, - TemplateId::Tip004 => { - tip004_template::invoke_method(method, &args, state_db)?; - }, - TemplateId::Tip721 => { - tip721_template::invoke_method(method, &args, state_db)?; - }, - _ => { + instruction: &Instruction, + state_db: &TUnitOfWork, + ) -> Result>, DigitalAssetError> { + use TemplateId::*; + match instruction.template_id() { + Tip002 => tip002_template::invoke_read_method(instruction.method(), instruction.args(), state_db), + Tip003 => todo!(), + Tip004 => tip004_template::invoke_read_method(instruction.method(), instruction.args(), state_db), + Tip721 => tip721_template::invoke_read_method(instruction.method(), instruction.args(), state_db), + EditableMetadata => { todo!() }, } - // let instruction = self.template_factory.create_command(template_id, method, args)?; - // let unit_of_work = state_db.new_unit_of_work(); - // let result = instruction.try_execute(db)?; - // unit_of_work.commit()?; - // self.instruction_log.store(hash, result); - // Ok(()) - Ok(()) } -} -#[derive(Default, Clone)] -pub struct TemplateFactory {} - -impl TemplateFactory { - pub fn init( + pub fn invoke_write_method( &self, - template: &TemplateParameter, - asset_definition: &AssetDefinition, + instruction: &Instruction, state_db: &mut TUnitOfWork, ) -> Result<(), DigitalAssetError> { - match template.template_id.try_into()? { - TemplateId::Tip002 => tip002_template::init(template, asset_definition, state_db)?, - _ => unimplemented!(), + use TemplateId::*; + match instruction.template_id() { + Tip002 => tip002_template::invoke_write_method(instruction.method(), instruction.args(), state_db), + Tip003 => todo!(), + Tip004 => tip004_template::invoke_write_method(instruction.method(), instruction.args(), state_db), + Tip721 => tip721_template::invoke_write_method(instruction.method(), instruction.args(), state_db), + EditableMetadata => { + todo!() + }, } - Ok(()) } - - // pub fn create_command( - // &self, - // _template: TemplateId, - // _method: String, - // _args: VecDeque>, - // // caller: InstructionCaller, - // ) -> Result<(), DigitalAssetError> { - // todo!() - // } } pub trait InstructionLog { diff --git a/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs b/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs index cb8456ca76..99461f5be8 100644 --- a/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs +++ b/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs @@ -29,16 +29,19 @@ use crate::{ }; #[async_trait] -pub trait InboundConnectionService { +pub trait InboundConnectionService { + type Addr: NodeAddressable; + type Payload: Payload; + async fn wait_for_message( &self, message_type: HotStuffMessageType, for_view: ViewId, - ) -> Result<(TAddr, HotStuffMessage), DigitalAssetError>; + ) -> Result<(Self::Addr, HotStuffMessage), DigitalAssetError>; async fn wait_for_qc( &self, message_type: HotStuffMessageType, for_view: ViewId, - ) -> Result<(TAddr, HotStuffMessage), DigitalAssetError>; + ) -> Result<(Self::Addr, HotStuffMessage), DigitalAssetError>; } diff --git a/dan_layer/core/src/services/infrastructure_services/mocks/mod.rs b/dan_layer/core/src/services/infrastructure_services/mocks/mod.rs index 62279c90bd..35fbe1198b 100644 --- a/dan_layer/core/src/services/infrastructure_services/mocks/mod.rs +++ b/dan_layer/core/src/services/infrastructure_services/mocks/mod.rs @@ -46,9 +46,12 @@ pub struct MockInboundConnectionService InboundConnectionService +impl InboundConnectionService for MockInboundConnectionService { + type Addr = TAddr; + type Payload = TPayload; + async fn wait_for_message( &self, _message_type: HotStuffMessageType, @@ -125,9 +128,12 @@ use std::fmt::Debug; use crate::models::{HotStuffMessageType, Payload, ViewId}; #[async_trait] -impl OutboundService +impl OutboundService for MockOutboundService { + type Addr = TAddr; + type Payload = TPayload; + async fn send( &mut self, from: TAddr, diff --git a/dan_layer/core/src/services/infrastructure_services/outbound_service.rs b/dan_layer/core/src/services/infrastructure_services/outbound_service.rs index c00bf494cf..3702ab246a 100644 --- a/dan_layer/core/src/services/infrastructure_services/outbound_service.rs +++ b/dan_layer/core/src/services/infrastructure_services/outbound_service.rs @@ -29,18 +29,21 @@ use crate::{ }; #[async_trait] -pub trait OutboundService { +pub trait OutboundService { + type Addr: NodeAddressable + Send; + type Payload: Payload; + async fn send( &mut self, - from: TAddr, - to: TAddr, - message: HotStuffMessage, + from: Self::Addr, + to: Self::Addr, + message: HotStuffMessage, ) -> Result<(), DigitalAssetError>; async fn broadcast( &mut self, - from: TAddr, - committee: &[TAddr], - message: HotStuffMessage, + from: Self::Addr, + committee: &[Self::Addr], + message: HotStuffMessage, ) -> Result<(), DigitalAssetError>; } diff --git a/dan_layer/core/src/services/mocks/mod.rs b/dan_layer/core/src/services/mocks/mod.rs index 36db7254da..d18914c4e0 100644 --- a/dan_layer/core/src/services/mocks/mod.rs +++ b/dan_layer/core/src/services/mocks/mod.rs @@ -28,7 +28,6 @@ use std::{ use async_trait::async_trait; use tari_common_types::types::PublicKey; -use tari_core::transactions::transaction_components::TemplateParameter; use super::CommitteeManager; use crate::{ @@ -44,7 +43,6 @@ use crate::{ Payload, Signature, StateRoot, - TemplateId, TreeNodeHash, }, services::{ @@ -114,7 +112,7 @@ impl PayloadProvider for MockStaticPayloadProvider< Ok(self.static_payload.clone()) } - fn create_genesis_payload(&self) -> TPayload { + fn create_genesis_payload(&self, _: &AssetDefinition) -> TPayload { self.static_payload.clone() } @@ -257,15 +255,6 @@ pub struct MockPayloadProcessor {} #[async_trait] impl PayloadProcessor for MockPayloadProcessor { - fn init_template( - &self, - _template_parameter: &TemplateParameter, - _asset_definition: &AssetDefinition, - _state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - todo!() - } - async fn process_payload( &self, _payload: &TPayload, @@ -279,15 +268,6 @@ impl PayloadProcessor for MockPayloadProcessor { pub struct MockAssetProcessor; impl AssetProcessor for MockAssetProcessor { - fn init_template( - &self, - _template_parameter: &TemplateParameter, - _asset_definition: &AssetDefinition, - _state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - todo!() - } - fn execute_instruction( &self, _instruction: &Instruction, @@ -298,10 +278,8 @@ impl AssetProcessor for MockAssetProcessor { fn invoke_read_method( &self, - _template_id: TemplateId, - _method: String, - _args: &[u8], - _state_db: &mut TUnifOfWork, + _instruction: &Instruction, + _state_db: &TUnifOfWork, ) -> Result>, DigitalAssetError> { todo!() } diff --git a/dan_layer/core/src/services/payload_processor.rs b/dan_layer/core/src/services/payload_processor.rs index c2c79daeda..177c50357a 100644 --- a/dan_layer/core/src/services/payload_processor.rs +++ b/dan_layer/core/src/services/payload_processor.rs @@ -21,23 +21,16 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use async_trait::async_trait; -use tari_core::transactions::transaction_components::TemplateParameter; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, Payload, StateRoot, TariDanPayload}, + models::{Payload, StateRoot, TariDanPayload}, services::AssetProcessor, storage::state::StateDbUnitOfWork, }; #[async_trait] pub trait PayloadProcessor { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError>; async fn process_payload( &self, payload: &TPayload, @@ -61,16 +54,6 @@ impl TariDanPayloadProcessor { impl PayloadProcessor for TariDanPayloadProcessor { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - self.asset_processor - .init_template(template_parameter, asset_definition, state_db) - } - async fn process_payload( &self, payload: &TariDanPayload, diff --git a/dan_layer/core/src/services/payload_provider.rs b/dan_layer/core/src/services/payload_provider.rs index 734e5cdcee..a2b34b84b8 100644 --- a/dan_layer/core/src/services/payload_provider.rs +++ b/dan_layer/core/src/services/payload_provider.rs @@ -24,14 +24,14 @@ use async_trait::async_trait; use crate::{ digital_assets_error::DigitalAssetError, - models::{InstructionSet, Payload, TariDanPayload, TreeNodeHash}, - services::MempoolService, + models::{AssetDefinition, InstructionSet, Payload, TariDanPayload, TreeNodeHash}, + services::{asset_processor::TemplateFactory, MempoolService}, }; #[async_trait] pub trait PayloadProvider { async fn create_payload(&self) -> Result; - fn create_genesis_payload(&self) -> TPayload; + fn create_genesis_payload(&self, asset_definition: &AssetDefinition) -> TPayload; async fn get_payload_queue(&self) -> usize; async fn reserve_payload( &mut self, @@ -41,13 +41,17 @@ pub trait PayloadProvider { async fn remove_payload(&mut self, reservation_key: &TreeNodeHash) -> Result<(), DigitalAssetError>; } -pub struct TariDanPayloadProvider { +pub struct TariDanPayloadProvider { mempool: TMempoolService, + template_factory: TemplateFactory, } impl TariDanPayloadProvider { pub fn new(mempool: TMempoolService) -> Self { - Self { mempool } + Self { + mempool, + template_factory: TemplateFactory {}, + } } } @@ -60,8 +64,13 @@ impl PayloadProvider for TariDa Ok(TariDanPayload::new(instruction_set, None)) } - fn create_genesis_payload(&self) -> TariDanPayload { - TariDanPayload::new(InstructionSet::empty(), None) + fn create_genesis_payload(&self, asset_definition: &AssetDefinition) -> TariDanPayload { + let mut instruction_set = InstructionSet::empty(); + for params in asset_definition.template_parameters.iter() { + let instructions = self.template_factory.initial_instructions(params); + instruction_set.extend(instructions); + } + TariDanPayload::new(instruction_set, None) } async fn get_payload_queue(&self) -> usize { diff --git a/dan_layer/core/src/services/service_specification.rs b/dan_layer/core/src/services/service_specification.rs index 20c057ed0b..ec7a0da79a 100644 --- a/dan_layer/core/src/services/service_specification.rs +++ b/dan_layer/core/src/services/service_specification.rs @@ -53,9 +53,12 @@ pub trait ServiceSpecification: Clone { type CommitteeManager: CommitteeManager; type DbFactory: DbFactory + Clone + Sync + Send + 'static; type EventsPublisher: EventsPublisher; - type InboundConnectionService: InboundConnectionService + 'static + Send + Sync; + type InboundConnectionService: InboundConnectionService + + 'static + + Send + + Sync; type MempoolService: MempoolService + Clone + Sync + Send + 'static; - type OutboundService: OutboundService; + type OutboundService: OutboundService; type Payload: Payload; type PayloadProcessor: PayloadProcessor; type PayloadProvider: PayloadProvider; diff --git a/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs b/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs index db812fe1df..1f13a37b5c 100644 --- a/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs +++ b/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs @@ -26,6 +26,8 @@ use std::{ sync::{Arc, RwLock}, }; +use log::*; + use crate::{ models::{Instruction, Node, QuorumCertificate, TreeNodeHash}, storage::{ @@ -35,6 +37,8 @@ use crate::{ }, }; +const LOG_TARGET: &str = "tari::dan::chain_db::unit_of_work"; + pub trait ChainDbUnitOfWork: Clone + Send + Sync { fn commit(&mut self) -> Result<(), StorageError>; fn add_node(&mut self, hash: TreeNodeHash, parent: TreeNodeHash, height: u32) -> Result<(), StorageError>; @@ -212,6 +216,12 @@ impl ChainDbUnitOfWork for ChainDbUnitOf true, )); } + + debug!( + target: LOG_TARGET, + "Marking proposed node '{}' as committed", + qc.node_hash() + ); let found_node = inner.find_proposed_node(qc.node_hash())?; let mut node = found_node.1.get_mut(); let mut n = node.deref_mut(); diff --git a/dan_layer/core/src/storage/mocks/mod.rs b/dan_layer/core/src/storage/mocks/mod.rs index d8c3d6bf00..fd5aa0700e 100644 --- a/dan_layer/core/src/storage/mocks/mod.rs +++ b/dan_layer/core/src/storage/mocks/mod.rs @@ -85,7 +85,7 @@ impl DbFactory for MockDbFactory { .unwrap() .get(asset_public_key) .cloned() - .map(StateDb::new)) + .map(|db| StateDb::new(asset_public_key.clone(), db))) } fn get_or_create_state_db( @@ -99,7 +99,7 @@ impl DbFactory for MockDbFactory { .entry(asset_public_key.clone()) .or_default() .clone(); - Ok(StateDb::new(entry)) + Ok(StateDb::new(asset_public_key.clone(), entry)) } } diff --git a/dan_layer/core/src/storage/state/state_db.rs b/dan_layer/core/src/storage/state/state_db.rs index a6452eff81..91bedac1a3 100644 --- a/dan_layer/core/src/storage/state/state_db.rs +++ b/dan_layer/core/src/storage/state/state_db.rs @@ -20,6 +20,8 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use tari_common_types::types::PublicKey; + use crate::storage::state::{ state_db_unit_of_work::{StateDbUnitOfWorkImpl, StateDbUnitOfWorkReader, UnitOfWorkContext}, StateDbBackendAdapter, @@ -27,20 +29,30 @@ use crate::storage::state::{ pub struct StateDb { backend_adapter: TStateDbBackendAdapter, + asset_public_key: PublicKey, } impl StateDb { - pub fn new(backend_adapter: TStateDbBackendAdapter) -> Self { - Self { backend_adapter } + pub fn new(asset_public_key: PublicKey, backend_adapter: TStateDbBackendAdapter) -> Self { + Self { + backend_adapter, + asset_public_key, + } } pub fn new_unit_of_work(&self, height: u64) -> StateDbUnitOfWorkImpl { - StateDbUnitOfWorkImpl::new(UnitOfWorkContext::new(height), self.backend_adapter.clone()) + StateDbUnitOfWorkImpl::new( + UnitOfWorkContext::new(height, self.asset_public_key.clone()), + self.backend_adapter.clone(), + ) } pub fn reader(&self) -> impl StateDbUnitOfWorkReader { // TODO: A reader doesnt need the current context, should perhaps make a read-only implementation that the // writable implementation also uses - StateDbUnitOfWorkImpl::new(UnitOfWorkContext::new(0), self.backend_adapter.clone()) + StateDbUnitOfWorkImpl::new( + UnitOfWorkContext::new(0, self.asset_public_key.clone()), + self.backend_adapter.clone(), + ) } } diff --git a/dan_layer/core/src/storage/state/state_db_unit_of_work.rs b/dan_layer/core/src/storage/state/state_db_unit_of_work.rs index b973386be6..6a5c760229 100644 --- a/dan_layer/core/src/storage/state/state_db_unit_of_work.rs +++ b/dan_layer/core/src/storage/state/state_db_unit_of_work.rs @@ -28,7 +28,7 @@ use std::{ use digest::Digest; use log::*; -use tari_common_types::types::HashDigest; +use tari_common_types::types::{HashDigest, PublicKey}; use tari_crypto::common::Blake256; use tari_mmr::{MemBackendVec, MerkleMountainRange}; use tari_utilities::hex::Hex; @@ -52,8 +52,9 @@ pub trait StateDbUnitOfWork: StateDbUnitOfWorkReader { } pub trait StateDbUnitOfWorkReader: Clone + Send + Sync { - fn get_value(&mut self, schema: &str, key: &[u8]) -> Result>, StorageError>; - fn get_u64(&mut self, schema: &str, key: &[u8]) -> Result, StorageError>; + fn context(&self) -> &UnitOfWorkContext; + fn get_value(&self, schema: &str, key: &[u8]) -> Result>, StorageError>; + fn get_u64(&self, schema: &str, key: &[u8]) -> Result, StorageError>; fn find_keys_by_value(&self, schema: &str, value: &[u8]) -> Result>, StorageError>; fn calculate_root(&self) -> Result; fn get_all_state(&self) -> Result, StorageError>; @@ -62,12 +63,24 @@ pub trait StateDbUnitOfWorkReader: Clone + Send + Sync { #[derive(Debug, Clone)] pub struct UnitOfWorkContext { - pub height: u64, + asset_public_key: PublicKey, + height: u64, } impl UnitOfWorkContext { - pub fn new(height: u64) -> Self { - Self { height } + pub fn new(height: u64, asset_public_key: PublicKey) -> Self { + Self { + height, + asset_public_key, + } + } + + pub fn height(&self) -> u64 { + self.height + } + + pub fn asset_public_key(&self) -> &PublicKey { + &self.asset_public_key } } @@ -167,35 +180,20 @@ impl StateDbUnitOfWork for StateDbUnitOf } impl StateDbUnitOfWorkReader for StateDbUnitOfWorkImpl { - fn get_value(&mut self, schema: &str, key: &[u8]) -> Result>, StorageError> { - let mut inner = self.inner.write().unwrap(); - for v in &inner.updates { - let inner_v = v.get(); - if inner_v.schema == schema && inner_v.key == key { - return Ok(Some(inner_v.value.clone())); - } - } + fn context(&self) -> &UnitOfWorkContext { + &self.context + } + + fn get_value(&self, schema: &str, key: &[u8]) -> Result>, StorageError> { + let inner = self.inner.read().unwrap(); // Hit the DB. - let value = inner + inner .backend_adapter .get(schema, key) - .map_err(TBackendAdapter::Error::into)?; - if let Some(value) = value { - inner.updates.push(UnitOfWorkTracker::new( - DbKeyValue { - schema: schema.to_string(), - key: Vec::from(key), - value: value.clone(), - }, - false, - )); - Ok(Some(value)) - } else { - Ok(None) - } + .map_err(TBackendAdapter::Error::into) } - fn get_u64(&mut self, schema: &str, key: &[u8]) -> Result, StorageError> { + fn get_u64(&self, schema: &str, key: &[u8]) -> Result, StorageError> { let data = self.get_value(schema, key)?; match data { Some(data) => { diff --git a/dan_layer/core/src/templates/tip002_template.rs b/dan_layer/core/src/templates/tip002_template.rs index abe073b841..e7afd89d7a 100644 --- a/dan_layer/core/src/templates/tip002_template.rs +++ b/dan_layer/core/src/templates/tip002_template.rs @@ -26,56 +26,60 @@ use tari_crypto::tari_utilities::{hex::Hex, ByteArray}; use tari_dan_common_types::proto::tips::tip002; use crate::{ - models::AssetDefinition, + models::{Instruction, InstructionSet, TemplateId}, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, DigitalAssetError, }; -pub fn init( - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, -) -> Result<(), DigitalAssetError> { - let params = tip002::InitRequest::decode(&*template_parameter.template_data).map_err(|e| { - DigitalAssetError::ProtoBufDecodeError { - source: e, - message_type: "tip002::InitRequest".to_string(), - } - })?; - dbg!(¶ms); - state_db.set_value( - "owners".to_string(), - asset_definition.public_key.to_vec(), - Vec::from(params.total_supply.to_le_bytes()), - )?; - Ok(()) +pub fn initial_instructions(template_param: &TemplateParameter) -> InstructionSet { + InstructionSet::from_vec(vec![Instruction::new( + TemplateId::Tip002, + "init".to_string(), + template_param.template_data.clone(), + )]) } pub fn invoke_read_method( - method: String, + method: &str, args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { "balanceof" => balance_of(args, state_db), - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } -pub fn invoke_method( - method: String, +pub fn invoke_write_method( + method: &str, args: &[u8], state_db: &mut TUnitOfWork, ) -> Result<(), DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { + "init" => init(args, state_db), "transfer" => transfer(args, state_db), - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } +fn init(args: &[u8], state_db: &mut TUnitOfWork) -> Result<(), DigitalAssetError> { + let params = tip002::InitRequest::decode(args).map_err(|e| DigitalAssetError::ProtoBufDecodeError { + source: e, + message_type: "tip002::InitRequest".to_string(), + })?; + dbg!(¶ms); + state_db.set_value( + "owners".to_string(), + state_db.context().asset_public_key().to_vec(), + // TODO: Encode full owner data + Vec::from(params.total_supply.to_le_bytes()), + )?; + Ok(()) +} + fn balance_of( args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { let request = tip002::BalanceOfRequest::decode(&*args).map_err(|e| DigitalAssetError::ProtoBufDecodeError { source: e, @@ -144,7 +148,7 @@ fn transfer(args: &[u8], state_db: &mut TUnitOfW dbg!(receiver_balance); state_db.set_value( "owners".to_string(), - request.to.clone(), + request.to, Vec::from(receiver_balance.to_le_bytes()), )?; Ok(()) diff --git a/dan_layer/core/src/templates/tip004_template.rs b/dan_layer/core/src/templates/tip004_template.rs index f59473b0d2..88b75325cb 100644 --- a/dan_layer/core/src/templates/tip004_template.rs +++ b/dan_layer/core/src/templates/tip004_template.rs @@ -23,36 +23,42 @@ use digest::Digest; use log::*; use prost::Message; +use tari_core::transactions::transaction_components::TemplateParameter; use tari_crypto::{common::Blake256, tari_utilities::hex::Hex}; use tari_dan_common_types::proto::tips::tip004; use crate::{ + models::InstructionSet, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, DigitalAssetError, }; const LOG_TARGET: &str = "tari::dan_layer::core::templates::tip004_template"; -pub fn invoke_method( - method: String, - args: &[u8], - state_db: &mut TUnitOfWork, -) -> Result<(), DigitalAssetError> { - match method.to_lowercase().replace("_", "").as_str() { - "mint" => mint(args, state_db), - _ => todo!(), - } +pub fn initial_instructions(_: &TemplateParameter) -> InstructionSet { + InstructionSet::empty() } pub fn invoke_read_method( - method: String, + method: &str, args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { "balanceof" => balance_of(args, state_db), "tokenofownerbyindex" => token_of_owner_by_index(args, state_db), - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), + } +} + +pub fn invoke_write_method( + method: &str, + args: &[u8], + state_db: &mut TUnitOfWork, +) -> Result<(), DigitalAssetError> { + match method.to_lowercase().replace("_", "").as_str() { + "mint" => mint(args, state_db), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } @@ -96,7 +102,7 @@ fn hash_of(s: &str) -> Vec { fn balance_of( args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { // TODO: move this to the invoke_read_method method let request = tip004::BalanceOfRequest::decode(&*args).map_err(|e| DigitalAssetError::ProtoBufDecodeError { @@ -116,7 +122,7 @@ fn balance_of( fn token_of_owner_by_index( args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { // TODO: move this to the invoke_read_method method let request = diff --git a/dan_layer/core/src/templates/tip721_template.rs b/dan_layer/core/src/templates/tip721_template.rs index 9fd13a216f..f954b352cb 100644 --- a/dan_layer/core/src/templates/tip721_template.rs +++ b/dan_layer/core/src/templates/tip721_template.rs @@ -22,31 +22,26 @@ use log::*; use prost::Message; +use tari_core::transactions::transaction_components::TemplateParameter; use tari_crypto::tari_utilities::{hex::Hex, ByteArray}; use tari_dan_common_types::proto::tips::tip721; use crate::{ + models::InstructionSet, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, DigitalAssetError, }; const LOG_TARGET: &str = "tari::dan_layer::core::templates::tip721_template"; -pub fn invoke_method( - method: String, - args: &[u8], - state_db: &mut TUnitOfWork, -) -> Result<(), DigitalAssetError> { - match method.to_lowercase().replace("_", "").as_str() { - "transferfrom" => transfer_from(args, state_db), - _ => todo!(), - } +pub fn initial_instructions(_: &TemplateParameter) -> InstructionSet { + InstructionSet::empty() } pub fn invoke_read_method( - method: String, + method: &str, args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { "ownerof" => { @@ -60,13 +55,24 @@ pub fn invoke_read_method( }; Ok(Some(response.encode_to_vec())) }, - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), + } +} + +pub fn invoke_write_method( + method: &str, + args: &[u8], + state_db: &mut TUnitOfWork, +) -> Result<(), DigitalAssetError> { + match method.to_lowercase().replace("_", "").as_str() { + "transferfrom" => transfer_from(args, state_db), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } fn owner_of( token_id: Vec, - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result, DigitalAssetError> { state_db .get_value("owners", &token_id)? diff --git a/dan_layer/core/src/workers/consensus_worker.rs b/dan_layer/core/src/workers/consensus_worker.rs index f10939c49d..cb3016ae55 100644 --- a/dan_layer/core/src/workers/consensus_worker.rs +++ b/dan_layer/core/src/workers/consensus_worker.rs @@ -155,9 +155,6 @@ impl> ConsensusWorker> ConsensusWorker> ConsensusWorker> ConsensusWorker Starting, (_, TimedOut) => { warn!(target: LOG_TARGET, "State timed out"); + self.current_view_id = self.current_view_id.saturating_sub(1.into()); NextView }, - (NextView, NewView { .. }) => { - self.current_view_id = self.current_view_id.next(); + (NextView, NewView { new_view }) => { + self.current_view_id = new_view; Prepare }, (Prepare, Prepared) => PreCommit, diff --git a/dan_layer/core/src/workers/states/commit_state.rs b/dan_layer/core/src/workers/states/commit_state.rs index c79796f1c3..c1318c0412 100644 --- a/dan_layer/core/src/workers/states/commit_state.rs +++ b/dan_layer/core/src/workers/states/commit_state.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{collections::HashMap, marker::PhantomData, time::Instant}; +use std::{collections::HashMap, marker::PhantomData}; use log::*; use tari_common_types::types::PublicKey; @@ -28,9 +28,9 @@ use tokio::time::{sleep, Duration}; use crate::{ digital_assets_error::DigitalAssetError, - models::{Committee, HotStuffMessage, HotStuffMessageType, Payload, QuorumCertificate, TreeNodeHash, View, ViewId}, + models::{Committee, HotStuffMessage, HotStuffMessageType, QuorumCertificate, TreeNodeHash, View, ViewId}, services::{ - infrastructure_services::{InboundConnectionService, NodeAddressable, OutboundService}, + infrastructure_services::{InboundConnectionService, OutboundService}, SigningService, }, storage::chain::ChainDbUnitOfWork, @@ -40,35 +40,33 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::workers::states::commit"; // TODO: This is very similar to pre-commit state -pub struct CommitState -where - TInboundConnectionService: InboundConnectionService, - TAddr: NodeAddressable, - TPayload: Payload, - TOutboundService: OutboundService, - TSigningService: SigningService, +pub struct CommitState +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, - committee: Committee, + committee: Committee, phantom_inbound: PhantomData, phantom_outbound: PhantomData, - ta: PhantomData, - p_p: PhantomData, + ta: PhantomData, + p_p: PhantomData, p_s: PhantomData, - received_new_view_messages: HashMap>, + received_new_view_messages: HashMap>, } -impl - CommitState +impl + CommitState where - TInboundConnectionService: InboundConnectionService, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TPayload: Payload, - TSigningService: SigningService, + TInboundConnectionService: + InboundConnectionService, + TOutboundService: OutboundService, + TSigningService: SigningService, { - pub fn new(node_id: TAddr, asset_public_key: PublicKey, committee: Committee) -> Self { + pub fn new( + node_id: TOutboundService::Addr, + asset_public_key: PublicKey, + committee: Committee, + ) -> Self { Self { node_id, asset_public_key, @@ -92,20 +90,19 @@ where unit_of_work: TUnitOfWork, ) -> Result { self.received_new_view_messages.clear(); - let started = Instant::now(); let mut unit_of_work = unit_of_work; let next_event_result; + let timeout = sleep(timeout); + futures::pin_mut!(timeout); loop { tokio::select! { r = inbound_services.wait_for_message(HotStuffMessageType::PreCommit, current_view.view_id()) => { let (from, message) = r?; - if current_view.is_leader() { - if let Some(result) = self.process_leader_message(current_view, message.clone(), &from, outbound_service - ).await?{ + if current_view.is_leader() { + if let Some(result) = self.process_leader_message(current_view, message.clone(), &from, outbound_service).await?{ next_event_result = result; break; } - } }, r = inbound_services.wait_for_qc(HotStuffMessageType::PreCommit, current_view.view_id()) => { @@ -116,7 +113,7 @@ where break; } } - _ = sleep(timeout.saturating_sub(Instant::now() - started)) => { + _ = &mut timeout => { // TODO: perhaps this should be from the time the state was entered next_event_result = ConsensusWorkerStateEvent::TimedOut; break; @@ -129,8 +126,8 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, + message: HotStuffMessage, + sender: &TOutboundService::Addr, outbound: &mut TOutboundService, ) -> Result, DigitalAssetError> { if !message.matches(HotStuffMessageType::PreCommit, current_view.view_id) { @@ -211,10 +208,10 @@ where async fn process_replica_message( &mut self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, outbound: &mut TOutboundService, signing_service: &TSigningService, unit_of_work: &mut TUnitOfWork, @@ -259,7 +256,7 @@ where &self, node: TreeNodeHash, outbound: &mut TOutboundService, - view_leader: &TAddr, + view_leader: &TOutboundService::Addr, view_number: ViewId, signing_service: &TSigningService, ) -> Result<(), DigitalAssetError> { diff --git a/dan_layer/core/src/workers/states/decide_state.rs b/dan_layer/core/src/workers/states/decide_state.rs index eaca8e9e54..938d67d7b1 100644 --- a/dan_layer/core/src/workers/states/decide_state.rs +++ b/dan_layer/core/src/workers/states/decide_state.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{collections::HashMap, marker::PhantomData, time::Instant}; +use std::{collections::HashMap, marker::PhantomData}; use log::*; use tari_common_types::types::PublicKey; @@ -29,9 +29,9 @@ use tokio::time::{sleep, Duration}; use crate::{ digital_assets_error::DigitalAssetError, - models::{Committee, HotStuffMessage, HotStuffMessageType, Payload, QuorumCertificate, View, ViewId}, + models::{Committee, HotStuffMessage, HotStuffMessageType, QuorumCertificate, View, ViewId}, services::{ - infrastructure_services::{InboundConnectionService, NodeAddressable, OutboundService}, + infrastructure_services::{InboundConnectionService, OutboundService}, PayloadProvider, }, storage::chain::ChainDbUnitOfWork, @@ -41,32 +41,30 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::workers::states::decide"; // TODO: This is very similar to pre-commit, and commit state -pub struct DecideState -where - TInboundConnectionService: InboundConnectionService, - TAddr: NodeAddressable, - TPayload: Payload, - TOutboundService: OutboundService, +pub struct DecideState +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, - committee: Committee, + committee: Committee, phantom_inbound: PhantomData, phantom_outbound: PhantomData, - ta: PhantomData, - p_p: PhantomData, - received_new_view_messages: HashMap>, + ta: PhantomData, + p_p: PhantomData, + received_new_view_messages: HashMap>, } -impl - DecideState +impl DecideState where - TInboundConnectionService: InboundConnectionService, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TPayload: Payload, + TInboundConnectionService: + InboundConnectionService, + TOutboundService: OutboundService, { - pub fn new(node_id: TAddr, asset_public_key: PublicKey, committee: Committee) -> Self { + pub fn new( + node_id: TOutboundService::Addr, + asset_public_key: PublicKey, + committee: Committee, + ) -> Self { Self { node_id, asset_public_key, @@ -79,7 +77,10 @@ where } } - pub async fn next_event>( + pub async fn next_event< + TUnitOfWork: ChainDbUnitOfWork, + TPayloadProvider: PayloadProvider, + >( &mut self, timeout: Duration, current_view: &View, @@ -89,37 +90,34 @@ where payload_provider: &mut TPayloadProvider, ) -> Result { self.received_new_view_messages.clear(); - let started = Instant::now(); let mut unit_of_work = unit_of_work; let next_event_result; + let timeout = sleep(timeout); + futures::pin_mut!(timeout); loop { tokio::select! { - r = inbound_services.wait_for_message(HotStuffMessageType::Commit, current_view.view_id()) => { + r = inbound_services.wait_for_message(HotStuffMessageType::Commit, current_view.view_id()) => { let (from, message) = r?; - if current_view.is_leader() { - if let Some(result) = self.process_leader_message(current_view, message.clone(), &from, outbound_service - ).await?{ - next_event_result = result; - break; - } - - } + if current_view.is_leader() { + if let Some(result) = self.process_leader_message(current_view, message.clone(), &from, outbound_service).await?{ + next_event_result = result; + break; + } + } }, r = inbound_services.wait_for_qc(HotStuffMessageType::Prepare, current_view.view_id()) => { let (from, message) = r?; - let leader= self.committee.leader_for_view(current_view.view_id).clone(); - if let Some(result) = self.process_replica_message(&message, current_view, &from, &leader, &mut unit_of_work, payload_provider).await? { - next_event_result = result; - break; - } - - } - , - _ = sleep(timeout.saturating_sub(Instant::now() - started)) => { - next_event_result = ConsensusWorkerStateEvent::TimedOut; - break; - } - } + let leader= self.committee.leader_for_view(current_view.view_id).clone(); + if let Some(result) = self.process_replica_message(&message, current_view, &from, &leader, &mut unit_of_work, payload_provider).await? { + next_event_result = result; + break; + } + }, + _ = &mut timeout => { + next_event_result = ConsensusWorkerStateEvent::TimedOut; + break; + } + } } Ok(next_event_result) } @@ -127,8 +125,8 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, + message: HotStuffMessage, + sender: &TOutboundService::Addr, outbound: &mut TOutboundService, ) -> Result, DigitalAssetError> { if !message.matches(HotStuffMessageType::Commit, current_view.view_id) { @@ -205,15 +203,19 @@ where Some(qc) } - async fn process_replica_message>( + async fn process_replica_message< + TUnitOfWork: ChainDbUnitOfWork, + TPayloadProvider: PayloadProvider, + >( &mut self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, unit_of_work: &mut TUnitOfWork, payload_provider: &mut TPayloadProvider, ) -> Result, DigitalAssetError> { + debug!(target: LOG_TARGET, "[DECIDE] replica message: {:?}", message); if let Some(justify) = message.justify() { if !justify.matches(HotStuffMessageType::Commit, current_view.view_id) { warn!( diff --git a/dan_layer/core/src/workers/states/next_view.rs b/dan_layer/core/src/workers/states/next_view.rs index 8233a36a0e..5fab76179f 100644 --- a/dan_layer/core/src/workers/states/next_view.rs +++ b/dan_layer/core/src/workers/states/next_view.rs @@ -25,8 +25,8 @@ use tari_shutdown::ShutdownSignal; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, Committee, HotStuffMessage, Payload, View}, - services::infrastructure_services::{NodeAddressable, OutboundService}, + models::{AssetDefinition, Committee, HotStuffMessage, HotStuffTreeNode, QuorumCertificate, StateRoot, View}, + services::{infrastructure_services::OutboundService, PayloadProvider}, storage::DbFactory, workers::states::ConsensusWorkerStateEvent, }; @@ -37,29 +37,46 @@ const LOG_TARGET: &str = "tari::dan::workers::states::next_view"; pub struct NextViewState {} impl NextViewState { - pub async fn next_event< - TPayload: Payload, - TOutboundService: OutboundService, - TAddr: NodeAddressable + Clone + Send, - TDbFactory: DbFactory, - >( + pub async fn next_event( &mut self, current_view: &View, db_factory: &TDbFactory, broadcast: &mut TOutboundService, - committee: &Committee, - node_id: TAddr, + committee: &Committee, + node_id: TOutboundService::Addr, asset_definition: &AssetDefinition, + payload_provider: &TPayloadProvider, _shutdown: &ShutdownSignal, - ) -> Result { - let db = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; - let prepare_qc = db.find_highest_prepared_qc()?; - let message = HotStuffMessage::new_view(prepare_qc, current_view.view_id, asset_definition.public_key.clone()); - let next_view = current_view.view_id.next(); - let leader = committee.leader_for_view(next_view); - broadcast.send(node_id, leader.clone(), message).await?; - info!(target: LOG_TARGET, "End of view: {}", current_view.view_id.0); - debug!(target: LOG_TARGET, "--------------------------------"); - Ok(ConsensusWorkerStateEvent::NewView { new_view: next_view }) + ) -> Result + where + TOutboundService: OutboundService, + TDbFactory: DbFactory, + TPayloadProvider: PayloadProvider, + { + let chain_db = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; + if chain_db.is_empty()? { + info!(target: LOG_TARGET, "Database is empty. Proposing genesis block"); + let node = HotStuffTreeNode::genesis( + payload_provider.create_genesis_payload(asset_definition), + StateRoot::initial(), + ); + let genesis_qc = QuorumCertificate::genesis(*node.hash()); + let genesis_view_no = genesis_qc.view_number(); + let leader = committee.leader_for_view(genesis_view_no); + let message = HotStuffMessage::new_view(genesis_qc, genesis_view_no, asset_definition.public_key.clone()); + broadcast.send(node_id, leader.clone(), message).await?; + Ok(ConsensusWorkerStateEvent::NewView { + new_view: genesis_view_no, + }) + } else { + let prepare_qc = chain_db.find_highest_prepared_qc()?; + let next_view = current_view.view_id.next(); + let message = HotStuffMessage::new_view(prepare_qc, next_view, asset_definition.public_key.clone()); + let leader = committee.leader_for_view(next_view); + broadcast.send(node_id, leader.clone(), message).await?; + info!(target: LOG_TARGET, "End of view: {}", current_view.view_id.0); + debug!(target: LOG_TARGET, "--------------------------------"); + Ok(ConsensusWorkerStateEvent::NewView { new_view: next_view }) + } } } diff --git a/dan_layer/core/src/workers/states/pre_commit_state.rs b/dan_layer/core/src/workers/states/pre_commit_state.rs index d84785a22c..646770f548 100644 --- a/dan_layer/core/src/workers/states/pre_commit_state.rs +++ b/dan_layer/core/src/workers/states/pre_commit_state.rs @@ -28,9 +28,9 @@ use tokio::time::{sleep, Duration}; use crate::{ digital_assets_error::DigitalAssetError, - models::{Committee, HotStuffMessage, HotStuffMessageType, Payload, QuorumCertificate, TreeNodeHash, View, ViewId}, + models::{Committee, HotStuffMessage, HotStuffMessageType, QuorumCertificate, TreeNodeHash, View, ViewId}, services::{ - infrastructure_services::{InboundConnectionService, NodeAddressable, OutboundService}, + infrastructure_services::{InboundConnectionService, OutboundService}, SigningService, }, storage::chain::ChainDbUnitOfWork, @@ -39,35 +39,33 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::workers::states::precommit"; -pub struct PreCommitState -where - TInboundConnectionService: InboundConnectionService, - TAddr: NodeAddressable, - TPayload: Payload, - TOutboundService: OutboundService, - TSigningService: SigningService, +pub struct PreCommitState +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, - committee: Committee, + committee: Committee, phantom_inbound: PhantomData, phantom_outbound: PhantomData, - ta: PhantomData, - p_p: PhantomData, + ta: PhantomData, + p_p: PhantomData, p_s: PhantomData, - received_new_view_messages: HashMap>, + received_prepare_messages: HashMap>, } -impl - PreCommitState +impl + PreCommitState where - TInboundConnectionService: InboundConnectionService, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TPayload: Payload, - TSigningService: SigningService, + TInboundConnectionService: + InboundConnectionService, + TOutboundService: OutboundService, + TSigningService: SigningService, { - pub fn new(node_id: TAddr, committee: Committee, asset_public_key: PublicKey) -> Self { + pub fn new( + node_id: TOutboundService::Addr, + committee: Committee, + asset_public_key: PublicKey, + ) -> Self { Self { node_id, asset_public_key, @@ -76,7 +74,7 @@ where phantom_outbound: PhantomData, ta: PhantomData, p_p: PhantomData, - received_new_view_messages: HashMap::new(), + received_prepare_messages: HashMap::new(), p_s: PhantomData, } } @@ -90,7 +88,7 @@ where signing_service: &TSigningService, unit_of_work: TUnitOfWork, ) -> Result { - self.received_new_view_messages.clear(); + self.received_prepare_messages.clear(); let mut unit_of_work = unit_of_work; let next_event_result; let timeout = sleep(timeout); @@ -127,8 +125,8 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, + message: HotStuffMessage, + sender: &TOutboundService::Addr, outbound: &mut TOutboundService, ) -> Result, DigitalAssetError> { debug!( @@ -141,17 +139,17 @@ where return Ok(None); } - if self.received_new_view_messages.contains_key(sender) { + if self.received_prepare_messages.contains_key(sender) { return Ok(None); } - self.received_new_view_messages.insert(sender.clone(), message); + self.received_prepare_messages.insert(sender.clone(), message); - if self.received_new_view_messages.len() >= self.committee.consensus_threshold() { + if self.received_prepare_messages.len() >= self.committee.consensus_threshold() { debug!( target: LOG_TARGET, "[PRECOMMIT] Consensus has been reached with {:?} out of {} votes", - self.received_new_view_messages.len(), + self.received_prepare_messages.len(), self.committee.len() ); @@ -167,7 +165,7 @@ where debug!( target: LOG_TARGET, "[PRECOMMIT] Consensus has NOT YET been reached with {:?} out of {} votes", - self.received_new_view_messages.len(), + self.received_prepare_messages.len(), self.committee.len() ); Ok(None) @@ -177,7 +175,7 @@ where async fn broadcast( &self, outbound: &mut TOutboundService, - committee: &Committee, + committee: &Committee, prepare_qc: QuorumCertificate, view_number: ViewId, ) -> Result<(), DigitalAssetError> { @@ -188,9 +186,9 @@ where } fn create_qc(&self, current_view: &View) -> Option { - let mut node = None; - for message in self.received_new_view_messages.values() { - node = match node { + let mut node_hash = None; + for message in self.received_prepare_messages.values() { + node_hash = match node_hash { None => message.node_hash().cloned(), Some(n) => { if let Some(m_node) = message.node_hash() { @@ -205,9 +203,9 @@ where }; } - let node = node.unwrap(); - let mut qc = QuorumCertificate::new(HotStuffMessageType::Prepare, current_view.view_id, node, None); - for message in self.received_new_view_messages.values() { + let node_hash = node_hash.unwrap(); + let mut qc = QuorumCertificate::new(HotStuffMessageType::Prepare, current_view.view_id, node_hash, None); + for message in self.received_prepare_messages.values() { qc.combine_sig(message.partial_sig().unwrap()) } Some(qc) @@ -215,10 +213,10 @@ where async fn process_replica_message( &mut self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, outbound: &mut TOutboundService, signing_service: &TSigningService, unit_of_work: &mut TUnitOfWork, @@ -266,7 +264,7 @@ where &self, node: TreeNodeHash, outbound: &mut TOutboundService, - view_leader: &TAddr, + view_leader: &TOutboundService::Addr, view_number: ViewId, signing_service: &TSigningService, ) -> Result<(), DigitalAssetError> { diff --git a/dan_layer/core/src/workers/states/prepare.rs b/dan_layer/core/src/workers/states/prepare.rs index f72ba0f401..546f65b8a4 100644 --- a/dan_layer/core/src/workers/states/prepare.rs +++ b/dan_layer/core/src/workers/states/prepare.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{collections::HashMap, marker::PhantomData, time::Instant}; +use std::{collections::HashMap, marker::PhantomData}; use log::*; use tari_common_types::types::PublicKey; @@ -29,18 +29,18 @@ use tokio::time::{sleep, Duration}; use crate::{ digital_assets_error::DigitalAssetError, models::{ + AssetDefinition, Committee, HotStuffMessage, HotStuffMessageType, HotStuffTreeNode, - Payload, QuorumCertificate, TreeNodeHash, View, ViewId, }, services::{ - infrastructure_services::{InboundConnectionService, NodeAddressable, OutboundService}, + infrastructure_services::{InboundConnectionService, OutboundService}, PayloadProcessor, PayloadProvider, SigningService, @@ -51,24 +51,10 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::workers::states::prepare"; -pub struct Prepare< - TInboundConnectionService, - TOutboundService, - TAddr, - TSigningService, - TPayloadProvider, - TPayload, - TPayloadProcessor, -> where - TInboundConnectionService: InboundConnectionService + Send, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TSigningService: SigningService, - TPayload: Payload, - TPayloadProvider: PayloadProvider, - TPayloadProcessor: PayloadProcessor, +pub struct Prepare +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, // bft_service: Box, // TODO remove this hack @@ -77,37 +63,20 @@ pub struct Prepare< phantom_outbound: PhantomData, phantom_signing: PhantomData, phantom_processor: PhantomData, - received_new_view_messages: HashMap>, + received_new_view_messages: HashMap>, } -impl< - TInboundConnectionService, - TOutboundService, - TAddr, - TSigningService, - TPayloadProvider, - TPayload, - TPayloadProcessor, - > - Prepare< - TInboundConnectionService, - TOutboundService, - TAddr, - TSigningService, - TPayloadProvider, - TPayload, - TPayloadProcessor, - > +impl + Prepare where - TInboundConnectionService: InboundConnectionService + Send, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TSigningService: SigningService, - TPayload: Payload, - TPayloadProvider: PayloadProvider, - TPayloadProcessor: PayloadProcessor, + TOutboundService: OutboundService, + TInboundConnectionService: + InboundConnectionService + Send, + TSigningService: SigningService, + TPayloadProvider: PayloadProvider, + TPayloadProcessor: PayloadProcessor, { - pub fn new(node_id: TAddr, asset_public_key: PublicKey) -> Self { + pub fn new(node_id: TOutboundService::Addr, asset_public_key: PublicKey) -> Self { Self { node_id, asset_public_key, @@ -122,7 +91,7 @@ where #[allow(clippy::too_many_arguments)] pub async fn next_event< - TChainStorageService: ChainStorageService, + TChainStorageService: ChainStorageService, TUnitOfWork: ChainDbUnitOfWork, TStateDbUnitOfWork: StateDbUnitOfWork, TDbFactory: DbFactory, @@ -130,7 +99,8 @@ where &mut self, current_view: &View, timeout: Duration, - committee: &Committee, + asset_definition: &AssetDefinition, + committee: &Committee, inbound_services: &TInboundConnectionService, outbound_service: &mut TOutboundService, payload_provider: &mut TPayloadProvider, @@ -143,17 +113,40 @@ where ) -> Result { self.received_new_view_messages.clear(); - let started = Instant::now(); let mut chain_tx = chain_tx; let next_event_result; + let timeout = sleep(timeout); + futures::pin_mut!(timeout); + if current_view.is_leader() { + debug!( + target: LOG_TARGET, + "Waiting for NewView (view_id = {}) messages", + current_view.view_id() + ); + } else { + debug!( + target: LOG_TARGET, + "Waiting for Prepare (view_id = {}) messages", + current_view.view_id() + ); + } loop { tokio::select! { - r = inbound_services.wait_for_message(HotStuffMessageType::NewView, current_view.view_id() - 1.into()) => { + r = inbound_services.wait_for_message(HotStuffMessageType::NewView, current_view.view_id()) => { let (from, message) = r?; - debug!(target: LOG_TARGET, "Received leader message"); + debug!(target: LOG_TARGET, "Received leader message (is_leader = {:?})", current_view.is_leader()); if current_view.is_leader() { - if let Some(result) = self.process_leader_message(current_view, message.clone(), - &from, committee, payload_provider, payload_processor, outbound_service, db_factory).await?{ + if let Some(result) = self.process_leader_message( + current_view, + message.clone(), + &from, + asset_definition, + committee, + payload_provider, + payload_processor, + outbound_service, + db_factory, + ).await? { next_event_result = result; break; } @@ -162,15 +155,25 @@ where r = inbound_services.wait_for_message(HotStuffMessageType::Prepare, current_view.view_id()) => { let (from, message) = r?; debug!(target: LOG_TARGET, "Received replica message"); - if let Some(result) = self.process_replica_message(&message, current_view, &from, - committee.leader_for_view(current_view.view_id), outbound_service, signing_service, - payload_processor, payload_provider, &mut chain_tx, chain_storage_service, state_tx).await? { + if let Some(result) = self.process_replica_message( + &message, + current_view, + &from, + committee.leader_for_view(current_view.view_id), + outbound_service, + signing_service, + payload_processor, + payload_provider, + &mut chain_tx, + chain_storage_service, + state_tx, + ).await? { next_event_result = result; break; } }, - _ = sleep(timeout.saturating_sub(Instant::now() - started)) => { + _ = &mut timeout => { next_event_result = ConsensusWorkerStateEvent::TimedOut; break; } @@ -185,9 +188,10 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, - committee: &Committee, + message: HotStuffMessage, + sender: &TOutboundService::Addr, + asset_definition: &AssetDefinition, + committee: &Committee, payload_provider: &TPayloadProvider, payload_processor: &mut TPayloadProcessor, outbound: &mut TOutboundService, @@ -223,9 +227,10 @@ where let proposal = self .create_proposal( *high_qc.node_hash(), + asset_definition, payload_provider, payload_processor, - current_view.view_id.as_u64() as u32, + current_view.view_id, temp_state_tx, ) .await?; @@ -235,7 +240,7 @@ where } else { debug!( target: LOG_TARGET, - "[PREPARE] Consensus has NOT YET been reached with {:?} out of {} votes", + "[PREPARE] Consensus has NOT YET been reached with {} out of {} votes", self.received_new_view_messages.len(), committee.len() ); @@ -245,14 +250,14 @@ where async fn process_replica_message< TUnitOfWork: ChainDbUnitOfWork, - TChainStorageService: ChainStorageService, + TChainStorageService: ChainStorageService, TStateDbUnitOfWork: StateDbUnitOfWork, >( &self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, outbound: &mut TOutboundService, signing_service: &TSigningService, payload_processor: &mut TPayloadProcessor, @@ -275,46 +280,63 @@ where return Ok(None); } let node = message.node().unwrap(); - if let Some(justify) = message.justify() { - if self.does_extend(node, justify.node_hash()) { - if !self.is_safe_node(node, justify, chain_tx)? { - unimplemented!("Node is not safe") - } + let justify = message + .justify() + .ok_or(DigitalAssetError::PreparePhaseNoQuorumCertificate)?; - let res = payload_processor - .process_payload(node.payload(), state_tx.clone()) - .await?; - if &res == node.state_root() { - chain_storage_service - .add_node::(node, chain_tx.clone()) - .await?; - - payload_provider.reserve_payload(node.payload(), node.hash()).await?; - self.send_vote_to_leader( - *node.hash(), - outbound, - view_leader, - current_view.view_id, - signing_service, - ) - .await?; - Ok(Some(ConsensusWorkerStateEvent::Prepared)) - } else { - warn!( - target: LOG_TARGET, - "Calculated state root did not match the state root provided by the leader: Expected: {:?} \ - Leader provided:{:?}", - res, - node.state_root() - ); - Ok(None) - } - } else { - unimplemented!("Did not extend from qc.justify.node") + // The genesis does not extend any node + if !current_view.view_id().is_genesis() { + if !self.does_extend(node, justify.node_hash()) { + return Err(DigitalAssetError::PreparePhaseCertificateDoesNotExtendNode); } - } else { - unimplemented!("unexpected Null justify ") + + if !self.is_safe_node(node, justify, chain_tx)? { + return Err(DigitalAssetError::PreparePhaseNodeNotSafe); + } + } + + debug!( + target: LOG_TARGET, + "[PREPARE] Processing prepared payload for view {}", + current_view.view_id() + ); + + let state_root = payload_processor + .process_payload(node.payload(), state_tx.clone()) + .await?; + + if state_root != *node.state_root() { + warn!( + target: LOG_TARGET, + "Calculated state root did not match the state root provided by the leader: Expected: {:?} Leader \ + provided:{:?}", + state_root, + node.state_root() + ); + return Ok(None); } + + debug!( + target: LOG_TARGET, + "[PREPARE] Merkle root matches payload for view {}. Adding node '{}'", + current_view.view_id(), + node.hash() + ); + + chain_storage_service + .add_node::(node, chain_tx.clone()) + .await?; + + payload_provider.reserve_payload(node.payload(), node.hash()).await?; + self.send_vote_to_leader( + *node.hash(), + outbound, + view_leader, + current_view.view_id, + signing_service, + ) + .await?; + Ok(Some(ConsensusWorkerStateEvent::Prepared)) } fn find_highest_qc(&self) -> QuorumCertificate { @@ -338,26 +360,39 @@ where async fn create_proposal( &self, parent: TreeNodeHash, + asset_definition: &AssetDefinition, payload_provider: &TPayloadProvider, payload_processor: &mut TPayloadProcessor, - height: u32, + view_id: ViewId, state_db: TStateDbUnitOfWork, - ) -> Result, DigitalAssetError> { + ) -> Result, DigitalAssetError> { debug!(target: LOG_TARGET, "Creating new proposal"); // TODO: Artificial delay here to set the block time - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(1)).await; + + if view_id.is_genesis() { + let payload = payload_provider.create_genesis_payload(asset_definition); + let state_root = payload_processor.process_payload(&payload, state_db).await?; + Ok(HotStuffTreeNode::genesis(payload, state_root)) + } else { + let payload = payload_provider.create_payload().await?; - let payload = payload_provider.create_payload().await?; - let state_root = payload_processor.process_payload(&payload, state_db).await?; - Ok(HotStuffTreeNode::from_parent(parent, payload, state_root, height)) + let state_root = payload_processor.process_payload(&payload, state_db).await?; + Ok(HotStuffTreeNode::from_parent( + parent, + payload, + state_root, + view_id.as_u64() as u32, + )) + } } async fn broadcast_proposal( &self, outbound: &mut TOutboundService, - committee: &Committee, - proposal: HotStuffTreeNode, + committee: &Committee, + proposal: HotStuffTreeNode, high_qc: QuorumCertificate, view_number: ViewId, ) -> Result<(), DigitalAssetError> { @@ -367,13 +402,13 @@ where .await } - fn does_extend(&self, node: &HotStuffTreeNode, from: &TreeNodeHash) -> bool { + fn does_extend(&self, node: &HotStuffTreeNode, from: &TreeNodeHash) -> bool { from == node.parent() } fn is_safe_node( &self, - node: &HotStuffTreeNode, + node: &HotStuffTreeNode, quorum_certificate: &QuorumCertificate, chain_tx: &mut TUnitOfWork, ) -> Result { @@ -385,7 +420,7 @@ where &self, node: TreeNodeHash, outbound: &mut TOutboundService, - view_leader: &TAddr, + view_leader: &TOutboundService::Addr, view_number: ViewId, signing_service: &TSigningService, ) -> Result<(), DigitalAssetError> { diff --git a/dan_layer/core/src/workers/states/starting.rs b/dan_layer/core/src/workers/states/starting.rs index 101e4925c0..3eac2626d4 100644 --- a/dan_layer/core/src/workers/states/starting.rs +++ b/dan_layer/core/src/workers/states/starting.rs @@ -27,15 +27,9 @@ use tari_utilities::hex::Hex; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, HotStuffTreeNode, Payload, QuorumCertificate}, - services::{ - infrastructure_services::NodeAddressable, - BaseNodeClient, - CommitteeManager, - PayloadProcessor, - PayloadProvider, - }, - storage::{chain::ChainDbUnitOfWork, state::StateDbUnitOfWork, ChainStorageService, DbFactory}, + models::AssetDefinition, + services::{infrastructure_services::NodeAddressable, BaseNodeClient, CommitteeManager}, + storage::DbFactory, workers::states::ConsensusWorkerStateEvent, }; @@ -59,20 +53,13 @@ where TBaseNodeClient: BaseNodeClient pub async fn next_event< TAddr: NodeAddressable, TCommitteeManager: CommitteeManager, - TPayload: Payload, - TPayloadProvider: PayloadProvider, - TPayloadProcessor: PayloadProcessor, TDbFactory: DbFactory, - TChainStorageService: ChainStorageService, >( &self, base_node_client: &mut TBaseNodeClient, asset_definition: &AssetDefinition, committee_manager: &mut TCommitteeManager, db_factory: &TDbFactory, - payload_provider: &TPayloadProvider, - payload_processor: &TPayloadProcessor, - chain_storage_service: &TChainStorageService, node_id: &TAddr, ) -> Result { info!( @@ -113,46 +100,7 @@ where TBaseNodeClient: BaseNodeClient ); // read and create the genesis block info!(target: LOG_TARGET, "Creating DB"); - let chain_db = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; - if chain_db.is_empty()? { - info!(target: LOG_TARGET, "DB is empty, initializing"); - let mut tx = chain_db.new_unit_of_work(); - - let state_db = db_factory.get_or_create_state_db(&asset_definition.public_key)?; - let mut state_tx = state_db.new_unit_of_work(0); - - info!(target: LOG_TARGET, "Loading initial state"); - let initial_state = asset_definition.initial_state(); - for schema in &initial_state.schemas { - debug!(target: LOG_TARGET, "Setting initial state for {}", schema.name); - for key_value in &schema.items { - debug!( - target: LOG_TARGET, - "Setting {:?} = {:?}", key_value.key, key_value.value - ); - state_tx.set_value(schema.name.clone(), key_value.key.clone(), key_value.value.clone())?; - } - } - dbg!(&asset_definition); - for template in &asset_definition.template_parameters { - debug!( - target: LOG_TARGET, - "Setting template parameters for: {}", template.template_id - ); - payload_processor.init_template(template, asset_definition, &mut state_tx)?; - } - info!(target: LOG_TARGET, "Saving genesis node"); - let node = HotStuffTreeNode::genesis(payload_provider.create_genesis_payload()); - let genesis_qc = QuorumCertificate::genesis(*node.hash()); - chain_storage_service.add_node(&node, tx.clone()).await?; - tx.commit_node(node.hash())?; - debug!(target: LOG_TARGET, "Setting locked QC"); - tx.set_locked_qc(&genesis_qc)?; - debug!(target: LOG_TARGET, "Committing state"); - state_tx.commit()?; - debug!(target: LOG_TARGET, "Committing node"); - tx.commit()?; - } + let _ = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; Ok(ConsensusWorkerStateEvent::Initialized) } diff --git a/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs b/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs index 83ece5b6da..2d4d7b32e9 100644 --- a/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs +++ b/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs @@ -351,11 +351,11 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { )) } - fn find_node_by_hash(&self, parent_hash: &TreeNodeHash) -> Result, Self::Error> { + fn find_node_by_hash(&self, node_hash: &TreeNodeHash) -> Result, Self::Error> { use crate::schema::nodes::dsl; let connection = self.get_connection()?; let node = dsl::nodes - .filter(nodes::parent.eq(parent_hash.as_bytes())) + .filter(nodes::hash.eq(node_hash.as_bytes())) .first::(&connection) .optional() .map_err(|source| SqliteStorageError::DieselError { @@ -374,11 +374,11 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { } } - fn find_node_by_parent_hash(&self, node_hash: &TreeNodeHash) -> Result, Self::Error> { + fn find_node_by_parent_hash(&self, parent_hash: &TreeNodeHash) -> Result, Self::Error> { use crate::schema::nodes::dsl; let connection = self.get_connection()?; let node = dsl::nodes - .filter(nodes::parent.eq(node_hash.as_bytes())) + .filter(nodes::parent.eq(parent_hash.as_bytes())) .first::(&connection) .optional() .map_err(|source| SqliteStorageError::DieselError { diff --git a/dan_layer/storage_sqlite/src/sqlite_db_factory.rs b/dan_layer/storage_sqlite/src/sqlite_db_factory.rs index 2d2aa029ea..2828b77a9a 100644 --- a/dan_layer/storage_sqlite/src/sqlite_db_factory.rs +++ b/dan_layer/storage_sqlite/src/sqlite_db_factory.rs @@ -124,7 +124,10 @@ impl DbFactory for SqliteDbFactory { ) -> Result>, StorageError> { let database_url = self.database_url_for(asset_public_key); match self.try_connect(&database_url)? { - Some(_) => Ok(Some(StateDb::new(SqliteStateDbBackendAdapter::new(database_url)))), + Some(_) => Ok(Some(StateDb::new( + asset_public_key.clone(), + SqliteStateDbBackendAdapter::new(database_url), + ))), None => Ok(None), } } @@ -147,6 +150,9 @@ impl DbFactory for SqliteDbFactory { })?; embed_migrations!("./migrations"); embedded_migrations::run(&connection).map_err(SqliteStorageError::from)?; - Ok(StateDb::new(SqliteStateDbBackendAdapter::new(database_url))) + Ok(StateDb::new( + asset_public_key.clone(), + SqliteStateDbBackendAdapter::new(database_url), + )) } }