From 10dfd660d5927eeaf49849f088d3772f4979fb50 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 20 Jan 2020 16:34:08 +0100 Subject: [PATCH 1/4] clinet/finality-grandpa: Make round_communication use bounded channel `round_communication` returns a `Sink` and a `Stream` for outgoing and incoming messages. The messages send into the `Sink` are forwarded down to the network as well as send back into the `Stream` to ensure the node processes its own messages. So far, to send messages into the `Sink` back into the `Stream`, an unbounded channel was used. This patch updates `round_communication` and `OutgoingMessages` to use a bounded channel. This is part of a greater effort to reduce the number of owners of components within `finality-grandpa` and `network` as well as to reduce the amount of unbounded channels. For details see d4fbb897c and f0c18520a. --- .../finality-grandpa/src/communication/mod.rs | 87 ++++++++++--------- client/finality-grandpa/src/environment.rs | 9 +- 2 files changed, 56 insertions(+), 40 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 7723047d1b423..7738b38fd50b0 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -27,12 +27,13 @@ //! In the future, there will be a fallback for allowing sending the same message //! under certain conditions that are used to un-stick the protocol. -use futures::{prelude::*, sync::mpsc}; +use futures::prelude::*; use futures03::{ + sink::Sink as Sink03, channel::mpsc as mpsc03, compat::Compat, future::{Future as Future03}, - stream::StreamExt, + stream::{Stream as Stream03, StreamExt}, }; use log::{debug, trace}; use parking_lot::Mutex; @@ -271,8 +272,8 @@ impl> NetworkBridge { local_key: Option, has_voted: HasVoted, ) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error>, + impl Stream03> + Unpin, + impl Sink03, Error=Error> + Unpin, ) { self.note_round( round, @@ -290,22 +291,20 @@ impl> NetworkBridge { }); let topic = round_topic::(round.0, set_id.0); - let incoming = Compat::new(self.gossip_engine.messages_for(topic) - .map(|item| Ok::<_, ()>(item))) - .filter_map(|notification| { + let incoming = self.gossip_engine.messages_for(topic) + .filter_map(move |notification| { let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); - if let Err(ref e) = decoded { - debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); - } - decoded.ok() - }) - .and_then(move |msg| { - match msg { - GossipMessage::Vote(msg) => { + + match decoded { + Err(ref e) => { + debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); + return futures03::future::ready(None); + } + Ok(GossipMessage::Vote(msg)) => { // check signature. if !voters.contains_key(&msg.message.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); - return Ok(None); + return futures03::future::ready(None); } match &msg.message.message { @@ -332,18 +331,16 @@ impl> NetworkBridge { }, }; - Ok(Some(msg.message)) + futures03::future::ready(Some(msg.message)) } _ => { debug!(target: "afg", "Skipping unknown message type"); - return Ok(None); + return futures03::future::ready(None); } } - }) - .filter_map(|x| x) - .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))); + }); - let (tx, out_rx) = mpsc::unbounded(); + let (tx, out_rx) = mpsc03::channel(0); let outgoing = OutgoingMessages:: { round: round.0, set_id: set_id.0, @@ -353,14 +350,10 @@ impl> NetworkBridge { has_voted, }; - let out_rx = out_rx.map_err(move |()| Error::Network( - format!("Failed to receive on unbounded receiver for round {}", round.0) - )); - // Combine incoming votes from external GRANDPA nodes with outgoing // votes from our own GRANDPA voter to have a single // vote-import-pipeline. - let incoming = incoming.select(out_rx); + let incoming = futures03::stream::select(incoming, out_rx); (incoming, outgoing) } @@ -641,17 +634,27 @@ struct OutgoingMessages { round: RoundNumber, set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, - sender: mpsc::UnboundedSender>, + sender: mpsc03::Sender>, network: GossipEngine, has_voted: HasVoted, } -impl Sink for OutgoingMessages +impl Unpin for OutgoingMessages {} + +impl Sink03> for OutgoingMessages +where + Message: Sized, { - type SinkItem = Message; - type SinkError = Error; + type Error = Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03> { + Sink03::poll_ready(Pin::new(&mut self.sender), cx) + .map(|elem| { elem.map_err(|e| { + Error::Network(format!("Failed to poll_ready channel sender: {:?}", e)) + })}) + } - fn start_send(&mut self, mut msg: Message) -> StartSend, Error> { + fn start_send(mut self: Pin<&mut Self>, mut msg: Message) -> Result<(), Self::Error> { // if we've voted on this round previously under the same key, send that vote instead match &mut msg { finality_grandpa::Message::PrimaryPropose(ref mut vote) => @@ -707,17 +710,23 @@ impl Sink for OutgoingMessages self.network.gossip_message(topic, message.encode(), false); // forward the message to the inner sender. - let _ = self.sender.unbounded_send(signed); - } + return self.sender.start_send(signed).map_err(|e| { + Error::Network(format!("Failed to start_send on channel sender: {:?}", e)) + }); + }; - Ok(AsyncSink::Ready) + Ok(()) } - fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03> { + Poll03::Ready(Ok(())) + } - fn close(&mut self) -> Poll<(), Error> { - // ignore errors since we allow this inner sender to be closed already. - self.sender.close().or_else(|_| Ok(Async::Ready(()))) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03> { + Sink03::poll_close(Pin::new(&mut self.sender), cx) + .map(|elem| { elem.map_err(|e| { + Error::Network(format!("Failed to poll_close channel sender: {:?}", e)) + })}) } } diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index 372229001dd19..c5c6291dc0bc6 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -22,7 +22,11 @@ use std::time::Duration; use log::{debug, warn, info}; use parity_scale_codec::{Decode, Encode}; use futures::prelude::*; -use futures03::future::{FutureExt as _, TryFutureExt as _}; +use futures03::{ + compat::{Compat, CompatSink}, + future::{FutureExt as _, TryFutureExt as _}, + stream::StreamExt as _, +}; use futures_timer::Delay; use parking_lot::RwLock; use sp_blockchain::{HeaderBackend, Error as ClientError}; @@ -608,6 +612,9 @@ where has_voted, ); + let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item))); + let outgoing = CompatSink::new(outgoing); + // schedule incoming messages from the network to be held until // corresponding blocks are imported. let incoming = Box::new(UntilVoteTargetImported::new( From 1fbf4ceddf79e687da6f69f0e5aec0b50817f693 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 20 Jan 2020 18:05:05 +0100 Subject: [PATCH 2/4] client/finality-grandpa: Import futures03::future::ready at the top --- client/finality-grandpa/src/communication/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 7738b38fd50b0..77d4cc782b3b1 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -29,10 +29,10 @@ use futures::prelude::*; use futures03::{ - sink::Sink as Sink03, channel::mpsc as mpsc03, compat::Compat, - future::{Future as Future03}, + future::{Future as Future03, ready}, + sink::Sink as Sink03, stream::{Stream as Stream03, StreamExt}, }; use log::{debug, trace}; @@ -298,13 +298,13 @@ impl> NetworkBridge { match decoded { Err(ref e) => { debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); - return futures03::future::ready(None); + return ready(None); } Ok(GossipMessage::Vote(msg)) => { // check signature. if !voters.contains_key(&msg.message.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); - return futures03::future::ready(None); + return ready(None); } match &msg.message.message { @@ -331,11 +331,11 @@ impl> NetworkBridge { }, }; - futures03::future::ready(Some(msg.message)) + ready(Some(msg.message)) } _ => { debug!(target: "afg", "Skipping unknown message type"); - return futures03::future::ready(None); + return ready(None); } } }); From f04f3355ca897f364d629c92b1d5f5fcd2dfcca4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 21 Jan 2020 17:14:36 +0100 Subject: [PATCH 3/4] client/finality-grandpa: Make tests use compat of future 03 --- .../finality-grandpa/src/communication/mod.rs | 6 ++--- client/finality-grandpa/src/tests.rs | 25 +++++++++++++++---- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 77d4cc782b3b1..109aac35d32ac 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -273,7 +273,7 @@ impl> NetworkBridge { has_voted: HasVoted, ) -> ( impl Stream03> + Unpin, - impl Sink03, Error=Error> + Unpin, + OutgoingMessages, ) { self.note_round( round, @@ -630,7 +630,7 @@ pub(crate) fn check_message_sig( /// use the same raw message and key to sign. This is currently true for /// `ed25519` and `BLS` signatures (which we might use in the future), care must /// be taken when switching to different key types. -struct OutgoingMessages { +pub(crate) struct OutgoingMessages { round: RoundNumber, set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, @@ -642,8 +642,6 @@ struct OutgoingMessages { impl Unpin for OutgoingMessages {} impl Sink03> for OutgoingMessages -where - Message: Sized, { type Error = Error; diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index 9ad12c6c317ad..05d9067cb0194 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -37,15 +37,17 @@ use sp_consensus::{ BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport, import_queue::{BoxJustificationImport, BoxFinalityProofImport}, }; -use std::collections::{HashMap, HashSet}; -use std::result; +use std::{ + collections::{HashMap, HashSet}, + result, + pin::Pin, task, +}; use parity_scale_codec::Decode; -use sp_runtime::traits::{Header as HeaderT, HasherFor}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor}; use sp_runtime::generic::{BlockId, DigestItem}; use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public}; use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi}; use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check}; -use std::{pin::Pin, task}; use authorities::AuthoritySet; use finality_proof::{ @@ -1280,6 +1282,9 @@ fn voter_persists_its_votes() { HasVoted::No, ); + let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item))); + let round_tx = futures03::compat::CompatSink::new(round_tx); + let round_tx = Arc::new(Mutex::new(round_tx)); let exit_tx = Arc::new(Mutex::new(Some(exit_tx))); @@ -1330,7 +1335,17 @@ fn voter_persists_its_votes() { target_hash: block_30_hash, }; - round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap(); + // One should either be calling `Sink::send` or `Sink::start_send` followed + // by `Sink::poll_complete` to make sure items are being flushed. Given that + // we send in a loop including a delay until items are received, this can be + // ignored for the sake of reduced complexity. + if !round_tx.lock() + .start_send(finality_grandpa::Message::Prevote(prevote)) + .unwrap() + .is_ready() { + panic!("expected sink to be ready to write to."); + } + Ok(()) }).map_err(|_| panic!())) From 6858319a169babd168871414883bbb969cc8c32d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 22 Jan 2020 17:29:27 +0100 Subject: [PATCH 4/4] client/finality-grandpa: Do not import ready into scope Instead of importing futures03::future::ready into the scope, only import futures::future03 into scope and call ready as furure03::ready. --- client/finality-grandpa/src/communication/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 109aac35d32ac..9b593331e6f32 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -31,7 +31,7 @@ use futures::prelude::*; use futures03::{ channel::mpsc as mpsc03, compat::Compat, - future::{Future as Future03, ready}, + future::{self as future03, Future as Future03}, sink::Sink as Sink03, stream::{Stream as Stream03, StreamExt}, }; @@ -298,13 +298,13 @@ impl> NetworkBridge { match decoded { Err(ref e) => { debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); - return ready(None); + return future03::ready(None); } Ok(GossipMessage::Vote(msg)) => { // check signature. if !voters.contains_key(&msg.message.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); - return ready(None); + return future03::ready(None); } match &msg.message.message { @@ -331,11 +331,11 @@ impl> NetworkBridge { }, }; - ready(Some(msg.message)) + future03::ready(Some(msg.message)) } _ => { debug!(target: "afg", "Skipping unknown message type"); - return ready(None); + return future03::ready(None); } } });