Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Expose that BasicQueue expects blocking spawn
Up to now `BasicQueue` expected a closure that to spawn a `Future`.
This was expected to be a closure that spawns a blocking future.
However, this wasn't documented anywhere. This pr introduces a new trait
`SpawnBlocking` that exposes this requirement to the outside.
  • Loading branch information
bkchr committed Apr 30, 2020
commit 0ee25f2d57e913671463dca25493c554068b4ddc
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ macro_rules! new_full_start {
($config:expr) => {{
use std::sync::Arc;
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;

let mut import_setup = None;
let inherent_data_providers = sp_inherents::InherentDataProviders::new();

Expand All @@ -45,23 +45,24 @@ macro_rules! new_full_start {
let select_chain = select_chain.take()
.ok_or_else(|| sc_service::Error::SelectChainRequired)?;

let (grandpa_block_import, grandpa_link) =
sc_finality_grandpa::block_import(client.clone(), &(client.clone() as Arc<_>), select_chain)?;
let (grandpa_block_import, grandpa_link) = sc_finality_grandpa::block_import(
client.clone(),
&(client.clone() as Arc<_>),
select_chain,
)?;

let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new(
grandpa_block_import.clone(), client.clone(),
);

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
aura_block_import,
Some(Box::new(grandpa_block_import.clone())),
None,
client,
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;

import_setup = Some((grandpa_block_import, grandpa_link));
Expand Down Expand Up @@ -208,16 +209,14 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
let finality_proof_request_builder =
finality_proof_import.create_finality_proof_request_builder();

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
sc_consensus_aura::slot_duration(&*client)?,
grandpa_block_import,
None,
Some(Box::new(finality_proof_import)),
client,
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
14 changes: 7 additions & 7 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ macro_rules! new_full_start {
})?
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
Ok(sc_transaction_pool::BasicPool::new(
config,
std::sync::Arc::new(pool_api),
prometheus_registry,
))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
let select_chain = select_chain.take()
Expand All @@ -68,16 +72,14 @@ macro_rules! new_full_start {
client.clone(),
)?;

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link.clone(),
block_import.clone(),
Some(Box::new(justification_import)),
None,
client,
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;

import_setup = Some((block_import, grandpa_link, babe_link));
Expand Down Expand Up @@ -308,16 +310,14 @@ pub fn new_light(config: Configuration)
client.clone(),
)?;

let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);

let import_queue = sc_consensus_babe::import_queue(
babe_link,
babe_block_import,
None,
Some(Box::new(finality_proof_import)),
client.clone(),
inherent_data_providers.clone(),
spawner,
spawn_task_handle,
)?;

Ok((import_queue, finality_proof_request_builder))
Expand Down
9 changes: 5 additions & 4 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
collections::HashMap
};

use futures::{prelude::*, future::BoxFuture};
use futures::prelude::*;
use parking_lot::Mutex;
use log::{debug, info, trace};

Expand Down Expand Up @@ -788,14 +788,14 @@ impl<Block: BlockT, C, I, P> BlockImport<Block> for AuraBlockImport<Block, C, I,
}

