Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 58 additions & 24 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ use std::{
fmt,
ops::Range,
pin::Pin,
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use warp::{WarpProofRequest, WarpSync, WarpSyncProvider};

Expand Down Expand Up @@ -234,10 +237,14 @@ pub struct ChainSync<B: BlockT> {
import_existing: bool,
/// Gap download process.
gap_sync: Option<GapSync<B>>,
/// A lock to make sure only one download is in progress.
sync_lock: Lock,
/// A lock to make sure only one warp proof download is in progress.
warp_sync_lock: Lock,
}

/// All the data we have about a Peer that we are trying to sync with
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PeerSync<B: BlockT> {
/// Peer id of this peer.
pub peer_id: PeerId,
Expand Down Expand Up @@ -284,11 +291,44 @@ struct ForkTarget<B: BlockT> {
peers: HashSet<PeerId>,
}

/// An exclusive lock.
#[derive(Debug, Default)]
struct Lock(Arc<AtomicBool>);

impl Lock {
fn lock(&self) -> Option<LockGuard> {
if self.0.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
Some(LockGuard(self.0.clone()))
} else {
None
}
}
}

/// A guard for an actively locked lock.
///
/// Will unlock the lock on drop.
#[derive(Debug)]
pub struct LockGuard(Arc<AtomicBool>);

impl PartialEq for LockGuard {
fn eq(&self, rhs: &LockGuard) -> bool {
Arc::ptr_eq(&self.0, &rhs.0)
}
}

impl Eq for LockGuard {}
impl Drop for LockGuard {
fn drop(&mut self) {
self.0.store(false, Ordering::SeqCst);
}
}

/// The state of syncing between a Peer and ourselves.
///
/// Generally two categories, "busy" or `Available`. If busy, the enum
/// defines what we are busy with.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[derive(Eq, PartialEq, Debug)]
pub enum PeerSyncState<B: BlockT> {
/// Available for sync requests.
Available,
Expand All @@ -303,9 +343,9 @@ pub enum PeerSyncState<B: BlockT> {
/// Downloading justification for given block hash.
DownloadingJustification(B::Hash),
/// Downloading state.
DownloadingState,
DownloadingState(LockGuard),
/// Downloading warp proof.
DownloadingWarpProof,
DownloadingWarpProof(LockGuard),
/// Actively downloading block history after warp sync.
DownloadingGap(NumberFor<B>),
}
Expand Down Expand Up @@ -560,6 +600,8 @@ impl<B: BlockT> ChainSync<B> {
warp_sync_provider,
import_existing: false,
gap_sync: None,
sync_lock: Default::default(),
warp_sync_lock: Default::default(),
};
sync.reset_sync_start_point()?;
Ok(sync)
Expand Down Expand Up @@ -994,17 +1036,15 @@ impl<B: BlockT> ChainSync<B> {

/// Get a state request, if any.
pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) {
// Only one pending state request is allowed.
return None
}
// Only one pending state request is allowed.
let lock = self.sync_lock.lock()?;
if let Some(sync) = &self.state_sync {
if sync.is_complete() {
return None
}
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.common_number >= sync.target_block_num() {
peer.state = PeerSyncState::DownloadingState;
peer.state = PeerSyncState::DownloadingState(lock);
let request = sync.next_request();
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
return Some((*id, request))
Expand All @@ -1021,7 +1061,7 @@ impl<B: BlockT> ChainSync<B> {
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target {
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
peer.state = PeerSyncState::DownloadingState;
peer.state = PeerSyncState::DownloadingState(lock);
return Some((*id, request))
}
}
Expand All @@ -1032,14 +1072,8 @@ impl<B: BlockT> ChainSync<B> {

/// Get a warp sync request, if any.
pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
if self
.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof)
{
// Only one pending state request is allowed.
return None
}
// Only one pending state request is allowed.
let lock = self.warp_sync_lock.lock()?;
if let Some(sync) = &self.warp_sync {
if sync.is_complete() {
return None
Expand All @@ -1053,7 +1087,7 @@ impl<B: BlockT> ChainSync<B> {
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= median {
trace!(target: "sync", "New WarpProofRequest for {}", id);
peer.state = PeerSyncState::DownloadingWarpProof;
peer.state = PeerSyncState::DownloadingWarpProof(lock);
return Some((*id, request))
}
}
Expand Down Expand Up @@ -1261,8 +1295,8 @@ impl<B: BlockT> ChainSync<B> {
},
PeerSyncState::Available |
PeerSyncState::DownloadingJustification(..) |
PeerSyncState::DownloadingState |
PeerSyncState::DownloadingWarpProof => Vec::new(),
PeerSyncState::DownloadingState(..) |
PeerSyncState::DownloadingWarpProof(..) => Vec::new(),
}
} else {
// When request.is_none() this is a block announcement. Just accept blocks.
Expand Down Expand Up @@ -1304,7 +1338,7 @@ impl<B: BlockT> ChainSync<B> {
response: StateResponse,
) -> Result<OnStateData<B>, BadPeer> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingState = peer.state {
if let PeerSyncState::DownloadingState(..) = peer.state {
peer.state = PeerSyncState::Available;
}
}
Expand Down Expand Up @@ -1366,7 +1400,7 @@ impl<B: BlockT> ChainSync<B> {
response: warp::EncodedProof,
) -> Result<(), BadPeer> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingWarpProof = peer.state {
if let PeerSyncState::DownloadingWarpProof(..) = peer.state {
peer.state = PeerSyncState::Available;
}
}
Expand Down
49 changes: 46 additions & 3 deletions client/network/src/protocol/sync/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,38 @@ mod tests {

// Some Arbitrary instances to allow easy construction of random peer sets:

#[derive(Debug, Clone)]
#[derive(Debug)]
struct ArbitraryPeerSyncState(PeerSyncState<Block>);

fn clone_peer_sync_state(state: &PeerSyncState<Block>) -> PeerSyncState<Block> {
match state {
PeerSyncState::Available => PeerSyncState::Available,
PeerSyncState::DownloadingNew(ref block_number) =>
PeerSyncState::DownloadingNew(block_number.clone()),
PeerSyncState::DownloadingStale(ref hash) =>
PeerSyncState::DownloadingStale(hash.clone()),
PeerSyncState::DownloadingJustification(ref hash) =>
PeerSyncState::DownloadingJustification(hash.clone()),
state => unimplemented!("unsupported peer sync state: {:?}", state),
}
}

fn clone_peer_sync(peer_sync: &PeerSync<Block>) -> PeerSync<Block> {
PeerSync {
peer_id: peer_sync.peer_id.clone(),
common_number: peer_sync.common_number.clone(),
best_hash: peer_sync.best_hash.clone(),
best_number: peer_sync.best_number.clone(),
state: clone_peer_sync_state(&peer_sync.state),
}
}

impl Clone for ArbitraryPeerSyncState {
fn clone(&self) -> Self {
Self(clone_peer_sync_state(&self.0))
}
}

impl Arbitrary for ArbitraryPeerSyncState {
fn arbitrary(g: &mut Gen) -> Self {
let s = match u8::arbitrary(g) % 4 {
Expand All @@ -563,9 +592,15 @@ mod tests {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct ArbitraryPeerSync(PeerSync<Block>);

impl Clone for ArbitraryPeerSync {
fn clone(&self) -> Self {
ArbitraryPeerSync(clone_peer_sync(&self.0))
}
}

impl Arbitrary for ArbitraryPeerSync {
fn arbitrary(g: &mut Gen) -> Self {
let ps = PeerSync {
Expand All @@ -579,9 +614,17 @@ mod tests {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct ArbitraryPeers(HashMap<PeerId, PeerSync<Block>>);

impl Clone for ArbitraryPeers {
fn clone(&self) -> Self {
ArbitraryPeers(
self.0.iter().map(|(id, sync)| (id.clone(), clone_peer_sync(sync))).collect(),
)
}
}

impl Arbitrary for ArbitraryPeers {
fn arbitrary(g: &mut Gen) -> Self {
let mut peers = HashMap::with_capacity(g.size());
Expand Down