Skip to content

Commit

Permalink
Extract off chain logic from the executor (#1579)
Browse files Browse the repository at this point in the history
Closes FuelLabs/fuel-core#1549

## Overview

The change extracts the off-chain-related logic from the executor and
moves it to the GraphQL off-chain worker. It creates two new concepts -
Off-chain and On-chain databases where the GraphQL worker has exclusive
ownership of the database and may modify it without intersecting with
the On-chain database.


## Challenges caused by the change

Delegating updating of the state to something other than `BlockImporter`
causes several new problems:
- The commitment to the on-chain and off-chain databases is done in
different places. The off-chain database may be out of sync with the
on-chain database due to race conditions.
- The result of the block execution(receipts, statuses) is not stored
anywhere and may be lost due to emergency shutdown.

We don't want to duplicate on-chain data inside of the off-chain
database, so the GraphQL service works with two sources of data, which
leads to two problems:
- The off-chain database may be out of sync with the on-chain database
due to race conditions causing failing requests.
- The view of the databases during the GraphQL request may change,
causing invalid responses with a mix of old and new data. We had this
problem before, but now it is more critical.
## Solutions to the challenges

### Out of sync

The change applies two steps to solve this issue. The main one is a new
trait for the database:
```rust
/// Provides a view of the storage at the given height.
/// It guarantees to be atomic, meaning the view is immutable to outside modifications.
pub trait AtomicView<View>: Send + Sync {
    /// Returns the view of the storage at the given `height`.
    fn view_at(&self, height: BlockHeight) -> StorageResult<View>;

    /// Returns the view of the storage for the latest block height.
    fn latest_view(&self) -> View;
}
```

Another one to await on the `BlockCommiter` side finishing processing
the `ImportResult` by all listeners.

The goal of the trait is to provide an immutable read-only view of the
database at a specific time. However, this trait has not yet been
implemented properly during this PR and will be implemented in the
following PRs. The `view_at` requires functionality from
FuelLabs/fuel-core#451. We already can
implement the `latest_view` method via
[`RocksDB::Transaction`](https://github.com/facebook/rocksdb/wiki/Transactions#reading-from-a-transaction),
but it is better to do it after merging
FuelLabs/fuel-core#1576.

Waiting on the `BlockImporter` side is a temporary solution to not
escalate the problem. But maybe we can keep it later to guarantee the
consistent state of the blockchain.

### Losing result of execution

The `AtomicView` trait also solves the issue of losing the state of the
execution because it is possible to get a view of the database at a
specific block height and execute the block again receiving the same
execution result.

Waiting inside the `BlockImporter` guarantees that we will not lose more
than one `ImportResult`.

### Inconsistent database view within GraphQL requests

The GraphQL now has `ReadDatabase`:

```rust
pub type OnChainView = Arc<dyn OnChainDatabase>;
pub type OffChainView = Arc<dyn OffChainDatabase>;

pub struct ReadDatabase {
    on_chain: Box<dyn AtomicView<OnChainView>>,
    off_chain: Box<dyn AtomicView<OffChainView>>,
}
```

It implements the `view` method that returns the `ReadView` type. The
`ReadView` implements all required methods by using internal on-chain
view and off-chain view.

The `AtomicView` allows us to get the `last_view` of the off-chain
database and get the `view_at(off_chain_view.last_height())` of the
on-chain database creating a consistent view for both databases at a
specific height.

The change also adds a `ViewExtension` to the GraphQL that creates a
`ReadView` for each request.

```rust
/// The extension that adds the `ReadView` to the request context.
/// It guarantees that the request works with the one view of the database,
/// and external database modification cannot affect the result.
struct ViewExtension;

#[async_trait::async_trait]
impl Extension for ViewExtension {
    async fn prepare_request(
        &self,
        ctx: &ExtensionContext<'_>,
        request: Request,
        next: NextPrepareRequest<'_>,
    ) -> ServerResult<Request> {
        let database: &ReadDatabase = ctx.data_unchecked();
        let view = database.view();
        let request = request.data(view);
        next.run(ctx, request).await
    }
}
```

## Implementation details

- The `ExecutionResult` now also has receipts for the transaction along
with its status. The off-chain worker will insert them later into the
database, while the `dry_run` can fetch them immediately.
- All API requests now work with the `ReadView` instead of the
`Database` type. The `ReadDatabase` is only used in one place in the
`ViewExtension`.
- The `BlockImpoerter::comit_result` now is `async` and awaits for the
previous block to be processed by all listeners. The execution of the
`execute_and_commit` now runs `verify_and_execute_block` in the spawned
task in the `tokio_rayon`.

## Follow up

- FuelLabs/fuel-core#1580
- FuelLabs/fuel-core#1581
- FuelLabs/fuel-core#1582
- FuelLabs/fuel-core#1583
- FuelLabs/fuel-core#1584
  • Loading branch information
crypto523 committed Jan 19, 2024
1 parent 4806a60 commit 842d625
Show file tree
Hide file tree
Showing 56 changed files with 1,449 additions and 767 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Description of the upcoming release here.

- [#1591](https://github.com/FuelLabs/fuel-core/pull/1591): Simplify libp2p dependencies and not depend on all sub modules directly.
- [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p
- [#1579](https://github.com/FuelLabs/fuel-core/pull/1579): The change extracts the off-chain-related logic from the executor and moves it to the GraphQL off-chain worker. It creates two new concepts - Off-chain and On-chain databases where the GraphQL worker has exclusive ownership of the database and may modify it without intersecting with the On-chain database.
- [#1577](https://github.com/FuelLabs/fuel-core/pull/1577): Moved insertion of sealed blocks into the `BlockImporter` instead of the executor.
- [#1601](https://github.com/FuelLabs/fuel-core/pull/1601): Fix formatting in docs and check that `cargo doc` passes in the CI.

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 32 additions & 23 deletions crates/fuel-core/src/coins_query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
fuel_core_graphql_api::service::Database,
fuel_core_graphql_api::database::ReadView,
query::asset_query::{
AssetQuery,
AssetSpendTarget,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl SpendQuery {
}

/// Return [`AssetQuery`]s.
pub fn asset_queries<'a>(&'a self, db: &'a Database) -> Vec<AssetQuery<'a>> {
pub fn asset_queries<'a>(&'a self, db: &'a ReadView) -> Vec<AssetQuery<'a>> {
self.query_per_asset
.iter()
.map(|asset| {
Expand Down Expand Up @@ -159,7 +159,7 @@ pub fn largest_first(query: &AssetQuery) -> Result<Vec<CoinType>, CoinsQueryErro

// An implementation of the method described on: https://iohk.io/en/blog/posts/2018/07/03/self-organisation-in-coin-selection/
pub fn random_improve(
db: &Database,
db: &ReadView,
spend_query: &SpendQuery,
) -> Result<Vec<Vec<CoinType>>, CoinsQueryError> {
let mut coins_per_asset = vec![];
Expand Down Expand Up @@ -229,7 +229,7 @@ mod tests {
SpendQuery,
},
database::Database,
fuel_core_graphql_api::service::Database as ServiceDatabase,
fuel_core_graphql_api::api_service::ReadDatabase as ServiceDatabase,
query::asset_query::{
AssetQuery,
AssetSpendTarget,
Expand Down Expand Up @@ -323,15 +323,19 @@ mod tests {
let result: Vec<_> = spend_query
.iter()
.map(|asset| {
largest_first(&AssetQuery::new(owner, asset, base_asset_id, None, db))
.map(|coins| {
coins
.iter()
.map(|coin| {
(*coin.asset_id(base_asset_id), coin.amount())
})
.collect()
})
largest_first(&AssetQuery::new(
owner,
asset,
base_asset_id,
None,
&db.view(),
))
.map(|coins| {
coins
.iter()
.map(|coin| (*coin.asset_id(base_asset_id), coin.amount()))
.collect()
})
})
.try_collect()?;
Ok(result)
Expand Down Expand Up @@ -484,7 +488,7 @@ mod tests {
db: &ServiceDatabase,
) -> Result<Vec<(AssetId, u64)>, CoinsQueryError> {
let coins = random_improve(
db,
&db.view(),
&SpendQuery::new(owner, &query_per_asset, None, base_asset_id)?,
);

Expand Down Expand Up @@ -682,7 +686,7 @@ mod tests {
Some(excluded_ids),
base_asset_id,
)?;
let coins = random_improve(&db.service_database(), &spend_query);
let coins = random_improve(&db.service_database().view(), &spend_query);

// Transform result for convenience
coins.map(|coins| {
Expand Down Expand Up @@ -840,7 +844,7 @@ mod tests {
}

let coins = random_improve(
&db.service_database(),
&db.service_database().view(),
&SpendQuery::new(
owner,
&[AssetSpendTarget {
Expand Down Expand Up @@ -930,7 +934,8 @@ mod tests {
}

fn service_database(&self) -> ServiceDatabase {
Box::new(self.database.clone())
let database = self.database.clone();
ServiceDatabase::new(database.clone(), database)
}
}

Expand Down Expand Up @@ -980,18 +985,22 @@ mod tests {

pub fn owned_coins(&self, owner: &Address) -> Vec<Coin> {
use crate::query::CoinQueryData;
let db = self.service_database();
db.owned_coins_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| db.coin(id).unwrap()))
let query = self.service_database();
let query = query.view();
query
.owned_coins_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.coin(id).unwrap()))
.try_collect()
.unwrap()
}

pub fn owned_messages(&self, owner: &Address) -> Vec<Message> {
use crate::query::MessageQueryData;
let db = self.service_database();
db.owned_message_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| db.message(&id).unwrap()))
let query = self.service_database();
let query = query.view();
query
.owned_message_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.message(&id).unwrap()))
.try_collect()
.unwrap()
}
Expand Down
37 changes: 11 additions & 26 deletions crates/fuel-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod tests {
Coins,
ContractsRawCode,
Messages,
Receipts,
},
StorageAsMut,
};
Expand Down Expand Up @@ -662,23 +661,18 @@ mod tests {
coinbase_recipient: config_coinbase,
..Default::default()
};
let mut producer = create_executor(Default::default(), config);
let producer = create_executor(Default::default(), config);

let mut block = Block::default();
*block.transactions_mut() = vec![script.clone().into()];

assert!(producer
let ExecutionResult { tx_status, .. } = producer
.execute_and_commit(
ExecutionBlock::Production(block.into()),
Default::default()
Default::default(),
)
.is_ok());
let receipts = producer
.database
.storage::<Receipts>()
.get(&script.id(&producer.config.consensus_parameters.chain_id))
.unwrap()
.unwrap();
.expect("Should execute the block");
let receipts = &tx_status[0].receipts;

if let Some(Receipt::Return { val, .. }) = receipts.first() {
*val == 1
Expand Down Expand Up @@ -2756,20 +2750,16 @@ mod tests {
},
);

executor
let ExecutionResult { tx_status, .. } = executor
.execute_and_commit(
ExecutionBlock::Production(block),
ExecutionOptions {
utxo_validation: true,
},
)
.unwrap();
.expect("Should execute the block");

let receipts = database
.storage::<Receipts>()
.get(&tx.id(&ChainId::default()))
.unwrap()
.unwrap();
let receipts = &tx_status[0].receipts;
assert_eq!(block_height as u64, receipts[0].val().unwrap());
}

Expand Down Expand Up @@ -2835,21 +2825,16 @@ mod tests {
},
);

executor
let ExecutionResult { tx_status, .. } = executor
.execute_and_commit(
ExecutionBlock::Production(block),
ExecutionOptions {
utxo_validation: true,
},
)
.unwrap();

let receipts = database
.storage::<Receipts>()
.get(&tx.id(&ChainId::default()))
.unwrap()
.unwrap();
.expect("Should execute the block");

let receipts = &tx_status[0].receipts;
assert_eq!(time.0, receipts[0].val().unwrap());
}
}
5 changes: 4 additions & 1 deletion crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use fuel_core_types::{
};
use std::net::SocketAddr;

pub mod api_service;
pub mod database;
pub(crate) mod metrics_extension;
pub mod ports;
pub mod service;
pub(crate) mod view_extension;
pub mod worker_service;

#[derive(Clone, Debug)]
pub struct Config {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::{
fuel_core_graphql_api::ports::{
BlockProducerPort,
ConsensusModulePort,
DatabasePort,
P2pPort,
TxPoolPort,
},
graphql_api::{
fuel_core_graphql_api::{
database::{
OffChainView,
OnChainView,
},
metrics_extension::MetricsExtension,
ports::{
BlockProducerPort,
ConsensusModulePort,
P2pPort,
TxPoolPort,
},
view_extension::ViewExtension,
Config,
},
schema::{
Expand Down Expand Up @@ -55,6 +59,7 @@ use fuel_core_services::{
RunnableTask,
StateWatcher,
};
use fuel_core_storage::transactional::AtomicView;
use futures::Stream;
use serde_json::json;
use std::{
Expand All @@ -75,7 +80,7 @@ use tower_http::{

pub type Service = fuel_core_services::ServiceRunner<GraphqlService>;

pub type Database = Box<dyn DatabasePort>;
pub use super::database::ReadDatabase;

pub type BlockProducer = Box<dyn BlockProducerPort>;
// In the future GraphQL should not be aware of `TxPool`. It should
Expand Down Expand Up @@ -160,28 +165,35 @@ impl RunnableTask for Task {

// Need a seperate Data Object for each Query endpoint, cannot be avoided
#[allow(clippy::too_many_arguments)]
pub fn new_service(
pub fn new_service<OnChain, OffChain>(
config: Config,
schema: CoreSchemaBuilder,
database: Database,
on_database: OnChain,
off_database: OffChain,
txpool: TxPool,
producer: BlockProducer,
consensus_module: ConsensusModule,
p2p_service: P2pService,
log_threshold_ms: Duration,
request_timeout: Duration,
) -> anyhow::Result<Service> {
) -> anyhow::Result<Service>
where
OnChain: AtomicView<OnChainView> + 'static,
OffChain: AtomicView<OffChainView> + 'static,
{
let network_addr = config.addr;
let combined_read_database = ReadDatabase::new(on_database, off_database);

let schema = schema
.data(config)
.data(database)
.data(combined_read_database)
.data(txpool)
.data(producer)
.data(consensus_module)
.data(p2p_service)
.extension(async_graphql::extensions::Tracing)
.extension(MetricsExtension::new(log_threshold_ms))
.extension(ViewExtension::new())
.finish();

let router = Router::new()
Expand Down
Loading

0 comments on commit 842d625

Please sign in to comment.