From b6ff0fc2a78f31465d01063343045ad72ed6e362 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 15 Aug 2019 16:34:33 +0200 Subject: [PATCH] Update to stables futures --- Cargo.toml | 5 +- src/bridge_state.rs | 23 ++-- src/testing.rs | 71 +++++------ src/voter/mod.rs | 253 ++++++++++++++++++-------------------- src/voter/past_rounds.rs | 97 +++++++++------ src/voter/voting_round.rs | 56 +++++---- 6 files changed, 258 insertions(+), 247 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3c574278..96ebd3af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,8 @@ repository = "https://github.com/paritytech/finality-grandpa" edition = "2018" [dependencies] -futures = "0.1" +futures-preview = "0.3.0-alpha.17" +futures-timer = "0.3.0" log = "0.4" parking_lot = { version = "0.9", optional = true } parity-scale-codec = { version = "1.0.3", optional = true, default-features = false, features = ["derive"] } @@ -16,9 +17,7 @@ num = { package = "num-traits", version = "0.2", default-features = false } hashmap_core = { version = "0.1.10", default-features = false } [dev-dependencies] -exit-future = "0.1.2" rand = "0.6.0" -tokio = "0.1.8" [features] default = ["std"] diff --git a/src/bridge_state.rs b/src/bridge_state.rs index fd9bab3a..3388e784 100644 --- a/src/bridge_state.rs +++ b/src/bridge_state.rs @@ -18,18 +18,19 @@ use crate::round::State as RoundState; use futures::task; use parking_lot::{RwLock, RwLockReadGuard}; use std::sync::Arc; +use std::task::Context; // round state bridged across rounds. struct Bridged { inner: RwLock>, - task: task::AtomicTask, + waker: task::AtomicWaker, } impl Bridged { fn new(inner: RwLock>) -> Self { Bridged { inner, - task: task::AtomicTask::new(), + waker: task::AtomicWaker::new(), } } } @@ -41,7 +42,7 @@ impl PriorView { /// Push an update to the latter view. pub(crate) fn update(&self, new: RoundState) { *self.0.inner.write() = new; - self.0.task.notify(); + self.0.waker.wake(); } } @@ -50,8 +51,8 @@ pub(crate) struct LatterView(Arc>); impl LatterView { /// Fetch a handle to the last round-state. - pub(crate) fn get(&self) -> RwLockReadGuard> { - self.0.task.register(); + pub(crate) fn get(&self, cx: &mut Context) -> RwLockReadGuard> { + self.0.waker.register(cx.waker()); self.0.inner.read() } } @@ -73,7 +74,7 @@ pub(crate) fn bridge_state(initial: RoundState) -> (PriorView, #[cfg(test)] mod tests { use futures::prelude::*; - use std::sync::Barrier; + use std::{sync::Barrier, task::Poll}; use super::*; #[test] @@ -86,11 +87,11 @@ mod tests { }; let (prior, latter) = bridge_state(initial); - let waits_for_finality = ::futures::future::poll_fn(move || -> Poll<(), ()> { - if latter.get().finalized.is_some() { - Ok(Async::Ready(())) + let waits_for_finality = ::futures::future::poll_fn(move |cx| -> Poll<()> { + if latter.get(cx).finalized.is_some() { + Poll::Ready(()) } else { - Ok(Async::NotReady) + Poll::Pending } }); @@ -107,6 +108,6 @@ mod tests { }); barrier.wait(); - waits_for_finality.wait().unwrap(); + futures::executor::block_on(waits_for_finality); } } diff --git a/src/testing.rs b/src/testing.rs index da4f1afe..7c03081c 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -133,12 +133,14 @@ pub mod environment { use crate::voter::{RoundData, CommunicationIn, CommunicationOut, Callback}; use crate::{Chain, Commit, Error, Equivocation, Message, Prevote, Precommit, PrimaryPropose, SignedMessage, HistoricalVotes}; use futures::prelude::*; - use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; + use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; + use futures_timer::Delay; use parking_lot::Mutex; use std::collections::HashMap; + use std::pin::Pin; use std::sync::Arc; - use std::time::{Instant, Duration}; - use tokio::timer::Delay; + use std::task::{Context, Poll}; + use std::time::Duration; #[derive(Hash, Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] pub struct Id(pub u32); @@ -187,26 +189,25 @@ pub mod environment { } impl crate::voter::Environment<&'static str, u32> for Environment { - type Timer = Box + Send + 'static>; + type Timer = Pin> + Send + 'static>>; type Id = Id; type Signature = Signature; - type In = Box,Error=Error> + Send + 'static>; - type Out = Box,SinkError=Error> + Send + 'static>; + type In = Pin,Error>> + Send + 'static>>; + type Out = Pin,Error=Error> + Send + 'static>>; type Error = Error; fn round_data(&self, round: u64) -> RoundData { const GOSSIP_DURATION: Duration = Duration::from_millis(500); - let now = Instant::now(); let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id); RoundData { voter_id: Some(self.local_id), - prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION) + prevote_timer: Box::pin(Delay::new(GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), - precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION) + precommit_timer: Box::pin(Delay::new(GOSSIP_DURATION + GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), - incoming: Box::new(incoming), - outgoing: Box::new(outgoing), + incoming: Box::pin(incoming), + outgoing: Box::pin(outgoing), } } @@ -218,8 +219,7 @@ pub mod environment { let delay = Duration::from_millis( rand::thread_rng().gen_range(0, COMMIT_DELAY_MILLIS)); - let now = Instant::now(); - Box::new(Delay::new(now + delay).map_err(|_| panic!("Timer failed"))) + Box::pin(Delay::new(delay).map_err(|_| panic!("Timer failed"))) } fn completed( @@ -297,13 +297,13 @@ pub mod environment { // add a node to the network for a round. fn add_node M>(&mut self, f: F) -> ( - impl Stream, - impl Sink + impl Stream>, + impl Sink ) { let (tx, rx) = mpsc::unbounded(); let messages_out = self.raw_sender.clone() .sink_map_err(|e| panic!("Error sending messages: {:?}", e)) - .with(move |message| Ok(f(message))); + .with(move |message| future::ready(Ok(f(message)))); // get history to the node. for prior_message in self.history.iter().cloned() { @@ -311,18 +311,17 @@ pub mod environment { } self.senders.push(tx); - let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e)); - (rx, messages_out) + (rx.map(Ok), messages_out) } // do routing work - fn route(&mut self) -> Poll<(), ()> { + fn route(&mut self, cx: &mut Context) -> Poll<()> { loop { - match self.receiver.poll().map_err(|e| panic!("Error routing messages: {:?}", e))? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some(item)) => { + match Stream::poll_next(Pin::new(&mut self.receiver), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(item)) => { self.history.push(item.clone()); for sender in &self.senders { let _ = sender.unbounded_send(item.clone()); @@ -357,8 +356,8 @@ pub mod environment { impl Network { pub fn make_round_comms(&self, round_number: u64, node_id: Id) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error> + impl Stream,Error>>, + impl Sink,Error=Error> ) { let mut rounds = self.rounds.lock(); rounds.entry(round_number) @@ -371,8 +370,8 @@ pub mod environment { } pub fn make_global_comms(&self) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error> + impl Stream,Error>>, + impl Sink,Error=Error> ) { let mut global_messages = self.global_messages.lock(); global_messages.add_node(|message| match message { @@ -393,20 +392,22 @@ pub mod environment { } impl Future for NetworkRouting { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<(), ()> { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { let mut rounds = self.rounds.lock(); - rounds.retain(|_, round| match round.route() { - Ok(Async::Ready(())) | Err(()) => false, - Ok(Async::NotReady) => true, + rounds.retain(|_, round| match round.route(cx) { + Poll::Ready(()) => false, + Poll::Pending => true, }); let mut global_messages = self.global_messages.lock(); - let _ = global_messages.route(); + let _ = global_messages.route(cx); - Ok(Async::NotReady) + Poll::Pending } } + + impl Unpin for NetworkRouting { + } } diff --git a/src/voter/mod.rs b/src/voter/mod.rs index 91a198ff..f59f0b66 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -25,14 +25,16 @@ //! votes will not be pushed to the sink. The protocol state machine still //! transitions state as if the votes had been pushed out. -use futures::prelude::*; -use futures::sync::mpsc::{self, UnboundedReceiver}; +use futures::{prelude::*, ready}; +use futures::channel::mpsc::{self, UnboundedReceiver}; #[cfg(feature = "std")] use log::trace; use std::collections::VecDeque; use std::hash::Hash; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::round::State as RoundState; use crate::{ @@ -51,11 +53,11 @@ mod voting_round; /// /// This encapsulates the database and networking layers of the chain. pub trait Environment: Chain { - type Timer: Future; + type Timer: Future> + Unpin; type Id: Hash + Clone + Eq + ::std::fmt::Debug; type Signature: Eq + Clone; - type In: Stream, Error=Self::Error>; - type Out: Sink, SinkError=Self::Error>; + type In: Stream, Self::Error>> + Unpin; + type Out: Sink, Error=Self::Error> + Unpin; type Error: From + ::std::error::Error; /// Produce data necessary to start a round of voting. @@ -302,13 +304,13 @@ pub struct RoundData { pub outgoing: Output, } -struct Buffered { +struct Buffered { inner: S, - buffer: VecDeque, + buffer: VecDeque, } -impl Buffered { - fn new(inner: S) -> Buffered { +impl + Unpin, I> Buffered { + fn new(inner: S) -> Buffered { Buffered { buffer: VecDeque::new(), inner @@ -317,40 +319,33 @@ impl Buffered { // push an item into the buffered sink. // the sink _must_ be driven to completion with `poll` afterwards. - fn push(&mut self, item: S::SinkItem) { + fn push(&mut self, item: I) { self.buffer.push_back(item); } // returns ready when the sink and the buffer are completely flushed. - fn poll(&mut self) -> Poll<(), S::SinkError> { - let polled = self.schedule_all()?; + fn poll(&mut self, cx: &mut Context) -> Poll> { + let polled = self.schedule_all(cx)?; match polled { - Async::Ready(()) => self.inner.poll_complete(), - Async::NotReady => { - self.inner.poll_complete()?; - Ok(Async::NotReady) + Poll::Ready(()) => Sink::poll_flush(Pin::new(&mut self.inner), cx), + Poll::Pending => { + ready!(Sink::poll_flush(Pin::new(&mut self.inner), cx))?; + Poll::Pending } } } - fn schedule_all(&mut self) -> Poll<(), S::SinkError> { - while let Some(front) = self.buffer.pop_front() { - match self.inner.start_send(front) { - Ok(AsyncSink::Ready) => continue, - Ok(AsyncSink::NotReady(front)) => { - self.buffer.push_front(front); - break; - } - Err(e) => return Err(e), - } - } + fn schedule_all(&mut self, cx: &mut Context) -> Poll> { + while !self.buffer.is_empty() { + ready!(Sink::poll_ready(Pin::new(&mut self.inner), cx))?; - if self.buffer.is_empty() { - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) + let item = self.buffer.pop_front() + .expect("we checked self.buffer.is_empty() just above; qed"); + Sink::start_send(Pin::new(&mut self.inner), item)?; } + + Poll::Ready(Ok(())) } } @@ -382,8 +377,8 @@ type FinalizedNotification = ( pub struct Voter, GlobalIn, GlobalOut> where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, Error=E::Error>, - GlobalOut: Sink, SinkError=E::Error>, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, { env: Arc, voters: VoterSet, @@ -392,7 +387,7 @@ pub struct Voter, GlobalIn, GlobalOut> where finalized_notifications: UnboundedReceiver>, last_finalized_number: N, global_in: GlobalIn, - global_out: Buffered, + global_out: Buffered>, // the commit protocol might finalize further than the current round (if we're // behind), we keep track of last finalized in round so we don't violate any // assumptions from round-to-round. @@ -402,8 +397,8 @@ pub struct Voter, GlobalIn, GlobalOut> where impl, GlobalIn, GlobalOut> Voter where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, Error=E::Error>, - GlobalOut: Sink, SinkError=E::Error>, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, { /// Create new `Voter` tracker with given round number and base block. /// @@ -454,15 +449,14 @@ impl, GlobalIn, GlobalOut> Voter Result<(), E::Error> { + fn prune_background_rounds(&mut self, cx: &mut Context) -> Result<(), E::Error> { // Do work on all background rounds, broadcasting any commits generated. - while let Async::Ready(Some((number, commit))) = self.past_rounds.poll()? { + while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.past_rounds), cx) { + let (number, commit) = item?; self.global_out.push(CommunicationOut::Commit(number, commit)); } - while let Async::Ready(res) = self.finalized_notifications.poll() - .expect("unbounded receivers do not have spurious errors; qed") - { + while let Poll::Ready(res) = Stream::poll_next(Pin::new(&mut self.finalized_notifications), cx) { let (f_hash, f_num, round, commit) = res.expect("one sender always kept alive in self.best_round; qed"); @@ -489,9 +483,9 @@ impl, GlobalIn, GlobalOut> Voter Result<(), E::Error> { - while let Async::Ready(Some(item)) = self.global_in.poll()? { - match item { + fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> { + while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.global_in), cx) { + match item? { CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => { trace!(target: "afg", "Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}", round_number, @@ -595,13 +589,13 @@ impl, GlobalIn, GlobalOut> Voter Poll<(), E::Error> { + fn process_best_round(&mut self, cx: &mut Context) -> Poll> { // If the current `best_round` is completable and we've already precommitted, // we start a new round at `best_round + 1`. let should_start_next = { - let completable = match self.best_round.poll()? { - Async::Ready(()) => true, - Async::NotReady => false, + let completable = match self.best_round.poll(cx)? { + Poll::Ready(()) => true, + Poll::Pending => false, }; let precommitted = match self.best_round.state() { @@ -612,7 +606,7 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter Result<(), E::Error> { @@ -662,21 +656,28 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Future for Voter where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, Error=E::Error>, - GlobalOut: Sink, SinkError=E::Error>, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, { - type Item = (); - type Error = E::Error; + type Output = Result<(), E::Error>; - fn poll(&mut self) -> Poll<(), E::Error> { - self.process_incoming()?; - self.prune_background_rounds()?; - self.global_out.poll()?; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.process_incoming(cx)?; + self.prune_background_rounds(cx)?; + self.global_out.poll(cx)?; - self.process_best_round() + self.process_best_round(cx) } } +impl, GlobalIn, GlobalOut> Unpin for Voter where + H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, + N: Copy + BlockNumberOps + ::std::fmt::Debug, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, +{ +} + /// Validate the given catch up and return a completed round with all prevotes /// and precommits from the catch up imported. If the catch up is invalid `None` /// is returned instead. @@ -809,9 +810,8 @@ mod tests { chain::GENESIS_HASH, environment::{Environment, Id, Signature}, }; + use futures_timer::TryFutureExt as _; use std::time::Duration; - use tokio::prelude::FutureExt; - use tokio::runtime::current_thread; #[test] fn talking_to_myself() { @@ -819,11 +819,11 @@ mod tests { let voters = std::iter::once((local_id, 100)).collect(); let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -842,17 +842,15 @@ mod tests { last_round_state, last_finalized, ); - ::tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); - ::tokio::spawn(exit.until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task); // wait for the best block to finalize. finalized - .take_while(|&(_, n, _)| Ok(n < 6)) - .for_each(|_| Ok(())) - .map(|_| signal.fire()) - })).unwrap(); + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(())) + }).flatten()); } #[test] @@ -861,13 +859,11 @@ mod tests { let voters: VoterSet<_> = (0..10).map(|i| (Id(i), 1)).collect(); let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); - - current_thread::block_on_all(::futures::future::lazy(move || { - ::tokio::spawn(exit.clone().until(routing_task).map(|_| ())); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + futures::executor::block_on(::futures::future::lazy(move |_| { // 3 voters offline. - let finalized_streams = (0..7).map(move |i| { + let finalized_streams = (0..7).map(|i| { let local_id = Id(i); // initialize chain let env = Arc::new(Environment::new(network.clone(), local_id)); @@ -888,17 +884,17 @@ mod tests { last_round_state, last_finalized, ); - ::tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); // wait for the best block to be finalized by all honest voters finalized - .take_while(|&(_, n, _)| Ok(n < 6)) - .for_each(|_| Ok(())) - }); + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(())) + }).collect::>(); - ::futures::future::join_all(finalized_streams).map(|_| signal.fire()) - })).unwrap(); + threads_pool.spawn_ok(routing_task.map(|_| ())); + ::futures::future::join_all(finalized_streams.into_iter()) + }).flatten()); } #[test] @@ -909,11 +905,11 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (commits, _) = network.make_global_comms(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -931,14 +927,13 @@ mod tests { last_round_state, last_finalized, ); - ::tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); - ::tokio::spawn(exit.until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task); // wait for the node to broadcast a commit message - commits.take(1).for_each(|_| Ok(())).map(|_| signal.fire()) - })).unwrap(); + commits.take(1).for_each(|_| future::ready(())) + }).flatten()); } #[test] @@ -974,11 +969,11 @@ mod tests { }], }); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -996,46 +991,45 @@ mod tests { last_round_state, last_finalized, ); - ::tokio::spawn(exit.clone() - .until(voter.map_err(|e| panic!("Error voting: {:?}", e))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting: {:?}"))); - ::tokio::spawn(exit.clone().until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task.map(|_| ())); - ::tokio::spawn(exit.until(::futures::future::lazy(|| { - round_stream.into_future().map_err(|(e, _)| e) - .and_then(|(value, stream)| { // wait for a prevote + threads_pool.spawn_ok(::futures::future::lazy(move |_| { + round_stream.into_future() + .then(|(value, stream)| { // wait for a prevote assert!(match value { - Some(SignedMessage { message: Message::Prevote(_), id: Id(5), .. }) => true, + Some(Ok(SignedMessage { message: Message::Prevote(_), id: Id(5), .. })) => true, _ => false, }); let votes = vec![prevote, precommit].into_iter().map(Result::Ok); - round_sink.send_all(futures::stream::iter_result(votes)).map(|_| stream) // send our prevote + futures::stream::iter(votes).forward(round_sink).map(|_| stream) // send our prevote }) - .and_then(|stream| { + .then(|stream| { stream.take_while(|value| match value { // wait for a precommit - SignedMessage { message: Message::Precommit(_), id: Id(5), .. } => Ok(false), - _ => Ok(true), - }).for_each(|_| Ok(())) + Ok(SignedMessage { message: Message::Precommit(_), id: Id(5), .. }) => future::ready(false), + _ => future::ready(true), + }).for_each(|_| future::ready(())) }) - .and_then(|_| { + .then(|_| { // send our commit - commits_sink.send(CommunicationOut::Commit(commit.0, commit.1)) + stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) }) .map_err(|_| ()) - })).map(|_| ())); + }).flatten().map(|_| ())); // wait for the first commit (ours) - commits_stream.into_future().map_err(|_| ()) - .and_then(|(_, stream)| { - stream.take(1).for_each(|_| Ok(())) // the second commit should never arrive + commits_stream.into_future() + .then(|(_, stream)| { + stream.take(1).for_each(|_| future::ready(())) // the second commit should never arrive + .map(|()| Ok::<(), std::io::Error>(())) .timeout(Duration::from_millis(500)).map_err(|_| ()) }) .then(|res| { assert!(res.is_err()); // so the previous future times out - signal.fire(); futures::future::ok::<(), ()>(()) }) - })).unwrap(); + }).flatten()).unwrap(); } #[test] @@ -1050,7 +1044,7 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (_, commits_sink) = network.make_global_comms(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); // this is a commit for a previous round let commit = (0, Commit { @@ -1065,7 +1059,7 @@ mod tests { let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -1083,20 +1077,18 @@ mod tests { last_round_state, last_finalized, ); - ::tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); - ::tokio::spawn(exit.until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task.map(|_| ())); - ::tokio::spawn(commits_sink.send(CommunicationOut::Commit(commit.0, commit.1)) + threads_pool.spawn_ok(stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) .map_err(|_| ()).map(|_| ())); // wait for the commit message to be processed which finalized block 6 env.finalized_stream() - .take_while(|&(_, n, _)| Ok(n < 6)) - .for_each(|_| Ok(())) - .map(|_| signal.fire()) - })).unwrap(); + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(())) + }).flatten()); } #[test] @@ -1105,10 +1097,10 @@ mod tests { let voters: VoterSet<_> = (0..3).map(|i| (Id(i), 1)).collect(); let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); - current_thread::block_on_all(::futures::future::lazy(move || { - ::tokio::spawn(exit.clone().until(routing_task).map(|_| ())); + futures::executor::block_on(::futures::future::lazy(move |_| { + threads_pool.spawn_ok(routing_task.map(|_| ())); // initialize unsynced voter at round 0 let mut unsynced_voter = { @@ -1158,14 +1150,15 @@ mod tests { // poll until it's caught up. // should skip to round 6 - ::futures::future::poll_fn(move || -> Poll<(), ()> { - let poll = unsynced_voter.poll().map_err(|_| ())?; + ::futures::future::poll_fn(move |cx| -> Poll> { + let poll = Future::poll(Pin::new(&mut unsynced_voter), cx); if unsynced_voter.best_round.round_number() == 6 { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } else { - Ok(poll) + futures::ready!(poll).map_err(|_| ())?; + Poll::Ready(Ok(())) } - }).map(move |_| signal.fire()) - })).unwrap(); + }) + }).flatten()).unwrap(); } } diff --git a/src/voter/past_rounds.rs b/src/voter/past_rounds.rs index 4362415c..24d95857 100644 --- a/src/voter/past_rounds.rs +++ b/src/voter/past_rounds.rs @@ -22,17 +22,19 @@ //! - Passing it any validated commits (so backgrounded rounds don't produce conflicting ones) #[cfg(feature = "std")] -use futures::try_ready; +use futures::ready; use futures::prelude::*; use futures::stream::{self, futures_unordered::FuturesUnordered}; use futures::task; -use futures::sync::mpsc; +use futures::channel::mpsc; #[cfg(feature = "std")] use log::{trace, debug}; use std::cmp; use std::collections::HashMap; use std::hash::Hash; +use std::pin::Pin; +use std::task::{Context, Poll}; use crate::{Commit, BlockNumberOps}; use super::Environment; @@ -47,7 +49,7 @@ struct BackgroundRound> where N: Copy + BlockNumberOps + ::std::fmt::Debug, { inner: VotingRound, - task: Option, + waker: Option, finalized_number: N, round_committer: Option>, } @@ -76,8 +78,8 @@ impl> BackgroundRound where // wake up the future to be polled if done. if self.is_done() { - if let Some(ref task) = self.task { - task.notify(); + if let Some(ref waker) = self.waker { + waker.wake_by_ref(); } } } @@ -98,35 +100,40 @@ impl> Future for BackgroundRound where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, { - type Item = BackgroundRoundChange; - type Error = E::Error; + type Output = Result, E::Error>; - fn poll(&mut self) -> Poll { - self.task = Some(::futures::task::current()); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.waker = Some(cx.waker().clone()); - self.inner.poll()?; + self.inner.poll(cx)?; self.round_committer = match self.round_committer.take() { None => None, - Some(mut committer) => match committer.commit(&mut self.inner)? { - Async::Ready(None) => None, - Async::Ready(Some(commit)) => return Ok(Async::Ready( + Some(mut committer) => match committer.commit(cx, &mut self.inner)? { + Poll::Ready(None) => None, + Poll::Ready(Some(commit)) => return Poll::Ready(Ok( BackgroundRoundChange::Committed(commit) )), - Async::NotReady => Some(committer), + Poll::Pending => Some(committer), } }; if self.is_done() { // if this is fully done (has committed _and_ estimate finalized) // we bail for real. - Ok(Async::Ready(BackgroundRoundChange::Irrelevant(self.round_number()))) + Poll::Ready(Ok(BackgroundRoundChange::Irrelevant(self.round_number()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } +impl> Unpin for BackgroundRound where + H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, + N: Copy + BlockNumberOps + ::std::fmt::Debug, +{ +} + struct RoundCommitter> where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, @@ -170,26 +177,26 @@ impl> RoundCommitter where Ok(true) } - fn commit(&mut self, voting_round: &mut VotingRound) - -> Poll>, E::Error> + fn commit(&mut self, cx: &mut Context, voting_round: &mut VotingRound) + -> Poll>, E::Error>> { - while let Ok(Async::Ready(Some(commit))) = self.import_commits.poll() { + while let Poll::Ready(Some(commit)) = Stream::poll_next(Pin::new(&mut self.import_commits), cx) { if !self.import_commit(voting_round, commit)? { trace!(target: "afg", "Ignoring invalid commit"); } } - try_ready!(self.commit_timer.poll()); + ready!(Future::poll(Pin::new(&mut self.commit_timer), cx))?; match (self.last_commit.take(), voting_round.finalized()) { (None, Some(_)) => { - Ok(Async::Ready(voting_round.finalizing_commit().cloned())) + Poll::Ready(Ok(voting_round.finalizing_commit().cloned())) }, (Some(Commit { target_number, .. }), Some((_, finalized_number))) if target_number < *finalized_number => { - Ok(Async::Ready(voting_round.finalizing_commit().cloned())) + Poll::Ready(Ok(voting_round.finalizing_commit().cloned())) }, _ => { - Ok(Async::Ready(None)) + Poll::Ready(Ok(None)) }, } } @@ -213,24 +220,26 @@ impl SelfReturningFuture { } } -impl Future for SelfReturningFuture { - type Item = (F::Item, F); - type Error = F::Error; +impl Future for SelfReturningFuture { + type Output = (F::Output, F); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.inner.take() { None => panic!("poll after return is not done in this module; qed"), - Some(mut f) => match f.poll()? { - Async::Ready(item) => Ok(Async::Ready((item, f))), - Async::NotReady => { + Some(mut f) => match Future::poll(Pin::new(&mut f), cx) { + Poll::Ready(item) => Poll::Ready((item, f)), + Poll::Pending => { self.inner = Some(f); - Ok(Async::NotReady) + Poll::Pending } } } } } +impl Unpin for SelfReturningFuture { +} + /// A stream for past rounds, which produces any commit messages from those /// rounds and drives them to completion. pub(super) struct PastRounds> where @@ -259,7 +268,7 @@ impl> PastRounds where let (tx, rx) = mpsc::unbounded(); let background = BackgroundRound { inner: round, - task: None, + waker: None, // https://github.com/paritytech/finality-grandpa/issues/50 finalized_number: N::zero(), round_committer: Some(RoundCommitter::new( @@ -298,16 +307,15 @@ impl> Stream for PastRounds where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, { - type Item = (u64, Commit); - type Error = E::Error; + type Item = Result<(u64, Commit), E::Error>; - fn poll(&mut self) -> Poll, E::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { - match self.past_rounds.poll()? { - Async::Ready(Some((BackgroundRoundChange::Irrelevant(number), _))) => { + match Stream::poll_next(Pin::new(&mut self.past_rounds), cx) { + Poll::Ready(Some((Ok(BackgroundRoundChange::Irrelevant(number)), _))) => { self.commit_senders.remove(&number); } - Async::Ready(Some((BackgroundRoundChange::Committed(commit), round))) => { + Poll::Ready(Some((Ok(BackgroundRoundChange::Committed(commit)), round))) => { let number = round.round_number(); // reschedule until irrelevant. @@ -321,11 +329,18 @@ impl> Stream for PastRounds where commit.target_hash, ); - return Ok(Async::Ready(Some((number, commit)))); + return Poll::Ready(Some(Ok((number, commit)))); } - Async::Ready(None) => return Ok(Async::Ready(None)), - Async::NotReady => return Ok(Async::NotReady), + Poll::Ready(Some((Err(err), _))) => return Poll::Ready(Some(Err(err))), + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, } } } } + +impl> Unpin for PastRounds where + H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, + N: Copy + BlockNumberOps + ::std::fmt::Debug, +{ +} diff --git a/src/voter/voting_round.rs b/src/voter/voting_round.rs index ee55dfa9..daaf7db0 100644 --- a/src/voter/voting_round.rs +++ b/src/voter/voting_round.rs @@ -15,14 +15,16 @@ //! Logic for voting and handling messages within a single round. #[cfg(feature = "std")] -use futures::try_ready; +use futures::ready; use futures::prelude::*; -use futures::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; #[cfg(feature = "std")] use log::{trace, warn, debug}; use std::hash::Hash; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::round::{Round, State as RoundState}; use crate::{ @@ -61,7 +63,7 @@ pub(super) struct VotingRound> where voting: Voting, votes: Round, incoming: E::In, - outgoing: Buffered, + outgoing: Buffered>, state: Option>, // state machine driving votes. bridged_round_state: Option>, // updates to later round last_round_state: Option>, // updates from prior round @@ -174,25 +176,25 @@ impl> VotingRound where } } - /// Poll the round. When the round is completable and messages have been flushed, it will return `Async::Ready` but + /// Poll the round. When the round is completable and messages have been flushed, it will return `Poll::Ready` but /// can continue to be polled. - pub(super) fn poll(&mut self) -> Poll<(), E::Error> { + pub(super) fn poll(&mut self, cx: &mut Context) -> Poll> { trace!(target: "afg", "Polling round {}, state = {:?}, step = {:?}", self.votes.number(), self.votes.state(), self.state); let pre_state = self.votes.state(); - self.process_incoming()?; + self.process_incoming(cx)?; // we only cast votes when we have access to the previous round state. // we might have started this round as a prospect "future" round to // check whether the voter is lagging behind the current round. - let last_round_state = self.last_round_state.as_ref().map(|s| s.get().clone()); + let last_round_state = self.last_round_state.as_ref().map(|s| s.get(cx).clone()); if let Some(ref last_round_state) = last_round_state { self.primary_propose(last_round_state)?; - self.prevote(last_round_state)?; - self.precommit(last_round_state)?; + self.prevote(cx, last_round_state)?; + self.precommit(cx, last_round_state)?; } - try_ready!(self.outgoing.poll()); - self.process_incoming()?; // in case we got a new message signed locally. + ready!(self.outgoing.poll(cx))?; + self.process_incoming(cx)?; // in case we got a new message signed locally. // broadcast finality notifications after attempting to cast votes let post_state = self.votes.state(); @@ -200,7 +202,7 @@ impl> VotingRound where // early exit if the current round is not completable if !self.votes.completable() { - return Ok(Async::NotReady); + return Poll::Pending; } // make sure that the previous round estimate has been finalized @@ -232,11 +234,11 @@ impl> VotingRound where // the previous round estimate must be finalized if !last_round_estimate_finalized { - return Ok(Async::NotReady); + return Poll::Pending; } // both exit conditions verified, we can complete this round - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } /// Inspect the state of this round. @@ -319,10 +321,10 @@ impl> VotingRound where self.votes.historical_votes() } - fn process_incoming(&mut self) -> Result<(), E::Error> { - while let Async::Ready(Some(incoming)) = self.incoming.poll()? { + fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> { + while let Poll::Ready(Some(incoming)) = Stream::poll_next(Pin::new(&mut self.incoming), cx) { trace!(target: "afg", "Got incoming message"); - let SignedMessage { message, signature, id } = incoming; + let SignedMessage { message, signature, id } = incoming?; if !self.env.is_equal_or_descendent_of(self.votes.base().0, message.target().0.clone()) { trace!(target: "afg", "Ignoring message targeting {:?} lower than round base {:?}", message.target(), @@ -398,14 +400,14 @@ impl> VotingRound where Ok(()) } - fn prevote(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { + fn prevote(&mut self, cx: &mut Context, last_round_state: &RoundState) -> Result<(), E::Error> { let state = self.state.take(); let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| { - let should_prevote = match prevote_timer.poll() { - Err(e) => return Err(e), - Ok(Async::Ready(())) => true, - Ok(Async::NotReady) => self.votes.completable(), + let should_prevote = match Future::poll(Pin::new(&mut prevote_timer), cx) { + Poll::Ready(Err(e)) => return Err(e), + Poll::Ready(Ok(())) => true, + Poll::Pending => self.votes.completable(), }; if should_prevote { @@ -440,7 +442,7 @@ impl> VotingRound where Ok(()) } - fn precommit(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { + fn precommit(&mut self, cx: &mut Context, last_round_state: &RoundState) -> Result<(), E::Error> { match self.state.take() { Some(State::Prevoted(mut precommit_timer)) => { let last_round_estimate = last_round_state.estimate.clone() @@ -453,10 +455,10 @@ impl> VotingRound where p_g == &last_round_estimate || self.env.is_equal_or_descendent_of(last_round_estimate.0, p_g.0.clone()) }) - } && match precommit_timer.poll() { - Err(e) => return Err(e), - Ok(Async::Ready(())) => true, - Ok(Async::NotReady) => self.votes.completable(), + } && match Future::poll(Pin::new(&mut precommit_timer), cx) { + Poll::Ready(Err(e)) => return Err(e), + Poll::Ready(Ok(())) => true, + Poll::Pending => self.votes.completable(), }; if should_precommit {