1414//! Substrate service tasks management module.
1515
1616use std:: {
17+ pin:: Pin ,
1718 result:: Result , sync:: Arc ,
1819 task:: { Poll , Context } ,
19- borrow:: Cow , pin:: Pin ,
2020} ;
2121use exit_future:: Signal ;
2222use log:: { debug, error} ;
@@ -26,14 +26,21 @@ use futures::{
2626 compat:: * ,
2727 task:: { Spawn , FutureObj , SpawnError } ,
2828} ;
29+ use prometheus_endpoint:: {
30+ exponential_buckets, register,
31+ PrometheusError ,
32+ CounterVec , HistogramOpts , HistogramVec , Opts , Registry , U64
33+ } ;
2934use sc_client_api:: CloneableSpawn ;
3035use sp_utils:: mpsc:: { tracing_unbounded, TracingUnboundedSender , TracingUnboundedReceiver } ;
3136
37+ mod prometheus_future;
38+
3239/// Type alias for service task executor (usually runtime).
3340pub type ServiceTaskExecutor = Arc < dyn Fn ( Pin < Box < dyn Future < Output = ( ) > + Send > > ) + Send + Sync > ;
3441
3542/// Type alias for the task scheduler.
36- pub type TaskScheduler = TracingUnboundedSender < ( Pin < Box < dyn Future < Output = ( ) > + Send > > , Cow < ' static , str > ) > ;
43+ pub type TaskScheduler = TracingUnboundedSender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ;
3744
3845/// Helper struct to setup background tasks execution for service.
3946pub struct TaskManagerBuilder {
@@ -45,20 +52,29 @@ pub struct TaskManagerBuilder {
4552 /// Sender for futures that must be spawned as background tasks.
4653 to_spawn_tx : TaskScheduler ,
4754 /// Receiver for futures that must be spawned as background tasks.
48- to_spawn_rx : TracingUnboundedReceiver < ( Pin < Box < dyn Future < Output = ( ) > + Send > > , Cow < ' static , str > ) > ,
55+ to_spawn_rx : TracingUnboundedReceiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
56+ /// Prometheus metrics where to report the stats about tasks.
57+ metrics : Option < Metrics > ,
4958}
5059
5160impl TaskManagerBuilder {
5261 /// New asynchronous task manager setup.
53- pub fn new ( ) -> Self {
62+ ///
63+ /// If a Prometheus registry is passed, it will be used to report statistics about the
64+ /// service tasks.
65+ pub fn new ( prometheus_registry : Option < & Registry > ) -> Result < Self , PrometheusError > {
5466 let ( signal, on_exit) = exit_future:: signal ( ) ;
5567 let ( to_spawn_tx, to_spawn_rx) = tracing_unbounded ( "mpsc_task_manager" ) ;
56- Self {
68+
69+ let metrics = prometheus_registry. map ( Metrics :: register) . transpose ( ) ?;
70+
71+ Ok ( Self {
5772 on_exit,
5873 signal : Some ( signal) ,
5974 to_spawn_tx,
6075 to_spawn_rx,
61- }
76+ metrics,
77+ } )
6278 }
6379
6480 /// Get spawn handle.
@@ -69,6 +85,7 @@ impl TaskManagerBuilder {
6985 SpawnTaskHandle {
7086 on_exit : self . on_exit . clone ( ) ,
7187 sender : self . to_spawn_tx . clone ( ) ,
88+ metrics : self . metrics . clone ( ) ,
7289 }
7390 }
7491
@@ -78,14 +95,16 @@ impl TaskManagerBuilder {
7895 on_exit,
7996 signal,
8097 to_spawn_rx,
81- to_spawn_tx
98+ to_spawn_tx,
99+ metrics,
82100 } = self ;
83101 TaskManager {
84102 on_exit,
85103 signal,
86104 to_spawn_tx,
87105 to_spawn_rx,
88106 executor,
107+ metrics,
89108 }
90109 }
91110}
@@ -95,17 +114,45 @@ impl TaskManagerBuilder {
95114pub struct SpawnTaskHandle {
96115 sender : TaskScheduler ,
97116 on_exit : exit_future:: Exit ,
117+ metrics : Option < Metrics > ,
98118}
99119
100120impl SpawnTaskHandle {
101121 /// Spawns the given task with the given name.
102- pub fn spawn ( & self , name : impl Into < Cow < ' static , str > > , task : impl Future < Output = ( ) > + Send + ' static ) {
122+ ///
123+ /// Note that the `name` is a `&'static str`. The reason for this choice is that statistics
124+ /// about this task are getting reported to the Prometheus endpoint (if enabled), and that
125+ /// therefore the set of possible task names must be bounded.
126+ ///
127+ /// In other words, it would be a bad idea for someone to do for example
128+ /// `spawn(format!("{:?}", some_public_key))`.
129+ pub fn spawn ( & self , name : & ' static str , task : impl Future < Output = ( ) > + Send + ' static ) {
103130 let on_exit = self . on_exit . clone ( ) ;
131+ let metrics = self . metrics . clone ( ) ;
132+
133+ // Note that we increase the started counter here and not within the future. This way,
134+ // we could properly visualize on Prometheus situations where the spawning doesn't work.
135+ if let Some ( metrics) = & self . metrics {
136+ metrics. tasks_spawned . with_label_values ( & [ name] ) . inc ( ) ;
137+ // We do a dummy increase in order for the task to show up in metrics.
138+ metrics. tasks_ended . with_label_values ( & [ name] ) . inc_by ( 0 ) ;
139+ }
140+
104141 let future = async move {
105- futures:: pin_mut!( task) ;
106- let _ = select ( on_exit, task) . await ;
142+ if let Some ( metrics) = metrics {
143+ let poll_duration = metrics. poll_duration . with_label_values ( & [ name] ) ;
144+ let poll_start = metrics. poll_start . with_label_values ( & [ name] ) ;
145+ let task = prometheus_future:: with_poll_durations ( poll_duration, poll_start, task) ;
146+ futures:: pin_mut!( task) ;
147+ let _ = select ( on_exit, task) . await ;
148+ metrics. tasks_ended . with_label_values ( & [ name] ) . inc ( ) ;
149+ } else {
150+ futures:: pin_mut!( task) ;
151+ let _ = select ( on_exit, task) . await ;
152+ }
107153 } ;
108- if self . sender . unbounded_send ( ( Box :: pin ( future) , name. into ( ) ) ) . is_err ( ) {
154+
155+ if self . sender . unbounded_send ( Box :: pin ( future) ) . is_err ( ) {
109156 error ! ( "Failed to send task to spawn over channel" ) ;
110157 }
111158 }
@@ -114,9 +161,8 @@ impl SpawnTaskHandle {
114161impl Spawn for SpawnTaskHandle {
115162 fn spawn_obj ( & self , future : FutureObj < ' static , ( ) > )
116163 -> Result < ( ) , SpawnError > {
117- let future = select ( self . on_exit . clone ( ) , future) . map ( drop) ;
118- self . sender . unbounded_send ( ( Box :: pin ( future) , From :: from ( "unnamed" ) ) )
119- . map_err ( |_| SpawnError :: shutdown ( ) )
164+ self . spawn ( "unamed" , future) ;
165+ Ok ( ( ) )
120166 }
121167}
122168
@@ -145,40 +191,34 @@ pub struct TaskManager {
145191 /// Sender for futures that must be spawned as background tasks.
146192 to_spawn_tx : TaskScheduler ,
147193 /// Receiver for futures that must be spawned as background tasks.
148- to_spawn_rx : TracingUnboundedReceiver < ( Pin < Box < dyn Future < Output = ( ) > + Send > > , Cow < ' static , str > ) > ,
194+ /// Note: please read comment on [`SpawnTaskHandle::spawn`] for why this is a `&'static str`.
195+ to_spawn_rx : TracingUnboundedReceiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
149196 /// How to spawn background tasks.
150197 executor : ServiceTaskExecutor ,
198+ /// Prometheus metric where to report the polling times.
199+ metrics : Option < Metrics > ,
151200}
152201
153202impl TaskManager {
154203 /// Spawn background/async task, which will be aware on exit signal.
155- pub ( super ) fn spawn ( & self , name : impl Into < Cow < ' static , str > > , task : impl Future < Output = ( ) > + Send + ' static ) {
156- let on_exit = self . on_exit . clone ( ) ;
157- let future = async move {
158- futures:: pin_mut!( task) ;
159- let _ = select ( on_exit, task) . await ;
160- } ;
161- if self . to_spawn_tx . unbounded_send ( ( Box :: pin ( future) , name. into ( ) ) ) . is_err ( ) {
162- error ! ( "Failed to send task to spawn over channel" ) ;
163- }
204+ ///
205+ /// See also the documentation of [`SpawnTaskHandler::spawn`].
206+ pub ( super ) fn spawn ( & self , name : & ' static str , task : impl Future < Output = ( ) > + Send + ' static ) {
207+ self . spawn_handle ( ) . spawn ( name, task)
164208 }
165209
166210 pub ( super ) fn spawn_handle ( & self ) -> SpawnTaskHandle {
167211 SpawnTaskHandle {
168212 on_exit : self . on_exit . clone ( ) ,
169213 sender : self . to_spawn_tx . clone ( ) ,
214+ metrics : self . metrics . clone ( ) ,
170215 }
171216 }
172217
173- /// Get sender where background/async tasks can be sent.
174- pub ( super ) fn scheduler ( & self ) -> TaskScheduler {
175- self . to_spawn_tx . clone ( )
176- }
177-
178218 /// Process background task receiver.
179219 pub ( super ) fn process_receiver ( & mut self , cx : & mut Context ) {
180- while let Poll :: Ready ( Some ( ( task_to_spawn, name ) ) ) = Pin :: new ( & mut self . to_spawn_rx ) . poll_next ( cx) {
181- ( self . executor ) ( Box :: pin ( futures_diagnose :: diagnose ( name , task_to_spawn) ) ) ;
220+ while let Poll :: Ready ( Some ( task_to_spawn) ) = Pin :: new ( & mut self . to_spawn_rx ) . poll_next ( cx) {
221+ ( self . executor ) ( task_to_spawn) ;
182222 }
183223 }
184224
@@ -196,3 +236,51 @@ impl Drop for TaskManager {
196236 }
197237 }
198238}
239+
240+ #[ derive( Clone ) ]
241+ struct Metrics {
242+ // This list is ordered alphabetically
243+ poll_duration : HistogramVec ,
244+ poll_start : CounterVec < U64 > ,
245+ tasks_spawned : CounterVec < U64 > ,
246+ tasks_ended : CounterVec < U64 > ,
247+ }
248+
249+ impl Metrics {
250+ fn register ( registry : & Registry ) -> Result < Self , PrometheusError > {
251+ Ok ( Self {
252+ poll_duration : register ( HistogramVec :: new (
253+ HistogramOpts {
254+ common_opts : Opts :: new (
255+ "tasks_polling_duration" ,
256+ "Duration in seconds of each invocation of Future::poll"
257+ ) ,
258+ buckets : exponential_buckets ( 0.001 , 4.0 , 9 )
259+ . expect ( "function parameters are constant and always valid; qed" ) ,
260+ } ,
261+ & [ "task_name" ]
262+ ) ?, registry) ?,
263+ poll_start : register ( CounterVec :: new (
264+ Opts :: new (
265+ "tasks_polling_started_total" ,
266+ "Total number of times we started invoking Future::poll"
267+ ) ,
268+ & [ "task_name" ]
269+ ) ?, registry) ?,
270+ tasks_spawned : register ( CounterVec :: new (
271+ Opts :: new (
272+ "tasks_spawned_total" ,
273+ "Total number of tasks that have been spawned on the Service"
274+ ) ,
275+ & [ "task_name" ]
276+ ) ?, registry) ?,
277+ tasks_ended : register ( CounterVec :: new (
278+ Opts :: new (
279+ "tasks_ended_total" ,
280+ "Total number of tasks for which Future::poll has returned Ready(())"
281+ ) ,
282+ & [ "task_name" ]
283+ ) ?, registry) ?,
284+ } )
285+ }
286+ }
0 commit comments