diff --git a/Cargo.lock b/Cargo.lock index a68f014e98ea2..d8f52aa141a95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2105,7 +2105,7 @@ dependencies = [ "http 0.2.1", "indexmap", "slab", - "tokio 0.2.23", + "tokio 0.2.25", "tokio-util", "tracing", "tracing-futures", @@ -2346,7 +2346,7 @@ dependencies = [ "itoa", "pin-project 1.0.2", "socket2", - "tokio 0.2.23", + "tokio 0.2.25", "tower-service", "tracing", "want 0.3.0", @@ -2365,7 +2365,7 @@ dependencies = [ "log", "rustls 0.18.1", "rustls-native-certs", - "tokio 0.2.23", + "tokio 0.2.25", "tokio-rustls", "webpki", ] @@ -3499,9 +3499,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.6.22" +version = "0.6.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" dependencies = [ "cfg-if 0.1.10", "fuchsia-zircon", @@ -3510,7 +3510,7 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow 0.2.1", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", @@ -3553,9 +3553,9 @@ dependencies = [ [[package]] name = "miow" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" dependencies = [ "kernel32-sys", "net2", @@ -3665,9 +3665,9 @@ dependencies = [ [[package]] name = "net2" -version = "0.2.35" +version = "0.2.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ebc3ec692ed7c9a255596c67808dee269f64655d8baf7b4f0638e51ba1d6853" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" dependencies = [ "cfg-if 0.1.10", "libc", @@ -6535,7 +6535,7 @@ dependencies = [ "tempfile", "thiserror", "tiny-bip39", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -6791,7 +6791,7 @@ dependencies = [ "substrate-test-runtime-client", "substrate-test-runtime-transaction-pool", "tempfile", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -6986,7 +6986,7 @@ dependencies = [ "substrate-prometheus-endpoint", "substrate-test-runtime-client", "tempfile", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -7230,7 +7230,7 @@ dependencies = [ "sp-utils", "substrate-test-runtime-client", "threadpool", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -7413,9 +7413,10 @@ dependencies = [ "substrate-test-runtime-client", "tempfile", "thiserror", - "tokio 0.2.23", + "tokio 0.2.25", "tracing", "tracing-futures", + "tracing-subscriber", "wasm-timer", ] @@ -8922,7 +8923,7 @@ dependencies = [ "sc-rpc-api", "serde", "sp-storage", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -8960,7 +8961,7 @@ dependencies = [ "hyper 0.13.9", "log", "prometheus", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -9075,7 +9076,7 @@ dependencies = [ "futures 0.3.9", "sc-service", "substrate-test-utils-derive", - "tokio 0.2.23", + "tokio 0.2.25", "trybuild", ] @@ -9094,7 +9095,7 @@ version = "0.1.0" dependencies = [ "sc-service", "substrate-test-utils", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] @@ -9323,9 +9324,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.23" +version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" +checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" dependencies = [ "bytes 0.5.6", "fnv", @@ -9459,7 +9460,7 @@ checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a" dependencies = [ "futures-core", "rustls 0.18.1", - "tokio 0.2.23", + "tokio 0.2.25", "webpki", ] @@ -9569,7 +9570,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite 0.1.11", - "tokio 0.2.23", + "tokio 0.2.25", ] [[package]] diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 10caca86e6216..78c5f94baf66e 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -89,5 +89,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/" sp-consensus-babe = { version = "0.8.0", path = "../../primitives/consensus/babe" } grandpa = { version = "0.8.0", package = "sc-finality-grandpa", path = "../finality-grandpa" } grandpa-primitives = { version = "2.0.0", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } -tokio = { version = "0.2", default-features = false } +tokio = { version = "0.2.25", default-features = false } async-std = { version = "1.6.5", default-features = false } +tracing-subscriber = "0.2.15" diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 6b14fbeec2c6f..9a1fd15952e12 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -24,7 +24,7 @@ use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, future::{select, Either, BoxFuture, join_all, try_join_all, pending}, - sink::SinkExt, + sink::SinkExt, task::{Context, Poll}, }; use prometheus_endpoint::{ exponential_buckets, register, @@ -40,6 +40,37 @@ mod prometheus_future; #[cfg(test)] mod tests; +/// A wrapper around a `[Option]` and a [`Future`]. +/// +/// The telemetry in Substrate uses a span to identify the telemetry context. The span "infrastructure" +/// is provided by the tracing-crate. Now it is possible to have your own spans as well. To support +/// this with the [`TaskManager`] we have this wrapper. This wrapper enters the telemetry span every +/// time the future is polled and polls the inner future. So, the inner future can still have its +/// own span attached and we get our telemetry span ;) +struct WithTelemetrySpan { + span: Option, + inner: T, +} + +impl WithTelemetrySpan { + fn new(span: Option, inner: T) -> Self { + Self { + span, + inner, + } + } +} + +impl + Unpin> Future for WithTelemetrySpan { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { + let span = self.span.clone(); + let _enter = span.as_ref().map(|s| s.enter()); + Pin::new(&mut self.inner).poll(ctx) + } +} + /// An handle for spawning tasks in the service. #[derive(Clone)] pub struct SpawnTaskHandle { @@ -124,10 +155,11 @@ impl SpawnTaskHandle { } }; - let join_handle = { - let _span = self.telemetry_span.as_ref().map(|s| s.enter()); - self.executor.spawn(Box::pin(future.in_current_span()), task_type) - }; + let future = future.in_current_span().boxed(); + let join_handle = self.executor.spawn( + WithTelemetrySpan::new(self.telemetry_span.clone(), future).boxed(), + task_type, + ); let mut task_notifier = self.task_notifier.clone(); self.executor.spawn( diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index f0ede1fc389a5..257f7db19870f 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -20,9 +20,10 @@ use crate::config::TaskExecutor; use crate::task_manager::TaskManager; use futures::{future::FutureExt, pin_mut, select}; use parking_lot::Mutex; -use std::any::Any; -use std::sync::Arc; -use std::time::Duration; +use std::{any::Any, sync::Arc, time::Duration}; +use tracing_subscriber::{layer::{SubscriberExt, Context}, Layer}; +use tracing::{subscriber::Subscriber, span::{Attributes, Id, Record, Span}, event::Event}; +use sc_telemetry::TelemetrySpan; #[derive(Clone, Debug)] struct DropTester(Arc>); @@ -312,3 +313,94 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } + +struct TestLayer { + spans_entered: Arc>>, + spans: Arc>>, +} + +impl Layer for TestLayer { + fn new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context) { + self.spans.lock().insert(id.clone(), attrs.metadata().name().to_string()); + } + + fn on_record(&self, _: &Id, _: &Record<'_>, _: Context) {} + + fn on_event(&self, _: &Event<'_>, _: Context) {} + + fn on_enter(&self, span: &Id, _: Context) { + let name = self.spans.lock().get(span).unwrap().clone(); + self.spans_entered.lock().push(name); + } + + fn on_exit(&self, _: &Id, _: Context) {} + + fn on_close(&self, _: Id, _: Context) {} +} + +type TestSubscriber = tracing_subscriber::layer::Layered< + TestLayer, + tracing_subscriber::fmt::Subscriber +>; + +fn setup_subscriber() -> ( + TestSubscriber, + Arc>>, +) { + let spans_entered = Arc::new(Mutex::new(Default::default())); + let layer = TestLayer { + spans: Arc::new(Mutex::new(Default::default())), + spans_entered: spans_entered.clone(), + }; + let subscriber = tracing_subscriber::fmt().finish().with(layer); + (subscriber, spans_entered) +} + +#[test] +fn telemetry_span_is_forwarded_to_task() { + let (subscriber, spans_entered) = setup_subscriber(); + let _sub_guard = tracing::subscriber::set_global_default(subscriber); + + let telemetry_span = TelemetrySpan::new(); + + let span = tracing::info_span!("test"); + let _enter = span.enter(); + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ())); + let task_manager = TaskManager::new(task_executor, None, Some(telemetry_span.clone())).unwrap(); + + let (sender, receiver) = futures::channel::oneshot::channel(); + let spawn_handle = task_manager.spawn_handle(); + + let span = span.clone(); + task_manager.spawn_handle().spawn( + "test", + async move { + assert_eq!(span, Span::current()); + spawn_handle.spawn("test-nested", async move { + assert_eq!(span, Span::current()); + sender.send(()).unwrap(); + }.boxed()); + }.boxed(), + ); + + // We need to leave exit the span here. If tokio is not running with multithreading, this + // would lead to duplicate spans being "active" and forwarding the wrong one. + drop(_enter); + runtime.block_on(receiver).unwrap(); + runtime.block_on(task_manager.clean_shutdown()); + drop(runtime); + + let spans = spans_entered.lock(); + // We entered the telemetry span and the "test" in the future, the nested future and + // the "test" span outside of the future. So, we should have recorded 3 spans. + assert_eq!(5, spans.len()); + + assert_eq!(spans[0], "test"); + assert_eq!(spans[1], telemetry_span.span().metadata().unwrap().name()); + assert_eq!(spans[2], "test"); + assert_eq!(spans[3], telemetry_span.span().metadata().unwrap().name()); + assert_eq!(spans[4], "test"); +}