Skip to content

Commit 6c31b1a

Browse files
lexnvbkchrskunertdavxy
authored andcommitted
rpc: Use the blocks pinning API for chainHead methods (paritytech#13233)
* rpc/chain_head: Add backend to subscription management Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Pin blocks internally and adjust testing Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * client/in_mem: Reference for the number of pinned blocks Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/tests: Check in-memory references to pinned blocks Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Fix clippy Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Remove unused comment Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Place subscription handle under `Arc` and unpin blocks on drop Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/tests: Check all pinned blocks are unpinned on drop Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Apply suggestions from code review Co-authored-by: Bastian Köcher <git@kchr.de> * Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Bastian Köcher <git@kchr.de> * rpc/tests: Retry fetching the pinned references for CI correctness Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * client/service: Use 512 as maximum number of pinned blocks Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head: Fix merging conflicts Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Adjust subscriptions to use pinning API Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head/tests: Test subscription management Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Adjust chain_head follow to the new API Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Adjust chain_head.rs to the new API Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head/tests: Adjust test.rs to the new API Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * client/builder: Use new chainHead API Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Fix documentation Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * rpc/chain_head: Fix clippy Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * client/in_mem: ChainHead no longer uses `in_mem::children` Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert <skunert49@gmail.com> * Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert <skunert49@gmail.com> * Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert <skunert49@gmail.com> * Update client/rpc-spec-v2/src/chain_head/subscription.rs Co-authored-by: Sebastian Kunert <skunert49@gmail.com> * chain_head: Add block state machine Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Address feedback Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Use new_native_or_wasm_executor Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head: Remove 'static on Backend Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head: Add documentation Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head: Lock blocks before async blocks Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head_follower: Remove static on backend Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Update client/service/src/builder.rs Co-authored-by: Davide Galassi <davxy@datawok.net> * Update client/service/src/builder.rs Co-authored-by: Davide Galassi <davxy@datawok.net> * chain_head: Add BlockHeaderAbsent to the PartialEq impl Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * client: Add better documentation around pinning constants Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chain_head: Move subscription to dedicated module Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * subscription: Rename global pin / unpin functions Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Bastian Köcher <git@kchr.de> Co-authored-by: parity-processbot <> Co-authored-by: Sebastian Kunert <skunert49@gmail.com> Co-authored-by: Davide Galassi <davxy@datawok.net>
1 parent a3286f9 commit 6c31b1a

File tree

11 files changed

+1428
-364
lines changed

11 files changed

+1428
-364
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/api/src/in_mem.rs

+23-2
Original file line numberDiff line numberDiff line change
@@ -618,20 +618,36 @@ where
618618
states: RwLock<HashMap<Block::Hash, InMemoryBackend<HashFor<Block>>>>,
619619
blockchain: Blockchain<Block>,
620620
import_lock: RwLock<()>,
621+
pinned_blocks: RwLock<HashMap<Block::Hash, i64>>,
621622
}
622623

623624
impl<Block: BlockT> Backend<Block>
624625
where
625626
Block::Hash: Ord,
626627
{
627628
/// Create a new instance of in-mem backend.
629+
///
630+
/// # Warning
631+
///
632+
/// For testing purposes only!
628633
pub fn new() -> Self {
629634
Backend {
630635
states: RwLock::new(HashMap::new()),
631636
blockchain: Blockchain::new(),
632637
import_lock: Default::default(),
638+
pinned_blocks: Default::default(),
633639
}
634640
}
641+
642+
/// Return the number of references active for a pinned block.
643+
///
644+
/// # Warning
645+
///
646+
/// For testing purposes only!
647+
pub fn pin_refs(&self, hash: &<Block as BlockT>::Hash) -> Option<i64> {
648+
let blocks = self.pinned_blocks.read();
649+
blocks.get(hash).map(|value| *value)
650+
}
635651
}
636652

637653
impl<Block: BlockT> backend::AuxStore for Backend<Block>
@@ -781,11 +797,16 @@ where
781797
false
782798
}
783799

784-
fn pin_block(&self, _: <Block as BlockT>::Hash) -> blockchain::Result<()> {
800+
fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
801+
let mut blocks = self.pinned_blocks.write();
802+
*blocks.entry(hash).or_default() += 1;
785803
Ok(())
786804
}
787805

788-
fn unpin_block(&self, _: <Block as BlockT>::Hash) {}
806+
fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
807+
let mut blocks = self.pinned_blocks.write();
808+
blocks.entry(hash).and_modify(|counter| *counter -= 1).or_insert(-1);
809+
}
789810
}
790811

791812
impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> where Block::Hash: Ord {}

