Skip to content

Commit a06c020

Browse files
arkparandresilva
authored andcommitted
Send block status with announcement (paritytech#3607)
* Send block status with announcement * Fixed tests * Whitespace Co-Authored-By: Gavin Wood <gavin@parity.io> * Additional comment * Update comment Co-Authored-By: André Silva <andre.beat@gmail.com>
1 parent 3781a7c commit a06c020

File tree

6 files changed

+85
-30
lines changed

6 files changed

+85
-30
lines changed

core/network/src/protocol.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
6464
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
6565

6666
/// Current protocol version.
67-
pub(crate) const CURRENT_VERSION: u32 = 3;
67+
pub(crate) const CURRENT_VERSION: u32 = 4;
6868
/// Lowest version we support
69-
pub(crate) const MIN_VERSION: u32 = 2;
69+
pub(crate) const MIN_VERSION: u32 = 3;
7070

7171
// Maximum allowed entries in `BlockResponse`
7272
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
@@ -1028,14 +1028,33 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
10281028
return;
10291029
}
10301030

1031-
let hash = header.hash();
1031+
let is_best = self.context_data.chain.info().chain.best_hash == hash;
1032+
debug!(target: "sync", "Reannouncing block {:?}", hash);
1033+
self.send_announcement(&header, is_best, true)
1034+
}
10321035

1033-
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() });
1036+
fn send_announcement(&mut self, header: &B::Header, is_best: bool, force: bool) {
1037+
let hash = header.hash();
10341038

10351039
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
1036-
trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who);
1037-
peer.known_blocks.insert(hash);
1038-
self.behaviour.send_packet(who, message.clone())
1040+
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
1041+
let inserted = peer.known_blocks.insert(hash);
1042+
if inserted || force {
1043+
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce {
1044+
header: header.clone(),
1045+
state: if peer.info.protocol_version >= 4 {
1046+
if is_best {
1047+
Some(message::BlockState::Best)
1048+
} else {
1049+
Some(message::BlockState::Normal)
1050+
}
1051+
} else {
1052+
None
1053+
}
1054+
});
1055+
1056+
self.behaviour.send_packet(who, message)
1057+
}
10391058
}
10401059
}
10411060

@@ -1072,7 +1091,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
10721091
peerset: self.peerset_handle.clone(),
10731092
}, who.clone(), *header.number());
10741093

1075-
match self.sync.on_block_announce(who.clone(), hash, &header) {
1094+
let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) {
1095+
message::BlockState::Best => true,
1096+
message::BlockState::Normal => false,
1097+
};
1098+
1099+
match self.sync.on_block_announce(who.clone(), hash, &header, is_their_best) {
10761100
sync::OnBlockAnnounce::Request(peer, req) => {
10771101
self.send_message(peer, GenericMessage::BlockRequest(req));
10781102
return CustomMessageOutcome::None
@@ -1132,8 +1156,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
11321156

11331157
/// Call this when a block has been imported in the import queue and we should announce it on
11341158
/// the network.
1135-
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) {
1136-
self.sync.update_chain_info(header);
1159+
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, is_best: bool) {
1160+
if is_best {
1161+
self.sync.update_chain_info(header);
1162+
}
11371163
self.specialization.on_block_imported(
11381164
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
11391165
hash.clone(),
@@ -1146,15 +1172,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
11461172
}
11471173

11481174
// send out block announcements
1149-
1150-
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() });
1151-
1152-
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
1153-
if peer.known_blocks.insert(hash.clone()) {
1154-
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
1155-
self.behaviour.send_packet(who, message.clone())
1156-
}
1157-
}
1175+
self.send_announcement(&header, is_best, false);
11581176
}
11591177

11601178
/// Call this when a block has been finalized. The sync layer may have some additional

core/network/src/protocol/message.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,15 @@ pub enum Direction {
107107
Descending = 1,
108108
}
109109

