Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Prev Previous commit
Next Next commit
Work on switching tests to stable futures
  • Loading branch information
tomaka committed Oct 28, 2019
commit 6f6849a7ac79b947dae9621e7fa7692b0d87ee05
2 changes: 1 addition & 1 deletion core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use fg_primitives::{AuthoritySignature, SetId, AuthorityWeight};
// Re-export these two because it's just so damn convenient.
pub use fg_primitives::{AuthorityId, ScheduledChange};

#[cfg(testttt)] // TODO: restore
#[cfg(test)]
mod tests;

/// A GRANDPA message for a substrate chain.
Expand Down
2 changes: 1 addition & 1 deletion core/finality-grandpa/src/light_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ fn on_post_finalization_error(error: ClientError, value_type: &str) -> Consensus
ConsensusError::ClientImport(error.to_string())
}

#[cfg(testttt)] // TODO: restore
#[cfg(test)]
pub mod tests {
use super::*;
use consensus_common::ForkChoiceStrategy;
Expand Down
121 changes: 55 additions & 66 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, Peer
use network::test::{PassThroughVerifier};
use network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
use futures01::Async;
use futures::{StreamExt as _, TryStreamExt as _};
use futures::{compat::Compat01As03, StreamExt as _, TryStreamExt as _};
use tokio::runtime::current_thread;
use keyring::Ed25519Keyring;
use client::{
Expand Down Expand Up @@ -183,7 +182,7 @@ impl futures01::Future for Exit {
type Error = ();

fn poll(&mut self) -> futures01::Poll<(), ()> {
Ok(Async::NotReady)
Ok(futures01::Async::NotReady)
}
}

Expand Down Expand Up @@ -328,7 +327,7 @@ fn run_to_completion_with<F>(
peers: &[Ed25519Keyring],
with: F,
) -> u64 where
F: FnOnce(current_thread::Handle) -> Option<Box<dyn Future<Output = result::Result<(), ()>>>>
F: FnOnce(current_thread::Handle) -> Option<Pin<Box<dyn Future<Output = result::Result<(), ()>>>>>
{
use parking_lot::RwLock;

Expand Down Expand Up @@ -358,15 +357,14 @@ fn run_to_completion_with<F>(
};

wait_for.push(
Box::new(
Box::pin(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(move |n| {
let mut highest_finalized = highest_finalized.write();
if *n.header.number() > *highest_finalized {
*highest_finalized = *n.header.number();
}
Ok(n.header.number() < &blocks)
future::ready(n.header.number() < &blocks)
})
.collect()
.map(|_| ())
Expand Down Expand Up @@ -398,11 +396,10 @@ fn run_to_completion_with<F>(

// wait for all finalized on each.
let wait_for = ::futures::future::join_all(wait_for)
.map(|_| ())
.map_err(|_| ());
.map(|_| ());

let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
let drive_to_completion = Compat01As03::new(futures01::future::poll_fn(|| { net.lock().poll(); Ok(futures01::Async::NotReady) }));
let _ = futures::executor::block_on(future::select(wait_for, drive_to_completion)).unwrap();

let highest_finalized = *highest_finalized.read();
highest_finalized
Expand Down Expand Up @@ -494,9 +491,8 @@ fn finalize_3_voters_1_full_observer() {
};
finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &20))
.for_each(move |_| Ok(()))
.take_while(|n| future::ready(n.header.number() < &20))
.for_each(move |_| future::ready(()))
);

let keystore = if let Some(local_key) = local_key {
Expand Down Expand Up @@ -528,11 +524,10 @@ fn finalize_3_voters_1_full_observer() {

// wait for all finalized on each.
let wait_for = futures::future::join_all(finality_notifications)
.map(|_| ())
.map_err(|_| ());
.map(|_| ());

let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
let drive_to_completion = Compat01As03::new(futures01::future::poll_fn(|| { net.lock().poll(); Ok(futures01::Async::NotReady) }));
let _ = futures::executor::block_on(future::select(wait_for, drive_to_completion)).unwrap();
}

#[test]
Expand Down Expand Up @@ -586,8 +581,7 @@ fn transition_3_voters_twice_1_full_observer() {

// wait for blocks to be finalized before generating new ones
let block_production = client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &30))
.take_while(|n| future::ready(n.header.number() < &30))
.for_each(move |n| {
match n.header.number() {
1 => {
Expand Down Expand Up @@ -624,7 +618,7 @@ fn transition_3_voters_twice_1_full_observer() {
_ => {},
}

Ok(())
future::ready(())
});

runtime.spawn(block_production);
Expand Down Expand Up @@ -657,9 +651,8 @@ fn transition_3_voters_twice_1_full_observer() {

finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &30))
.for_each(move |_| Ok(()))
.take_while(|n| future::ready(n.header.number() < &30))
.for_each(move |_| future::ready(()))
.map(move |()| {
let full_client = client.as_full().expect("only full clients are used in test");
let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities(&*full_client).unwrap();
Expand Down Expand Up @@ -689,11 +682,10 @@ fn transition_3_voters_twice_1_full_observer() {

// wait for all finalized on each.
let wait_for = ::futures::future::join_all(finality_notifications)
.map(|_| ())
.map_err(|_| ());
.map(|_| ());

let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
let drive_to_completion = Compat01As03::new(futures01::future::poll_fn(|| { net.lock().poll(); Ok(futures01::Async::NotReady) }));
let _ = futures::executor::block_on(future::select(wait_for, drive_to_completion)).unwrap();
}

#[test]
Expand Down Expand Up @@ -796,14 +788,14 @@ fn sync_justifications_on_change_blocks() {
}

// the last peer should get the justification by syncing from other peers
runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> {
futures::executor::block_on(Compat01As03::new(futures01::future::poll_fn(move || -> std::result::Result<_, ()> {
if net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().poll();
Ok(Async::NotReady)
Ok(futures01::Async::NotReady)
} else {
Ok(Async::Ready(()))
Ok(futures01::Async::Ready(()))
}
})).unwrap()
}))).unwrap()
}

#[test]
Expand Down Expand Up @@ -1066,7 +1058,7 @@ fn voter_persists_its_votes() {
keystore_paths.push(keystore_path);

struct ResettableVoter {
voter: Box<dyn Future<Output = result::Result<(), ()>> + Send>,
voter: Pin<Box<dyn Future<Output = result::Result<(), ()>> + Send>>,
voter_rx: mpsc::UnboundedReceiver<()>,
net: Arc<Mutex<GrandpaTestNet>>,
client: PeersClient,
Expand All @@ -1078,14 +1070,14 @@ fn voter_persists_its_votes() {

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.voter.poll() {
Ok(Async::Ready(())) | Err(_) => panic!("error in the voter"),
Ok(futures01::Async::Ready(())) | Err(_) => panic!("error in the voter"),
Poll::Pending => {},
}

match self.voter_rx.poll() {
Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Err(_) | Ok(futures01::Async::Ready(None)) => return Ok(futures01::Async::Ready(())),
Poll::Pending => {}
Ok(Async::Ready(Some(()))) => {
Ok(futures01::Async::Ready(Some(()))) => {
let (_block_import, _, _, _, link) =
self.net.lock().make_block_import(self.client.clone());
let link = link.lock().take().unwrap();
Expand All @@ -1107,15 +1099,15 @@ fn voter_persists_its_votes() {

let voter = run_grandpa_voter(grandpa_params)
.expect("all in order with client and network")
.then(move |r| {
.map(move |r| {
// we need to keep the block_import alive since it owns the
// sender for the voter commands channel, if that gets dropped
// then the voter will stop
drop(_block_import);
r
});

self.voter = Box::new(voter);
self.voter = Box::pin(voter);
// notify current task in order to poll the voter
cx.waker().wake_by_ref();
}
Expand All @@ -1129,7 +1121,7 @@ fn voter_persists_its_votes() {
// this way, the `ResettableVoter` will reset its `voter` field to a value ASAP.
voter_tx.unbounded_send(()).unwrap();
runtime.spawn(ResettableVoter {
voter: Box::new(futures::future::pending()),
voter: Box::pin(futures::future::pending()),
voter_rx,
net: net.clone(),
client: client.clone(),
Expand Down Expand Up @@ -1167,7 +1159,7 @@ fn voter_persists_its_votes() {
Exit,
true,
);
runtime.block_on(routing_work).unwrap();
futures::executor::block_on(routing_work).unwrap();

let (round_rx, round_tx) = network.round_communication(
communication::Round(1),
Expand All @@ -1183,7 +1175,7 @@ fn voter_persists_its_votes() {
let net = net.clone();
let state = AtomicUsize::new(0);

runtime.spawn(round_rx.for_each(move |signed| {
runtime.spawn(round_rx.try_for_each(move |signed| {
if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
// the first message we receive should be a prevote from alice.
let prevote = match signed.message {
Expand All @@ -1202,7 +1194,7 @@ fn voter_persists_its_votes() {
let net = net.clone();
let voter_tx = voter_tx.clone();
let round_tx = round_tx.clone();
future::Either::A(futures_timer::Interval::new_interval(Duration::from_millis(200))
future::Either::Left(futures_timer::Interval::new(Duration::from_millis(200))
.take_while(move |_| {
Ok(net2.lock().peer(1).client().info().chain.best_number != 40)
})
Expand Down Expand Up @@ -1238,7 +1230,7 @@ fn voter_persists_its_votes() {
// therefore we won't ever receive it again since it will be a
// known message on the gossip layer

future::Either::B(future::ok(()))
future::Either::Right(future::ok(()))

} else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 {
// we then receive a precommit from alice for block 15
Expand All @@ -1253,19 +1245,19 @@ fn voter_persists_its_votes() {
// signal exit
exit_tx.clone().lock().take().unwrap().send(()).unwrap();

future::Either::B(future::ok(()))
future::Either::Right(future::ok(()))

} else {
panic!()
}

}).map_err(|_| ()));
}));
}

let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let exit = exit_rx.into_future().map(|_| ()).map_err(|_| ());
let drive_to_completion = Compat01As03::new(futures01::future::poll_fn(|| { net.lock().poll(); Ok(futures01::Async::NotReady) }));
let exit = exit_rx.into_future().map(|_| ());

runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap();
futures::executor::block_on(drive_to_completion.select(exit).map(|_| ())).unwrap();
}

#[test]
Expand All @@ -1288,8 +1280,7 @@ fn finalize_3_voters_1_light_observer() {
let link = net.lock().peer(3).data.lock().take().expect("link initialized on startup; qed");

let finality_notifications = net.lock().peer(3).client().finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &20))
.take_while(|n| future::ready(n.header.number() < &20))
.collect();

run_to_completion_with(&mut runtime, 20, net.clone(), authorities, |executor| {
Expand All @@ -1307,7 +1298,7 @@ fn finalize_3_voters_1_light_observer() {
).unwrap()
).unwrap();

Some(Box::new(finality_notifications.map(|_| ())))
Some(Box::pin(finality_notifications.map(|_| ())))
});
}

Expand All @@ -1328,14 +1319,14 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
net.lock().block_until_sync(&mut runtime);

// check that the block#1 is finalized on light client
runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> {
futures::executor::block_on(Compat01As03::new(futures01::future::poll_fn(move || -> result::Result<_, ()> {
if net.lock().peer(1).client().info().chain.finalized_number == 1 {
Ok(Async::Ready(()))
Ok(futures01::Async::Ready(()))
} else {
net.lock().poll();
Ok(Async::NotReady)
Ok(futures01::Async::NotReady)
}
})).unwrap()
}))).unwrap()
}

#[test]
Expand Down Expand Up @@ -1420,7 +1411,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();

let voter = |keystore, peer_id, link, net: Arc<Mutex<GrandpaTestNet>>| -> Box<dyn Future<Output = Result<(), ()>> + Send> {
let voter = |keystore, peer_id, link, net: Arc<Mutex<GrandpaTestNet>>| -> Pin<Box<dyn Future<Output = result::Result<(), ()>> + Send>> {
let grandpa_params = GrandpaParams {
config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
Expand All @@ -1436,7 +1427,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
voting_rule: (),
};

Box::new(run_grandpa_voter(grandpa_params).expect("all in order with client and network"))
Box::pin(run_grandpa_voter(grandpa_params).expect("all in order with client and network"))
};

let mut keystore_paths = Vec::new();
Expand All @@ -1454,8 +1445,7 @@ fn voter_catches_up_to_latest_round_when_behind() {

finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &50))
.take_while(|n| future::ready(n.header.number() < &50))
.for_each(move |_| Ok(()))
);

Expand All @@ -1470,8 +1460,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
// wait for them to finalize block 50. since they'll vote on 3/4 of the
// unfinalized chain it will take at least 4 rounds to do it.
let wait_for_finality = ::futures::future::join_all(finality_notifications)
.map(|_| ())
.map_err(|_| ());
.map(|_| ());

// spawn a new voter, it should be behind by at least 4 rounds and should be
// able to catch up to the latest round
Expand All @@ -1495,25 +1484,25 @@ fn voter_catches_up_to_latest_round_when_behind() {

let start_time = std::time::Instant::now();
let timeout = Duration::from_secs(5 * 60);
let wait_for_catch_up = futures::future::poll_fn(move || {
let wait_for_catch_up = futures::future::poll_fn(move |_| {
// The voter will start at round 1 and since everyone else is
// already at a later round the only way to get to round 4 (or
// later) is by issuing a catch up request.
if set_state.read().last_completed_round().number >= 4 {
Ok(Async::Ready(()))
Poll::Ready(())
} else if start_time.elapsed() > timeout {
panic!("Timed out while waiting for catch up to happen")
} else {
Ok(Async::NotReady)
Poll::Pending
}
});

wait_for_catch_up
})
};

let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(test.select(drive_to_completion).map_err(|_| ())).unwrap();
let drive_to_completion = Compat01As03::new(futures01::future::poll_fn(|| { net.lock().poll(); Ok(futures01::Async::NotReady) }));
let _ = futures::executor::block_on(test.select(drive_to_completion)).unwrap();
}

#[test]
Expand Down