diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index b068af0166817..8be59a9de6dc6 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -264,27 +264,15 @@ impl Runner { F: FnOnce(Configuration) -> std::result::Result, T: AbstractService + Unpin, { - let service = service_builder(self.config)?; - - // we eagerly drop the service so that the internal exit future is fired, - // but we need to keep holding a reference to the global telemetry guard - // and drop the runtime first. - let _telemetry = service.telemetry(); - - // we hold a reference to the base path so if the base path is a temporary directory it will - // not be deleted before the tokio runtime finish to clean up - let _base_path = service.base_path(); - - { - let f = service.fuse(); - self.tokio_runtime - .block_on(main(f)) - .map_err(|e| e.to_string())?; - } + let mut service = service_builder(self.config)?; + + let f = service.future().fuse(); + self.tokio_runtime + .block_on(main(f)) + .map_err(|e| e.to_string())?; - // The `service` **must** have been destroyed here for the shutdown signal to propagate - // to all the tasks. Dropping `tokio_runtime` will block the thread until all tasks have - // shut down. + service.terminate(); + // Dropping `tokio_runtime` will block the thread until all tasks have shut down. drop(self.tokio_runtime); Ok(()) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index fc0567e268260..9966de170b167 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -42,7 +42,7 @@ use std::net::SocketAddr; use std::collections::HashMap; use std::time::Duration; use wasm_timer::Instant; -use std::task::{Poll, Context}; +use std::task::Poll; use parking_lot::Mutex; use client::Client; @@ -135,7 +135,7 @@ pub struct Service { impl Unpin for Service {} /// Abstraction over a Substrate service. -pub trait AbstractService: Future> + Send + Unpin + Spawn + 'static { +pub trait AbstractService: Send + Unpin + Spawn + 'static { /// Type of block of this chain. type Block: BlockT; /// Backend storage for the client. @@ -216,6 +216,12 @@ pub trait AbstractService: Future> + Send + Unpin + S /// Get a clone of the base_path fn base_path(&self) -> Option>; + + /// Return a future that will end if an essential task fails. + fn future<'a>(&'a mut self) -> Pin> + Send + 'a>>; + + /// Signal to terminate all the running tasks. + fn terminate(&mut self); } impl AbstractService for @@ -320,27 +326,17 @@ where fn base_path(&self) -> Option> { self._base_path.clone() } -} - -impl Future for - Service -{ - type Output = Result<(), Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); + fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { + Box::pin(async move { + self.essential_failed_rx.next().await; - match Pin::new(&mut this.essential_failed_rx).poll_next(cx) { - Poll::Pending => {}, - Poll::Ready(_) => { - // Ready(None) should not be possible since we hold a live - // sender. - return Poll::Ready(Err(Error::Other("Essential task failed.".into()))); - } - } + Err(Error::Other("Essential task failed.".into())) + }) + } - // The service future never ends. - Poll::Pending + fn terminate(&mut self) { + self.task_manager.terminate() } } diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index 553ca9c326d8b..f9db91e8ef881 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -196,14 +196,19 @@ impl TaskManager { pub(super) fn on_exit(&self) -> exit_future::Exit { self.on_exit.clone() } + + /// Signal to terminate all the running tasks. + pub(super) fn terminate(&mut self) { + if let Some(signal) = self.signal.take() { + let _ = signal.fire(); + } + } } impl Drop for TaskManager { fn drop(&mut self) { debug!(target: "service", "Tasks manager shutdown"); - if let Some(signal) = self.signal.take() { - let _ = signal.fire(); - } + self.terminate(); } } diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 613b0d71ce933..b2db4db496552 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -274,7 +274,6 @@ impl TestNet where let (service, user_data) = authority(node_config).expect("Error creating test node service"); let service = SyncService::from(service); - executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.authority_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; @@ -290,7 +289,6 @@ impl TestNet where let (service, user_data) = full(node_config).expect("Error creating test node service"); let service = SyncService::from(service); - executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.full_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; @@ -305,7 +303,6 @@ impl TestNet where let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); let service = SyncService::from(light(node_config).expect("Error creating test node service")); - executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.light_nodes.push((self.nodes, service, addr)); self.nodes += 1; diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index 04af2ceb58136..8482f1c6fedd6 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -137,7 +137,7 @@ pub fn start_client(mut service: impl AbstractService) -> Client { } } - Pin::new(&mut service) + Pin::new(&mut service.future()) .poll(cx) .map(drop) }));