Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Explicit sync API
  • Loading branch information
arkpar committed Sep 17, 2019
commit 42fe3088e9295994d47f4749bdc3861667bee9f8
11 changes: 11 additions & 0 deletions core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,17 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.sync.request_justification(&hash, number)
}

/// Request syncing for the given block from given set of peers.
/// Uses `protocol` to queue a new block download request and tries to dispatch all pending
/// requests.
pub fn sync_fork(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
let requests = self.sync.sync_fork(peers, hash, number);
for (who, request) in requests {
self.update_peer_info(&who);
self.send_message(who, GenericMessage::BlockRequest(request));
}
}

/// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the importqueue, with or without
/// errors.
Expand Down
47 changes: 47 additions & 0 deletions core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,53 @@ impl<B: BlockT> ChainSync<B> {
})
}

/// Request syncing for the given block from given set of peers.
// This is similar to on_block_announce with unknown parent hash.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This is similar to on_block_announce with unknown parent hash.
/// This is similar to on_block_announce with unknown parent hash.

I feel like this could be a doc comment as well, right?

Copy link
Member Author

@arkpar arkpar Sep 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is about the implementation detail, not the interface.

pub fn sync_fork(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) ->
Vec<(PeerId, BlockRequest<B>)>
{
debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers);
if self.is_known(&hash) || self.is_already_downloading(&hash) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say we need something a bit more explicit. The fork-sync requests should be kept around forever until removed or the block is actually known, and the is_already_downloading check should be done periodically. We can imagine a case where is_already_downloading is true at the moment, but then for some reason the node stops downloading that block. Then our requested block still wouldn't be synced.

debug!(target: "sync", "Refusing to sync known hash {:?}", hash);
return Vec::default();
}

let block_status = self.client.block_status(&BlockId::Number(number - One::one()))
.unwrap_or(BlockStatus::Unknown);
if block_status == BlockStatus::InChainPruned {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow-up, there should be a special mode where we just download the headers for this chain if requested, but we don't have the state to process them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3639 for a more detailed writeup of ^

trace!(target: "sync", "Refusing to sync ancient block {:?}", hash);
return Vec::default();
}

self.is_idle = false;
let mut requests = Vec::new();
for peer_id in peers {
let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
peer
} else {
debug!(target: "sync", "Called sync_fork with a bad peer ID");
continue;
};
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
continue;
}

if number > peer.best_number {
peer.best_number = number;
peer.best_hash = hash.clone();
Copy link
Contributor

@rphmeier rphmeier Sep 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why set that to the peer's best block? i don't see this working when there is 1 peer with 2 forks we want to download, if that's a pre-requisite to being able to download those blocks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

best_number is used to track global sync target. best_hash is not used for anything really, other than reporting generic peer info.

}

if number <= self.best_queued_number {
if let Some(request) = self.download_unknown_stale(&peer_id, &hash) {
requests.push((peer_id, request));
}
} else if let Some((_, request)) = self.select_new_blocks(peer_id.clone()) {
requests.push((peer_id, request));
}
}
requests
}

/// Get an iterator over all scheduled justification requests.
pub fn justification_requests(&mut self) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ {
let peers = &mut self.peers;
Expand Down
10 changes: 10 additions & 0 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,13 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
Ok(())
}

/// Adds a `PeerId` and its address as reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Adds a `PeerId` and its address as reserved.

This is a copy/paste error from above, right?

pub fn sync_fork(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::SyncFork(peers, hash, number));
}

/// Modify a peerset priority group.
pub fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> {
let peers = peers.into_iter().map(|p| {
Expand Down Expand Up @@ -587,6 +594,7 @@ enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
GetValue(record::Key),
PutValue(record::Key, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
}

/// Main network worker. Must be polled in order for the network to advance.
Expand Down Expand Up @@ -665,6 +673,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne
self.network_service.put_value(key, value),
ServerToWorkerMsg::AddKnownAddress(peer_id, addr) =>
self.network_service.add_known_address(peer_id, addr),
ServerToWorkerMsg::SyncFork(peer_ids, hash, number) =>
self.network_service.user_protocol_mut().sync_fork(peer_ids, &hash, number),
}
}

Expand Down
10 changes: 10 additions & 0 deletions core/network/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ pub struct Peer<D, S: NetworkSpecialization<Block>> {
}

impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
/// Get this peer ID.
pub fn id(&self) -> PeerId {
self.network.service().local_peer_id()
}

/// Returns true if we're major syncing.
pub fn is_major_syncing(&self) -> bool {
self.network.service().is_major_syncing()
Expand Down Expand Up @@ -256,6 +261,11 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
self.network.service().announce_block(hash);
}

/// Request explicit fork sync.
pub fn sync_fork(&self, peers: Vec<PeerId>, hash: <Block as BlockT>::Hash, number: NumberFor<Block>) {
self.network.service().sync_fork(peers, hash, number);
}

/// Add blocks to the peer -- edit the block before adding
pub fn generate_blocks<F>(&mut self, count: usize, origin: BlockOrigin, edit_block: F) -> H256
where F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block
Expand Down
56 changes: 56 additions & 0 deletions core/network/src/test/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,3 +526,59 @@ fn light_peer_imports_header_from_announce() {
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
import_with_announce(&mut net, &mut runtime, known_stale_hash);
}

#[test]
fn can_sync_explicit_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);

// small fork + reorg on peer 1.
net.peer(0).push_blocks_at(BlockId::Number(30), 2, true);
let small_hash = net.peer(0).client().info().chain.best_hash;
let small_number = net.peer(0).client().info().chain.best_number;
net.peer(0).push_blocks_at(BlockId::Number(30), 10, false);
assert_eq!(net.peer(0).client().info().chain.best_number, 40);

// peer 1 only ever had the long fork.
net.peer(1).push_blocks(10, false);
assert_eq!(net.peer(1).client().info().chain.best_number, 40);

assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());

// poll until the two nodes connect, otherwise announcing the block will not work
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();

// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.

assert_eq!(net.peer(0).client().info().chain.best_number, 40);

assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());

// request explicit sync
let first_peer_id = net.peer(0).id();
net.peer(1).sync_fork(vec![first_peer_id], small_hash, small_number);

// peer 1 downloads the block.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();

assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
})).unwrap();
}