Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
7a76b40
Move import queue out of `sc-network`
altonen Nov 21, 2022
1d1b955
Move stuff to SyncingEngine
altonen Nov 22, 2022
bd8f6a2
Move `ChainSync` instanation to `SyncingEngine`
altonen Nov 22, 2022
9569593
Move peer hashmap to `SyncingEngine`
altonen Nov 22, 2022
befeac3
Let `SyncingEngine` to implement `ChainSyncInterface`
altonen Nov 23, 2022
c642a33
Introduce `SyncStatusProvider`
altonen Nov 23, 2022
badfbf3
Move `sync_peer_(connected|disconnected)` to `SyncingEngine`
altonen Nov 23, 2022
755b47c
Implement `SyncEventStream`
altonen Nov 25, 2022
0b11339
Introduce `ChainSyncInterface`
altonen Nov 25, 2022
6f4ac98
Move event stream polling to `SyncingEngine`
altonen Nov 26, 2022
e2ea277
Make `SyncingEngine` into an asynchronous runner
altonen Nov 26, 2022
caf54b4
Fix warnings
altonen Nov 27, 2022
a920a7e
Code refactoring
altonen Nov 27, 2022
5dd14e3
Use `SyncingService` for BEEFY
altonen Nov 28, 2022
83509c9
Use `SyncingService` for GRANDPA
altonen Nov 28, 2022
bb005b6
Remove call delegation from `NetworkService`
altonen Nov 28, 2022
e5d6c49
Remove `ChainSyncService`
altonen Nov 29, 2022
a4f5403
Remove `ChainSync` service tests
altonen Nov 29, 2022
f485d89
Merge remote-tracking branch 'origin/master' into import-queue-refact…
altonen Dec 1, 2022
2acb775
Refactor code
altonen Nov 29, 2022
b01586a
Merge branch 'import-queue-refactoring' into extract-syncing-from-sc-…
altonen Dec 1, 2022
0085220
Refactor code
altonen Dec 2, 2022
da14feb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Dec 12, 2022
ee39f7b
Merge remote-tracking branch 'origin/master' into extract-syncing
altonen Dec 30, 2022
a55a44d
Update client/finality-grandpa/src/communication/tests.rs
altonen Dec 30, 2022
9882a76
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 7, 2023
49240a2
Fix warnings
altonen Feb 8, 2023
e8c017d
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 17, 2023
17b6872
Apply review comments
altonen Feb 20, 2023
721cc49
Fix docs
altonen Feb 20, 2023
6851852
Fix test
altonen Feb 20, 2023
8eaa4cb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 20, 2023
d84be72
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 21, 2023
4f7cf6e
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 22, 2023
e50fda8
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 28, 2023
d8461c4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 1, 2023
e105fe5
cargo-fmt
altonen Mar 2, 2023
2f0d7d2
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
1f8d399
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
d21fffc
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
198a695
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
7c4babc
Add missing docs
altonen Mar 5, 2023
d028177
Refactor code
altonen Mar 5, 2023
efa7716
Merge remote-tracking branch 'origin/extract-syncing-from-sc-network'…
altonen Mar 5, 2023
800b4ae
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 5, 2023
242ee09
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
4f295d4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Let SyncingEngine to implement ChainSyncInterface
  • Loading branch information
altonen committed Nov 23, 2022
commit befeac386e546a33bfccf79c28cf2088ea7b173d
4 changes: 4 additions & 0 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl NetworkNotification for TestNetwork {
) -> Result<Box<dyn NotificationSender>, NotificationSenderError> {
unimplemented!();
}

fn set_notification_handshake(&self, _protocol: ProtocolName, _handshake: Vec<u8>) {
unimplemented!();
}
}

impl NetworkBlock<Hash, NumberFor<Block>> for TestNetwork {
Expand Down
4 changes: 4 additions & 0 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,10 @@ mod tests {
) -> Result<Box<dyn NotificationSender>, NotificationSenderError> {
unimplemented!();
}

fn set_notification_handshake(&self, _protocol: ProtocolName, handshake: Vec<u8>) {
unimplemented!();
}
}

impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestNetwork {
Expand Down
4 changes: 4 additions & 0 deletions client/network-gossip/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,10 @@ mod tests {
) -> Result<Box<dyn NotificationSender>, NotificationSenderError> {
unimplemented!();
}

fn set_notification_handshake(&self, _protocol: ProtocolName, handshake: Vec<u8>) {
unimplemented!();
}
}

impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for NoOpNetwork {
Expand Down
7 changes: 7 additions & 0 deletions client/network/common/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,9 @@ pub trait NetworkNotification {
target: PeerId,
protocol: ProtocolName,
) -> Result<Box<dyn NotificationSender>, NotificationSenderError>;

/// Set handshake for the notification protocol.
fn set_notification_handshake(&self, protocol: ProtocolName, handshake: Vec<u8>);
}

