Skip to content

Commit

Permalink
chore(p2p_service): clean up processing of p2p req/res protocol (#2124)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
<!-- List of related issues/PRs -->
- #2112
- #2121

## Description
<!-- List of detailed changes -->
Cleans up the impl for `TaskRequest` by breaking the processing into
digestable but extendable functions

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?

---------

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
rymnc and xgreenx authored Aug 27, 2024
1 parent 198c158 commit b54b019
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 107 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2113](https://github.com/FuelLabs/fuel-core/pull/2113): Modify the way the gas price service and shared algo is initialized to have some default value based on best guess instead of `None`, and initialize service before graphql.
- [2112](https://github.com/FuelLabs/fuel-core/pull/2112): Alter the way the sealed blocks are fetched with a given height.
- [2115](https://github.com/FuelLabs/fuel-core/pull/2115): Add test for `SignMode` `is_available` method.
- [2124](https://github.com/FuelLabs/fuel-core/pull/2124): Generalize the way p2p req/res protocol handles requests.

### Added
- [2119](https://github.com/FuelLabs/fuel-core/pull/2119): GraphQL query fields for retrieving information about upgrades.
Expand Down
Binary file not shown.
7 changes: 2 additions & 5 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,11 @@ impl OnChainIterableKeyValueView {
pub fn get_sealed_block_headers(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
let headers = block_height_range
.map(BlockHeight::from)
.map(|height| self.get_sealed_block_header(&height))
.collect::<StorageResult<Vec<_>>>()?
.into_iter()
.flatten()
.collect();
.collect::<StorageResult<Option<Vec<_>>>>()?;
Ok(headers)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl P2pDb for OnChainIterableKeyValueView {
fn get_sealed_headers(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
self.get_sealed_block_headers(block_height_range)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/p2p/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub trait P2pDb: Send + Sync {
fn get_sealed_headers(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>>;
) -> StorageResult<Option<Vec<SealedBlockHeader>>>;

fn get_transactions(
&self,
Expand Down
193 changes: 93 additions & 100 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,116 +432,109 @@ where
request_message: RequestMessage,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
match request_message {
RequestMessage::Transactions(range) => {
self.handle_transactions_request(range, request_id)
}
RequestMessage::SealedHeaders(range) => {
self.handle_sealed_headers_request(range, request_id)
}
}
}

fn handle_request<DbLookUpFn, ResponseSenderFn, TaskRequestFn, R>(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
response_sender: ResponseSenderFn,
db_lookup: DbLookUpFn,
task_request: TaskRequestFn,
) -> anyhow::Result<()>
where
DbLookUpFn:
Fn(&V::LatestView, Range<u32>) -> anyhow::Result<Option<R>> + Send + 'static,
ResponseSenderFn: Fn(Option<R>) -> ResponseMessage + Send + 'static,
TaskRequestFn: Fn(Option<R>, InboundRequestId) -> TaskRequest + Send + 'static,
R: Send + 'static,
{
let instant = Instant::now();
let timeout = self.response_timeout;
let response_channel = self.request_sender.clone();
// For now, we only process requests that are smaller than the max_blocks_per_request
// If there are other types of data we send over p2p req/res protocol, then this needs
// to be generalized
let max_len = self.max_headers_per_request;

match request_message {
RequestMessage::Transactions(range) => {
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range of blocks is too big"
);
let response = None;
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::Transactions(response),
);
return Ok(())
}

let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Get transactions request timed out");
return;
}
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range is too big"
);
// TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311
let response = None;
let _ = self
.p2p_service
.send_response_msg(request_id, response_sender(response));
return Ok(());
}

let response = view
.get_transactions(range.clone())
.trace_err(
format!(
"Failed to get transactions for the range {:?}",
range
)
.as_str(),
)
.ok()
.flatten();

let _ = response_channel
.try_send(TaskRequest::DatabaseTransactionsLookUp {
response,
request_id,
})
.trace_err(
"Failed to send transactions response to the request channel",
);
});

if result.is_err() {
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::Transactions(None),
);
}
let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Request timed out");
return;
}
RequestMessage::SealedHeaders(range) => {
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range of sealed headers is too big"
); // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311
let response = None;
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::SealedHeaders(response),
);
return Ok(())
}

let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Get headers request timed out");
return;
}
let response = db_lookup(&view, range.clone()).ok().flatten();

let response = view
.get_sealed_headers(range.clone())
.trace_err(
format!(
"Failed to get sealed headers for the range {:?}",
range
)
.as_str(),
)
.ok();

let _ = response_channel
.try_send(TaskRequest::DatabaseHeaderLookUp {
response,
request_id,
})
.trace_err(
"Failed to send headers response to the request channel",
);
});

if result.is_err() {
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::SealedHeaders(None),
);
}
}
let _ = response_channel
.try_send(task_request(response, request_id))
.trace_err("Failed to send response to the request channel");
});

if result.is_err() {
let _ = self
.p2p_service
.send_response_msg(request_id, response_sender(None));
}

Ok(())
}

fn handle_transactions_request(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
self.handle_request(
range,
request_id,
ResponseMessage::Transactions,
|view, range| view.get_transactions(range).map_err(anyhow::Error::from),
|response, request_id| TaskRequest::DatabaseTransactionsLookUp {
response,
request_id,
},
)
}

fn handle_sealed_headers_request(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
self.handle_request(
range,
request_id,
ResponseMessage::SealedHeaders,
|view, range| view.get_sealed_headers(range).map_err(anyhow::Error::from),
|response, request_id| TaskRequest::DatabaseHeaderLookUp {
response,
request_id,
},
)
}
}

fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
Expand Down Expand Up @@ -995,7 +988,7 @@ pub mod tests {
fn get_sealed_headers(
&self,
_block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
unimplemented!()
}

Expand Down Expand Up @@ -1111,7 +1104,7 @@ pub mod tests {
fn get_sealed_headers(
&self,
_block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
todo!()
}

Expand Down

0 comments on commit b54b019

Please sign in to comment.