client/rpc-spec-v2/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime"
4343
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }
4444
sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
4545
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
46+
sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" }
4647
sc-utils = { version = "4.0.0-dev", path = "../utils" }
4748
assert_matches = "1.3.0"

client/rpc-spec-v2/src/chain_head/chain_head.rs

+92-60
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
chain_head_follow::ChainHeadFollower,
2525
error::Error as ChainHeadRpcError,
2626
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig},
27-
subscription::SubscriptionManagement,
27+
subscription::{SubscriptionManagement, SubscriptionManagementError},
2828
},
2929
SubscriptionTaskExecutor,
3030
};
@@ -44,46 +44,48 @@ use sp_api::CallApiAt;
4444
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
4545
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes};
4646
use sp_runtime::traits::Block as BlockT;
47-
use std::{marker::PhantomData, sync::Arc};
47+
use std::{marker::PhantomData, sync::Arc, time::Duration};
4848

4949
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
5050

5151
/// An API for chain head RPC calls.
52-
pub struct ChainHead<BE, Block: BlockT, Client> {
52+
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
5353
/// Substrate client.
5454
client: Arc<Client>,
5555
/// Backend of the chain.
5656
backend: Arc<BE>,
5757
/// Executor to spawn subscriptions.
5858
executor: SubscriptionTaskExecutor,
5959
/// Keep track of the pinned blocks for each subscription.
60-
subscriptions: Arc<SubscriptionManagement<Block>>,
60+
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
6161
/// The hexadecimal encoded hash of the genesis block.
6262
genesis_hash: String,
63-
/// The maximum number of pinned blocks allowed per connection.
64-
max_pinned_blocks: usize,
6563
/// Phantom member to pin the block type.
6664
_phantom: PhantomData<Block>,
6765
}
6866

69-
impl<BE, Block: BlockT, Client> ChainHead<BE, Block, Client> {
67+
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
7068
/// Create a new [`ChainHead`].
7169
pub fn new<GenesisHash: AsRef<[u8]>>(
7270
client: Arc<Client>,
7371
backend: Arc<BE>,
7472
executor: SubscriptionTaskExecutor,
7573
genesis_hash: GenesisHash,
7674
max_pinned_blocks: usize,
75+
max_pinned_duration: Duration,
7776
) -> Self {
7877
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));
7978

8079
Self {
8180
client,
82-
backend,
81+
backend: backend.clone(),
8382
executor,
84-
subscriptions: Arc::new(SubscriptionManagement::new()),
83+
subscriptions: Arc::new(SubscriptionManagement::new(
84+
max_pinned_blocks,
85+
max_pinned_duration,
86+
backend,
87+
)),
8588
genesis_hash,
86-
max_pinned_blocks,
8789
_phantom: PhantomData,
8890
}
8991
}
@@ -159,9 +161,8 @@ where
159161
return Err(err)
160162
},
161163
};
162-
163164
// Keep track of the subscription.
164-
let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else {
165+
let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates) else {
165166
// Inserting the subscription can only fail if the JsonRPSee
166167
// generated a duplicate subscription ID.
167168
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
@@ -177,7 +178,7 @@ where
177178
let mut chain_head_follow = ChainHeadFollower::new(
178179
client,
179180
backend,
180-
sub_handle,
181+
subscriptions.clone(),
181182
runtime_updates,
182183
sub_id.clone(),
183184
);
@@ -202,19 +203,28 @@ where
202203
let client = self.client.clone();
203204
let subscriptions = self.subscriptions.clone();
204205

205-
let fut = async move {
206-
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
206+
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
207+
Ok(block) => block,
208+
Err(SubscriptionManagementError::SubscriptionAbsent) => {
207209
// Invalid invalid subscription ID.
208210
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
209-
return
210-
};
211-
212-
// Block is not part of the subscription.
213-
if !handle.contains_block(&hash) {
211+
return Ok(())
212+
},
213+
Err(SubscriptionManagementError::BlockHashAbsent) => {
214+
// Block is not part of the subscription.
214215
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
215-
return
216-
}
216+
return Ok(())
217+
},
218+
Err(error) => {
219+
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
220+
error: error.to_string(),
221+
}));
222+
return Ok(())
223+
},
224+
};
217225

