Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Apply review suggestions
  • Loading branch information
altonen committed Nov 21, 2022
commit 3850fd6d4edf656fa7e68d5fc960d6ba7fa2a188
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

227 changes: 120 additions & 107 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1992,28 +1992,30 @@ where

/// Generate block request for downloading of the target block body during warp sync.
fn warp_target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
if let Some(sync) = &self.warp_sync {
if self.allowed_requests.is_empty() ||
sync.is_complete() ||
self.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpTargetBlock)
{
// Only one pending warp target block request is allowed.
return None
}
if let Some((target_number, request)) = sync.next_target_block_request() {
// Find a random peer that has a block with the target number.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target_number {
trace!(target: "sync", "New warp target block request for {}", id);
peer.state = PeerSyncState::DownloadingWarpTargetBlock;
self.allowed_requests.clear();
return Some((*id, request))
}
let sync = &self.warp_sync.as_ref()?;

if self.allowed_requests.is_empty() ||
sync.is_complete() ||
self.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpTargetBlock)
{
// Only one pending warp target block request is allowed.
return None
}

if let Some((target_number, request)) = sync.next_target_block_request() {
// Find a random peer that has a block with the target number.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target_number {
trace!(target: "sync", "New warp target block request for {}", id);
peer.state = PeerSyncState::DownloadingWarpTargetBlock;
self.allowed_requests.clear();
return Some((*id, request))
}
}
}

None
}

Expand Down Expand Up @@ -2131,7 +2133,7 @@ where
}
}

pub fn on_block_response(
fn on_block_response(
&mut self,
peer_id: PeerId,
request: BlockRequest<B>,
Expand Down Expand Up @@ -2228,7 +2230,6 @@ where
fn process_outbound_requests(&mut self) {
for (id, request) in self
.block_requests()
.collect::<Vec<_>>()
{
self.send_block_request(id, request);
}
Expand Down Expand Up @@ -2415,20 +2416,20 @@ where
}))
}

fn block_requests<'a>(
&'a mut self,
) -> Box<dyn Iterator<Item = (PeerId, BlockRequest<B>)> + 'a> {
fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
if self.mode == SyncMode::Warp {
return Box::new(std::iter::once(self.warp_target_block_request()).flatten())
return self
.warp_target_block_request()
.map_or_else(|| Vec::new(), |req| Vec::from([req]))
}

if self.allowed_requests.is_empty() || self.state_sync.is_some() {
return Box::new(std::iter::empty())
return Vec::new()
}

if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
return Box::new(std::iter::empty())
return Vec::new()
}
let is_major_syncing = self.status().state.is_major_syncing();
let attrs = self.required_block_attributes();
Expand All @@ -2442,90 +2443,102 @@ where
let allowed_requests = self.allowed_requests.take();
let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
let gap_sync = &mut self.gap_sync;
let iter = self.peers.iter_mut().filter_map(move |(&id, peer)| {
if !peer.state.is_available() || !allowed_requests.contains(&id) {
return None
}
self.peers
.iter_mut()
.filter_map(move |(&id, peer)| {
if !peer.state.is_available() || !allowed_requests.contains(&id) {
return None
}

// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the
// common number, the peer best number is higher than our best queued and the common
// number is smaller than the last finalized block number, we should do an ancestor
// search to find a better common block. If the queue is full we wait till all blocks
// are imported though.
if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
best_queued < peer.best_number &&
peer.common_number < last_finalized &&
queue.len() <= MAJOR_SYNC_BLOCKS.into()
{
trace!(
target: "sync",
"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
id,
peer.common_number,
best_queued,
);
let current = std::cmp::min(peer.best_number, best_queued);
peer.state = PeerSyncState::AncestorSearch {
current,
start: best_queued,
state: AncestorSearchState::ExponentialBackoff(One::one()),
};
Some((id, ancestry_request::<B>(current)))
} else if let Some((range, req)) = peer_block_request(
&id,
peer,
blocks,
attrs,
max_parallel,
last_finalized,
best_queued,
) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(
target: "sync",
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
Some((id, req))
} else if let Some((hash, req)) =
fork_sync_request(&id, fork_targets, best_queued, last_finalized, attrs, |hash| {
if queue.contains(hash) {
BlockStatus::Queued
} else {
client.block_status(&BlockId::Hash(*hash)).unwrap_or(BlockStatus::Unknown)
}
}) {
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
peer.state = PeerSyncState::DownloadingStale(hash);
Some((id, req))
} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
peer_gap_block_request(
// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from
// the common number, the peer best number is higher than our best queued and the
// common number is smaller than the last finalized block number, we should do an
// ancestor search to find a better common block. If the queue is full we wait till
// all blocks are imported though.
if best_queued.saturating_sub(peer.common_number) >
MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
best_queued < peer.best_number &&
peer.common_number < last_finalized &&
queue.len() <= MAJOR_SYNC_BLOCKS.into()
{
trace!(
target: "sync",
"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
id,
peer.common_number,
best_queued,
);
let current = std::cmp::min(peer.best_number, best_queued);
peer.state = PeerSyncState::AncestorSearch {
current,
start: best_queued,
state: AncestorSearchState::ExponentialBackoff(One::one()),
};
Some((id, ancestry_request::<B>(current)))
} else if let Some((range, req)) = peer_block_request(
&id,
peer,
&mut sync.blocks,
blocks,
attrs,
sync.target,
sync.best_queued_number,
)
}) {
peer.state = PeerSyncState::DownloadingGap(range.start);
trace!(
target: "sync",
"New gap block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
Some((id, req))
} else {
None
}
});
Box::new(iter)
max_parallel,
last_finalized,
best_queued,
) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(
target: "sync",
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
Some((id, req))
} else if let Some((hash, req)) = fork_sync_request(
&id,
fork_targets,
best_queued,
last_finalized,
attrs,
|hash| {
if queue.contains(hash) {
BlockStatus::Queued
} else {
client
.block_status(&BlockId::Hash(*hash))
.unwrap_or(BlockStatus::Unknown)
}
},
) {
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
peer.state = PeerSyncState::DownloadingStale(hash);
Some((id, req))
} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
peer_gap_block_request(
&id,
peer,
&mut sync.blocks,
attrs,
sync.target,
sync.best_queued_number,
)
}) {
peer.state = PeerSyncState::DownloadingGap(range.start);
trace!(
target: "sync",
"New gap block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
Some((id, req))
} else {
None
}
})
.collect()
// Box::new(iter)
}

fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> {
Expand Down
1 change: 1 addition & 0 deletions client/network/sync/src/service/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl NetworkServiceHandle {
let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
}

/// Send request to peer
pub fn start_request(
&self,
who: PeerId,
Expand Down