Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(validator-node): committee proposes genesis block w/ instructions #3844

Merged
merged 2 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions applications/tari_collectibles/web-app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 26 additions & 24 deletions applications/tari_collectibles/web-app/src/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,23 @@ const AccountsMenu = (props) => {
const [accounts, setAccounts] = useState([]);
const [error, setError] = useState("");

useEffect(() => {
async function inner() {
console.log("refreshing accounts");
useEffect(async () => {
console.log("refreshing accounts");
setError("");
await binding
.command_asset_wallets_list()
.then((accounts) => {
console.log("accounts", accounts);
setAccounts(accounts);
})
.catch((e) => {
// todo error handling
console.error("accounts_list error:", e);
setError(e.message);
});

await listen("asset_wallets::updated", (event) => {
console.log("accounts have changed");
setError("");
binding
.command_asset_wallets_list()
Expand All @@ -137,24 +151,7 @@ const AccountsMenu = (props) => {
console.error("accounts_list error:", e);
setError(e.message);
});

await listen("asset_wallets::updated", (event) => {
console.log("accounts have changed");
setError("");
binding
.command_asset_wallets_list()
.then((accounts) => {
console.log("accounts", accounts);
setAccounts(accounts);
})
.catch((e) => {
// todo error handling
console.error("accounts_list error:", e);
setError(e.message);
});
});
}
inner();
});
}, [props.walletId]);

// todo: hide accounts when not authenticated
Expand Down Expand Up @@ -211,7 +208,8 @@ ProtectedRoute.propTypes = {
};

function App() {
const [loading, setLoading] = useState(true);
const [loading, setLoading] = useState(false);
const [error, setError] = useState(null);
const [authenticated, setAuthenticated] = useState(false);
const [walletId, setWalletId] = useState("");
const setPassword = useState("")[1];
Expand All @@ -223,9 +221,13 @@ function App() {
binding
.command_create_db()
.then((r) => setLoading(false))
.catch((e) => console.error(e));
.catch((e) => {
setLoading(false);
setError(e);
});
}, []);
if (loading) return <Spinner />;
if (error) return <Alert severity="error">{error.toString()}</Alert>;

return (
<div className="App">
Expand All @@ -246,7 +248,7 @@ function App() {
to="/dashboard"
icon={<DashboardIcon />}
/>
<Divider></Divider>
<Divider />
<AccountsMenu walletId={walletId} />
<ListSubheader>Issued Assets</ListSubheader>
<ListItemLink
Expand Down
12 changes: 6 additions & 6 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl DanNode {
if !allow_list.contains(&asset.public_key.to_hex()) {
debug!(
target: LOG_TARGET,
"Asset '{}' is not whitelisted for processing ", asset.public_key
"Asset '{}' is not allowlisted for processing ", asset.public_key
);
continue;
}
Expand Down Expand Up @@ -162,7 +162,7 @@ impl DanNode {
// todo!()
// }

async fn start_asset_worker(
pub async fn start_asset_worker(
asset_definition: AssetDefinition,
node_identity: NodeIdentity,
mempool_service: MempoolServiceHandle,
Expand Down Expand Up @@ -234,10 +234,10 @@ impl DanNode {
validator_node_client_factory,
);

consensus_worker
.run(shutdown.clone(), None)
.await
.map_err(|err| ExitError::new(ExitCode::ConfigError, err))?;
if let Err(err) = consensus_worker.run(shutdown.clone(), None).await {
error!(target: LOG_TARGET, "Consensus worker failed with error: {}", err);
return Err(ExitError::new(ExitCode::UnknownError, err));
}

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tari_common_types::types::PublicKey;
use tari_comms::NodeIdentity;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::Instruction,
services::{AssetProcessor, AssetProxy, ServiceSpecification},
storage::DbFactory,
};
Expand Down Expand Up @@ -143,10 +144,11 @@ impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_
.get_state_db(&asset_public_key)
.map_err(|e| Status::internal(format!("Could not create state db: {}", e)))?
{
let mut state_db_reader = state.reader();
let state_db_reader = state.reader();
let instruction = Instruction::new(template_id, request.method, request.args);
let response_bytes = self
.asset_processor
.invoke_read_method(template_id, request.method, &request.args, &mut state_db_reader)
.invoke_read_method(&instruction, &state_db_reader)
.map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?;
Ok(Response::new(rpc::InvokeReadMethodResponse {
result: response_bytes.unwrap_or_default(),
Expand Down
27 changes: 17 additions & 10 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
use std::convert::TryFrom;
use std::convert::{TryFrom, TryInto};

use log::*;
use tari_common_types::types::PublicKey;
Expand All @@ -30,7 +30,7 @@ use tari_comms::{
};
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{Instruction, TemplateId, TreeNodeHash},
models::{Instruction, TreeNodeHash},
services::{AssetProcessor, MempoolService},
storage::{state::StateDbUnitOfWorkReader, DbFactory},
};
Expand Down Expand Up @@ -92,15 +92,19 @@ where
.map_err(|e| RpcStatus::general(format!("Could not create state db: {}", e)))?
.ok_or_else(|| RpcStatus::not_found("This node does not process this asset".to_string()))?;

let mut unit_of_work = state.reader();
let unit_of_work = state.reader();

let instruction = Instruction::new(
request
.template_id
.try_into()
.map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request.method,
request.args,
);
let response_bytes = self
.asset_processor
.invoke_read_method(
TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request.method,
&request.args,
&mut unit_of_work,
)
.invoke_read_method(&instruction, &unit_of_work)
.map_err(|e| RpcStatus::general(format!("Could not invoke read method: {}", e)))?;

Ok(Response::new(proto::InvokeReadMethodResponse {
Expand All @@ -115,7 +119,10 @@ where
dbg!(&request);
let request = request.into_message();
let instruction = Instruction::new(
TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request
.template_id
.try_into()
.map_err(|_| RpcStatus::bad_request("Invalid template_id"))?,
request.method.clone(),
request.args.clone(),
/* TokenId(request.token_id.clone()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,29 @@ impl TariCommsInboundConnectionService {
} => {
// Check for already received messages
let mut indexes_to_remove = vec![];
let now = Instant::now();
let mut result_message = None;
for (index, message) in self.buffered_messages.iter().enumerate() {
if now - message.2 > self.expiry_time {
warn!(target: LOG_TARGET, "Message has expired: {:?}", message);
for (index, (from_pk, message, msg_time)) in self.buffered_messages.iter().enumerate() {
if msg_time.elapsed() > self.expiry_time {
warn!(
target: LOG_TARGET,
"Message has expired: ({:.2?}) {:?}",
msg_time.elapsed(),
message
);
indexes_to_remove.push(index);
} else {
match wait_for_type {
WaitForMessageType::Message => {
if message.1.message_type() == message_type && message.1.view_number() == view_number {
result_message = Some((message.0.clone(), message.1.clone()));
if message.message_type() == message_type && message.view_number() == view_number {
result_message = Some((from_pk.clone(), message.clone()));
indexes_to_remove.push(index);
break;
}
},
WaitForMessageType::QuorumCertificate => {
if let Some(qc) = message.1.justify() {
if let Some(qc) = message.justify() {
if qc.message_type() == message_type && qc.view_number() == view_number {
result_message = Some((message.0.clone(), message.1.clone()));
result_message = Some((from_pk.clone(), message.clone()));
indexes_to_remove.push(index);
break;
}
Expand Down Expand Up @@ -256,10 +260,10 @@ impl TariCommsInboundConnectionService {
"Found waiter for this message, waking task... {:?}",
message.message_type()
);
if let Some(w) = self.waiters.swap_remove_back(index) {
if let Some((_, _, _, reply)) = self.waiters.swap_remove_back(index) {
// The receiver on the other end of this channel may have dropped naturally
// as it moves out of scope and is not longer interested in receiving the message
if w.3.send((from.clone(), message.clone())).is_ok() {
if reply.send((from.clone(), message.clone())).is_ok() {
return Ok(());
}
}
Expand Down Expand Up @@ -291,7 +295,10 @@ impl TariCommsInboundReceiverHandle {
}

#[async_trait]
impl InboundConnectionService<CommsPublicKey, TariDanPayload> for TariCommsInboundReceiverHandle {
impl InboundConnectionService for TariCommsInboundReceiverHandle {
type Addr = CommsPublicKey;
type Payload = TariDanPayload;

async fn wait_for_message(
&self,
message_type: HotStuffMessageType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use tokio::sync::mpsc::Sender;

use crate::p2p::proto;

const LOG_TARGET: &str = "tari::validator_node::messages::outbound::validator_node";

pub struct TariCommsOutboundService<TPayload: Payload> {
outbound_message_requester: OutboundMessageRequester,
loopback_service: Sender<(CommsPublicKey, HotStuffMessage<TPayload>)>,
Expand All @@ -61,16 +63,25 @@ impl<TPayload: Payload> TariCommsOutboundService<TPayload> {
}

#[async_trait]
impl OutboundService<CommsPublicKey, TariDanPayload> for TariCommsOutboundService<TariDanPayload> {
impl OutboundService for TariCommsOutboundService<TariDanPayload> {
type Addr = CommsPublicKey;
type Payload = TariDanPayload;

async fn send(
&mut self,
from: CommsPublicKey,
to: CommsPublicKey,
message: HotStuffMessage<TariDanPayload>,
) -> Result<(), DigitalAssetError> {
debug!(target: "messages::outbound::validator_node", "Outbound message to be sent:{} {:?}", to, message);
debug!(target: LOG_TARGET, "Outbound message to be sent:{} {:?}", to, message);
// Tari comms does allow sending to itself
if from == to && message.asset_public_key() == &self.asset_public_key {
debug!(
target: LOG_TARGET,
"Sending {:?} to self for asset {}",
message.message_type(),
message.asset_public_key()
);
self.loopback_service.send((from, message)).await.unwrap();
return Ok(());
}
Expand Down
9 changes: 9 additions & 0 deletions common/src/configuration/validator_node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,20 @@ pub struct ValidatorNodeConfig {
pub base_node_grpc_address: SocketAddr,
#[serde(default = "default_wallet_grpc_address")]
pub wallet_grpc_address: SocketAddr,
#[serde(default = "default_true")]
pub scan_for_assets: bool,
#[serde(default = "default_asset_scanning_interval")]
pub new_asset_scanning_interval: u64,
pub assets_allow_list: Option<Vec<String>>,
}

fn default_true() -> bool {
true
}
fn default_asset_scanning_interval() -> u64 {
10
}

fn default_asset_config_directory() -> PathBuf {
PathBuf::from("assets")
}
Expand Down
Loading