Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit f5ea01e

Browse files
arkpargavofyork
authored andcommitted
Persist block announcements (#3826)
* Persist block announcements * Renamed sync requests to fork targets * Fixed pruning detection condition
1 parent ac21a1b commit f5ea01e

File tree

4 files changed

+104
-127
lines changed

4 files changed

+104
-127
lines changed

core/network/src/protocol.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,18 +1121,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
11211121
};
11221122

11231123
match self.sync.on_block_announce(who.clone(), hash, &announce, is_their_best) {
1124-
sync::OnBlockAnnounce::Request(peer, req) => {
1125-
self.send_message(peer, GenericMessage::BlockRequest(req));
1126-
return CustomMessageOutcome::None
1127-
}
11281124
sync::OnBlockAnnounce::Nothing => {
1129-
// try_import is only true when we have all data required to import block
1125+
// `on_block_announce` returns `OnBlockAnnounce::ImportHeader`
1126+
// when we have all data required to import the block
11301127
// in the BlockAnnounce message. This is only when:
11311128
// 1) we're on light client;
11321129
// AND
1133-
// - EITHER 2.1) announced block is stale;
1134-
// - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e.
1135-
// there are no ascendants of this block scheduled for retrieval)
1130+
// 2) parent block is already imported and not pruned.
11361131
return CustomMessageOutcome::None
11371132
}
11381133
sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import.

core/network/src/protocol/sync.rs

Lines changed: 88 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ const MAJOR_SYNC_BLOCKS: u8 = 5;
6969
/// Number of recently announced blocks to track for each peer.
7070
const ANNOUNCE_HISTORY_SIZE: usize = 64;
7171

72-
/// Max number of blocks to download for unknown forks.
73-
const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32;
74-
7572
/// Reputation change when a peer sent us a status message that led to a
7673
/// database read error.
7774
const BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE: i32 = -(1 << 16);
@@ -125,8 +122,8 @@ pub struct ChainSync<B: BlockT> {
125122
best_importing_number: NumberFor<B>,
126123
/// Finality proof handler.
127124
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
128-
/// Explicit sync requests.
129-
sync_requests: HashMap<B::Hash, SyncRequest<B>>,
125+
/// Fork sync targets.
126+
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
130127
/// A flag that caches idle state with no pending requests.
131128
is_idle: bool,
132129
/// A type to check incoming block announcements.
@@ -160,8 +157,9 @@ pub struct PeerInfo<B: BlockT> {
160157
pub best_number: NumberFor<B>
161158
}
162159

163-
struct SyncRequest<B: BlockT> {
160+
struct ForkTarget<B: BlockT> {
164161
number: NumberFor<B>,
162+
parent_hash: Option<B::Hash>,
165163
peers: HashSet<PeerId>,
166164
}
167165

@@ -242,13 +240,11 @@ pub enum OnBlockData<B: BlockT> {
242240

243241
/// Result of [`ChainSync::on_block_announce`].
244242
#[derive(Debug, Clone, PartialEq, Eq)]
245-
pub enum OnBlockAnnounce<B: BlockT> {
243+
pub enum OnBlockAnnounce {
246244
/// The announcement does not require further handling.
247245
Nothing,
248246
/// The announcement header should be imported.
249247
ImportHeader,
250-
/// Another block request to the given peer is necessary.
251-
Request(PeerId, BlockRequest<B>)
252248
}
253249

254250
/// Result of [`ChainSync::on_block_justification`].
@@ -307,7 +303,7 @@ impl<B: BlockT> ChainSync<B> {
307303
queue_blocks: Default::default(),
308304
best_importing_number: Zero::zero(),
309305
request_builder,
310-
sync_requests: Default::default(),
306+
fork_targets: Default::default(),
311307
is_idle: false,
312308
block_announce_validator,
313309
}
@@ -462,7 +458,7 @@ impl<B: BlockT> ChainSync<B> {
462458
// The implementation is similar to on_block_announce with unknown parent hash.
463459
pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
464460
if peers.is_empty() {
465-
if let Some(_) = self.sync_requests.remove(hash) {
461+
if let Some(_) = self.fork_targets.remove(hash) {
466462
debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers);
467463
}
468464
return;
@@ -494,11 +490,12 @@ impl<B: BlockT> ChainSync<B> {
494490
}
495491
}
496492

497-
self.sync_requests
493+
self.fork_targets
498494
.entry(hash.clone())
499-
.or_insert_with(|| SyncRequest {
495+
.or_insert_with(|| ForkTarget {
500496
number,
501497
peers: Default::default(),
498+
parent_hash: None,
502499
})
503500
.peers.extend(peers);
504501
}
@@ -562,17 +559,30 @@ impl<B: BlockT> ChainSync<B> {
562559
}
563560
let blocks = &mut self.blocks;
564561
let attrs = &self.required_block_attributes;
565-
let sync_requests = &self.sync_requests;
562+
let fork_targets = &self.fork_targets;
566563
let mut have_requests = false;
567564
let last_finalized = self.client.info().chain.finalized_number;
568565
let best_queued = self.best_queued_number;
566+
let client = &self.client;
567+
let queue = &self.queue_blocks;
569568
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
570569
if !peer.state.is_available() {
571570
trace!(target: "sync", "Peer {} is busy", id);
572571
return None
573572
}
574-
if let Some((hash, req)) = explicit_sync_request(id, sync_requests, best_queued, last_finalized, attrs) {
575-
trace!(target: "sync", "Downloading explicitly requested block {:?} from {}", hash, id);
573+
if let Some((hash, req)) = fork_sync_request(
574+
id,
575+
fork_targets,
576+
best_queued,
577+
last_finalized,
578+
attrs,
579+
|hash| if queue.contains(hash) {
580+
BlockStatus::Queued
581+
} else {
582+
client.block_status(&BlockId::Hash(*hash)).unwrap_or(BlockStatus::Unknown)
583+
},
584+
) {
585+
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
576586
peer.state = PeerSyncState::DownloadingStale(hash);
577587
have_requests = true;
578588
Some((id.clone(), req))
@@ -665,6 +675,26 @@ impl<B: BlockT> ChainSync<B> {
665675
peer.state = PeerSyncState::AncestorSearch(next_num, next_state);
666676
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
667677
} else {
678+
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
679+
// add it to sync targets if necessary.
680+
trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={}",
681+
self.best_queued_hash,
682+
self.best_queued_number,
683+
peer.best_hash,
684+
peer.best_number,
685+
peer.common_number
686+
);
687+
if peer.common_number < peer.best_number && peer.best_number < self.best_queued_number {
688+
trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who);
689+
self.fork_targets
690+
.entry(peer.best_hash.clone())
691+
.or_insert_with(|| ForkTarget {
692+
number: peer.best_number,
693+
parent_hash: None,
694+
peers: Default::default(),
695+
})
696+
.peers.insert(who);
697+
}
668698
peer.state = PeerSyncState::Available;
669699
Vec::new()
670700
}
@@ -922,14 +952,14 @@ impl<B: BlockT> ChainSync<B> {
922952
self.best_queued_number = number;
923953
self.best_queued_hash = *hash;
924954
}
925-
if let Some(_) = self.sync_requests.remove(&hash) {
926-
trace!(target: "sync", "Completed explicit sync request {:?}", hash);
955+
if let Some(_) = self.fork_targets.remove(&hash) {
956+
trace!(target: "sync", "Completed fork sync {:?}", hash);
927957
}
928958
// Update common blocks
929959
for (n, peer) in self.peers.iter_mut() {
930960
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
931-
// Abort search.
932-
peer.state = PeerSyncState::Available;
961+
// Wait for ancestry search to complete first.
962+
continue;
933963
}
934964
let new_common_number = if peer.best_number >= number {
935965
number
@@ -952,12 +982,12 @@ impl<B: BlockT> ChainSync<B> {
952982

953983
/// Call when a node announces a new block.
954984
///
955-
/// If true is returned, then the caller MUST try to import passed
985+
/// If `OnBlockAnnounce::ImportHeader` is returned, then the caller MUST try to import passed
956986
/// header (call `on_block_data`). The network request isn't sent
957987
/// in this case. Both hash and header is passed as an optimization
958988
/// to avoid rehashing the header.
959989
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, announce: &BlockAnnounce<B::Header>, is_best: bool)
960-
-> OnBlockAnnounce<B>
990+
-> OnBlockAnnounce
961991
{
962992
let header = &announce.header;
963993
let number = *header.number();
@@ -1001,6 +1031,9 @@ impl<B: BlockT> ChainSync<B> {
10011031
// known block case
10021032
if known || self.is_already_downloading(&hash) {
10031033
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
1034+
if let Some(target) = self.fork_targets.get_mut(&hash) {
1035+
target.peers.insert(who);
1036+
}
10041037
return OnBlockAnnounce::Nothing
10051038
}
10061039

@@ -1009,79 +1042,42 @@ impl<B: BlockT> ChainSync<B> {
10091042
match self.block_announce_validator.validate(&header, assoc_data) {
10101043
Ok(Validation::Success) => (),
10111044
Ok(Validation::Failure) => {
1012-
debug!(target: "sync", "block announcement validation of block {} from {} failed", hash, who);
1045+
debug!(target: "sync", "Block announcement validation of block {} from {} failed", hash, who);
10131046
return OnBlockAnnounce::Nothing
10141047
}
10151048
Err(e) => {
1016-
error!(target: "sync", "block announcement validation errored: {}", e);
1049+
error!(target: "sync", "Block announcement validation errored: {}", e);
10171050
return OnBlockAnnounce::Nothing
10181051
}
10191052
}
10201053

1021-
// stale block case
1022-
let requires_additional_data = !self.role.is_light();
1023-
if number <= self.best_queued_number {
1024-
if !(known_parent || self.is_already_downloading(header.parent_hash())) {
1025-
let block_status = self.client.block_status(&BlockId::Number(*header.number()))
1026-
.unwrap_or(BlockStatus::Unknown);
1027-
if block_status == BlockStatus::InChainPruned {
1028-
trace!(
1029-
target: "sync",
1030-
"Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header
1031-
);
1032-
return OnBlockAnnounce::Nothing
1033-
}
1034-
trace!(
1035-
target: "sync",
1036-
"Considering new unknown stale block announced from {}: {} {:?}", who, hash, header
1037-
);
1038-
if let Some(request) = self.download_unknown_stale(&who, &hash) {
1039-
if requires_additional_data {
1040-
return OnBlockAnnounce::Request(who, request)
1041-
} else {
1042-
return OnBlockAnnounce::ImportHeader
1043-
}
1044-
} else {
1045-
return OnBlockAnnounce::Nothing
1046-
}
1047-
} else {
1048-
if ancient_parent {
1049-
trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header);
1050-
return OnBlockAnnounce::Nothing
1051-
}
1052-
if let Some(request) = self.download_stale(&who, &hash) {
1053-
if requires_additional_data {
1054-
return OnBlockAnnounce::Request(who, request)
1055-
} else {
1056-
return OnBlockAnnounce::ImportHeader
1057-
}
1058-
} else {
1059-
return OnBlockAnnounce::Nothing
1060-
}
1061-
}
1062-
}
1063-
10641054
if ancient_parent {
10651055
trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header);
10661056
return OnBlockAnnounce::Nothing
10671057
}
10681058

1069-
trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header);
1070-
1071-
let (range, request) = match self.select_new_blocks(who.clone()) {
1072-
Some((range, request)) => (range, request),
1073-
None => return OnBlockAnnounce::Nothing
1074-
};
1075-
1076-
let is_required_data_available = !requires_additional_data
1077-
&& range.end - range.start == One::one()
1078-
&& range.start == *header.number();
1059+
let requires_additional_data = !self.role.is_light() || !known_parent;
1060+
if !requires_additional_data {
1061+
trace!(target: "sync", "Importing new header announced from {}: {} {:?}", who, hash, header);
1062+
return OnBlockAnnounce::ImportHeader
1063+
}
10791064

1080-
if !is_required_data_available {
1081-
return OnBlockAnnounce::Request(who, request)
1065+
if number <= self.best_queued_number {
1066+
trace!(
1067+
target: "sync",
1068+
"Added sync target for block announced from {}: {} {:?}", who, hash, header
1069+
);
1070+
self.fork_targets
1071+
.entry(hash.clone())
1072+
.or_insert_with(|| ForkTarget {
1073+
number,
1074+
parent_hash: Some(header.parent_hash().clone()),
1075+
peers: Default::default(),
1076+
})
1077+
.peers.insert(who);
10821078
}
10831079

1084-
OnBlockAnnounce::ImportHeader
1080+
OnBlockAnnounce::Nothing
10851081
}
10861082

10871083
/// Call when a peer has disconnected.
@@ -1117,40 +1113,6 @@ impl<B: BlockT> ChainSync<B> {
11171113
})
11181114
}
11191115

1120-
/// Download old block with known parent.
1121-
fn download_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option<BlockRequest<B>> {
1122-
let peer = self.peers.get_mut(who)?;
1123-
if !peer.state.is_available() {
1124-
return None
1125-
}
1126-
peer.state = PeerSyncState::DownloadingStale(*hash);
1127-
Some(message::generic::BlockRequest {
1128-
id: 0,
1129-
fields: self.required_block_attributes.clone(),
1130-
from: message::FromBlock::Hash(*hash),
1131-
to: None,
1132-
direction: message::Direction::Ascending,
1133-
max: Some(1),
1134-
})
1135-
}
1136-
1137-
/// Download old block with unknown parent.
1138-
fn download_unknown_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option<BlockRequest<B>> {
1139-
let peer = self.peers.get_mut(who)?;
1140-
if !peer.state.is_available() {
1141-
return None
1142-
}
1143-
peer.state = PeerSyncState::DownloadingStale(*hash);
1144-
Some(message::generic::BlockRequest {
1145-
id: 0,
1146-
fields: self.required_block_attributes.clone(),
1147-
from: message::FromBlock::Hash(*hash),
1148-
to: None,
1149-
direction: message::Direction::Descending,
1150-
max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN),
1151-
})
1152-
}
1153-
11541116
/// Select a range of new blocks to download from the given peer.
11551117
fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
11561118
// when there are too many blocks in the queue => do not try to download new blocks
@@ -1298,28 +1260,35 @@ fn peer_block_request<B: BlockT>(
12981260
}
12991261
}
13001262

1301-
/// Get pending explicit sync request for a peer.
1302-
fn explicit_sync_request<B: BlockT>(
1263+
/// Get pending fork sync targets for a peer.
1264+
fn fork_sync_request<B: BlockT>(
13031265
id: &PeerId,
1304-
requests: &HashMap<B::Hash, SyncRequest<B>>,
1266+
targets: &HashMap<B::Hash, ForkTarget<B>>,
13051267
best_num: NumberFor<B>,
13061268
finalized: NumberFor<B>,
13071269
attributes: &message::BlockAttributes,
1270+
check_block: impl Fn(&B::Hash) -> BlockStatus,
13081271
) -> Option<(B::Hash, BlockRequest<B>)>
13091272
{
1310-
for (hash, r) in requests {
1273+
for (hash, r) in targets {
13111274
if !r.peers.contains(id) {
13121275
continue
13131276
}
13141277
if r.number <= best_num {
13151278
trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id);
1279+
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
1280+
let mut count = (r.number - finalized).saturated_into::<u32>(); // up to the last finalized block
1281+
if parent_status != BlockStatus::Unknown {
1282+
// request only single block
1283+
count = 1;
1284+
}
13161285
return Some((hash.clone(), message::generic::BlockRequest {
13171286
id: 0,
13181287
fields: attributes.clone(),
13191288
from: message::FromBlock::Hash(hash.clone()),
13201289
to: None,
13211290
direction: message::Direction::Descending,
1322-
max: Some((r.number - finalized).saturated_into::<u32>()), // up to the last finalized block
1291+
max: Some(count),
13231292
}))
13241293
}
13251294
}

core/network/src/test/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,9 @@ pub trait TestNetFactory: Sized {
704704
fn poll(&mut self) {
705705
self.mut_peers(|peers| {
706706
for peer in peers {
707+
trace!(target: "sync", "-- Polling {}", peer.id());
707708
peer.network.poll().unwrap();
709+
trace!(target: "sync", "-- Polling complete {}", peer.id());
708710

709711
// We poll `imported_blocks_stream`.
710712
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {

0 commit comments

Comments
 (0)