Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

grandpa: always create and send justification if there are any subscribers #6935

Merged
5 commits merged into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/finality-grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"

[dependencies]
sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" }
sc-rpc = { version = "2.0.0-rc6", path = "../../rpc" }
sp-core = { version = "2.0.0-rc6", path = "../../../primitives/core" }
sp-runtime = { version = "2.0.0-rc6", path = "../../../primitives/runtime" }
sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" }
finality-grandpa = { version = "0.12.3", features = ["derive-codec"] }
jsonrpc-core = "14.2.0"
jsonrpc-core-client = "14.2.0"
Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ mod tests {

// Notify with a header and justification
let justification = create_justification();
let _ = justification_sender.notify(justification.clone()).unwrap();
justification_sender.notify(|| Ok(justification.clone())).unwrap();

// Inspect what we received
let recv = receiver.take(1).wait().flatten().collect::<Vec<_>>();
Expand All @@ -418,7 +418,7 @@ mod tests {

let recv_sub_id: String =
serde_json::from_value(json_map["subscription"].take()).unwrap();
let recv_justification: Vec<u8> =
let recv_justification: sp_core::Bytes =
serde_json::from_value(json_map["result"].take()).unwrap();
let recv_justification: GrandpaJustification<Block> =
Decode::decode(&mut &recv_justification[..]).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/rpc/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use sc_finality_grandpa::GrandpaJustification;

/// An encoded justification proving that the given header has been finalized
#[derive(Clone, Serialize, Deserialize)]
pub struct JustificationNotification(Vec<u8>);
pub struct JustificationNotification(sp_core::Bytes);

impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationNotification {
fn from(notification: GrandpaJustification<Block>) -> Self {
JustificationNotification(notification.encode())
JustificationNotification(notification.encode().into())
}
}
68 changes: 43 additions & 25 deletions client/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,8 @@ pub(crate) fn ancestry<Block: BlockT, Client>(
client: &Arc<Client>,
base: Block::Hash,
block: Block::Hash,
) -> Result<Vec<Block::Hash>, GrandpaError> where
) -> Result<Vec<Block::Hash>, GrandpaError>
where
Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
if base == block { return Err(GrandpaError::NotDescendent) }
Expand All @@ -671,15 +672,14 @@ pub(crate) fn ancestry<Block: BlockT, Client>(
Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
}

impl<B, Block: BlockT, C, N, SC, VR>
voter::Environment<Block::Hash, NumberFor<Block>>
for Environment<B, Block, C, N, SC, VR>
impl<B, Block: BlockT, C, N, SC, VR> voter::Environment<Block::Hash, NumberFor<Block>>
for Environment<B, Block, C, N, SC, VR>
where
Block: 'static,
B: Backend<Block>,
C: crate::ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block, Error = sp_blockchain::Error>,
N: NetworkT<Block> + 'static + Send + Sync,
N: NetworkT<Block> + 'static + Send + Sync,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, C>,
NumberFor<Block>: BlockNumberOps,
Expand Down Expand Up @@ -1023,7 +1023,7 @@ where
number,
(round, commit).into(),
false,
&self.justification_sender,
self.justification_sender.as_ref(),
)
}

Expand Down Expand Up @@ -1088,9 +1088,10 @@ pub(crate) fn finalize_block<BE, Block, Client>(
number: NumberFor<Block>,
justification_or_commit: JustificationOrCommit<Block>,
initial_sync: bool,
justification_sender: &Option<GrandpaJustificationSender<Block>>,
) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>> where
Block: BlockT,
justification_sender: Option<&GrandpaJustificationSender<Block>>,
) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>
where
Block: BlockT,
BE: Backend<Block>,
Client: crate::ClientForGrandpa<Block, BE>,
{
Expand Down Expand Up @@ -1154,14 +1155,29 @@ pub(crate) fn finalize_block<BE, Block, Client>(
}
}

// send a justification notification if a sender exists and in case of error log it.
fn notify_justification<Block: BlockT>(
justification_sender: Option<&GrandpaJustificationSender<Block>>,
justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
) {
if let Some(sender) = justification_sender {
if let Err(err) = sender.notify(justification) {
warn!(target: "afg", "Error creating justification for subscriber: {:?}", err);
}
}
}

