Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(p2p_service): clean up processing of p2p req/res protocol #2124

Merged
merged 13 commits into from
Aug 27, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2113](https://github.com/FuelLabs/fuel-core/pull/2113): Modify the way the gas price service and shared algo is initialized to have some default value based on best guess instead of `None`, and initialize service before graphql.
- [2112](https://github.com/FuelLabs/fuel-core/pull/2112): Alter the way the sealed blocks are fetched with a given height.
- [2115](https://github.com/FuelLabs/fuel-core/pull/2115): Add test for `SignMode` `is_available` method.
- [2124](https://github.com/FuelLabs/fuel-core/pull/2124): Generalize the way p2p req/res protocol handles requests.

### Added
- [2119](https://github.com/FuelLabs/fuel-core/pull/2119): GraphQL query fields for retrieving information about upgrades.
Expand Down
Binary file not shown.
7 changes: 2 additions & 5 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,11 @@ impl OnChainIterableKeyValueView {
pub fn get_sealed_block_headers(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
xgreenx marked this conversation as resolved.
Show resolved Hide resolved
let headers = block_height_range
.map(BlockHeight::from)
.map(|height| self.get_sealed_block_header(&height))
.collect::<StorageResult<Vec<_>>>()?
.into_iter()
.flatten()
.collect();
.collect::<StorageResult<Option<Vec<_>>>>()?;
Ok(headers)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl P2pDb for OnChainIterableKeyValueView {
fn get_sealed_headers(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
self.get_sealed_block_headers(block_height_range)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/p2p/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub trait P2pDb: Send + Sync {
fn get_sealed_headers(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>>;
) -> StorageResult<Option<Vec<SealedBlockHeader>>>;

fn get_transactions(
&self,
Expand Down
193 changes: 93 additions & 100 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,116 +432,109 @@ where
request_message: RequestMessage,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
match request_message {
RequestMessage::Transactions(range) => {
self.handle_transactions_request(range, request_id)
}
RequestMessage::SealedHeaders(range) => {
self.handle_sealed_headers_request(range, request_id)
}
}
}

fn handle_request<DbLookUpFn, ResponseSenderFn, TaskRequestFn, R>(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
response_sender: ResponseSenderFn,
db_lookup: DbLookUpFn,
task_request: TaskRequestFn,
) -> anyhow::Result<()>
where
DbLookUpFn:
Fn(&V::LatestView, Range<u32>) -> anyhow::Result<Option<R>> + Send + 'static,
ResponseSenderFn: Fn(Option<R>) -> ResponseMessage + Send + 'static,
TaskRequestFn: Fn(Option<R>, InboundRequestId) -> TaskRequest + Send + 'static,
R: Send + 'static,
{
let instant = Instant::now();
let timeout = self.response_timeout;
let response_channel = self.request_sender.clone();
// For now, we only process requests that are smaller than the max_blocks_per_request
// If there are other types of data we send over p2p req/res protocol, then this needs
// to be generalized
let max_len = self.max_headers_per_request;

match request_message {
RequestMessage::Transactions(range) => {
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range of blocks is too big"
);
let response = None;
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::Transactions(response),
);
return Ok(())
}

let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Get transactions request timed out");
return;
}
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range is too big"
);
// TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311
let response = None;
rymnc marked this conversation as resolved.
Show resolved Hide resolved
let _ = self
.p2p_service
.send_response_msg(request_id, response_sender(response));
return Ok(());
}

let response = view
.get_transactions(range.clone())
.trace_err(
format!(
"Failed to get transactions for the range {:?}",
range
)
.as_str(),
)
.ok()
.flatten();

let _ = response_channel
.try_send(TaskRequest::DatabaseTransactionsLookUp {
response,
request_id,
})
.trace_err(
"Failed to send transactions response to the request channel",
);
});

if result.is_err() {
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::Transactions(None),
);
}
let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Request timed out");
return;
}
RequestMessage::SealedHeaders(range) => {
if range.len() > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
"Requested range of sealed headers is too big"
); // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311
let response = None;
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::SealedHeaders(response),
);
return Ok(())
}

let view = self.view_provider.latest_view()?;
let result = self.database_processor.spawn(move || {
if instant.elapsed() > timeout {
tracing::warn!("Get headers request timed out");
return;
}
let response = db_lookup(&view, range.clone()).ok().flatten();

let response = view
.get_sealed_headers(range.clone())
.trace_err(
format!(
"Failed to get sealed headers for the range {:?}",
range
)
.as_str(),
)
.ok();

let _ = response_channel
.try_send(TaskRequest::DatabaseHeaderLookUp {
response,
request_id,
})
.trace_err(
"Failed to send headers response to the request channel",
);
});

if result.is_err() {
let _ = self.p2p_service.send_response_msg(
request_id,
ResponseMessage::SealedHeaders(None),
);
}
}
let _ = response_channel
.try_send(task_request(response, request_id))
.trace_err("Failed to send response to the request channel");
});

if result.is_err() {
let _ = self
.p2p_service
.send_response_msg(request_id, response_sender(None));
}

Ok(())
}

fn handle_transactions_request(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
self.handle_request(
range,
request_id,
ResponseMessage::Transactions,
|view, range| view.get_transactions(range).map_err(anyhow::Error::from),
|response, request_id| TaskRequest::DatabaseTransactionsLookUp {
response,
request_id,
},
)
}

fn handle_sealed_headers_request(
&mut self,
range: Range<u32>,
request_id: InboundRequestId,
) -> anyhow::Result<()> {
self.handle_request(
range,
request_id,
ResponseMessage::SealedHeaders,
|view, range| view.get_sealed_headers(range).map_err(anyhow::Error::from),
|response, request_id| TaskRequest::DatabaseHeaderLookUp {
response,
request_id,
},
)
}
}

fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
Expand Down Expand Up @@ -995,7 +988,7 @@ pub mod tests {
fn get_sealed_headers(
&self,
_block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
unimplemented!()
}

Expand Down Expand Up @@ -1111,7 +1104,7 @@ pub mod tests {
fn get_sealed_headers(
&self,
_block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
todo!()
}

Expand Down
Loading