impl<T> NetworkNotification for Arc<T>
Expand All @@ -522,6 +525,10 @@ where
) -> Result<Box<dyn NotificationSender>, NotificationSenderError> {
T::notification_sender(self, target, protocol)
}

fn set_notification_handshake(&self, protocol: ProtocolName, handshake: Vec<u8>) {
T::set_notification_handshake(self, protocol, handshake)
}
}

/// Provides ability to send network requests.
Expand Down
8 changes: 7 additions & 1 deletion client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,20 @@ const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000;

/// Abstraction over syncing-related services
pub trait ChainSyncInterface<B: BlockT>:
NetworkSyncForkRequest<B::Hash, NumberFor<B>> + JustificationSyncLink<B> + Link<B> + Send + Sync
NetworkSyncForkRequest<B::Hash, NumberFor<B>>
+ JustificationSyncLink<B>
+ Link<B>
+ NetworkBlock<B::Hash, NumberFor<B>>
+ Send
+ Sync
{
}

impl<T, B: BlockT> ChainSyncInterface<B> for T where
T: NetworkSyncForkRequest<B::Hash, NumberFor<B>>
+ JustificationSyncLink<B>
+ Link<B>
+ NetworkBlock<B::Hash, NumberFor<B>>
+ Send
+ Sync
{
Expand Down
99 changes: 39 additions & 60 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,15 @@ use sc_network_common::{
error,
protocol::{role::Roles, ProtocolName},
sync::{
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
message::{BlockAnnounce, BlockAnnouncesHandshake},
BadPeer, ExtendedPeerInfo, SyncStatus,
},
utils::{interval, LruHashSet},
};
use sc_network_sync::engine::{Peer, SyncingEngine};
use sp_arithmetic::traits::SaturatedConversion;
use sp_blockchain::HeaderMetadata;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, Zero},
};
use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, Zero};
use std::{
collections::{HashSet, VecDeque},
io, iter,
Expand Down Expand Up @@ -322,6 +319,7 @@ where
self.behaviour.peerset_debug_info()
}

// TODO(aaro): implement using behaviour?
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.engine.peers.len()
Expand Down Expand Up @@ -362,19 +360,7 @@ where
self.engine.chain_sync.num_sync_requests()
}

/// Inform sync about new best imported block.
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) {
debug!(target: "sync", "New best block imported {:?}/#{}", hash, number);

self.engine.chain_sync.update_chain_info(&hash, number);

self.behaviour.set_notif_protocol_handshake(
HARDCODED_PEERSETS_SYNC,
BlockAnnouncesHandshake::<B>::build(self.roles, number, hash, self.genesis_hash)
.encode(),
);
}

// TODO(aaro): implement using ChainSyncInterface
/// Returns information about all the peers we are connected to after the handshake message.
pub fn peers_info(&self) -> impl Iterator<Item = (&PeerId, &ExtendedPeerInfo<B>)> {
self.engine.peers.iter().map(|(id, peer)| (id, &peer.info))
Expand Down Expand Up @@ -532,48 +518,17 @@ where
Ok(())
}

/// Make sure an important block is propagated to peers.
///
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced.
pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
let header = match self.chain.header(BlockId::Hash(hash)) {
Ok(Some(header)) => header,
Ok(None) => {
warn!("Trying to announce unknown block: {}", hash);
return
},
Err(e) => {
warn!("Error reading block header {}: {}", hash, e);
return
},
};

// don't announce genesis block since it will be ignored
if header.number().is_zero() {
return
}

let is_best = self.chain.info().best_hash == hash;
debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best);

let data = data
.or_else(|| self.engine.block_announce_data_cache.get(&hash).cloned())
.unwrap_or_default();

for (who, ref mut peer) in self.engine.peers.iter_mut() {
let inserted = peer.known_blocks.insert(hash);
if inserted {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
let message = BlockAnnounce {
header: header.clone(),
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
data: Some(data.clone()),
};

self.behaviour
.write_notification(who, HARDCODED_PEERSETS_SYNC, message.encode());
}
/// Set handshake for the notification protocol.
pub fn set_notification_handshake(&mut self, protocol: ProtocolName, handshake: Vec<u8>) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.behaviour
.set_notif_protocol_handshake(sc_peerset::SetId::from(index), handshake);
} else {
error!(
target: "sub-libp2p",
"set_notification_handshake with unknown protocol: {}",
protocol
);
}
}

Expand Down Expand Up @@ -846,7 +801,18 @@ where
genesis_hash: handshake.genesis_hash,
};

