Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 017a9a0

Browse files
authored
Fix tracing spans are not being forwarded to spawned task (#8009)
* Fix tracing spans are not being forwarded to spawned task There is a bug that tracing spans are not forwarded to spawned task. The problem was that only the telemetry span was forwarded. The solution to this is to use the tracing provided `in_current_span` to capture the current active span and pass the telemetry span explictely. We will now always enter the span when the future is polled. This is essentially the same strategy as tracing is doing with its `Instrumented`, but now extended for our use case with having multiple spans active. * More tests
1 parent 30ec0be commit 017a9a0

File tree

4 files changed

+158
-32
lines changed

4 files changed

+158
-32
lines changed

Cargo.lock

Lines changed: 24 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/service/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,5 +89,6 @@ substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime/"
8989
sp-consensus-babe = { version = "0.8.0", path = "../../primitives/consensus/babe" }
9090
grandpa = { version = "0.8.0", package = "sc-finality-grandpa", path = "../finality-grandpa" }
9191
grandpa-primitives = { version = "2.0.0", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" }
92-
tokio = { version = "0.2", default-features = false }
92+
tokio = { version = "0.2.25", default-features = false }
9393
async-std = { version = "1.6.5", default-features = false }
94+
tracing-subscriber = "0.2.15"

client/service/src/task_manager/mod.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use log::{debug, error};
2424
use futures::{
2525
Future, FutureExt, StreamExt,
2626
future::{select, Either, BoxFuture, join_all, try_join_all, pending},
27-
sink::SinkExt,
27+
sink::SinkExt, task::{Context, Poll},
2828
};
2929
use prometheus_endpoint::{
3030
exponential_buckets, register,
@@ -40,6 +40,37 @@ mod prometheus_future;
4040
#[cfg(test)]
4141
mod tests;
4242

43+
/// A wrapper around a `[Option<TelemetrySpan>]` and a [`Future`].
44+
///
45+
/// The telemetry in Substrate uses a span to identify the telemetry context. The span "infrastructure"
46+
/// is provided by the tracing-crate. Now it is possible to have your own spans as well. To support
47+
/// this with the [`TaskManager`] we have this wrapper. This wrapper enters the telemetry span every
48+
/// time the future is polled and polls the inner future. So, the inner future can still have its
49+
/// own span attached and we get our telemetry span ;)
50+
struct WithTelemetrySpan<T> {
51+
span: Option<TelemetrySpan>,
52+
inner: T,
53+
}
54+
55+
impl<T> WithTelemetrySpan<T> {
56+
fn new(span: Option<TelemetrySpan>, inner: T) -> Self {
57+
Self {
58+
span,
59+
inner,
60+
}
61+
}
62+
}
63+
64+
impl<T: Future<Output = ()> + Unpin> Future for WithTelemetrySpan<T> {
65+
type Output = ();
66+
67+
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
68+
let span = self.span.clone();
69+
let _enter = span.as_ref().map(|s| s.enter());
70+
Pin::new(&mut self.inner).poll(ctx)
71+
}
72+
}
73+
4374
/// An handle for spawning tasks in the service.
4475
#[derive(Clone)]
4576
pub struct SpawnTaskHandle {
@@ -124,10 +155,11 @@ impl SpawnTaskHandle {
124155
}
125156
};
126157

127-
let join_handle = {
128-
let _span = self.telemetry_span.as_ref().map(|s| s.enter());
129-
self.executor.spawn(Box::pin(future.in_current_span()), task_type)
130-
};
158+
let future = future.in_current_span().boxed();
159+
let join_handle = self.executor.spawn(
160+
WithTelemetrySpan::new(self.telemetry_span.clone(), future).boxed(),
161+
task_type,
162+
);
131163

132164
let mut task_notifier = self.task_notifier.clone();
133165
self.executor.spawn(

client/service/src/task_manager/tests.rs

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ use crate::config::TaskExecutor;
2020
use crate::task_manager::TaskManager;
2121
use futures::{future::FutureExt, pin_mut, select};
2222
use parking_lot::Mutex;
23-
use std::any::Any;
24-
use std::sync::Arc;
25-
use std::time::Duration;
23+
use std::{any::Any, sync::Arc, time::Duration};
24+
use tracing_subscriber::{layer::{SubscriberExt, Context}, Layer};
25+
use tracing::{subscriber::Subscriber, span::{Attributes, Id, Record, Span}, event::Event};
26+
use sc_telemetry::TelemetrySpan;
2627

2728
#[derive(Clone, Debug)]
2829
struct DropTester(Arc<Mutex<usize>>);
@@ -312,3 +313,94 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() {
312313
runtime.block_on(task_manager.clean_shutdown());
313314
assert_eq!(drop_tester, 0);
314315
}
316+
317+
struct TestLayer {
318+
spans_entered: Arc<Mutex<Vec<String>>>,
319+
spans: Arc<Mutex<std::collections::HashMap<Id, String>>>,
320+
}
321+
322+
impl<S: Subscriber> Layer<S> for TestLayer {
323+
fn new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<S>) {
324+
self.spans.lock().insert(id.clone(), attrs.metadata().name().to_string());
325+
}
326+
327+
fn on_record(&self, _: &Id, _: &Record<'_>, _: Context<S>) {}
328+
329+
fn on_event(&self, _: &Event<'_>, _: Context<S>) {}
330+
331+
fn on_enter(&self, span: &Id, _: Context<S>) {
332+
let name = self.spans.lock().get(span).unwrap().clone();
333+
self.spans_entered.lock().push(name);
334+
}
335+
336+
fn on_exit(&self, _: &Id, _: Context<S>) {}
337+
338+
fn on_close(&self, _: Id, _: Context<S>) {}
339+
}
340+
341+
type TestSubscriber = tracing_subscriber::layer::Layered<
342+
TestLayer,
343+
tracing_subscriber::fmt::Subscriber
344+
>;
345+
346+
fn setup_subscriber() -> (
347+
TestSubscriber,
348+
Arc<Mutex<Vec<String>>>,
349+
) {
350+
let spans_entered = Arc::new(Mutex::new(Default::default()));
351+
let layer = TestLayer {
352+
spans: Arc::new(Mutex::new(Default::default())),
353+
spans_entered: spans_entered.clone(),
354+
};
355+
let subscriber = tracing_subscriber::fmt().finish().with(layer);
356+
(subscriber, spans_entered)
357+
}
358+
359+
#[test]
360+
fn telemetry_span_is_forwarded_to_task() {
361+
let (subscriber, spans_entered) = setup_subscriber();
362+
let _sub_guard = tracing::subscriber::set_global_default(subscriber);
363+
364+
let telemetry_span = TelemetrySpan::new();
365+
366+
let span = tracing::info_span!("test");
367+
let _enter = span.enter();
368+
369+
let mut runtime = tokio::runtime::Runtime::new().unwrap();
370+
let handle = runtime.handle().clone();
371+
let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ()));
372+
let task_manager = TaskManager::new(task_executor, None, Some(telemetry_span.clone())).unwrap();
373+
374+
let (sender, receiver) = futures::channel::oneshot::channel();
375+
let spawn_handle = task_manager.spawn_handle();
376+
377+
let span = span.clone();
378+
task_manager.spawn_handle().spawn(
379+
"test",
380+
async move {
381+
assert_eq!(span, Span::current());
382+
spawn_handle.spawn("test-nested", async move {
383+
assert_eq!(span, Span::current());
384+
sender.send(()).unwrap();
385+
}.boxed());
386+
}.boxed(),
387+
);
388+
389+
// We need to leave exit the span here. If tokio is not running with multithreading, this
390+
// would lead to duplicate spans being "active" and forwarding the wrong one.
391+
drop(_enter);
392+
runtime.block_on(receiver).unwrap();
393+
runtime.block_on(task_manager.clean_shutdown());
394+
drop(runtime);
395+
396+
let spans = spans_entered.lock();
397+
// We entered the telemetry span and the "test" in the future, the nested future and
398+
// the "test" span outside of the future. So, we should have recorded 3 spans.
399+
assert_eq!(5, spans.len());
400+
401+
assert_eq!(spans[0], "test");
402+
assert_eq!(spans[1], telemetry_span.span().metadata().unwrap().name());
403+
assert_eq!(spans[2], "test");
404+
assert_eq!(spans[3], telemetry_span.span().metadata().unwrap().name());
405+
assert_eq!(spans[4], "test");
406+
}

0 commit comments

Comments
 (0)