diff --git a/Cargo.lock b/Cargo.lock index abfa181abf4af..5764942a0d955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6205,6 +6205,7 @@ dependencies = [ "serde", "sp-blockchain", "sp-consensus", + "sp-core", "sp-inherents", "sp-runtime", "sp-transaction-pool", diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 51ebdbe1c1fae..a52406b5f5218 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -27,7 +27,7 @@ macro_rules! new_full_start { ($config:expr) => {{ use std::sync::Arc; use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; - + let mut import_setup = None; let inherent_data_providers = sp_inherents::InherentDataProviders::new(); @@ -45,15 +45,16 @@ macro_rules! new_full_start { let select_chain = select_chain.take() .ok_or_else(|| sc_service::Error::SelectChainRequired)?; - let (grandpa_block_import, grandpa_link) = - sc_finality_grandpa::block_import(client.clone(), &(client.clone() as Arc<_>), select_chain)?; + let (grandpa_block_import, grandpa_link) = sc_finality_grandpa::block_import( + client.clone(), + &(client.clone() as Arc<_>), + select_chain, + )?; let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new( grandpa_block_import.clone(), client.clone(), ); - let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, aura_block_import, @@ -61,7 +62,7 @@ macro_rules! new_full_start { None, client, inherent_data_providers.clone(), - spawner, + spawn_task_handle, )?; import_setup = Some((grandpa_block_import, grandpa_link)); @@ -208,8 +209,6 @@ pub fn new_light(config: Configuration) -> Result( sc_consensus_aura::slot_duration(&*client)?, grandpa_block_import, @@ -217,7 +216,7 @@ pub fn new_light(config: Configuration) -> Result BlockImport for AuraBlockImport( +pub fn import_queue( slot_duration: SlotDuration, block_import: I, justification_import: Option>, finality_proof_import: Option>, client: Arc, inherent_data_providers: InherentDataProviders, - spawner: F, + spawner: &S, ) -> Result>, sp_consensus::Error> where B: BlockT, C::Api: BlockBuilderApi + AuraApi> + ApiExt, @@ -805,7 +805,7 @@ pub fn import_queue( P: Pair + Send + Sync + 'static, P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, P::Signature: Encode + Decode, - F: Fn(BoxFuture<'static, ()>) -> (), + S: sp_core::traits::SpawnBlocking, { register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; initialize_authorities_cache(&*client)?; @@ -815,6 +815,7 @@ pub fn import_queue( inherent_data_providers, phantom: PhantomData, }; + Ok(BasicQueue::new( verifier, Box::new(block_import), diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 4f8975d101d81..678a33cbcc7a6 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -107,7 +107,7 @@ use sc_client_api::{ }; use sp_block_builder::BlockBuilder as BlockBuilderApi; -use futures::{prelude::*, future::BoxFuture}; +use futures::prelude::*; use log::{debug, info, log, trace, warn}; use sc_consensus_slots::{ SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation, @@ -1272,7 +1272,7 @@ pub fn import_queue( finality_proof_import: Option>, client: Arc, inherent_data_providers: InherentDataProviders, - spawner: impl Fn(BoxFuture<'static, ()>) -> (), + spawner: &impl sp_core::traits::SpawnBlocking, ) -> ClientResult>> where Inner: BlockImport> + Send + Sync + 'static, diff --git a/client/consensus/manual-seal/Cargo.toml b/client/consensus/manual-seal/Cargo.toml index 807c370edf332..b0a3c2bd72b56 100644 --- a/client/consensus/manual-seal/Cargo.toml +++ b/client/consensus/manual-seal/Cargo.toml @@ -28,6 +28,7 @@ sp-blockchain = { path = "../../../primitives/blockchain" , version = "2.0.0-dev sp-consensus = { package = "sp-consensus", path = "../../../primitives/consensus/common" , version = "0.8.0-dev"} sp-inherents = { path = "../../../primitives/inherents" , version = "2.0.0-dev"} sp-runtime = { path = "../../../primitives/runtime" , version = "2.0.0-dev"} +sp-core = { path = "../../../primitives/core" , version = "2.0.0-dev"} sp-transaction-pool = { path = "../../../primitives/transaction-pool" , version = "2.0.0-dev"} [dev-dependencies] diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index b2020acb1f765..fefd3aa2d79aa 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -17,7 +17,7 @@ //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks. //! This is suitable for a testing environment. -use futures::{prelude::*, future::BoxFuture}; +use futures::prelude::*; use sp_consensus::{ Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain, import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport}, @@ -68,7 +68,7 @@ impl Verifier for ManualSealVerifier { /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue( block_import: BoxBlockImport>, - spawner: impl Fn(BoxFuture<'static, ()>) -> () + spawner: &impl sp_core::traits::SpawnBlocking, ) -> BasicQueue> where Block: BlockT, diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index 5d504d8ffc01d..8d17071f4c15c 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -49,12 +49,13 @@ use sp_consensus::{ SelectChain, Error as ConsensusError, CanAuthorWith, RecordProof, BlockImport, BlockCheckParams, ImportResult, }; -use sp_consensus::import_queue::{BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport, BoxFinalityProofImport}; +use sp_consensus::import_queue::{ + BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport, BoxFinalityProofImport, +}; use codec::{Encode, Decode}; use sc_client_api; use log::*; use sp_timestamp::{InherentError as TIError, TimestampInherentData}; -use futures::future::BoxFuture; #[derive(derive_more::Display, Debug)] pub enum Error { @@ -462,7 +463,7 @@ pub fn import_queue( finality_proof_import: Option>, algorithm: Algorithm, inherent_data_providers: InherentDataProviders, - spawner: impl Fn(BoxFuture<'static, ()>) -> (), + spawner: &impl sp_core::traits::SpawnBlocking, ) -> Result< PowImportQueue, sp_consensus::Error diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 6c38d1d87469b..3bed660851b1a 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -80,15 +80,12 @@ fn build_test_full_node(config: config::NetworkConfiguration) } } - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - let spawner = |future| threads_pool.spawn_ok(future); - let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new( PassThroughVerifier(false), Box::new(client.clone()), None, None, - spawner, + &sp_core::testing::SpawnBlockingExecutor::new(), )); let worker = NetworkWorker::new(config::Params { diff --git a/client/network/test/src/block_import.rs b/client/network/test/src/block_import.rs index 5f152fa1322ee..a77dec629b282 100644 --- a/client/network/test/src/block_import.rs +++ b/client/network/test/src/block_import.rs @@ -81,13 +81,18 @@ fn import_single_good_block_without_header_fails() { #[test] fn async_import_queue_drops() { + let executor = sp_core::testing::SpawnBlockingExecutor::new(); // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = PassThroughVerifier(true); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - let spawner = |future| threads_pool.spawn_ok(future); - let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner); + let queue = BasicQueue::new( + verifier, + Box::new(substrate_test_runtime_client::new()), + None, + None, + &executor, + ); drop(queue); } } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 9c2c4e3bfdf68..999c99f64a1d4 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -606,15 +606,12 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - let spawner = |future| threads_pool.spawn_ok(future); - let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, finality_proof_import, - spawner, + &sp_core::testing::SpawnBlockingExecutor::new(), )); let listen_addr = build_multiaddr![Memory(rand::random::())]; @@ -687,15 +684,12 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - let spawner = |future| threads_pool.spawn_ok(future); - let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, finality_proof_import, - spawner, + &sp_core::testing::SpawnBlockingExecutor::new(), )); let listen_addr = build_multiaddr![Memory(rand::random::())]; diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index e0db4c500ae1f..553ca9c326d8b 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -15,10 +15,10 @@ use std::{panic, pin::Pin, result::Result, sync::Arc}; use exit_future::Signal; -use log::{debug}; +use log::debug; use futures::{ Future, FutureExt, - future::{select, Either}, + future::{select, Either, BoxFuture}, compat::*, task::{Spawn, FutureObj, SpawnError}, }; @@ -62,7 +62,12 @@ impl SpawnTaskHandle { } /// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`. - fn spawn_inner(&self, name: &'static str, task: impl Future + Send + 'static, task_type: TaskType) { + fn spawn_inner( + &self, + name: &'static str, + task: impl Future + Send + 'static, + task_type: TaskType, + ) { let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); @@ -119,6 +124,12 @@ impl Spawn for SpawnTaskHandle { } } +impl sp_core::traits::SpawnBlocking for SpawnTaskHandle { + fn spawn_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) { + self.spawn_blocking(name, future); + } +} + impl sc_client_api::CloneableSpawn for SpawnTaskHandle { fn clone(&self) -> Box { Box::new(Clone::clone(self)) diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index f39097ef3b366..f2b830e890e40 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use std::{mem, pin::Pin, time::Duration, marker::PhantomData}; -use futures::{prelude::*, task::Context, task::Poll, future::BoxFuture}; +use futures::{prelude::*, task::Context, task::Poll}; use futures_timer::Delay; use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}}; use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded}; @@ -56,7 +56,7 @@ impl BasicQueue { block_import: BoxBlockImport, justification_import: Option>, finality_proof_import: Option>, - spawner: impl Fn(BoxFuture<'static, ()>) -> (), + spawner: &impl sp_core::traits::SpawnBlocking, ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); let (future, worker_sender) = BlockImportWorker::new( @@ -67,7 +67,7 @@ impl BasicQueue { finality_proof_import, ); - spawner(future.boxed()); + spawner.spawn_blocking("basic-block-import-worker", future.boxed()); Self { sender: worker_sender, diff --git a/primitives/core/src/testing.rs b/primitives/core/src/testing.rs index 9e83090bcff26..c22381112449a 100644 --- a/primitives/core/src/testing.rs +++ b/primitives/core/src/testing.rs @@ -290,6 +290,30 @@ macro_rules! wasm_export_functions { }; } +/// An executor that supports spawning blocking futures in tests. +/// +/// Internally this just wraps a `ThreadPool` with a pool size of `8`. This +/// should ensure that we have enough threads in tests for spawning blocking futures. +#[cfg(feature = "std")] +#[derive(Clone)] +pub struct SpawnBlockingExecutor(futures::executor::ThreadPool); + +#[cfg(feature = "std")] +impl SpawnBlockingExecutor { + /// Create a new instance of `Self`. + pub fn new() -> Self { + let mut builder = futures::executor::ThreadPoolBuilder::new(); + Self(builder.pool_size(8).create().expect("Failed to create thread pool")) + } +} + +#[cfg(feature = "std")] +impl crate::traits::SpawnBlocking for SpawnBlockingExecutor { + fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + self.0.spawn_ok(future); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 1724f5e9877ee..cca1a8fa8ccf9 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -330,3 +330,11 @@ impl TaskExecutorExt { Self(spawn_handle) } } + +/// Something that can spawn a blocking future. +pub trait SpawnBlocking { + /// Spawn the given blocking future. + /// + /// The given `name` is used to identify the future in tracing. + fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); +}