Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(p2p_service): clean up processing of p2p req/res protocol #2124

Merged
merged 13 commits into from
Aug 27, 2024
Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed
- [2113](https://github.com/FuelLabs/fuel-core/pull/2113): Modify the way the gas price service and shared algo is initialized to have some default value based on best guess instead of `None`, and initialize service before graphql.
- [2112](https://github.com/FuelLabs/fuel-core/pull/2112): Alter the way the sealed blocks are fetched with a given height.

- [2124](https://github.com/FuelLabs/fuel-core/pull/2124): Generalize the way p2p req/res protocol handles requests.

## [Version 0.34.0]

Expand Down
202 changes: 102 additions & 100 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,116 +432,118 @@ where
request_message: RequestMessage,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
let instant = Instant::now();
let timeout = self.response_timeout;
let response_channel = self.request_sender.clone();
let max_len = self.max_headers_per_request;

match request_message {
RequestMessage::Transactions(range) => {
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range of blocks is too big"
);
let response = None;
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::Transactions(response),
);
return Ok(())
}

let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Get transactions request timed out");
return;
}

let response = view
.get_transactions(range.clone())
.trace_err(
format!(
"Failed to get transactions for the range {:?}",
range
)
.as_str(),
)
.ok()
.flatten();

let _ = response_channel
.try_send(TaskRequest::DatabaseTransactionsLookUp {
response,
request_id,
})
.trace_err(
"Failed to send transactions response to the request channel",
);
});

if result.is_err() {
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::Transactions(None),
);
}
self.handle_transactions_request(range, request_id)
}
RequestMessage::SealedHeaders(range) => {
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range of sealed headers is too big"
); // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311
let response = None;
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::SealedHeaders(response),
);
return Ok(())
}
self.handle_sealed_headers_request(range, request_id)
}
}
}

let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Get headers request timed out");
return;
}
fn handle_request<F, R>(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
response_sender: fn(Option<R>) -> ResponseMessage,
db_lookup: F,
task_request: fn(Option<R>, InboundRequestId) -> TaskRequest,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here I preferred using fn pointers for the response_sender, task_request instead of trait methods on RequestMessage to reduce verbosity of the code. Happy to implement it that way if the team prefers it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy with taking functions as arguments, although since there are three related functions here, grouping them together in a trait might make the code even nicer to read.

One question though, response_sender and task_request are both function pointers whereas db_lookup is a FnOnce-generic. I think the code would read better if we used generics for all three arguments. This would make the function more flexible for future extensions as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in e52bffc, thanks
cc: @AurelienFT

) -> anyhow::Result<()>
where
F: FnOnce(&V::LatestView, Range<u32>) -> anyhow::Result<Option<R>>
+ Send
+ 'static,
R: Send + 'static,
{
let instant = Instant::now();
let timeout = self.response_timeout;
let response_channel = self.request_sender.clone();
// For now, we only process requests that are smaller than the max_blocks_per_request
// If there are other types of data we send over p2p req/res protocol, then this needs
// to be generalized
let max_len = self
.max_headers_per_request
.try_into()
.expect("u32 should always fit into usize");

if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range is too big"
);
let response = None;
rymnc marked this conversation as resolved.
Show resolved Hide resolved
let _ = self
.p2p_service
.send_response_msg(request_id, response_sender(response));
return Ok(());
}

let response = view
.get_sealed_headers(range.clone())
.trace_err(
format!(
"Failed to get sealed headers for the range {:?}",
range
)
.as_str(),
)
.ok();

let _ = response_channel
.try_send(TaskRequest::DatabaseHeaderLookUp {
response,
request_id,
})
.trace_err(
"Failed to send headers response to the request channel",
);
});

if result.is_err() {
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::SealedHeaders(None),
);
}
let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Request timed out");
return;
}

let response = db_lookup(&view, range.clone()).ok().flatten();

let _ = response_channel
.try_send(task_request(response, request_id))
.trace_err("Failed to send response to the request channel");
});

if result.is_err() {
let _ = self
.p2p_service
.send_response_msg(request_id, response_sender(None));
}

Ok(())
}

fn handle_transactions_request(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
self.handle_request(
range,
request_id,
ResponseMessage::Transactions,
|view, range| {
view.get_transactions(range)
.map_err(anyhow::Error::from)
.map(|res| res.and_then(|opt| Some(opt)))
},
|response, request_id| TaskRequest::DatabaseTransactionsLookUp {
response,
request_id,
},
)
}

fn handle_sealed_headers_request(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
self.handle_request(
range,
request_id,
ResponseMessage::SealedHeaders,
|view, range| {
view.get_sealed_headers(range)
.map_err(anyhow::Error::from)
.map(|res| Some(res))
},
|response, request_id| TaskRequest::DatabaseHeaderLookUp {
response,
request_id,
},
)
}
}

fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
Expand Down
Loading