// NOTE: this code assumes that honest voters will never vote past a
// transition block, thus we don't have to worry about the case where
// we have a transition with `effective_block = N`, but we finalize
// `N+1`. this assumption is required to make sure we store
// justifications for transition blocks which will be requested by
// syncing clients.
let justification = match justification_or_commit {
JustificationOrCommit::Justification(justification) => Some(justification),
JustificationOrCommit::Justification(justification) => {
notify_justification(justification_sender, || Ok(justification.clone()));
Some(justification.encode())
},
JustificationOrCommit::Commit((round_number, commit)) => {
let mut justification_required =
// justification is always required when block that enacts new authorities
Expand All @@ -1181,29 +1197,31 @@ pub(crate) fn finalize_block<BE, Block, Client>(
}
}

// NOTE: the code below is a bit more verbose because we
// really want to avoid creating a justification if it isn't
// needed (e.g. if there's no subscribers), and also to avoid
// creating it twice. depending on the vote tree for the round,
// creating a justification might require multiple fetches of
// headers from the database.
let justification = || GrandpaJustification::from_commit(
&client,
round_number,
commit,
);

if justification_required {
let justification = GrandpaJustification::from_commit(
&client,
round_number,
commit,
)?;
let justification = justification()?;
notify_justification(justification_sender, || Ok(justification.clone()));

Some(justification)
Some(justification.encode())
} else {
notify_justification(justification_sender, justification);

None
}
},
};

// Notify any registered listeners in case we have a justification
if let Some(sender) = justification_sender {
if let Some(ref justification) = justification {
let _ = sender.notify(justification.clone());
}
}

let justification = justification.map(|j| j.encode());

debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);

// ideally some handle to a synchronization oracle would be used
Expand Down
3 changes: 1 addition & 2 deletions client/finality-grandpa/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,6 @@ where
Client: crate::ClientForGrandpa<Block, BE>,
NumberFor<Block>: finality_grandpa::BlockNumberOps,
{

/// Import a block justification and finalize the block.
///
/// If `enacts_change` is set to true, then finalizing this block *must*
Expand Down Expand Up @@ -653,7 +652,7 @@ where
number,
justification.into(),
initial_sync,
&Some(self.justification_sender.clone()),
Some(&self.justification_sender),
);

match result {
Expand Down
23 changes: 18 additions & 5 deletions client/finality-grandpa/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::sync::Arc;
use parking_lot::Mutex;

use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};

use crate::justification::GrandpaJustification;
use crate::Error;

// Stream of justifications returned when subscribing.
type JustificationStream<Block> = TracingUnboundedReceiver<GrandpaJustification<Block>>;
Expand Down Expand Up @@ -54,10 +55,22 @@ impl<Block: BlockT> GrandpaJustificationSender<Block> {

/// Send out a notification to all subscribers that a new justification
/// is available for a block.
pub fn notify(&self, notification: GrandpaJustification<Block>) -> Result<(), ()> {
self.subscribers.lock().retain(|n| {
!n.is_closed() && n.unbounded_send(notification.clone()).is_ok()
});
pub fn notify(
&self,
justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
) -> Result<(), Error> {
let mut subscribers = self.subscribers.lock();

// do an initial prune on closed subscriptions
subscribers.retain(|n| !n.is_closed());

// if there's no subscribers we avoid creating
// the justification which is a costly operation
if !subscribers.is_empty() {
let justification = justification()?;
subscribers.retain(|n| n.unbounded_send(justification.clone()).is_ok());
}

Ok(())
}
}
Expand Down
9 changes: 4 additions & 5 deletions client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
last_finalized_number: NumberFor<Block>,
commits: S,
note_round: F,
) -> impl Future<Output=Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> where
) -> impl Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>>
where
NumberFor<Block>: BlockNumberOps,
S: Stream<
Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>,
>,
S: Stream<Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>>,
F: Fn(u64),
BE: Backend<Block>,
Client: crate::ClientForGrandpa<Block, BE>,
Expand Down Expand Up @@ -130,7 +129,7 @@ fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
finalized_number,
(round, commit).into(),
false,
&justification_sender,
justification_sender.as_ref(),
) {
Ok(_) => {},
Err(e) => return future::err(e),
Expand Down