Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequential relayer's commits #1942

Merged
merged 8 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Changed
- [#1942](https://github.com/FuelLabs/fuel-core/pull/1942): Sequential relayer's commits.

### Fixed
- [#1950](https://github.com/FuelLabs/fuel-core/pull/1950): Fix cursor `BlockHeight` encoding in `SortedTXCursor`

Expand Down
4 changes: 1 addition & 3 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,7 @@ where
.advance_height()
.ok_or(DatabaseError::FailedToAdvanceHeight)?;

// TODO: After https://github.com/FuelLabs/fuel-core/issues/451
// we can replace `next_expected_height > new_height` with `next_expected_height != new_height`.
if next_expected_height > new_height {
if next_expected_height != new_height {
return Err(DatabaseError::HeightsAreNotLinked {
prev_height: prev_height.as_u64(),
new_height: new_height.as_u64(),
Expand Down
56 changes: 33 additions & 23 deletions crates/services/relayer/src/service/get_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ use fuel_core_types::{
services::relayer::Event,
};
use futures::TryStreamExt;
use std::collections::BTreeMap;
use std::collections::HashMap;

#[cfg(test)]
mod test;

pub struct DownloadedLogs {
pub start_height: u64,
pub last_height: u64,
pub logs: Vec<Log>,
}

/// Download the logs from the DA layer.
pub(crate) fn download_logs<'a, P>(
eth_sync_gap: &state::EthSyncGap,
contracts: Vec<H160>,
eth_node: &'a P,
page_size: u64,
) -> impl futures::Stream<Item = Result<(u64, Vec<Log>), ProviderError>> + 'a
) -> impl futures::Stream<Item = Result<DownloadedLogs, ProviderError>> + 'a
where
P: Middleware<Error = ProviderError> + 'static,
{
Expand All @@ -41,16 +47,23 @@ where
page.latest()
);

let oldest_block = page.oldest();
let latest_block = page.latest();

// Reduce the page.
let page = page.reduce();

// Get the logs and return the reduced page.
eth_node
.get_logs(&filter)
.await
.map(|logs| Some(((latest_block, logs), page)))
eth_node.get_logs(&filter).await.map(|logs| {
Some((
DownloadedLogs {
start_height: oldest_block,
last_height: latest_block,
logs,
},
page,
))
})
}
}
}
Expand All @@ -62,12 +75,16 @@ where
pub(crate) async fn write_logs<D, S>(database: &mut D, logs: S) -> anyhow::Result<()>
where
D: RelayerDb,
S: futures::Stream<Item = Result<(u64, Vec<Log>), ProviderError>>,
S: futures::Stream<Item = Result<DownloadedLogs, ProviderError>>,
{
tokio::pin!(logs);
while let Some((last_height, events)) = logs.try_next().await? {
let last_height = last_height.into();
let mut ordered_events = BTreeMap::<DaBlockHeight, Vec<Event>>::new();
while let Some(DownloadedLogs {
start_height,
last_height,
logs: events,
}) = logs.try_next().await?
{
let mut unordered_events = HashMap::<DaBlockHeight, Vec<Event>>::new();
let sorted_events = sort_events_by_log_index(events)?;
let fuel_events = sorted_events.into_iter().filter_map(|event| {
match EthEventLog::try_from(&event) {
Expand All @@ -90,21 +107,14 @@ where
for event in fuel_events {
let event = event?;
let height = event.da_height();
ordered_events.entry(height).or_default().push(event);
}

let mut inserted_last_height = false;
for (height, events) in ordered_events {
database.insert_events(&height, &events)?;
if height == last_height {
inserted_last_height = true;
}
unordered_events.entry(height).or_default().push(event);
}

// TODO: For https://github.com/FuelLabs/fuel-core/issues/451 we need to write each height
// (not only the last height), even if it's empty.
if !inserted_last_height {
database.insert_events(&last_height, &[])?;
let empty_events = Vec::new();
for height in start_height..=last_height {
let height: DaBlockHeight = height.into();
let events = unordered_events.get(&height).unwrap_or(&empty_events);
database.insert_events(&height, events)?;
}
}
Ok(())
Expand Down
27 changes: 17 additions & 10 deletions crates/services/relayer/src/service/get_logs/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async fn can_paginate_logs(input: Input) -> Expected {
&eth_node,
DEFAULT_LOG_PAGE_SIZE,
)
.map_ok(|(_, l)| l)
.map_ok(|logs| logs.logs)
.try_concat()
.await
.unwrap();
Expand All @@ -148,32 +148,39 @@ async fn can_paginate_logs(input: Input) -> Expected {
}

#[test_case(vec![
Ok((1, messages_n(1, 0)))
Ok((1, 1, messages_n(1, 0)))
] => 1 ; "Can add single"
)]
#[test_case(vec![
Ok((3, messages_n(3, 0))),
Ok((4, messages_n(1, 4)))
Ok((3, 3, messages_n(3, 0))),
Ok((4, 4, messages_n(1, 4)))
] => 4 ; "Can add two"
)]
#[test_case(vec![
Ok((3, messages_n(3, 0))),
Ok((4, vec![]))
Ok((3, 3, messages_n(3, 0))),
Ok((4, 4, vec![]))
] => 4 ; "Can add empty"
)]
#[test_case(vec![
Ok((7, messages_n(3, 0))),
Ok((19, messages_n(1, 4))),
Ok((1, 7, messages_n(3, 0))),
Ok((8, 19, messages_n(1, 4))),
Err(ProviderError::CustomError("".to_string()))
] => 19 ; "Still adds height when error"
)]
#[tokio::test]
#[allow(clippy::type_complexity)]
async fn test_da_height_updates(
stream: Vec<Result<(u64, Vec<Log>), ProviderError>>,
stream: Vec<Result<(u64, u64, Vec<Log>), ProviderError>>,
) -> u64 {
let mut mock_db = crate::mock_db::MockDb::default();

let logs = futures::stream::iter(stream);
let logs = futures::stream::iter(stream).map(|result| {
result.map(|(start_height, last_height, logs)| DownloadedLogs {
start_height,
last_height,
logs,
})
});

let _ = write_logs(&mut mock_db, logs).await;

Expand Down
2 changes: 1 addition & 1 deletion crates/services/relayer/src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn can_download_logs() {
&eth_node,
DEFAULT_LOG_PAGE_SIZE,
)
.map_ok(|(_, l)| l)
.map_ok(|logs| logs.logs)
.try_concat()
.await
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion tests/test-helpers/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use fuel_core::{
use fuel_core_client::client::FuelClient;
use fuel_core_poa::Trigger;
use fuel_core_types::{
blockchain::header::LATEST_STATE_TRANSITION_VERSION,
fuel_asm::op,
fuel_tx::{
field::Inputs,
Expand Down Expand Up @@ -210,7 +211,7 @@ impl TestSetupBuilder {

let latest_block = self.starting_block.map(|starting_block| LastBlockConfig {
block_height: starting_block,
state_transition_version: 0,
state_transition_version: LATEST_STATE_TRANSITION_VERSION - 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this related to the other changes? Why are we pointing to the "old" version here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is not related to the change, just saw in the codebase=D We want to use previous version because after regenesis we want to use a native executor just to speed up tests.

..Default::default()
});

Expand Down
Loading