feat: msp check previous block #337

Open
wants to merge 29 commits into base: main

Changes from all commits (29 commits)
5c0e1f2
wip
undercover-cactus Jan 22, 2025
0bccfb6
needed file key value and some minors fixes
undercover-cactus Jan 23, 2025
d7c056a
added an integration test but it is failing
undercover-cactus Jan 29, 2025
3fdb907
failing integration test
undercover-cactus Feb 3, 2025
ee2c27b
fmt
undercover-cactus Feb 4, 2025
5b4461a
change name of the rpc function; incomplete integration test
undercover-cactus Feb 5, 2025
327be01
linter
undercover-cactus Feb 5, 2025
95adc49
typecheck
undercover-cactus Feb 5, 2025
0bd3cb2
remove sleep
undercover-cactus Feb 6, 2025
680bca7
update comment; change unresponded_ to pending_ function name;
undercover-cactus Feb 7, 2025
aceacad
convert Vec of storage requests to HashMap; minor changes;
undercover-cactus Feb 12, 2025
43a883d
use BTreeMap from the sp-std lib; remove sleep;
undercover-cactus Feb 13, 2025
5b6ecc4
typegen
undercover-cactus Feb 13, 2025
302a0ff
remove unused imports
undercover-cactus Feb 13, 2025
9dc8846
wip
undercover-cactus Feb 19, 2025
dff7e02
wip
undercover-cactus Feb 20, 2025
a09b3b2
add retry logic and complete integration test;
undercover-cactus Feb 21, 2025
d046d26
fmt
undercover-cactus Feb 21, 2025
6eb9df6
update integration tests
undercover-cactus Feb 21, 2025
8ca3749
created new utility function wait_for_num_blocks that wait for an amo…
undercover-cactus Feb 21, 2025
db47403
linter
undercover-cactus Feb 21, 2025
51039c4
feat: :construction: Trying to fix this, passing it to lola
ffarall Feb 26, 2025
b17af7f
fix test
undercover-cactus Feb 26, 2025
643be98
remove unused import
undercover-cactus Feb 26, 2025
b9e8149
make the tail value an option and default back to undefined
undercover-cactus Feb 27, 2025
0ed2221
remove only:true
undercover-cactus Feb 27, 2025
aefdf1a
still need sleep
undercover-cactus Feb 28, 2025
c202fcf
fix linter
undercover-cactus Feb 28, 2025
158bd85
Merge branch 'main' into feat/msp-check-previous-block
undercover-cactus Mar 3, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

20 changes: 20 additions & 0 deletions client/blockchain-service/src/commands.rs
@@ -64,6 +64,10 @@ pub enum BlockchainServiceCommand {
block_number: BlockNumber,
callback: tokio::sync::oneshot::Sender<tokio::sync::oneshot::Receiver<()>>,
},
WaitForNumBlocks {
number_of_blocks: BlockNumber,
callback: tokio::sync::oneshot::Sender<tokio::sync::oneshot::Receiver<()>>,
},
WaitForTick {
tick_number: TickNumber,
callback:
@@ -239,6 +243,9 @@ pub trait BlockchainServiceInterface {
/// Wait for a block number.
async fn wait_for_block(&self, block_number: BlockNumber) -> Result<()>;

/// Wait for a number of blocks to pass.
async fn wait_for_num_blocks(&self, number_of_blocks: BlockNumber) -> Result<()>;

/// Wait for a tick number.
async fn wait_for_tick(&self, tick_number: TickNumber) -> Result<(), ApiError>;

@@ -487,6 +494,19 @@ where
Ok(())
}

async fn wait_for_num_blocks(&self, number_of_blocks: BlockNumber) -> Result<()> {
let (callback, rx) = tokio::sync::oneshot::channel();
// Build command to send to blockchain service.
let message = BlockchainServiceCommand::WaitForNumBlocks {
number_of_blocks,
callback,
};
self.send(message).await;
let rx = rx.await.expect("Failed to receive response from BlockchainService. Probably means BlockchainService has crashed.");
rx.await.expect("Failed to wait for block");
Ok(())
}

async fn wait_for_tick(&self, tick_number: TickNumber) -> Result<(), ApiError> {
let (callback, rx) = tokio::sync::oneshot::channel();
// Build command to send to blockchain service.
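The new `wait_for_num_blocks` mirrors `wait_for_block`, but waits relative to the current best block instead of for an absolute block number. A minimal usage sketch, assuming the caller holds a handle implementing `BlockchainServiceInterface` and that `Result` here is `anyhow::Result` (the function and handle names below are illustrative):

```rust
// Illustrative only: `blockchain` is any handle implementing the
// BlockchainServiceInterface from this PR. The call resolves once the chain
// has advanced 5 blocks past the best block observed at the time of the call.
async fn back_off_for_five_blocks<B: BlockchainServiceInterface>(
    blockchain: &B,
) -> anyhow::Result<()> {
    blockchain.wait_for_num_blocks(5).await?;
    // Safe to retry the failed operation now.
    Ok(())
}
```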
66 changes: 64 additions & 2 deletions client/blockchain-service/src/handler.rs
@@ -31,7 +31,10 @@
use shc_actors_framework::actor::{Actor, ActorEventLoop};
use shc_common::{
blockchain_utils::{convert_raw_multiaddresses_to_multiaddr, get_events_at_block},
types::{BlockNumber, ParachainClient, TickNumber},
types::{
BlockNumber, EitherBucketOrBspId, Fingerprint, ParachainClient, StorageProviderId,

Check failure on line 35 in client/blockchain-service/src/handler.rs (GitHub Actions: Check lint with clippy, Build node image, Prepare artifacts): unresolved import `shc_common::types::EitherBucketOrBspId`
StorageRequestMetadata, TickNumber, BCSV_KEY_TYPE,
},
};

use crate::{
@@ -352,6 +355,28 @@
}
}
}
BlockchainServiceCommand::WaitForNumBlocks {
number_of_blocks,
callback,
} => {
let current_block_number = self.client.info().best_number;

let (tx, rx) = tokio::sync::oneshot::channel();

self.wait_for_block_request_by_number
.entry(current_block_number + number_of_blocks)
.or_insert_with(Vec::new)
.push(tx);

match callback.send(rx) {
Ok(_) => {
trace!(target: LOG_TARGET, "Block message receiver sent successfully");
}
Err(e) => {
error!(target: LOG_TARGET, "Failed to send block message receiver: {:?}", e);
}
}
}
BlockchainServiceCommand::WaitForTick {
tick_number,
callback,
@@ -1268,7 +1293,44 @@
Some(ManagedProvider::Bsp(_)) => {
self.bsp_initial_sync();
}
- Some(ManagedProvider::Msp(_)) => {
+ Some(StorageProviderId::MainStorageProvider(msp_id)) => {

Check failure on line 1296 in client/blockchain-service/src/handler.rs (GitHub Actions / Check lint with clippy): mismatched types
// TODO: Send events to check that this node has a Forest Storage for each Bucket this MSP manages.
// TODO: Catch up to Forest root writes in the Bucket's Forests.

info!(target: LOG_TARGET, "Checking for storage requests for this MSP");

let storage_requests: BTreeMap<H256, StorageRequestMetadata> = match self
.client
.runtime_api()
.pending_storage_requests_by_msp(block_hash, msp_id)
{
Ok(sr) => sr,
Err(_) => {
// If querying for pending storage requests fails, do not try to answer them.
warn!(target: LOG_TARGET, "Failed to get pending storage request");
return;
}
};

info!(
"We have {} pending storage requests",
storage_requests.len()
);

// Loop over each pending storage request to start a new storage request task for the MSP.
for (file_key, sr) in storage_requests {
Contributor:
maybe ignore since it's personal preference
Can you rename `sr` to `storage_request`?

Contributor Author:
It is hard to differentiate `storage_requests` and `storage_request` when browsing the code. I realize that `sr` is not a good name, but we can quickly understand what it is by looking at the loop. It is a trade-off, and I guess it comes down to programming habits.

Maybe I could add a comment to improve readability?

Contributor:
Adding a comment is fine by me, btw.

self.emit(NewStorageRequest {

Check failure on line 1322 in client/blockchain-service/src/handler.rs (GitHub Actions: Check lint with clippy, Build node image, Prepare artifacts): cannot find struct, variant or union type `NewStorageRequest` in this scope
who: sr.owner,
file_key: file_key.into(),
bucket_id: sr.bucket_id,
location: sr.location,
fingerprint: Fingerprint::from(sr.fingerprint.as_bytes()),
size: sr.size,
user_peer_ids: sr.user_peer_ids,
expires_at: sr.expires_at,
})
}

self.msp_initial_sync();
}
None => {
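Both `WaitForBlock` and the new `WaitForNumBlocks` register their oneshot sender in `wait_for_block_request_by_number`, keyed by an absolute block number (here `current_block_number + number_of_blocks`), so they share one wake-up path. That wake-up is not part of this diff; the sketch below shows what it presumably looks like on block import, with the struct and method names assumed rather than taken from the codebase:

```rust
use std::collections::BTreeMap;
use tokio::sync::oneshot;

type BlockNumber = u32;

// Minimal stand-in for the service state relevant to this sketch.
struct BlockWaiters {
    wait_for_block_request_by_number: BTreeMap<BlockNumber, Vec<oneshot::Sender<()>>>,
}

impl BlockWaiters {
    // Sketch (assumed, not in this diff): when a new best block is imported,
    // drain every waiter registered at or below its number and complete their
    // oneshot channels, waking the corresponding wait_for_* futures.
    fn notify_waiters(&mut self, best_number: BlockNumber) {
        let due: Vec<BlockNumber> = self
            .wait_for_block_request_by_number
            .range(..=best_number)
            .map(|(number, _)| *number)
            .collect();
        for target in due {
            if let Some(waiters) = self.wait_for_block_request_by_number.remove(&target) {
                for tx in waiters {
                    // An Err here only means the waiter dropped its receiver.
                    let _ = tx.send(());
                }
            }
        }
    }
}
```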
1 change: 1 addition & 0 deletions client/common/src/types.rs
@@ -76,6 +76,7 @@ pub type Balance = pallet_storage_providers::types::BalanceOf<Runtime>;
pub type OpaqueBlock = storage_hub_runtime::opaque::Block;
pub type BlockHash = <OpaqueBlock as BlockT>::Hash;
pub type PeerId = pallet_file_system::types::PeerId<Runtime>;
pub type StorageRequestMetadata = pallet_file_system::types::StorageRequestMetadata<Runtime>;
pub type MaxBatchConfirmStorageRequests =
<Runtime as pallet_file_system::Config>::MaxBatchConfirmStorageRequests;

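The new alias follows the pattern of the surrounding ones: it pins the pallet's generic metadata type to the concrete `Runtime`. A hypothetical usage sketch (the function is illustrative; the field names match those read by the handler code in this PR):

```rust
use shc_common::types::StorageRequestMetadata;

// Client code can now name the runtime-concrete metadata type directly,
// without depending on pallet_file_system's generics.
fn describe(meta: &StorageRequestMetadata) {
    // Fields referenced elsewhere in this PR: owner, bucket_id, location,
    // fingerprint, size, user_peer_ids, expires_at.
    log::debug!("storage request of size {:?} in bucket {:?}", meta.size, meta.bucket_id);
}
```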
11 changes: 2 additions & 9 deletions client/rpc/src/lib.rs
@@ -22,15 +22,7 @@ use tokio::{fs, fs::create_dir_all, sync::RwLock};

use pallet_file_system_runtime_api::FileSystemApi as FileSystemRuntimeApi;
use pallet_proofs_dealer_runtime_api::ProofsDealerApi as ProofsDealerRuntimeApi;
- use shc_common::{
-     consts::CURRENT_FOREST_KEY,
-     types::{
-         BackupStorageProviderId, BlockNumber, BucketId, ChunkId, CustomChallenge, FileMetadata,
-         ForestLeaf, HashT, KeyProof, KeyProofs, MainStorageProviderId, ProofsDealerProviderId,
-         Proven, RandomnessOutput, StorageProof, StorageProofsMerkleTrieLayout, BCSV_KEY_TYPE,
-         FILE_CHUNK_SIZE,
-     },
- };
+ use shc_common::{consts::CURRENT_FOREST_KEY, types::*};
use shc_file_manager::traits::{ExcludeType, FileDataTrie, FileStorage, FileStorageError};
use shc_forest_manager::traits::{ForestStorage, ForestStorageHandler};
use sp_core::{sr25519::Pair as Sr25519Pair, Encode, Pair, H256};
@@ -323,6 +315,7 @@ where
BlockNumber,
ChunkId,
BucketId,
StorageRequestMetadata,
>,
FL: FileStorage<StorageProofsMerkleTrieLayout> + Send + Sync,
FSH: ForestStorageHandler + Send + Sync + 'static,
6 changes: 2 additions & 4 deletions node/src/rpc.rs
@@ -15,10 +15,7 @@ use sc_consensus_manual_seal::{
};
use sc_rpc::DenyUnsafe;
use sc_transaction_pool_api::TransactionPool;
- use shc_common::types::{
-     BackupStorageProviderId, BlockNumber, BucketId, ChunkId, CustomChallenge, ForestLeaf,
-     MainStorageProviderId, ProofsDealerProviderId, RandomnessOutput,
- };
+ use shc_common::types::*;
use shc_forest_manager::traits::ForestStorageHandler;
use shc_rpc::{StorageHubClientApiServer, StorageHubClientRpc, StorageHubClientRpcConfig};
use sp_api::ProvideRuntimeApi;
@@ -73,6 +70,7 @@ where
BlockNumber,
ChunkId,
BucketId,
StorageRequestMetadata,
>,
P: TransactionPool + Send + Sync + 'static,
FL: FileStorageT,
3 changes: 3 additions & 0 deletions node/src/tasks/bsp_charge_fees.rs
@@ -272,6 +272,9 @@ where
.map_err(|e| anyhow!("Failed to get metadata from Forest: {:?}", e))?;

if !user_files.is_empty() {
// We only take the first file of the list: generate a proof for it, submit it with an extrinsic, and then release the lock to process the next file and generate the next proof.
// This is not ideal because it means one extrinsic per file, but batch deletion is not yet implemented.
// TODO: Improve this once batch deletion is implemented.
let (file_key, metadata) = user_files.first().expect("User files is not empty");
let bucket_id = H256::from_slice(metadata.bucket_id().as_ref());
let location = sp_runtime::BoundedVec::truncate_from(metadata.location().clone());
48 changes: 44 additions & 4 deletions node/src/tasks/user_sends_file.rs
@@ -317,7 +317,7 @@ where
{
warn!(
target: LOG_TARGET,
- "Batch upload rejected by peer {:?}, retrying... (attempt {})",
+ "Final batch upload rejected by peer {:?}, retrying... (attempt {})",
peer_id,
retry_attempts + 1
);
@@ -326,7 +326,27 @@
// Wait for a short time before retrying
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
- Err(RequestError::RequestFailure(RequestFailure::Refused)) => {
Err(RequestError::RequestFailure(RequestFailure::Network(_)))
| Err(RequestError::RequestFailure(RequestFailure::NotConnected))
if retry_attempts < 10 =>
{
warn!(
target: LOG_TARGET,
"Unable to upload final batch to peer {:?}, retrying... (attempt {})",
peer_id,
retry_attempts + 1
);
retry_attempts += 1;

// Wait a bit for the MSP to be online
self.storage_hub_handler
.blockchain
.wait_for_num_blocks(5)
.await?;
}
Err(RequestError::RequestFailure(RequestFailure::Refused))
| Err(RequestError::RequestFailure(RequestFailure::Network(_)))
| Err(RequestError::RequestFailure(RequestFailure::NotConnected)) => {
// Return an error if the provider refused to answer.
return Err(anyhow::anyhow!("Failed to send file {:?}", file_key));
}
@@ -388,7 +408,7 @@ where
.upload_request(peer_id, file_key.as_ref().into(), proof.clone(), None)
.await;

- match upload_response {
+ match upload_response.as_ref() {
Ok(r) => {
debug!(
target: LOG_TARGET,
@@ -421,7 +441,27 @@
// Wait for a short time before retrying
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Contributor Author:
In order to standardize the retry logic, should we also wait for one block here instead of one second? Refused happens when the request has not yet been processed, and because of latency and block propagation time, one second could be too short in a real network.

Contributor:
Isn't this flow independent from the blockchain? The user here is trying to send data to a peer (provider).

}
- Err(RequestError::RequestFailure(RequestFailure::Refused)) => {
Err(RequestError::RequestFailure(RequestFailure::Network(_)))
| Err(RequestError::RequestFailure(RequestFailure::NotConnected))
if retry_attempts < 10 =>
{
warn!(
target: LOG_TARGET,
"Unable to upload final batch to peer {:?}, retrying... (attempt {})",
peer_id,
retry_attempts + 1
);
retry_attempts += 1;

// Wait a bit for the MSP to be online
self.storage_hub_handler
.blockchain
.wait_for_num_blocks(5)
.await?;
}
Err(RequestError::RequestFailure(RequestFailure::Refused))
| Err(RequestError::RequestFailure(RequestFailure::Network(_)))
| Err(RequestError::RequestFailure(RequestFailure::NotConnected)) => {
// Return an error if the provider refused to answer.
return Err(anyhow::anyhow!("Failed to send file {:?}", file_key));
}
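Both hunks in this file use the same shape: the retryable error variants appear twice, first in an arm guarded by `if retry_attempts < 10`, then in a terminal arm that matches the same variants once the guard stops holding. A self-contained sketch of that match-guard pattern (types and values are illustrative, not from the PR):

```rust
// Standalone illustration of the guard-based retry shape used above: the
// same or-pattern appears in two arms; the guarded arm wins while retries
// remain, and the terminal arm catches the same variants afterwards.
enum UploadError {
    Network,
    NotConnected,
}

fn next_step(err: UploadError, retry_attempts: u32) -> &'static str {
    match err {
        UploadError::Network | UploadError::NotConnected if retry_attempts < 10 => "retry",
        UploadError::Network | UploadError::NotConnected => "give up",
    }
}

fn main() {
    assert_eq!(next_step(UploadError::Network, 3), "retry");
    assert_eq!(next_step(UploadError::NotConnected, 10), "give up");
}
```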
4 changes: 3 additions & 1 deletion pallets/file-system/runtime-api/Cargo.toml
@@ -18,8 +18,10 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { workspace = true, features = ["derive"] }
scale-info = { workspace = true }
sp-api = { workspace = true }
sp-core = { workspace = true }
sp-runtime = { workspace = true }
sp-std = { workspace = true }

[features]
default = ["std"]
- std = ["codec/std", "sp-api/std", "sp-runtime/std"]
+ std = ["codec/std", "sp-api/std", "sp-runtime/std", "sp-std/std"]
12 changes: 11 additions & 1 deletion pallets/file-system/runtime-api/src/lib.rs
@@ -3,7 +3,9 @@
use codec::{Codec, Decode, Encode};
use scale_info::prelude::vec::Vec;
use scale_info::TypeInfo;
use sp_core::H256;
use sp_runtime::RuntimeDebug;
use sp_std::collections::btree_map::BTreeMap;

/// Error type for the `is_storage_request_open_to_volunteers` runtime API call.
#[derive(Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
@@ -53,21 +55,29 @@ pub enum GenericApplyDeltaEventInfoError {
DecodeError,
}

/// Error type for the `pending_storage_requests_by_msp` runtime API call.
#[derive(Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub enum StorageRequestsByMspError {
FailedToRetrieveStorageRequests,
}

sp_api::decl_runtime_apis! {
#[api_version(1)]
- pub trait FileSystemApi<BackupStorageProviderId, MainStorageProviderId, FileKey, TickNumber, ChunkId, GenericApplyDeltaEventInfo>
+ pub trait FileSystemApi<BackupStorageProviderId, MainStorageProviderId, FileKey, TickNumber, ChunkId, GenericApplyDeltaEventInfo, StorageRequestMetadata>
where
BackupStorageProviderId: Codec,
MainStorageProviderId: Codec,
FileKey: Codec,
TickNumber: Codec,
ChunkId: Codec,
GenericApplyDeltaEventInfo: Codec,
StorageRequestMetadata: Codec,
{
fn is_storage_request_open_to_volunteers(file_key: FileKey) -> Result<bool, IsStorageRequestOpenToVolunteersError>;
fn query_earliest_file_volunteer_tick(bsp_id: BackupStorageProviderId, file_key: FileKey) -> Result<TickNumber, QueryFileEarliestVolunteerTickError>;
fn query_bsp_confirm_chunks_to_prove_for_file(bsp_id: BackupStorageProviderId, file_key: FileKey) -> Result<Vec<ChunkId>, QueryBspConfirmChunksToProveForFileError>;
fn query_msp_confirm_chunks_to_prove_for_file(msp_id: MainStorageProviderId, file_key: FileKey) -> Result<Vec<ChunkId>, QueryMspConfirmChunksToProveForFileError>;
fn decode_generic_apply_delta_event_info(encoded_event_info: Vec<u8>) -> Result<GenericApplyDeltaEventInfo, GenericApplyDeltaEventInfoError>;
fn pending_storage_requests_by_msp(msp_id: MainStorageProviderId) -> BTreeMap<H256, StorageRequestMetadata>;
}
}
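The runtime side of this new API entry is not shown in the diff; presumably it is wired up inside the runtime's `impl_runtime_apis!` block by delegating to the pallet helper added below in `pallets/file-system/src/utils.rs`. A sketch under that assumption, with the generic arguments inferred from the client-side bounds in `client/rpc/src/lib.rs` and all other trait methods omitted:

```rust
// Sketch (assumed wiring, not part of this diff).
impl_runtime_apis! {
    impl pallet_file_system_runtime_api::FileSystemApi<
        Block,
        BackupStorageProviderId,
        MainStorageProviderId,
        H256,                   // FileKey
        BlockNumber,            // TickNumber
        ChunkId,
        BucketId,               // GenericApplyDeltaEventInfo
        StorageRequestMetadata,
    > for Runtime {
        // ...all other trait methods elided for the sketch...
        fn pending_storage_requests_by_msp(
            msp_id: MainStorageProviderId,
        ) -> BTreeMap<H256, StorageRequestMetadata> {
            FileSystem::pending_storage_requests_by_msp(msp_id)
        }
    }
}
```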
17 changes: 17 additions & 0 deletions pallets/file-system/src/utils.rs
@@ -34,6 +34,7 @@
ReadBucketsInterface, ReadProvidersInterface, ReadStorageProvidersInterface,
ReadUserSolvencyInterface, TrieAddMutation, TrieRemoveMutation,
};
use sp_std::collections::btree_map::BTreeMap;

use crate::{
pallet,
@@ -71,7 +72,7 @@
}};
// Handle boolean type
($condition:expr, $error_msg:expr, $error_type:path, bool) => {{
if !$condition {

Check warning on line 75 in pallets/file-system/src/utils.rs (GitHub Actions / Check lint with clippy): the use of negated comparison operators on partially ordered types produces code that is hard to read and refactor; consider using the `partial_cmp` method instead, to make it clear that the two values could be incomparable
#[cfg(test)]
unreachable!($error_msg);

@@ -2841,6 +2842,22 @@
) -> Result<BucketIdFor<T>, codec::Error> {
BucketIdFor::<T>::decode(&mut encoded_event_info)
}

pub fn pending_storage_requests_by_msp(
msp_id: ProviderIdFor<T>,
) -> BTreeMap<MerkleHash<T>, StorageRequestMetadata<T>> {
// Get the storage requests for a specific MSP
StorageRequests::<T>::iter()
.filter(|(_, metadata)| {
if let Some(msp) = metadata.msp {
msp.0 == msp_id && !msp.1
} else {
false
}
})

Check warning on line 2857 in pallets/file-system/src/utils.rs (GitHub Actions / Check lint with clippy): unnecessary map of the identity function
.map(|(file_key, metadata)| (file_key, metadata))
.collect()
}
}

mod hooks {
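The clippy warning above flags the final `.map` as an identity map: `StorageRequests::<T>::iter()` already yields `(file_key, metadata)` pairs, so they can be collected directly. A sketch of how the helper could read without the redundant step (same behavior, not part of the diff):

```rust
pub fn pending_storage_requests_by_msp(
    msp_id: ProviderIdFor<T>,
) -> BTreeMap<MerkleHash<T>, StorageRequestMetadata<T>> {
    // iter() already yields (file_key, metadata) pairs, so filter + collect
    // is enough; the identity map that clippy flags can simply be dropped.
    StorageRequests::<T>::iter()
        .filter(|(_, metadata)| {
            if let Some(msp) = metadata.msp {
                msp.0 == msp_id && !msp.1
            } else {
                false
            }
        })
        .collect()
}
```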