Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c3af86c
Move DummySpecialization to sc-network (#4680)
tomaka Jan 20, 2020
561bd72
keep nominations after getting kicked with zero slash (#4681)
rphmeier Jan 20, 2020
a90c72e
deprecate chain_status field of network handshake (#4675)
rphmeier Jan 20, 2020
a083572
client/finality-grandpa/communication: Add doc comment for PeerReport…
mxinden Jan 20, 2020
0995bb3
ci: increase retention for logs of tests to 144 hours (#4677)
gabreal Jan 20, 2020
1f9d09d
Pallet session new API (#4609)
gui1117 Jan 20, 2020
5cd952b
Only support ECDSA compressed public keys (#4667)
bkchr Jan 20, 2020
cfbb24c
Call enable_all() when building tokio runtime (#4690)
tomaka Jan 20, 2020
cb9c181
Make debug builds more usable (#4683)
bkchr Jan 21, 2020
f52ea97
grandpa: reduce allocations when verifying multiple messages (#4693)
andresilva Jan 21, 2020
6ee1244
Pass an executor through the Configuration (#4688)
tomaka Jan 21, 2020
1472014
contracts: New contract events + unconfusions (#4685)
Robbepop Jan 21, 2020
ef97057
fix docs deadlinks (#4698)
NikVolf Jan 21, 2020
4b2f70f
remove license preamble from node-template (#4699)
NikVolf Jan 21, 2020
cf020ad
grandpa: filter some telemetry events on larger voter sets (#4700)
andresilva Jan 21, 2020
ef578cd
Support `u128`/`i128` in runtime interface (#4703)
bkchr Jan 22, 2020
2cc1772
client/authority-discovery/Cargo.toml: Update dependency (#4706)
mxinden Jan 22, 2020
6c3b86d
More cleanups in node-template (#4705)
NikVolf Jan 22, 2020
32c04b4
Merge branch 'paritytech/master' into prometheus_v0.3
mxinden Jan 23, 2020
ea9278a
utils/prometheus: Make crate spawn onto global executor
mxinden Jan 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Pass an executor through the Configuration (paritytech#4688)
* Pass an executor through the Configuration

* Make tasks_executor mandatory

* Fix tests
  • Loading branch information
tomaka authored and bkchr committed Jan 21, 2020
commit 6ee1244e2d018333746d82131222308e0d802e6a
6 changes: 5 additions & 1 deletion bin/node-template/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ pub fn run<I, T, E>(args: I, exit: E, version: VersionInfo) -> error::Result<()>
type Config<T> = Configuration<(), T>;
match parse_and_prepare::<NoCustom, NoCustom, _>(&version, "substrate-node", args) {
ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, _custom_args, config: Config<_>| {
|exit, _cli_args, _custom_args, mut config: Config<_>| {
info!("{}", version.name);
info!(" version {}", config.full_version());
info!(" by {}, 2017, 2018", version.author);
info!("Chain specification: {}", config.chain_spec.name());
info!("Node name: {}", config.name);
info!("Roles: {}", display_role(&config));
let runtime = Runtime::new().map_err(|e| format!("{:?}", e))?;
config.tasks_executor = {
let runtime_handle = runtime.handle().clone();
Some(Box::new(move |fut| { runtime_handle.spawn(fut); }))
};
match config.roles {
ServiceRoles::LIGHT => run_until_exit(
runtime,
Expand Down
6 changes: 5 additions & 1 deletion bin/node/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn run<I, T, E>(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re

match parse_and_prepare::<CustomSubcommands, NoCustom, _>(&version, "substrate-node", args) {
ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, _custom_args, config: Config<_, _>| {
|exit, _cli_args, _custom_args, mut config: Config<_, _>| {
info!("{}", version.name);
info!(" version {}", config.full_version());
info!(" by Parity Technologies, 2017-2019");
Expand All @@ -111,6 +111,10 @@ pub fn run<I, T, E>(args: I, exit: E, version: sc_cli::VersionInfo) -> error::Re
.enable_all()
.build()
.map_err(|e| format!("{:?}", e))?;
config.tasks_executor = {
let runtime_handle = runtime.handle().clone();
Some(Box::new(move |fut| { runtime_handle.spawn(fut); }))
};
match config.roles {
ServiceRoles::LIGHT => run_until_exit(
runtime,
Expand Down
6 changes: 5 additions & 1 deletion client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,11 @@ ServiceBuilder<
essential_failed_rx,
to_spawn_tx,
to_spawn_rx,
to_poll: Vec::new(),
tasks_executor: if let Some(exec) = config.tasks_executor {
exec
} else {
return Err(Error::TasksExecutorRequired);
},
rpc_handlers,
_rpc: rpc,
_telemetry: telemetry,
Expand Down
6 changes: 4 additions & 2 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ pub use sc_client_db::{kvdb::KeyValueDB, PruningMode};
pub use sc_network::config::{ExtTransport, NetworkConfiguration, Roles};
pub use sc_executor::WasmExecutionMethod;

use std::{path::{PathBuf, Path}, net::SocketAddr, sync::Arc};
use std::{future::Future, path::{PathBuf, Path}, pin::Pin, net::SocketAddr, sync::Arc};
pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
use sc_chain_spec::{ChainSpec, RuntimeGenesis, Extension, NoExtension};
use sp_core::crypto::Protected;
use target_info::Target;
use sc_telemetry::TelemetryEndpoints;

/// Service configuration.
#[derive(Clone)]
pub struct Configuration<C, G, E = NoExtension> {
/// Implementation name
pub impl_name: &'static str,
Expand All @@ -39,6 +38,8 @@ pub struct Configuration<C, G, E = NoExtension> {
pub impl_commit: &'static str,
/// Node roles.
pub roles: Roles,
/// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error.
pub tasks_executor: Option<Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>>,
/// Extrinsic pool configuration.
pub transaction_pool: TransactionPoolOptions,
/// Network configuration.
Expand Down Expand Up @@ -160,6 +161,7 @@ impl<C, G, E> Configuration<C, G, E> where
config_dir: config_dir.clone(),
name: Default::default(),
roles: Roles::FULL,
tasks_executor: None,
transaction_pool: Default::default(),
network: Default::default(),
keystore: KeystoreConfig::None,
Expand Down
3 changes: 3 additions & 0 deletions client/service/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub enum Error {
/// Best chain selection strategy is missing.
#[display(fmt="Best chain selection strategy (SelectChain) is not provided.")]
SelectChainRequired,
/// Tasks executor is missing.
#[display(fmt="Tasks executor hasn't been provided.")]
TasksExecutorRequired,
/// Other error.
Other(String),
}
Expand Down
25 changes: 4 additions & 21 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use parking_lot::Mutex;
use sc_client::Client;
use exit_future::Signal;
use futures::{
Future, FutureExt, Stream, StreamExt, TryFutureExt,
Future, FutureExt, Stream, StreamExt,
future::select, channel::mpsc,
compat::*,
sink::SinkExt,
Expand Down Expand Up @@ -95,10 +95,8 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Receiver for futures that must be spawned as background tasks.
to_spawn_rx: mpsc::UnboundedReceiver<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// List of futures to poll from `poll`.
/// If spawning a background task is not possible, we instead push the task into this `Vec`.
/// The elements must then be polled manually.
to_poll: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// How to spawn background tasks.
tasks_executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
_rpc: Box<dyn std::any::Any + Send + Sync>,
_telemetry: Option<sc_telemetry::Telemetry>,
Expand Down Expand Up @@ -322,22 +320,7 @@ impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
}

while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) {
// TODO: Update to tokio 0.2 when libp2p get switched to std futures (#4383)
let executor = tokio_executor::DefaultExecutor::current();
use futures01::future::Executor;
if let Err(err) = executor.execute(task_to_spawn.unit_error().compat()) {
debug!(
target: "service",
"Failed to spawn background task: {:?}; falling back to manual polling",
err
);
this.to_poll.push(Box::pin(err.into_future().compat().map(drop)));
}
}

// Polling all the `to_poll` futures.
while let Some(pos) = this.to_poll.iter_mut().position(|t| Pin::new(t).poll(cx).is_ready()) {
let _ = this.to_poll.remove(pos);
(this.tasks_executor)(task_to_spawn);
}

// The service future never ends.
Expand Down
21 changes: 19 additions & 2 deletions client/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
use std::iter;
use std::sync::{Arc, Mutex, MutexGuard};
use std::net::Ipv4Addr;
use std::pin::Pin;
use std::time::Duration;
use log::info;
use futures01::{Future, Stream, Poll};
use futures::{FutureExt as _, TryFutureExt as _};
use tempfile::TempDir;
use tokio::{runtime::Runtime, prelude::FutureExt};
use tokio::timer::Interval;
Expand Down Expand Up @@ -131,6 +133,7 @@ fn node_config<G, E: Clone> (
index: usize,
spec: &ChainSpec<G, E>,
role: Roles,
tasks_executor: Box<dyn Fn(Pin<Box<dyn futures::Future<Output = ()> + Send>>) + Send>,
key_seed: Option<String>,
base_port: u16,
root: &TempDir,
Expand Down Expand Up @@ -172,6 +175,7 @@ fn node_config<G, E: Clone> (
impl_version: "0.1",
impl_commit: "",
roles: role,
tasks_executor: Some(tasks_executor),
transaction_pool: Default::default(),
network: network_config,
keystore: KeystoreConfig::Path {
Expand Down Expand Up @@ -251,10 +255,15 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
let executor = self.runtime.executor();

for (key, authority) in authorities {
let tasks_executor = {
let executor = executor.clone();
Box::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(
self.nodes,
&self.chain_spec,
Roles::AUTHORITY,
tasks_executor,
Some(key),
self.base_port,
&temp,
Expand All @@ -270,7 +279,11 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
}

for full in full {
let node_config = node_config(self.nodes, &self.chain_spec, Roles::FULL, None, self.base_port, &temp);
let tasks_executor = {
let executor = executor.clone();
Box::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(self.nodes, &self.chain_spec, Roles::FULL, tasks_executor, None, self.base_port, &temp);
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let (service, user_data) = full(node_config).expect("Error creating test node service");
let service = SyncService::from(service);
Expand All @@ -282,7 +295,11 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
}

for light in light {
let node_config = node_config(self.nodes, &self.chain_spec, Roles::LIGHT, None, self.base_port, &temp);
let tasks_executor = {
let executor = executor.clone();
Box::new(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
};
let node_config = node_config(self.nodes, &self.chain_spec, Roles::LIGHT, tasks_executor, None, self.base_port, &temp);
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let service = SyncService::from(light(node_config).expect("Error creating test node service"));

Expand Down
3 changes: 3 additions & 0 deletions utils/browser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ where
allow_private_ipv4: true,
enable_mdns: false,
};
config.tasks_executor = Some(Box::new(move |fut| {
wasm_bindgen_futures::spawn_local(fut)
}));
config.telemetry_external_transport = Some(transport);
config.roles = Roles::LIGHT;
config.name = format!("{} (Browser)", name);
Expand Down