Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jan 11, 2022
1 parent babf00b commit 4dfe866
Showing 1 changed file with 47 additions and 47 deletions.
94 changes: 47 additions & 47 deletions base_layer/core/src/mempool/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ impl MempoolService {

// Incoming request messages from the Comms layer
Some(domain_msg) = inbound_request_stream.next() => {
self.spawn_handle_incoming_request(domain_msg).await;
self.spawn_handle_incoming_request(domain_msg);
},

// Incoming response messages from the Comms layer
Some(domain_msg) = inbound_response_stream.next() => {
self.spawn_handle_incoming_response(domain_msg).await;
self.spawn_handle_incoming_response(domain_msg);
},

// Incoming transaction messages from the Comms layer
Expand All @@ -172,19 +172,19 @@ impl MempoolService {

// Incoming local request messages from the LocalMempoolServiceInterface and other local services
Some(local_request_context) = local_request_stream.next() => {
self.spawn_handle_local_request(local_request_context).await;
self.spawn_handle_local_request(local_request_context);
},

// Block events from local Base Node.
block_event = block_event_stream.recv() => {
if let Ok(block_event) = block_event {
self.spawn_handle_block_event(block_event).await;
self.spawn_handle_block_event(block_event);
}
},

// Timeout events for waiting requests
Some(timeout_request_key) = timeout_receiver_stream.recv() => {
self.spawn_handle_request_timeout(timeout_request_key).await;
self.spawn_handle_request_timeout(timeout_request_key);
},

else => {
Expand Down Expand Up @@ -238,30 +238,30 @@ impl MempoolService {
});
}

async fn spawn_handle_incoming_request(&self, domain_msg: DomainMessage<mempool_proto::MempoolServiceRequest>) {
fn spawn_handle_incoming_request(&self, domain_msg: DomainMessage<mempool_proto::MempoolServiceRequest>) {
let inbound_handlers = self.inbound_handlers.clone();
let outbound_message_service = self.outbound_message_service.clone();
// task::spawn(async move {
let result = handle_incoming_request(inbound_handlers, outbound_message_service, domain_msg).await;
task::spawn(async move {
let result = handle_incoming_request(inbound_handlers, outbound_message_service, domain_msg).await;

if let Err(e) = result {
error!(target: LOG_TARGET, "Failed to handle incoming request message: {:?}", e);
}
// });
if let Err(e) = result {
error!(target: LOG_TARGET, "Failed to handle incoming request message: {:?}", e);
}
});
}

async fn spawn_handle_incoming_response(&self, domain_msg: DomainMessage<mempool_proto::MempoolServiceResponse>) {
fn spawn_handle_incoming_response(&self, domain_msg: DomainMessage<mempool_proto::MempoolServiceResponse>) {
let waiting_requests = self.waiting_requests.clone();
// task::spawn(async move {
let result = handle_incoming_response(waiting_requests, domain_msg.into_inner()).await;
task::spawn(async move {
let result = handle_incoming_response(waiting_requests, domain_msg.into_inner()).await;

if let Err(e) = result {
error!(
target: LOG_TARGET,
"Failed to handle incoming response message: {:?}", e
);
}
// });
if let Err(e) = result {
error!(
target: LOG_TARGET,
"Failed to handle incoming response message: {:?}", e
);
}
});
}

fn spawn_handle_incoming_tx(&self, tx_msg: DomainMessage<Transaction>) {
Expand Down Expand Up @@ -289,43 +289,43 @@ impl MempoolService {
});
}

async fn spawn_handle_local_request(
fn spawn_handle_local_request(
&self,
request_context: RequestContext<MempoolRequest, Result<MempoolResponse, MempoolServiceError>>,
) {
let mut inbound_handlers = self.inbound_handlers.clone();
// task::spawn(async move {
let (request, reply_tx) = request_context.split();
let result = reply_tx.send(inbound_handlers.handle_request(request).await);
task::spawn(async move {
let (request, reply_tx) = request_context.split();
let result = reply_tx.send(inbound_handlers.handle_request(request).await);

if let Err(e) = result {
error!(
target: LOG_TARGET,
"MempoolService failed to send reply to local request {:?}", e
);
}
// });
if let Err(e) = result {
error!(
target: LOG_TARGET,
"MempoolService failed to send reply to local request {:?}", e
);
}
});
}

async fn spawn_handle_block_event(&self, block_event: Arc<BlockEvent>) {
fn spawn_handle_block_event(&self, block_event: Arc<BlockEvent>) {
let mut inbound_handlers = self.inbound_handlers.clone();
// task::spawn(async move {
let result = inbound_handlers.handle_block_event(&block_event).await;
if let Err(e) = result {
error!(target: LOG_TARGET, "Failed to handle base node block event: {:?}", e);
}
// });
task::spawn(async move {
let result = inbound_handlers.handle_block_event(&block_event).await;
if let Err(e) = result {
error!(target: LOG_TARGET, "Failed to handle base node block event: {:?}", e);
}
});
}

async fn spawn_handle_request_timeout(&self, timeout_request_key: u64) {
fn spawn_handle_request_timeout(&self, timeout_request_key: u64) {
let waiting_requests = self.waiting_requests.clone();
// task::spawn(async move {
let result = handle_request_timeout(waiting_requests, timeout_request_key).await;
task::spawn(async move {
let result = handle_request_timeout(waiting_requests, timeout_request_key).await;

if let Err(e) = result {
error!(target: LOG_TARGET, "Failed to handle request timeout event: {:?}", e);
}
// });
if let Err(e) = result {
error!(target: LOG_TARGET, "Failed to handle request timeout event: {:?}", e);
}
});
}
}

Expand Down

0 comments on commit 4dfe866

Please sign in to comment.