110+
/// Block state in the chain.
111+
#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)]
112+
pub enum BlockState {
113+
/// Block is not part of the best chain.
114+
Normal,
115+
/// Latest best block.
116+
Best,
117+
}
118+
110119
/// Remote call response.
111120
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
112121
pub struct RemoteCallResponse {
@@ -127,12 +136,13 @@ pub struct RemoteReadResponse {
127136

128137
/// Generic types.
129138
pub mod generic {
130-
use codec::{Encode, Decode};
139+
use codec::{Encode, Decode, Input, Output};
131140
use sr_primitives::Justification;
132141
use crate::config::Roles;
133142
use super::{
134143
RemoteReadResponse, Transactions, Direction,
135144
RequestId, BlockAttributes, RemoteCallResponse, ConsensusEngineId,
145+
BlockState,
136146
};
137147
/// Consensus is mostly opaque to us
138148
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
@@ -257,10 +267,35 @@ pub mod generic {
257267
}
258268

259269
/// Announce a new complete relay chain block on the network.
260-
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
270+
#[derive(Debug, PartialEq, Eq, Clone)]
261271
pub struct BlockAnnounce<H> {
262272
/// New block header.
263273
pub header: H,
274+
/// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common.
275+
pub state: Option<BlockState>,
276+
}
277+
278+
// Custom Encode/Decode impl to maintain backwards compatibility with v3.
279+
// This assumes that the packet contains nothing but the announcement message.
280+
// TODO: Get rid of it once protocol v4 is common.
281+
impl<H: Encode> Encode for BlockAnnounce<H> {
282+
fn encode_to<T: Output>(&self, dest: &mut T) {
283+
self.header.encode_to(dest);
284+
if let Some(state) = &self.state {
285+
state.encode_to(dest);
286+
}
287+
}
288+
}
289+
290+
impl<H: Decode> Decode for BlockAnnounce<H> {
291+
fn decode<I: Input>(input: &mut I) -> Result<Self, codec::Error> {
292+
let header = H::decode(input)?;
293+
let state = BlockState::decode(input).ok();
294+
Ok(BlockAnnounce {
295+
header,
296+
state,
297+
})
298+
}
264299
}
265300

266301
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]

core/network/src/protocol/sync.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -885,7 +885,9 @@ impl<B: BlockT> ChainSync<B> {
885885
/// header (call `on_block_data`). The network request isn't sent
886886
/// in this case. Both hash and header is passed as an optimization
887887
/// to avoid rehashing the header.
888-
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header) -> OnBlockAnnounce<B> {
888+
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header, is_best: bool)
889+
-> OnBlockAnnounce<B>
890+
{
889891
let number = *header.number();
890892
debug!(target: "sync", "Received block announcement with number {:?}", number);
891893
if number.is_zero() {
@@ -907,17 +909,17 @@ impl<B: BlockT> ChainSync<B> {
907909
peer.recently_announced.pop_front();
908910
}
909911
peer.recently_announced.push_back(hash.clone());
910-
if number > peer.best_number {
912+
if is_best && number > peer.best_number {
911913
// update their best block
912914
peer.best_number = number;
913915
peer.best_hash = hash;
914916
}
915917
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
916918
return OnBlockAnnounce::Nothing
917919
}
918-
// We assume that the announced block is the latest they have seen, and so our common number
920+
// If the announced block is the best they have seen, our common number
919921
// is either one further ahead or it's the one they just announced, if we know about it.
920-
if known {
922+
if known && is_best {
921923
peer.common_number = number
922924
} else if header.parent_hash() == &self.best_queued_hash || known_parent {
923925
peer.common_number = number - One::one();

core/network/src/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
297297
}
298298

299299
/// You must call this when a new block is imported by the client.
300-
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header) {
301-
self.network_service.user_protocol_mut().on_block_imported(hash, &header);
300+
pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, is_best: bool) {
301+
self.network_service.user_protocol_mut().on_block_imported(hash, &header, is_best);
302302
}
303303

304304
/// You must call this when a new block is finalized by the client.

core/network/src/test/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
300300
Default::default()
301301
};
302302
self.block_import.import_block(import_block, cache).expect("block_import failed");
303-
self.network.on_block_imported(hash, header);
303+
self.network.on_block_imported(hash, header, true);
304304
at = hash;
305305
}
306306

@@ -662,7 +662,7 @@ pub trait TestNetFactory: Sized {
662662

663663
// We poll `imported_blocks_stream`.
664664
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {
665-
peer.network.on_block_imported(notification.hash, notification.header);
665+
peer.network.on_block_imported(notification.hash, notification.header, true);
666666
}
667667

668668
// We poll `finality_notification_stream`, but we only take the last event.

core/service/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ fn build_network_future<
682682

683683
// We poll `imported_blocks_stream`.
684684
while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() {
685-
network.on_block_imported(notification.hash, notification.header);
685+
network.on_block_imported(notification.hash, notification.header, notification.is_new_best);
686686
}
687687

688688
// We poll `finality_notification_stream`, but we only take the last event.

0 commit comments

Comments
 (0)