Skip to content

Commit

Permalink
Merge branch 'main' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbinth authored Sep 13, 2024
2 parents 2b24ef7 + 2cdabad commit 1f7cf23
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 159 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
- Optimized state synchronizations by removing unnecessary fetching and parsing of note details (#462).
- [BREAKING] Changed `GetAccountDetailsResponse` field to `details` (#481).

## 0.5.1 (2024-09-12)

### Enhancements

- Node component server startup is now coherent instead of requiring an arbitrary sleep amount (#488).

## 0.5.0 (2024-08-27)

### Enhancements
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ miden-node-utils = { workspace = true }
miden-objects = { workspace = true }
rand_chacha = "0.3"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.38", features = ["rt-multi-thread", "net", "macros"] }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "macros"] }
toml = { version = "0.8" }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
52 changes: 14 additions & 38 deletions bin/node/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::time::Duration;

use anyhow::{anyhow, Result};
use miden_node_block_producer::{config::BlockProducerConfig, server as block_producer_server};
use miden_node_rpc::{config::RpcConfig, server as rpc_server};
use miden_node_store::{config::StoreConfig, server as store_server};
use anyhow::{Context, Result};
use miden_node_block_producer::server::BlockProducer;
use miden_node_rpc::server::Rpc;
use miden_node_store::server::Store;
use tokio::task::JoinSet;

use crate::config::NodeConfig;
Expand All @@ -16,16 +14,18 @@ pub async fn start_node(config: NodeConfig) -> Result<()> {

let mut join_set = JoinSet::new();

// Start store
join_set.spawn(start_store(store));
// Start store. The store endpoint is available after loading completes.
let store = Store::init(store).await.context("Loading store")?;
join_set.spawn(async move { store.serve().await.context("Serving store") });

// Wait for store to start & start block-producer
tokio::time::sleep(Duration::from_secs(1)).await;
join_set.spawn(start_block_producer(block_producer));
// Start block-producer. The block-producer's endpoint is available after loading completes.
let block_producer =
BlockProducer::init(block_producer).await.context("Loading block-producer")?;
join_set.spawn(async move { block_producer.serve().await.context("Serving block-producer") });

// Wait for block-producer to start & start rpc
tokio::time::sleep(Duration::from_secs(1)).await;
join_set.spawn(start_rpc(rpc));
// Start RPC component.
let rpc = Rpc::init(rpc).await.context("Loading RPC")?;
join_set.spawn(async move { rpc.serve().await.context("Serving RPC") });

// block on all tasks
while let Some(res) = join_set.join_next().await {
Expand All @@ -35,27 +35,3 @@ pub async fn start_node(config: NodeConfig) -> Result<()> {

Ok(())
}

pub async fn start_block_producer(config: BlockProducerConfig) -> Result<()> {
block_producer_server::serve(config)
.await
.map_err(|err| anyhow!("Failed to serve block-producer: {}", err))?;

Ok(())
}

pub async fn start_rpc(config: RpcConfig) -> Result<()> {
rpc_server::serve(config)
.await
.map_err(|err| anyhow!("Failed to serve rpc: {}", err))?;

Ok(())
}

pub async fn start_store(config: StoreConfig) -> Result<()> {
store_server::serve(config)
.await
.map_err(|err| anyhow!("Failed to serve store: {}", err))?;

Ok(())
}
29 changes: 22 additions & 7 deletions bin/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::path::PathBuf;

use anyhow::{anyhow, Context};
use clap::{Parser, Subcommand};
use commands::{
init::init_config_files,
start::{start_block_producer, start_node, start_rpc, start_store},
};
use commands::{init::init_config_files, start::start_node};
use miden_node_block_producer::server::BlockProducer;
use miden_node_rpc::server::Rpc;
use miden_node_store::server::Store;
use miden_node_utils::config::load_config;

mod commands;
Expand Down Expand Up @@ -94,15 +94,30 @@ async fn main() -> anyhow::Result<()> {
},
StartCommand::BlockProducer => {
let config = load_config(config).context("Loading configuration file")?;
start_block_producer(config).await
BlockProducer::init(config)
.await
.context("Loading block-producer")?
.serve()
.await
.context("Serving block-producer")
},
StartCommand::Rpc => {
let config = load_config(config).context("Loading configuration file")?;
start_rpc(config).await
Rpc::init(config)
.await
.context("Loading RPC")?
.serve()
.await
.context("Serving RPC")
},
StartCommand::Store => {
let config = load_config(config).context("Loading configuration file")?;
start_store(config).await
Store::init(config)
.await
.context("Loading store")?
.serve()
.await
.context("Serving store")
},
},
Command::MakeGenesis { output_path, force, inputs_path } => {
Expand Down
5 changes: 3 additions & 2 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ miden-stdlib = { workspace = true }
miden-tx = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
thiserror = { workspace = true }
tokio = { version = "1.38", features = ["rt-multi-thread", "net", "macros", "sync", "time"] }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "macros", "sync", "time"] }
tokio-stream = { workspace = true, features = ["net"] }
toml = { version = "0.8" }
tonic = { workspace = true }
tracing = { workspace = true }
Expand All @@ -40,5 +41,5 @@ miden-node-test-macro = { path = "../test-macro" }
miden-objects = { workspace = true, features = ["testing"] }
miden-tx = { workspace = true, features = ["testing"] }
rand_chacha = { version = "0.3", default-features = false }
tokio = { version = "1.38", features = ["test-util"] }
tokio = { workspace = true, features = ["test-util"] }
winterfell = { version = "0.9" }
135 changes: 82 additions & 53 deletions crates/block-producer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::{net::ToSocketAddrs, sync::Arc};

use miden_node_proto::generated::{block_producer::api_server, store::api_client as store_client};
use miden_node_utils::errors::ApiError;
use tonic::transport::Server;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tracing::info;

use crate::{
Expand All @@ -18,59 +19,87 @@ use crate::{

pub mod api;

// BLOCK PRODUCER INITIALIZER
// ================================================================================================
type Api = api::BlockProducerApi<
DefaultBatchBuilder<
DefaultStore,
DefaultBlockBuilder<DefaultStore, DefaultStateView<DefaultStore>>,
>,
DefaultStateView<DefaultStore>,
>;

pub async fn serve(config: BlockProducerConfig) -> Result<(), ApiError> {
info!(target: COMPONENT, %config, "Initializing server");
/// Represents an initialized block-producer component where the RPC connection is open,
/// but not yet actively responding to requests. Separating the connection binding
/// from the server spawning allows the caller to connect other components to the
/// store without resorting to sleeps or other mechanisms to spawn dependent components.
pub struct BlockProducer {
api_service: api_server::ApiServer<Api>,
listener: TcpListener,
}

impl BlockProducer {
/// Performs all expensive initialization tasks, and notably begins listening on the rpc
/// endpoint without serving the API yet. Incoming requests will be queued until
/// [`serve`](Self::serve) is called.
pub async fn init(config: BlockProducerConfig) -> Result<Self, ApiError> {
info!(target: COMPONENT, %config, "Initializing server");

let store = Arc::new(DefaultStore::new(
store_client::ApiClient::connect(config.store_url.to_string())
.await
.map_err(|err| ApiError::DatabaseConnectionFailed(err.to_string()))?,
));
let state_view =
Arc::new(DefaultStateView::new(Arc::clone(&store), config.verify_tx_proofs));

let block_builder = DefaultBlockBuilder::new(Arc::clone(&store), Arc::clone(&state_view));
let batch_builder_options = DefaultBatchBuilderOptions {
block_frequency: SERVER_BLOCK_FREQUENCY,
max_batches_per_block: SERVER_MAX_BATCHES_PER_BLOCK,
};
let batch_builder = Arc::new(DefaultBatchBuilder::new(
Arc::clone(&store),
Arc::new(block_builder),
batch_builder_options,
));

let transaction_queue_options = TransactionQueueOptions {
build_batch_frequency: SERVER_BUILD_BATCH_FREQUENCY,
batch_size: SERVER_BATCH_SIZE,
};
let queue = Arc::new(TransactionQueue::new(
state_view,
Arc::clone(&batch_builder),
transaction_queue_options,
));

let api_service =
api_server::ApiServer::new(api::BlockProducerApi::new(Arc::clone(&queue)));

tokio::spawn(async move { queue.run().await });
tokio::spawn(async move { batch_builder.run().await });

let addr = config
.endpoint
.to_socket_addrs()
.map_err(ApiError::EndpointToSocketFailed)?
.next()
.ok_or_else(|| ApiError::AddressResolutionFailed(config.endpoint.to_string()))?;

let listener = TcpListener::bind(addr).await?;

info!(target: COMPONENT, "Server initialized");

Ok(Self { api_service, listener })
}

let store = Arc::new(DefaultStore::new(
store_client::ApiClient::connect(config.store_url.to_string())
/// Serves the block-producers's RPC API.
///
/// Note: this blocks until the server dies.
pub async fn serve(self) -> Result<(), ApiError> {
tonic::transport::Server::builder()
.add_service(self.api_service)
.serve_with_incoming(TcpListenerStream::new(self.listener))
.await
.map_err(|err| ApiError::DatabaseConnectionFailed(err.to_string()))?,
));
let state_view = Arc::new(DefaultStateView::new(Arc::clone(&store), config.verify_tx_proofs));

let block_builder = DefaultBlockBuilder::new(Arc::clone(&store), Arc::clone(&state_view));
let batch_builder_options = DefaultBatchBuilderOptions {
block_frequency: SERVER_BLOCK_FREQUENCY,
max_batches_per_block: SERVER_MAX_BATCHES_PER_BLOCK,
};
let batch_builder = Arc::new(DefaultBatchBuilder::new(
Arc::clone(&store),
Arc::new(block_builder),
batch_builder_options,
));

let transaction_queue_options = TransactionQueueOptions {
build_batch_frequency: SERVER_BUILD_BATCH_FREQUENCY,
batch_size: SERVER_BATCH_SIZE,
};
let queue = Arc::new(TransactionQueue::new(
state_view,
Arc::clone(&batch_builder),
transaction_queue_options,
));

let block_producer = api_server::ApiServer::new(api::BlockProducerApi::new(Arc::clone(&queue)));

tokio::spawn(async move { queue.run().await });
tokio::spawn(async move { batch_builder.run().await });

info!(target: COMPONENT, "Server initialized");

let addr = config
.endpoint
.to_socket_addrs()
.map_err(ApiError::EndpointToSocketFailed)?
.next()
.ok_or_else(|| ApiError::AddressResolutionFailed(config.endpoint.to_string()))?;

Server::builder()
.add_service(block_producer)
.serve(addr)
.await
.map_err(ApiError::ApiServeFailed)?;

Ok(())
.map_err(ApiError::ApiServeFailed)
}
}
3 changes: 2 additions & 1 deletion crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ miden-objects = { workspace = true }
miden-tx = { workspace = true }
prost = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.38", features = ["rt-multi-thread", "net", "macros"] }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "macros"] }
tokio-stream = { workspace = true, features = ["net"] }
toml = { version = "0.8" }
tonic = { workspace = true }
tonic-web = { version = "0.12" }
Expand Down
Loading

0 comments on commit 1f7cf23

Please sign in to comment.