Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
7a76b40
Move import queue out of `sc-network`
altonen Nov 21, 2022
1d1b955
Move stuff to SyncingEngine
altonen Nov 22, 2022
bd8f6a2
Move `ChainSync` instanation to `SyncingEngine`
altonen Nov 22, 2022
9569593
Move peer hashmap to `SyncingEngine`
altonen Nov 22, 2022
befeac3
Let `SyncingEngine` to implement `ChainSyncInterface`
altonen Nov 23, 2022
c642a33
Introduce `SyncStatusProvider`
altonen Nov 23, 2022
badfbf3
Move `sync_peer_(connected|disconnected)` to `SyncingEngine`
altonen Nov 23, 2022
755b47c
Implement `SyncEventStream`
altonen Nov 25, 2022
0b11339
Introduce `ChainSyncInterface`
altonen Nov 25, 2022
6f4ac98
Move event stream polling to `SyncingEngine`
altonen Nov 26, 2022
e2ea277
Make `SyncingEngine` into an asynchronous runner
altonen Nov 26, 2022
caf54b4
Fix warnings
altonen Nov 27, 2022
a920a7e
Code refactoring
altonen Nov 27, 2022
5dd14e3
Use `SyncingService` for BEEFY
altonen Nov 28, 2022
83509c9
Use `SyncingService` for GRANDPA
altonen Nov 28, 2022
bb005b6
Remove call delegation from `NetworkService`
altonen Nov 28, 2022
e5d6c49
Remove `ChainSyncService`
altonen Nov 29, 2022
a4f5403
Remove `ChainSync` service tests
altonen Nov 29, 2022
f485d89
Merge remote-tracking branch 'origin/master' into import-queue-refact…
altonen Dec 1, 2022
2acb775
Refactor code
altonen Nov 29, 2022
b01586a
Merge branch 'import-queue-refactoring' into extract-syncing-from-sc-…
altonen Dec 1, 2022
0085220
Refactor code
altonen Dec 2, 2022
da14feb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Dec 12, 2022
ee39f7b
Merge remote-tracking branch 'origin/master' into extract-syncing
altonen Dec 30, 2022
a55a44d
Update client/finality-grandpa/src/communication/tests.rs
altonen Dec 30, 2022
9882a76
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 7, 2023
49240a2
Fix warnings
altonen Feb 8, 2023
e8c017d
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 17, 2023
17b6872
Apply review comments
altonen Feb 20, 2023
721cc49
Fix docs
altonen Feb 20, 2023
6851852
Fix test
altonen Feb 20, 2023
8eaa4cb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 20, 2023
d84be72
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 21, 2023
4f7cf6e
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 22, 2023
e50fda8
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 28, 2023
d8461c4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 1, 2023
e105fe5
cargo-fmt
altonen Mar 2, 2023
2f0d7d2
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
1f8d399
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
d21fffc
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
198a695
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
7c4babc
Add missing docs
altonen Mar 5, 2023
d028177
Refactor code
altonen Mar 5, 2023
efa7716
Merge remote-tracking branch 'origin/extract-syncing-from-sc-network'…
altonen Mar 5, 2023
800b4ae
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 5, 2023
242ee09
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
4f295d4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove ChainSyncService
  • Loading branch information
altonen committed Nov 29, 2022
commit e5d6c497eb4ed1a483fd409c5509cf0ded93efe7
1 change: 0 additions & 1 deletion client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" }
sc-network-common = { version = "0.10.0-dev", path = "./common" }
sc-network-sync = { version = "0.10.0-dev", path = "./sync" }
sc-peerset = { version = "4.0.0-dev", path = "../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
sp-arithmetic = { version = "6.0.0", path = "../../primitives/arithmetic" }
Expand Down
29 changes: 0 additions & 29 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,32 +467,3 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Send block request to peer
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);
}

