diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index ffd8f1c8c642d..b94c37d07e192 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -106,7 +106,7 @@ impl TestNetFactory for GrandpaTestNet { _cfg: &ProtocolConfig, _: &PeerData, ) -> Self::Verifier { - PassThroughVerifier(false) // use non-instant finality. + PassThroughVerifier::new(false) // use non-instant finality. } fn make_block_import(&self, client: PeersClient) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 90076552a75a5..ff3748bd55cf2 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -1287,7 +1287,7 @@ impl Protocol { } let is_best = self.context_data.chain.info().best_hash == hash; - debug!(target: "sync", "Reannouncing block {:?}", hash); + debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best); self.send_announcement(&header, data, is_best, true) } @@ -2160,7 +2160,7 @@ mod tests { reserved_only: false, priority_groups: Vec::new(), }, - Box::new(DefaultBlockAnnounceValidator::new(client.clone())), + Box::new(DefaultBlockAnnounceValidator), None, Default::default(), None, diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index c3e87ca19a341..bfd8c4fe218de 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -48,6 +48,7 @@ use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header, NumberFor, Zero, One, CheckedSub, SaturatedConversion, Hash, HashFor} }; +use sp_arithmetic::traits::Saturating; use std::{fmt, ops::Range, collections::{HashMap, HashSet, VecDeque}, sync::Arc}; mod blocks; @@ -388,7 +389,7 @@ impl ChainSync { /// Returns the current sync status. pub fn status(&self) -> Status { - let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number); + let best_seen = self.peers.values().map(|p| p.best_number).max(); let sync_state = if let Some(n) = best_seen { // A chain is classified as downloading if the provided best block is @@ -1186,6 +1187,21 @@ impl ChainSync { peer.recently_announced.pop_front(); } peer.recently_announced.push_back(hash.clone()); + + // Let external validator check the block announcement. + let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice()); + let is_best = match self.block_announce_validator.validate(&header, assoc_data) { + Ok(Validation::Success { is_new_best }) => is_new_best || is_best, + Ok(Validation::Failure) => { + debug!(target: "sync", "Block announcement validation of block {} from {} failed", hash, who); + return OnBlockAnnounce::Nothing + } + Err(e) => { + error!(target: "sync", "💔 Block announcement validation errored: {}", e); + return OnBlockAnnounce::Nothing + } + }; + if is_best { // update their best block peer.best_number = number; @@ -1216,20 +1232,6 @@ impl ChainSync { return OnBlockAnnounce::Nothing } - // Let external validator check the block announcement. - let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice()); - match self.block_announce_validator.validate(&header, assoc_data) { - Ok(Validation::Success) => (), - Ok(Validation::Failure) => { - debug!(target: "sync", "Block announcement validation of block {} from {} failed", hash, who); - return OnBlockAnnounce::Nothing - } - Err(e) => { - error!(target: "sync", "💔 Block announcement validation errored: {}", e); - return OnBlockAnnounce::Nothing - } - } - if ancient_parent { trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header); return OnBlockAnnounce::Nothing @@ -1428,14 +1430,24 @@ fn peer_block_request( max_parallel_downloads, MAX_DOWNLOAD_AHEAD, ) { + // The end is not part of the range. + let last = range.end.saturating_sub(One::one()); + + let from = if peer.best_number == last { + message::FromBlock::Hash(peer.best_hash) + } else { + message::FromBlock::Number(last) + }; + let request = message::generic::BlockRequest { id: 0, fields: attrs.clone(), - from: message::FromBlock::Number(range.start), + from, to: None, - direction: message::Direction::Ascending, + direction: message::Direction::Descending, max: Some((range.end - range.start).saturated_into::()) }; + Some((range, request)) } else { None @@ -1558,7 +1570,7 @@ mod test { let client = Arc::new(TestClientBuilder::new().build()); let info = client.info(); - let block_announce_validator = Box::new(DefaultBlockAnnounceValidator::new(client.clone())); + let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let peer_id = PeerId::random(); let mut sync = ChainSync::new( diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index c027c3be7378c..17d9553fa66bb 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -104,7 +104,7 @@ fn build_test_full_node(config: config::NetworkConfiguration) protocol_id: config::ProtocolId::from(&b"/test-protocol-name"[..]), import_queue, block_announce_validator: Box::new( - sp_consensus::block_validation::DefaultBlockAnnounceValidator::new(client.clone()), + sp_consensus::block_validation::DefaultBlockAnnounceValidator, ), metrics_registry: None, }) diff --git a/client/network/test/src/block_import.rs b/client/network/test/src/block_import.rs index 46a395700c54d..6762b74b6b8bd 100644 --- a/client/network/test/src/block_import.rs +++ b/client/network/test/src/block_import.rs @@ -59,7 +59,7 @@ fn import_single_good_block_works() { &mut substrate_test_runtime_client::new(), BlockOrigin::File, block, - &mut PassThroughVerifier(true) + &mut PassThroughVerifier::new(true) ) { Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org)) if *num == number && *aux == expected_aux && *org == Some(peer_id) => {} @@ -74,7 +74,7 @@ fn import_single_good_known_block_is_ignored() { &mut client, BlockOrigin::File, block, - &mut PassThroughVerifier(true) + &mut PassThroughVerifier::new(true) ) { Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {} _ => panic!() @@ -89,7 +89,7 @@ fn import_single_good_block_without_header_fails() { &mut substrate_test_runtime_client::new(), BlockOrigin::File, block, - &mut PassThroughVerifier(true) + &mut PassThroughVerifier::new(true) ) { Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {} _ => panic!() @@ -101,7 +101,7 @@ fn async_import_queue_drops() { let executor = sp_core::testing::SpawnBlockingExecutor::new(); // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { - let verifier = PassThroughVerifier(true); + let verifier = PassThroughVerifier::new(true); let queue = BasicQueue::new( verifier, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index a3e644558b6e6..2896c4e3e1853 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -39,7 +39,7 @@ use sc_client_api::{ use sc_consensus::LongestChain; use sc_block_builder::{BlockBuilder, BlockBuilderProvider}; use sc_network::config::Role; -use sp_consensus::block_validation::DefaultBlockAnnounceValidator; +use sp_consensus::block_validation::{DefaultBlockAnnounceValidator, BlockAnnounceValidator}; use sp_consensus::import_queue::{ BasicQueue, BoxJustificationImport, Verifier, BoxFinalityProofImport, }; @@ -67,7 +67,33 @@ type AuthorityId = sp_consensus_babe::AuthorityId; /// A Verifier that accepts all blocks and passes them on with the configured /// finality to be imported. #[derive(Clone)] -pub struct PassThroughVerifier(pub bool); +pub struct PassThroughVerifier { + finalized: bool, + fork_choice: ForkChoiceStrategy, +} + +impl PassThroughVerifier { + /// Create a new instance. + /// + /// Every verified block will use `finalized` for the `BlockImportParams`. + pub fn new(finalized: bool) -> Self { + Self { + finalized, + fork_choice: ForkChoiceStrategy::LongestChain, + } + } + + /// Create a new instance. + /// + /// Every verified block will use `finalized` for the `BlockImportParams` and + /// the given [`ForkChoiceStrategy`]. + pub fn new_with_fork_choice(finalized: bool, fork_choice: ForkChoiceStrategy) -> Self { + Self { + finalized, + fork_choice, + } + } +} /// This `Verifier` accepts all data as valid. impl Verifier for PassThroughVerifier { @@ -85,9 +111,9 @@ impl Verifier for PassThroughVerifier { .map(|blob| vec![(well_known_cache_keys::AUTHORITIES, blob.to_vec())]); let mut import = BlockImportParams::new(origin, header); import.body = body; - import.finalized = self.0; + import.finalized = self.finalized; import.justification = justification; - import.fork_choice = Some(ForkChoiceStrategy::LongestChain); + import.fork_choice = Some(self.fork_choice.clone()); Ok((import, maybe_keys)) } @@ -294,6 +320,7 @@ impl Peer { } else { Default::default() }; + self.block_import.import_block(import_block, cache).expect("block_import failed"); self.network.service().announce_block(hash, Vec::new()); at = hash; @@ -519,6 +546,15 @@ impl VerifierAdapter { } } +/// Configuration for a full peer. +#[derive(Default)] +pub struct FullPeerConfig { + /// Pruning window size. + pub keep_blocks: Option, + /// Block announce validator. + pub block_announce_validator: Option + Send + Sync>>, +} + pub trait TestNetFactory: Sized { type Verifier: 'static + Verifier; type PeerData: Default; @@ -579,12 +615,12 @@ pub trait TestNetFactory: Sized { } fn add_full_peer(&mut self) { - self.add_full_peer_with_states(None) + self.add_full_peer_with_config(Default::default()) } /// Add a full peer. - fn add_full_peer_with_states(&mut self, keep_blocks: Option) { - let test_client_builder = match keep_blocks { + fn add_full_peer_with_config(&mut self, config: FullPeerConfig) { + let test_client_builder = match config.keep_blocks { Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks), None => TestClientBuilder::with_default_backend(), }; @@ -641,7 +677,8 @@ pub trait TestNetFactory: Sized { transaction_pool: Arc::new(EmptyTransactionPool), protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), import_queue, - block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())), + block_announce_validator: config.block_announce_validator + .unwrap_or(Box::new(DefaultBlockAnnounceValidator)), metrics_registry: None, }).unwrap(); @@ -720,7 +757,7 @@ pub trait TestNetFactory: Sized { transaction_pool: Arc::new(EmptyTransactionPool), protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), import_queue, - block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())), + block_announce_validator: Box::new(DefaultBlockAnnounceValidator), metrics_registry: None, }).unwrap(); @@ -787,6 +824,20 @@ pub trait TestNetFactory: Sized { Poll::Ready(()) } + /// Polls the testnet until all peers are connected to each other. + /// + /// Must be executed in a task context. + fn poll_until_connected(&mut self, cx: &mut FutureContext) -> Poll<()> { + self.poll(cx); + + let num_peers = self.peers().len(); + if self.peers().iter().all(|p| p.num_peers() == num_peers - 1) { + return Poll::Ready(()) + } + + Poll::Pending + } + /// Blocks the current thread until we are sync'ed. /// /// Calls `poll_until_sync` repeatedly. @@ -801,6 +852,15 @@ pub trait TestNetFactory: Sized { futures::executor::block_on(futures::future::poll_fn::<(), _>(|cx| self.poll_until_idle(cx))); } + /// Blocks the current thread until all peers are connected to each other. + /// + /// Calls `poll_until_connected` repeatedly with the runtime passed as parameter. + fn block_until_connected(&mut self) { + futures::executor::block_on( + futures::future::poll_fn::<(), _>(|cx| self.poll_until_connected(cx)), + ); + } + /// Polls the testnet. Processes all the pending actions and returns `NotReady`. fn poll(&mut self, cx: &mut FutureContext) { self.mut_peers(|peers| { @@ -831,6 +891,17 @@ pub trait TestNetFactory: Sized { pub struct TestNet { peers: Vec>, + fork_choice: ForkChoiceStrategy, +} + +impl TestNet { + /// Create a `TestNet` that used the given fork choice rule. + pub fn with_fork_choice(fork_choice: ForkChoiceStrategy) -> Self { + Self { + peers: Vec::new(), + fork_choice, + } + } } impl TestNetFactory for TestNet { @@ -841,13 +912,14 @@ impl TestNetFactory for TestNet { fn from_config(_config: &ProtocolConfig) -> Self { TestNet { peers: Vec::new(), + fork_choice: ForkChoiceStrategy::LongestChain, } } fn make_verifier(&self, _client: PeersClient, _config: &ProtocolConfig, _peer_data: &()) -> Self::Verifier { - PassThroughVerifier(false) + PassThroughVerifier::new_with_fork_choice(false, self.fork_choice.clone()) } fn peer(&mut self, i: usize) -> &mut Peer<()> { diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 0269eb35628ea..1cf2a8fee3798 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -20,6 +20,8 @@ use sp_consensus::BlockOrigin; use std::time::Duration; use futures::executor::block_on; use super::*; +use sp_consensus::block_validation::Validation; +use substrate_test_runtime::Header; fn test_ancestor_search_when_common_is(n: usize) { let _ = ::env_logger::try_init(); @@ -582,10 +584,10 @@ fn can_sync_explicit_forks() { #[test] fn syncs_header_only_forks() { - let _ = ::env_logger::try_init(); + let _ = env_logger::try_init(); let mut net = TestNet::new(0); - net.add_full_peer_with_states(None); - net.add_full_peer_with_states(Some(3)); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(FullPeerConfig { keep_blocks: Some(3), ..Default::default() }); net.peer(0).push_blocks(2, false); net.peer(1).push_blocks(2, false); @@ -683,7 +685,7 @@ fn imports_stale_once() { #[test] fn can_sync_to_peers_with_wrong_common_block() { - let _ = ::env_logger::try_init(); + let _ = env_logger::try_init(); let mut net = TestNet::new(2); net.peer(0).push_blocks(2, true); @@ -710,3 +712,41 @@ fn can_sync_to_peers_with_wrong_common_block() { assert!(net.peer(1).client().header(&BlockId::Hash(final_hash)).unwrap().is_some()); } +/// Returns `is_new_best = true` for each validated announcement. +struct NewBestBlockAnnounceValidator; + +impl BlockAnnounceValidator for NewBestBlockAnnounceValidator { + fn validate( + &mut self, + _: &Header, + _: &[u8], + ) -> Result> { + Ok(Validation::Success { is_new_best: true }) + } +} + +#[test] +fn sync_blocks_when_block_announce_validator_says_it_is_new_best() { + let _ = env_logger::try_init(); + log::trace!(target: "sync", "Test"); + let mut net = TestNet::with_fork_choice(ForkChoiceStrategy::Custom(false)); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(FullPeerConfig { + block_announce_validator: Some(Box::new(NewBestBlockAnnounceValidator)), + ..Default::default() + }); + + net.block_until_connected(); + + let block_hash = net.peer(0).push_blocks(1, false); + + while !net.peer(2).has_block(&block_hash) { + net.block_until_idle(); + } + + // Peer1 should not have the block, because peer 0 did not reported the block + // as new best. However, peer2 has a special block announcement validator + // that flags all blocks as `is_new_best` and thus, it should have synced the blocks. + assert!(!net.peer(1).has_block(&block_hash)); +} diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 8c96f514ddaee..234356856b313 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -1426,7 +1426,7 @@ fn build_network( let block_announce_validator = if let Some(f) = block_announce_validator_builder { f(client.clone()) } else { - Box::new(DefaultBlockAnnounceValidator::new(client.clone())) + Box::new(DefaultBlockAnnounceValidator) }; let network_params = sc_network::config::Params { diff --git a/primitives/consensus/common/src/block_validation.rs b/primitives/consensus/common/src/block_validation.rs index e8054f3ae40d7..66f960f16fff3 100644 --- a/primitives/consensus/common/src/block_validation.rs +++ b/primitives/consensus/common/src/block_validation.rs @@ -36,7 +36,10 @@ impl, B: Block> Chain for Arc { #[derive(Debug, PartialEq, Eq)] pub enum Validation { /// Valid block announcement. - Success, + Success { + /// Is this the new best block of the node? + is_new_best: bool, + }, /// Invalid block announcement. Failure, } @@ -49,18 +52,10 @@ pub trait BlockAnnounceValidator { /// Default implementation of `BlockAnnounceValidator`. #[derive(Debug)] -pub struct DefaultBlockAnnounceValidator { - chain: C -} - -impl DefaultBlockAnnounceValidator { - pub fn new(chain: C) -> Self { - Self { chain } - } -} +pub struct DefaultBlockAnnounceValidator; -impl> BlockAnnounceValidator for DefaultBlockAnnounceValidator { +impl BlockAnnounceValidator for DefaultBlockAnnounceValidator { fn validate(&mut self, _h: &B::Header, _d: &[u8]) -> Result> { - Ok(Validation::Success) + Ok(Validation::Success { is_new_best: false }) } }