Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Txpool v2 features for futures connections with other modules #2216

Merged
merged 183 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
183 commits
Select commit Hold shift + click to select a range
2e07d37
Add skeleton txpool v2
AurelienFT Sep 4, 2024
4cbb11c
Add basic interfaces and retrieve some functionnalities of the old pool.
AurelienFT Sep 5, 2024
bef4aff
Add all machinery to make the tests
AurelienFT Sep 5, 2024
552af83
Add some logic that needs to be moved behind abstraction
AurelienFT Sep 6, 2024
8d39499
Add all storages for transactions and clean code a bit
AurelienFT Sep 9, 2024
3d1a1bb
Merge branch 'master' into txpool-v2
AurelienFT Sep 9, 2024
3d65076
Fix compilation errors and add selection mecanism
AurelienFT Sep 9, 2024
19830cb
Start thinking about traits
AurelienFT Sep 9, 2024
795567e
Add all abstractions in txpool to clean code
AurelienFT Sep 10, 2024
67afb9d
Split all logic across implementations and uncomment half of the tests
AurelienFT Sep 10, 2024
b956e2c
Add all tests
AurelienFT Sep 10, 2024
9ed577e
Fmt + toml lint + spellcheck
AurelienFT Sep 10, 2024
29ec06d
Resolves most of the TODO, remove unwrap and add test
AurelienFT Sep 11, 2024
9a22983
Document and allow warnings on dead code/unused
AurelienFT Sep 11, 2024
ca38a25
Merge branch 'master' into txpool-v2
AurelienFT Sep 11, 2024
fe9e995
fix fuel-core-type usage in ttxpool v2
AurelienFT Sep 11, 2024
34c7e07
Remove usage of txpool v2 in fuel-core
AurelienFT Sep 11, 2024
4576797
fmt fix
AurelienFT Sep 11, 2024
c8e86ec
Don't allow dependencies on output and change and specify todos
AurelienFT Sep 11, 2024
16a7e8f
Rework dependency between components
AurelienFT Sep 12, 2024
60ada65
Simplify usage of the db trait
AurelienFT Sep 12, 2024
0cf8835
update graph cumultative gas and tip
AurelienFT Sep 12, 2024
b328bc8
Docs fixes and config renaming
AurelienFT Sep 12, 2024
0d69dc1
fmt
AurelienFT Sep 12, 2024
29dd130
start definition of the pool
AurelienFT Sep 12, 2024
75a9180
Use persistant storage instead of db and reuse a view provider
AurelienFT Sep 12, 2024
4049dac
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 12, 2024
dd6f7c1
Add insertion skeleton
AurelienFT Sep 12, 2024
87e5027
Fix comments in tests
AurelienFT Sep 12, 2024
460beef
Add all the verifications in order. Still some todo and compil errors
AurelienFT Sep 13, 2024
75fb598
Fix compil errors
AurelienFT Sep 13, 2024
4e3e3ce
Update components trait to be more flexible.
AurelienFT Sep 13, 2024
826f13e
Update tests and make the insert function more readable
AurelienFT Sep 13, 2024
bafbf75
remove over constraint on trait and rename a test.
AurelienFT Sep 16, 2024
ed2302a
Merge branch 'master' into txpool-v2
AurelienFT Sep 16, 2024
054f648
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 16, 2024
a7666fc
Finish verification on all inputs
AurelienFT Sep 16, 2024
4426134
fix verifications to make no copy
AurelienFT Sep 17, 2024
86a7c86
Reorganize insertion and opti some code. Start rework of tests.
AurelienFT Sep 17, 2024
96e0cc9
Update all the tests
AurelienFT Sep 17, 2024
cb635e8
Add heavy async work for insert
AurelienFT Sep 18, 2024
d4ebf3c
Change some documentation and remove a lot of dead code.
AurelienFT Sep 18, 2024
45d5296
Merge branch 'master' into txpool-v2
AurelienFT Sep 18, 2024
1c1e5db
Update cargo lock
AurelienFT Sep 18, 2024
e379a0c
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 18, 2024
e7dc924
Clean up of the code
AurelienFT Sep 18, 2024
c12ee38
Update changelog
AurelienFT Sep 18, 2024
4f87fbd
fix clippy
AurelienFT Sep 18, 2024
fb83ef2
Add interface to get the executable transactions
AurelienFT Sep 18, 2024
f10ca86
Add new transactions notifier
AurelienFT Sep 18, 2024
a3e72fd
Add code in case of block import
AurelienFT Sep 18, 2024
6e5c7f8
Merge branch 'master' into txpool-v2
AurelienFT Sep 19, 2024
3f11941
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 19, 2024
e53ea28
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 19, 2024
bec7036
Add bunch of p2p related connexions and split shared state and servic…
AurelienFT Sep 19, 2024
2e739a1
Add arc transaction in storage and broadcast tx
AurelienFT Sep 19, 2024
2a5aabd
Merge branch 'master' into txpool-v2
AurelienFT Sep 19, 2024
e78544f
Fix changelog
AurelienFT Sep 19, 2024
697cf4c
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 19, 2024
fbdd0d5
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 19, 2024
430a2c8
Update changelog
AurelienFT Sep 19, 2024
350cdbf
Add clippy allow
AurelienFT Sep 19, 2024
dd7c095
Merge branch 'master' into txpool-v2
AurelienFT Sep 20, 2024
fdd5f8a
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 20, 2024
df2978a
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 20, 2024
14b9b53
Update crates/services/txpool_v2/src/lib.rs
AurelienFT Sep 20, 2024
978d0b8
remove unused config
AurelienFT Sep 20, 2024
68b1157
Remove unused parameters and secure the collisions subfields
AurelienFT Sep 20, 2024
1074e5a
Rename functions add checks, and fix clippy
AurelienFT Sep 20, 2024
f89174f
fmt
AurelienFT Sep 20, 2024
3ddea7e
Change max txs in chain rules
AurelienFT Sep 20, 2024
a5ff2ab
fmt
AurelienFT Sep 20, 2024
dd9ef6e
Moved collisions removal after checks
AurelienFT Sep 20, 2024
f0476e2
Add some caches to avoid more iteration and fix collission taken as d…
AurelienFT Sep 20, 2024
2f5a199
Merge branch 'master' into txpool-v2
AurelienFT Sep 20, 2024
ade8569
Split two functions and optimize iterator returns
AurelienFT Sep 21, 2024
6beb83d
Merge branch 'master' into txpool-v2
AurelienFT Sep 21, 2024
3dde832
elllude lifetime
AurelienFT Sep 23, 2024
72add0f
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 23, 2024
8cd8d48
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 23, 2024
cf9e9eb
Change insertion checks to be simplified
AurelienFT Sep 23, 2024
1c59fc7
Simplified insert
AurelienFT Sep 23, 2024
e859b60
Fix collision system to allow multiple collision to the same tx
AurelienFT Sep 23, 2024
1f8aaf8
Fix some node that wasn't removed.
AurelienFT Sep 23, 2024
c7b3a29
format
AurelienFT Sep 23, 2024
1eb58a9
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 23, 2024
8ed3a5b
Txpool TTL (#2217)
AurelienFT Sep 23, 2024
bb739d3
fix changelog
AurelienFT Sep 23, 2024
0ebfc5c
fix max size algo and add tests
AurelienFT Sep 23, 2024
0614084
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 23, 2024
d5ddc3c
Add the update sender along with the tests
AurelienFT Sep 24, 2024
4c542d9
add all p2p and service tests, not working yet, need to adapt the con…
AurelienFT Sep 24, 2024
c4690d7
Finalize all links and add tests
AurelienFT Sep 25, 2024
4de9f9e
Remove allow unused and remove all dead code
AurelienFT Sep 25, 2024
5f81c09
fmt toml
AurelienFT Sep 25, 2024
82c9dd9
Update crates/services/txpool_v2/src/config.rs
AurelienFT Sep 25, 2024
70a45e9
Update crates/services/txpool_v2/src/config.rs
AurelienFT Sep 25, 2024
677a130
Change collision manager and add more info in error
AurelienFT Sep 25, 2024
26871c8
Update pool check size
AurelienFT Sep 25, 2024
76b2af4
Move logic from pool to storage
AurelienFT Sep 25, 2024
13e6990
fmt and clippy
AurelienFT Sep 25, 2024
149902a
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 25, 2024
6249e5d
remove unused stream predicate verif
AurelienFT Sep 26, 2024
c391732
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 26, 2024
65cbc14
format
AurelienFT Sep 26, 2024
0543f60
Improve error type
AurelienFT Sep 26, 2024
f5ef031
update review comments
AurelienFT Sep 26, 2024
d6dea78
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 26, 2024
2bd0382
Merge branch 'master' into txpool-v2
AurelienFT Sep 26, 2024
83c56ae
Change way to get less worth it transactions
AurelienFT Sep 26, 2024
4e77350
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 26, 2024
de44984
Fix clippy
AurelienFT Sep 26, 2024
e516353
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 26, 2024
27e7001
Split the two heavy tasks to different pools
AurelienFT Sep 26, 2024
e9c78fc
remove wrong debug assert
AurelienFT Sep 26, 2024
f955ee5
Merge branch 'txpool-v2' of github.com:FuelLabs/fuel-core into txpool-v2
AurelienFT Sep 26, 2024
2edff72
Merge branch 'txpool-v2' into txpool-v2-api
AurelienFT Sep 26, 2024
f632f66
Merge branch 'txpool-v2-api' into txpool-v2-block-producer-importer
AurelienFT Sep 26, 2024
4f7b3fd
fix typo
AurelienFT Sep 26, 2024
3be0fc7
Remove unused error in storage and change collisions to hashset.
AurelienFT Sep 27, 2024
24cc034
change graph information stored in storedata
AurelienFT Sep 27, 2024
aa8cb24
Fix remove of a cache done in a case where it's not relevant
AurelienFT Sep 27, 2024
0bbdaf5
Make sure to edit all dependencies when we remove a dependent subtree.
AurelienFT Sep 27, 2024
c128c75
Txpool v2 insertion (#2193)
AurelienFT Sep 27, 2024
aeb2935
Minimized the number of `Result` in the code in places where we can't…
xgreenx Sep 28, 2024
8a746bf
Added fuzzer for TxPoolV1 and TxPoolV2 to create dependent graphs. It…
xgreenx Sep 28, 2024
0fcb185
Applied comments from the review
xgreenx Sep 29, 2024
88cec23
Make clippy happy
xgreenx Sep 29, 2024
47ebb4c
Moved the logic of collision and depdednencies verificaiton to the po…
xgreenx Sep 29, 2024
eed0d13
Revert rocksdb
xgreenx Sep 29, 2024
d88a016
Fix tests
xgreenx Sep 29, 2024
2fd7c15
Reuse direct dedpednencies.
xgreenx Sep 29, 2024
99da36b
Merge branch 'refs/heads/master' into txpool-v2
xgreenx Sep 29, 2024
eef1db8
Merge branch 'txpool-v2' into txpool-v2-block-producer-importer
AurelienFT Sep 30, 2024
a540eda
Reject transactions that creates diamond dependencies. It allows simp…
xgreenx Sep 30, 2024
b25b832
Make limits less strict for fuzzer. It is okay since we don't need to…
xgreenx Sep 30, 2024
07ff9ef
Fix a lot of cimpil error post merge
AurelienFT Oct 1, 2024
e880427
Merge branch 'txpool-v2' into txpool-v2-block-producer-importer
AurelienFT Oct 1, 2024
82fa757
Fix all compil errors and conflicts with arc
AurelienFT Oct 1, 2024
04d7cc5
add checks cfg on unused code
AurelienFT Oct 1, 2024
acf0ab5
add checks cfg on unused code
AurelienFT Oct 1, 2024
15d1bc3
Merge branch 'txpool-v2' into txpool-v2-block-producer-importer
AurelienFT Oct 1, 2024
6df300a
Applied comments from the PR
xgreenx Oct 1, 2024
d16424c
Added a comment for stabiltiy test
xgreenx Oct 1, 2024
6fe6d98
Merge branch 'master' into txpool-v2
xgreenx Oct 1, 2024
5def1d3
update tests following review
AurelienFT Oct 1, 2024
3b7fc49
remove unused and change order verif
AurelienFT Oct 1, 2024
692a5f1
Merge branch 'txpool-v2' into txpool-v2-block-producer-importer
AurelienFT Oct 2, 2024
48090ea
fix tests
AurelienFT Oct 2, 2024
0db90fc
Merge branch 'txpool-v2' into txpool-v2-block-producer-importer
AurelienFT Oct 2, 2024
1cd89e4
Merge branch 'master' into txpool-v2
AurelienFT Oct 2, 2024
6e27909
Change shared state to use message passing instead of rwlock
AurelienFT Oct 2, 2024
1a96d77
update insert return type
AurelienFT Oct 2, 2024
fb7ae3d
Merge branch 'txpool-v2' into txpool-v2-block-producer-importer
AurelienFT Oct 2, 2024
7917cb1
Merge branch 'master' into txpool-v2-block-producer-importer
AurelienFT Oct 3, 2024
5f48e87
Fix compil error
AurelienFT Oct 3, 2024
51a02a7
Re-add skipped transaction error propagation
AurelienFT Oct 3, 2024
7a365b5
Fix spelling and ordering toml
AurelienFT Oct 3, 2024
f62ea4a
Add gas verif in selection
AurelienFT Oct 4, 2024
a4e2d01
Add removal of coin dependents transactions
AurelienFT Oct 4, 2024
0087cf2
Merge branch 'master' into txpool-v2-block-producer-importer
AurelienFT Oct 4, 2024
4f76445
lower the stability rounds to help CI.
AurelienFT Oct 4, 2024
79cf576
fix test find
AurelienFT Oct 5, 2024
ab6aaba
Merge branch 'master' into txpool-v2-block-producer-importer
AurelienFT Oct 6, 2024
4542511
Add mock db to predicate verif in tests
AurelienFT Oct 6, 2024
2b038ab
Feature/refactoring service txpool v2 (#2307)
AurelienFT Oct 7, 2024
918f377
Merge branch 'master' into txpool-v2-block-producer-importer
AurelienFT Oct 7, 2024
6dbc31b
Merge branch 'refs/heads/master' into txpool-v2-block-producer-importer
xgreenx Oct 7, 2024
02cc19e
remove unused test file
AurelienFT Oct 7, 2024
b9c9d95
Merge remote-tracking branch 'origin/txpool-v2-block-producer-importe…
xgreenx Oct 7, 2024
8dfbf99
Merge branch 'master' into txpool-v2-block-producer-importer
AurelienFT Oct 7, 2024
d683d93
Merge remote-tracking branch 'origin/txpool-v2-block-producer-importe…
xgreenx Oct 7, 2024
dbc9206
Shuffle code)
xgreenx Oct 7, 2024
9d73f89
fix compilation txpool
AurelienFT Oct 7, 2024
f28da55
Merge branch 'txpool-v2-block-producer-importer' of github.com:FuelLa…
AurelienFT Oct 7, 2024
4e79e4a
Implemented sync and async processor. Used both in the TxPool to mana…
xgreenx Oct 7, 2024
d3dd371
Make CI happy
xgreenx Oct 7, 2024
68df4bc
Mia
xgreenx Oct 7, 2024
71213d9
fix tests old tx pool
AurelienFT Oct 7, 2024
0533ee1
Fix wrong comparison max gas
AurelienFT Oct 7, 2024
feda383
Merge branch 'master' into txpool-v2-block-producer-importer
AurelienFT Oct 7, 2024
badb310
Change type to pub(crate)
AurelienFT Oct 7, 2024
6d9c468
Merge branch 'txpool-v2-block-producer-importer' of github.com:FuelLa…
AurelienFT Oct 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- [2216](https://github.com/FuelLabs/fuel-core/pull/2216): Add more function to the state and task of TxPoolV2 to handle the future interactions with others modules (PoA, BlockProducer, BlockImporter and P2P)

### Removed
- [2306](https://github.com/FuelLabs/fuel-core/pull/2306): Removed hack for genesis asset contract from the code.

Expand Down
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ async-trait = { workspace = true }
fuel-core-metrics = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

[dev-dependencies]
fuel-core-services = { path = ".", features = ["sync-processor"] }
futures = { workspace = true }
mockall = { workspace = true }

[features]
test-helpers = []
sync-processor = ["dep:rayon"]
258 changes: 258 additions & 0 deletions crates/services/src/async_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
use std::{
future::Future,
sync::Arc,
};
use tokio::{
runtime,
sync::{
OwnedSemaphorePermit,
Semaphore,
},
};

/// A processor that can execute async tasks with a limit on the number of tasks that can be
/// executed concurrently.
pub struct AsyncProcessor {
semaphore: Arc<Semaphore>,
thread_pool: runtime::Runtime,
}

/// A reservation for a task to be executed by the `AsyncProcessor`.
pub struct AsyncReservation(OwnedSemaphorePermit);

/// Out of capacity error.
#[derive(Debug, PartialEq, Eq)]
pub struct OutOfCapacity;

impl AsyncProcessor {
/// Create a new `AsyncProcessor` with the given number of threads and the number of pending
/// tasks.
pub fn new(
number_of_threads: usize,
number_of_pending_tasks: usize,
) -> anyhow::Result<Self> {
let thread_pool = runtime::Builder::new_multi_thread()
.worker_threads(number_of_threads)
.enable_all()
.build()
.map_err(|e| anyhow::anyhow!("Failed to create a tokio pool: {}", e))?;
let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks));
Ok(Self {
thread_pool,
semaphore,
})
}

/// Reserve a slot for a task to be executed.
pub fn reserve(&self) -> Result<AsyncReservation, OutOfCapacity> {
let permit = self.semaphore.clone().try_acquire_owned();
if let Ok(permit) = permit {
Ok(AsyncReservation(permit))
} else {
Err(OutOfCapacity)
}
}

/// Spawn a task with a reservation.
pub fn spawn_reserved<F>(&self, reservation: AsyncReservation, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
let permit = reservation.0;
self.thread_pool.spawn(async move {
let _drop = permit;
future.await
});
}

/// Tries to spawn a task. If the task cannot be spawned, returns an error.
pub fn try_spawn<F>(&self, future: F) -> Result<(), OutOfCapacity>
where
F: Future<Output = ()> + Send + 'static,
{
let reservation = self.reserve()?;
self.spawn_reserved(reservation, future);
Ok(())
}
}

#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
mod tests {
use super::*;
use std::{
thread::sleep,
time::Duration,
};
use tokio::time::Instant;

#[test]
fn one_spawn_single_tasks_works() {
// Given
let number_of_pending_tasks = 1;
let heavy_task_processor =
AsyncProcessor::new(1, number_of_pending_tasks).unwrap();

// When
let (sender, mut receiver) = tokio::sync::oneshot::channel();
let result = heavy_task_processor.try_spawn(async move {
sender.send(()).unwrap();
});

// Then
assert_eq!(result, Ok(()));
sleep(Duration::from_secs(1));
receiver.try_recv().unwrap();
}

#[test]
fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
// Given
let number_of_pending_tasks = 1;
let heavy_task_processor =
AsyncProcessor::new(1, number_of_pending_tasks).unwrap();
let first_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});
assert_eq!(first_spawn_result, Ok(()));

// When
let second_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});

