Skip to content

Commit

Permalink
feat(validator-node): committee proposes genesis block w/ instructions (
Browse files Browse the repository at this point in the history
#3844)

Description
---
- committee proposes genesis block/node with initial instructions as per template id
- state db is aware of which asset it is processing
- minor cleanup of some generics
- error handling

Motivation and Context
---
The committee can propose a genesis block with init instructions and agree on a genesis block and genesis merkle root

How Has This Been Tested?
---
Manually
  • Loading branch information
sdbondi authored Feb 18, 2022
1 parent a49b1af commit 68a9f76
Show file tree
Hide file tree
Showing 37 changed files with 745 additions and 645 deletions.
38 changes: 31 additions & 7 deletions applications/tari_collectibles/web-app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 26 additions & 24 deletions applications/tari_collectibles/web-app/src/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,23 @@ const AccountsMenu = (props) => {
const [accounts, setAccounts] = useState([]);
const [error, setError] = useState("");

useEffect(() => {
async function inner() {
console.log("refreshing accounts");
useEffect(async () => {
console.log("refreshing accounts");
setError("");
await binding
.command_asset_wallets_list()
.then((accounts) => {
console.log("accounts", accounts);
setAccounts(accounts);
})
.catch((e) => {
// todo error handling
console.error("accounts_list error:", e);
setError(e.message);
});

await listen("asset_wallets::updated", (event) => {
console.log("accounts have changed");
setError("");
binding
.command_asset_wallets_list()
Expand All @@ -137,24 +151,7 @@ const AccountsMenu = (props) => {
console.error("accounts_list error:", e);
setError(e.message);
});

await listen("asset_wallets::updated", (event) => {
console.log("accounts have changed");
setError("");
binding
.command_asset_wallets_list()
.then((accounts) => {
console.log("accounts", accounts);
setAccounts(accounts);
})
.catch((e) => {
// todo error handling
console.error("accounts_list error:", e);
setError(e.message);
});
});
}
inner();
});
}, [props.walletId]);

// todo: hide accounts when not authenticated
Expand Down Expand Up @@ -211,7 +208,8 @@ ProtectedRoute.propTypes = {
};

function App() {
const [loading, setLoading] = useState(true);
const [loading, setLoading] = useState(false);
const [error, setError] = useState(null);
const [authenticated, setAuthenticated] = useState(false);
const [walletId, setWalletId] = useState("");
const setPassword = useState("")[1];
Expand All @@ -223,9 +221,13 @@ function App() {
binding
.command_create_db()
.then((r) => setLoading(false))
.catch((e) => console.error(e));
.catch((e) => {
setLoading(false);
setError(e);
});
}, []);
if (loading) return <Spinner />;
if (error) return <Alert severity="error">{error.toString()}</Alert>;

return (
<div className="App">
Expand All @@ -246,7 +248,7 @@ function App() {
to="/dashboard"
icon={<DashboardIcon />}
/>
<Divider></Divider>
<Divider />
<AccountsMenu walletId={walletId} />
<ListSubheader>Issued Assets</ListSubheader>
<ListItemLink
Expand Down
12 changes: 6 additions & 6 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl DanNode {
if !allow_list.contains(&asset.public_key.to_hex()) {
debug!(
target: LOG_TARGET,
"Asset '{}' is not whitelisted for processing ", asset.public_key
"Asset '{}' is not allowlisted for processing ", asset.public_key
);
continue;
}
Expand Down Expand Up @@ -162,7 +162,7 @@ impl DanNode {
// todo!()
// }

async fn start_asset_worker(
pub async fn start_asset_worker(
asset_definition: AssetDefinition,
node_identity: NodeIdentity,
mempool_service: MempoolServiceHandle,
Expand Down Expand Up @@ -234,10 +234,10 @@ impl DanNode {
validator_node_client_factory,
);

consensus_worker
.run(shutdown.clone(), None)
.await
.map_err(|err| ExitError::new(ExitCode::ConfigError, err))?;
if let Err(err) = consensus_worker.run(shutdown.clone(), None).await {
error!(target: LOG_TARGET, "Consensus worker failed with error: {}", err);
return Err(ExitError::new(ExitCode::UnknownError, err));
}

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tari_common_types::types::PublicKey;
use tari_comms::NodeIdentity;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::Instruction,
services::{AssetProcessor, AssetProxy, ServiceSpecification},
storage::DbFactory,
};
Expand Down Expand Up @@ -143,10 +144,11 @@ impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_
.get_state_db(&asset_public_key)
.map_err(|e| Status::internal(format!("Could not create state db: {}", e)))?
{
let mut state_db_reader = state.reader();
let state_db_reader = state.reader();
let instruction = Instruction::new(template_id, request.method, request.args);
let response_bytes = self
.asset_processor
.invoke_read_method(template_id, request.method, &request.args, &mut state_db_reader)
.invoke_read_method(&instruction, &state_db_reader)
.map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?;
Ok(Response::new(rpc::InvokeReadMethodResponse {
result: response_bytes.unwrap_or_default(),
Expand Down
27 changes: 17 additions & 10 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
use std::convert::TryFrom;
use std::convert::{TryFrom, TryInto};

use log::*;
use tari_common_types::types::PublicKey;
Expand All @@ -30,7 +30,7 @@ use tari_comms::{
};
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{Instruction, TemplateId, TreeNodeHash},
models::{Instruction, TreeNodeHash},
services::{AssetProcessor, MempoolService},
storage::{state::StateDbUnitOfWorkReader, DbFactory},
};
Expand Down Expand Up @@ -92,15 +92,19 @@ where
.map_err(|e| RpcStatus::general(format!("Could not create state db: {}", e)))?
.ok_or_else(|| RpcStatus::not_found("This node does not process this asset".to_string()))?;

let mut unit_of_work = state.reader();
let unit_of_work = state.reader();

let instruction = Instruction::new(
request
.template_id
.try_into()
.map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request.method,
request.args,
);
let response_bytes = self
.asset_processor
.invoke_read_method(
TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request.method,
&request.args,
&mut unit_of_work,
)
.invoke_read_method(&instruction, &unit_of_work)
.map_err(|e| RpcStatus::general(format!("Could not invoke read method: {}", e)))?;

Ok(Response::new(proto::InvokeReadMethodResponse {
Expand All @@ -115,7 +119,10 @@ where
dbg!(&request);
let request = request.into_message();
let instruction = Instruction::new(
TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request
.template_id
.try_into()
.map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request.method.clone(),
request.args.clone(),
/* TokenId(request.token_id.clone()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,29 @@ impl TariCommsInboundConnectionService {
} => {
// Check for already received messages
let mut indexes_to_remove = vec![];
let now = Instant::now();
let mut result_message = None;
for (index, message) in self.buffered_messages.iter().enumerate() {
if now - message.2 > self.expiry_time {
warn!(target: LOG_TARGET, "Message has expired: {:?}", message);
for (index, (from_pk, message, msg_time)) in self.buffered_messages.iter().enumerate() {
if msg_time.elapsed() > self.expiry_time {
warn!(
target: LOG_TARGET,
"Message has expired: ({:.2?}) {:?}",
msg_time.elapsed(),
message
);
indexes_to_remove.push(index);
} else {
match wait_for_type {
WaitForMessageType::Message => {
if message.1.message_type() == message_type && message.1.view_number() == view_number {
result_message = Some((message.0.clone(), message.1.clone()));
if message.message_type() == message_type && message.view_number() == view_number {
result_message = Some((from_pk.clone(), message.clone()));
indexes_to_remove.push(index);
break;
}
},
WaitForMessageType::QuorumCertificate => {
if let Some(qc) = message.1.justify() {
if let Some(qc) = message.justify() {
if qc.message_type() == message_type && qc.view_number() == view_number {
result_message = Some((message.0.clone(), message.1.clone()));
result_message = Some((from_pk.clone(), message.clone()));
indexes_to_remove.push(index);
break;
}
Expand Down Expand Up @@ -256,10 +260,10 @@ impl TariCommsInboundConnectionService {
"Found waiter for this message, waking task... {:?}",
message.message_type()
);
if let Some(w) = self.waiters.swap_remove_back(index) {
if let Some((_, _, _, reply)) = self.waiters.swap_remove_back(index) {
// The receiver on the other end of this channel may have dropped naturally
// as it moves out of scope and is not longer interested in receiving the message
if w.3.send((from.clone(), message.clone())).is_ok() {
if reply.send((from.clone(), message.clone())).is_ok() {
return Ok(());
}
}
Expand Down Expand Up @@ -291,7 +295,10 @@ impl TariCommsInboundReceiverHandle {
}

#[async_trait]
impl InboundConnectionService<CommsPublicKey, TariDanPayload> for TariCommsInboundReceiverHandle {
impl InboundConnectionService for TariCommsInboundReceiverHandle {
type Addr = CommsPublicKey;
type Payload = TariDanPayload;

async fn wait_for_message(
&self,
message_type: HotStuffMessageType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use tokio::sync::mpsc::Sender;

use crate::p2p::proto;

const LOG_TARGET: &str = "tari::validator_node::messages::outbound::validator_node";

pub struct TariCommsOutboundService<TPayload: Payload> {
outbound_message_requester: OutboundMessageRequester,
loopback_service: Sender<(CommsPublicKey, HotStuffMessage<TPayload>)>,
Expand All @@ -61,16 +63,25 @@ impl<TPayload: Payload> TariCommsOutboundService<TPayload> {
}

#[async_trait]
impl OutboundService<CommsPublicKey, TariDanPayload> for TariCommsOutboundService<TariDanPayload> {
impl OutboundService for TariCommsOutboundService<TariDanPayload> {
type Addr = CommsPublicKey;
type Payload = TariDanPayload;

async fn send(
&mut self,
from: CommsPublicKey,
to: CommsPublicKey,
message: HotStuffMessage<TariDanPayload>,
) -> Result<(), DigitalAssetError> {
debug!(target: "messages::outbound::validator_node", "Outbound message to be sent:{} {:?}", to, message);
debug!(target: LOG_TARGET, "Outbound message to be sent:{} {:?}", to, message);
// Tari comms does allow sending to itself
if from == to && message.asset_public_key() == &self.asset_public_key {
debug!(
target: LOG_TARGET,
"Sending {:?} to self for asset {}",
message.message_type(),
message.asset_public_key()
);
self.loopback_service.send((from, message)).await.unwrap();
return Ok(());
}
Expand Down
9 changes: 9 additions & 0 deletions common/src/configuration/validator_node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,20 @@ pub struct ValidatorNodeConfig {
pub base_node_grpc_address: SocketAddr,
#[serde(default = "default_wallet_grpc_address")]
pub wallet_grpc_address: SocketAddr,
#[serde(default = "default_true")]
pub scan_for_assets: bool,
#[serde(default = "default_asset_scanning_interval")]
pub new_asset_scanning_interval: u64,
pub assets_allow_list: Option<Vec<String>>,
}

fn default_true() -> bool {
true
}
fn default_asset_scanning_interval() -> u64 {
10
}

fn default_asset_config_directory() -> PathBuf {
PathBuf::from("assets")
}
Expand Down
Loading

0 comments on commit 68a9f76

Please sign in to comment.