/// Start an import queue for the Aura consensus algorithm.
pub fn import_queue<B, I, C, P, F>(
pub fn import_queue<B, I, C, P, S>(
slot_duration: SlotDuration,
block_import: I,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
client: Arc<C>,
inherent_data_providers: InherentDataProviders,
spawner: F,
spawner: &S,
) -> Result<AuraImportQueue<B, sp_api::TransactionFor<C, B>>, sp_consensus::Error> where
B: BlockT,
C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B, Error = sp_blockchain::Error>,
Expand All @@ -805,7 +805,7 @@ pub fn import_queue<B, I, C, P, F>(
P: Pair + Send + Sync + 'static,
P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
P::Signature: Encode + Decode,
F: Fn(BoxFuture<'static, ()>) -> (),
S: sp_core::traits::SpawnBlocking,
{
register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
initialize_authorities_cache(&*client)?;
Expand All @@ -815,6 +815,7 @@ pub fn import_queue<B, I, C, P, F>(
inherent_data_providers,
phantom: PhantomData,
};

Ok(BasicQueue::new(
verifier,
Box::new(block_import),
Expand Down
4 changes: 2 additions & 2 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;

use futures::{prelude::*, future::BoxFuture};
use futures::prelude::*;
use log::{debug, info, log, trace, warn};
use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
Expand Down Expand Up @@ -1272,7 +1272,7 @@ pub fn import_queue<Block: BlockT, Client, Inner>(
finality_proof_import: Option<BoxFinalityProofImport<Block>>,
client: Arc<Client>,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
spawner: &impl sp_core::traits::SpawnBlocking,
) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
Inner: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<Client, Block>>
+ Send + Sync + 'static,
Expand Down
1 change: 1 addition & 0 deletions client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ sp-blockchain = { path = "../../../primitives/blockchain" , version = "2.0.0-dev
sp-consensus = { package = "sp-consensus", path = "../../../primitives/consensus/common" , version = "0.8.0-dev"}
sp-inherents = { path = "../../../primitives/inherents" , version = "2.0.0-dev"}
sp-runtime = { path = "../../../primitives/runtime" , version = "2.0.0-dev"}
sp-core = { path = "../../../primitives/core" , version = "2.0.0-dev"}
sp-transaction-pool = { path = "../../../primitives/transaction-pool" , version = "2.0.0-dev"}

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment.

use futures::{prelude::*, future::BoxFuture};
use futures::prelude::*;
use sp_consensus::{
Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, B>(
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>,
spawner: impl Fn(BoxFuture<'static, ()>) -> ()
spawner: &impl sp_core::traits::SpawnBlocking,
) -> BasicQueue<Block, TransactionFor<B, Block>>
where
Block: BlockT,
Expand Down
7 changes: 4 additions & 3 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ use sp_consensus::{
SelectChain, Error as ConsensusError, CanAuthorWith, RecordProof, BlockImport,
BlockCheckParams, ImportResult,
};
use sp_consensus::import_queue::{BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport, BoxFinalityProofImport};
use sp_consensus::import_queue::{
BoxBlockImport, BasicQueue, Verifier, BoxJustificationImport, BoxFinalityProofImport,
};
use codec::{Encode, Decode};
use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};
use futures::future::BoxFuture;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
Expand Down Expand Up @@ -462,7 +463,7 @@ pub fn import_queue<B, Transaction, Algorithm>(
finality_proof_import: Option<BoxFinalityProofImport<B>>,
algorithm: Algorithm,
inherent_data_providers: InherentDataProviders,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
spawner: &impl sp_core::traits::SpawnBlocking,
) -> Result<
PowImportQueue<B, Transaction>,
sp_consensus::Error
Expand Down
5 changes: 1 addition & 4 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,12 @@ fn build_test_full_node(config: config::NetworkConfiguration)
}
}

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new(
PassThroughVerifier(false),
Box::new(client.clone()),
None,
None,
spawner,
&sp_core::testing::SpawnBlockingExecutor::new(),
));

let worker = NetworkWorker::new(config::Params {
Expand Down
11 changes: 8 additions & 3 deletions client/network/test/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,18 @@ fn import_single_good_block_without_header_fails() {

#[test]
fn async_import_queue_drops() {
let executor = sp_core::testing::SpawnBlockingExecutor::new();
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);
let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner);
let queue = BasicQueue::new(
verifier,
Box::new(substrate_test_runtime_client::new()),
None,
None,
&executor,
);
drop(queue);
}
}
10 changes: 2 additions & 8 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,15 +606,12 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
&sp_core::testing::SpawnBlockingExecutor::new(),
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down Expand Up @@ -687,15 +684,12 @@ pub trait TestNetFactory: Sized {
);
let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let spawner = |future| threads_pool.spawn_ok(future);

let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
spawner,
&sp_core::testing::SpawnBlockingExecutor::new(),
));

let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
Expand Down
17 changes: 14 additions & 3 deletions client/service/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use std::{
result::Result, sync::Arc
};
use exit_future::Signal;
use log::{debug};
use log::debug;
use futures::{
Future, FutureExt,
future::select,
future::{select, BoxFuture},
compat::*,
task::{Spawn, FutureObj, SpawnError},
};
Expand Down Expand Up @@ -65,7 +65,12 @@ impl SpawnTaskHandle {
}

/// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`.
fn spawn_inner(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static, task_type: TaskType) {
fn spawn_inner(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();

Expand Down Expand Up @@ -103,6 +108,12 @@ impl Spawn for SpawnTaskHandle {
}
}

impl sp_core::traits::SpawnBlocking for SpawnTaskHandle {
fn spawn_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) {
self.spawn_blocking(name, future);
}
}

impl sc_client_api::CloneableSpawn for SpawnTaskHandle {
fn clone(&self) -> Box<dyn CloneableSpawn> {
Box::new(Clone::clone(self))
Expand Down
6 changes: 3 additions & 3 deletions primitives/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use std::{mem, pin::Pin, time::Duration, marker::PhantomData};
use futures::{prelude::*, task::Context, task::Poll, future::BoxFuture};
use futures::{prelude::*, task::Context, task::Poll};
use futures_timer::Delay;
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
Expand Down Expand Up @@ -56,7 +56,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
block_import: BoxBlockImport<B, Transaction>,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
spawner: &impl sp_core::traits::SpawnBlocking,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let (future, worker_sender) = BlockImportWorker::new(
Expand All @@ -67,7 +67,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
finality_proof_import,
);

spawner(future.boxed());
spawner.spawn_blocking("basic-block-import-worker", future.boxed());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomaka might need to update his grafana dashboard :)


Self {
sender: worker_sender,
Expand Down
Loading