// Then
assert_eq!(second_spawn_result, Err(OutOfCapacity));
}

#[test]
fn second_spawn_works_when_first_is_finished() {
let number_of_pending_tasks = 1;
let heavy_task_processor =
AsyncProcessor::new(1, number_of_pending_tasks).unwrap();

// Given
let (sender, receiver) = tokio::sync::oneshot::channel();
let first_spawn = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
sender.send(()).unwrap();
});
assert_eq!(first_spawn, Ok(()));
futures::executor::block_on(async move {
receiver.await.unwrap();
});

// When
let second_spawn = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});

// Then
assert_eq!(second_spawn, Ok(()));
}

#[test]
fn can_spawn_10_tasks_when_limit_is_10() {
// Given
let number_of_pending_tasks = 10;
let heavy_task_processor =
AsyncProcessor::new(1, number_of_pending_tasks).unwrap();

for _ in 0..number_of_pending_tasks {
// When
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
});

// Then
assert_eq!(result, Ok(()));
}
}

#[test]
fn executes_10_tasks_for_10_seconds_with_one_thread() {
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 1;
let heavy_task_processor =
AsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
}
drop(broadcast_sender);

// Then
futures::executor::block_on(async move {
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() >= Duration::from_secs(10));
});
}

#[test]
fn executes_10_tasks_for_2_seconds_with_10_thread() {
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 10;
let heavy_task_processor =
AsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
}
drop(broadcast_sender);

