Skip to content

Commit

Permalink
Log error and restart batch service when it fails (#5391)
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran authored and ebmifa committed Oct 19, 2022
1 parent 46bb854 commit b8d7434
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 51 deletions.
15 changes: 14 additions & 1 deletion crates/sui-core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,20 @@ impl crate::authority::AuthorityState {
Ok(last_batch)
}

pub async fn run_batch_service(
pub async fn run_batch_service(&self, min_batch_size: u64, max_delay: Duration) {
loop {
match self.run_batch_service_once(min_batch_size, max_delay).await {
Ok(()) => error!("Restarting batch service, which exited without error"),
Err(e) => {
error!("Restarting batch service, which failed with error: {e:?}")
}
};
// Sleep before restart to prevent CPU pegging in case of immediate error.
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

pub async fn run_batch_service_once(
&self,
min_batch_size: u64,
max_delay: Duration,
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ impl AuthorityServer {
&self,
min_batch_size: u64,
max_delay: Duration,
) -> SuiResult<JoinHandle<SuiResult<()>>> {
) -> SuiResult<JoinHandle<()>> {
// Start the batching subsystem, and register the handles with the authority.
let state = self.state.clone();
let _batch_join_handle =
let batch_join_handle =
tokio::task::spawn(
async move { state.run_batch_service(min_batch_size, max_delay).await },
);

Ok(_batch_join_handle)
Ok(batch_join_handle)
}

pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
Expand Down
13 changes: 7 additions & 6 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ async fn test_batch_to_checkpointing() {
let inner_state = authority_state.clone();
let _join = tokio::task::spawn(async move {
inner_state
.run_batch_service(1000, Duration::from_millis(500))
.run_batch_service_once(1000, Duration::from_millis(500))
.await
});
// Send transactions out of order
Expand Down Expand Up @@ -982,7 +982,7 @@ async fn test_batch_to_checkpointing_init_crash() {
let inner_state = authority_state.clone();
let _join = tokio::task::spawn(async move {
inner_state
.run_batch_service(1000, Duration::from_millis(500))
.run_batch_service_once(1000, Duration::from_millis(500))
.await
});

Expand Down Expand Up @@ -1740,10 +1740,11 @@ pub async fn checkpoint_tests_setup(
.await;

let inner_state = authority.clone();
let _join =
tokio::task::spawn(
async move { inner_state.run_batch_service(1000, batch_interval).await },
);
let _join = tokio::task::spawn(async move {
inner_state
.run_batch_service_once(1000, batch_interval)
.await
});

let checkpoint = authority.checkpoints.clone();
authorities.push(TestAuthority {
Expand Down
10 changes: 5 additions & 5 deletions crates/sui-core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async fn test_batch_manager_happy_path() {
let inner_state = authority_state.clone();
let _join = tokio::task::spawn(async move {
inner_state
.run_batch_service(1000, Duration::from_millis(500))
.run_batch_service_once(1000, Duration::from_millis(500))
.await
});

Expand Down Expand Up @@ -261,7 +261,7 @@ async fn test_batch_manager_out_of_order() {
let inner_state = authority_state.clone();
let _join = tokio::task::spawn(async move {
inner_state
.run_batch_service(1000, Duration::from_millis(500))
.run_batch_service_once(1000, Duration::from_millis(500))
.await
});
// Send transactions out of order
Expand Down Expand Up @@ -333,7 +333,7 @@ async fn test_batch_manager_drop_out_of_order() {
inner_state
// Make sure that a batch will not be formed due to time, but will be formed
// when there are 4 transactions.
.run_batch_service(4, Duration::from_millis(10000))
.run_batch_service_once(4, Duration::from_millis(10000))
.await
});
// Send transactions out of order
Expand Down Expand Up @@ -397,7 +397,7 @@ async fn test_handle_move_order_with_batch() {
let inner_state = authority_state.clone();
let _join = tokio::task::spawn(async move {
inner_state
.run_batch_service(1000, Duration::from_millis(500))
.run_batch_service_once(1000, Duration::from_millis(500))
.await
});
// Send transactions out of order
Expand Down Expand Up @@ -448,7 +448,7 @@ async fn test_batch_store_retrieval() {
let inner_state = authority_state.clone();
let _join = tokio::task::spawn(async move {
inner_state
.run_batch_service(10, Duration::from_secs(6000))
.run_batch_service_once(10, Duration::from_secs(6000))
.await
});
// Send transactions out of order
Expand Down
34 changes: 0 additions & 34 deletions crates/sui-core/src/unit_tests/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,6 @@ use sui_types::{
use crate::safe_client::SafeClient;
use typed_store::Map;

/// Checks that the batch subsystem started by `spawn_batch_subsystem` can be
/// shut down: once the batch notifier is closed and the server is dropped, the
/// returned join handle is expected to resolve without a join error and
/// without a service error.
#[tokio::test]
async fn test_start_stop_batch_subsystem() {
let sender = dbg_addr(1);
let object_id = dbg_object_id(1);
let mut authority_state = init_state_with_object_id(sender, object_id).await;
// Batches must be initialised from the database before the subsystem is spawned.
authority_state
.init_batches_from_database()
.expect("Init batches failed!");

// The following two fields are only needed for shared objects (not by this test).
let consensus_address = "/ip4/127.0.0.1/tcp/0/http".parse().unwrap();
let (tx_consensus_listener, _rx_consensus_listener) = tokio::sync::mpsc::channel(1);

let server = Arc::new(AuthorityServer::new_for_test(
"/ip4/127.0.0.1/tcp/999/http".parse().unwrap(),
Arc::new(authority_state),
consensus_address,
tx_consensus_listener,
));
// Launch the batch service as a background task and keep its join handle.
let join = server
.spawn_batch_subsystem(1000, Duration::from_secs(50))
.await
.expect("Problem launching subsystem.");

// Now drop the server to simulate the authority server ending processing.
// The notifier is closed first, while `server.state` is still reachable.
server.state.batch_notifier.close();
drop(server);

// This should return immediately.
// The first `expect` covers a failure to join the task; the second covers an
// error returned by the batch service itself.
// NOTE(review): the second `expect` assumes the join handle yields a `Result`
// from the service — confirm against `spawn_batch_subsystem`'s return type.
join.await
.expect("Error stopping subsystem")
.expect("Subsystem crashed?");
}

//This is the most basic example of how to test the server logic
#[tokio::test]
async fn test_simple_request() {
Expand Down
3 changes: 1 addition & 2 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct SuiNode {
grpc_server: tokio::task::JoinHandle<Result<()>>,
_json_rpc_service: Option<HttpServerHandle>,
_ws_subscription_service: Option<WsServerHandle>,
_batch_subsystem_handle: tokio::task::JoinHandle<Result<()>>,
_batch_subsystem_handle: tokio::task::JoinHandle<()>,
_post_processing_subsystem_handle: Option<tokio::task::JoinHandle<Result<()>>>,
_gossip_handle: Option<tokio::task::JoinHandle<()>>,
_execute_driver_handle: tokio::task::JoinHandle<()>,
Expand Down Expand Up @@ -218,7 +218,6 @@ impl SuiNode {
batch_state
.run_batch_service(1000, Duration::from_secs(1))
.await
.map_err(Into::into)
})
};

Expand Down

0 comments on commit b8d7434

Please sign in to comment.