let roles = handshake.roles;
if self.on_sync_peer_connected(peer_id, handshake).is_ok() {
self.pending_messages.push_back(
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)]
.clone(),
negotiated_fallback,
roles,
notifications_sink,
},
);
CustomMessageOutcome::SyncConnected(peer_id)
} else {
CustomMessageOutcome::None
Expand All @@ -866,8 +832,21 @@ where
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(
&mut &received_handshake[..],
) {
// TODO: korjaa tämä toimimaan
Ok(handshake) => {
let roles = handshake.roles;
if self.on_sync_peer_connected(peer_id, handshake).is_ok() {
self.pending_messages.push_back(
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols
[usize::from(set_id)]
.clone(),
negotiated_fallback,
roles,
notifications_sink,
},
);
CustomMessageOutcome::SyncConnected(peer_id)
} else {
CustomMessageOutcome::None
Expand Down
42 changes: 0 additions & 42 deletions client/network/src/protocol/notifications/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,48 +554,6 @@ impl Notifications {
self.peerset.reserved_peers(set_id)
}

/// Sends a notification to a peer.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even if we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
///
/// The `fallback` parameter is used for backwards-compatibility reason if the remote doesn't
/// support our protocol. One needs to pass the equivalent of what would have been passed
/// with `send_packet`.
pub fn write_notification(
&mut self,
target: &PeerId,
set_id: sc_peerset::SetId,
message: impl Into<Vec<u8>>,
) {
let notifs_sink = match self.peers.get(&(*target, set_id)).and_then(|p| p.get_open()) {
None => {
trace!(
target: "sub-libp2p",
"Tried to sent notification to {:?} without an open channel.",
target,
);
return
},
Some(sink) => sink,
};

let message = message.into();

trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?}, {} bytes)",
target,
set_id,
message.len(),
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);

notifs_sink.send_sync_notification(message);
}

/// Returns the state of the peerset manager, for debugging purposes.
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
self.peerset.debug_info()
Expand Down
32 changes: 11 additions & 21 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,14 +549,6 @@ where
.on_block_finalized(hash, &header);
}

/// Inform the network service about new best imported block.
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) {
self.network_service
.behaviour_mut()
.user_protocol_mut()
.new_best_block_imported(hash, number);
}

/// Returns the local `PeerId`.
pub fn local_peer_id(&self) -> &PeerId {
Swarm::<Behaviour<B, Client>>::local_peer_id(&self.network_service)
Expand Down Expand Up @@ -1077,6 +1069,12 @@ where

Ok(Box::new(NotificationSender { sink, protocol_name: protocol, notification_size_metric }))
}

fn set_notification_handshake(&self, protocol: ProtocolName, handshake: Vec<u8>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::SetNotificationHandshake(protocol, handshake));
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -1129,13 +1127,11 @@ where
H: ExHashT,
{
fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data));
let _ = self.chain_sync_service.announce_block(hash, data);
}

fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::NewBestBlockImported(hash, number));
let _ = self.chain_sync_service.new_best_block_imported(hash, number);
}
}

Expand Down Expand Up @@ -1210,7 +1206,6 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg<B: BlockT> {
AnnounceBlock(B::Hash, Option<Vec<u8>>),
GetValue(KademliaKey),
PutValue(KademliaKey, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
Expand Down Expand Up @@ -1238,7 +1233,7 @@ enum ServiceToWorkerMsg<B: BlockT> {
pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
},
DisconnectPeer(PeerId, ProtocolName),
NewBestBlockImported(B::Hash, NumberFor<B>),
SetNotificationHandshake(ProtocolName, Vec<u8>),
}

/// Main network worker. Must be polled in order for the network to advance.
Expand Down Expand Up @@ -1323,11 +1318,6 @@ where
Poll::Pending => break,
};
match msg {
ServiceToWorkerMsg::AnnounceBlock(hash, data) => this
.network_service
.behaviour_mut()
.user_protocol_mut()
.announce_block(hash, data),
ServiceToWorkerMsg::GetValue(key) =>
this.network_service.behaviour_mut().get_value(key),
ServiceToWorkerMsg::PutValue(key, value) =>
Expand Down Expand Up @@ -1406,11 +1396,11 @@ where
.behaviour_mut()
.user_protocol_mut()
.disconnect_peer(&who, protocol_name),
ServiceToWorkerMsg::NewBestBlockImported(hash, number) => this
ServiceToWorkerMsg::SetNotificationHandshake(protocol, handshake) => this
.network_service
.behaviour_mut()
.user_protocol_mut()
.new_best_block_imported(hash, number),
.set_notification_handshake(protocol, handshake),
}
}

Expand Down
Loading