Merged
Changes from 1 commit
47 commits
7a76b40
Move import queue out of `sc-network`
altonen Nov 21, 2022
1d1b955
Move stuff to SyncingEngine
altonen Nov 22, 2022
bd8f6a2
Move `ChainSync` instantiation to `SyncingEngine`
altonen Nov 22, 2022
9569593
Move peer hashmap to `SyncingEngine`
altonen Nov 22, 2022
befeac3
Let `SyncingEngine` implement `ChainSyncInterface`
altonen Nov 23, 2022
c642a33
Introduce `SyncStatusProvider`
altonen Nov 23, 2022
badfbf3
Move `sync_peer_(connected|disconnected)` to `SyncingEngine`
altonen Nov 23, 2022
755b47c
Implement `SyncEventStream`
altonen Nov 25, 2022
0b11339
Introduce `ChainSyncInterface`
altonen Nov 25, 2022
6f4ac98
Move event stream polling to `SyncingEngine`
altonen Nov 26, 2022
e2ea277
Make `SyncingEngine` into an asynchronous runner
altonen Nov 26, 2022
caf54b4
Fix warnings
altonen Nov 27, 2022
a920a7e
Code refactoring
altonen Nov 27, 2022
5dd14e3
Use `SyncingService` for BEEFY
altonen Nov 28, 2022
83509c9
Use `SyncingService` for GRANDPA
altonen Nov 28, 2022
bb005b6
Remove call delegation from `NetworkService`
altonen Nov 28, 2022
e5d6c49
Remove `ChainSyncService`
altonen Nov 29, 2022
a4f5403
Remove `ChainSync` service tests
altonen Nov 29, 2022
f485d89
Merge remote-tracking branch 'origin/master' into import-queue-refact…
altonen Dec 1, 2022
2acb775
Refactor code
altonen Nov 29, 2022
b01586a
Merge branch 'import-queue-refactoring' into extract-syncing-from-sc-…
altonen Dec 1, 2022
0085220
Refactor code
altonen Dec 2, 2022
da14feb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Dec 12, 2022
ee39f7b
Merge remote-tracking branch 'origin/master' into extract-syncing
altonen Dec 30, 2022
a55a44d
Update client/finality-grandpa/src/communication/tests.rs
altonen Dec 30, 2022
9882a76
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 7, 2023
49240a2
Fix warnings
altonen Feb 8, 2023
e8c017d
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 17, 2023
17b6872
Apply review comments
altonen Feb 20, 2023
721cc49
Fix docs
altonen Feb 20, 2023
6851852
Fix test
altonen Feb 20, 2023
8eaa4cb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 20, 2023
d84be72
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 21, 2023
4f7cf6e
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 22, 2023
e50fda8
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 28, 2023
d8461c4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 1, 2023
e105fe5
cargo-fmt
altonen Mar 2, 2023
2f0d7d2
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
1f8d399
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
d21fffc
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
198a695
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
7c4babc
Add missing docs
altonen Mar 5, 2023
d028177
Refactor code
altonen Mar 5, 2023
efa7716
Merge remote-tracking branch 'origin/extract-syncing-from-sc-network'…
altonen Mar 5, 2023
800b4ae
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 5, 2023
242ee09
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
4f295d4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
Move sync_peer_(connected|disconnected) to SyncingEngine
altonen committed Nov 23, 2022
commit badfbf339e12c9581d842d8efacce04f2b2397fe
207 changes: 8 additions & 199 deletions client/network/src/protocol.rs
@@ -106,17 +106,6 @@ pub struct Protocol<B: BlockT, Client> {
roles: Roles,
genesis_hash: B::Hash,
chain: Arc<Client>,
/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
/// List of nodes that should never occupy peer slots.
default_peers_set_no_slot_peers: HashSet<PeerId>,
/// Actual list of connected no-slot nodes.
default_peers_set_no_slot_connected_peers: HashSet<PeerId>,
/// Value that was passed as part of the configuration. Used to cap the number of full nodes.
default_peers_set_num_full: usize,
/// Number of slots to allocate to light nodes.
default_peers_set_num_light: usize,
/// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle,
/// Handles opening the unique substream and sending and receiving raw messages.
@@ -130,8 +119,6 @@ pub struct Protocol<B: BlockT, Client> {
/// solve this, an entry is added to this map whenever an invalid handshake is received.
/// Entries are removed when the corresponding "substream closed" is later received.
bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>,
/// The `PeerId`'s of all boot nodes.
boot_node_ids: HashSet<PeerId>,
// TODO: remove eventually
engine: SyncingEngine<B, Client>,
}
@@ -157,31 +144,6 @@ where
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let info = chain.info();

let boot_node_ids = {
let mut list = HashSet::new();
for node in &network_config.boot_nodes {
list.insert(node.peer_id);
}
list.shrink_to_fit();
list
};

let important_peers = {
let mut imp_p = HashSet::new();
for reserved in &network_config.default_peers_set.reserved_nodes {
imp_p.insert(reserved.peer_id);
}
for reserved in network_config
.extra_sets
.iter()
.flat_map(|s| s.set_config.reserved_nodes.iter())
{
imp_p.insert(reserved.peer_id);
}
imp_p.shrink_to_fit();
imp_p
};

let default_peers_set_no_slot_peers = {
let mut no_slot_p: HashSet<PeerId> = network_config
.default_peers_set
@@ -272,23 +234,13 @@ where
roles,
chain,
genesis_hash: info.genesis_hash,
important_peers,
default_peers_set_no_slot_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
default_peers_set_num_full: network_config.default_peers_set_num_full as usize,
default_peers_set_num_light: {
let total = network_config.default_peers_set.out_peers +
network_config.default_peers_set.in_peers;
total.saturating_sub(network_config.default_peers_set_num_full) as usize
},
peerset_handle: peerset_handle.clone(),
behaviour,
notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
.chain(network_config.extra_sets.iter().map(|s| s.notifications_protocol.clone()))
.collect(),
bad_handshake_substreams: Default::default(),
engine,
boot_node_ids,
};

Ok((protocol, peerset_handle, known_addresses))
@@ -366,158 +318,11 @@ where
self.engine.peers.iter().map(|(id, peer)| (id, &peer.info))
}

/// Called by peer when it is disconnecting.
///
/// Returns a result if the handshake of this peer was indeed accepted.
pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> {
if self.important_peers.contains(&peer) {
warn!(target: "sync", "Reserved peer {} disconnected", peer);
} else {
debug!(target: "sync", "{} disconnected", peer);
}

if let Some(_peer_data) = self.engine.peers.remove(&peer) {
self.engine.chain_sync.peer_disconnected(&peer);
self.default_peers_set_no_slot_connected_peers.remove(&peer);
Ok(())
} else {
Err(())
}
}

/// Adjusts the reputation of a node.
pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) {
self.peerset_handle.report_peer(who, reputation)
}

/// Called on the first connection between two peers on the default set, after their exchange
/// of handshake.
///
/// Returns `Ok` if the handshake is accepted and the peer added to the list of peers we sync
/// from.
fn on_sync_peer_connected(
&mut self,
who: PeerId,
status: BlockAnnouncesHandshake<B>,
) -> Result<(), ()> {
trace!(target: "sync", "New peer {} {:?}", who, status);

if self.engine.peers.contains_key(&who) {
error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", who);
debug_assert!(false);
return Err(())
}

if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Debug },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who, rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);

if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
}

return Err(())
}

if self.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who, rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
return Err(())
}

// we're not interested in peers that are far behind us
let self_best_block = self.chain.info().best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who, rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
return Err(())
}
}

let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&who);
let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };

if status.roles.is_full() &&
self.engine.chain_sync.num_peers() >=
self.default_peers_set_num_full +
self.default_peers_set_no_slot_connected_peers.len() +
this_peer_reserved_slot
{
debug!(target: "sync", "Too many full nodes, rejecting {}", who);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
return Err(())
}

if status.roles.is_light() &&
(self.engine.peers.len() - self.engine.chain_sync.num_peers()) >=
self.default_peers_set_num_light
{
// Make sure that not all slots are occupied by light clients.
debug!(target: "sync", "Too many light nodes, rejecting {}", who);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
return Err(())
}

let peer = Peer {
info: ExtendedPeerInfo {
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number,
},
known_blocks: LruHashSet::new(
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
),
};

let req = if peer.info.roles.is_full() {
match self.engine.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) {
Ok(req) => req,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
return Err(())
},
}
} else {
None
};

debug!(target: "sync", "Connected {}", who);

self.engine.peers.insert(who, peer);
if no_slot_peer {
self.default_peers_set_no_slot_connected_peers.insert(who);
}
self.pending_messages
.push_back(CustomMessageOutcome::PeerNewBest(who, status.best_number));

if let Some(req) = req {
self.engine.chain_sync.send_block_request(who, req);
}

Ok(())
}

/// Set handshake for the notification protocol.
pub fn set_notification_handshake(&mut self, protocol: ProtocolName, handshake: Vec<u8>) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
Expand Down Expand Up @@ -802,7 +607,7 @@ where
};

let roles = handshake.roles;
if self.on_sync_peer_connected(peer_id, handshake).is_ok() {
if self.engine.on_sync_peer_connected(peer_id, handshake).is_ok() {
self.pending_messages.push_back(
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
Expand Down Expand Up @@ -832,10 +637,13 @@ where
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(
&mut &received_handshake[..],
) {
// TODO: fix this so it works
Ok(handshake) => {
let roles = handshake.roles;
if self.on_sync_peer_connected(peer_id, handshake).is_ok() {
if self
.engine
.on_sync_peer_connected(peer_id, handshake)
.is_ok()
{
self.pending_messages.push_back(
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
Expand Down Expand Up @@ -914,10 +722,11 @@ where
notifications_sink,
}
},
// TODO(aaro): listen on event stream in `SyncingEngine`
NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
// Set number 0 is hardcoded as the default set of peers we sync from.
if set_id == HARDCODED_PEERSETS_SYNC {
if self.on_sync_peer_disconnected(peer_id).is_ok() {
if self.engine.on_sync_peer_disconnected(peer_id).is_ok() {
CustomMessageOutcome::SyncDisconnected(peer_id)
} else {
log::trace!(
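The two handlers removed above do not disappear: the new call sites (`self.engine.on_sync_peer_connected(...)` and `self.engine.on_sync_peer_disconnected(...)`) show they now live on `SyncingEngine`. The `engine.rs` side of the move is not part of the hunks shown in this file, so the following is only a sketch of the expected shape, inferred from the deleted `Protocol` methods and the new call sites; type names (`PeerId`, `BlockAnnouncesHandshake<B>`, `BlockT`) are taken from the surrounding diff and assumed to be in scope.

```rust
// Sketch only: signatures inferred from the removed `Protocol` methods and
// the `self.engine.*` call sites in this commit; the real implementation
// lives in `client/network/sync/src/engine.rs`, which this hunk does not show.
impl<B: BlockT, Client> SyncingEngine<B, Client> {
    /// Called on the first connection between two peers on the default set,
    /// after their handshake exchange. Returns `Ok` if the handshake is
    /// accepted; validation, slot accounting and `ChainSync::new_peer` are
    /// expected to move here.
    pub fn on_sync_peer_connected(
        &mut self,
        who: PeerId,
        status: BlockAnnouncesHandshake<B>,
    ) -> Result<(), ()> {
        todo!("moved from `Protocol::on_sync_peer_connected`")
    }

    /// Called when a sync peer disconnects. Returns `Ok` if the peer's
    /// handshake had previously been accepted.
    pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> {
        todo!("moved from `Protocol::on_sync_peer_disconnected`")
    }
}
```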
7 changes: 6 additions & 1 deletion client/network/src/service/tests/mod.rs
@@ -40,7 +40,7 @@ use sc_network_sync::{
ChainSync,
};
use sp_runtime::traits::{Block as BlockT, Header as _, Zero};
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use substrate_test_runtime_client::{
runtime::{Block as TestBlock, Hash as TestHash},
TestClient, TestClientBuilder, TestClientBuilderExt as _,
@@ -271,6 +271,11 @@ impl TestNetworkBuilder {
state_request_protocol_config.name.clone(),
None,
std::num::NonZeroUsize::new(16).unwrap(),
HashSet::new(),
HashSet::new(),
HashSet::new(),
0usize,
0usize,
)
.unwrap();
let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone()));
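The only functional change in the test builder is the five extra constructor arguments. They line up with the three `HashSet` fields and two `usize` fields removed from `Protocol` in this commit; the constructor they feed sits outside the visible hunk, so the mapping below is an assumption and the wrapper function is purely illustrative.

```rust
use libp2p::PeerId; // assumed: the same `PeerId` type used by sc-network
use std::collections::HashSet;

// Hypothetical annotation of the five arguments added to the test builder;
// names are inferred from the fields this commit removes from `Protocol`,
// and the argument order is not confirmed by the visible hunk.
fn annotated_new_args() -> (HashSet<PeerId>, HashSet<PeerId>, HashSet<PeerId>, usize, usize) {
    (
        HashSet::new(), // important_peers: reserved peers that get extra logging
        HashSet::new(), // default_peers_set_no_slot_peers: peers exempt from slot accounting
        HashSet::new(), // boot_node_ids: `PeerId`s of the configured boot nodes
        0,              // default_peers_set_num_full: cap on full-node slots
        0,              // default_peers_set_num_light: slots reserved for light peers
    )
}
```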