Skip to content

Commit

Permalink
Add mocks and basic tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Feb 8, 2022
1 parent daf27b2 commit be5b8aa
Show file tree
Hide file tree
Showing 30 changed files with 819 additions and 47 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions applications/tari_validator_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2018"

[dependencies]
tari_app_utilities = { path = "../tari_app_utilities" }
tari_app_grpc = {path = "../tari_app_grpc" }
tari_app_grpc = { path = "../tari_app_grpc" }
tari_common = { path = "../../common" }
tari_comms = { path = "../../comms" }
tari_comms_dht = { path = "../../comms/dht" }
Expand All @@ -20,11 +20,11 @@ tari_p2p = { path = "../../base_layer/p2p" }
tari_service_framework = { path = "../../base_layer/service_framework" }
tari_shutdown = { path = "../../infrastructure/shutdown" }
tari_storage = { path = "../../infrastructure/storage" }
tari_core = {path = "../../base_layer/core"}
tari_dan_core = {path = "../../dan_layer/core"}
tari_dan_storage_sqlite = {path = "../../dan_layer/storage_sqlite"}
tari_dan_common_types = {path = "../../dan_layer/common_types"}
tari_common_types = {path = "../../base_layer/common_types"}
tari_core = { path = "../../base_layer/core" }
tari_dan_core = { path = "../../dan_layer/core" }
tari_dan_storage_sqlite = { path = "../../dan_layer/storage_sqlite" }
tari_dan_common_types = { path = "../../dan_layer/common_types" }
tari_common_types = { path = "../../base_layer/common_types" }

anyhow = "1.0.53"
async-trait = "0.1.50"
Expand All @@ -38,7 +38,7 @@ prost = "0.9"
prost-types = "0.9"
serde = "1.0.126"
thiserror = "^1.0.20"
tokio = { version="1.10", features = ["macros", "time", "sync"]}
tokio = { version = "1.10", features = ["macros", "time", "sync"] }
tokio-stream = { version = "0.1.7", features = ["sync"] }
tonic = "0.6.2"

Expand All @@ -48,7 +48,7 @@ bytecodec = { version = "0.4.14", features = ["bincode_codec"] }
serde_json = "1.0.64"

[dev-dependencies]
tari_test_utils = "0.8.1"
tari_test_utils = { path = "../../infrastructure/test_utils" }

[build-dependencies]
tari_common = { path = "../../common", features = ["build"] }
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ message InvokeMethodResponse {
Status status = 2;
}

message GetBlocksRequest {
message GetSidechainBlocksRequest {
bytes asset_public_key = 1;
bytes start_hash = 2;
bytes end_hash = 3;
}


message GetBlocksResponse {
message GetSidechainBlocksResponse {
tari.dan.common.SideChainBlock block = 1;
}
29 changes: 29 additions & 0 deletions applications/tari_validator_node/src/p2p/proto/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,22 @@ impl From<SideChainBlock> for proto::common::SideChainBlock {
}
}

impl TryFrom<proto::common::SideChainBlock> for SideChainBlock {
type Error = String;

fn try_from(block: proto::common::SideChainBlock) -> Result<Self, Self::Error> {
let node = block
.node
.map(TryInto::try_into)
.ok_or_else(|| "No node provided in sidechain block".to_string())??;
let instructions = block
.instructions
.map(TryInto::try_into)
.ok_or_else(|| "No InstructionSet provided in sidechain block".to_string())??;
Ok(Self::new(node, instructions))
}
}

impl From<Node> for proto::common::Node {
fn from(node: Node) -> Self {
Self {
Expand All @@ -252,3 +268,16 @@ impl From<Node> for proto::common::Node {
}
}
}

impl TryFrom<proto::common::Node> for Node {
type Error = String;

fn try_from(node: proto::common::Node) -> Result<Self, Self::Error> {
let hash = TreeNodeHash::try_from(node.hash).map_err(|err| err.to_string())?;
let parent = TreeNodeHash::try_from(node.parent).map_err(|err| err.to_string())?;
let height = node.height;
let is_committed = node.is_committed;

Ok(Self::new(hash, parent, height, is_committed))
}
}
7 changes: 5 additions & 2 deletions applications/tari_validator_node/src/p2p/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

mod service_impl;

#[cfg(test)]
mod test;

pub use service_impl::ValidatorNodeRpcServiceImpl;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_comms_rpc_macros::tari_rpc;
Expand Down Expand Up @@ -55,8 +58,8 @@ pub trait ValidatorNodeRpcService: Send + Sync + 'static {
#[rpc(method = 4)]
async fn get_sidechain_blocks(
&self,
request: Request<proto::GetBlocksRequest>,
) -> Result<Streaming<proto::GetBlocksResponse>, RpcStatus>;
request: Request<proto::GetSidechainBlocksRequest>,
) -> Result<Streaming<proto::GetSidechainBlocksResponse>, RpcStatus>;
}