226+
let fut = async move {
227+
let _block_guard = block_guard;
218228
let event = match client.block(hash) {
219229
Ok(Some(signed_block)) => {
220230
let extrinsics = signed_block.block.extrinsics();
@@ -226,10 +236,10 @@ where
226236
debug!(
227237
target: LOG_TARGET,
228238
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
229-
follow_subscription,
239+
&follow_subscription,
230240
hash
231241
);
232-
handle.stop();
242+
subscriptions.remove_subscription(&follow_subscription);
233243
ChainHeadEvent::<String>::Disjoint
234244
},
235245
Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
@@ -246,16 +256,19 @@ where
246256
follow_subscription: String,
247257
hash: Block::Hash,
248258
) -> RpcResult<Option<String>> {
249-
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
250-
// Invalid invalid subscription ID.
251-
return Ok(None)
259+
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
260+
Ok(block) => block,
261+
Err(SubscriptionManagementError::SubscriptionAbsent) => {
262+
// Invalid invalid subscription ID.
263+
return Ok(None)
264+
},
265+
Err(SubscriptionManagementError::BlockHashAbsent) => {
266+
// Block is not part of the subscription.
267+
return Err(ChainHeadRpcError::InvalidBlock.into())
268+
},
269+
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
252270
};
253271

254-
// Block is not part of the subscription.
255-
if !handle.contains_block(&hash) {
256-
return Err(ChainHeadRpcError::InvalidBlock.into())
257-
}
258-
259272
self.client
260273
.header(hash)
261274
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
@@ -286,19 +299,28 @@ where
286299
let client = self.client.clone();
287300
let subscriptions = self.subscriptions.clone();
288301

289-
let fut = async move {
290-
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
302+
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
303+
Ok(block) => block,
304+
Err(SubscriptionManagementError::SubscriptionAbsent) => {
291305
// Invalid invalid subscription ID.
292306
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
293-
return
294-
};
295-
296-
// Block is not part of the subscription.
297-
if !handle.contains_block(&hash) {
307+
return Ok(())
308+
},
309+
Err(SubscriptionManagementError::BlockHashAbsent) => {
310+
// Block is not part of the subscription.
298311
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
299-
return
300-
}
312+
return Ok(())
313+
},
314+
Err(error) => {
315+
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
316+
error: error.to_string(),
317+
}));
318+
return Ok(())
319+
},
320+
};
301321

322+
let fut = async move {
323+
let _block_guard = block_guard;
302324
// The child key is provided, use the key to query the child trie.
303325
if let Some(child_key) = child_key {
304326
// The child key must not be prefixed with ":child_storage:" nor
@@ -367,21 +389,29 @@ where
367389
let client = self.client.clone();
368390
let subscriptions = self.subscriptions.clone();
369391

370-
let fut = async move {
371-
let Some(handle) = subscriptions.get_subscription(&follow_subscription) else {
392+
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
393+
Ok(block) => block,
394+
Err(SubscriptionManagementError::SubscriptionAbsent) => {
372395
// Invalid invalid subscription ID.
373396
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
374-
return
375-
};
376-
377-
// Block is not part of the subscription.
378-
if !handle.contains_block(&hash) {
397+
return Ok(())
398+
},
399+
Err(SubscriptionManagementError::BlockHashAbsent) => {
400+
// Block is not part of the subscription.
379401
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
380-
return
381-
}
402+
return Ok(())
403+
},
404+
Err(error) => {
405+
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
406+
error: error.to_string(),
407+
}));
408+
return Ok(())
409+
},
410+
};
382411

412+
let fut = async move {
383413
// Reject subscription if runtime_updates is false.
384-
if !handle.has_runtime_updates() {
414+
if !block_guard.has_runtime_updates() {
385415
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
386416
"The runtime updates flag must be set".into(),
387417
));
@@ -417,15 +447,17 @@ where
417447
follow_subscription: String,
418448
hash: Block::Hash,
419449
) -> RpcResult<()> {
420-
let Some(handle) = self.subscriptions.get_subscription(&follow_subscription) else {
421-
// Invalid invalid subscription ID.
422-
return Ok(())
423-
};
424-
425-
if !handle.unpin_block(&hash) {
426-
return Err(ChainHeadRpcError::InvalidBlock.into())
450+
match self.subscriptions.unpin_block(&follow_subscription, hash) {
451+
Ok(()) => Ok(()),
452+
Err(SubscriptionManagementError::SubscriptionAbsent) => {
453+
// Invalid invalid subscription ID.
454+
Ok(())
455+
},
456+
Err(SubscriptionManagementError::BlockHashAbsent) => {
457+
// Block is not part of the subscription.
458+
Err(ChainHeadRpcError::InvalidBlock.into())
459+
},
460+
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
427461
}
428-
429-
Ok(())
430462
}
431463
}

0 commit comments

Comments
 (0)