From afdc9646a6c314f99a9d19242f1878f85980e70d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 8 Oct 2019 17:55:52 +0200 Subject: [PATCH 01/24] core/finality-grandpa: Pass Grandpa msg sender up to UntilImported --- .../finality-grandpa/src/communication/mod.rs | 23 +++++++++++-------- core/finality-grandpa/src/lib.rs | 2 ++ core/finality-grandpa/src/until_imported.rs | 9 ++++++-- core/network/src/protocol/consensus_gossip.rs | 2 +- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 652c33c0262a5..fdea7ccc91be9 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -350,7 +350,7 @@ impl> NetworkBridge { local_key: Option, has_voted: HasVoted, ) -> ( - impl Stream,Error=Error>, + impl Stream, SignedMessage),Error=Error>, impl Sink,SinkError=Error>, ) { self.note_round( @@ -375,9 +375,9 @@ impl> NetworkBridge { if let Err(ref e) = decoded { debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); } - decoded.ok() + decoded.ok().map(|msg| (notification.sender, msg)) }) - .and_then(move |msg| { + .and_then(move |(sender, msg)| { match msg { GossipMessage::VoteOrPrecommit(msg) => { // check signature. @@ -410,7 +410,7 @@ impl> NetworkBridge { }, }; - Ok(Some(msg.message)) + Ok(Some((sender, msg.message))) } _ => { debug!(target: "afg", "Skipping unknown message type"); @@ -436,7 +436,9 @@ impl> NetworkBridge { format!("Failed to receive on unbounded receiver for round {}", round.0) )); - let incoming = incoming.select(out_rx); + // TODO: Do I understand correctly that we combine the incoming Grandpa messages with the messages we are + // sending out to other nodes and in addition thereby also send to ourself? + let incoming = incoming.select(out_rx.map(|msg| (None, msg))); (incoming, outgoing) } @@ -448,7 +450,7 @@ impl> NetworkBridge { voters: Arc>, is_voter: bool, ) -> ( - impl Stream, Error = Error>, + impl Stream, CommunicationIn), Error = Error>, impl Sink, SinkError = Error>, ) { self.validator.note_set( @@ -490,7 +492,7 @@ fn incoming_global>( voters: Arc>, gossip_validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, -) -> impl Stream, Error = Error> { +) -> impl Stream, CommunicationIn), Error = Error> { let process_commit = move | msg: FullCommitMessage, mut notification: network_gossip::TopicNotification, @@ -603,9 +605,11 @@ fn incoming_global>( .filter_map(move |(notification, msg)| { match msg { GossipMessage::Commit(msg) => - process_commit(msg, notification, &mut service, &gossip_validator, &*voters), + // TODO @mxinden: Might be able to be a bit smarter than just clone here. + process_commit(msg, notification.clone(), &mut service, &gossip_validator, &*voters).map(|c| (notification.sender, c)), GossipMessage::CatchUp(msg) => - process_catch_up(msg, notification, &mut service, &gossip_validator, &*voters), + // TODO @mxinden: Might be able to be a bit smarter than just clone here. + process_catch_up(msg, notification.clone(), &mut service, &gossip_validator, &*voters).map(|cu| (notification.sender, cu)), _ => { debug!(target: "afg", "Skipping unknown message type"); return None; @@ -667,6 +671,7 @@ struct OutgoingMessages> { round: RoundNumber, set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, + // TODO: This sender is going back to ourself, right? That should be documented. sender: mpsc::UnboundedSender>, announce_sender: periodic::BlockAnnounceSender, network: N, diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index e4b71acbec463..fdc518add71e3 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -423,6 +423,8 @@ fn global_communication, B, E, N, RA>( ); // block commit and catch up messages until relevant blocks are imported. + // TODO: How about passing a BlockSyncRequestHandler in here to have the UntilImported component request blocks it + // is waiting on from the network? let global_in = UntilGlobalMessageBlocksImported::new( client.import_notification_stream(), client.clone(), diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index b8568c1f8510b..f6385b9685703 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -67,6 +67,7 @@ pub(crate) trait BlockUntilImported: Sized { pub(crate) struct UntilImported> { import_notifications: Fuse, Error = ()> + Send>>, status_check: Status, + // TODO: Why is this called inner? Why not being more descriptive and say finality_msg_stream? inner: Fuse, ready: VecDeque, check_pending: Interval, @@ -110,7 +111,7 @@ impl UntilImported impl Stream for UntilImported where Status: BlockStatus, - I: Stream, + I: Stream, M::Blocked),Error=Error>, M: BlockUntilImported, { type Item = M::Blocked; @@ -120,7 +121,9 @@ impl Stream for UntilImported loop { match self.inner.poll()? { Async::Ready(None) => return Ok(Async::Ready(None)), - Async::Ready(Some(input)) => { + Async::Ready(Some((sender, input))) => { + // TODO @mxinden: Do something with sender. + // new input: schedule wait of any parts which require // blocks to be known. let ready = &mut self.ready; @@ -180,6 +183,8 @@ impl Stream for UntilImported v.len(), ); + // TODO: This seems like THE place to be! + *last_log = next_log; } } diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index e23df7e1a59e7..fcfae688bb088 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -77,7 +77,7 @@ struct PeerConsensus { } /// Topic stream message with sender. -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq, Clone)] pub struct TopicNotification { /// Message data. pub message: Vec, From 61ac9dd715612d5fdbf7b8f00b84e450f282ade0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 8 Oct 2019 21:36:23 +0200 Subject: [PATCH 02/24] core/finality-grandpa: Track senders to maybe later request blocks --- core/finality-grandpa/src/until_imported.rs | 32 ++++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index f6385b9685703..c504c15cd4edd 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -71,7 +71,10 @@ pub(crate) struct UntilImported, ready: VecDeque, check_pending: Interval, - pending: HashMap)>, + /// Mapping block hashes to the point in time it was first encountered (Instant), nodes that have the corresponding + /// block (inferred by the fact that they send a message referencing it) and a list of Grandpa messages referencing + /// the block hash. + pending: HashMap, Vec<(M)>)>, identifier: &'static str, } @@ -122,7 +125,6 @@ impl Stream for UntilImported match self.inner.poll()? { Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(Some((sender, input))) => { - // TODO @mxinden: Do something with sender. // new input: schedule wait of any parts which require // blocks to be known. @@ -131,11 +133,19 @@ impl Stream for UntilImported M::schedule_wait( input, &self.status_check, - |target_hash, wait| pending - .entry(target_hash) - .or_insert_with(|| (Instant::now(), Vec::new())) - .1 - .push(wait), + |target_hash, wait| { + let entry = pending + .entry(target_hash) + .or_insert_with(|| (Instant::now(), Vec::new(), Vec::new())); + // Given that we received the message from the sender, we expect to be able to download the + // referenced block from them later in case we don't already download it automatically from + // elsewhere. + // TODO: Can we get around the clone? + if let Some(sender) = sender.clone() { + entry.1.push(sender); + } + entry.2.push(wait); + } , |ready_item| ready.push_back(ready_item), )?; } @@ -149,7 +159,7 @@ impl Stream for UntilImported Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(notification))) => { // new block imported. queue up all messages tied to that hash. - if let Some((_, messages)) = self.pending.remove(¬ification.hash) { + if let Some((_, _, messages)) = self.pending.remove(¬ification.hash) { let canon_number = notification.header.number().clone(); let ready_messages = messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); @@ -168,7 +178,7 @@ impl Stream for UntilImported if update_interval { let mut known_keys = Vec::new(); - for (&block_hash, &mut (ref mut last_log, ref v)) in &mut self.pending { + for (&block_hash, &mut (ref mut last_log, ref _senders, ref v)) in &mut self.pending { if let Some(number) = self.status_check.block_number(block_hash)? { known_keys.push((block_hash, number)); } else { @@ -183,7 +193,7 @@ impl Stream for UntilImported v.len(), ); - // TODO: This seems like THE place to be! + // TODO: This seems like THE place to be! Pass the senders down to the network sync service. *last_log = next_log; } @@ -191,7 +201,7 @@ impl Stream for UntilImported } for (known_hash, canon_number) in known_keys { - if let Some((_, pending_messages)) = self.pending.remove(&known_hash) { + if let Some((_, _, pending_messages)) = self.pending.remove(&known_hash) { let ready_messages = pending_messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); From ce9db3f13872344a329d16546e4f4a124b135207 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 10 Oct 2019 10:53:03 +0200 Subject: [PATCH 03/24] core/finality-grandpa: Make BlockStatus pub only within crate --- core/finality-grandpa/src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index fdc518add71e3..c8de8e5a39a98 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -22,11 +22,11 @@ //! //! # Usage //! -//! First, create a block-import wrapper with the `block_import` function. -//! The GRANDPA worker needs to be linked together with this block import object, -//! so a `LinkHalf` is returned as well. All blocks imported (from network or consensus or otherwise) -//! must pass through this wrapper, otherwise consensus is likely to break in -//! unexpected ways. +//! First, create a block-import wrapper with the `block_import` function. The +//! GRANDPA worker needs to be linked together with this block import object, so +//! a `LinkHalf` is returned as well. All blocks imported (from network or +//! consensus or otherwise) must pass through this wrapper, otherwise consensus +//! is likely to break in unexpected ways. //! //! Next, use the `LinkHalf` and a local configuration to `run_grandpa_voter`. //! This requires a `Network` implementation. The returned future should be @@ -238,7 +238,7 @@ impl From for Error { } /// Something which can determine if a block is known. -pub trait BlockStatus { +pub(crate) trait BlockStatus { /// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block /// is definitely known and has been imported. /// If an unexpected error occurs, return that. From 3cd6de040b73d381e79e0c23559b4f8e44ec3292 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 10 Oct 2019 11:32:49 +0200 Subject: [PATCH 04/24] core/finality-grandpa: Abstract NetworkBridge with BlockSyncRequester --- core/finality-grandpa/src/communication/mod.rs | 13 ++++++++++++- core/finality-grandpa/src/lib.rs | 14 ++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index fdea7ccc91be9..97cad8642c550 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -38,7 +38,7 @@ use network::{consensus_gossip as network_gossip, NetworkService}; use network_gossip::ConsensusMessage; use codec::{Encode, Decode}; use primitives::Pair; -use sr_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; +use sr_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor}; use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; use tokio_executor::Executor; @@ -128,6 +128,9 @@ pub trait Network: Clone + Send + 'static { /// Inform peers that a block with given hash should be downloaded. fn announce(&self, block: Block::Hash, associated_data: Vec); + + /// Configure an explicit fork sync request. + fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); } /// Create a unique topic for a round and set-id combo. @@ -200,6 +203,10 @@ impl Network for Arc> where fn announce(&self, block: B::Hash, associated_data: Vec) { self.announce_block(block, associated_data) } + + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + NetworkService::set_sync_fork_request(self, peers, hash, number) + } } /// A stream used by NetworkBridge in its implementation of Network. @@ -484,6 +491,10 @@ impl> NetworkBridge { (incoming, outgoing) } + + pub(crate) fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor){ + self.service.set_sync_fork_request(peers, hash, number) + } } fn incoming_global>( diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index c8de8e5a39a98..22cdfa941172d 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -257,6 +257,20 @@ impl, RA> BlockStatus for Arc { + fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); +} + +impl BlockSyncRequester for NetworkBridge where + Block: BlockT, + N: communication::Network, +{ + fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor){ + NetworkBridge::set_sync_fork_request(self, peers, hash, number) + } +} + /// A new authority set along with the canonical block it changed at. #[derive(Debug)] pub(crate) struct NewAuthoritySet { From fd4800d6b3f68a10462019a345cf0e08bd380d44 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 10 Oct 2019 12:07:10 +0200 Subject: [PATCH 05/24] core/finality-grandpa: Pass BlockSyncRequester to UntilImported --- core/finality-grandpa/src/environment.rs | 3 ++ core/finality-grandpa/src/lib.rs | 3 +- core/finality-grandpa/src/until_imported.rs | 46 +++++++++++++-------- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index 70e45848be1dd..9d6e34e32b4d9 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -369,6 +369,8 @@ impl SharedVoterSetState { /// The environment we run GRANDPA in. pub(crate) struct Environment, RA, SC> { + // TODO: As far as I can tell this does not follow the pattern of having an outer struct protecting an inner struct + // via a lock in order to be Sync. How about renaming this to `client` like we do in many other places? pub(crate) inner: Arc>, pub(crate) select_chain: SC, pub(crate) voters: Arc>, @@ -581,6 +583,7 @@ where // corresponding blocks are imported. let incoming = Box::new(UntilVoteTargetImported::new( self.inner.import_notification_stream(), + self.network.clone(), self.inner.clone(), incoming, "round", diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 22cdfa941172d..6903c151ae6fb 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -437,10 +437,9 @@ fn global_communication, B, E, N, RA>( ); // block commit and catch up messages until relevant blocks are imported. - // TODO: How about passing a BlockSyncRequestHandler in here to have the UntilImported component request blocks it - // is waiting on from the network? let global_in = UntilGlobalMessageBlocksImported::new( client.import_notification_stream(), + network.clone(), client.clone(), global_in, "global", diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index c504c15cd4edd..0e3daab27aaf3 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -20,7 +20,7 @@ //! //! This is used for votes and commit messages currently. -use super::{BlockStatus, CommunicationIn, Error, SignedMessage}; +use super::{BlockStatus as BlockStatusT, BlockSyncRequester as BlockSyncRequesterT, CommunicationIn, Error, SignedMessage}; use log::{debug, warn}; use client::{BlockImportNotification, ImportNotifications}; @@ -54,7 +54,7 @@ pub(crate) trait BlockUntilImported: Sized { wait: Wait, ready: Ready, ) -> Result<(), Error> where - S: BlockStatus, + S: BlockStatusT, Wait: FnMut(Block::Hash, Self), Ready: FnMut(Self::Blocked); @@ -64,9 +64,10 @@ pub(crate) trait BlockUntilImported: Sized { } /// Buffering imported messages until blocks with given hashes are imported. -pub(crate) struct UntilImported> { +pub(crate) struct UntilImported> { import_notifications: Fuse, Error = ()> + Send>>, - status_check: Status, + block_sync_requester: BlockSyncRequester, + status_check: BlockStatus, // TODO: Why is this called inner? Why not being more descriptive and say finality_msg_stream? inner: Fuse, ready: VecDeque, @@ -78,13 +79,14 @@ pub(crate) struct UntilImported UntilImported - where Status: BlockStatus, M: BlockUntilImported +impl UntilImported + where BlockStatus: BlockStatusT, M: BlockUntilImported { /// Create a new `UntilImported` wrapper. pub(crate) fn new( import_notifications: ImportNotifications, - status_check: Status, + block_sync_requester: BlockSyncRequester, + status_check: BlockStatus, stream: I, identifier: &'static str, ) -> Self { @@ -102,6 +104,7 @@ impl UntilImported let stream = import_notifications.map::<_, fn(_) -> _>(|v| Ok::<_, ()>(v)).compat(); Box::new(stream) as Box + Send> }.fuse(), + block_sync_requester, status_check, inner: stream.fuse(), ready: VecDeque::new(), @@ -112,8 +115,8 @@ impl UntilImported } } -impl Stream for UntilImported where - Status: BlockStatus, +impl Stream for UntilImported where + BlockStatus: BlockStatusT, I: Stream, M::Blocked),Error=Error>, M: BlockUntilImported, { @@ -235,13 +238,13 @@ fn warn_authority_wrong_target(hash: H, id: AuthorityId) impl BlockUntilImported for SignedMessage { type Blocked = Self; - fn schedule_wait( + fn schedule_wait( msg: Self::Blocked, - status_check: &S, + status_check: &BlockStatus, mut wait: Wait, mut ready: Ready, ) -> Result<(), Error> where - S: BlockStatus, + BlockStatus: BlockStatusT, Wait: FnMut(Block::Hash, Self), Ready: FnMut(Self::Blocked), { @@ -274,7 +277,13 @@ impl BlockUntilImported for SignedMessage { /// Helper type definition for the stream which waits until vote targets for /// signed messages are imported. -pub(crate) type UntilVoteTargetImported = UntilImported>; +pub(crate) type UntilVoteTargetImported = UntilImported< + Block, + BlockStatus, + BlockSyncRequester, + I, + SignedMessage, +>; /// This blocks a global message import, i.e. a commit or catch up messages, /// until all blocks referenced in its votes are known. @@ -289,13 +298,13 @@ pub(crate) struct BlockGlobalMessage { impl BlockUntilImported for BlockGlobalMessage { type Blocked = CommunicationIn; - fn schedule_wait( + fn schedule_wait( input: Self::Blocked, - status_check: &S, + status_check: &BlockStatus, mut wait: Wait, mut ready: Ready, ) -> Result<(), Error> where - S: BlockStatus, + BlockStatus: BlockStatusT, Wait: FnMut(Block::Hash, Self), Ready: FnMut(Self::Blocked), { @@ -440,9 +449,10 @@ impl BlockUntilImported for BlockGlobalMessage { /// A stream which gates off incoming global messages, i.e. commit and catch up /// messages, until all referenced block hashes have been imported. -pub(crate) type UntilGlobalMessageBlocksImported = UntilImported< +pub(crate) type UntilGlobalMessageBlocksImported = UntilImported< Block, - Status, + BlockStatus, + BlockSyncRequester, I, BlockGlobalMessage, >; From 0f8279581bd70eab1999341daf1dc3cbfa85c06e Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 10 Oct 2019 12:52:21 +0200 Subject: [PATCH 06/24] core/finality-grandpa: Track block number of pending within UntilImported --- core/finality-grandpa/src/until_imported.rs | 36 ++++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 0e3daab27aaf3..880f0a21dc44d 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -55,7 +55,7 @@ pub(crate) trait BlockUntilImported: Sized { ready: Ready, ) -> Result<(), Error> where S: BlockStatusT, - Wait: FnMut(Block::Hash, Self), + Wait: FnMut(Block::Hash, NumberFor, Self), Ready: FnMut(Self::Blocked); /// called when the wait has completed. The canonical number is passed through @@ -72,10 +72,10 @@ pub(crate) struct UntilImported, ready: VecDeque, check_pending: Interval, - /// Mapping block hashes to the point in time it was first encountered (Instant), nodes that have the corresponding - /// block (inferred by the fact that they send a message referencing it) and a list of Grandpa messages referencing - /// the block hash. - pending: HashMap, Vec<(M)>)>, + /// Mapping block hashes to their block number, the point in time it was first encountered (Instant), nodes that + /// have the corresponding block (inferred by the fact that they send a message referencing it) and a list of + /// Grandpa messages referencing the block hash. + pending: HashMap, Instant, Vec, Vec<(M)>)>, identifier: &'static str, } @@ -136,18 +136,18 @@ impl Stream for UntilImpor M::schedule_wait( input, &self.status_check, - |target_hash, wait| { + |target_hash, target_number, wait| { let entry = pending .entry(target_hash) - .or_insert_with(|| (Instant::now(), Vec::new(), Vec::new())); + .or_insert_with(|| (target_number, Instant::now(), Vec::new(), Vec::new())); // Given that we received the message from the sender, we expect to be able to download the // referenced block from them later in case we don't already download it automatically from // elsewhere. // TODO: Can we get around the clone? if let Some(sender) = sender.clone() { - entry.1.push(sender); + entry.2.push(sender); } - entry.2.push(wait); + entry.3.push(wait); } , |ready_item| ready.push_back(ready_item), )?; @@ -162,7 +162,7 @@ impl Stream for UntilImpor Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(notification))) => { // new block imported. queue up all messages tied to that hash. - if let Some((_, _, messages)) = self.pending.remove(¬ification.hash) { + if let Some((_, _, _, messages)) = self.pending.remove(¬ification.hash) { let canon_number = notification.header.number().clone(); let ready_messages = messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); @@ -181,11 +181,15 @@ impl Stream for UntilImpor if update_interval { let mut known_keys = Vec::new(); - for (&block_hash, &mut (ref mut last_log, ref _senders, ref v)) in &mut self.pending { + for (&block_hash, &mut (block_number, ref mut last_log, ref senders, ref v)) in &mut self.pending { if let Some(number) = self.status_check.block_number(block_hash)? { + // TODO: Now that we pass the number all the way down here, should we check if it is the same as we + // are expecting it to be? known_keys.push((block_hash, number)); } else { let next_log = *last_log + LOG_PENDING_INTERVAL; + // TODO: Should this not be >=? We want to log whenever we want to log every LOG_PENDING_INTERVAL, + // right? if Instant::now() <= next_log { debug!( target: "afg", @@ -204,7 +208,7 @@ impl Stream for UntilImpor } for (known_hash, canon_number) in known_keys { - if let Some((_, _, pending_messages)) = self.pending.remove(&known_hash) { + if let Some((_, _, _, pending_messages)) = self.pending.remove(&known_hash) { let ready_messages = pending_messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); @@ -245,7 +249,7 @@ impl BlockUntilImported for SignedMessage { mut ready: Ready, ) -> Result<(), Error> where BlockStatus: BlockStatusT, - Wait: FnMut(Block::Hash, Self), + Wait: FnMut(Block::Hash, NumberFor, Self), Ready: FnMut(Self::Blocked), { let (&target_hash, target_number) = msg.target(); @@ -257,7 +261,7 @@ impl BlockUntilImported for SignedMessage { ready(msg); } } else { - wait(target_hash, msg) + wait(target_hash, target_number, msg) } Ok(()) @@ -305,7 +309,7 @@ impl BlockUntilImported for BlockGlobalMessage { mut ready: Ready, ) -> Result<(), Error> where BlockStatus: BlockStatusT, - Wait: FnMut(Block::Hash, Self), + Wait: FnMut(Block::Hash, NumberFor, Self), Ready: FnMut(Self::Blocked), { use std::collections::hash_map::Entry; @@ -407,7 +411,7 @@ impl BlockUntilImported for BlockGlobalMessage { // if this is taking a long time. for (hash, is_known) in checked_hashes { if let KnownOrUnknown::Unknown(target_number) = is_known { - wait(hash, BlockGlobalMessage { + wait(hash, target_number, BlockGlobalMessage { inner: locked_global.clone(), target_number, }) From a2f66840b205cb42df0a40e095f4b851db5d47a1 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 10 Oct 2019 12:53:00 +0200 Subject: [PATCH 07/24] core/finality-grandpa: Request block sync on long wait --- core/finality-grandpa/src/communication/mod.rs | 2 -- core/finality-grandpa/src/until_imported.rs | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 97cad8642c550..01d21f41e7979 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -616,10 +616,8 @@ fn incoming_global>( .filter_map(move |(notification, msg)| { match msg { GossipMessage::Commit(msg) => - // TODO @mxinden: Might be able to be a bit smarter than just clone here. process_commit(msg, notification.clone(), &mut service, &gossip_validator, &*voters).map(|c| (notification.sender, c)), GossipMessage::CatchUp(msg) => - // TODO @mxinden: Might be able to be a bit smarter than just clone here. process_catch_up(msg, notification.clone(), &mut service, &gossip_validator, &*voters).map(|cu| (notification.sender, cu)), _ => { debug!(target: "afg", "Skipping unknown message type"); diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 880f0a21dc44d..27d8461464c1f 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -117,6 +117,7 @@ impl UntilImported impl Stream for UntilImported where BlockStatus: BlockStatusT, + BlockSyncRequester: BlockSyncRequesterT, I: Stream, M::Blocked),Error=Error>, M: BlockUntilImported, { @@ -128,7 +129,6 @@ impl Stream for UntilImpor match self.inner.poll()? { Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(Some((sender, input))) => { - // new input: schedule wait of any parts which require // blocks to be known. let ready = &mut self.ready; @@ -143,7 +143,6 @@ impl Stream for UntilImpor // Given that we received the message from the sender, we expect to be able to download the // referenced block from them later in case we don't already download it automatically from // elsewhere. - // TODO: Can we get around the clone? if let Some(sender) = sender.clone() { entry.2.push(sender); } @@ -194,13 +193,14 @@ impl Stream for UntilImpor debug!( target: "afg", "Waiting to import block {} before {} {} messages can be imported. \ + Requesting network sync service to retrieve block from. \ Possible fork?", self.identifier, block_hash, v.len(), ); - // TODO: This seems like THE place to be! Pass the senders down to the network sync service. + self.block_sync_requester.set_sync_fork_request(senders.clone(), block_hash, block_number); *last_log = next_log; } From 9f9e15f6c0f849eae7c178e1d37470764afebd29 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 11 Oct 2019 11:59:27 +0200 Subject: [PATCH 08/24] core/finality-grandpa: Adjust unit tests to previous changes --- core/finality-grandpa/src/communication/tests.rs | 7 +++++-- core/finality-grandpa/src/tests.rs | 6 +++--- core/finality-grandpa/src/until_imported.rs | 14 +++++++++++--- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index 6215e30b80966..e8dd642b111d0 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -25,6 +25,7 @@ use tokio::runtime::current_thread; use std::sync::Arc; use keyring::Ed25519Keyring; use codec::Encode; +use sr_primitives::traits::NumberFor; use crate::environment::SharedVoterSetState; use super::gossip::{self, GossipValidator}; @@ -91,6 +92,8 @@ impl super::Network for TestNetwork { fn announce(&self, block: Hash, _associated_data: Vec) { let _ = self.sender.unbounded_send(Event::Announce(block)); } + + fn set_sync_fork_request(&self, peers: Vec, hash: Hash, number: NumberFor) {} } impl network_gossip::ValidatorContext for TestNetwork { @@ -296,7 +299,7 @@ fn good_commit_leads_to_relay() { // when the commit comes in, we'll tell the callback it was good. let handle_commit = commits_in.into_future() .map(|(item, _)| { - match item.unwrap() { + match item.unwrap().1 { grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => { callback.run(grandpa::voter::CommitProcessingOutcome::good()); }, @@ -411,7 +414,7 @@ fn bad_commit_leads_to_report() { // when the commit comes in, we'll tell the callback it was good. let handle_commit = commits_in.into_future() .map(|(item, _)| { - match item.unwrap() { + match item.unwrap().1 { grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => { callback.run(grandpa::voter::CommitProcessingOutcome::bad()); }, diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 1957ab5c4fbb3..5e89e0bd7be4b 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -1205,7 +1205,7 @@ fn voter_persists_its_votes() { runtime.spawn(round_rx.for_each(move |signed| { if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 { // the first message we receive should be a prevote from alice. - let prevote = match signed.message { + let prevote = match signed.1.message { grandpa::Message::Prevote(prevote) => prevote, _ => panic!("voter should prevote."), }; @@ -1245,7 +1245,7 @@ fn voter_persists_its_votes() { } else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 { // the next message we receive should be our own prevote - let prevote = match signed.message { + let prevote = match signed.1.message { grandpa::Message::Prevote(prevote) => prevote, _ => panic!("We should receive our own prevote."), }; @@ -1262,7 +1262,7 @@ fn voter_persists_its_votes() { } else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 { // we then receive a precommit from alice for block 15 // even though we casted a prevote for block 30 - let precommit = match signed.message { + let precommit = match signed.1.message { grandpa::Message::Precommit(precommit) => precommit, _ => panic!("voter should precommit."), }; diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 27d8461464c1f..c484e23bbef6d 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -514,12 +514,18 @@ mod tests { inner: Arc>>, } - impl BlockStatus for TestBlockStatus { + impl BlockStatusT for TestBlockStatus { fn block_number(&self, hash: Hash) -> Result, Error> { Ok(self.inner.lock().get(&hash).map(|x| x.clone())) } } + struct TestBlockSyncRequester {} + + impl BlockSyncRequesterT for TestBlockSyncRequester { + fn set_sync_fork_request(&self, peers: Vec, hash: Hash, number: NumberFor){} + } + fn make_header(number: u64) -> Header { Header::new( number, @@ -564,12 +570,13 @@ mod tests { let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, + TestBlockSyncRequester{}, block_status, global_rx.map_err(|_| panic!("should never error")), "global", ); - global_tx.unbounded_send(msg).unwrap(); + global_tx.unbounded_send((None, msg)).unwrap(); let work = until_imported.into_future(); @@ -590,12 +597,13 @@ mod tests { let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, + TestBlockSyncRequester{}, block_status, global_rx.map_err(|_| panic!("should never error")), "global", ); - global_tx.unbounded_send(msg).unwrap(); + global_tx.unbounded_send((None, msg)).unwrap(); // NOTE: needs to be cloned otherwise it is moved to the stream and // dropped too early. From 126c35e77f71380cae0b5e5f8ab06dcc1304e1a6 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 11 Oct 2019 12:37:37 +0200 Subject: [PATCH 09/24] core/finality-grandpa: Fix line length --- .../finality-grandpa/src/communication/mod.rs | 16 ++++++++++++-- core/finality-grandpa/src/until_imported.rs | 22 ++++++++++++++----- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 01d21f41e7979..c0ef729793660 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -616,9 +616,21 @@ fn incoming_global>( .filter_map(move |(notification, msg)| { match msg { GossipMessage::Commit(msg) => - process_commit(msg, notification.clone(), &mut service, &gossip_validator, &*voters).map(|c| (notification.sender, c)), + process_commit( + msg, + notification.clone(), + &mut service, + &gossip_validator, + &*voters + ).map(|c| (notification.sender, c)), GossipMessage::CatchUp(msg) => - process_catch_up(msg, notification.clone(), &mut service, &gossip_validator, &*voters).map(|cu| (notification.sender, cu)), + process_catch_up( + msg, + notification.clone(), + &mut service, + &gossip_validator, + &*voters + ).map(|cu| (notification.sender, cu)), _ => { debug!(target: "afg", "Skipping unknown message type"); return None; diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index c484e23bbef6d..c38c262c11c12 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -20,7 +20,13 @@ //! //! This is used for votes and commit messages currently. -use super::{BlockStatus as BlockStatusT, BlockSyncRequester as BlockSyncRequesterT, CommunicationIn, Error, SignedMessage}; +use super::{ + BlockStatus as BlockStatusT, + BlockSyncRequester as BlockSyncRequesterT, + CommunicationIn, + Error, + SignedMessage, +}; use log::{debug, warn}; use client::{BlockImportNotification, ImportNotifications}; @@ -79,8 +85,11 @@ pub(crate) struct UntilImported UntilImported - where BlockStatus: BlockStatusT, M: BlockUntilImported +impl UntilImported where + Block: BlockT, + BlockStatus: BlockStatusT, + M: BlockUntilImported, + I: Stream, { /// Create a new `UntilImported` wrapper. pub(crate) fn new( @@ -115,9 +124,10 @@ impl UntilImported } } -impl Stream for UntilImported where - BlockStatus: BlockStatusT, - BlockSyncRequester: BlockSyncRequesterT, +impl Stream for UntilImported where + Block: BlockT, + BStatus: BlockStatusT, + BSyncRequester: BlockSyncRequesterT, I: Stream, M::Blocked),Error=Error>, M: BlockUntilImported, { From 72516c08b8e7917f0fd8381cff6da7a777e70156 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 15 Oct 2019 10:42:45 +0200 Subject: [PATCH 10/24] core/finality-grandpa: Add comment explaining in & out vote combination --- core/finality-grandpa/src/communication/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index c0ef729793660..0b88ec203001e 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -443,8 +443,8 @@ impl> NetworkBridge { format!("Failed to receive on unbounded receiver for round {}", round.0) )); - // TODO: Do I understand correctly that we combine the incoming Grandpa messages with the messages we are - // sending out to other nodes and in addition thereby also send to ourself? + // Combine incoming votes from external Grandpa nodes with outgoing votes from our own Grandpa voter to have a + // single vote-import-pipeline. let incoming = incoming.select(out_rx.map(|msg| (None, msg))); (incoming, outgoing) From 3b4adcad189a1680e9d4d04ed7508e432750e792 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 17 Oct 2019 11:30:42 +0200 Subject: [PATCH 11/24] core/finality-grandpa: Log after, not before, timeout expired The UntilImported component should log whenever waiting for a specific block to be imported surpassed a defined constant timeout. Without this patch the code would log whenever the current time was below the timeout. --- core/finality-grandpa/src/until_imported.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index c38c262c11c12..b5d79dc2a3361 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -197,9 +197,7 @@ impl Stream for UntilImported=? We want to log whenever we want to log every LOG_PENDING_INTERVAL, - // right? - if Instant::now() <= next_log { + if Instant::now() >= next_log { debug!( target: "afg", "Waiting to import block {} before {} {} messages can be imported. \ From 74d6637daf6625ca2554754ddee32383680609d2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 18 Oct 2019 10:12:02 +0200 Subject: [PATCH 12/24] core/finality-grandpa: Collect senders as HashSet for deduplication --- core/finality-grandpa/src/until_imported.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index b5d79dc2a3361..9fd6767a34654 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -38,7 +38,7 @@ use parking_lot::Mutex; use sr_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use tokio_timer::Interval; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; use std::time::{Duration, Instant}; use fg_primitives::AuthorityId; @@ -81,7 +81,7 @@ pub(crate) struct UntilImported, Instant, Vec, Vec<(M)>)>, + pending: HashMap, Instant, HashSet, Vec<(M)>)>, identifier: &'static str, } @@ -149,12 +149,12 @@ impl Stream for UntilImported Stream for UntilImported Date: Tue, 22 Oct 2019 21:01:50 +0200 Subject: [PATCH 13/24] Revert "core/finality-grandpa: Track senders to maybe later request blocks" This reverts commit 61ac9dd715612d5fdbf7b8f00b84e450f282ade0. --- core/finality-grandpa/src/until_imported.rs | 31 ++++++++------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index f93953ea092aa..67104cb8509cc 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -78,10 +78,9 @@ pub(crate) struct UntilImported, ready: VecDeque, check_pending: Interval, - /// Mapping block hashes to their block number, the point in time it was first encountered (Instant), nodes that - /// have the corresponding block (inferred by the fact that they send a message referencing it) and a list of + /// Mapping block hashes to their block number, the point in time it was first encountered (Instant) and a list of /// Grandpa messages referencing the block hash. - pending: HashMap, Instant, HashSet, Vec<(M)>)>, + pending: HashMap, Instant, Vec)>, identifier: &'static str, } @@ -146,18 +145,11 @@ impl Stream for UntilImported Stream for UntilImported return Ok(Async::Ready(None)), Ok(Async::Ready(Some(notification))) => { // new block imported. queue up all messages tied to that hash. - if let Some((_, _, _, messages)) = self.pending.remove(¬ification.hash) { + if let Some((_, _, messages)) = self.pending.remove(¬ification.hash) { let canon_number = notification.header.number().clone(); let ready_messages = messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); @@ -190,7 +182,7 @@ impl Stream for UntilImported Stream for UntilImported Stream for UntilImported Date: Tue, 22 Oct 2019 21:07:10 +0200 Subject: [PATCH 14/24] Revert "core/finality-grandpa: Pass Grandpa msg sender up to UntilImported" This reverts commit afdc9646a6c314f99a9d19242f1878f85980e70d. --- .../finality-grandpa/src/communication/mod.rs | 31 ++++++------------- core/finality-grandpa/src/until_imported.rs | 4 +-- core/network/src/protocol/consensus_gossip.rs | 2 +- 3 files changed, 12 insertions(+), 25 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index e4c6cd0eeb485..25e8137f6ebfe 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -379,7 +379,7 @@ impl> NetworkBridge { local_key: Option, has_voted: HasVoted, ) -> ( - impl Stream, SignedMessage),Error=Error>, + impl Stream,Error=Error>, impl Sink,SinkError=Error>, ) { self.note_round( @@ -404,9 +404,9 @@ impl> NetworkBridge { if let Err(ref e) = decoded { debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); } - decoded.ok().map(|msg| (notification.sender, msg)) + decoded.ok() }) - .and_then(move |(sender, msg)| { + .and_then(move |msg| { match msg { GossipMessage::Vote(msg) => { // check signature. @@ -439,7 +439,7 @@ impl> NetworkBridge { }, }; - Ok(Some((sender, msg.message))) + Ok(Some(msg.message)) } _ => { debug!(target: "afg", "Skipping unknown message type"); @@ -467,7 +467,7 @@ impl> NetworkBridge { // Combine incoming votes from external Grandpa nodes with outgoing votes from our own Grandpa voter to have a // single vote-import-pipeline. - let incoming = incoming.select(out_rx.map(|msg| (None, msg))); + let incoming = incoming.select(out_rx); (incoming, outgoing) } @@ -479,7 +479,7 @@ impl> NetworkBridge { voters: Arc>, is_voter: bool, ) -> ( - impl Stream, CommunicationIn), Error = Error>, + impl Stream, Error = Error>, impl Sink, SinkError = Error>, ) { self.validator.note_set( @@ -525,7 +525,7 @@ fn incoming_global>( voters: Arc>, gossip_validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, -) -> impl Stream, CommunicationIn), Error = Error> { +) -> impl Stream, Error = Error> { let process_commit = move | msg: FullCommitMessage, mut notification: network_gossip::TopicNotification, @@ -638,21 +638,9 @@ fn incoming_global>( .filter_map(move |(notification, msg)| { match msg { GossipMessage::Commit(msg) => - process_commit( - msg, - notification.clone(), - &mut service, - &gossip_validator, - &*voters - ).map(|c| (notification.sender, c)), + process_commit(msg, notification, &mut service, &gossip_validator, &*voters), GossipMessage::CatchUp(msg) => - process_catch_up( - msg, - notification.clone(), - &mut service, - &gossip_validator, - &*voters - ).map(|cu| (notification.sender, cu)), + process_catch_up(msg, notification, &mut service, &gossip_validator, &*voters), _ => { debug!(target: "afg", "Skipping unknown message type"); return None; @@ -714,7 +702,6 @@ struct OutgoingMessages> { round: RoundNumber, set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, - // TODO: This sender is going back to ourself, right? That should be documented. sender: mpsc::UnboundedSender>, announce_sender: periodic::BlockAnnounceSender, network: N, diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 67104cb8509cc..74593f77ac670 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -127,7 +127,7 @@ impl Stream for UntilImported, BSyncRequester: BlockSyncRequesterT, - I: Stream, M::Blocked),Error=Error>, + I: Stream, M: BlockUntilImported, { type Item = M::Blocked; @@ -137,7 +137,7 @@ impl Stream for UntilImported return Ok(Async::Ready(None)), - Async::Ready(Some((sender, input))) => { + Async::Ready(Some(input)) => { // new input: schedule wait of any parts which require // blocks to be known. let ready = &mut self.ready; diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index fcfae688bb088..e23df7e1a59e7 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -77,7 +77,7 @@ struct PeerConsensus { } /// Topic stream message with sender. -#[derive(Debug, Eq, PartialEq, Clone)] +#[derive(Debug, Eq, PartialEq)] pub struct TopicNotification { /// Message data. pub message: Vec, From 0805651d10b647576c933b5bb0340f39978780fe Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 22 Oct 2019 21:25:08 +0200 Subject: [PATCH 15/24] core/network/sync: Ask for block from all peers if none provided When requesting an explicit fork sync, try to sync from all known peers, when no specific peers were provided. --- core/network/src/protocol/sync.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/network/src/protocol/sync.rs b/core/network/src/protocol/sync.rs index bd8a9fe27f60a..17ba6ca94d289 100644 --- a/core/network/src/protocol/sync.rs +++ b/core/network/src/protocol/sync.rs @@ -456,14 +456,20 @@ impl ChainSync { /// Request syncing for the given block from given set of peers. // The implementation is similar to on_block_announce with unknown parent hash. - pub fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor) { + pub fn set_sync_fork_request(&mut self, mut peers: Vec, hash: &B::Hash, number: NumberFor) { if peers.is_empty() { - if let Some(_) = self.fork_targets.remove(hash) { - debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers); - } - return; + debug!( + target: "sync", + "Explicit sync request for block {:?} with no peers specified. \ + Syncing from all connected peers {:?} instead.", + hash, peers, + ); + + peers = self.peers.iter().map(|(id, _)| id.clone()).collect(); + } else { + debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers); } - debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers); + if self.is_known(&hash) { debug!(target: "sync", "Refusing to sync known hash {:?}", hash); return; From 6fda6a679b97bcb4d185839d56a19805e624b1ae Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 23 Oct 2019 16:32:43 +0200 Subject: [PATCH 16/24] core/network/sync: Request specific fork sync from peers ahead or on par When making an explicit fork sync request without specifying any peers, make sure to only request it from the locally known peers that are either ahead or on a par compared to the block number we are looking for. --- core/network/src/protocol/sync.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/network/src/protocol/sync.rs b/core/network/src/protocol/sync.rs index 17ba6ca94d289..d1d3b178c51c4 100644 --- a/core/network/src/protocol/sync.rs +++ b/core/network/src/protocol/sync.rs @@ -465,7 +465,11 @@ impl ChainSync { hash, peers, ); - peers = self.peers.iter().map(|(id, _)| id.clone()).collect(); + peers = self.peers.iter() + // Only request blocks from peers who are ahead or on a par. + .filter(|(id, peer)| peer.best_number >= number) + .map(|(id, _)| id.clone()) + .collect(); } else { debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers); } From 411e1010084a89ceb2f39a42bff9b52d7cdcb445 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 23 Oct 2019 17:03:45 +0100 Subject: [PATCH 17/24] grandpa: fix tests --- core/finality-grandpa/src/communication/tests.rs | 4 ++-- core/finality-grandpa/src/tests.rs | 6 +++--- core/finality-grandpa/src/until_imported.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index d4adc0f0b29d4..c14377ce17e64 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -299,7 +299,7 @@ fn good_commit_leads_to_relay() { // when the commit comes in, we'll tell the callback it was good. let handle_commit = commits_in.into_future() .map(|(item, _)| { - match item.unwrap().1 { + match item.unwrap() { grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => { callback.run(grandpa::voter::CommitProcessingOutcome::good()); }, @@ -414,7 +414,7 @@ fn bad_commit_leads_to_report() { // when the commit comes in, we'll tell the callback it was good. let handle_commit = commits_in.into_future() .map(|(item, _)| { - match item.unwrap().1 { + match item.unwrap() { grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => { callback.run(grandpa::voter::CommitProcessingOutcome::bad()); }, diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 0fbcfd606d80e..7f4a0d053a4d4 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -1185,7 +1185,7 @@ fn voter_persists_its_votes() { runtime.spawn(round_rx.for_each(move |signed| { if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 { // the first message we receive should be a prevote from alice. - let prevote = match signed.1.message { + let prevote = match signed.message { grandpa::Message::Prevote(prevote) => prevote, _ => panic!("voter should prevote."), }; @@ -1225,7 +1225,7 @@ fn voter_persists_its_votes() { } else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 { // the next message we receive should be our own prevote - let prevote = match signed.1.message { + let prevote = match signed.message { grandpa::Message::Prevote(prevote) => prevote, _ => panic!("We should receive our own prevote."), }; @@ -1242,7 +1242,7 @@ fn voter_persists_its_votes() { } else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 { // we then receive a precommit from alice for block 15 // even though we casted a prevote for block 30 - let precommit = match signed.1.message { + let precommit = match signed.message { grandpa::Message::Precommit(precommit) => precommit, _ => panic!("voter should precommit."), }; diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 74593f77ac670..cd142b7ede6c4 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -578,7 +578,7 @@ mod tests { "global", ); - global_tx.unbounded_send((None, msg)).unwrap(); + global_tx.unbounded_send(msg).unwrap(); let work = until_imported.into_future(); @@ -605,7 +605,7 @@ mod tests { "global", ); - global_tx.unbounded_send((None, msg)).unwrap(); + global_tx.unbounded_send(msg).unwrap(); // NOTE: needs to be cloned otherwise it is moved to the stream and // dropped too early. From 6d695d1231237b510e878cdacf1d63503d94421d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 23 Oct 2019 17:06:07 +0100 Subject: [PATCH 18/24] grandpa: fix warnings --- core/finality-grandpa/src/communication/tests.rs | 3 ++- core/finality-grandpa/src/until_imported.rs | 4 ++-- core/network/src/protocol/sync.rs | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index c14377ce17e64..7b91b2ef0a95e 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -93,7 +93,8 @@ impl super::Network for TestNetwork { let _ = self.sender.unbounded_send(Event::Announce(block)); } - fn set_sync_fork_request(&self, peers: Vec, hash: Hash, number: NumberFor) {} + /// Notify the sync service to try syncing the given chain. + fn set_sync_fork_request(&self, _peers: Vec, _hash: Hash, _number: NumberFor) {} } impl network_gossip::ValidatorContext for TestNetwork { diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index cd142b7ede6c4..330c67a453976 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -38,7 +38,7 @@ use parking_lot::Mutex; use sr_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use tokio_timer::Interval; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; use std::time::{Duration, Instant}; use fg_primitives::AuthorityId; @@ -525,7 +525,7 @@ mod tests { struct TestBlockSyncRequester {} impl BlockSyncRequesterT for TestBlockSyncRequester { - fn set_sync_fork_request(&self, peers: Vec, hash: Hash, number: NumberFor){} + fn set_sync_fork_request(&self, _peers: Vec, hash: Hash, number: NumberFor) { } } fn make_header(number: u64) -> Header { diff --git a/core/network/src/protocol/sync.rs b/core/network/src/protocol/sync.rs index d1d3b178c51c4..4f08c942def1b 100644 --- a/core/network/src/protocol/sync.rs +++ b/core/network/src/protocol/sync.rs @@ -467,7 +467,7 @@ impl ChainSync { peers = self.peers.iter() // Only request blocks from peers who are ahead or on a par. - .filter(|(id, peer)| peer.best_number >= number) + .filter(|(_, peer)| peer.best_number >= number) .map(|(id, _)| id.clone()) .collect(); } else { @@ -1084,7 +1084,7 @@ impl ChainSync { parent_hash: Some(header.parent_hash().clone()), peers: Default::default(), }) - .peers.insert(who); + .peers.insert(who); } OnBlockAnnounce::Nothing From 202bcede7c58619281a4892008ae3f67513b55f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 23 Oct 2019 17:06:33 +0100 Subject: [PATCH 19/24] grandpa: add test for block sync request on until_imported --- core/finality-grandpa/src/until_imported.rs | 97 ++++++++++++++++++++- 1 file changed, 93 insertions(+), 4 deletions(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 330c67a453976..20e077fe9ad6b 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -522,10 +522,23 @@ mod tests { } } - struct TestBlockSyncRequester {} + #[derive(Clone)] + struct TestBlockSyncRequester { + requests: Arc)>>>, + } + + impl Default for TestBlockSyncRequester { + fn default() -> Self { + TestBlockSyncRequester { + requests: Arc::new(Mutex::new(Vec::new())), + } + } + } impl BlockSyncRequesterT for TestBlockSyncRequester { - fn set_sync_fork_request(&self, _peers: Vec, hash: Hash, number: NumberFor) { } + fn set_sync_fork_request(&self, _peers: Vec, hash: Hash, number: NumberFor) { + self.requests.lock().push((hash, number)); + } } fn make_header(number: u64) -> Header { @@ -572,7 +585,7 @@ mod tests { let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, - TestBlockSyncRequester{}, + TestBlockSyncRequester::default(), block_status, global_rx.map_err(|_| panic!("should never error")), "global", @@ -599,7 +612,7 @@ mod tests { let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, - TestBlockSyncRequester{}, + TestBlockSyncRequester::default(), block_status, global_rx.map_err(|_| panic!("should never error")), "global", @@ -845,4 +858,80 @@ mod tests { unapply_catch_up(unknown_catch_up()), ); } + + #[test] + fn request_block_sync_for_needed_blocks() { + let (chain_state, import_notifications) = TestChainState::new(); + let block_status = chain_state.block_status(); + + let (global_tx, global_rx) = futures::sync::mpsc::unbounded(); + + let block_sync_requester = TestBlockSyncRequester::default(); + + let until_imported = UntilGlobalMessageBlocksImported::new( + import_notifications, + block_sync_requester.clone(), + block_status, + global_rx.map_err(|_| panic!("should never error")), + "global", + ); + + let h1 = make_header(5); + let h2 = make_header(6); + let h3 = make_header(7); + + // we create a commit message, with precommits for blocks 6 and 7 which + // we haven't imported. + let unknown_commit = CompactCommit:: { + target_hash: h1.hash(), + target_number: 5, + precommits: vec![ + Precommit { + target_hash: h2.hash(), + target_number: 6, + }, + Precommit { + target_hash: h3.hash(), + target_number: 7, + }, + ], + auth_data: Vec::new(), // not used + }; + + let unknown_commit = || voter::CommunicationIn::Commit( + 0, + unknown_commit.clone(), + voter::Callback::Blank, + ); + + // we send the commit message and spawn the until_imported stream + global_tx.unbounded_send(unknown_commit()).unwrap(); + + let mut runtime = Runtime::new().unwrap(); + runtime.spawn(until_imported.into_future().map(|_| ()).map_err(|_| ())); + + // assert that we will make sync requests + let assert = futures::future::poll_fn::<(), (), _>(|| { + let block_sync_requests = block_sync_requester.requests.lock(); + + // we request blocks targeted by the precommits that aren't imported + if block_sync_requests.contains(&(h2.hash(), *h2.number())) && + block_sync_requests.contains(&(h3.hash(), *h3.number())) + { + return Ok(Async::Ready(())); + } + + Ok(Async::NotReady) + }); + + // the `until_imported` stream doesn't request the blocks immediately, + // but it should request them after a small timeout + let timeout = Delay::new(Instant::now() + Duration::from_secs(60)); + let test = assert.select2(timeout).map(|res| match res { + Either::A(_) => {}, + Either::B(_) => panic!("timed out waiting for block sync request"), + }).map_err(|_| ()); + + runtime.block_on(test).unwrap(); + } } From 284ad34536262fe56c90263d4b30ad3282347e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 23 Oct 2019 17:16:23 +0100 Subject: [PATCH 20/24] grandpa: rename Environment field inner to client --- core/finality-grandpa/src/environment.rs | 28 +++++++++++------------- core/finality-grandpa/src/lib.rs | 12 +++++----- core/finality-grandpa/src/tests.rs | 2 +- 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index 7c5278028a6f8..149b00e80f920 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -370,9 +370,7 @@ impl SharedVoterSetState { /// The environment we run GRANDPA in. pub(crate) struct Environment, RA, SC, VR> { - // TODO: As far as I can tell this does not follow the pattern of having an outer struct protecting an inner struct - // via a lock in order to be Sync. How about renaming this to `client` like we do in many other places? - pub(crate) inner: Arc>, + pub(crate) client: Arc>, pub(crate) select_chain: SC, pub(crate) voters: Arc>, pub(crate) config: Config, @@ -415,7 +413,7 @@ where NumberFor: BlockNumberOps, { fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result, GrandpaError> { - ancestry(&self.inner, base, block) + ancestry(&self.client, base, block) } fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor)> { @@ -436,7 +434,7 @@ where match self.select_chain.finality_target(block, None) { Ok(Some(best_hash)) => { - let base_header = self.inner.header(&BlockId::Hash(block)).ok()? + let base_header = self.client.header(&BlockId::Hash(block)).ok()? .expect("Header known to exist after `best_containing` call; qed"); if let Some(limit) = limit { @@ -451,7 +449,7 @@ where } } - let best_header = self.inner.header(&BlockId::Hash(best_hash)).ok()? + let best_header = self.client.header(&BlockId::Hash(best_hash)).ok()? .expect("Header known to exist after `best_containing` call; qed"); // check if our vote is currently being limited due to a pending change @@ -475,7 +473,7 @@ where break; } - target_header = self.inner.header(&BlockId::Hash(*target_header.parent_hash())).ok()? + target_header = self.client.header(&BlockId::Hash(*target_header.parent_hash())).ok()? .expect("Header known to exist after `best_containing` call; qed"); } @@ -494,7 +492,7 @@ where // authority set limit filter, which can be considered a // mandatory/implicit voting rule. self.voting_rule - .restrict_vote(&*self.inner, &base_header, &best_header, target_header) + .restrict_vote(&*self.client, &base_header, &best_header, target_header) .or(Some((target_header.hash(), *target_header.number()))) }, Ok(None) => { @@ -603,9 +601,9 @@ where // schedule incoming messages from the network to be held until // corresponding blocks are imported. let incoming = Box::new(UntilVoteTargetImported::new( - self.inner.import_notification_stream(), + self.client.import_notification_stream(), self.network.clone(), - self.inner.clone(), + self.client.clone(), incoming, "round", ).map_err(Into::into)); @@ -653,7 +651,7 @@ where current_rounds, }; - crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?; + crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?; Ok(Some(set_state)) })?; @@ -694,7 +692,7 @@ where current_rounds, }; - crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?; + crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?; Ok(Some(set_state)) })?; @@ -745,7 +743,7 @@ where current_rounds, }; - crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?; + crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?; Ok(Some(set_state)) })?; @@ -803,7 +801,7 @@ where current_rounds, }; - crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?; + crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?; Ok(Some(set_state)) })?; @@ -819,7 +817,7 @@ where commit: Commit, ) -> Result<(), Self::Error> { finalize_block( - &*self.inner, + &*self.client, &self.authority_set, &self.consensus_changes, Some(self.config.justification_period.into()), diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 5900ecd53c177..a92d1cec01b3b 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -632,7 +632,7 @@ where let voters = persistent_data.authority_set.current_authorities(); let env = Arc::new(Environment { - inner: client, + client, select_chain, voting_rule, voters: Arc::new(voters), @@ -671,7 +671,7 @@ where "authority_id" => authority_id.to_string(), ); - let chain_info = self.env.inner.info(); + let chain_info = self.env.client.info(); telemetry!(CONSENSUS_INFO; "afg.authority_set"; "number" => ?chain_info.chain.finalized_number, "hash" => ?chain_info.chain.finalized_hash, @@ -695,7 +695,7 @@ where let global_comms = global_communication( self.env.set_id, &self.env.voters, - &self.env.inner, + &self.env.client, &self.env.network, &self.env.config.keystore, ); @@ -743,7 +743,7 @@ where (new.canon_hash, new.canon_number), ); - aux_schema::write_voter_set_state(&*self.env.inner, &set_state)?; + aux_schema::write_voter_set_state(&*self.env.client, &set_state)?; Ok(Some(set_state)) })?; @@ -752,7 +752,7 @@ where set_id: new.set_id, voter_set_state: self.env.voter_set_state.clone(), // Fields below are simply transferred and not updated. - inner: self.env.inner.clone(), + client: self.env.client.clone(), select_chain: self.env.select_chain.clone(), config: self.env.config.clone(), authority_set: self.env.authority_set.clone(), @@ -772,7 +772,7 @@ where let completed_rounds = voter_set_state.completed_rounds(); let set_state = VoterSetState::Paused { completed_rounds }; - aux_schema::write_voter_set_state(&*self.env.inner, &set_state)?; + aux_schema::write_voter_set_state(&*self.env.client, &set_state)?; Ok(Some(set_state)) })?; diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 7f4a0d053a4d4..8c0047e38bdbf 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -1556,7 +1556,7 @@ fn grandpa_environment_respects_voting_rules() { authority_set: authority_set.clone(), config: config.clone(), consensus_changes: consensus_changes.clone(), - inner: link.client.clone(), + client: link.client.clone(), select_chain: link.select_chain.clone(), set_id: authority_set.set_id(), voter_set_state: set_state.clone(), From f030ec6e7010fc035bb2e5a5aa1c01c099f3e5fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 23 Oct 2019 17:24:39 +0100 Subject: [PATCH 21/24] grandpa: fix minor nits --- core/finality-grandpa/src/communication/mod.rs | 7 ++++--- core/finality-grandpa/src/lib.rs | 10 ++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index d7758be87ac13..0e440f7e5e170 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -475,8 +475,9 @@ impl> NetworkBridge { format!("Failed to receive on unbounded receiver for round {}", round.0) )); - // Combine incoming votes from external Grandpa nodes with outgoing votes from our own Grandpa voter to have a - // single vote-import-pipeline. + // Combine incoming votes from external GRANDPA nodes with outgoing + // votes from our own GRANDPA voter to have a single + // vote-import-pipeline. let incoming = incoming.select(out_rx); (incoming, outgoing) @@ -524,7 +525,7 @@ impl> NetworkBridge { (incoming, outgoing) } - pub(crate) fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor){ + pub(crate) fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { self.service.set_sync_fork_request(peers, hash, number) } } diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index a92d1cec01b3b..63eddfd3f33fb 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -263,14 +263,20 @@ impl, RA> BlockStatus for Arc { - fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). + fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); } impl BlockSyncRequester for NetworkBridge where Block: BlockT, N: communication::Network, { - fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor){ + fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor) { NetworkBridge::set_sync_fork_request(self, peers, hash, number) } } From 10790e7fcad37bc4c2b557286f34e47c45c0e42c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 23 Oct 2019 17:44:35 +0100 Subject: [PATCH 22/24] grandpa: minor nits in until_imported --- core/finality-grandpa/src/until_imported.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index 20e077fe9ad6b..c38e7d28468f4 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -78,8 +78,9 @@ pub(crate) struct UntilImported, ready: VecDeque, check_pending: Interval, - /// Mapping block hashes to their block number, the point in time it was first encountered (Instant) and a list of - /// Grandpa messages referencing the block hash. + /// Mapping block hashes to their block number, the point in time it was + /// first encountered (Instant) and a list of GRANDPA messages referencing + /// the block hash. pending: HashMap, Instant, Vec)>, identifier: &'static str, } @@ -184,8 +185,6 @@ impl Stream for UntilImported Stream for UntilImported Date: Wed, 23 Oct 2019 17:57:11 +0100 Subject: [PATCH 23/24] grandpa: copy docs for set_sync_fork_request --- core/finality-grandpa/src/communication/mod.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 0e440f7e5e170..6f43b1106a54e 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -130,7 +130,12 @@ pub trait Network: Clone + Send + 'static { /// Inform peers that a block with given hash should be downloaded. fn announce(&self, block: Block::Hash, associated_data: Vec); - /// Configure an explicit fork sync request. + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); } @@ -525,6 +530,12 @@ impl> NetworkBridge { (incoming, outgoing) } + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). pub(crate) fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { self.service.set_sync_fork_request(peers, hash, number) } From d668931eaa5619e5ca2c85203fc55838b0cae084 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 23 Oct 2019 17:58:25 +0100 Subject: [PATCH 24/24] grandpa: remove stale TODO on UntilImported --- core/finality-grandpa/src/until_imported.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index c38e7d28468f4..5fca476a82b6a 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -74,7 +74,6 @@ pub(crate) struct UntilImported, Error = ()> + Send>>, block_sync_requester: BlockSyncRequester, status_check: BlockStatus, - // TODO: Why is this called inner? Why not being more descriptive and say finality_msg_stream? inner: Fuse, ready: VecDeque, check_pending: Interval,