pub fn create_validator_node_rpc_service<
Expand Down
14 changes: 7 additions & 7 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::convert::TryFrom;

// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that
Expand All @@ -22,6 +20,8 @@ use std::convert::TryFrom;
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
use std::convert::TryFrom;

use log::*;
use tari_common_types::types::PublicKey;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
Expand Down Expand Up @@ -140,8 +140,8 @@ where

async fn get_sidechain_blocks(
&self,
request: Request<proto::GetBlocksRequest>,
) -> Result<Streaming<proto::GetBlocksResponse>, RpcStatus> {
request: Request<proto::GetSidechainBlocksRequest>,
) -> Result<Streaming<proto::GetSidechainBlocksResponse>, RpcStatus> {
let msg = request.into_message();

let asset_public_key = PublicKey::from_bytes(&msg.asset_public_key)
Expand Down Expand Up @@ -172,7 +172,7 @@ where
.transpose()
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

if !end_block_exists.unwrap_or(false) {
if !end_block_exists.unwrap_or(true) {
return Err(RpcStatus::not_found(format!(
"Block not found with end_hash '{}'",
end_hash.unwrap_or_else(TreeNodeHash::zero)
Expand All @@ -184,7 +184,7 @@ where
task::spawn(async move {
let mut current_block_hash = *start_block.node().hash();
if tx
.send(Ok(proto::GetBlocksResponse {
.send(Ok(proto::GetSidechainBlocksResponse {
block: Some(start_block.into()),
}))
.await
Expand All @@ -197,7 +197,7 @@ where
Ok(Some(block)) => {
current_block_hash = *block.node().hash();
if tx
.send(Ok(proto::GetBlocksResponse {
.send(Ok(proto::GetSidechainBlocksResponse {
block: Some(block.into()),
}))
.await
Expand Down
132 changes: 132 additions & 0 deletions applications/tari_validator_node/src/p2p/rpc/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2022, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::convert::TryFrom;

use tari_common_types::types::PublicKey;
use tari_comms::{protocol::rpc::mock::RpcRequestMock, test_utils};
use tari_crypto::tari_utilities::{hex::Hex, ByteArray};
use tari_dan_core::{
fixed_hash::FixedHash,
models::{Node, TreeNodeHash},
services::mocks::{MockAssetProcessor, MockMempoolService},
storage::{chain::ChainDbUnitOfWork, mocks::MockDbFactory, DbFactory},
};
use tari_test_utils::{paths::tempdir, streams::convert_mpsc_to_stream};
use tokio_stream::StreamExt;

use crate::p2p::{
proto,
rpc::{ValidatorNodeRpcService, ValidatorNodeRpcServiceImpl},
};

fn setup() -> (
ValidatorNodeRpcServiceImpl<MockMempoolService, MockDbFactory, MockAssetProcessor>,
RpcRequestMock,
MockDbFactory,
) {
let tmp = tempdir().unwrap();
let peer_manager = test_utils::build_peer_manager(&tmp);
let mock = RpcRequestMock::new(peer_manager);
let mempool = MockMempoolService;
let db_factory = MockDbFactory::default();
let asset_processor = MockAssetProcessor;
let service = ValidatorNodeRpcServiceImpl::new(mempool, db_factory.clone(), asset_processor);

(service, mock, db_factory)
}

mod get_sidechain_blocks {

use super::*;

#[tokio::test]
async fn it_fetches_matching_block() {
let (service, mock, db_factory) = setup();
let asset_public_key = PublicKey::default();
let db = db_factory.get_or_create_chain_db(&asset_public_key).unwrap();
let mut uow = db.new_unit_of_work();

// Some random parent hash to ensure stream does not last forever
let parent =
TreeNodeHash::from_hex("972209d3622c1227a499fd2cfcfa75fdde547d1a21fa805522d3a1a315ebd1a3").unwrap();
uow.add_node(TreeNodeHash::zero(), parent, 1).unwrap();
uow.commit().unwrap();

let req = proto::validator_node::GetSidechainBlocksRequest {
asset_public_key: asset_public_key.to_vec(),
start_hash: TreeNodeHash::zero().as_bytes().to_vec(),
end_hash: vec![],
};
let req = mock.request_with_context(Default::default(), req);
let mut resp = service.get_sidechain_blocks(req).await.unwrap().into_inner();
let stream = convert_mpsc_to_stream(&mut resp).map(|r| r.unwrap());

let responses = stream
.collect::<Vec<proto::validator_node::GetSidechainBlocksResponse>>()
.await;
assert_eq!(responses.len(), 1);
let node = Node::new(TreeNodeHash::zero(), parent, 1, false);
let block = responses[0].block.clone();
assert_eq!(
Node::try_from(block.as_ref().unwrap().node.clone().unwrap()).unwrap(),
node
);
assert_eq!(
block.as_ref().unwrap().instructions.clone().unwrap().instructions.len(),
0
);
}

#[tokio::test]
async fn it_errors_if_asset_not_found() {
let (service, mock, _) = setup();

let req = proto::validator_node::GetSidechainBlocksRequest {
asset_public_key: PublicKey::default().to_vec(),
start_hash: FixedHash::zero().as_bytes().to_vec(),
end_hash: vec![],
};
let req = mock.request_with_context(Default::default(), req);
let err = service.get_sidechain_blocks(req).await.unwrap_err();
assert!(err.as_status_code().is_not_found());
assert_eq!(err.details(), "Asset not found");
}

#[tokio::test]
async fn it_errors_if_block_not_found() {
let (service, mock, db_factory) = setup();
let asset_public_key = PublicKey::default();
let db = db_factory.get_or_create_chain_db(&asset_public_key).unwrap();
db.new_unit_of_work().commit().unwrap();

let req = proto::validator_node::GetSidechainBlocksRequest {
asset_public_key: asset_public_key.to_vec(),
start_hash: FixedHash::zero().as_bytes().to_vec(),
end_hash: vec![],
};
let req = mock.request_with_context(Default::default(), req);
let err = service.get_sidechain_blocks(req).await.unwrap_err();
assert!(err.as_status_code().is_not_found());
assert!(err.details().starts_with("Block not found"));
}
}
39 changes: 38 additions & 1 deletion applications/tari_validator_node/src/p2p/services/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::convert::TryInto;

use async_trait::async_trait;
use log::*;
use tari_common_types::types::PublicKey;
Expand All @@ -32,10 +34,11 @@ use tari_comms::{
use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester};
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::TemplateId,
models::{SideChainBlock, TemplateId, TreeNodeHash},
services::{ValidatorNodeClientFactory, ValidatorNodeRpcClient},
DigitalAssetError,
};
use tokio_stream::StreamExt;

use crate::p2p::{proto::validator_node as proto, rpc};

Expand Down Expand Up @@ -147,6 +150,40 @@ impl ValidatorNodeRpcClient for TariCommsValidatorNodeRpcClient {
Ok(Some(response.result))
}
}

async fn get_sidechain_blocks(
&mut self,
asset_public_key: &PublicKey,
start_hash: TreeNodeHash,
end_hash: Option<TreeNodeHash>,
) -> Result<Vec<SideChainBlock>, DigitalAssetError> {
let mut connection = self.create_connection().await?;
let mut client = connection.connect_rpc::<rpc::ValidatorNodeRpcClient>().await?;
let request = proto::GetSidechainBlocksRequest {
asset_public_key: asset_public_key.to_vec(),
start_hash: start_hash.as_bytes().to_vec(),
end_hash: end_hash.map(|h| h.as_bytes().to_vec()).unwrap_or_default(),
};

let stream = client.get_sidechain_blocks(request).await?;
// TODO: By first collecting all the blocks, we lose the advantage of streaming. Since you cannot return
// `Result<impl Stream<..>, _>`, and the Map type is private in tokio-stream, its a little tricky to
// return the stream and not leak the RPC response type out of the client
let blocks = stream
.map(|result| {
let resp = result.map_err(DigitalAssetError::from)?;
let block: SideChainBlock = resp
.block
.ok_or_else(|| DigitalAssetError::ConversionError("Node returned empty block".to_string()))?
.try_into()
.map_err(DigitalAssetError::ConversionError)?;
Ok(block)
})
.collect::<Result<_, DigitalAssetError>>()
.await?;

Ok(blocks)
}
}

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connection_manager/tests/listener_dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{
peer_manager::PeerFeatures,
protocol::ProtocolId,
runtime,
test_utils::{node_identity::build_node_identity, test_node::build_peer_manager},
test_utils::{build_peer_manager, node_identity::build_node_identity},
transports::MemoryTransport,
};

Expand Down
3 changes: 2 additions & 1 deletion comms/src/connection_manager/tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ use crate::{
runtime,
runtime::task,
test_utils::{
build_peer_manager,
count_string_occurrences,
node_identity::{build_node_identity, ordered_node_identities},
test_node::{build_connection_manager, build_peer_manager, TestNodeConfig},
test_node::{build_connection_manager, TestNodeConfig},
},
transports::{MemoryTransport, TcpTransport},
};
Expand Down
Loading

0 comments on commit be5b8aa

Please sign in to comment.