diff --git a/Cargo.lock b/Cargo.lock index 032886b9945e2..7f49ff75561db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8638,6 +8638,7 @@ name = "sc-network-sync" version = "0.10.0-dev" dependencies = [ "array-bytes", + "async-std", "fork-tree", "futures", "libp2p", @@ -8653,6 +8654,7 @@ dependencies = [ "sc-consensus", "sc-network-common", "sc-peerset", + "sc-utils", "smallvec", "sp-arithmetic", "sp-blockchain", diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index d3603c6792c84..aa761e66ebf68 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -395,4 +395,14 @@ pub trait ChainSync: Send { /// Decode implementation-specific state response. fn decode_state_response(&self, response: &[u8]) -> Result; + + /// Advance the state of `ChainSync` + /// + /// Internally calls [`ChainSync::poll_block_announce_validation()`] and + /// this function should be polled until it returns [`Poll::Pending`] to + /// consume all pending events. + fn poll( + &mut self, + cx: &mut std::task::Context, + ) -> Poll>; } diff --git a/client/network/src/config.rs b/client/network/src/config.rs index db3e8f0b98a33..dcd7c7d8ad6de 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -33,6 +33,7 @@ pub use sc_network_common::{ pub use libp2p::{build_multiaddr, core::PublicKey, identity}; +use crate::ChainSyncInterface; use core::{fmt, iter}; use libp2p::{ identity::{ed25519, Keypair}, @@ -91,6 +92,9 @@ where /// Instance of chain sync implementation. pub chain_sync: Box>, + /// Interface that can be used to delegate syncing-related function calls to `ChainSync` + pub chain_sync_service: Box>, + /// Registry for recording prometheus metrics to. pub metrics_registry: Option, diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 27f2a938154fe..f3faa44ee6dbd 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -279,6 +279,7 @@ pub use service::{ DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationSenderReady, OutboundFailure, PublicKey, }; +use sp_runtime::traits::{Block as BlockT, NumberFor}; pub use sc_peerset::ReputationChange; @@ -293,3 +294,14 @@ const MAX_CONNECTIONS_PER_PEER: usize = 2; /// The maximum number of concurrent established connections that were incoming. const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000; + +/// Abstraction over syncing-related services +pub trait ChainSyncInterface: + NetworkSyncForkRequest> + Send + Sync +{ +} + +impl ChainSyncInterface for T where + T: NetworkSyncForkRequest> + Send + Sync +{ +} diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index c3def8adc6cfe..a04dba79f7fd2 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -947,18 +947,6 @@ where self.chain_sync.clear_justification_requests(); } - /// Request syncing for the given block from given set of peers. - /// Uses `protocol` to queue a new block download request and tries to dispatch all pending - /// requests. - pub fn set_sync_fork_request( - &mut self, - peers: Vec, - hash: &B::Hash, - number: NumberFor, - ) { - self.chain_sync.set_sync_fork_request(peers, hash, number) - } - /// A batch of blocks have been processed, with or without errors. /// Call this when a batch of blocks have been processed by the importqueue, with or without /// errors. @@ -1461,8 +1449,11 @@ where self.pending_messages.push_back(event); } - // Check if there is any block announcement validation finished. - while let Poll::Ready(result) = self.chain_sync.poll_block_announce_validation(cx) { + // Advance the state of `ChainSync` + // + // Process any received requests received from `NetworkService` and + // check if there is any block announcement validation finished. + while let Poll::Ready(result) = self.chain_sync.poll(cx) { match self.process_block_announce_validation_result(result) { CustomMessageOutcome::None => {}, outcome => self.pending_messages.push_back(outcome), diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 25916041285a3..d1f428cc292c3 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -35,7 +35,7 @@ use crate::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, protocol::{self, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready}, - transport, ReputationChange, + transport, ChainSyncInterface, ReputationChange, }; use futures::{channel::oneshot, prelude::*}; @@ -121,6 +121,8 @@ pub struct NetworkService { peerset: PeersetHandle, /// Channel that sends messages to the actual worker. to_worker: TracingUnboundedSender>, + /// Interface that can be used to delegate calls to `ChainSync` + chain_sync_service: Box>, /// For each peer and protocol combination, an object that allows sending notifications to /// that peer. Updated by the [`NetworkWorker`]. peers_notifications_sinks: Arc>>, @@ -433,6 +435,7 @@ where local_peer_id, local_identity, to_worker, + chain_sync_service: params.chain_sync_service, peers_notifications_sinks: peers_notifications_sinks.clone(), notifications_sizes_metric: metrics .as_ref() @@ -814,7 +817,7 @@ where /// a stale fork missing. /// Passing empty `peers` set effectively removes the sync request. fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number)); + self.chain_sync_service.set_sync_fork_request(peers, hash, number); } } @@ -1219,7 +1222,6 @@ enum ServiceToWorkerMsg { RemoveSetReserved(ProtocolName, PeerId), AddToPeersSet(ProtocolName, PeerId), RemoveFromPeersSet(ProtocolName, PeerId), - SyncFork(Vec, B::Hash, NumberFor), EventStream(out_events::Sender), Request { target: PeerId, @@ -1380,11 +1382,6 @@ where .behaviour_mut() .user_protocol_mut() .remove_from_peers_set(protocol, peer_id), - ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .set_sync_fork_request(peer_ids, &hash, number), ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender), ServiceToWorkerMsg::Request { target, diff --git a/client/network/src/service/chainsync_tests.rs b/client/network/src/service/chainsync_tests.rs index ca44c65d267f4..27c0588a67200 100644 --- a/client/network/src/service/chainsync_tests.rs +++ b/client/network/src/service/chainsync_tests.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{config, NetworkWorker}; +use crate::{config, ChainSyncInterface, NetworkWorker}; use futures::prelude::*; use libp2p::PeerId; @@ -35,7 +35,7 @@ use sc_network_common::{ use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, mock::MockChainSync, - state_request_handler::StateRequestHandler, + service::mock::MockChainSyncInterface, state_request_handler::StateRequestHandler, }; use sp_core::H256; use sp_runtime::{ @@ -56,6 +56,7 @@ const PROTOCOL_NAME: &str = "/foo"; fn make_network( chain_sync: Box>, + chain_sync_service: Box>, client: Arc, ) -> (TestNetworkWorker, Arc) { let network_config = config::NetworkConfiguration { @@ -174,6 +175,7 @@ fn make_network( fork_id, import_queue, chain_sync, + chain_sync_service, metrics_registry: None, block_request_protocol_config, state_request_protocol_config, @@ -193,7 +195,7 @@ fn set_default_expecations_no_peers( chain_sync.expect_state_request().returning(|| None); chain_sync.expect_justification_requests().returning(|| Box::new(iter::empty())); chain_sync.expect_warp_sync_request().returning(|| None); - chain_sync.expect_poll_block_announce_validation().returning(|_| Poll::Pending); + chain_sync.expect_poll().returning(|_| Poll::Pending); chain_sync.expect_status().returning(|| SyncStatus { state: SyncState::Idle, best_seen_block: None, @@ -207,11 +209,18 @@ fn set_default_expecations_no_peers( #[async_std::test] async fn normal_network_poll_no_peers() { let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); + + // build `ChainSync` and set default expectations for it let mut chain_sync = Box::new(MockChainSync::::new()); set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, client); + // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be + // called) + let chain_sync_service = + Box::new(MockChainSyncInterface::::new()); + + let (mut network, _) = make_network(chain_sync, chain_sync_service, client); // poll the network once futures::future::poll_fn(|cx| { @@ -224,6 +233,13 @@ async fn normal_network_poll_no_peers() { #[async_std::test] async fn request_justification() { let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); + + // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be + // called) + let chain_sync_service = + Box::new(MockChainSyncInterface::::new()); + + // build `ChainSync` and verify that call to `request_justification()` is made let mut chain_sync = Box::new(MockChainSync::::new()); @@ -237,7 +253,7 @@ async fn request_justification() { .returning(|_, _| ()); set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, client); + let (mut network, _) = make_network(chain_sync, chain_sync_service, client); // send "request justifiction" message and poll the network network.service().request_justification(&hash, number); @@ -252,13 +268,20 @@ async fn request_justification() { #[async_std::test] async fn clear_justification_requests(&mut self) { let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); + + // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be + // called) + let chain_sync_service = + Box::new(MockChainSyncInterface::::new()); + + // build `ChainSync` and verify that call to `clear_justification_requests()` is made let mut chain_sync = Box::new(MockChainSync::::new()); chain_sync.expect_clear_justification_requests().once().returning(|| ()); set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, client); + let (mut network, _) = make_network(chain_sync, chain_sync_service, client); // send "request justifiction" message and poll the network network.service().clear_justification_requests(); @@ -273,15 +296,23 @@ async fn clear_justification_requests(&mut self) { #[async_std::test] async fn set_sync_fork_request() { let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); + + // build `ChainSync` and set default expectations for it let mut chain_sync = Box::new(MockChainSync::::new()); + set_default_expecations_no_peers(&mut chain_sync); + + // build `ChainSyncInterface` provider and verify that the `set_sync_fork_request()` + // call is delegated to `ChainSyncInterface` (which eventually forwards it to `ChainSync`) + let mut chain_sync_service = + MockChainSyncInterface::::new(); let hash = H256::random(); let number = 1337u64; let peers = (0..3).map(|_| PeerId::random()).collect::>(); let copy_peers = peers.clone(); - chain_sync + chain_sync_service .expect_set_sync_fork_request() .withf(move |in_peers, in_hash, in_number| { &peers == in_peers && &hash == in_hash && &number == in_number @@ -289,8 +320,7 @@ async fn set_sync_fork_request() { .once() .returning(|_, _, _| ()); - set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, client); + let (mut network, _) = make_network(chain_sync, Box::new(chain_sync_service), client); // send "set sync fork request" message and poll the network network.service().set_sync_fork_request(copy_peers, hash, number); @@ -305,6 +335,12 @@ async fn set_sync_fork_request() { #[async_std::test] async fn on_block_finalized() { let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); + // build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be + // called) + let chain_sync_service = + Box::new(MockChainSyncInterface::::new()); + + // build `ChainSync` and verify that call to `on_block_finalized()` is made let mut chain_sync = Box::new(MockChainSync::::new()); @@ -326,7 +362,7 @@ async fn on_block_finalized() { .returning(|_, _| ()); set_default_expecations_no_peers(&mut chain_sync); - let (mut network, _) = make_network(chain_sync, client); + let (mut network, _) = make_network(chain_sync, chain_sync_service, client); // send "set sync fork request" message and poll the network network.on_block_finalized(hash, header); diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index b656d7a7c0ddc..4a0ef4611da6e 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -124,7 +124,7 @@ fn build_test_full_node( protocol_config }; - let chain_sync = ChainSync::new( + let (chain_sync, chain_sync_service) = ChainSync::new( match network_config.sync_mode { config::SyncMode::Full => sc_network_common::sync::SyncMode::Full, config::SyncMode::Fast { skip_proofs, storage_chain_mode } => @@ -172,6 +172,7 @@ fn build_test_full_node( fork_id, import_queue, chain_sync: Box::new(chain_sync), + chain_sync_service, metrics_registry: None, block_request_protocol_config, state_request_protocol_config, diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index 9d032f5cca96c..323b70e9c22a4 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -34,6 +34,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sp-arithmetic = { version = "5.0.0", path = "../../../primitives/arithmetic" } sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } @@ -42,6 +43,7 @@ sp-finality-grandpa = { version = "4.0.0-dev", path = "../../../primitives/final sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" } [dev-dependencies] +async-std = { version = "1.11.0", features = ["attributes"] } quickcheck = { version = "1.0.3", default-features = false } sc-block-builder = { version = "0.10.0-dev", path = "../../block-builder" } sp-test-primitives = { version = "2.0.0", path = "../../../primitives/test-primitives" } diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 84998c747b3cc..64d9d4d668eb3 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -32,14 +32,18 @@ pub mod block_request_handler; pub mod blocks; pub mod mock; mod schema; +pub mod service; pub mod state; pub mod state_request_handler; +#[cfg(test)] +mod tests; pub mod warp; pub mod warp_request_handler; use crate::{ blocks::BlockCollection, schema::v1::{StateRequest, StateResponse}, + service::chain_sync::{ChainSyncInterfaceHandle, ToServiceCommand}, state::StateSync, warp::{WarpProofImportResult, WarpSync}, }; @@ -67,6 +71,7 @@ use sc_network_common::{ PollBlockAnnounceValidation, SyncMode, SyncState, SyncStatus, }, }; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; use sp_arithmetic::traits::Saturating; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; use sp_consensus::{ @@ -264,6 +269,8 @@ pub struct ChainSync { import_existing: bool, /// Gap download process. gap_sync: Option>, + /// Channel for receiving service commands + service_rx: TracingUnboundedReceiver>, } /// All the data we have about a Peer that we are trying to sync with @@ -1725,6 +1732,21 @@ where Ok(OpaqueStateResponse(Box::new(response))) } + + fn poll( + &mut self, + cx: &mut std::task::Context, + ) -> Poll> { + while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { + match event { + ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { + self.set_sync_fork_request(peers, &hash, number); + }, + } + } + + self.poll_block_announce_validation(cx) + } } impl ChainSync @@ -1746,7 +1768,9 @@ where block_announce_validator: Box + Send>, max_parallel_downloads: u32, warp_sync_provider: Option>>, - ) -> Result { + ) -> Result<(Self, Box>), ClientError> { + let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync"); + let mut sync = Self { client, peers: HashMap::new(), @@ -1768,9 +1792,10 @@ where warp_sync_provider, import_existing: false, gap_sync: None, + service_rx, }; sync.reset_sync_start_point()?; - Ok(sync) + Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx)))) } /// Returns the best seen block number if we don't have that block yet, `None` otherwise. @@ -2664,7 +2689,7 @@ mod test { let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let peer_id = PeerId::random(); - let mut sync = + let (mut sync, _) = ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1, None) .unwrap(); @@ -2712,7 +2737,7 @@ mod test { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new( + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), @@ -2879,7 +2904,7 @@ mod test { let mut client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new( + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), @@ -2994,7 +3019,7 @@ mod test { let mut client = Arc::new(TestClientBuilder::new().build()); let info = client.info(); - let mut sync = ChainSync::new( + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), @@ -3137,7 +3162,7 @@ mod test { let info = client.info(); - let mut sync = ChainSync::new( + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), @@ -3268,7 +3293,7 @@ mod test { let info = client.info(); - let mut sync = ChainSync::new( + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), @@ -3399,7 +3424,7 @@ mod test { let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); - let mut sync = ChainSync::new( + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), @@ -3432,7 +3457,7 @@ mod test { let empty_client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new( + let (mut sync, _) = ChainSync::new( SyncMode::Full, empty_client.clone(), Box::new(DefaultBlockAnnounceValidator), diff --git a/client/network/sync/src/mock.rs b/client/network/sync/src/mock.rs index 2a3b059f735b2..fbb54bd5e998d 100644 --- a/client/network/sync/src/mock.rs +++ b/client/network/sync/src/mock.rs @@ -114,5 +114,9 @@ mockall::mock! { ) -> Result>, String>; fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result, String>; fn decode_state_response(&self, response: &[u8]) -> Result; + fn poll<'a>( + &mut self, + cx: &mut std::task::Context<'a>, + ) -> Poll>; } } diff --git a/client/network/sync/src/service/chain_sync.rs b/client/network/sync/src/service/chain_sync.rs new file mode 100644 index 0000000000000..cf07c65ee3109 --- /dev/null +++ b/client/network/sync/src/service/chain_sync.rs @@ -0,0 +1,58 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use libp2p::PeerId; +use sc_network_common::service::NetworkSyncForkRequest; +use sc_utils::mpsc::TracingUnboundedSender; +use sp_runtime::traits::{Block as BlockT, NumberFor}; + +/// Commands send to `ChainSync` +#[derive(Debug)] +pub enum ToServiceCommand { + SetSyncForkRequest(Vec, B::Hash, NumberFor), +} + +/// Handle for communicating with `ChainSync` asynchronously +pub struct ChainSyncInterfaceHandle { + tx: TracingUnboundedSender>, +} + +impl ChainSyncInterfaceHandle { + /// Create new handle + pub fn new(tx: TracingUnboundedSender>) -> Self { + Self { tx } + } +} + +impl NetworkSyncForkRequest> + for ChainSyncInterfaceHandle +{ + /// Configure an explicit fork sync request. + /// + /// Note that this function should not be used for recent blocks. + /// Sync should be able to download all the recent forks normally. + /// `set_sync_fork_request` should only be used if external code detects that there's + /// a stale fork missing. + /// + /// Passing empty `peers` set effectively removes the sync request. + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + let _ = self + .tx + .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number)); + } +} diff --git a/client/network/sync/src/service/mock.rs b/client/network/sync/src/service/mock.rs new file mode 100644 index 0000000000000..e283907b392d1 --- /dev/null +++ b/client/network/sync/src/service/mock.rs @@ -0,0 +1,31 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use libp2p::PeerId; +use sc_network_common::service::NetworkSyncForkRequest; +use sp_runtime::traits::{Block as BlockT, NumberFor}; + +mockall::mock! { + pub ChainSyncInterface {} + + impl NetworkSyncForkRequest> + for ChainSyncInterface + { + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); + } +} diff --git a/client/network/sync/src/service/mod.rs b/client/network/sync/src/service/mod.rs new file mode 100644 index 0000000000000..d64d9bbd1b01f --- /dev/null +++ b/client/network/sync/src/service/mod.rs @@ -0,0 +1,22 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! `ChainSync`-related service code + +pub mod chain_sync; +pub mod mock; diff --git a/client/network/sync/src/tests.rs b/client/network/sync/src/tests.rs new file mode 100644 index 0000000000000..47483c4ac440d --- /dev/null +++ b/client/network/sync/src/tests.rs @@ -0,0 +1,59 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{ChainSync, ForkTarget}; + +use libp2p::PeerId; +use sc_network_common::{service::NetworkSyncForkRequest, sync::ChainSync as ChainSyncT}; +use sp_consensus::block_validation::DefaultBlockAnnounceValidator; +use sp_core::H256; +use std::{sync::Arc, task::Poll}; +use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; + +// verify that the fork target map is empty, then submit a new sync fork request, +// poll `ChainSync` and verify that a new sync fork request has been registered +#[async_std::test] +async fn delegate_to_chainsync() { + let (mut chain_sync, chain_sync_service) = ChainSync::new( + sc_network_common::sync::SyncMode::Full, + Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0), + Box::new(DefaultBlockAnnounceValidator), + 1u32, + None, + ) + .unwrap(); + + let hash = H256::random(); + let in_number = 1337u64; + let peers = (0..3).map(|_| PeerId::random()).collect::>(); + + assert!(chain_sync.fork_targets.is_empty()); + chain_sync_service.set_sync_fork_request(peers, hash, in_number); + + futures::future::poll_fn(|cx| { + let _ = chain_sync.poll(cx); + Poll::Ready(()) + }) + .await; + + if let Some(ForkTarget { number, .. }) = chain_sync.fork_targets.get(&hash) { + assert_eq!(number, &in_number); + } else { + panic!("expected to contain `ForkTarget`"); + } +} diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index a7c58631dc0f7..d4bee77b54aff 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -864,7 +864,7 @@ where let block_announce_validator = config .block_announce_validator .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); - let chain_sync = ChainSync::new( + let (chain_sync, chain_sync_service) = ChainSync::new( match network_config.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, SyncMode::Fast { skip_proofs, storage_chain_mode } => @@ -902,6 +902,7 @@ where fork_id, import_queue, chain_sync: Box::new(chain_sync), + chain_sync_service, metrics_registry: None, block_announce_config, block_request_protocol_config, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 987198d4b7f48..1a16268839054 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -844,7 +844,7 @@ where protocol_config }; - let chain_sync = ChainSync::new( + let (chain_sync, chain_sync_service) = ChainSync::new( match config.network.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, SyncMode::Fast { skip_proofs, storage_chain_mode } => @@ -889,6 +889,7 @@ where fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), import_queue: Box::new(import_queue), chain_sync: Box::new(chain_sync), + chain_sync_service, metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, block_request_protocol_config,