diff --git a/Cargo.toml b/Cargo.toml index 6936ff06..9312bdea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,7 @@ repository = "https://github.com/paritytech/finality-grandpa" edition = "2018" [dependencies] -futures-preview = "0.3.0-alpha.17" -futures-timer = "0.3.0" +futures = "0.1" log = "0.4" parking_lot = { version = "0.9", optional = true } parity-scale-codec = { version = "1.0.3", optional = true, default-features = false, features = ["derive"] } @@ -17,7 +16,9 @@ num = { package = "num-traits", version = "0.2", default-features = false } hashbrown = { version = "0.6" } [dev-dependencies] +exit-future = "0.1.2" rand = "0.6.0" +tokio = "0.1.8" [features] default = ["std"] diff --git a/src/bridge_state.rs b/src/bridge_state.rs index 3388e784..fd9bab3a 100644 --- a/src/bridge_state.rs +++ b/src/bridge_state.rs @@ -18,19 +18,18 @@ use crate::round::State as RoundState; use futures::task; use parking_lot::{RwLock, RwLockReadGuard}; use std::sync::Arc; -use std::task::Context; // round state bridged across rounds. struct Bridged { inner: RwLock>, - waker: task::AtomicWaker, + task: task::AtomicTask, } impl Bridged { fn new(inner: RwLock>) -> Self { Bridged { inner, - waker: task::AtomicWaker::new(), + task: task::AtomicTask::new(), } } } @@ -42,7 +41,7 @@ impl PriorView { /// Push an update to the latter view. pub(crate) fn update(&self, new: RoundState) { *self.0.inner.write() = new; - self.0.waker.wake(); + self.0.task.notify(); } } @@ -51,8 +50,8 @@ pub(crate) struct LatterView(Arc>); impl LatterView { /// Fetch a handle to the last round-state. - pub(crate) fn get(&self, cx: &mut Context) -> RwLockReadGuard> { - self.0.waker.register(cx.waker()); + pub(crate) fn get(&self) -> RwLockReadGuard> { + self.0.task.register(); self.0.inner.read() } } @@ -74,7 +73,7 @@ pub(crate) fn bridge_state(initial: RoundState) -> (PriorView, #[cfg(test)] mod tests { use futures::prelude::*; - use std::{sync::Barrier, task::Poll}; + use std::sync::Barrier; use super::*; #[test] @@ -87,11 +86,11 @@ mod tests { }; let (prior, latter) = bridge_state(initial); - let waits_for_finality = ::futures::future::poll_fn(move |cx| -> Poll<()> { - if latter.get(cx).finalized.is_some() { - Poll::Ready(()) + let waits_for_finality = ::futures::future::poll_fn(move || -> Poll<(), ()> { + if latter.get().finalized.is_some() { + Ok(Async::Ready(())) } else { - Poll::Pending + Ok(Async::NotReady) } }); @@ -108,6 +107,6 @@ mod tests { }); barrier.wait(); - futures::executor::block_on(waits_for_finality); + waits_for_finality.wait().unwrap(); } } diff --git a/src/testing.rs b/src/testing.rs index 7376ef43..ad5774e5 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -133,14 +133,12 @@ pub mod environment { use crate::voter::{RoundData, CommunicationIn, CommunicationOut, Callback}; use crate::{Chain, Commit, Error, Equivocation, Message, Prevote, Precommit, PrimaryPropose, SignedMessage, HistoricalVotes}; use futures::prelude::*; - use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; - use futures_timer::Delay; + use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use parking_lot::Mutex; use std::collections::HashMap; - use std::pin::Pin; use std::sync::Arc; - use std::task::{Context, Poll}; - use std::time::Duration; + use std::time::{Instant, Duration}; + use tokio::timer::Delay; #[derive(Hash, Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] pub struct Id(pub u32); @@ -189,25 +187,26 @@ pub mod environment { } impl crate::voter::Environment<&'static str, u32> for Environment { - type Timer = Pin> + Send + 'static>>; + type Timer = Box + Send + 'static>; type Id = Id; type Signature = Signature; - type In = Pin,Error>> + Send + 'static>>; - type Out = Pin,Error=Error> + Send + 'static>>; + type In = Box,Error=Error> + Send + 'static>; + type Out = Box,SinkError=Error> + Send + 'static>; type Error = Error; fn round_data(&self, round: u64) -> RoundData { const GOSSIP_DURATION: Duration = Duration::from_millis(500); + let now = Instant::now(); let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id); RoundData { voter_id: Some(self.local_id), - prevote_timer: Box::pin(Delay::new(GOSSIP_DURATION) + prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), - precommit_timer: Box::pin(Delay::new(GOSSIP_DURATION + GOSSIP_DURATION) + precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), - incoming: Box::pin(incoming), - outgoing: Box::pin(outgoing), + incoming: Box::new(incoming), + outgoing: Box::new(outgoing), } } @@ -219,7 +218,8 @@ pub mod environment { let delay = Duration::from_millis( rand::thread_rng().gen_range(0, COMMIT_DELAY_MILLIS)); - Box::pin(Delay::new(delay).map_err(|_| panic!("Timer failed"))) + let now = Instant::now(); + Box::new(Delay::new(now + delay).map_err(|_| panic!("Timer failed"))) } fn completed( @@ -297,13 +297,13 @@ pub mod environment { // add a node to the network for a round. fn add_node M>(&mut self, f: F) -> ( - impl Stream>, - impl Sink + impl Stream, + impl Sink ) { let (tx, rx) = mpsc::unbounded(); let messages_out = self.raw_sender.clone() .sink_map_err(|e| panic!("Error sending messages: {:?}", e)) - .with(move |message| future::ready(Ok(f(message)))); + .with(move |message| Ok(f(message))); // get history to the node. for prior_message in self.history.iter().cloned() { @@ -311,17 +311,18 @@ pub mod environment { } self.senders.push(tx); + let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e)); - (rx.map(Ok), messages_out) + (rx, messages_out) } // do routing work - fn route(&mut self, cx: &mut Context) -> Poll<()> { + fn route(&mut self) -> Poll<(), ()> { loop { - match Stream::poll_next(Pin::new(&mut self.receiver), cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(item)) => { + match self.receiver.poll().map_err(|e| panic!("Error routing messages: {:?}", e))? { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(None) => return Ok(Async::Ready(())), + Async::Ready(Some(item)) => { self.history.push(item.clone()); for sender in &self.senders { let _ = sender.unbounded_send(item.clone()); @@ -356,8 +357,8 @@ pub mod environment { impl Network { pub fn make_round_comms(&self, round_number: u64, node_id: Id) -> ( - impl Stream,Error>>, - impl Sink,Error=Error> + impl Stream,Error=Error>, + impl Sink,SinkError=Error> ) { let mut rounds = self.rounds.lock(); rounds.entry(round_number) @@ -370,8 +371,8 @@ pub mod environment { } pub fn make_global_comms(&self) -> ( - impl Stream,Error>>, - impl Sink,Error=Error> + impl Stream,Error=Error>, + impl Sink,SinkError=Error> ) { let mut global_messages = self.global_messages.lock(); global_messages.add_node(|message| match message { @@ -392,22 +393,20 @@ pub mod environment { } impl Future for NetworkRouting { - type Output = (); + type Item = (); + type Error = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + fn poll(&mut self) -> Poll<(), ()> { let mut rounds = self.rounds.lock(); - rounds.retain(|_, round| match round.route(cx) { - Poll::Ready(()) => false, - Poll::Pending => true, + rounds.retain(|_, round| match round.route() { + Ok(Async::Ready(())) | Err(()) => false, + Ok(Async::NotReady) => true, }); let mut global_messages = self.global_messages.lock(); - let _ = global_messages.route(cx); + let _ = global_messages.route(); - Poll::Pending + Ok(Async::NotReady) } } - - impl Unpin for NetworkRouting { - } } diff --git a/src/voter/mod.rs b/src/voter/mod.rs index f59f0b66..91a198ff 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -25,16 +25,14 @@ //! votes will not be pushed to the sink. The protocol state machine still //! transitions state as if the votes had been pushed out. -use futures::{prelude::*, ready}; -use futures::channel::mpsc::{self, UnboundedReceiver}; +use futures::prelude::*; +use futures::sync::mpsc::{self, UnboundedReceiver}; #[cfg(feature = "std")] use log::trace; use std::collections::VecDeque; use std::hash::Hash; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use crate::round::State as RoundState; use crate::{ @@ -53,11 +51,11 @@ mod voting_round; /// /// This encapsulates the database and networking layers of the chain. pub trait Environment: Chain { - type Timer: Future> + Unpin; + type Timer: Future; type Id: Hash + Clone + Eq + ::std::fmt::Debug; type Signature: Eq + Clone; - type In: Stream, Self::Error>> + Unpin; - type Out: Sink, Error=Self::Error> + Unpin; + type In: Stream, Error=Self::Error>; + type Out: Sink, SinkError=Self::Error>; type Error: From + ::std::error::Error; /// Produce data necessary to start a round of voting. @@ -304,13 +302,13 @@ pub struct RoundData { pub outgoing: Output, } -struct Buffered { +struct Buffered { inner: S, - buffer: VecDeque, + buffer: VecDeque, } -impl + Unpin, I> Buffered { - fn new(inner: S) -> Buffered { +impl Buffered { + fn new(inner: S) -> Buffered { Buffered { buffer: VecDeque::new(), inner @@ -319,33 +317,40 @@ impl + Unpin, I> Buffered { // push an item into the buffered sink. // the sink _must_ be driven to completion with `poll` afterwards. - fn push(&mut self, item: I) { + fn push(&mut self, item: S::SinkItem) { self.buffer.push_back(item); } // returns ready when the sink and the buffer are completely flushed. - fn poll(&mut self, cx: &mut Context) -> Poll> { - let polled = self.schedule_all(cx)?; + fn poll(&mut self) -> Poll<(), S::SinkError> { + let polled = self.schedule_all()?; match polled { - Poll::Ready(()) => Sink::poll_flush(Pin::new(&mut self.inner), cx), - Poll::Pending => { - ready!(Sink::poll_flush(Pin::new(&mut self.inner), cx))?; - Poll::Pending + Async::Ready(()) => self.inner.poll_complete(), + Async::NotReady => { + self.inner.poll_complete()?; + Ok(Async::NotReady) } } } - fn schedule_all(&mut self, cx: &mut Context) -> Poll> { - while !self.buffer.is_empty() { - ready!(Sink::poll_ready(Pin::new(&mut self.inner), cx))?; - - let item = self.buffer.pop_front() - .expect("we checked self.buffer.is_empty() just above; qed"); - Sink::start_send(Pin::new(&mut self.inner), item)?; + fn schedule_all(&mut self) -> Poll<(), S::SinkError> { + while let Some(front) = self.buffer.pop_front() { + match self.inner.start_send(front) { + Ok(AsyncSink::Ready) => continue, + Ok(AsyncSink::NotReady(front)) => { + self.buffer.push_front(front); + break; + } + Err(e) => return Err(e), + } } - Poll::Ready(Ok(())) + if self.buffer.is_empty() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } } } @@ -377,8 +382,8 @@ type FinalizedNotification = ( pub struct Voter, GlobalIn, GlobalOut> where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, E::Error>> + Unpin, - GlobalOut: Sink, Error=E::Error> + Unpin, + GlobalIn: Stream, Error=E::Error>, + GlobalOut: Sink, SinkError=E::Error>, { env: Arc, voters: VoterSet, @@ -387,7 +392,7 @@ pub struct Voter, GlobalIn, GlobalOut> where finalized_notifications: UnboundedReceiver>, last_finalized_number: N, global_in: GlobalIn, - global_out: Buffered>, + global_out: Buffered, // the commit protocol might finalize further than the current round (if we're // behind), we keep track of last finalized in round so we don't violate any // assumptions from round-to-round. @@ -397,8 +402,8 @@ pub struct Voter, GlobalIn, GlobalOut> where impl, GlobalIn, GlobalOut> Voter where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, E::Error>> + Unpin, - GlobalOut: Sink, Error=E::Error> + Unpin, + GlobalIn: Stream, Error=E::Error>, + GlobalOut: Sink, SinkError=E::Error>, { /// Create new `Voter` tracker with given round number and base block. /// @@ -449,14 +454,15 @@ impl, GlobalIn, GlobalOut> Voter Result<(), E::Error> { + fn prune_background_rounds(&mut self) -> Result<(), E::Error> { // Do work on all background rounds, broadcasting any commits generated. - while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.past_rounds), cx) { - let (number, commit) = item?; + while let Async::Ready(Some((number, commit))) = self.past_rounds.poll()? { self.global_out.push(CommunicationOut::Commit(number, commit)); } - while let Poll::Ready(res) = Stream::poll_next(Pin::new(&mut self.finalized_notifications), cx) { + while let Async::Ready(res) = self.finalized_notifications.poll() + .expect("unbounded receivers do not have spurious errors; qed") + { let (f_hash, f_num, round, commit) = res.expect("one sender always kept alive in self.best_round; qed"); @@ -483,9 +489,9 @@ impl, GlobalIn, GlobalOut> Voter Result<(), E::Error> { - while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.global_in), cx) { - match item? { + fn process_incoming(&mut self) -> Result<(), E::Error> { + while let Async::Ready(Some(item)) = self.global_in.poll()? { + match item { CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => { trace!(target: "afg", "Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}", round_number, @@ -589,13 +595,13 @@ impl, GlobalIn, GlobalOut> Voter Poll> { + fn process_best_round(&mut self) -> Poll<(), E::Error> { // If the current `best_round` is completable and we've already precommitted, // we start a new round at `best_round + 1`. let should_start_next = { - let completable = match self.best_round.poll(cx)? { - Poll::Ready(()) => true, - Poll::Pending => false, + let completable = match self.best_round.poll()? { + Async::Ready(()) => true, + Async::NotReady => false, }; let precommitted = match self.best_round.state() { @@ -606,7 +612,7 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter Result<(), E::Error> { @@ -656,28 +662,21 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Future for Voter where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, E::Error>> + Unpin, - GlobalOut: Sink, Error=E::Error> + Unpin, + GlobalIn: Stream, Error=E::Error>, + GlobalOut: Sink, SinkError=E::Error>, { - type Output = Result<(), E::Error>; + type Item = (); + type Error = E::Error; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.process_incoming(cx)?; - self.prune_background_rounds(cx)?; - self.global_out.poll(cx)?; + fn poll(&mut self) -> Poll<(), E::Error> { + self.process_incoming()?; + self.prune_background_rounds()?; + self.global_out.poll()?; - self.process_best_round(cx) + self.process_best_round() } } -impl, GlobalIn, GlobalOut> Unpin for Voter where - H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, - N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, E::Error>> + Unpin, - GlobalOut: Sink, Error=E::Error> + Unpin, -{ -} - /// Validate the given catch up and return a completed round with all prevotes /// and precommits from the catch up imported. If the catch up is invalid `None` /// is returned instead. @@ -810,8 +809,9 @@ mod tests { chain::GENESIS_HASH, environment::{Environment, Id, Signature}, }; - use futures_timer::TryFutureExt as _; use std::time::Duration; + use tokio::prelude::FutureExt; + use tokio::runtime::current_thread; #[test] fn talking_to_myself() { @@ -819,11 +819,11 @@ mod tests { let voters = std::iter::once((local_id, 100)).collect(); let (network, routing_task) = testing::environment::make_network(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let (signal, exit) = ::exit_future::signal(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { + current_thread::block_on_all(::futures::future::lazy(move || { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -842,15 +842,17 @@ mod tests { last_round_state, last_finalized, ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); + ::tokio::spawn(exit.clone() + .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); - threads_pool.spawn_ok(routing_task); + ::tokio::spawn(exit.until(routing_task).map(|_| ())); // wait for the best block to finalize. finalized - .take_while(|&(_, n, _)| future::ready(n < 6)) - .for_each(|_| future::ready(())) - }).flatten()); + .take_while(|&(_, n, _)| Ok(n < 6)) + .for_each(|_| Ok(())) + .map(|_| signal.fire()) + })).unwrap(); } #[test] @@ -859,11 +861,13 @@ mod tests { let voters: VoterSet<_> = (0..10).map(|i| (Id(i), 1)).collect(); let (network, routing_task) = testing::environment::make_network(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let (signal, exit) = ::exit_future::signal(); + + current_thread::block_on_all(::futures::future::lazy(move || { + ::tokio::spawn(exit.clone().until(routing_task).map(|_| ())); - futures::executor::block_on(::futures::future::lazy(move |_| { // 3 voters offline. - let finalized_streams = (0..7).map(|i| { + let finalized_streams = (0..7).map(move |i| { let local_id = Id(i); // initialize chain let env = Arc::new(Environment::new(network.clone(), local_id)); @@ -884,17 +888,17 @@ mod tests { last_round_state, last_finalized, ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); + ::tokio::spawn(exit.clone() + .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); // wait for the best block to be finalized by all honest voters finalized - .take_while(|&(_, n, _)| future::ready(n < 6)) - .for_each(|_| future::ready(())) - }).collect::>(); + .take_while(|&(_, n, _)| Ok(n < 6)) + .for_each(|_| Ok(())) + }); - threads_pool.spawn_ok(routing_task.map(|_| ())); - ::futures::future::join_all(finalized_streams.into_iter()) - }).flatten()); + ::futures::future::join_all(finalized_streams).map(|_| signal.fire()) + })).unwrap(); } #[test] @@ -905,11 +909,11 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (commits, _) = network.make_global_comms(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let (signal, exit) = ::exit_future::signal(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { + current_thread::block_on_all(::futures::future::lazy(move || { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -927,13 +931,14 @@ mod tests { last_round_state, last_finalized, ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); + ::tokio::spawn(exit.clone() + .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); - threads_pool.spawn_ok(routing_task); + ::tokio::spawn(exit.until(routing_task).map(|_| ())); // wait for the node to broadcast a commit message - commits.take(1).for_each(|_| future::ready(())) - }).flatten()); + commits.take(1).for_each(|_| Ok(())).map(|_| signal.fire()) + })).unwrap(); } #[test] @@ -969,11 +974,11 @@ mod tests { }], }); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let (signal, exit) = ::exit_future::signal(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { + current_thread::block_on_all(::futures::future::lazy(move || { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -991,45 +996,46 @@ mod tests { last_round_state, last_finalized, ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting: {:?}"))); + ::tokio::spawn(exit.clone() + .until(voter.map_err(|e| panic!("Error voting: {:?}", e))).map(|_| ())); - threads_pool.spawn_ok(routing_task.map(|_| ())); + ::tokio::spawn(exit.clone().until(routing_task).map(|_| ())); - threads_pool.spawn_ok(::futures::future::lazy(move |_| { - round_stream.into_future() - .then(|(value, stream)| { // wait for a prevote + ::tokio::spawn(exit.until(::futures::future::lazy(|| { + round_stream.into_future().map_err(|(e, _)| e) + .and_then(|(value, stream)| { // wait for a prevote assert!(match value { - Some(Ok(SignedMessage { message: Message::Prevote(_), id: Id(5), .. })) => true, + Some(SignedMessage { message: Message::Prevote(_), id: Id(5), .. }) => true, _ => false, }); let votes = vec![prevote, precommit].into_iter().map(Result::Ok); - futures::stream::iter(votes).forward(round_sink).map(|_| stream) // send our prevote + round_sink.send_all(futures::stream::iter_result(votes)).map(|_| stream) // send our prevote }) - .then(|stream| { + .and_then(|stream| { stream.take_while(|value| match value { // wait for a precommit - Ok(SignedMessage { message: Message::Precommit(_), id: Id(5), .. }) => future::ready(false), - _ => future::ready(true), - }).for_each(|_| future::ready(())) + SignedMessage { message: Message::Precommit(_), id: Id(5), .. } => Ok(false), + _ => Ok(true), + }).for_each(|_| Ok(())) }) - .then(|_| { + .and_then(|_| { // send our commit - stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) + commits_sink.send(CommunicationOut::Commit(commit.0, commit.1)) }) .map_err(|_| ()) - }).flatten().map(|_| ())); + })).map(|_| ())); // wait for the first commit (ours) - commits_stream.into_future() - .then(|(_, stream)| { - stream.take(1).for_each(|_| future::ready(())) // the second commit should never arrive - .map(|()| Ok::<(), std::io::Error>(())) + commits_stream.into_future().map_err(|_| ()) + .and_then(|(_, stream)| { + stream.take(1).for_each(|_| Ok(())) // the second commit should never arrive .timeout(Duration::from_millis(500)).map_err(|_| ()) }) .then(|res| { assert!(res.is_err()); // so the previous future times out + signal.fire(); futures::future::ok::<(), ()>(()) }) - }).flatten()).unwrap(); + })).unwrap(); } #[test] @@ -1044,7 +1050,7 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (_, commits_sink) = network.make_global_comms(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let (signal, exit) = ::exit_future::signal(); // this is a commit for a previous round let commit = (0, Commit { @@ -1059,7 +1065,7 @@ mod tests { let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { + current_thread::block_on_all(::futures::future::lazy(move || { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -1077,18 +1083,20 @@ mod tests { last_round_state, last_finalized, ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); + ::tokio::spawn(exit.clone() + .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); - threads_pool.spawn_ok(routing_task.map(|_| ())); + ::tokio::spawn(exit.until(routing_task).map(|_| ())); - threads_pool.spawn_ok(stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) + ::tokio::spawn(commits_sink.send(CommunicationOut::Commit(commit.0, commit.1)) .map_err(|_| ()).map(|_| ())); // wait for the commit message to be processed which finalized block 6 env.finalized_stream() - .take_while(|&(_, n, _)| future::ready(n < 6)) - .for_each(|_| future::ready(())) - }).flatten()); + .take_while(|&(_, n, _)| Ok(n < 6)) + .for_each(|_| Ok(())) + .map(|_| signal.fire()) + })).unwrap(); } #[test] @@ -1097,10 +1105,10 @@ mod tests { let voters: VoterSet<_> = (0..3).map(|i| (Id(i), 1)).collect(); let (network, routing_task) = testing::environment::make_network(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let (signal, exit) = ::exit_future::signal(); - futures::executor::block_on(::futures::future::lazy(move |_| { - threads_pool.spawn_ok(routing_task.map(|_| ())); + current_thread::block_on_all(::futures::future::lazy(move || { + ::tokio::spawn(exit.clone().until(routing_task).map(|_| ())); // initialize unsynced voter at round 0 let mut unsynced_voter = { @@ -1150,15 +1158,14 @@ mod tests { // poll until it's caught up. // should skip to round 6 - ::futures::future::poll_fn(move |cx| -> Poll> { - let poll = Future::poll(Pin::new(&mut unsynced_voter), cx); + ::futures::future::poll_fn(move || -> Poll<(), ()> { + let poll = unsynced_voter.poll().map_err(|_| ())?; if unsynced_voter.best_round.round_number() == 6 { - Poll::Ready(Ok(())) + Ok(Async::Ready(())) } else { - futures::ready!(poll).map_err(|_| ())?; - Poll::Ready(Ok(())) + Ok(poll) } - }) - }).flatten()).unwrap(); + }).map(move |_| signal.fire()) + })).unwrap(); } } diff --git a/src/voter/past_rounds.rs b/src/voter/past_rounds.rs index 24d95857..4362415c 100644 --- a/src/voter/past_rounds.rs +++ b/src/voter/past_rounds.rs @@ -22,19 +22,17 @@ //! - Passing it any validated commits (so backgrounded rounds don't produce conflicting ones) #[cfg(feature = "std")] -use futures::ready; +use futures::try_ready; use futures::prelude::*; use futures::stream::{self, futures_unordered::FuturesUnordered}; use futures::task; -use futures::channel::mpsc; +use futures::sync::mpsc; #[cfg(feature = "std")] use log::{trace, debug}; use std::cmp; use std::collections::HashMap; use std::hash::Hash; -use std::pin::Pin; -use std::task::{Context, Poll}; use crate::{Commit, BlockNumberOps}; use super::Environment; @@ -49,7 +47,7 @@ struct BackgroundRound> where N: Copy + BlockNumberOps + ::std::fmt::Debug, { inner: VotingRound, - waker: Option, + task: Option, finalized_number: N, round_committer: Option>, } @@ -78,8 +76,8 @@ impl> BackgroundRound where // wake up the future to be polled if done. if self.is_done() { - if let Some(ref waker) = self.waker { - waker.wake_by_ref(); + if let Some(ref task) = self.task { + task.notify(); } } } @@ -100,40 +98,35 @@ impl> Future for BackgroundRound where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, { - type Output = Result, E::Error>; + type Item = BackgroundRoundChange; + type Error = E::Error; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.waker = Some(cx.waker().clone()); + fn poll(&mut self) -> Poll { + self.task = Some(::futures::task::current()); - self.inner.poll(cx)?; + self.inner.poll()?; self.round_committer = match self.round_committer.take() { None => None, - Some(mut committer) => match committer.commit(cx, &mut self.inner)? { - Poll::Ready(None) => None, - Poll::Ready(Some(commit)) => return Poll::Ready(Ok( + Some(mut committer) => match committer.commit(&mut self.inner)? { + Async::Ready(None) => None, + Async::Ready(Some(commit)) => return Ok(Async::Ready( BackgroundRoundChange::Committed(commit) )), - Poll::Pending => Some(committer), + Async::NotReady => Some(committer), } }; if self.is_done() { // if this is fully done (has committed _and_ estimate finalized) // we bail for real. - Poll::Ready(Ok(BackgroundRoundChange::Irrelevant(self.round_number()))) + Ok(Async::Ready(BackgroundRoundChange::Irrelevant(self.round_number()))) } else { - Poll::Pending + Ok(Async::NotReady) } } } -impl> Unpin for BackgroundRound where - H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, - N: Copy + BlockNumberOps + ::std::fmt::Debug, -{ -} - struct RoundCommitter> where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, @@ -177,26 +170,26 @@ impl> RoundCommitter where Ok(true) } - fn commit(&mut self, cx: &mut Context, voting_round: &mut VotingRound) - -> Poll>, E::Error>> + fn commit(&mut self, voting_round: &mut VotingRound) + -> Poll>, E::Error> { - while let Poll::Ready(Some(commit)) = Stream::poll_next(Pin::new(&mut self.import_commits), cx) { + while let Ok(Async::Ready(Some(commit))) = self.import_commits.poll() { if !self.import_commit(voting_round, commit)? { trace!(target: "afg", "Ignoring invalid commit"); } } - ready!(Future::poll(Pin::new(&mut self.commit_timer), cx))?; + try_ready!(self.commit_timer.poll()); match (self.last_commit.take(), voting_round.finalized()) { (None, Some(_)) => { - Poll::Ready(Ok(voting_round.finalizing_commit().cloned())) + Ok(Async::Ready(voting_round.finalizing_commit().cloned())) }, (Some(Commit { target_number, .. }), Some((_, finalized_number))) if target_number < *finalized_number => { - Poll::Ready(Ok(voting_round.finalizing_commit().cloned())) + Ok(Async::Ready(voting_round.finalizing_commit().cloned())) }, _ => { - Poll::Ready(Ok(None)) + Ok(Async::Ready(None)) }, } } @@ -220,26 +213,24 @@ impl SelfReturningFuture { } } -impl Future for SelfReturningFuture { - type Output = (F::Output, F); +impl Future for SelfReturningFuture { + type Item = (F::Item, F); + type Error = F::Error; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(&mut self) -> Poll { match self.inner.take() { None => panic!("poll after return is not done in this module; qed"), - Some(mut f) => match Future::poll(Pin::new(&mut f), cx) { - Poll::Ready(item) => Poll::Ready((item, f)), - Poll::Pending => { + Some(mut f) => match f.poll()? { + Async::Ready(item) => Ok(Async::Ready((item, f))), + Async::NotReady => { self.inner = Some(f); - Poll::Pending + Ok(Async::NotReady) } } } } } -impl Unpin for SelfReturningFuture { -} - /// A stream for past rounds, which produces any commit messages from those /// rounds and drives them to completion. pub(super) struct PastRounds> where @@ -268,7 +259,7 @@ impl> PastRounds where let (tx, rx) = mpsc::unbounded(); let background = BackgroundRound { inner: round, - waker: None, + task: None, // https://github.com/paritytech/finality-grandpa/issues/50 finalized_number: N::zero(), round_committer: Some(RoundCommitter::new( @@ -307,15 +298,16 @@ impl> Stream for PastRounds where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, { - type Item = Result<(u64, Commit), E::Error>; + type Item = (u64, Commit); + type Error = E::Error; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll(&mut self) -> Poll, E::Error> { loop { - match Stream::poll_next(Pin::new(&mut self.past_rounds), cx) { - Poll::Ready(Some((Ok(BackgroundRoundChange::Irrelevant(number)), _))) => { + match self.past_rounds.poll()? { + Async::Ready(Some((BackgroundRoundChange::Irrelevant(number), _))) => { self.commit_senders.remove(&number); } - Poll::Ready(Some((Ok(BackgroundRoundChange::Committed(commit)), round))) => { + Async::Ready(Some((BackgroundRoundChange::Committed(commit), round))) => { let number = round.round_number(); // reschedule until irrelevant. @@ -329,18 +321,11 @@ impl> Stream for PastRounds where commit.target_hash, ); - return Poll::Ready(Some(Ok((number, commit)))); + return Ok(Async::Ready(Some((number, commit)))); } - Poll::Ready(Some((Err(err), _))) => return Poll::Ready(Some(Err(err))), - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => return Poll::Pending, + Async::Ready(None) => return Ok(Async::Ready(None)), + Async::NotReady => return Ok(Async::NotReady), } } } } - -impl> Unpin for PastRounds where - H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, - N: Copy + BlockNumberOps + ::std::fmt::Debug, -{ -} diff --git a/src/voter/voting_round.rs b/src/voter/voting_round.rs index daaf7db0..ee55dfa9 100644 --- a/src/voter/voting_round.rs +++ b/src/voter/voting_round.rs @@ -15,16 +15,14 @@ //! Logic for voting and handling messages within a single round. #[cfg(feature = "std")] -use futures::ready; +use futures::try_ready; use futures::prelude::*; -use futures::channel::mpsc::UnboundedSender; +use futures::sync::mpsc::UnboundedSender; #[cfg(feature = "std")] use log::{trace, warn, debug}; use std::hash::Hash; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use crate::round::{Round, State as RoundState}; use crate::{ @@ -63,7 +61,7 @@ pub(super) struct VotingRound> where voting: Voting, votes: Round, incoming: E::In, - outgoing: Buffered>, + outgoing: Buffered, state: Option>, // state machine driving votes. bridged_round_state: Option>, // updates to later round last_round_state: Option>, // updates from prior round @@ -176,25 +174,25 @@ impl> VotingRound where } } - /// Poll the round. When the round is completable and messages have been flushed, it will return `Poll::Ready` but + /// Poll the round. When the round is completable and messages have been flushed, it will return `Async::Ready` but /// can continue to be polled. - pub(super) fn poll(&mut self, cx: &mut Context) -> Poll> { + pub(super) fn poll(&mut self) -> Poll<(), E::Error> { trace!(target: "afg", "Polling round {}, state = {:?}, step = {:?}", self.votes.number(), self.votes.state(), self.state); let pre_state = self.votes.state(); - self.process_incoming(cx)?; + self.process_incoming()?; // we only cast votes when we have access to the previous round state. // we might have started this round as a prospect "future" round to // check whether the voter is lagging behind the current round. - let last_round_state = self.last_round_state.as_ref().map(|s| s.get(cx).clone()); + let last_round_state = self.last_round_state.as_ref().map(|s| s.get().clone()); if let Some(ref last_round_state) = last_round_state { self.primary_propose(last_round_state)?; - self.prevote(cx, last_round_state)?; - self.precommit(cx, last_round_state)?; + self.prevote(last_round_state)?; + self.precommit(last_round_state)?; } - ready!(self.outgoing.poll(cx))?; - self.process_incoming(cx)?; // in case we got a new message signed locally. + try_ready!(self.outgoing.poll()); + self.process_incoming()?; // in case we got a new message signed locally. // broadcast finality notifications after attempting to cast votes let post_state = self.votes.state(); @@ -202,7 +200,7 @@ impl> VotingRound where // early exit if the current round is not completable if !self.votes.completable() { - return Poll::Pending; + return Ok(Async::NotReady); } // make sure that the previous round estimate has been finalized @@ -234,11 +232,11 @@ impl> VotingRound where // the previous round estimate must be finalized if !last_round_estimate_finalized { - return Poll::Pending; + return Ok(Async::NotReady); } // both exit conditions verified, we can complete this round - Poll::Ready(Ok(())) + Ok(Async::Ready(())) } /// Inspect the state of this round. @@ -321,10 +319,10 @@ impl> VotingRound where self.votes.historical_votes() } - fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> { - while let Poll::Ready(Some(incoming)) = Stream::poll_next(Pin::new(&mut self.incoming), cx) { + fn process_incoming(&mut self) -> Result<(), E::Error> { + while let Async::Ready(Some(incoming)) = self.incoming.poll()? { trace!(target: "afg", "Got incoming message"); - let SignedMessage { message, signature, id } = incoming?; + let SignedMessage { message, signature, id } = incoming; if !self.env.is_equal_or_descendent_of(self.votes.base().0, message.target().0.clone()) { trace!(target: "afg", "Ignoring message targeting {:?} lower than round base {:?}", message.target(), @@ -400,14 +398,14 @@ impl> VotingRound where Ok(()) } - fn prevote(&mut self, cx: &mut Context, last_round_state: &RoundState) -> Result<(), E::Error> { + fn prevote(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { let state = self.state.take(); let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| { - let should_prevote = match Future::poll(Pin::new(&mut prevote_timer), cx) { - Poll::Ready(Err(e)) => return Err(e), - Poll::Ready(Ok(())) => true, - Poll::Pending => self.votes.completable(), + let should_prevote = match prevote_timer.poll() { + Err(e) => return Err(e), + Ok(Async::Ready(())) => true, + Ok(Async::NotReady) => self.votes.completable(), }; if should_prevote { @@ -442,7 +440,7 @@ impl> VotingRound where Ok(()) } - fn precommit(&mut self, cx: &mut Context, last_round_state: &RoundState) -> Result<(), E::Error> { + fn precommit(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { match self.state.take() { Some(State::Prevoted(mut precommit_timer)) => { let last_round_estimate = last_round_state.estimate.clone() @@ -455,10 +453,10 @@ impl> VotingRound where p_g == &last_round_estimate || self.env.is_equal_or_descendent_of(last_round_estimate.0, p_g.0.clone()) }) - } && match Future::poll(Pin::new(&mut precommit_timer), cx) { - Poll::Ready(Err(e)) => return Err(e), - Poll::Ready(Ok(())) => true, - Poll::Pending => self.votes.completable(), + } && match precommit_timer.poll() { + Err(e) => return Err(e), + Ok(Async::Ready(())) => true, + Ok(Async::NotReady) => self.votes.completable(), }; if should_precommit {