Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 45 additions & 29 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ mod rep {
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
}

enum PendingRequests {
enum AllowedRequests {
Some(HashSet<PeerId>),
All,
}

impl PendingRequests {
impl AllowedRequests {
fn add(&mut self, id: &PeerId) {
if let Self::Some(ref mut set) = self {
set.insert(*id);
Expand Down Expand Up @@ -174,9 +174,13 @@ impl PendingRequests {
Self::All => false,
}
}

fn clear(&mut self) {
*self = Self::Some(Default::default())
}
}

impl Default for PendingRequests {
impl Default for AllowedRequests {
fn default() -> Self {
Self::Some(HashSet::default())
}
Expand Down Expand Up @@ -211,7 +215,7 @@ pub struct ChainSync<B: BlockT> {
/// Fork sync targets.
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
/// A set of peers for which there might be potential block requests
pending_requests: PendingRequests,
allowed_requests: AllowedRequests,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
Expand Down Expand Up @@ -549,7 +553,7 @@ impl<B: BlockT> ChainSync<B> {
mode,
queue_blocks: Default::default(),
fork_targets: Default::default(),
pending_requests: Default::default(),
allowed_requests: Default::default(),
block_announce_validator,
max_parallel_downloads,
downloaded_blocks: 0,
Expand Down Expand Up @@ -730,7 +734,7 @@ impl<B: BlockT> ChainSync<B> {
)
};

self.pending_requests.add(&who);
self.allowed_requests.add(&who);
self.peers.insert(
who,
PeerSync {
Expand Down Expand Up @@ -774,7 +778,7 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
},
);
self.pending_requests.add(&who);
self.allowed_requests.add(&who);
Ok(None)
},
}
Expand Down Expand Up @@ -841,7 +845,7 @@ impl<B: BlockT> ChainSync<B> {
peer.best_number = number;
peer.best_hash = *hash;
}
self.pending_requests.add(peer_id);
self.allowed_requests.add(peer_id);
}
}

Expand Down Expand Up @@ -883,7 +887,7 @@ impl<B: BlockT> ChainSync<B> {

/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (&PeerId, BlockRequest<B>)> + '_ {
if self.pending_requests.is_empty() ||
if self.allowed_requests.is_empty() ||
self.state_sync.is_some() ||
self.mode == SyncMode::Warp
{
Expand All @@ -903,11 +907,11 @@ impl<B: BlockT> ChainSync<B> {
let best_queued = self.best_queued_number;
let client = &self.client;
let queue = &self.queue_blocks;
let pending_requests = self.pending_requests.take();
let allowed_requests = self.allowed_requests.take();
let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads };
let gap_sync = &mut self.gap_sync;
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
if !peer.state.is_available() || !pending_requests.contains(id) {
if !peer.state.is_available() || !allowed_requests.contains(id) {
return None
}

Expand Down Expand Up @@ -994,19 +998,26 @@ impl<B: BlockT> ChainSync<B> {

/// Get a state request, if any.
pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) {
if self.allowed_requests.is_empty() {
return None
}
if (self.state_sync.is_some() || self.warp_sync.is_some()) &&
self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
{
// Only one pending state request is allowed.
return None
}
if let Some(sync) = &self.state_sync {
if sync.is_complete() {
return None
}

for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.common_number >= sync.target_block_num() {
peer.state = PeerSyncState::DownloadingState;
let request = sync.next_request();
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
self.allowed_requests.clear();
return Some((*id, request))
}
}
Expand All @@ -1022,6 +1033,7 @@ impl<B: BlockT> ChainSync<B> {
if peer.state.is_available() && peer.best_number >= target {
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
peer.state = PeerSyncState::DownloadingState;
self.allowed_requests.clear();
return Some((*id, request))
}
}
Expand All @@ -1032,16 +1044,16 @@ impl<B: BlockT> ChainSync<B> {

/// Get a warp sync request, if any.
pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
if self
.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof)
{
// Only one pending state request is allowed.
return None
}
if let Some(sync) = &self.warp_sync {
if sync.is_complete() {
if self.allowed_requests.is_empty() || sync.is_complete() {
return None
}
if self
.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof)
{
// Only one pending state request is allowed.
return None
}
if let Some(request) = sync.next_warp_poof_request() {
Expand All @@ -1054,6 +1066,7 @@ impl<B: BlockT> ChainSync<B> {
if peer.state.is_available() && peer.best_number >= median {
trace!(target: "sync", "New WarpProofRequest for {}", id);
peer.state = PeerSyncState::DownloadingWarpProof;
self.allowed_requests.clear();
return Some((*id, request))
}
}
Expand Down Expand Up @@ -1087,7 +1100,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse()
}
self.pending_requests.add(who);
self.allowed_requests.add(who);
if let Some(request) = request {
match &mut peer.state {
PeerSyncState::DownloadingNew(_) => {
Expand Down Expand Up @@ -1306,6 +1319,7 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingState = peer.state {
peer.state = PeerSyncState::Available;
self.allowed_requests.set_all();
}
}
let import_result = if let Some(sync) = &mut self.state_sync {
Expand Down Expand Up @@ -1368,6 +1382,7 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingWarpProof = peer.state {
peer.state = PeerSyncState::Available;
self.allowed_requests.set_all();
}
}
let import_result = if let Some(sync) = &mut self.warp_sync {
Expand Down Expand Up @@ -1448,7 +1463,7 @@ impl<B: BlockT> ChainSync<B> {
return Ok(OnBlockJustification::Nothing)
};

self.pending_requests.add(&who);
self.allowed_requests.add(&who);
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;

Expand Down Expand Up @@ -1638,7 +1653,7 @@ impl<B: BlockT> ChainSync<B> {
};
}

self.pending_requests.set_all();
self.allowed_requests.set_all();
output.into_iter()
}

Expand All @@ -1648,7 +1663,7 @@ impl<B: BlockT> ChainSync<B> {
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
self.extra_justifications
.try_finalize_root((hash, number), finalization_result, true);
self.pending_requests.set_all();
self.allowed_requests.set_all();
}

/// Notify about finalization of the given block.
Expand All @@ -1675,6 +1690,7 @@ impl<B: BlockT> ChainSync<B> {
);
self.state_sync =
Some(StateSync::new(self.client.clone(), header, *skip_proofs));
self.allowed_requests.set_all();
}
}
}
Expand Down Expand Up @@ -1725,7 +1741,7 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = new_common_number;
}
}
self.pending_requests.set_all();
self.allowed_requests.set_all();
}

/// Checks if there is a slot for a block announce validation.
Expand Down Expand Up @@ -1994,7 +2010,7 @@ impl<B: BlockT> ChainSync<B> {
peer.update_common_number(number - One::one());
}
}
self.pending_requests.add(&who);
self.allowed_requests.add(&who);

// known block case
if known || self.is_already_downloading(&hash) {
Expand Down Expand Up @@ -2060,7 +2076,7 @@ impl<B: BlockT> ChainSync<B> {
}
self.peers.remove(who);
self.extra_justifications.peer_disconnected(who);
self.pending_requests.set_all();
self.allowed_requests.set_all();
self.fork_targets.retain(|_, target| {
target.peers.remove(who);
!target.peers.is_empty()
Expand All @@ -2083,7 +2099,7 @@ impl<B: BlockT> ChainSync<B> {
if let Err(e) = self.reset_sync_start_point() {
warn!(target: "sync", "💔 Unable to restart sync: {}", e);
}
self.pending_requests.set_all();
self.allowed_requests.set_all();
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::take(&mut self.peers);

Expand Down