#[async_trait::async_trait]
pub trait ChainSyncService<Block: BlockT>: Send + Sync {
/// Returns the number of peers we're connected to and that are being queried.
async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled>;

/// Target sync block number.
async fn best_seen_block(&self) -> Result<Option<NumberFor<Block>>, oneshot::Canceled>;

/// Number of peers participating in syncing.
async fn num_sync_peers(&self) -> Result<u32, oneshot::Canceled>;

/// Number of blocks in the import queue.
async fn num_queued_blocks(&self) -> Result<u32, oneshot::Canceled>;

/// Number of downloaded blocks.
async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled>;

/// Number of active sync requests.
async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled>;

/// Returns information about all the peers we are connected to after the handshake message.
async fn peers_info(&self)
-> Result<Vec<(PeerId, ExtendedPeerInfo<Block>)>, oneshot::Canceled>;

/// Call this when a block has been finalized. The sync layer may have some additional
/// requesting to perform.
fn on_block_finalized(&self, hash: Block::Hash, header: Block::Header);
}
9 changes: 1 addition & 8 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ use std::{
use zeroize::Zeroize;

/// Network initialization parameters.
pub struct Params<B, Client>
where
B: BlockT + 'static,
{
pub struct Params<Client> {
/// Assigned role for our node (full, light, ...).
pub role: Role,

Expand All @@ -80,10 +77,6 @@ where
/// name on the wire.
pub fork_id: Option<String>,

// TODO(aaro): remove this
/// Interface that can be used to delegate syncing-related function calls to `ChainSync`
pub sync_service: Arc<sc_network_sync::SyncingService<B>>,

/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,

Expand Down
34 changes: 1 addition & 33 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ pub use sc_network_common::{
},
sync::{
warp::{WarpSyncPhase, WarpSyncProgress},
ChainSyncService, ExtendedPeerInfo, StateDownloadProgress, SyncEventStream, SyncState,
SyncStatusProvider,
ExtendedPeerInfo, StateDownloadProgress, SyncEventStream, SyncState, SyncStatusProvider,
},
};
pub use service::{
Expand All @@ -296,34 +295,3 @@ const MAX_CONNECTIONS_PER_PEER: usize = 2;

/// The maximum number of concurrent established connections that were incoming.
const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000;

/// Abstraction over syncing-related services
pub trait ChainSyncInterface<B: BlockT>:
NetworkSyncForkRequest<B::Hash, NumberFor<B>>
+ JustificationSyncLink<B>
+ Link<B>
+ NetworkBlock<B::Hash, NumberFor<B>>
+ SyncStatusProvider<B>
+ SyncEventStream
+ ChainSyncService<B>
+ SyncOracle
+ Send
+ Sync
+ 'static
{
}

impl<T, B: BlockT> ChainSyncInterface<B> for T where
T: NetworkSyncForkRequest<B::Hash, NumberFor<B>>
+ JustificationSyncLink<B>
+ Link<B>
+ NetworkBlock<B::Hash, NumberFor<B>>
+ SyncStatusProvider<B>
+ SyncEventStream
+ ChainSyncService<B>
+ SyncOracle
+ Send
+ Sync
+ 'static
{
}
12 changes: 7 additions & 5 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
peerset: PeersetHandle,
/// Channel that sends messages to the actual worker.
to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
// TODO(aaro): remove this
/// Interface that can be used to delegate calls to `ChainSync`
sync_service: Arc<sc_network_sync::SyncingService<B>>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
Expand All @@ -129,6 +126,8 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
/// Marker for block type
_block: PhantomData<B>,
}

impl<B, H, Client> NetworkWorker<B, H, Client>
Expand All @@ -148,7 +147,7 @@ where
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(mut params: Params<B, Client>) -> Result<Self, Error> {
pub fn new(mut params: Params<Client>) -> Result<Self, Error> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
Expand Down Expand Up @@ -434,12 +433,12 @@ where
local_peer_id,
local_identity,
to_worker,
sync_service: params.sync_service,
peers_notifications_sinks: peers_notifications_sinks.clone(),
notifications_sizes_metric: metrics
.as_ref()
.map(|metrics| metrics.notifications_sizes.clone()),
_marker: PhantomData,
_block: Default::default(),
});

Ok(NetworkWorker {
Expand All @@ -453,6 +452,7 @@ where
metrics,
boot_node_ids,
_marker: Default::default(),
_block: Default::default(),
})
}

Expand Down Expand Up @@ -1152,6 +1152,8 @@ where
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
/// Marker for block type
_block: PhantomData<B>,
}

impl<B, H, Client> Future for NetworkWorker<B, H, Client>
Expand Down
8 changes: 4 additions & 4 deletions client/network/src/service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{config, ChainSyncInterface, NetworkService, NetworkWorker};
use crate::{config, NetworkService, NetworkWorker};

