diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index a585f91145ede..c1887ce35bfdb 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -36,7 +36,7 @@ use sp_consensus::{ block_validation::BlockAnnounceValidator, import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin} }; -use codec::{Decode, Encode}; +use codec::{Decode, DecodeAll, Encode}; use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification}; use sp_runtime::traits::{ Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub @@ -53,7 +53,7 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry}; use std::sync::Arc; use std::fmt::Write; -use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time}; +use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time}; use log::{log, Level, trace, debug, warn, error}; use wasm_timer::Instant; @@ -271,8 +271,6 @@ struct Peer { pub struct PeerInfo { /// Roles pub roles: Roles, - /// Protocol version - pub protocol_version: u32, /// Peer best block hash pub best_hash: B::Hash, /// Peer best block number @@ -391,14 +389,6 @@ impl Protocol { }; let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config); - let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); - let mut behaviour = GenericProto::new( - local_peer_id, - protocol_id.clone(), - versions, - build_status_message(&config, &chain), - peerset, - ); let mut legacy_equiv_by_name = HashMap::new(); @@ -409,7 +399,6 @@ impl Protocol { proto.push_str("/transactions/1"); proto }); - behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new()); legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions); let block_announces_protocol: Cow<'static, str> = Cow::from({ @@ -419,12 +408,24 @@ impl Protocol { proto.push_str("/block-announces/1"); proto }); - behaviour.register_notif_protocol( - block_announces_protocol.clone(), - BlockAnnouncesHandshake::build(&config, &chain).encode() - ); legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce); + let behaviour = { + let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); + let block_announces_handshake = BlockAnnouncesHandshake::build(&config, &chain).encode(); + GenericProto::new( + local_peer_id, + protocol_id.clone(), + versions, + build_status_message(&config, &chain), + peerset, + // As documented in `GenericProto`, the first protocol in the list is always the + // one carrying the handshake reported in the `CustomProtocolOpen` event. + iter::once((block_announces_protocol.clone(), block_announces_handshake)) + .chain(iter::once((transactions_protocol.clone(), vec![]))), + ) + }; + let protocol = Protocol { tick_timeout: Box::pin(interval(TICK_TIMEOUT)), propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), @@ -829,99 +830,86 @@ impl Protocol { } } - /// Called on receipt of a status message via the legacy protocol on the first connection between two peers. - pub fn on_peer_connected( + /// Called on the first connection between two peers, after their exchange of handshake. + fn on_peer_connected( &mut self, who: PeerId, - status: message::Status, + status: BlockAnnouncesHandshake, notifications_sink: NotificationsSink, ) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); - let _protocol_version = { - if self.context_data.peers.contains_key(&who) { - debug!(target: "sync", "Ignoring duplicate status packet from {}", who); - return CustomMessageOutcome::None; - } - if status.genesis_hash != self.genesis_hash { - log!( - target: "sync", - if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace }, - "Peer is on different chain (our genesis: {} theirs: {})", - self.genesis_hash, status.genesis_hash - ); - self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH); - self.behaviour.disconnect_peer(&who); - if self.boot_node_ids.contains(&who) { - error!( - target: "sync", - "Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})", - who, - self.genesis_hash, - status.genesis_hash, - ); - } + if self.context_data.peers.contains_key(&who) { + debug!(target: "sync", "Ignoring duplicate status packet from {}", who); + return CustomMessageOutcome::None; + } + if status.genesis_hash != self.genesis_hash { + log!( + target: "sync", + if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace }, + "Peer is on different chain (our genesis: {} theirs: {})", + self.genesis_hash, status.genesis_hash + ); + self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH); + self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; - } - if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { - log!( + if self.boot_node_ids.contains(&who) { + error!( target: "sync", - if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace }, - "Peer {:?} using unsupported protocol version {}", who, status.version + "Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})", + who, + self.genesis_hash, + status.genesis_hash, ); - self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL); - self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; } - if self.config.roles.is_light() { - // we're not interested in light peers - if status.roles.is_light() { - debug!(target: "sync", "Peer {} is unable to serve light requests", who); - self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE); - self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; - } + return CustomMessageOutcome::None; + } - // we don't interested in peers that are far behind us - let self_best_block = self - .context_data - .chain - .info() - .best_number; - let blocks_difference = self_best_block - .checked_sub(&status.best_number) - .unwrap_or_else(Zero::zero) - .saturated_into::(); - if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { - debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT); - self.behaviour.disconnect_peer(&who); - return CustomMessageOutcome::None; - } + if self.config.roles.is_light() { + // we're not interested in light peers + if status.roles.is_light() { + debug!(target: "sync", "Peer {} is unable to serve light requests", who); + self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE); + self.behaviour.disconnect_peer(&who); + return CustomMessageOutcome::None; } - let peer = Peer { - info: PeerInfo { - protocol_version: status.version, - roles: status.roles, - best_hash: status.best_hash, - best_number: status.best_number - }, - block_request: None, - known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) - .expect("Constant is nonzero")), - known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) - .expect("Constant is nonzero")), - next_request_id: 0, - obsolete_requests: HashMap::new(), - }; - self.context_data.peers.insert(who.clone(), peer); + // we don't interested in peers that are far behind us + let self_best_block = self + .context_data + .chain + .info() + .best_number; + let blocks_difference = self_best_block + .checked_sub(&status.best_number) + .unwrap_or_else(Zero::zero) + .saturated_into::(); + if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { + debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); + self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT); + self.behaviour.disconnect_peer(&who); + return CustomMessageOutcome::None; + } + } - debug!(target: "sync", "Connected {}", who); - status.version + let peer = Peer { + info: PeerInfo { + roles: status.roles, + best_hash: status.best_hash, + best_number: status.best_number + }, + block_request: None, + known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) + .expect("Constant is nonzero")), + known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) + .expect("Constant is nonzero")), + next_request_id: 0, + obsolete_requests: HashMap::new(), }; + self.context_data.peers.insert(who.clone(), peer); + + debug!(target: "sync", "Connected {}", who); let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone(); self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number)); @@ -1151,20 +1139,12 @@ impl Protocol { if inserted || force { let message = message::BlockAnnounce { header: header.clone(), - state: if peer.info.protocol_version >= 4 { - if is_best { - Some(message::BlockState::Best) - } else { - Some(message::BlockState::Normal) - } - } else { - None - }, - data: if peer.info.protocol_version >= 4 { - Some(data.clone()) + state: if is_best { + Some(message::BlockState::Best) } else { - None + Some(message::BlockState::Normal) }, + data: Some(data.clone()), }; self.behaviour.write_notification( @@ -1588,9 +1568,20 @@ impl NetworkBehaviour for Protocol { let outcome = match event { GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => { - match as Decode>::decode(&mut &received_handshake[..]) { - Ok(GenericMessage::Status(handshake)) => - self.on_peer_connected(peer_id, handshake, notifications_sink), + // `received_handshake` can be either a `Status` message if received from the + // legacy substream ,or a `BlockAnnouncesHandshake` if received from the block + // announces substream. + match as DecodeAll>::decode_all(&mut &received_handshake[..]) { + Ok(GenericMessage::Status(handshake)) => { + let handshake = BlockAnnouncesHandshake { + roles: handshake.roles, + best_number: handshake.best_number, + best_hash: handshake.best_hash, + genesis_hash: handshake.genesis_hash, + }; + + self.on_peer_connected(peer_id, handshake, notifications_sink) + }, Ok(msg) => { debug!( target: "sync", @@ -1602,15 +1593,23 @@ impl NetworkBehaviour for Protocol { CustomMessageOutcome::None } Err(err) => { - debug!( - target: "sync", - "Couldn't decode handshake sent by {}: {:?}: {}", - peer_id, - received_handshake, - err.what() - ); - self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); - CustomMessageOutcome::None + match as DecodeAll>::decode_all(&mut &received_handshake[..]) { + Ok(handshake) => { + self.on_peer_connected(peer_id, handshake, notifications_sink) + } + Err(err2) => { + debug!( + target: "sync", + "Couldn't decode handshake sent by {}: {:?}: {} & {}", + peer_id, + received_handshake, + err.what(), + err2, + ); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + CustomMessageOutcome::None + } + } } } } diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 996a810605d13..e7e2cb035d65c 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -336,14 +336,21 @@ impl GenericProto { versions: &[u8], handshake_message: Vec, peerset: sc_peerset::Peerset, + notif_protocols: impl Iterator, Vec)>, ) -> Self { + let notif_protocols = notif_protocols + .map(|(n, hs)| (n, Arc::new(RwLock::new(hs)))) + .collect::>(); + + assert!(!notif_protocols.is_empty()); + let legacy_handshake_message = Arc::new(RwLock::new(handshake_message)); let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message); GenericProto { local_peer_id, legacy_protocol, - notif_protocols: Vec::new(), + notif_protocols, peerset, peers: FnvHashMap::default(), delays: Default::default(), diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index bcdba87e1037c..f355fba60fb04 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -113,10 +113,11 @@ pub struct NotifsHandler { /// Handler for backwards-compatibility. legacy: LegacyProtoHandler, - /// In the situation where `legacy.is_open()` is true, but we haven't sent out any - /// [`NotifsHandlerOut::Open`] event yet, this contains the handshake received on the legacy - /// substream. - pending_legacy_handshake: Option>, + /// In the situation where either the legacy substream has been opened or the handshake-bearing + /// notifications protocol is open, but we haven't sent out any [`NotifsHandlerOut::Open`] + /// event yet, this contains the received handshake waiting to be reported through the + /// external API. + pending_handshake: Option>, /// State of this handler. enabled: EnabledState, @@ -172,7 +173,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto { .collect(), endpoint: connected_point.clone(), legacy: self.legacy.into_handler(remote_peer_id, connected_point), - pending_legacy_handshake: None, + pending_handshake: None, enabled: EnabledState::Initial, pending_in: Vec::new(), notifications_sink_rx: None, @@ -360,11 +361,20 @@ impl NotifsHandlerProto { /// `list` is a list of notification protocols names, and the message to send as part of the /// handshake. At the moment, the message is always the same whether we open a substream /// ourselves or respond to handshake from the remote. + /// + /// The first protocol in `list` is special-cased as the protocol that contains the handshake + /// to report through the [`NotifsHandlerOut::Open`] event. + /// + /// # Panic + /// + /// - Panics if `list` is empty. + /// pub fn new( legacy: RegisteredProtocol, list: impl Into, Arc>>)>>, ) -> Self { let list = list.into(); + assert!(!list.is_empty()); let out_handlers = list .clone() @@ -614,11 +624,12 @@ impl ProtocolsHandler for NotifsHandler { } } - // If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy - // substream is open but the user isn't aware yet of the substreams being open. + // If `self.pending_handshake` is `Some`, we are in a state where the handshake-bearing + // substream (either the legacy substream or the one special-cased as providing the + // handshake) is open but the user isn't aware yet of the substreams being open. // When that is the case, neither the legacy substream nor the incoming notifications // substreams should be polled, otherwise there is a risk of receiving messages from them. - if self.pending_legacy_handshake.is_none() { + if self.pending_handshake.is_none() { while let Poll::Ready(ev) = self.legacy.poll(cx) { match ev { ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => @@ -631,14 +642,16 @@ impl ProtocolsHandler for NotifsHandler { received_handshake, .. }) => { - self.pending_legacy_handshake = Some(received_handshake); + if self.notifications_sink_rx.is_none() { + debug_assert!(self.pending_handshake.is_none()); + self.pending_handshake = Some(received_handshake); + } cx.waker().wake_by_ref(); return Poll::Pending; }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => { // We consciously drop the receivers despite notifications being potentially // still buffered up. - debug_assert!(self.notifications_sink_rx.is_some()); self.notifications_sink_rx = None; return Poll::Ready(ProtocolsHandlerEvent::Custom( @@ -646,7 +659,6 @@ impl ProtocolsHandler for NotifsHandler { )) }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => { - debug_assert!(self.notifications_sink_rx.is_some()); return Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CustomMessage { message } )) @@ -663,7 +675,7 @@ impl ProtocolsHandler for NotifsHandler { for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { loop { - let poll = if self.pending_legacy_handshake.is_none() { + let poll = if self.notifications_sink_rx.is_some() { handler.poll(cx) } else { handler.poll_process(cx) @@ -692,7 +704,7 @@ impl ProtocolsHandler for NotifsHandler { }, ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { - debug_assert!(self.pending_legacy_handshake.is_none()); + debug_assert!(self.pending_handshake.is_none()); if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, @@ -716,12 +728,17 @@ impl ProtocolsHandler for NotifsHandler { }), ProtocolsHandlerEvent::Close(err) => void::unreachable(err), - // At the moment we don't actually care whether any notifications protocol - // opens or closes. - // Whether our communications with the remote are open or closed entirely - // depends on the legacy substream, because as long as we are open the user of - // this struct might try to send legacy protocol messages which we need to - // deliver for things to work properly. + // Opened substream on the handshake-bearing notification protocol. + ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { handshake }) + if handler_num == 0 => + { + if self.notifications_sink_rx.is_none() && self.pending_handshake.is_none() { + self.pending_handshake = Some(handshake); + } + }, + + // Nothing to do in response to other notification substreams being opened + // or closed. ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { .. }) => {}, ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed) => {}, ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Refused) => {}, @@ -730,7 +747,7 @@ impl ProtocolsHandler for NotifsHandler { } if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) { - if let Some(handshake) = self.pending_legacy_handshake.take() { + if let Some(handshake) = self.pending_handshake.take() { let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index daa02efd02a05..d604645d4ac87 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -32,7 +32,7 @@ use libp2p::swarm::{ Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters, NetworkBehaviour, NetworkBehaviourAction }; -use std::{error, io, task::Context, task::Poll, time::Duration}; +use std::{error, io, iter, task::{Context, Poll}, time::Duration}; /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. @@ -78,7 +78,10 @@ fn build_nodes() -> (Swarm, Swarm) { }); let behaviour = CustomProtoWithAddr { - inner: GenericProto::new(local_peer_id, "test", &[1], vec![], peerset), + inner: GenericProto::new( + local_peer_id, "test", &[1], vec![], peerset, + iter::once(("/foo".into(), Vec::new())) + ), addrs: addrs .iter() .enumerate() diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index a7fbb92387cf6..1cd78c0ed1dda 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -41,12 +41,6 @@ pub type Message = generic::Message< ::Extrinsic, >; -/// Type alias for using the status type using block type parameters. -pub type Status = generic::Status< - ::Hash, - <::Header as HeaderT>::Number, ->; - /// Type alias for using the block request type using block type parameters. pub type BlockRequest = generic::BlockRequest< ::Hash, diff --git a/client/rpc-api/src/system/helpers.rs b/client/rpc-api/src/system/helpers.rs index 5dbe93543d8e5..dd3294c243116 100644 --- a/client/rpc-api/src/system/helpers.rs +++ b/client/rpc-api/src/system/helpers.rs @@ -67,8 +67,6 @@ pub struct PeerInfo { pub peer_id: String, /// Roles pub roles: String, - /// Protocol version - pub protocol_version: u32, /// Peer best block hash pub best_hash: Hash, /// Peer best block number @@ -110,11 +108,10 @@ mod tests { ::serde_json::to_string(&PeerInfo { peer_id: "2".into(), roles: "a".into(), - protocol_version: 2, best_hash: 5u32, best_number: 6u32, }).unwrap(), - r#"{"peerId":"2","roles":"a","protocolVersion":2,"bestHash":5,"bestNumber":6}"#, + r#"{"peerId":"2","roles":"a","bestHash":5,"bestNumber":6}"#, ); } } diff --git a/client/rpc/src/system/tests.rs b/client/rpc/src/system/tests.rs index 099504bb009e6..f16d7da5b1a8b 100644 --- a/client/rpc/src/system/tests.rs +++ b/client/rpc/src/system/tests.rs @@ -73,7 +73,6 @@ fn api>>(sync: T) -> System { peers.push(PeerInfo { peer_id: status.peer_id.to_base58(), roles: format!("{}", Role::Full), - protocol_version: 1, best_hash: Default::default(), best_number: 1, }); @@ -259,7 +258,6 @@ fn system_peers() { vec![PeerInfo { peer_id: peer_id.to_base58(), roles: "FULL".into(), - protocol_version: 1, best_hash: Default::default(), best_number: 1u64, }] diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index fac09beb8bd60..d5d503d22d171 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -285,7 +285,6 @@ async fn build_network_future< sc_rpc::system::PeerInfo { peer_id: peer_id.to_base58(), roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, best_hash: p.best_hash, best_number: p.best_number, }