diff --git a/Cargo.lock b/Cargo.lock index bcf51319908da..b1204a1c58600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4014,6 +4014,7 @@ dependencies = [ "parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 1.0.0", "srml-finality-tracker 1.0.0", "substrate-client 1.0.0", @@ -4360,6 +4361,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 4a33f3261a7da..099255bdf5d2d 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -956,6 +956,11 @@ impl Client where let header = self.header(&BlockId::Hash(finalized_hash))? .expect("header already known to exist in DB because it is indicated in the tree route; qed"); + telemetry!(SUBSTRATE_INFO; "notify.finalized"; + "height" => format!("{}", header.number()), + "best" => ?finalized_hash, + ); + let notification = FinalityNotification { header, hash: finalized_hash, diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index 2424eed06f18d..9b076f5b01957 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -16,6 +16,7 @@ runtime_primitives = { package = "sr-primitives", path = "../sr-primitives" } consensus_common = { package = "substrate-consensus-common", path = "../consensus/common" } substrate-primitives = { path = "../primitives" } substrate-telemetry = { path = "../telemetry" } +serde_json = "1.0" client = { package = "substrate-client", path = "../client" } inherents = { package = "substrate-inherents", path = "../../core/inherents" } network = { package = "substrate-network", path = "../network" } diff --git a/core/finality-grandpa/src/aux_schema.rs b/core/finality-grandpa/src/aux_schema.rs index 99cecb98d5460..9e981cb903b14 100644 --- a/core/finality-grandpa/src/aux_schema.rs +++ b/core/finality-grandpa/src/aux_schema.rs @@ -25,6 +25,7 @@ use fork_tree::ForkTree; use grandpa::round::State as RoundState; use runtime_primitives::traits::{Block as BlockT, NumberFor}; use log::{info, warn}; +use substrate_telemetry::{telemetry, CONSENSUS_INFO}; use crate::authorities::{AuthoritySet, SharedAuthoritySet, PendingChange, DelayKind}; use crate::consensus_changes::{SharedConsensusChanges, ConsensusChanges}; @@ -365,6 +366,17 @@ pub(crate) fn update_authority_set( let encoded_set = set.encode(); if let Some(new_set) = new_set { + telemetry!(CONSENSUS_INFO; "afg.authority_set"; + "hash" => ?new_set.canon_hash, + "number" => ?new_set.canon_number, + "authority_set_id" => ?new_set.set_id, + "authorities" => { + let authorities: Vec = + new_set.authorities.iter().map(|(id, _)| format!("{}", id)).collect(); + format!("{:?}", authorities) + } + ); + // we also overwrite the "last completed round" entry with a blank slate // because from the perspective of the finality gadget, the chain has // reset. diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 4b8958f2e9c58..ec7ed330ac0eb 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -681,7 +681,7 @@ impl> Sink for CommitsOut { let (round, commit) = input; let round = Round(round); - telemetry!(CONSENSUS_INFO; "afg.commit_issued"; + telemetry!(CONSENSUS_DEBUG; "afg.commit_issued"; "target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash, ); let (precommits, auth_data) = commit.precommits.into_iter() diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 3d9ec8b5e766d..5a2b084160442 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -69,6 +69,7 @@ use inherents::InherentDataProviders; use runtime_primitives::generic::BlockId; use substrate_primitives::{ed25519, H256, Pair, Blake2Hasher}; use substrate_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_WARN}; +use serde_json; use srml_finality_tracker; @@ -105,6 +106,7 @@ use environment::{CompletedRound, CompletedRounds, Environment, HasVoted, Shared use import::GrandpaBlockImport; use until_imported::UntilCommitBlocksImported; use communication::NetworkBridge; +use service::TelemetryOnConnect; use ed25519::{Public as AuthorityId, Signature as AuthoritySignature}; @@ -433,14 +435,26 @@ fn register_finality_tracker_inherent_data_provider, N, RA, X> { + /// Configuration for the GRANDPA service. + pub config: Config, + /// A link to the block import worker. + pub link: LinkHalf, + /// The Network instance. + pub network: N, + /// The inherent data providers. + pub inherent_data_providers: InherentDataProviders, + /// Handle to a future that will resolve on exit. + pub on_exit: X, + /// If supplied, can be used to hook on telemetry connection established events. + pub telemetry_on_connect: Option>, +} + /// Run a GRANDPA voter as a task. Provide configuration and a link to a /// block import worker that has already been instantiated with `block_import`. -pub fn run_grandpa_voter, N, RA>( - config: Config, - link: LinkHalf, - network: N, - inherent_data_providers: InherentDataProviders, - on_exit: impl Future + Clone + Send + 'static, +pub fn run_grandpa_voter, N, RA, X>( + grandpa_params: GrandpaParams, ) -> ::client::error::Result + Send + 'static> where Block::Hash: Ord, B: Backend + 'static, @@ -451,7 +465,17 @@ pub fn run_grandpa_voter, N, RA>( DigestFor: Encode, DigestItemFor: DigestItem, RA: Send + Sync + 'static, + X: Future + Clone + Send + 'static, { + let GrandpaParams { + config, + link, + network, + inherent_data_providers, + on_exit, + telemetry_on_connect, + } = grandpa_params; + use futures::future::{self, Loop as FutureLoop}; let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone()); @@ -465,6 +489,28 @@ pub fn run_grandpa_voter, N, RA>( register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; + if let Some(telemetry_on_connect) = telemetry_on_connect { + let authorities = authority_set.clone(); + let events = telemetry_on_connect.telemetry_connection_sinks + .for_each(move |_| { + telemetry!(CONSENSUS_INFO; "afg.authority_set"; + "authority_set_id" => ?authorities.set_id(), + "authorities" => { + let curr = authorities.current_authorities(); + let voters = curr.voters(); + let authorities: Vec = + voters.iter().map(|(id, _)| id.to_string()).collect(); + serde_json::to_string(&authorities) + .expect("authorities is always at least an empty vector; elements are always of type string") + } + ); + Ok(()) + }) + .then(|_| Ok(())); + let events = events.select(telemetry_on_connect.on_exit).then(|_| Ok(())); + telemetry_on_connect.executor.spawn(events); + } + let voters = authority_set.current_authorities(); let initial_environment = Arc::new(Environment { inner: client.clone(), @@ -660,12 +706,8 @@ pub fn run_grandpa_voter, N, RA>( } #[deprecated(since = "1.1", note = "Please switch to run_grandpa_voter.")] -pub fn run_grandpa, N, RA>( - config: Config, - link: LinkHalf, - network: N, - inherent_data_providers: InherentDataProviders, - on_exit: impl Future + Clone + Send + 'static, +pub fn run_grandpa, N, RA, X>( + grandpa_params: GrandpaParams, ) -> ::client::error::Result + Send + 'static> where Block::Hash: Ord, B: Backend + 'static, @@ -676,6 +718,7 @@ pub fn run_grandpa, N, RA>( DigestFor: Encode, DigestItemFor: DigestItem, RA: Send + Sync + 'static, + X: Future + Clone + Send + 'static, { - run_grandpa_voter(config, link, network, inherent_data_providers, on_exit) + run_grandpa_voter(grandpa_params) } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 1d8279e99563b..93d68af01b3d6 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -419,18 +419,20 @@ fn run_to_completion_with( fn assert_send(_: &T) { } - let voter = run_grandpa_voter( - Config { + let grandpa_params = GrandpaParams { + config: Config { gossip_duration: TEST_GOSSIP_DURATION, justification_period: 32, local_key: Some(Arc::new(key.clone().into())), name: Some(format!("peer#{}", peer_id)), }, - link, - MessageRouting::new(net.clone(), peer_id), - InherentDataProviders::new(), - Exit, - ).expect("all in order with client and network"); + link: link, + network: MessageRouting::new(net.clone(), peer_id), + inherent_data_providers: InherentDataProviders::new(), + on_exit: Exit, + telemetry_on_connect: None, + }; + let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); assert_send(&voter); @@ -517,18 +519,21 @@ fn finalize_3_voters_1_full_observer() { .take_while(|n| Ok(n.header.number() < &20)) .for_each(move |_| Ok(())) ); - let voter = run_grandpa_voter( - Config { + + let grandpa_params = GrandpaParams { + config: Config { gossip_duration: TEST_GOSSIP_DURATION, justification_period: 32, local_key, name: Some(format!("peer#{}", peer_id)), }, - link, - MessageRouting::new(net.clone(), peer_id), - InherentDataProviders::new(), - Exit, - ).expect("all in order with client and network"); + link: link, + network: MessageRouting::new(net.clone(), peer_id), + inherent_data_providers: InherentDataProviders::new(), + on_exit: Exit, + telemetry_on_connect: None, + }; + let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); runtime.spawn(voter); } @@ -679,18 +684,20 @@ fn transition_3_voters_twice_1_full_observer() { assert_eq!(set.pending_changes().count(), 0); }) ); - let voter = run_grandpa_voter( - Config { + let grandpa_params = GrandpaParams { + config: Config { gossip_duration: TEST_GOSSIP_DURATION, justification_period: 32, local_key, name: Some(format!("peer#{}", peer_id)), }, - link, - MessageRouting::new(net.clone(), peer_id), - InherentDataProviders::new(), - Exit, - ).expect("all in order with client and network"); + link: link, + network: MessageRouting::new(net.clone(), peer_id), + inherent_data_providers: InherentDataProviders::new(), + on_exit: Exit, + telemetry_on_connect: None, + }; + let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); runtime.spawn(voter); } @@ -1081,18 +1088,20 @@ fn voter_persists_its_votes() { let (_block_import, _, link) = net.lock().make_block_import(client.clone()); let link = link.lock().take().unwrap(); - let mut voter = run_grandpa_voter( - Config { + let grandpa_params = GrandpaParams { + config: Config { gossip_duration: TEST_GOSSIP_DURATION, justification_period: 32, local_key: Some(Arc::new(peers[0].clone().into())), name: Some(format!("peer#{}", 0)), }, - link, - MessageRouting::new(net.clone(), 0), - InherentDataProviders::new(), - Exit, - ).expect("all in order with client and network"); + link: link, + network: MessageRouting::new(net.clone(), 0), + inherent_data_providers: InherentDataProviders::new(), + on_exit: Exit, + telemetry_on_connect: None, + }; + let mut voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); let voter = future::poll_fn(move || { // we need to keep the block_import alive since it owns the diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 570dcf5f866e3..b801f47b58808 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -22,6 +22,7 @@ mod slots; use std::collections::VecDeque; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; +use linked_hash_map::LinkedHashMap; use log::trace; use lru_cache::LruCache; use slots::{SlotType, SlotState, Slots}; diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 789b05e9dc94c..e749ceeb1e848 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -28,6 +28,8 @@ pub mod chain_ops; use std::io; use std::net::SocketAddr; use std::collections::HashMap; +use futures::sync::mpsc; +use parking_lot::Mutex; use client::BlockchainEvents; use exit_future::Signal; @@ -82,6 +84,7 @@ pub struct Service { _rpc: Box<::std::any::Any + Send + Sync>, _telemetry: Option>, _offchain_workers: Option, ComponentBlock>>>, + _telemetry_on_connect_sinks: Arc>>>, } /// Creates bare client without any networking. @@ -96,7 +99,27 @@ pub fn new_client(config: &FactoryFullConfi Ok(client) } +/// Stream of events for connection established to a telemetry server. +pub type TelemetryOnConnectNotifications = mpsc::UnboundedReceiver<()>; + +/// Used to hook on telemetry connection established events. +pub struct TelemetryOnConnect<'a> { + /// Handle to a future that will resolve on exit. + pub on_exit: Box + Send + 'static>, + /// Event stream. + pub telemetry_connection_sinks: TelemetryOnConnectNotifications, + /// Executor to which the hook is spawned. + pub executor: &'a TaskExecutor, +} + impl Service { + /// Get event stream for telemetry connection established events. + pub fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications { + let (sink, stream) = mpsc::unbounded(); + self._telemetry_on_connect_sinks.lock().push(sink); + stream + } + /// Creates a new service. pub fn new( mut config: FactoryFullConfiguration, @@ -304,6 +327,8 @@ impl Service { config.rpc_ws, config.rpc_cors.clone(), task_executor.clone(), transaction_pool.clone(), )?; + let telemetry_connection_sinks: Arc>>> = Default::default(); + // Telemetry let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { let is_authority = config.roles == Roles::AUTHORITY; @@ -313,6 +338,7 @@ impl Service { let impl_name = config.impl_name.to_owned(); let version = version.clone(); let chain_name = config.chain_spec.name().to_owned(); + let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); Arc::new(tel::init_telemetry(tel::TelemetryConfig { endpoints, on_connect: Box::new(move || { @@ -326,6 +352,10 @@ impl Service { "authority" => is_authority, "network_id" => network_id.clone() ); + + telemetry_connection_sinks_.lock().retain(|sink| { + sink.unbounded_send(()).is_ok() + }); }), })) }); @@ -342,6 +372,7 @@ impl Service { _rpc: Box::new(rpc), _telemetry: telemetry, _offchain_workers: offchain_workers, + _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), }) } @@ -358,7 +389,7 @@ impl Service { } } - /// return a shared instance of Telemtry (if enabled) + /// return a shared instance of Telemetry (if enabled) pub fn telemetry(&self) -> Option> { self._telemetry.as_ref().map(|t| t.clone()) } diff --git a/core/telemetry/Cargo.toml b/core/telemetry/Cargo.toml index fced20977646d..959349f74fc35 100644 --- a/core/telemetry/Cargo.toml +++ b/core/telemetry/Cargo.toml @@ -12,6 +12,7 @@ log = "0.4" rand = "0.6" serde = "1.0.81" serde_derive = "1.0" +serde_json = "1.0" slog = { version = "^2", features = ["nested-values"] } slog-json = { version = "^2", features = ["nested-values"] } slog-async = { version = "^2", features = ["nested-values"] } diff --git a/core/telemetry/src/lib.rs b/core/telemetry/src/lib.rs index fba75c196aa36..a993b50a178f1 100644 --- a/core/telemetry/src/lib.rs +++ b/core/telemetry/src/lib.rs @@ -24,14 +24,12 @@ use std::{io, time, thread}; use std::sync::Arc; use parking_lot::Mutex; -use slog::{Drain, o}; +use slog::{Drain, o, OwnedKVList, Record}; use log::trace; use rand::{thread_rng, Rng}; pub use slog_scope::with_logger; pub use slog; use serde_derive::{Serialize, Deserialize}; -use slog::OwnedKVList; -use slog::Record; use core::result; /// Configuration for telemetry. @@ -56,7 +54,7 @@ pub const SUBSTRATE_INFO: &str = "0"; pub const CONSENSUS_TRACE: &str = "9"; pub const CONSENSUS_DEBUG: &str = "5"; pub const CONSENSUS_WARN: &str = "4"; -pub const CONSENSUS_INFO: &str = "3"; +pub const CONSENSUS_INFO: &str = "0"; /// Multiply logging to all drains. This is similar to `slog::Duplicate`, which is /// limited to two drains though and doesn't support dynamic nesting at runtime. @@ -166,7 +164,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard macro_rules! telemetry { ( $a:expr; $b:expr; $( $t:tt )* ) => { $crate::with_logger(|l| { - $crate::slog::slog_info!(l, #$a, $b; "verbosity" => stringify!($a), $($t)* ) + $crate::slog::slog_info!(l, #$a, $b; $($t)* ) }) } } diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 1366a62987a79..96a531a4ef7f1 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -37,6 +37,7 @@ use inherents::InherentDataProviders; use network::construct_simple_protocol; use substrate_service::construct_service_factory; use log::info; +use substrate_service::TelemetryOnConnect; construct_simple_protocol! { /// Demo protocol attachment for substrate. @@ -128,13 +129,20 @@ construct_service_factory! { )?); }, Some(_) => { - executor.spawn(grandpa::run_grandpa_voter( - config, - link_half, - service.network(), - service.config.custom.inherent_data_providers.clone(), - service.on_exit(), - )?); + let telemetry_on_connect = TelemetryOnConnect { + on_exit: Box::new(service.on_exit()), + telemetry_connection_sinks: service.telemetry_on_connect_stream(), + executor: &executor, + }; + let grandpa_config = grandpa::GrandpaParams { + config: config, + link: link_half, + network: service.network(), + inherent_data_providers: service.config.custom.inherent_data_providers.clone(), + on_exit: service.on_exit(), + telemetry_on_connect: Some(telemetry_on_connect), + }; + executor.spawn(grandpa::run_grandpa_voter(grandpa_config)?); }, }