diff --git a/client/network/src/service.rs b/client/network/src/service.rs index d24a1543c56fe..8dfd46f24a731 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -94,8 +94,6 @@ use std::{ pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; -#[cfg(test)] -mod chainsync_tests; mod metrics; mod out_events; #[cfg(test)] diff --git a/client/network/src/service/chainsync_tests.rs b/client/network/src/service/tests/chain_sync.rs similarity index 52% rename from client/network/src/service/chainsync_tests.rs rename to client/network/src/service/tests/chain_sync.rs index 27c0588a67200..7ff8c589d8550 100644 --- a/client/network/src/service/chainsync_tests.rs +++ b/client/network/src/service/tests/chain_sync.rs @@ -16,178 +16,26 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{config, ChainSyncInterface, NetworkWorker}; +use crate::service::tests::TestNetworkBuilder; use futures::prelude::*; use libp2p::PeerId; use sc_block_builder::BlockBuilderProvider; -use sc_client_api::{BlockBackend, HeaderBackend}; +use sc_client_api::HeaderBackend; use sc_consensus::JustificationSyncLink; use sc_network_common::{ - config::{ - NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, - TransportConfig, - }, - protocol::role::Roles, service::NetworkSyncForkRequest, - sync::{message::BlockAnnouncesHandshake, ChainSync as ChainSyncT, SyncState, SyncStatus}, -}; -use sc_network_light::light_client_requests::handler::LightClientRequestHandler; -use sc_network_sync::{ - block_request_handler::BlockRequestHandler, mock::MockChainSync, - service::mock::MockChainSyncInterface, state_request_handler::StateRequestHandler, + sync::{SyncState, SyncStatus}, }; +use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface}; use sp_core::H256; use sp_runtime::{ generic::BlockId, - traits::{Block as BlockT, Header as _, Zero}, + traits::{Block as BlockT, Header as _}, }; use std::{iter, sync::Arc, task::Poll}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; -type TestNetworkWorker = NetworkWorker< - substrate_test_runtime_client::runtime::Block, - substrate_test_runtime_client::runtime::Hash, - substrate_test_runtime_client::TestClient, ->; - -const BLOCK_ANNOUNCE_PROTO_NAME: &str = "/block-announces"; -const PROTOCOL_NAME: &str = "/foo"; - -fn make_network( - chain_sync: Box>, - chain_sync_service: Box>, - client: Arc, -) -> (TestNetworkWorker, Arc) { - let network_config = config::NetworkConfiguration { - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - handshake: None, - set_config: Default::default(), - }], - listen_addresses: vec![config::build_multiaddr![Memory(rand::random::())]], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }; - - #[derive(Clone)] - struct PassThroughVerifier(bool); - - #[async_trait::async_trait] - impl sc_consensus::Verifier for PassThroughVerifier { - async fn verify( - &mut self, - mut block: sc_consensus::BlockImportParams, - ) -> Result< - ( - sc_consensus::BlockImportParams, - Option)>>, - ), - String, - > { - let maybe_keys = block - .header - .digest() - .log(|l| { - l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) - .or_else(|| { - l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus( - b"babe", - )) - }) - }) - .map(|blob| { - vec![(sp_blockchain::well_known_cache_keys::AUTHORITIES, blob.to_vec())] - }); - - block.finalized = self.0; - block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); - Ok((block, maybe_keys)) - } - } - - let import_queue = Box::new(sc_consensus::BasicQueue::new( - PassThroughVerifier(false), - Box::new(client.clone()), - None, - &sp_core::testing::TaskExecutor::new(), - None, - )); - - let protocol_id = ProtocolId::from("/test-protocol-name"); - - let fork_id = Some(String::from("test-fork-id")); - - let block_request_protocol_config = { - let (handler, protocol_config) = - BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let state_request_protocol_config = { - let (handler, protocol_config) = - StateRequestHandler::new(&protocol_id, None, client.clone(), 50); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let light_client_request_protocol_config = { - let (handler, protocol_config) = - LightClientRequestHandler::new(&protocol_id, None, client.clone()); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let block_announce_config = NonDefaultSetConfig { - notifications_protocol: BLOCK_ANNOUNCE_PROTO_NAME.into(), - fallback_names: vec![], - max_notification_size: 1024 * 1024, - handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::< - substrate_test_runtime_client::runtime::Block, - >::build( - Roles::from(&config::Role::Full), - client.info().best_number, - client.info().best_hash, - client - .block_hash(Zero::zero()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - ))), - set_config: SetConfig { - in_peers: 0, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: NonReservedPeerMode::Deny, - }, - }; - - let worker = NetworkWorker::new(config::Params { - block_announce_config, - role: config::Role::Full, - executor: None, - network_config, - chain: client.clone(), - protocol_id, - fork_id, - import_queue, - chain_sync, - chain_sync_service, - metrics_registry: None, - block_request_protocol_config, - state_request_protocol_config, - light_client_request_protocol_config, - warp_sync_protocol_config: None, - request_response_protocol_configs: Vec::new(), - }) - .unwrap(); - - (worker, client) -} - fn set_default_expecations_no_peers( chain_sync: &mut MockChainSync, ) { @@ -208,8 +56,6 @@ fn set_default_expecations_no_peers( #[async_std::test] async fn normal_network_poll_no_peers() { - let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); - // build `ChainSync` and set default expectations for it let mut chain_sync = Box::new(MockChainSync::::new()); @@ -220,11 +66,13 @@ async fn normal_network_poll_no_peers() { let chain_sync_service = Box::new(MockChainSyncInterface::::new()); - let (mut network, _) = make_network(chain_sync, chain_sync_service, client); + let mut network = TestNetworkBuilder::new() + .with_chain_sync((chain_sync, chain_sync_service)) + .build(); // poll the network once futures::future::poll_fn(|cx| { - let _ = network.poll_unpin(cx); + let _ = network.network().poll_unpin(cx); Poll::Ready(()) }) .await; @@ -232,8 +80,6 @@ async fn normal_network_poll_no_peers() { #[async_std::test] async fn request_justification() { - let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); - // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be // called) let chain_sync_service = @@ -253,22 +99,22 @@ async fn request_justification() { .returning(|_, _| ()); set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, chain_sync_service, client); + let mut network = TestNetworkBuilder::new() + .with_chain_sync((chain_sync, chain_sync_service)) + .build(); // send "request justifiction" message and poll the network network.service().request_justification(&hash, number); futures::future::poll_fn(|cx| { - let _ = network.poll_unpin(cx); + let _ = network.network().poll_unpin(cx); Poll::Ready(()) }) .await; } #[async_std::test] -async fn clear_justification_requests(&mut self) { - let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); - +async fn clear_justification_requests() { // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be // called) let chain_sync_service = @@ -281,13 +127,15 @@ async fn clear_justification_requests(&mut self) { chain_sync.expect_clear_justification_requests().once().returning(|| ()); set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, chain_sync_service, client); + let mut network = TestNetworkBuilder::new() + .with_chain_sync((chain_sync, chain_sync_service)) + .build(); // send "request justifiction" message and poll the network network.service().clear_justification_requests(); futures::future::poll_fn(|cx| { - let _ = network.poll_unpin(cx); + let _ = network.network().poll_unpin(cx); Poll::Ready(()) }) .await; @@ -295,8 +143,6 @@ async fn clear_justification_requests(&mut self) { #[async_std::test] async fn set_sync_fork_request() { - let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); - // build `ChainSync` and set default expectations for it let mut chain_sync = Box::new(MockChainSync::::new()); @@ -320,13 +166,15 @@ async fn set_sync_fork_request() { .once() .returning(|_, _, _| ()); - let (mut network, _) = make_network(chain_sync, Box::new(chain_sync_service), client); + let mut network = TestNetworkBuilder::new() + .with_chain_sync((chain_sync, Box::new(chain_sync_service))) + .build(); // send "set sync fork request" message and poll the network network.service().set_sync_fork_request(copy_peers, hash, number); futures::future::poll_fn(|cx| { - let _ = network.poll_unpin(cx); + let _ = network.network().poll_unpin(cx); Poll::Ready(()) }) .await; @@ -362,13 +210,16 @@ async fn on_block_finalized() { .returning(|_, _| ()); set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, chain_sync_service, client); + let mut network = TestNetworkBuilder::new() + .with_client(client) + .with_chain_sync((chain_sync, chain_sync_service)) + .build(); // send "set sync fork request" message and poll the network - network.on_block_finalized(hash, header); + network.network().on_block_finalized(hash, header); futures::future::poll_fn(|cx| { - let _ = network.poll_unpin(cx); + let _ = network.network().poll_unpin(cx); Poll::Ready(()) }) .await; diff --git a/client/network/src/service/tests/mod.rs b/client/network/src/service/tests/mod.rs new file mode 100644 index 0000000000000..f829d9d43090f --- /dev/null +++ b/client/network/src/service/tests/mod.rs @@ -0,0 +1,297 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{config, ChainSyncInterface, NetworkService, NetworkWorker}; + +use futures::prelude::*; +use libp2p::Multiaddr; +use sc_client_api::{BlockBackend, HeaderBackend}; +use sc_consensus::ImportQueue; +use sc_network_common::{ + config::{ + NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, + TransportConfig, + }, + protocol::{event::Event, role::Roles}, + service::NetworkEventStream, + sync::{message::BlockAnnouncesHandshake, ChainSync as ChainSyncT}, +}; +use sc_network_light::light_client_requests::handler::LightClientRequestHandler; +use sc_network_sync::{ + block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, + ChainSync, +}; +use sp_runtime::traits::{Block as BlockT, Header as _, Zero}; +use std::sync::Arc; +use substrate_test_runtime_client::{ + runtime::{Block as TestBlock, Hash as TestHash}, + TestClient, TestClientBuilder, TestClientBuilderExt as _, +}; + +#[cfg(test)] +mod chain_sync; +#[cfg(test)] +mod service; + +type TestNetworkWorker = NetworkWorker; +type TestNetworkService = NetworkService; + +const BLOCK_ANNOUNCE_PROTO_NAME: &str = "/block-announces"; +const PROTOCOL_NAME: &str = "/foo"; + +struct TestNetwork { + network: TestNetworkWorker, +} + +impl TestNetwork { + pub fn new(network: TestNetworkWorker) -> Self { + Self { network } + } + + pub fn service(&self) -> &Arc { + &self.network.service() + } + + pub fn network(&mut self) -> &mut TestNetworkWorker { + &mut self.network + } + + pub fn start_network( + self, + ) -> (Arc, (impl Stream + std::marker::Unpin)) { + let worker = self.network; + let service = worker.service().clone(); + let event_stream = service.event_stream("test"); + + async_std::task::spawn(async move { + futures::pin_mut!(worker); + let _ = worker.await; + }); + + (service, event_stream) + } +} + +struct TestNetworkBuilder { + import_queue: Option>>, + client: Option>, + listen_addresses: Vec, + set_config: Option, + chain_sync: Option<(Box>, Box>)>, + config: Option, +} + +impl TestNetworkBuilder { + pub fn new() -> Self { + Self { + import_queue: None, + client: None, + listen_addresses: Vec::new(), + set_config: None, + chain_sync: None, + config: None, + } + } + + pub fn with_client(mut self, client: Arc) -> Self { + self.client = Some(client); + self + } + + pub fn with_config(mut self, config: config::NetworkConfiguration) -> Self { + self.config = Some(config); + self + } + + pub fn with_listen_addresses(mut self, addresses: Vec) -> Self { + self.listen_addresses = addresses; + self + } + + pub fn with_set_config(mut self, set_config: SetConfig) -> Self { + self.set_config = Some(set_config); + self + } + + pub fn with_chain_sync( + mut self, + chain_sync: (Box>, Box>), + ) -> Self { + self.chain_sync = Some(chain_sync); + self + } + + pub fn build(mut self) -> TestNetwork { + let client = self.client.as_mut().map_or( + Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0), + |v| v.clone(), + ); + + let network_config = self.config.unwrap_or(config::NetworkConfiguration { + extra_sets: vec![NonDefaultSetConfig { + notifications_protocol: PROTOCOL_NAME.into(), + fallback_names: Vec::new(), + max_notification_size: 1024 * 1024, + handshake: None, + set_config: self.set_config.unwrap_or_default(), + }], + listen_addresses: self.listen_addresses, + transport: TransportConfig::MemoryOnly, + ..config::NetworkConfiguration::new_local() + }); + + #[derive(Clone)] + struct PassThroughVerifier(bool); + + #[async_trait::async_trait] + impl sc_consensus::Verifier for PassThroughVerifier { + async fn verify( + &mut self, + mut block: sc_consensus::BlockImportParams, + ) -> Result< + ( + sc_consensus::BlockImportParams, + Option)>>, + ), + String, + > { + let maybe_keys = block + .header + .digest() + .log(|l| { + l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) + .or_else(|| { + l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus( + b"babe", + )) + }) + }) + .map(|blob| { + vec![(sp_blockchain::well_known_cache_keys::AUTHORITIES, blob.to_vec())] + }); + + block.finalized = self.0; + block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); + Ok((block, maybe_keys)) + } + } + + let import_queue = self.import_queue.unwrap_or(Box::new(sc_consensus::BasicQueue::new( + PassThroughVerifier(false), + Box::new(client.clone()), + None, + &sp_core::testing::TaskExecutor::new(), + None, + ))); + + let (chain_sync, chain_sync_service) = self.chain_sync.unwrap_or({ + let (chain_sync, chain_sync_service) = ChainSync::new( + match network_config.sync_mode { + config::SyncMode::Full => sc_network_common::sync::SyncMode::Full, + config::SyncMode::Fast { skip_proofs, storage_chain_mode } => + sc_network_common::sync::SyncMode::LightState { + skip_proofs, + storage_chain_mode, + }, + config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, + }, + client.clone(), + Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), + network_config.max_parallel_downloads, + None, + ) + .unwrap(); + + (Box::new(chain_sync), chain_sync_service) + }); + + let protocol_id = ProtocolId::from("test-protocol-name"); + let fork_id = Some(String::from("test-fork-id")); + + let block_request_protocol_config = { + let (handler, protocol_config) = + BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); + async_std::task::spawn(handler.run().boxed()); + protocol_config + }; + + let state_request_protocol_config = { + let (handler, protocol_config) = + StateRequestHandler::new(&protocol_id, None, client.clone(), 50); + async_std::task::spawn(handler.run().boxed()); + protocol_config + }; + + let light_client_request_protocol_config = { + let (handler, protocol_config) = + LightClientRequestHandler::new(&protocol_id, None, client.clone()); + async_std::task::spawn(handler.run().boxed()); + protocol_config + }; + + let block_announce_config = NonDefaultSetConfig { + notifications_protocol: BLOCK_ANNOUNCE_PROTO_NAME.into(), + fallback_names: vec![], + max_notification_size: 1024 * 1024, + handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::< + substrate_test_runtime_client::runtime::Block, + >::build( + Roles::from(&config::Role::Full), + client.info().best_number, + client.info().best_hash, + client + .block_hash(Zero::zero()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + ))), + set_config: SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Deny, + }, + }; + + let worker = NetworkWorker::< + substrate_test_runtime_client::runtime::Block, + substrate_test_runtime_client::runtime::Hash, + substrate_test_runtime_client::TestClient, + >::new(config::Params { + block_announce_config, + role: config::Role::Full, + executor: None, + network_config, + chain: client.clone(), + protocol_id, + fork_id, + import_queue, + chain_sync, + chain_sync_service, + metrics_registry: None, + block_request_protocol_config, + state_request_protocol_config, + light_client_request_protocol_config, + warp_sync_protocol_config: None, + request_response_protocol_configs: Vec::new(), + }) + .unwrap(); + + TestNetwork::new(worker) + } +} diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests/service.rs similarity index 56% rename from client/network/src/service/tests.rs rename to client/network/src/service/tests/service.rs index 4a0ef4611da6e..90945fdcef2cf 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests/service.rs @@ -16,183 +16,22 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{config, NetworkService, NetworkWorker}; +use crate::{config, service::tests::TestNetworkBuilder, NetworkService}; use futures::prelude::*; use libp2p::PeerId; -use sc_client_api::{BlockBackend, HeaderBackend}; use sc_network_common::{ - config::{ - MultiaddrWithPeerId, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, - ProtocolId, SetConfig, TransportConfig, - }, - protocol::{event::Event, role::Roles}, - service::{NetworkEventStream, NetworkNotification, NetworkPeers, NetworkStateInfo}, - sync::message::BlockAnnouncesHandshake, + config::{MultiaddrWithPeerId, NonDefaultSetConfig, SetConfig, TransportConfig}, + protocol::event::Event, + service::{NetworkNotification, NetworkPeers, NetworkStateInfo}, }; -use sc_network_light::light_client_requests::handler::LightClientRequestHandler; -use sc_network_sync::{ - block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, - ChainSync, -}; -use sp_consensus::block_validation::DefaultBlockAnnounceValidator; -use sp_runtime::traits::{Block as BlockT, Header as _, Zero}; use std::{sync::Arc, time::Duration}; -use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; type TestNetworkService = NetworkService< substrate_test_runtime_client::runtime::Block, substrate_test_runtime_client::runtime::Hash, >; -/// Builds a full node to be used for testing. Returns the node service and its associated events -/// stream. -/// -/// > **Note**: We return the events stream in order to not possibly lose events between the -/// > construction of the service and the moment the events stream is grabbed. -fn build_test_full_node( - network_config: config::NetworkConfiguration, -) -> (Arc, impl Stream) { - let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); - - #[derive(Clone)] - struct PassThroughVerifier(bool); - - #[async_trait::async_trait] - impl sc_consensus::Verifier for PassThroughVerifier { - async fn verify( - &mut self, - mut block: sc_consensus::BlockImportParams, - ) -> Result< - ( - sc_consensus::BlockImportParams, - Option)>>, - ), - String, - > { - let maybe_keys = block - .header - .digest() - .log(|l| { - l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) - .or_else(|| { - l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus( - b"babe", - )) - }) - }) - .map(|blob| { - vec![(sp_blockchain::well_known_cache_keys::AUTHORITIES, blob.to_vec())] - }); - - block.finalized = self.0; - block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); - Ok((block, maybe_keys)) - } - } - - let import_queue = Box::new(sc_consensus::BasicQueue::new( - PassThroughVerifier(false), - Box::new(client.clone()), - None, - &sp_core::testing::TaskExecutor::new(), - None, - )); - - let protocol_id = ProtocolId::from("/test-protocol-name"); - - let fork_id = Some(String::from("test-fork-id")); - - let block_request_protocol_config = { - let (handler, protocol_config) = - BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let state_request_protocol_config = { - let (handler, protocol_config) = - StateRequestHandler::new(&protocol_id, None, client.clone(), 50); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let light_client_request_protocol_config = { - let (handler, protocol_config) = - LightClientRequestHandler::new(&protocol_id, None, client.clone()); - async_std::task::spawn(handler.run().boxed()); - protocol_config - }; - - let (chain_sync, chain_sync_service) = ChainSync::new( - match network_config.sync_mode { - config::SyncMode::Full => sc_network_common::sync::SyncMode::Full, - config::SyncMode::Fast { skip_proofs, storage_chain_mode } => - sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode }, - config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, - }, - client.clone(), - Box::new(DefaultBlockAnnounceValidator), - network_config.max_parallel_downloads, - None, - ) - .unwrap(); - - let block_announce_config = NonDefaultSetConfig { - notifications_protocol: BLOCK_ANNOUNCE_PROTO_NAME.into(), - fallback_names: vec![], - max_notification_size: 1024 * 1024, - handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::< - substrate_test_runtime_client::runtime::Block, - >::build( - Roles::from(&config::Role::Full), - client.info().best_number, - client.info().best_hash, - client - .block_hash(Zero::zero()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - ))), - set_config: SetConfig { - in_peers: 0, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: NonReservedPeerMode::Deny, - }, - }; - - let worker = NetworkWorker::new(config::Params { - block_announce_config, - role: config::Role::Full, - executor: None, - network_config, - chain: client.clone(), - protocol_id, - fork_id, - import_queue, - chain_sync: Box::new(chain_sync), - chain_sync_service, - metrics_registry: None, - block_request_protocol_config, - state_request_protocol_config, - light_client_request_protocol_config, - warp_sync_protocol_config: None, - request_response_protocol_configs: Vec::new(), - }) - .unwrap(); - - let service = worker.service().clone(); - let event_stream = service.event_stream("test"); - - async_std::task::spawn(async move { - futures::pin_mut!(worker); - let _ = worker.await; - }); - - (service, event_stream) -} - const BLOCK_ANNOUNCE_PROTO_NAME: &str = "/block-announces"; const PROTOCOL_NAME: &str = "/foo"; @@ -206,37 +45,21 @@ fn build_nodes_one_proto() -> ( ) { let listen_addr = config::build_multiaddr![Memory(rand::random::())]; - let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - handshake: None, - set_config: Default::default(), - }], - listen_addresses: vec![listen_addr.clone()], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); + let (node1, events_stream1) = TestNetworkBuilder::new() + .with_listen_addresses(vec![listen_addr.clone()]) + .build() + .start_network(); - let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - handshake: None, - set_config: SetConfig { - reserved_nodes: vec![MultiaddrWithPeerId { - multiaddr: listen_addr, - peer_id: node1.local_peer_id(), - }], - ..Default::default() - }, - }], - listen_addresses: vec![], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); + let (node2, events_stream2) = TestNetworkBuilder::new() + .with_set_config(SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id(), + }], + ..Default::default() + }) + .build() + .start_network(); (node1, events_stream1, node2, events_stream2) } @@ -393,22 +216,15 @@ fn notifications_state_consistent() { }); } -#[test] -fn lots_of_incoming_peers_works() { +#[async_std::test] +async fn lots_of_incoming_peers_works() { let listen_addr = config::build_multiaddr![Memory(rand::random::())]; - let (main_node, _) = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - handshake: None, - set_config: SetConfig { in_peers: u32::MAX, ..Default::default() }, - }], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); + let (main_node, _) = TestNetworkBuilder::new() + .with_listen_addresses(vec![listen_addr.clone()]) + .with_set_config(SetConfig { in_peers: u32::MAX, ..Default::default() }) + .build() + .start_network(); let main_node_peer_id = main_node.local_peer_id(); @@ -417,24 +233,16 @@ fn lots_of_incoming_peers_works() { let mut background_tasks_to_wait = Vec::new(); for _ in 0..32 { - let (_dialing_node, event_stream) = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![], - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - handshake: None, - set_config: SetConfig { - reserved_nodes: vec![MultiaddrWithPeerId { - multiaddr: listen_addr.clone(), - peer_id: main_node_peer_id, - }], - ..Default::default() - }, - }], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); + let (_dialing_node, event_stream) = TestNetworkBuilder::new() + .with_set_config(SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr.clone(), + peer_id: main_node_peer_id, + }], + ..Default::default() + }) + .build() + .start_network(); background_tasks_to_wait.push(async_std::task::spawn(async move { // Create a dummy timer that will "never" fire, and that will be overwritten when we @@ -469,7 +277,7 @@ fn lots_of_incoming_peers_works() { })); } - futures::executor::block_on(async move { future::join_all(background_tasks_to_wait).await }); + future::join_all(background_tasks_to_wait).await; } #[test] @@ -531,42 +339,35 @@ fn notifications_back_pressure() { fn fallback_name_working() { // Node 1 supports the protocols "new" and "old". Node 2 only supports "old". Checks whether // they can connect. - const NEW_PROTOCOL_NAME: &str = "/new-shiny-protocol-that-isnt-PROTOCOL_NAME"; let listen_addr = config::build_multiaddr![Memory(rand::random::())]; - - let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: NEW_PROTOCOL_NAME.into(), - fallback_names: vec![PROTOCOL_NAME.into()], - max_notification_size: 1024 * 1024, - handshake: None, - set_config: Default::default(), - }], - listen_addresses: vec![listen_addr.clone()], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); - - let (_, mut events_stream2) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - handshake: None, - set_config: SetConfig { - reserved_nodes: vec![MultiaddrWithPeerId { - multiaddr: listen_addr, - peer_id: node1.local_peer_id(), - }], - ..Default::default() - }, - }], - listen_addresses: vec![], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); + let (node1, mut events_stream1) = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + extra_sets: vec![NonDefaultSetConfig { + notifications_protocol: NEW_PROTOCOL_NAME.into(), + fallback_names: vec![PROTOCOL_NAME.into()], + max_notification_size: 1024 * 1024, + handshake: None, + set_config: Default::default(), + }], + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + ..config::NetworkConfiguration::new_local() + }) + .build() + .start_network(); + + let (_, mut events_stream2) = TestNetworkBuilder::new() + .with_set_config(SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id(), + }], + ..Default::default() + }) + .build() + .start_network(); let receiver = async_std::task::spawn(async move { // Wait for the `NotificationStreamOpened`. @@ -604,39 +405,7 @@ fn fallback_name_working() { // protocol name and verify that `SyncDisconnected` event is emitted #[async_std::test] async fn disconnect_sync_peer_using_block_announcement_protocol_name() { - let listen_addr = config::build_multiaddr![Memory(rand::random::())]; - - let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: vec![], - max_notification_size: 1024 * 1024, - handshake: None, - set_config: Default::default(), - }], - listen_addresses: vec![listen_addr.clone()], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); - - let (node2, mut events_stream2) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![NonDefaultSetConfig { - notifications_protocol: PROTOCOL_NAME.into(), - fallback_names: Vec::new(), - max_notification_size: 1024 * 1024, - handshake: None, - set_config: SetConfig { - reserved_nodes: vec![MultiaddrWithPeerId { - multiaddr: listen_addr, - peer_id: node1.local_peer_id(), - }], - ..Default::default() - }, - }], - listen_addresses: vec![], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new_local() - }); + let (node1, mut events_stream1, node2, mut events_stream2) = build_nodes_one_proto(); async fn wait_for_events(stream: &mut (impl Stream + std::marker::Unpin)) { let mut notif_received = false; @@ -673,11 +442,19 @@ async fn disconnect_sync_peer_using_block_announcement_protocol_name() { fn ensure_listen_addresses_consistent_with_transport_memory() { let listen_addr = config::build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(0_u16)]; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - transport: TransportConfig::MemoryOnly, - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); } #[test] @@ -685,10 +462,18 @@ fn ensure_listen_addresses_consistent_with_transport_memory() { fn ensure_listen_addresses_consistent_with_transport_not_memory() { let listen_addr = config::build_multiaddr![Memory(rand::random::())]; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); } #[test] @@ -700,12 +485,20 @@ fn ensure_boot_node_addresses_consistent_with_transport_memory() { peer_id: PeerId::random(), }; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - transport: TransportConfig::MemoryOnly, - boot_nodes: vec![boot_node], - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + boot_nodes: vec![boot_node], + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); } #[test] @@ -717,11 +510,19 @@ fn ensure_boot_node_addresses_consistent_with_transport_not_memory() { peer_id: PeerId::random(), }; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - boot_nodes: vec![boot_node], - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + boot_nodes: vec![boot_node], + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); } #[test] @@ -733,12 +534,23 @@ fn ensure_reserved_node_addresses_consistent_with_transport_memory() { peer_id: PeerId::random(), }; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - transport: TransportConfig::MemoryOnly, - default_peers_set: SetConfig { reserved_nodes: vec![reserved_node], ..Default::default() }, - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + default_peers_set: SetConfig { + reserved_nodes: vec![reserved_node], + ..Default::default() + }, + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); } #[test] @@ -750,11 +562,22 @@ fn ensure_reserved_node_addresses_consistent_with_transport_not_memory() { peer_id: PeerId::random(), }; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - default_peers_set: SetConfig { reserved_nodes: vec![reserved_node], ..Default::default() }, - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + default_peers_set: SetConfig { + reserved_nodes: vec![reserved_node], + ..Default::default() + }, + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); } #[test] @@ -763,12 +586,20 @@ fn ensure_public_addresses_consistent_with_transport_memory() { let listen_addr = config::build_multiaddr![Memory(rand::random::())]; let public_address = config::build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(0_u16)]; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - transport: TransportConfig::MemoryOnly, - public_addresses: vec![public_address], - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + public_addresses: vec![public_address], + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); } #[test] @@ -777,9 +608,17 @@ fn ensure_public_addresses_consistent_with_transport_not_memory() { let listen_addr = config::build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(0_u16)]; let public_address = config::build_multiaddr![Memory(rand::random::())]; - let _ = build_test_full_node(config::NetworkConfiguration { - listen_addresses: vec![listen_addr.clone()], - public_addresses: vec![public_address], - ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) - }); + let _ = TestNetworkBuilder::new() + .with_config(config::NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + public_addresses: vec![public_address], + ..config::NetworkConfiguration::new( + "test-node", + "test-client", + Default::default(), + None, + ) + }) + .build() + .start_network(); }