use futures::prelude::*;
use libp2p::Multiaddr;
Expand All @@ -33,6 +33,7 @@ use sc_network_sync::{
engine::SyncingEngine,
service::network::{NetworkServiceHandle, NetworkServiceProvider},
state_request_handler::StateRequestHandler,
SyncingService,
};
use sp_runtime::traits::{Block as BlockT, Header as _};
use std::{collections::HashSet, sync::Arc};
Expand Down Expand Up @@ -91,7 +92,7 @@ struct TestNetworkBuilder {
client: Option<Arc<substrate_test_runtime_client::TestClient>>,
listen_addresses: Vec<Multiaddr>,
set_config: Option<SetConfig>,
chain_sync: Option<(Box<dyn ChainSyncT<TestBlock>>, Box<dyn ChainSyncInterface<TestBlock>>)>,
chain_sync: Option<(Box<dyn ChainSyncT<TestBlock>>, Box<SyncingService<TestBlock>>)>,
chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>,
config: Option<config::NetworkConfiguration>,
}
Expand Down Expand Up @@ -132,7 +133,7 @@ impl TestNetworkBuilder {

pub fn with_chain_sync(
mut self,
chain_sync: (Box<dyn ChainSyncT<TestBlock>>, Box<dyn ChainSyncInterface<TestBlock>>),
chain_sync: (Box<dyn ChainSyncT<TestBlock>>, Box<SyncingService<TestBlock>>),
) -> Self {
self.chain_sync = Some(chain_sync);
self
Expand Down Expand Up @@ -287,7 +288,6 @@ impl TestNetworkBuilder {
chain: client.clone(),
protocol_id,
fork_id,
sync_service: Arc::new(chain_sync_service),
metrics_registry: None,
request_response_protocol_configs: [
block_request_protocol_config,
Expand Down
116 changes: 56 additions & 60 deletions client/network/sync/src/service/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ use libp2p::PeerId;
use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
use sc_network_common::{
service::{NetworkBlock, NetworkSyncForkRequest},
sync::{
ChainSyncService, ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus,
SyncStatusProvider,
},
sync::{ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_runtime::traits::{Block as BlockT, NumberFor};
Expand Down Expand Up @@ -83,6 +80,61 @@ impl<B: BlockT> SyncingService<B> {
) -> Self {
Self { tx, num_connected, is_major_syncing }
}

pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));

rx.await
}

pub async fn best_seen_block(&self) -> Result<Option<NumberFor<B>>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::BestSeenBlock(tx));

rx.await
}

pub async fn num_sync_peers(&self) -> Result<u32, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncPeers(tx));

rx.await
}

pub async fn num_queued_blocks(&self) -> Result<u32, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumQueuedBlocks(tx));

rx.await
}

pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));

rx.await
}

pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));

rx.await
}

pub async fn peers_info(
&self,
) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));

rx.await
}

pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
}
}

impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
Expand Down Expand Up @@ -174,62 +226,6 @@ impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
}
}

#[async_trait::async_trait]
impl<B: BlockT> ChainSyncService<B> for SyncingService<B> {
async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));

rx.await
}

async fn best_seen_block(&self) -> Result<Option<NumberFor<B>>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::BestSeenBlock(tx));

rx.await
}

async fn num_sync_peers(&self) -> Result<u32, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncPeers(tx));

rx.await
}

async fn num_queued_blocks(&self) -> Result<u32, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumQueuedBlocks(tx));

rx.await
}

async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));

rx.await
}

async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));

rx.await
}

async fn peers_info(&self) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));

rx.await
}

fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
}
}

impl<B: BlockT> sp_consensus::SyncOracle for SyncingService<B> {
fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
Expand Down
4 changes: 1 addition & 3 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use sc_consensus::{
};
use sc_network::{
config::{NetworkConfiguration, RequestResponseConfig, Role, SyncMode},
ChainSyncService, Multiaddr, NetworkService, NetworkWorker,
Multiaddr, NetworkService, NetworkWorker,
};
use sc_network_common::{
config::{
Expand Down Expand Up @@ -956,7 +956,6 @@ where
chain: client.clone(),
protocol_id,
fork_id,
sync_service: sync_service.clone(),
metrics_registry: None,
block_announce_config,
request_response_protocol_configs: [
Expand Down Expand Up @@ -1135,7 +1134,6 @@ where
while let Poll::Ready(Some(notification)) =
peer.finality_notification_stream.as_mut().poll_next(cx)
{
use sc_network::ChainSyncService;
peer.sync_service.on_block_finalized(notification.hash, notification.header);
}
}
Expand Down
3 changes: 1 addition & 2 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use sc_client_db::{Backend, DatabaseSettings};
use sc_consensus::import_queue::ImportQueue;
use sc_executor::RuntimeVersionOf;
use sc_keystore::LocalKeystore;
use sc_network::{config::SyncMode, ChainSyncInterface, NetworkService};
use sc_network::{config::SyncMode, NetworkService};
use sc_network_bitswap::BitswapRequestHandler;
use sc_network_common::{
protocol::role::Roles,
Expand Down Expand Up @@ -942,7 +942,6 @@ where
chain: client.clone(),
protocol_id: protocol_id.clone(),
fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
sync_service: sync_service.clone(),
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
request_response_protocol_configs: request_response_protocol_configs
Expand Down
Loading