diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index a7e6f36ef6215..0075e856e7574 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -40,7 +40,7 @@ use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet}, future::Future, iter, pin::Pin, @@ -77,8 +77,6 @@ type PendingSyncSubstreamValidation = // Lock must always be taken in order declared here. pub struct Protocol { - /// Pending list of messages to return from `poll` as a priority. - pending_messages: VecDeque, /// Used to report reputation changes. peerset_handle: sc_peerset::PeersetHandle, /// Handles opening the unique substream and sending and receiving raw messages. @@ -181,7 +179,6 @@ impl Protocol { }; let protocol = Self { - pending_messages: VecDeque::new(), peerset_handle: peerset_handle.clone(), behaviour, notification_protocols: iter::once(block_announces_protocol.notifications_protocol) @@ -409,8 +406,21 @@ impl NetworkBehaviour for Protocol { cx: &mut std::task::Context, params: &mut impl PollParameters, ) -> Poll> { - if let Some(message) = self.pending_messages.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)) + while let Poll::Ready(Some(validation_result)) = + self.sync_substream_validations.poll_next_unpin(cx) + { + match validation_result { + Ok((peer, roles)) => { + self.peers.insert(peer, roles); + }, + Err(peer) => { + log::debug!( + target: "sub-libp2p", + "`SyncingEngine` rejected stream" + ); + self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); + }, + } } let event = match self.behaviour.poll(cx, params) { @@ -430,23 +440,6 @@ impl NetworkBehaviour for Protocol { return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }), }; - while let Poll::Ready(Some(validation_result)) = - self.sync_substream_validations.poll_next_unpin(cx) - { - match validation_result { - Ok((peer, roles)) => { - self.peers.insert(peer, roles); - }, - Err(peer) => { - log::debug!( - target: "sub-libp2p", - "`SyncingEngine` rejected stream" - ); - self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); - }, - } - } - let outcome = match event { NotificationsOut::CustomProtocolOpen { peer_id, @@ -509,7 +502,6 @@ impl NetworkBehaviour for Protocol { ) { Ok(handshake) => { let roles = handshake.roles; - self.peers.insert(peer_id, roles); let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send( @@ -644,10 +636,6 @@ impl NetworkBehaviour for Protocol { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) } - if let Some(message) = self.pending_messages.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)) - } - // This block can only be reached if an event was pulled from the behaviour and that // resulted in `CustomMessageOutcome::None`. Since there might be another pending // message from the behaviour, the task is scheduled again.