Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 116 additions & 118 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sp_consensus::{
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, Encode};
use codec::{Decode, DecodeAll, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
Expand All @@ -53,7 +53,7 @@ use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use wasm_timer::Instant;

Expand Down Expand Up @@ -271,8 +271,6 @@ struct Peer<B: BlockT, H: ExHashT> {
pub struct PeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
/// Protocol version
pub protocol_version: u32,
/// Peer best block hash
pub best_hash: B::Hash,
/// Peer best block number
Expand Down Expand Up @@ -391,14 +389,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};

let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let mut behaviour = GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
);

let mut legacy_equiv_by_name = HashMap::new();

Expand All @@ -409,7 +399,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/transactions/1");
proto
});
behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);

let block_announces_protocol: Cow<'static, str> = Cow::from({
Expand All @@ -419,12 +408,23 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/block-announces/1");
proto
});
behaviour.register_notif_protocol(
block_announces_protocol.clone(),
BlockAnnouncesHandshake::build(&config, &chain).encode()
);
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);

let behaviour = {
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let block_announces_handshake = BlockAnnouncesHandshake::build(&config, &chain).encode();
GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
iter::once((block_announces_protocol.clone(), block_announces_handshake))
.chain(iter::once((transactions_protocol.clone(), vec![]))),
0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the concept of passing an index into an Iterator as a separate usize error prone. Instead of passing it as a separate usize, one idea that comes to my mind is to have the Iterator produce tuples of both protocol name as well as a bool indicating whether it is the handshake protocol?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have the Iterator produce tuples of both protocol name as well as a bool indicating whether it is the handshake protocol?

Then we need to enforce that one and only one element in the iterator contains true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not saying it is a good solution, only that I find it less error prone than passing an index.

Is there ever the use case to not have the first protocol be the one providing the handshake? If not, I would just always interpret the very first passed notifications protocol as the one providing the handshake. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever the use case to not have the first protocol be the one providing the handshake? If not, I would just always interpret the very first passed notifications protocol as the one providing the handshake. What do you think?

It's not really a matter of use-case. The caller chooses the order of the protocols, and can always move the handshake-bearing protocol as the first one.
However I would find it more confusing, as the fact that we special-case a certain protocol would no longer be explicited through the parameters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion on this. To me tracking both a list of protocols as well as an index into that list is error prone. I think that tracking a bool alongside each protocol is less error prone.

Copy link
Contributor Author

@tomaka tomaka Sep 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned I'm of the opposite opinion 😅
While it makes the API nicer-looking, you then introduce the possible situation where two protocols accidentally have the flag set to true.

Also, I know it's not great to do this, but this code is temporary and will be removed again once substreams can be opened individually.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever the use case to not have the first protocol be the one providing the handshake? If not, I would just always interpret the very first passed notifications protocol as the one providing the handshake.

I'd prefer this as well, especially if the special-casing of the first protocol is temporary.

)
};

let protocol = Protocol {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
Expand Down Expand Up @@ -829,99 +829,86 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(
/// Called on the first connection between two peers, after their exchange of handshake.
fn on_peer_connected(
&mut self,
who: PeerId,
status: message::Status<B>,
status: BlockAnnouncesHandshake<B>,
notifications_sink: NotificationsSink,
) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff here is quite large because I've remove an indentation level, sorry for that.

if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);

if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
}
if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);

return CustomMessageOutcome::None;
}
if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
log!(
if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer {:?} using unsupported protocol version {}", who, status.version
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}

if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
return CustomMessageOutcome::None;
}

// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}

let peer = Peer {
info: PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);
// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
}

debug!(target: "sync", "Connected {}", who);
status.version
let peer = Peer {
info: PeerInfo {
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);

debug!(target: "sync", "Connected {}", who);

let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number));
Expand Down Expand Up @@ -1151,20 +1138,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if inserted || force {
let message = message::BlockAnnounce {
header: header.clone(),
state: if peer.info.protocol_version >= 4 {
if is_best {
Some(message::BlockState::Best)
} else {
Some(message::BlockState::Normal)
}
} else {
None
},
data: if peer.info.protocol_version >= 4 {
Some(data.clone())
state: if is_best {
Some(message::BlockState::Best)
} else {
None
Some(message::BlockState::Normal)
},
data: Some(data.clone()),
};

self.behaviour.write_notification(
Expand Down Expand Up @@ -1588,9 +1567,20 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {

let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) =>
self.on_peer_connected(peer_id, handshake, notifications_sink),
// `received_handshake` can be either a `Status` message if received from the
// legacy substream ,or a `BlockAnnouncesHandshake` if received from the block
// announces substream.
match <Message<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => {
let handshake = BlockAnnouncesHandshake {
roles: handshake.roles,
best_number: handshake.best_number,
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};

self.on_peer_connected(peer_id, handshake, notifications_sink)
},
Ok(msg) => {
debug!(
target: "sync",
Expand All @@ -1602,15 +1592,23 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
CustomMessageOutcome::None
}
Err(err) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {}",
peer_id,
received_handshake,
err.what()
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(handshake) => {
self.on_peer_connected(peer_id, handshake, notifications_sink)
}
Err(err2) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {} & {}",
peer_id,
received_handshake,
err.what(),
err2,
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
}
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ pub struct GenericProto {
/// initial handshake.
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,

/// Index within `notif_protocols` of the protocol containing the handshake to report on the
/// external API.
handshake_protocol_index: usize,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,

Expand Down Expand Up @@ -336,14 +340,24 @@ impl GenericProto {
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
handshake_protocol_index: usize,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
.collect::<Vec<_>>();

assert!(!notif_protocols.is_empty());
assert!(handshake_protocol_index < notif_protocols.len());

let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);

GenericProto {
local_peer_id,
legacy_protocol,
notif_protocols: Vec::new(),
notif_protocols,
handshake_protocol_index,
peerset,
peers: FnvHashMap::default(),
delays: Default::default(),
Expand Down Expand Up @@ -855,6 +869,7 @@ impl NetworkBehaviour for GenericProto {
NotifsHandlerProto::new(
self.legacy_protocol.clone(),
self.notif_protocols.clone(),
self.handshake_protocol_index,
)
}

Expand Down
Loading