// Then
futures::executor::block_on(async move {
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() <= Duration::from_secs(2));
});
}

#[test]
fn executes_10_tasks_for_2_seconds_with_1_thread() {
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 10;
let heavy_task_processor =
AsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
}
drop(broadcast_sender);

// Then
futures::executor::block_on(async move {
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() <= Duration::from_secs(2));
});
}
}
11 changes: 11 additions & 0 deletions crates/services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
#![deny(missing_docs)]
#![deny(warnings)]

mod async_processor;
mod service;
mod state;
mod sync;
#[cfg(feature = "sync-processor")]
mod sync_processor;

/// Re-exports for streaming utilities
pub mod stream {
Expand Down Expand Up @@ -58,6 +61,7 @@ where
}
}

pub use async_processor::AsyncProcessor;
pub use service::{
EmptyShared,
RunnableService,
Expand All @@ -74,3 +78,10 @@ pub use sync::{
Shared,
SharedMutex,
};
#[cfg(feature = "sync-processor")]
pub use sync_processor::SyncProcessor;

// For tests
use crate as fuel_core_services;
#[allow(unused_imports)]
use fuel_core_services as _;
Comment on lines +84 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this aliasing dance? We import the crate under a type alias and then alias it to _. This feels weird, but I guess we must achieve something by doing this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

It was done to enable feature by default during testing

Loading
Loading