diff --git a/Cargo.lock b/Cargo.lock index 525c2fe030f4d..55855f544fefb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5846,6 +5846,7 @@ dependencies = [ "sc-service", "sc-telemetry", "sc-tracing", + "serde", "serde_json", "sp-blockchain", "sp-core", @@ -6945,18 +6946,18 @@ checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" [[package]] name = "serde" -version = "1.0.110" +version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c" +checksum = "c9124df5b40cbd380080b2cc6ab894c040a3070d995f5c9dc77e18c34a8ae37d" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.110" +version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984" +checksum = "3f2c3ac8e6ca1e9c80b8be1023940162bf81ae3cffbb1809474152f2ce1eb250" dependencies = [ "proc-macro2", "quote 1.0.6", diff --git a/bin/node-template/node/src/command.rs b/bin/node-template/node/src/command.rs index 18e1b22a53f8e..0e7728e974e96 100644 --- a/bin/node-template/node/src/command.rs +++ b/bin/node-template/node/src/command.rs @@ -67,7 +67,14 @@ pub fn run() -> sc_cli::Result<()> { match &cli.subcommand { Some(subcommand) => { let runner = cli.create_runner(subcommand)?; - runner.run_subcommand(subcommand, |config| Ok(new_full_start!(config).0)) + runner.run_subcommand(subcommand, |config| { + let ( + client, _import_setup, _inherent_data_providers, _backend, _tasks_builder, + _keystore, import_queue, _select_chain, _transaction_pool, _background_tasks, + ) = new_full_start!(config); + + Ok((client, import_queue)) + }) } None => { let runner = cli.create_runner(&cli.run)?; diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index b9bc68ce3a8b6..e87f22130e383 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -5,7 +5,7 @@ use std::time::Duration; use sc_client_api::ExecutorProvider; use sc_consensus::LongestChain; use node_template_runtime::{self, opaque::Block, RuntimeApi}; -use sc_service::{error::{Error as ServiceError}, AbstractService, Configuration, ServiceBuilder}; +use sc_service::{error::{Error as ServiceError}, AbstractService, Configuration}; use sp_inherents::InherentDataProviders; use sc_executor::native_executor_instance; pub use sc_executor::NativeExecutor; @@ -29,66 +29,55 @@ macro_rules! new_full_start { ($config:expr) => {{ use std::sync::Arc; use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; - - let mut import_setup = None; let inherent_data_providers = sp_inherents::InherentDataProviders::new(); - let builder = sc_service::ServiceBuilder::new_full::< - node_template_runtime::opaque::Block, - node_template_runtime::RuntimeApi, - crate::service::Executor - >($config)? - .with_select_chain(|_config, backend| { - Ok(sc_consensus::LongestChain::new(backend.clone())) - })? - .with_transaction_pool(|builder| { - let pool_api = sc_transaction_pool::FullChainApi::new( - builder.client().clone(), - ); - Ok(sc_transaction_pool::BasicPool::new( - builder.config().transaction_pool.clone(), - std::sync::Arc::new(pool_api), - builder.prometheus_registry(), - )) - })? 
- .with_import_queue(| - _config, - client, - mut select_chain, - _transaction_pool, - spawn_task_handle, - registry, - | { - let select_chain = select_chain.take() - .ok_or_else(|| sc_service::Error::SelectChainRequired)?; + let (client, backend, keystore, task_manager) = sc_service::new_full_parts::< + node_template_runtime::opaque::Block, node_template_runtime::RuntimeApi, crate::service::Executor + >(&$config)?; + let client = Arc::new(client); + let select_chain = sc_consensus::LongestChain::new(backend.clone()); + let pool_api = sc_transaction_pool::FullChainApi::new(Arc::clone(&client)); + let registry = $config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()); + let (transaction_pool, background_task_one) = sc_transaction_pool::BasicPool::new( + $config.transaction_pool.clone(), std::sync::Arc::new(pool_api), registry.as_ref() + ); + let transaction_pool = Arc::new(transaction_pool); + let mut background_tasks = Vec::new(); - let (grandpa_block_import, grandpa_link) = sc_finality_grandpa::block_import( - client.clone(), - &(client.clone() as Arc<_>), - select_chain, - )?; + if let Some(bg_t) = background_task_one { + background_tasks.push(("txpool-background", bg_t)); + } - let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new( - grandpa_block_import.clone(), client.clone(), - ); + let (import_queue, import_setup) = { + let (grandpa_block_import, grandpa_link) = sc_finality_grandpa::block_import( + Arc::clone(&client), &(Arc::clone(&client) as Arc<_>), select_chain.clone() + )?; - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( - sc_consensus_aura::slot_duration(&*client)?, - aura_block_import, - Some(Box::new(grandpa_block_import.clone())), - None, - client, - inherent_data_providers.clone(), - spawn_task_handle, - registry, - )?; + let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new( + grandpa_block_import.clone(), Arc::clone(&client), + ); - import_setup = Some((grandpa_block_import, grandpa_link)); + let spawn_task_handle = task_manager.spawn_handle(); + let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( + sc_consensus_aura::slot_duration(&*client)?, + aura_block_import, + Some(Box::new(grandpa_block_import.clone())), + None, + client.clone(), + inherent_data_providers.clone(), + &spawn_task_handle, + registry.as_ref(), + )?; - Ok(import_queue) - })?; + let import_setup = Some((grandpa_block_import, grandpa_link)); - (builder, import_setup, inherent_data_providers) + (import_queue, import_setup) + }; + + ( + client, import_setup, inherent_data_providers, backend, task_manager, keystore, + import_queue, select_chain, transaction_pool, background_tasks, + ) }} } @@ -99,19 +88,37 @@ pub fn new_full(config: Configuration) -> Result>; - Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _) - })? 
- .build()?; + let provider = client.clone() as Arc>; + let finality_proof_provider = Arc::new( + GrandpaFinalityProofProvider::new(backend.clone(), provider) + ); + let service = sc_service::build(sc_service::ServiceParams { + config, + client, + backend, + task_manager, + keystore, + on_demand: None, + select_chain: Some(select_chain), + import_queue, + finality_proof_request_builder: None, + finality_proof_provider: Some(finality_proof_provider), + transaction_pool, + remote_blockchain: None, + background_tasks, + block_announce_validator: None, + rpc_extensions_builder: Box::new(|_| ()), + informant_prefix: String::new() + })?; if role.is_authority() { let proposer = sc_basic_authorship::ProposerFactory::new( @@ -203,66 +210,64 @@ pub fn new_full(config: Configuration) -> Result Result { let inherent_data_providers = InherentDataProviders::new(); - ServiceBuilder::new_light::(config)? - .with_select_chain(|_config, backend| { - Ok(LongestChain::new(backend.clone())) - })? - .with_transaction_pool(|builder| { - let fetcher = builder.fetcher() - .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; + let ((client, backend, keystore, task_manager), fetcher, remote_blockchain) = + sc_service::new_light_parts::(&config)?; + let client = Arc::new(client); + let select_chain = LongestChain::new(backend.clone()); + let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); + let registry = config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()); + let (transaction_pool, background_task_one) = sc_transaction_pool::BasicPool::new( + config.transaction_pool.clone(), std::sync::Arc::new(pool_api), registry.as_ref() + ); + let transaction_pool = Arc::new(transaction_pool); + let mut background_tasks = Vec::new(); - let pool_api = sc_transaction_pool::LightChainApi::new( - builder.client().clone(), - fetcher.clone(), - ); - let pool = sc_transaction_pool::BasicPool::with_revalidation_type( - builder.config().transaction_pool.clone(), - Arc::new(pool_api), - builder.prometheus_registry(), - sc_transaction_pool::RevalidationType::Light, - ); - Ok(pool) - })? 
- .with_import_queue_and_fprb(| - _config, - client, - backend, - fetcher, - _select_chain, - _tx_pool, - spawn_task_handle, - prometheus_registry, - | { - let fetch_checker = fetcher - .map(|fetcher| fetcher.checker().clone()) - .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; - let grandpa_block_import = sc_finality_grandpa::light_block_import( - client.clone(), - backend, - &(client.clone() as Arc<_>), - Arc::new(fetch_checker), - )?; - let finality_proof_import = grandpa_block_import.clone(); - let finality_proof_request_builder = - finality_proof_import.create_finality_proof_request_builder(); - - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( - sc_consensus_aura::slot_duration(&*client)?, - grandpa_block_import, - None, - Some(Box::new(finality_proof_import)), - client, - inherent_data_providers.clone(), - spawn_task_handle, - prometheus_registry, - )?; + if let Some(bg_t) = background_task_one { + background_tasks.push(("txpool-background", bg_t)); + } + let fetch_checker = fetcher.checker().clone(); + let grandpa_block_import = sc_finality_grandpa::light_block_import( + client.clone(), + backend.clone(), + &(client.clone() as Arc<_>), + Arc::new(fetch_checker), + )?; + let finality_proof_import = grandpa_block_import.clone(); + let finality_proof_request_builder = + finality_proof_import.create_finality_proof_request_builder(); - Ok((import_queue, finality_proof_request_builder)) - })? - .with_finality_proof_provider(|client, backend| { - // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider - let provider = client as Arc>; - Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _) - })? - .build() + let spawn_task_handle = task_manager.spawn_handle(); + let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( + sc_consensus_aura::slot_duration(&*client)?, + grandpa_block_import, + None, + Some(Box::new(finality_proof_import)), + client.clone(), + inherent_data_providers.clone(), + &spawn_task_handle, + registry.as_ref() + )?; + // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider + let provider = client.clone() as Arc>; + let finality_proof_provider = Arc::new( + GrandpaFinalityProofProvider::new(backend.clone(), provider) + ); + sc_service::build(sc_service::ServiceParams { + config, + client, + backend, + task_manager, + keystore, + on_demand: None, + select_chain: Some(select_chain), + import_queue, + finality_proof_request_builder: Some(finality_proof_request_builder), + finality_proof_provider: Some(finality_proof_provider), + transaction_pool, + remote_blockchain: Some(remote_blockchain), + background_tasks, + block_announce_validator: None, + rpc_extensions_builder: Box::new(|_| ()), + informant_prefix: String::new() + }) } diff --git a/bin/node/cli/src/command.rs b/bin/node/cli/src/command.rs index bd5483f2cd31e..2a0b99b272318 100644 --- a/bin/node/cli/src/command.rs +++ b/bin/node/cli/src/command.rs @@ -95,7 +95,14 @@ pub fn run() -> Result<()> { Some(Subcommand::Base(subcommand)) => { let runner = cli.create_runner(subcommand)?; - runner.run_subcommand(subcommand, |config| Ok(new_full_start!(config).0)) + runner.run_subcommand(subcommand, |config| { + let ( + client, _import_setup, _inherent_data_providers, _backend, _tasks_builder, + _keystore, import_queue, _select_chain, _transaction_pool, _background_tasks, _, _ + ) = new_full_start!(config); + + Ok((client, import_queue)) + }) } } } diff --git a/bin/node/cli/src/service.rs 
b/bin/node/cli/src/service.rs index dfeca726b8449..4eacc74b3f4e8 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -29,7 +29,7 @@ use node_executor; use node_primitives::Block; use node_runtime::RuntimeApi; use sc_service::{ - AbstractService, ServiceBuilder, config::Configuration, error::{Error as ServiceError}, + AbstractService, config::Configuration, error::{Error as ServiceError}, }; use sp_inherents::InherentDataProviders; use sc_consensus::LongestChain; @@ -42,108 +42,103 @@ macro_rules! new_full_start { ($config:expr) => {{ use std::sync::Arc; - let mut import_setup = None; - let mut rpc_setup = None; let inherent_data_providers = sp_inherents::InherentDataProviders::new(); - let builder = sc_service::ServiceBuilder::new_full::< + let (client, backend, keystore, task_manager) = sc_service::new_full_parts::< node_primitives::Block, node_runtime::RuntimeApi, node_executor::Executor - >($config)? - .with_select_chain(|_config, backend| { - Ok(sc_consensus::LongestChain::new(backend.clone())) - })? - .with_transaction_pool(|builder| { - let pool_api = sc_transaction_pool::FullChainApi::new( - builder.client().clone(), - ); - let config = builder.config(); - - Ok(sc_transaction_pool::BasicPool::new( - config.transaction_pool.clone(), - std::sync::Arc::new(pool_api), - builder.prometheus_registry(), - )) - })? - .with_import_queue(| - _config, - client, - mut select_chain, - _transaction_pool, - spawn_task_handle, - prometheus_registry, - | { - let select_chain = select_chain.take() - .ok_or_else(|| sc_service::Error::SelectChainRequired)?; - let (grandpa_block_import, grandpa_link) = grandpa::block_import( - client.clone(), - &(client.clone() as Arc<_>), - select_chain, - )?; - let justification_import = grandpa_block_import.clone(); - - let (block_import, babe_link) = sc_consensus_babe::block_import( - sc_consensus_babe::Config::get_or_compute(&*client)?, - grandpa_block_import, - client.clone(), - )?; - - let import_queue = sc_consensus_babe::import_queue( - babe_link.clone(), - block_import.clone(), - Some(Box::new(justification_import)), - None, - client, - inherent_data_providers.clone(), - spawn_task_handle, - prometheus_registry, - )?; - - import_setup = Some((block_import, grandpa_link, babe_link)); - Ok(import_queue) - })? 
- .with_rpc_extensions_builder(|builder| { - let grandpa_link = import_setup.as_ref().map(|s| &s.1) - .expect("GRANDPA LinkHalf is present for full services or set up failed; qed."); - - let shared_authority_set = grandpa_link.shared_authority_set().clone(); - let shared_voter_state = grandpa::SharedVoterState::empty(); - - rpc_setup = Some((shared_voter_state.clone())); - - let babe_link = import_setup.as_ref().map(|s| &s.2) - .expect("BabeLink is present for full services or set up failed; qed."); - - let babe_config = babe_link.config().clone(); - let shared_epoch_changes = babe_link.epoch_changes().clone(); - - let client = builder.client().clone(); - let pool = builder.pool().clone(); - let select_chain = builder.select_chain().cloned() - .expect("SelectChain is present for full services or set up failed; qed."); - let keystore = builder.keystore().clone(); - - Ok(move |deny_unsafe| { - let deps = node_rpc::FullDeps { - client: client.clone(), - pool: pool.clone(), - select_chain: select_chain.clone(), - deny_unsafe, - babe: node_rpc::BabeDeps { - babe_config: babe_config.clone(), - shared_epoch_changes: shared_epoch_changes.clone(), - keystore: keystore.clone(), - }, - grandpa: node_rpc::GrandpaDeps { - shared_voter_state: shared_voter_state.clone(), - shared_authority_set: shared_authority_set.clone(), - }, - }; + >(&$config)?; + let client = Arc::new(client); + + let mut background_tasks = Vec::new(); + let spawn_task_handle = task_manager.spawn_handle(); + + // Registry for Prometheus metrics + let registry = $config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()); + + // Select chain + let select_chain = sc_consensus::LongestChain::new(backend.clone()); + + // Transaction pool + let pool_api = sc_transaction_pool::FullChainApi::new( + client.clone(), + ); + let (transaction_pool, transaction_pool_task) = sc_transaction_pool::BasicPool::new( + $config.transaction_pool.clone(), + std::sync::Arc::new(pool_api), + registry.as_ref(), + ); + let transaction_pool = Arc::new(transaction_pool); - node_rpc::create_full(deps) - }) - })?; + if let Some(task) = transaction_pool_task { + background_tasks.push(("txpool-background", task)); + } + + // Import queue + let (grandpa_block_import, grandpa_link) = grandpa::block_import( + client.clone(), + &(client.clone() as Arc<_>), + select_chain.clone(), + )?; + let justification_import = grandpa_block_import.clone(); + + let (block_import, babe_link) = sc_consensus_babe::block_import( + sc_consensus_babe::Config::get_or_compute(&*client)?, + grandpa_block_import.clone(), + client.clone(), + )?; + + let import_queue = sc_consensus_babe::import_queue( + babe_link.clone(), + block_import.clone(), + Some(Box::new(justification_import)), + None, + client.clone(), + inherent_data_providers.clone(), + &spawn_task_handle, + registry.as_ref(), + )?; + + // Rpc extensions + + let shared_authority_set = grandpa_link.shared_authority_set().clone(); + let shared_voter_state = grandpa::SharedVoterState::empty(); + + let babe_config = babe_link.config().clone(); + let shared_epoch_changes = babe_link.epoch_changes().clone(); + + let rpc_setup = shared_voter_state.clone(); + + let rpc_client = client.clone(); + let rpc_keystore = keystore.clone(); + let rpc_transaction_pool = transaction_pool.clone(); + let rpc_select_chain = select_chain.clone(); + + let rpc_extensions_builder = Box::new(move |deny_unsafe| { + let deps = node_rpc::FullDeps { + client: rpc_client.clone(), + pool: rpc_transaction_pool.clone(), + select_chain: rpc_select_chain.clone(), 
+ deny_unsafe, + babe: node_rpc::BabeDeps { + babe_config: babe_config.clone(), + shared_epoch_changes: shared_epoch_changes.clone(), + keystore: rpc_keystore.clone(), + }, + grandpa: node_rpc::GrandpaDeps { + shared_voter_state: shared_voter_state.clone(), + shared_authority_set: shared_authority_set.clone(), + }, + }; + + node_rpc::create_full::<_, _, sc_rpc::Metadata, _>(deps) + }); + + let import_setup = (block_import, grandpa_link, babe_link); - (builder, import_setup, inherent_data_providers, rpc_setup) + ( + client, import_setup, inherent_data_providers, backend, task_manager, keystore, + import_queue, select_chain, transaction_pool, background_tasks, rpc_setup, rpc_extensions_builder + ) }} } @@ -170,22 +165,38 @@ macro_rules! new_full { $config.disable_grandpa, ); - let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) = - new_full_start!($config); - - let service = builder - .with_finality_proof_provider(|client, backend| { - // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider - let provider = client as Arc>; - Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, provider)) as _) - })? - .build()?; - - let (block_import, grandpa_link, babe_link) = import_setup.take() - .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); - - let shared_voter_state = rpc_setup.take() - .expect("The SharedVoterState is present for Full Services or setup failed before. qed"); + let ( + client, import_setup, inherent_data_providers, backend, task_manager, keystore, + import_queue, select_chain, transaction_pool, background_tasks, rpc_setup, rpc_extensions_builder + ) = new_full_start!($config); + + // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider + let provider = client.clone() as Arc>; + let finality_proof_provider = + Arc::new(grandpa::FinalityProofProvider::new(backend.clone(), provider)); + + let service = sc_service::build(sc_service::ServiceParams { + config: $config, + client, + backend, + task_manager, + keystore, + on_demand: None, + select_chain: Some(select_chain), + import_queue, + finality_proof_request_builder: None, + finality_proof_provider: Some(finality_proof_provider), + transaction_pool, + remote_blockchain: None, + background_tasks, + block_announce_validator: None, + rpc_extensions_builder, + informant_prefix: String::new() + })?; + + let (block_import, grandpa_link, babe_link) = import_setup; + + let shared_voter_state = rpc_setup; ($with_startup_data)(&block_import, &babe_link); @@ -323,91 +334,101 @@ pub fn new_light(config: Configuration) -> Result { let inherent_data_providers = InherentDataProviders::new(); - let service = ServiceBuilder::new_light::(config)? - .with_select_chain(|_config, backend| { - Ok(LongestChain::new(backend.clone())) - })? - .with_transaction_pool(|builder| { - let fetcher = builder.fetcher() - .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; - let pool_api = sc_transaction_pool::LightChainApi::new( - builder.client().clone(), - fetcher, - ); - let pool = sc_transaction_pool::BasicPool::with_revalidation_type( - builder.config().transaction_pool.clone(), - Arc::new(pool_api), - builder.prometheus_registry(), - sc_transaction_pool::RevalidationType::Light, - ); - Ok(pool) - })? 
- .with_import_queue_and_fprb(| - _config, - client, - backend, - fetcher, - _select_chain, - _tx_pool, - spawn_task_handle, - registry, - | { - let fetch_checker = fetcher - .map(|fetcher| fetcher.checker().clone()) - .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; - let grandpa_block_import = grandpa::light_block_import( - client.clone(), - backend, - &(client.clone() as Arc<_>), - Arc::new(fetch_checker), - )?; - - let finality_proof_import = grandpa_block_import.clone(); - let finality_proof_request_builder = - finality_proof_import.create_finality_proof_request_builder(); - - let (babe_block_import, babe_link) = sc_consensus_babe::block_import( - sc_consensus_babe::Config::get_or_compute(&*client)?, - grandpa_block_import, - client.clone(), - )?; - - let import_queue = sc_consensus_babe::import_queue( - babe_link, - babe_block_import, - None, - Some(Box::new(finality_proof_import)), - client.clone(), - inherent_data_providers.clone(), - spawn_task_handle, - registry, - )?; - - Ok((import_queue, finality_proof_request_builder)) - })? - .with_finality_proof_provider(|client, backend| { - // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider - let provider = client as Arc>; - Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _) - })? - .with_rpc_extensions(|builder| { - let fetcher = builder.fetcher() - .ok_or_else(|| "Trying to start node RPC without active fetcher")?; - let remote_blockchain = builder.remote_backend() - .ok_or_else(|| "Trying to start node RPC without active remote blockchain")?; - - let light_deps = node_rpc::LightDeps { - remote_blockchain, - fetcher, - client: builder.client().clone(), - pool: builder.pool(), - }; + let ((client, backend, keystore, task_manager), fetcher, remote_blockchain) = + sc_service::new_light_parts::(&config)?; + let client = Arc::new(client); + + let registry = config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()); + let mut background_tasks = Vec::new(); + let spawn_task_handle = task_manager.spawn_handle(); + + let select_chain = LongestChain::new(backend.clone()); + + // Transaction pool + let pool_api = sc_transaction_pool::FullChainApi::new( + client.clone(), + ); + let (transaction_pool, transaction_pool_task) = + sc_transaction_pool::BasicPool::with_revalidation_type( + config.transaction_pool.clone(), + std::sync::Arc::new(pool_api), + registry.as_ref(), + sc_transaction_pool::RevalidationType::Light, + ); + let transaction_pool = Arc::new(transaction_pool); + + if let Some(task) = transaction_pool_task { + background_tasks.push(("txpool-background", task)); + } + + let fetch_checker = fetcher.checker().clone(); + + let grandpa_block_import = grandpa::light_block_import( + client.clone(), + backend.clone(), + &(client.clone() as Arc<_>), + Arc::new(fetch_checker), + )?; + + let finality_proof_import = grandpa_block_import.clone(); + let finality_proof_request_builder = + finality_proof_import.create_finality_proof_request_builder(); + + let (babe_block_import, babe_link) = sc_consensus_babe::block_import( + sc_consensus_babe::Config::get_or_compute(&*client)?, + grandpa_block_import, + client.clone(), + )?; + + let import_queue = sc_consensus_babe::import_queue( + babe_link, + babe_block_import, + None, + Some(Box::new(finality_proof_import)), + client.clone(), + inherent_data_providers.clone(), + &spawn_task_handle, + registry.as_ref(), + )?; + + // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider + let provider = 
client.clone() as Arc>; + let finality_proof_provider = + Arc::new(GrandpaFinalityProofProvider::new(backend.clone(), provider)); + + let rpc_client = client.clone(); + let rpc_transaction_pool = transaction_pool.clone(); + let rpc_remote_blockchain = remote_blockchain.clone(); + + let rpc_extensions_builder = move |_| { + let light_deps = node_rpc::LightDeps { + remote_blockchain: rpc_remote_blockchain.clone(), + fetcher: fetcher.clone(), + client: rpc_client.clone(), + pool: rpc_transaction_pool.clone(), + }; - Ok(node_rpc::create_light(light_deps)) - })? - .build()?; + node_rpc::create_light::<_, _, sc_rpc::Metadata, _>(light_deps) + }; - Ok(service) + sc_service::build(sc_service::ServiceParams { + config, + client, + backend, + task_manager, + keystore, + on_demand: None, + select_chain: Some(select_chain), + import_queue, + finality_proof_request_builder: Some(finality_proof_request_builder), + finality_proof_provider: Some(finality_proof_provider), + transaction_pool, + remote_blockchain: Some(remote_blockchain), + background_tasks, + block_announce_validator: None, + rpc_extensions_builder, + informant_prefix: String::new() + }) } #[cfg(test)] diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index 4bdacfcbd2b50..6f561524a91e1 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -44,6 +44,7 @@ structopt = "0.3.8" sc-tracing = { version = "2.0.0-rc3", path = "../tracing" } chrono = "0.4.10" parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] } +serde = "1.0.111" [target.'cfg(not(target_os = "unknown"))'.dependencies] rpassword = "4.0.1" diff --git a/client/cli/src/commands/build_spec_cmd.rs b/client/cli/src/commands/build_spec_cmd.rs index d2e2ef3a5467c..ade51d0b69053 100644 --- a/client/cli/src/commands/build_spec_cmd.rs +++ b/client/cli/src/commands/build_spec_cmd.rs @@ -22,7 +22,7 @@ use crate::params::SharedParams; use crate::CliConfiguration; use log::info; use sc_network::config::build_multiaddr; -use sc_service::{config::MultiaddrWithPeerId, Configuration}; +use sc_service::{config::{MultiaddrWithPeerId, NetworkConfiguration}, ChainSpec}; use structopt::StructOpt; use std::io::Write; @@ -51,13 +51,16 @@ pub struct BuildSpecCmd { impl BuildSpecCmd { /// Run the build-spec command - pub fn run(&self, config: Configuration) -> error::Result<()> { + pub fn run( + &self, + mut spec: Box, + network_config: NetworkConfiguration + ) -> error::Result<()> { info!("Building chain spec"); - let mut spec = config.chain_spec; let raw_output = self.raw; if spec.boot_nodes().is_empty() && !self.disable_default_bootnode { - let keys = config.network.node_key.into_keypair()?; + let keys = network_config.node_key.into_keypair()?; let peer_id = keys.public().into_peer_id(); let addr = MultiaddrWithPeerId { multiaddr: build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(30333u16)], diff --git a/client/cli/src/commands/check_block_cmd.rs b/client/cli/src/commands/check_block_cmd.rs index d1241f010d597..970219b6d2727 100644 --- a/client/cli/src/commands/check_block_cmd.rs +++ b/client/cli/src/commands/check_block_cmd.rs @@ -16,12 +16,14 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
-use crate::{ - CliConfiguration, error, params::{ImportParams, SharedParams, BlockNumberOrHash}, -}; -use sc_service::{Configuration, ServiceBuilderCommand}; -use sp_runtime::traits::{Block as BlockT, NumberFor}; -use std::{fmt::Debug, str::FromStr}; +use crate::error; +use crate::params::ImportParams; +use crate::params::SharedParams; +use crate::params::BlockNumberOrHash; +use crate::CliConfiguration; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use std::fmt::Debug; +use std::str::FromStr; use structopt::StructOpt; /// The `check-block` command used to validate blocks. @@ -48,21 +50,24 @@ pub struct CheckBlockCmd { impl CheckBlockCmd { /// Run the check-block command - pub async fn run( + pub async fn run( &self, - config: Configuration, - builder: B, + client: std::sync::Arc>, + import_queue: IQ, ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: BlockT + Debug, - as FromStr>::Err: std::fmt::Debug, - BB::Hash: FromStr, - ::Err: std::fmt::Debug, + B: BlockT + for<'de> serde::Deserialize<'de>, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: sc_service::ImportQueue + 'static, + RA: Send + Sync + 'static, + ::Hash: FromStr, + <::Hash as FromStr>::Err: Debug, + <<::Header as HeaderT>::Number as FromStr>::Err: Debug, { let start = std::time::Instant::now(); - builder(config)?.check_block(self.input.parse()?).await?; + let block_id = self.input.parse()?; + sc_service::chain_ops::check_block(client, import_queue, block_id).await?; println!("Completed in {} ms.", start.elapsed().as_millis()); Ok(()) diff --git a/client/cli/src/commands/export_blocks_cmd.rs b/client/cli/src/commands/export_blocks_cmd.rs index 2fdc408250bce..b6a58a8525df4 100644 --- a/client/cli/src/commands/export_blocks_cmd.rs +++ b/client/cli/src/commands/export_blocks_cmd.rs @@ -19,16 +19,14 @@ use crate::error; use crate::params::{BlockNumber, DatabaseParams, PruningParams, SharedParams}; use crate::CliConfiguration; -use log::info; -use sc_service::{ - config::DatabaseConfig, Configuration, ServiceBuilderCommand, -}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::fmt::Debug; use std::fs; use std::io; use std::path::PathBuf; +use std::str::FromStr; use structopt::StructOpt; +use sc_service::chain_ops::export_blocks; /// The `export-blocks` command used to export blocks. #[derive(Debug, StructOpt, Clone)] @@ -68,22 +66,17 @@ pub struct ExportBlocksCmd { impl ExportBlocksCmd { /// Run the export-blocks command - pub async fn run( + pub async fn run( &self, - config: Configuration, - builder: B, + client: std::sync::Arc>, ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug, - ::Hash: std::str::FromStr, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + <<::Header as HeaderT>::Number as FromStr>::Err: Debug, { - if let DatabaseConfig::RocksDb { ref path, .. } = &config.database { - info!("DB path: {}", path.display()); - } - let from = self.from.as_ref().and_then(|f| f.parse().ok()).unwrap_or(1); let to = self.to.as_ref().and_then(|t| t.parse().ok()); @@ -94,8 +87,7 @@ impl ExportBlocksCmd { None => Box::new(io::stdout()), }; - builder(config)? 
- .export_blocks(file, from.into(), to, binary) + export_blocks(client, file, from.into(), to, binary) .await .map_err(Into::into) } diff --git a/client/cli/src/commands/export_state_cmd.rs b/client/cli/src/commands/export_state_cmd.rs index 33111e7737b1c..06ec790a6e354 100644 --- a/client/cli/src/commands/export_state_cmd.rs +++ b/client/cli/src/commands/export_state_cmd.rs @@ -20,8 +20,7 @@ use crate::{ CliConfiguration, error, params::{PruningParams, SharedParams, BlockNumberOrHash}, }; use log::info; -use sc_service::{Configuration, ServiceBuilderCommand}; -use sp_runtime::traits::{Block as BlockT, NumberFor}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::{fmt::Debug, str::FromStr, io::Write}; use structopt::StructOpt; @@ -44,23 +43,24 @@ pub struct ExportStateCmd { impl ExportStateCmd { /// Run the `export-state` command - pub fn run( + pub fn run( &self, - config: Configuration, - builder: B, + client: std::sync::Arc>, + chain_spec: Box, ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: BlockT + Debug, - as FromStr>::Err: std::fmt::Debug, - BB::Hash: FromStr, - ::Err: std::fmt::Debug, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + ::Hash: FromStr, + <::Hash as FromStr>::Err: Debug, + <<::Header as HeaderT>::Number as FromStr>::Err: Debug, { info!("Exporting raw state..."); - let mut input_spec = config.chain_spec.cloned_box(); + let mut input_spec = chain_spec; let block_id = self.input.clone().map(|b| b.parse()).transpose()?; - let raw_state = builder(config)?.export_raw_state(block_id)?; + let raw_state = sc_service::chain_ops::export_raw_state(client, block_id)?; input_spec.set_storage(raw_state); info!("Generating new chain spec..."); diff --git a/client/cli/src/commands/import_blocks_cmd.rs b/client/cli/src/commands/import_blocks_cmd.rs index a74f4d524c95b..e8945bb176e23 100644 --- a/client/cli/src/commands/import_blocks_cmd.rs +++ b/client/cli/src/commands/import_blocks_cmd.rs @@ -20,8 +20,8 @@ use crate::error; use crate::params::ImportParams; use crate::params::SharedParams; use crate::CliConfiguration; -use sc_service::{Configuration, ServiceBuilderCommand}; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use sc_service::chain_ops::import_blocks; +use sp_runtime::traits::Block as BlockT; use std::fmt::Debug; use std::fs; use std::io::{self, Read, Seek}; @@ -61,17 +61,17 @@ impl ReadPlusSeek for T {} impl ImportBlocksCmd { /// Run the import-blocks command - pub async fn run( + pub async fn run( &self, - config: Configuration, - builder: B, + client: std::sync::Arc>, + import_queue: IQ, ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug, - ::Hash: std::str::FromStr, + B: BlockT + for<'de> serde::Deserialize<'de>, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + IQ: sc_service::ImportQueue + 'static, { let file: Box = match &self.input { Some(filename) => Box::new(fs::File::open(filename)?), @@ -82,8 +82,7 @@ impl ImportBlocksCmd { } }; - builder(config)? 
- .import_blocks(file, false, self.binary) + import_blocks(client, import_queue, file, false, self.binary) .await .map_err(Into::into) } diff --git a/client/cli/src/commands/purge_chain_cmd.rs b/client/cli/src/commands/purge_chain_cmd.rs index 9d364a45f7d09..4bc8edb3d4458 100644 --- a/client/cli/src/commands/purge_chain_cmd.rs +++ b/client/cli/src/commands/purge_chain_cmd.rs @@ -19,7 +19,7 @@ use crate::error; use crate::params::{DatabaseParams, SharedParams}; use crate::CliConfiguration; -use sc_service::Configuration; +use sc_service::DatabaseConfig; use std::fmt::Debug; use std::fs; use std::io::{self, Write}; @@ -43,8 +43,8 @@ pub struct PurgeChainCmd { impl PurgeChainCmd { /// Run the purge command - pub fn run(&self, config: Configuration) -> error::Result<()> { - let db_path = config.database.path() + pub fn run(&self, database_config: DatabaseConfig) -> error::Result<()> { + let db_path = database_config.path() .ok_or_else(|| error::Error::Input("Cannot purge custom database implementation".into()) )?; diff --git a/client/cli/src/commands/revert_cmd.rs b/client/cli/src/commands/revert_cmd.rs index 6117eaf4880bf..ac955c7545ff2 100644 --- a/client/cli/src/commands/revert_cmd.rs +++ b/client/cli/src/commands/revert_cmd.rs @@ -19,9 +19,10 @@ use crate::error; use crate::params::{BlockNumber, PruningParams, SharedParams}; use crate::CliConfiguration; -use sc_service::{Configuration, ServiceBuilderCommand}; +use sc_service::chain_ops::revert_chain; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::fmt::Debug; +use std::str::FromStr; use structopt::StructOpt; /// The `revert` command used revert the chain to a previous state. @@ -42,16 +43,18 @@ pub struct RevertCmd { impl RevertCmd { /// Run the revert command - pub fn run(&self, config: Configuration, builder: B) -> error::Result<()> + pub fn run( + &self, client: std::sync::Arc> + ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug, - ::Hash: std::str::FromStr, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + <<::Header as HeaderT>::Number as FromStr>::Err: Debug, { let blocks = self.num.parse()?; - builder(config)?.revert_chain(blocks)?; + revert_chain(client, blocks)?; Ok(()) } diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index 1acd5ee60474a..6f6e8588f1789 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -37,7 +37,7 @@ use log::info; pub use params::*; use regex::Regex; pub use runner::*; -use sc_service::{ChainSpec, Configuration, TaskType}; +use sc_service::{ChainSpec, Configuration, config::TaskType}; use std::future::Future; use std::io::Write; use std::pin::Pin; diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 409772d7ca1fa..aef3e86d4dce4 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -25,7 +25,7 @@ use futures::pin_mut; use futures::select; use futures::{future, future::FutureExt, Future}; use log::info; -use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType}; +use sc_service::{AbstractService, Configuration, Role, config::TaskType}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL}; use sp_version::RuntimeVersion; @@ -233,29 +233,47 @@ 
impl Runner { /// A helper function that runs a future with tokio and stops if the process receives the signal /// `SIGTERM` or `SIGINT`. - pub fn run_subcommand(self, subcommand: &Subcommand, builder: B) -> Result<()> + pub fn run_subcommand(self, subcommand: &Subcommand, builder: BU) + -> Result<()> where - B: FnOnce(Configuration) -> sc_service::error::Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as FromStr>::Err: Debug, - ::Hash: FromStr, - <::Hash as FromStr>::Err: Debug, + BU: FnOnce(Configuration) + -> sc_service::error::Result<(Arc>, IQ)>, + B: BlockT + for<'de> serde::Deserialize<'de>, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: sc_service::ImportQueue + 'static, + RA: Send + Sync + 'static, + ::Hash: FromStr, + <::Hash as FromStr>::Err: Debug, + <<::Header as HeaderT>::Number as FromStr>::Err: Debug, { + let chain_spec = self.config.chain_spec.cloned_box(); + let network_config = self.config.network.clone(); + let db_config = self.config.database.clone(); + match subcommand { - Subcommand::BuildSpec(cmd) => cmd.run(self.config), + Subcommand::BuildSpec(cmd) => cmd.run(chain_spec, network_config), Subcommand::ExportBlocks(cmd) => { - run_until_exit(self.tokio_runtime, cmd.run(self.config, builder)) + let (client, _) = builder(self.config)?; + run_until_exit(self.tokio_runtime, cmd.run(client)) } Subcommand::ImportBlocks(cmd) => { - run_until_exit(self.tokio_runtime, cmd.run(self.config, builder)) + let (client, import_queue) = builder(self.config)?; + run_until_exit(self.tokio_runtime, cmd.run(client, import_queue)) } Subcommand::CheckBlock(cmd) => { - run_until_exit(self.tokio_runtime, cmd.run(self.config, builder)) + let (client, import_queue) = builder(self.config)?; + run_until_exit(self.tokio_runtime, cmd.run(client, import_queue)) } - Subcommand::Revert(cmd) => cmd.run(self.config, builder), - Subcommand::PurgeChain(cmd) => cmd.run(self.config), - Subcommand::ExportState(cmd) => cmd.run(self.config, builder), + Subcommand::Revert(cmd) => { + let (client, _) = builder(self.config)?; + cmd.run(client) + }, + Subcommand::PurgeChain(cmd) => cmd.run(db_config), + Subcommand::ExportState(cmd) => { + let (client, _) = builder(self.config)?; + cmd.run(client, chain_spec) + }, } } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 6e88042e367f0..a8ce6bbbeece9 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -18,13 +18,13 @@ use crate::{ Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm, - start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle, + start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, status_sinks, metrics::MetricsService, client::{light, Client, ClientConfig}, config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig}, }; use sc_client_api::{ - self, BlockchainEvents, light::RemoteBlockchain, execution_extensions::ExtensionsFactory, + self, BlockchainEvents, light::RemoteBlockchain, ExecutorProvider, CallExecutor, ForkBlocks, BadBlocks, CloneableSpawn, UsageProvider, backend::RemoteBackend, }; @@ -46,13 +46,12 @@ use sc_network::{NetworkService, NetworkStateInfo}; use parking_lot::{Mutex, RwLock}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{ - Block as BlockT, NumberFor, SaturatedConversion, HashFor, + Block as 
BlockT, SaturatedConversion, HashFor, }; use sp_api::ProvideRuntimeApi; use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo}; use std::{ collections::HashMap, - io::{Read, Write, Seek}, marker::PhantomData, sync::Arc, pin::Pin }; use wasm_timer::SystemTime; @@ -63,48 +62,10 @@ use sc_client_db::{Backend, DatabaseSettings}; use sp_core::traits::CodeExecutor; use sp_runtime::BuildStorage; use sc_client_api::execution_extensions::ExecutionExtensions; -use sp_core::storage::Storage; +/// A background task that is passed into `build` in `background_tasks`. pub type BackgroundTask = Pin + Send>>; -/// Aggregator for the components required to build a service. -/// -/// # Usage -/// -/// Call [`ServiceBuilder::new_full`] or [`ServiceBuilder::new_light`], then call the various -/// `with_` methods to add the required components that you built yourself: -/// -/// - [`with_select_chain`](ServiceBuilder::with_select_chain) -/// - [`with_import_queue`](ServiceBuilder::with_import_queue) -/// - [`with_finality_proof_provider`](ServiceBuilder::with_finality_proof_provider) -/// - [`with_transaction_pool`](ServiceBuilder::with_transaction_pool) -/// -/// After this is done, call [`build`](ServiceBuilder::build) to construct the service. -/// -/// The order in which the `with_*` methods are called doesn't matter, as the correct binding of -/// generics is done when you call `build`. -/// -pub struct ServiceBuilder -{ - config: Configuration, - pub (crate) client: Arc, - backend: Arc, - task_manager: TaskManager, - keystore: Arc>, - fetcher: Option, - select_chain: Option, - pub (crate) import_queue: TImpQu, - finality_proof_request_builder: Option, - finality_proof_provider: Option, - transaction_pool: Arc, - rpc_extensions_builder: Box + Send>, - remote_backend: Option>>, - marker: PhantomData<(TBl, TRtApi)>, - block_announce_validator_builder: Option) -> Box + Send> + Send>>, - informant_prefix: String, -} - /// A utility trait for building an RPC extension given a `DenyUnsafe` instance. /// This is useful since at service definition time we don't know whether the /// specific interface where the RPC extension will be exposed is safe or not. @@ -171,6 +132,13 @@ pub type TFullCallExecutor = crate::client::LocalCallExecutor< NativeExecutor, >; +type TFullParts = ( + TFullClient, + Arc>, + Arc>, + TaskManager, +); + /// Light client type. pub type TLightClient = Client< TLightBackend, @@ -200,9 +168,9 @@ pub type TLightCallExecutor = sc_light::GenesisCallExecutor< >, >; -type TFullParts = ( - TFullClient, - Arc>, +type TLightParts = ( + TLightClient, + Arc>, Arc>, TaskManager, ); @@ -217,7 +185,8 @@ pub fn new_full_client( new_full_parts(config).map(|parts| parts.0) } -fn new_full_parts( +/// Create some of the basic parts of a service from a configuration. +pub fn new_full_parts( config: &Configuration, ) -> Result, Error> where TBl: BlockT, @@ -330,519 +299,95 @@ pub fn new_client( )) } -impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { - /// Start the service builder with a configuration. 
- pub fn new_full( - config: Configuration, - ) -> Result, - Arc>, - (), - (), - BoxFinalityProofRequestBuilder, - Arc>, - (), - (), - TFullBackend, - >, Error> { - let (client, backend, keystore, task_manager) = new_full_parts(&config)?; - - let client = Arc::new(client); - - Ok(ServiceBuilder { - config, - client, - backend, - keystore, - task_manager, - fetcher: None, - select_chain: None, - import_queue: (), - finality_proof_request_builder: None, - finality_proof_provider: None, - transaction_pool: Arc::new(()), - rpc_extensions_builder: Box::new(|_| ()), - remote_backend: None, - block_announce_validator_builder: None, - informant_prefix: Default::default(), - marker: PhantomData, - }) - } +/// Create some of the basic parts of a service from a configuration. +pub fn new_light_parts( + config: &Configuration, +) -> Result<(TLightParts, Arc>, Arc>), Error> { + let prometheus_registry = config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()); - /// Start the service builder with a configuration. - pub fn new_light( - config: Configuration, - ) -> Result, - Arc>, - (), - (), - BoxFinalityProofRequestBuilder, - Arc>, - (), - (), - TLightBackend, - >, Error> { - let task_manager = { - let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - TaskManager::new(config.task_executor.clone(), registry)? - }; + let task_manager = TaskManager::new(config.task_executor.clone(), prometheus_registry.as_ref())?; - let keystore = match &config.keystore { - KeystoreConfig::Path { path, password } => Keystore::open( - path.clone(), - password.clone() - )?, - KeystoreConfig::InMemory => Keystore::new_in_memory(), - }; + let keystore = match &config.keystore { + KeystoreConfig::Path { path, password } => Keystore::open( + path.clone(), + password.clone() + )?, + KeystoreConfig::InMemory => Keystore::new_in_memory(), + }; - let executor = NativeExecutor::::new( - config.wasm_method, - config.default_heap_pages, - config.max_runtime_instances, - ); + let executor = NativeExecutor::::new( + config.wasm_method, + config.default_heap_pages, + config.max_runtime_instances, + ); - let db_storage = { - let db_settings = sc_client_db::DatabaseSettings { - state_cache_size: config.state_cache_size, - state_cache_child_ratio: - config.state_cache_child_ratio.map(|v| (v, 100)), - pruning: config.pruning.clone(), - source: config.database.clone(), - }; - sc_client_db::light::LightStorage::new(db_settings)? + let db_storage = { + let db_settings = sc_client_db::DatabaseSettings { + state_cache_size: config.state_cache_size, + state_cache_child_ratio: + config.state_cache_child_ratio.map(|v| (v, 100)), + pruning: config.pruning.clone(), + source: config.database.clone(), }; - let light_blockchain = sc_light::new_light_blockchain(db_storage); - let fetch_checker = Arc::new( - sc_light::new_fetch_checker::<_, TBl, _>( - light_blockchain.clone(), - executor.clone(), - Box::new(task_manager.spawn_handle()), - ), - ); - let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker)); - let backend = sc_light::new_light_backend(light_blockchain); - let remote_blockchain = backend.remote_blockchain(); - let client = Arc::new(light::new_light( - backend.clone(), - config.chain_spec.as_storage_builder(), - executor, + sc_client_db::light::LightStorage::new(db_settings)? 
+ }; + let light_blockchain = sc_light::new_light_blockchain(db_storage); + let fetch_checker = Arc::new( + sc_light::new_fetch_checker::<_, TBl, _>( + light_blockchain.clone(), + executor.clone(), Box::new(task_manager.spawn_handle()), - config.prometheus_config.as_ref().map(|config| config.registry.clone()), - )?); - - Ok(ServiceBuilder { - config, - client, - backend, - task_manager, - keystore, - fetcher: Some(fetcher.clone()), - select_chain: None, - import_queue: (), - finality_proof_request_builder: None, - finality_proof_provider: None, - transaction_pool: Arc::new(()), - rpc_extensions_builder: Box::new(|_| ()), - remote_backend: Some(remote_blockchain), - block_announce_validator_builder: None, - informant_prefix: Default::default(), - marker: PhantomData, - }) - } + ), + ); + let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker)); + let backend = sc_light::new_light_backend(light_blockchain); + let remote_blockchain = backend.remote_blockchain(); + let client = light::new_light( + backend.clone(), + config.chain_spec.as_storage_builder(), + executor, + Box::new(task_manager.spawn_handle()), + config.prometheus_config.as_ref().map(|config| config.registry.clone()), + )?; + + Ok(((client, backend, keystore, task_manager), fetcher, remote_blockchain)) } -impl - ServiceBuilder< - TBl, - TRtApi, - TCl, - TFchr, - TSc, - TImpQu, - TFprb, - TFpp, - TExPool, - TRpc, - Backend - > -{ - /// Returns a reference to the configuration that was stored in this builder. - pub fn config(&self) -> &Configuration { - &self.config - } - - /// Returns a reference to the optional prometheus registry that was stored in this builder. - pub fn prometheus_registry(&self) -> Option<&Registry> { - self.config.prometheus_config.as_ref().map(|config| &config.registry) - } - - /// Returns a reference to the client that was stored in this builder. - pub fn client(&self) -> &Arc { - &self.client - } - - /// Returns a reference to the backend that was used in this builder. - pub fn backend(&self) -> &Arc { - &self.backend - } - - /// Returns a reference to the select-chain that was stored in this builder. - pub fn select_chain(&self) -> Option<&TSc> { - self.select_chain.as_ref() - } - - /// Returns a reference to the keystore - pub fn keystore(&self) -> Arc> { - self.keystore.clone() - } - - /// Returns a reference to the transaction pool stored in this builder - pub fn pool(&self) -> Arc { - self.transaction_pool.clone() - } - - /// Returns a reference to the fetcher, only available if builder - /// was created with `new_light`. - pub fn fetcher(&self) -> Option - where TFchr: Clone - { - self.fetcher.clone() - } - - /// Returns a reference to the remote_backend, only available if builder - /// was created with `new_light`. - pub fn remote_backend(&self) -> Option>> { - self.remote_backend.clone() - } - - /// Defines which head-of-chain strategy to use. 
- pub fn with_opt_select_chain( - self, - select_chain_builder: impl FnOnce( - &Configuration, &Arc, - ) -> Result, Error> - ) -> Result, Error> { - let select_chain = select_chain_builder(&self.config, &self.backend)?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - task_manager: self.task_manager, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions_builder: self.rpc_extensions_builder, - remote_backend: self.remote_backend, - block_announce_validator_builder: self.block_announce_validator_builder, - informant_prefix: self.informant_prefix, - marker: self.marker, - }) - } - - /// Defines which head-of-chain strategy to use. - pub fn with_select_chain( - self, - builder: impl FnOnce(&Configuration, &Arc) -> Result, - ) -> Result, Error> { - self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some)) - } - - /// Defines which import queue to use. - pub fn with_import_queue( - self, - builder: impl FnOnce(&Configuration, Arc, Option, Arc, &SpawnTaskHandle, Option<&Registry>) - -> Result - ) -> Result, Error> - where TSc: Clone { - let import_queue = builder( - &self.config, - self.client.clone(), - self.select_chain.clone(), - self.transaction_pool.clone(), - &self.task_manager.spawn_handle(), - self.config.prometheus_config.as_ref().map(|config| &config.registry), - )?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - task_manager: self.task_manager, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions_builder: self.rpc_extensions_builder, - remote_backend: self.remote_backend, - block_announce_validator_builder: self.block_announce_validator_builder, - informant_prefix: self.informant_prefix, - marker: self.marker, - }) - } - - /// Defines which strategy to use for providing finality proofs. - pub fn with_opt_finality_proof_provider( - self, - builder: impl FnOnce(Arc, Arc) -> Result>>, Error> - ) -> Result>, - TExPool, - TRpc, - Backend, - >, Error> { - let finality_proof_provider = builder(self.client.clone(), self.backend.clone())?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - task_manager: self.task_manager, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions_builder: self.rpc_extensions_builder, - remote_backend: self.remote_backend, - block_announce_validator_builder: self.block_announce_validator_builder, - informant_prefix: self.informant_prefix, - marker: self.marker, - }) - } - - /// Defines which strategy to use for providing finality proofs. 
- pub fn with_finality_proof_provider( - self, - build: impl FnOnce(Arc, Arc) -> Result>, Error> - ) -> Result>, - TExPool, - TRpc, - Backend, - >, Error> { - self.with_opt_finality_proof_provider(|client, backend| build(client, backend).map(Option::Some)) - } - - /// Defines which import queue to use. - pub fn with_import_queue_and_opt_fprb( - self, - builder: impl FnOnce( - &Configuration, - Arc, - Arc, - Option, - Option, - Arc, - &SpawnTaskHandle, - Option<&Registry>, - ) -> Result<(UImpQu, Option), Error> - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - let (import_queue, fprb) = builder( - &self.config, - self.client.clone(), - self.backend.clone(), - self.fetcher.clone(), - self.select_chain.clone(), - self.transaction_pool.clone(), - &self.task_manager.spawn_handle(), - self.config.prometheus_config.as_ref().map(|config| &config.registry), - )?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - task_manager: self.task_manager, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue, - finality_proof_request_builder: fprb, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions_builder: self.rpc_extensions_builder, - remote_backend: self.remote_backend, - block_announce_validator_builder: self.block_announce_validator_builder, - informant_prefix: self.informant_prefix, - marker: self.marker, - }) - } - - /// Defines which import queue to use. - pub fn with_import_queue_and_fprb( - self, - builder: impl FnOnce( - &Configuration, - Arc, - Arc, - Option, - Option, - Arc, - &SpawnTaskHandle, - Option<&Registry>, - ) -> Result<(UImpQu, UFprb), Error> - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx, tb, pr| - builder(cfg, cl, b, f, sc, tx, tb, pr) - .map(|(q, f)| (q, Some(f))) - ) - } - - /// Defines which transaction pool to use. - pub fn with_transaction_pool( - self, - transaction_pool_builder: impl FnOnce( - &Self, - ) -> Result<(UExPool, Option), Error>, - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - let (transaction_pool, background_task) = transaction_pool_builder(&self)?; - - if let Some(background_task) = background_task{ - self.task_manager.spawn_handle().spawn("txpool-background", background_task); - } - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - task_manager: self.task_manager, - backend: self.backend, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: Arc::new(transaction_pool), - rpc_extensions_builder: self.rpc_extensions_builder, - remote_backend: self.remote_backend, - block_announce_validator_builder: self.block_announce_validator_builder, - informant_prefix: self.informant_prefix, - marker: self.marker, - }) - } - - /// Defines the RPC extension builder to use. Unlike `with_rpc_extensions`, - /// this method is useful in situations where the RPC extensions need to - /// access to a `DenyUnsafe` instance to avoid exposing sensitive methods. 
- pub fn with_rpc_extensions_builder( - self, - rpc_extensions_builder: impl FnOnce(&Self) -> Result, - ) -> Result< - ServiceBuilder, - Error, - > - where - TSc: Clone, - TFchr: Clone, - URpcBuilder: RpcExtensionBuilder + Send + 'static, - URpc: sc_rpc::RpcExtension, - { - let rpc_extensions_builder = rpc_extensions_builder(&self)?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - task_manager: self.task_manager, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions_builder: Box::new(rpc_extensions_builder), - remote_backend: self.remote_backend, - block_announce_validator_builder: self.block_announce_validator_builder, - informant_prefix: self.informant_prefix, - marker: self.marker, - }) - } - - /// Defines the RPC extensions to use. - pub fn with_rpc_extensions( - self, - rpc_extensions: impl FnOnce(&Self) -> Result, - ) -> Result< - ServiceBuilder, - Error, - > - where - TSc: Clone, - TFchr: Clone, - URpc: Clone + sc_rpc::RpcExtension + Send + 'static, - { - let rpc_extensions = rpc_extensions(&self)?; - self.with_rpc_extensions_builder(|_| Ok(NoopRpcExtensionBuilder::from(rpc_extensions))) - } - - /// Defines the `BlockAnnounceValidator` to use. `DefaultBlockAnnounceValidator` will be used by +/// Parameters that are passed to `build`. +pub struct ServiceParams { + /// The node configuration. + pub config: Configuration, + /// A client, created in `new_full_parts`/`new_light_parts`. + pub client: Arc,>, + /// A backend, created in `new_full_parts`/`new_light_parts`. + pub backend: Arc, + /// A task manager, created in `new_full_parts`/`new_light_parts`. + pub task_manager: TaskManager, + /// A keystore, created in `new_full_parts`/`new_light_parts`. + pub keystore: Arc>, + /// An on-demand `Fetcher`. Used by light clients to fetch state via network requests. + pub on_demand: Option>>, + /// Which head-of-chain strategy to use. + pub select_chain: Option, + /// Which import queue to use. + pub import_queue: TImpQu, + /// FPRB. + pub finality_proof_request_builder: Option>, + /// Which strategy to use for providing finality proofs. + pub finality_proof_provider: Option>>, + /// Which transaction pool to use. + pub transaction_pool: Arc, + /// ..... + pub remote_blockchain: Option>>, + /// A list pf background tasks to spawn. + pub background_tasks: Vec<(&'static str, BackgroundTask)>, + // Defines the `BlockAnnounceValidator` to use. `DefaultBlockAnnounceValidator` will be used by /// default. 
- pub fn with_block_announce_validator( - self, - block_announce_validator_builder: - impl FnOnce(Arc) -> Box + Send> + Send + 'static, - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - task_manager: self.task_manager, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions_builder: self.rpc_extensions_builder, - remote_backend: self.remote_backend, - block_announce_validator_builder: Some(Box::new(block_announce_validator_builder)), - informant_prefix: self.informant_prefix, - marker: self.marker, - }) - } - + pub block_announce_validator: Option + Send>>, + /// Defines the RPC extension builder to use. + /// This method is in situations where the RPC extensions need to + /// access to a `DenyUnsafe` instance to avoid exposing sensitive methods. + pub rpc_extensions_builder: TRpcBuilder, /// Defines the informant's prefix for the logs. An empty string by default. /// /// By default substrate will show logs without a prefix. Example: @@ -863,78 +408,37 @@ impl /// ```text /// 2020-05-28 15:11:06 ✨ [Prefix] Imported #2 (0xc21c…2ca8) /// 2020-05-28 15:11:07 💤 [Prefix] Idle (0 peers), best: #2 (0xc21c…2ca8), finalized #0 (0x7299…e6df), ⬇ 0 ⬆ 0 - /// ``` - pub fn with_informant_prefix( - self, - informant_prefix: String, - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - Ok(ServiceBuilder { - informant_prefix: informant_prefix, - ..self - }) - } + /// ` + pub informant_prefix: String, } -/// Implemented on `ServiceBuilder`. Allows running block commands, such as import/export/validate -/// components to the builder. -pub trait ServiceBuilderCommand { - /// Block type this API operates on. - type Block: BlockT; - /// Native execution dispatch required by some commands. - type NativeDispatch: NativeExecutionDispatch + 'static; - /// Starts the process of importing blocks. - fn import_blocks( - self, - input: impl Read + Seek + Send + 'static, - force: bool, - binary: bool, - ) -> Pin> + Send>>; - - /// Performs the blocks export. - fn export_blocks( - self, - output: impl Write + 'static, - from: NumberFor, - to: Option>, - binary: bool - ) -> Pin>>>; - - /// Performs a revert of `blocks` blocks. - fn revert_chain( - &self, - blocks: NumberFor - ) -> Result<(), Error>; - - /// Re-validate known block. - fn check_block( - self, - block: BlockId - ) -> Pin> + Send>>; - - /// Export the raw state at the given `block`. If `block` is `None`, the - /// best block will be used. - fn export_raw_state( - &self, - block: Option>, - ) -> Result; -} - -impl -ServiceBuilder< +/// Builds the service. 
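+///
+/// A minimal, illustrative sketch of calling `build` — the local bindings
+/// (`config`, `client`, `import_queue`, `rpc_extensions_builder`, …) are
+/// assumed to have been produced by the node's own setup code and are not
+/// defined here:
+///
+/// ```ignore
+/// let service = build(ServiceParams {
+///     config,
+///     client,
+///     backend,
+///     task_manager,
+///     keystore,
+///     on_demand: None,
+///     select_chain: Some(select_chain),
+///     import_queue,
+///     finality_proof_request_builder: None,
+///     finality_proof_provider: Some(finality_proof_provider),
+///     transaction_pool,
+///     remote_blockchain: None,
+///     background_tasks,
+///     block_announce_validator: None,
+///     rpc_extensions_builder,
+///     informant_prefix: String::new(),
+/// })?;
+/// ```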
+pub fn build< TBl, TRtApi, - Client, - Arc>, TSc, TImpQu, - BoxFinalityProofRequestBuilder, - Arc>, TExPool, - TRpc, TBackend, -> where + TExec, + TRpcBuilder, + URpc, +>( + service_descriptor: ServiceParams, +) -> Result, + TSc, + NetworkStatus, + NetworkService::Hash>, + TExPool, + sc_offchain::OffchainWorkers< + Client, + TBackend::OffchainStorage, + TBl + >, +>, Error> +where Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: sp_api::Metadata + @@ -950,342 +454,316 @@ ServiceBuilder< TSc: Clone, TImpQu: 'static + ImportQueue, TExPool: MaintainedTransactionPool::Hash> + MallocSizeOfWasm + 'static, - TRpc: sc_rpc::RpcExtension, + TExec: CallExecutor, + TRpcBuilder: RpcExtensionBuilder + Send + 'static, + URpc: sc_rpc::RpcExtension, { + let ServiceParams { + mut config, + client, backend, task_manager, keystore, on_demand, select_chain, import_queue, + finality_proof_request_builder, finality_proof_provider, transaction_pool, + remote_blockchain, background_tasks, block_announce_validator, rpc_extensions_builder, + informant_prefix + } = service_descriptor; + + sp_session::generate_initial_session_keys( + client.clone(), + &BlockId::Hash(client.chain_info().best_hash), + config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), + )?; + + // A side-channel for essential tasks to communicate shutdown. + let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); + + let import_queue = Box::new(import_queue); + let chain_info = client.chain_info(); + let chain_spec = &config.chain_spec; - /// Set an ExecutionExtensionsFactory - pub fn with_execution_extensions_factory(self, execution_extensions_factory: Box) -> Result { - self.client.execution_extensions().set_extensions_factory(execution_extensions_factory); - Ok(self) - } + let version = config.impl_version; + info!("📦 Highest known block at #{}", chain_info.best_number); + telemetry!( + SUBSTRATE_INFO; + "node.start"; + "height" => chain_info.best_number.saturated_into::(), + "best" => ?chain_info.best_hash + ); - /// Builds the service. - pub fn build(self) -> Result, - TSc, - NetworkStatus, - NetworkService::Hash>, - TExPool, - sc_offchain::OffchainWorkers< - Client, - TBackend::OffchainStorage, - TBl - >, - >, Error> - where TExec: CallExecutor, - { - let ServiceBuilder { - marker: _, - mut config, - client, - task_manager, - fetcher: on_demand, - backend, - keystore, - select_chain, - import_queue, - finality_proof_request_builder, - finality_proof_provider, - transaction_pool, - rpc_extensions_builder, - remote_backend, - block_announce_validator_builder, - informant_prefix, - } = self; - - sp_session::generate_initial_session_keys( - client.clone(), - &BlockId::Hash(client.chain_info().best_hash), - config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), - )?; + // make transaction pool available for off-chain runtime calls. 
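+	// Only a weak handle is registered (`Arc::downgrade` below), so this
+	// registration by itself does not keep the transaction pool alive once the
+	// service and its strong references are dropped.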
+ client.execution_extensions() + .register_transaction_pool(Arc::downgrade(&transaction_pool) as _); + + let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { + imports_external_transactions: !matches!(config.role, Role::Light), + pool: transaction_pool.clone(), + client: client.clone(), + }); + + let protocol_id = { + let protocol_id_full = match chain_spec.protocol_id() { + Some(pid) => pid, + None => { + warn!("Using default protocol ID {:?} because none is configured in the \ + chain specs", DEFAULT_PROTOCOL_ID + ); + DEFAULT_PROTOCOL_ID + } + }.as_bytes(); + sc_network::config::ProtocolId::from(protocol_id_full) + }; - // A side-channel for essential tasks to communicate shutdown. - let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); + let block_announce_validator = if let Some(validator) = block_announce_validator { + validator + } else { + Box::new(DefaultBlockAnnounceValidator::new(client.clone())) + }; - let import_queue = Box::new(import_queue); - let chain_info = client.chain_info(); - let chain_spec = &config.chain_spec; + let network_params = sc_network::config::Params { + role: config.role.clone(), + executor: { + let spawn_handle = task_manager.spawn_handle(); + Some(Box::new(move |fut| { + spawn_handle.spawn("libp2p-node", fut); + })) + }, + network_config: config.network.clone(), + chain: client.clone(), + finality_proof_provider, + finality_proof_request_builder, + on_demand: on_demand.clone(), + transaction_pool: transaction_pool_adapter.clone() as _, + import_queue, + protocol_id, + block_announce_validator, + metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()) + }; - let version = config.impl_version; - info!("📦 Highest known block at #{}", chain_info.best_number); - telemetry!( - SUBSTRATE_INFO; - "node.start"; - "height" => chain_info.best_number.saturated_into::(), - "best" => ?chain_info.best_hash - ); + let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); + let network_mut = sc_network::NetworkWorker::new(network_params)?; + let network = network_mut.service().clone(); + let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); + + let offchain_storage = backend.offchain_storage(); + let offchain_workers = match (config.offchain_worker.clone(), offchain_storage.clone()) { + (OffchainWorkerConfig {enabled: true, .. }, Some(db)) => { + Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db))) + }, + (OffchainWorkerConfig {enabled: true, .. }, None) => { + warn!("Offchain workers disabled, due to lack of offchain storage support in backend."); + None + }, + _ => None, + }; - // make transaction pool available for off-chain runtime calls. - client.execution_extensions() - .register_transaction_pool(Arc::downgrade(&transaction_pool) as _); + let spawn_handle = task_manager.spawn_handle(); - let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { - imports_external_transactions: !matches!(config.role, Role::Light), - pool: transaction_pool.clone(), - client: client.clone(), - }); + // Spawn background tasks which were stacked during the + // service building. 
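+	// Each entry is a `(name, future)` pair: the name labels the spawned task
+	// (e.g. in task metrics and logs) and the future is polled until it
+	// completes or the node shuts down.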
+ for (title, background_task) in background_tasks { + spawn_handle.spawn(title, background_task); + } - let protocol_id = { - let protocol_id_full = match chain_spec.protocol_id() { - Some(pid) => pid, - None => { - warn!("Using default protocol ID {:?} because none is configured in the \ - chain specs", DEFAULT_PROTOCOL_ID - ); - DEFAULT_PROTOCOL_ID + // Inform the tx pool about imported and finalized blocks. + { + let txpool = Arc::downgrade(&transaction_pool); + + let mut import_stream = client.import_notification_stream().map(Into::into).fuse(); + let mut finality_stream = client.finality_notification_stream() + .map(Into::into) + .fuse(); + + let events = async move { + loop { + let evt = futures::select! { + evt = import_stream.next() => evt, + evt = finality_stream.next() => evt, + complete => return, + }; + + let txpool = txpool.upgrade(); + if let Some((txpool, evt)) = txpool.and_then(|tp| evt.map(|evt| (tp, evt))) { + txpool.maintain(evt).await; } - }.as_bytes(); - sc_network::config::ProtocolId::from(protocol_id_full) - }; - - let block_announce_validator = if let Some(f) = block_announce_validator_builder { - f(client.clone()) - } else { - Box::new(DefaultBlockAnnounceValidator::new(client.clone())) - }; - - let network_params = sc_network::config::Params { - role: config.role.clone(), - executor: { - let spawn_handle = task_manager.spawn_handle(); - Some(Box::new(move |fut| { - spawn_handle.spawn("libp2p-node", fut); - })) - }, - network_config: config.network.clone(), - chain: client.clone(), - finality_proof_provider, - finality_proof_request_builder, - on_demand: on_demand.clone(), - transaction_pool: transaction_pool_adapter.clone() as _, - import_queue, - protocol_id, - block_announce_validator, - metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()) + } }; - let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); - let network_mut = sc_network::NetworkWorker::new(network_params)?; - let network = network_mut.service().clone(); - let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); + spawn_handle.spawn( + "txpool-notifications", + events, + ); + } - let offchain_storage = backend.offchain_storage(); - let offchain_workers = match (config.offchain_worker.clone(), offchain_storage.clone()) { - (OffchainWorkerConfig {enabled: true, .. }, Some(db)) => { - Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db))) - }, - (OffchainWorkerConfig {enabled: true, .. }, None) => { - warn!("Offchain workers disabled, due to lack of offchain storage support in backend."); - None - }, - _ => None, - }; + // Inform the offchain worker about new imported blocks + { + let offchain = offchain_workers.as_ref().map(Arc::downgrade); + let notifications_spawn_handle = task_manager.spawn_handle(); + let network_state_info: Arc = network.clone(); + let is_validator = config.role.is_authority(); + + let events = client.import_notification_stream().for_each(move |n| { + let offchain = offchain.as_ref().and_then(|o| o.upgrade()); + match offchain { + Some(offchain) if n.is_new_best => { + notifications_spawn_handle.spawn( + "offchain-on-block", + offchain.on_block_imported( + &n.header, + network_state_info.clone(), + is_validator, + ), + ); + }, + Some(_) => log::debug!( + target: "sc_offchain", + "Skipping offchain workers for non-canon block: {:?}", + n.header, + ), + _ => {}, + } - let spawn_handle = task_manager.spawn_handle(); - - // Inform the tx pool about imported and finalized blocks. 
- { - let txpool = Arc::downgrade(&transaction_pool); - - let mut import_stream = client.import_notification_stream().map(Into::into).fuse(); - let mut finality_stream = client.finality_notification_stream() - .map(Into::into) - .fuse(); - - let events = async move { - loop { - let evt = futures::select! { - evt = import_stream.next() => evt, - evt = finality_stream.next() => evt, - complete => return, - }; - - let txpool = txpool.upgrade(); - if let Some((txpool, evt)) = txpool.and_then(|tp| evt.map(|evt| (tp, evt))) { - txpool.maintain(evt).await; - } - } - }; + ready(()) + }); - spawn_handle.spawn( - "txpool-notifications", - events, - ); - } + spawn_handle.spawn( + "offchain-notifications", + events, + ); + } - // Inform the offchain worker about new imported blocks - { - let offchain = offchain_workers.as_ref().map(Arc::downgrade); - let notifications_spawn_handle = task_manager.spawn_handle(); - let network_state_info: Arc = network.clone(); - let is_validator = config.role.is_authority(); - - let events = client.import_notification_stream().for_each(move |n| { - let offchain = offchain.as_ref().and_then(|o| o.upgrade()); - match offchain { - Some(offchain) if n.is_new_best => { - notifications_spawn_handle.spawn( - "offchain-on-block", - offchain.on_block_imported( - &n.header, - network_state_info.clone(), - is_validator, - ), - ); - }, - Some(_) => log::debug!( - target: "sc_offchain", - "Skipping offchain workers for non-canon block: {:?}", - n.header, - ), - _ => {}, + { + // extrinsic notifications + let network = Arc::downgrade(&network); + let transaction_pool_ = transaction_pool.clone(); + let events = transaction_pool.import_notification_stream() + .for_each(move |hash| { + if let Some(network) = network.upgrade() { + network.propagate_extrinsic(hash); } - + let status = transaction_pool_.status(); + telemetry!(SUBSTRATE_INFO; "txpool.import"; + "ready" => status.ready, + "future" => status.future + ); ready(()) }); - spawn_handle.spawn( - "offchain-notifications", - events, - ); - } - - { - // extrinsic notifications - let network = Arc::downgrade(&network); - let transaction_pool_ = transaction_pool.clone(); - let events = transaction_pool.import_notification_stream() - .for_each(move |hash| { - if let Some(network) = network.upgrade() { - network.propagate_extrinsic(hash); - } - let status = transaction_pool_.status(); - telemetry!(SUBSTRATE_INFO; "txpool.import"; - "ready" => status.ready, - "future" => status.future - ); - ready(()) - }); + spawn_handle.spawn( + "on-transaction-imported", + events, + ); + } - spawn_handle.spawn( - "on-transaction-imported", - events, - ); - } + // Prometheus metrics. + let mut metrics_service = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() { + // Set static metrics. - // Prometheus metrics. - let mut metrics_service = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() { - // Set static metrics. - - - let role_bits = match config.role { - Role::Full => 1u64, - Role::Light => 2u64, - Role::Sentry { .. } => 3u64, - Role::Authority { .. } => 4u64, - }; - let metrics = MetricsService::with_prometheus( - ®istry, - &config.network.node_name, - &config.impl_version, - role_bits, - )?; - spawn_handle.spawn( - "prometheus-endpoint", - prometheus_endpoint::init_prometheus(port, registry).map(drop) - ); - metrics - } else { - MetricsService::new() + let role_bits = match config.role { + Role::Full => 1u64, + Role::Light => 2u64, + Role::Sentry { .. 
} => 3u64, + Role::Authority { .. } => 4u64, }; + let metrics = MetricsService::with_prometheus( + ®istry, + &config.network.node_name, + &config.impl_version, + role_bits, + )?; + spawn_handle.spawn( + "prometheus-endpoint", + prometheus_endpoint::init_prometheus(port, registry).map(drop) + ); - // Periodically notify the telemetry. - let transaction_pool_ = transaction_pool.clone(); - let client_ = client.clone(); - let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); - network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); - let tel_task = state_rx.for_each(move |(net_status, _)| { - let info = client_.usage_info(); - metrics_service.tick( - &info, - &transaction_pool_.status(), - &net_status, - ); - ready(()) - }); + metrics + } else { + MetricsService::new() + }; - spawn_handle.spawn( - "telemetry-periodic-send", - tel_task, + // Periodically notify the telemetry. + let transaction_pool_ = transaction_pool.clone(); + let client_ = client.clone(); + let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); + network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); + let tel_task = state_rx.for_each(move |(net_status, _)| { + let info = client_.usage_info(); + metrics_service.tick( + &info, + &transaction_pool_.status(), + &net_status, ); + ready(()) + }); - // Periodically send the network state to the telemetry. - let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2"); - network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); - let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| { - telemetry!( - SUBSTRATE_INFO; - "system.network_state"; - "state" => network_state, - ); - ready(()) - }); - spawn_handle.spawn( - "telemetry-periodic-network-state", - tel_task_2, + spawn_handle.spawn( + "telemetry-periodic-send", + tel_task, + ); + + // Periodically send the network state to the telemetry. 
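+	// This second sink fires every 30 seconds (vs. the 5 second metrics tick
+	// above) and forwards the full `NetworkState` snapshot, which carries much
+	// more detail than the periodic status report.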
+ let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2"); + network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); + let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| { + telemetry!( + SUBSTRATE_INFO; + "system.network_state"; + "state" => network_state, ); + ready(()) + }); + spawn_handle.spawn( + "telemetry-periodic-network-state", + tel_task_2, + ); - // RPC - let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); - let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| { - use sc_rpc::{chain, state, author, system, offchain}; - - let system_info = sc_rpc::system::SystemInfo { - chain_name: chain_spec.name().into(), - impl_name: config.impl_name.into(), - impl_version: config.impl_version.into(), - properties: chain_spec.properties().clone(), - chain_type: chain_spec.chain_type().clone(), - }; - - let subscriptions = SubscriptionManager::new(Arc::new(task_manager.spawn_handle())); - - let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) = - (remote_backend.as_ref(), on_demand.as_ref()) { - // Light clients - let chain = sc_rpc::chain::new_light( - client.clone(), - subscriptions.clone(), - remote_backend.clone(), - on_demand.clone() - ); - let (state, child_state) = sc_rpc::state::new_light( - client.clone(), - subscriptions.clone(), - remote_backend.clone(), - on_demand.clone() - ); - (chain, state, child_state) + // RPC + let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); + let gen_handler = |deny_unsafe: sc_rpc::DenyUnsafe| { + use sc_rpc::{chain, state, author, system, offchain}; + + let system_info = sc_rpc::system::SystemInfo { + chain_name: chain_spec.name().into(), + impl_name: config.impl_name.into(), + impl_version: config.impl_version.into(), + properties: chain_spec.properties().clone(), + chain_type: chain_spec.chain_type().clone(), + }; - } else { - // Full nodes - let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone()); - let (state, child_state) = sc_rpc::state::new_full(client.clone(), subscriptions.clone()); - (chain, state, child_state) - }; + let subscriptions = SubscriptionManager::new(Arc::new(task_manager.spawn_handle())); - let author = sc_rpc::author::Author::new( + let (chain, state, child_state) = if let (Some(remote_blockchain), Some(on_demand)) = + (remote_blockchain.as_ref(), on_demand.as_ref()) { + // Light clients + let chain = sc_rpc::chain::new_light( client.clone(), - transaction_pool.clone(), - subscriptions, - keystore.clone(), - deny_unsafe, + subscriptions.clone(), + remote_blockchain.clone(), + on_demand.clone() ); - let system = system::System::new(system_info, system_rpc_tx.clone(), deny_unsafe); + let (state, child_state) = sc_rpc::state::new_light( + client.clone(), + subscriptions.clone(), + remote_blockchain.clone(), + on_demand.clone() + ); + (chain, state, child_state) - let maybe_offchain_rpc = offchain_storage.clone() + } else { + // Full nodes + let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone()); + let (state, child_state) = sc_rpc::state::new_full(client.clone(), subscriptions.clone()); + (chain, state, child_state) + }; + + let author = sc_rpc::author::Author::new( + client.clone(), + transaction_pool.clone(), + subscriptions, + keystore.clone(), + deny_unsafe, + ); + let system = system::System::new(system_info, system_rpc_tx.clone(), deny_unsafe); + + let maybe_offchain_rpc = offchain_storage.clone() .map(|storage| { let offchain = 
sc_rpc::offchain::Offchain::new(storage, deny_unsafe); // FIXME: Use plain Option (don't collect into HashMap) when we upgrade to jsonrpc 14.1 @@ -1294,130 +772,129 @@ ServiceBuilder< delegate.into_iter().collect::>() }).unwrap_or_default(); - sc_rpc_server::rpc_handler(( - state::StateApi::to_delegate(state), - state::ChildStateApi::to_delegate(child_state), - chain::ChainApi::to_delegate(chain), - maybe_offchain_rpc, - author::AuthorApi::to_delegate(author), - system::SystemApi::to_delegate(system), - rpc_extensions_builder.build(deny_unsafe), - )) - }; - let rpc = start_rpc_servers(&config, gen_handler)?; - // This is used internally, so don't restrict access to unsafe RPC - let rpc_handlers = gen_handler(sc_rpc::DenyUnsafe::No); - - // The network worker is responsible for gathering all network messages and processing - // them. This is quite a heavy task, and at the time of the writing of this comment it - // frequently happens that this future takes several seconds or in some situations - // even more than a minute until it has processed its entire queue. This is clearly an - // issue, and ideally we would like to fix the network future to take as little time as - // possible, but we also take the extra harm-prevention measure to execute the networking - // future using `spawn_blocking`. - spawn_handle.spawn_blocking( - "network-worker", - build_network_future( - config.role.clone(), - network_mut, - client.clone(), - network_status_sinks.clone(), - system_rpc_rx, - has_bootnodes, - config.announce_block, - ), - ); + sc_rpc_server::rpc_handler(( + state::StateApi::to_delegate(state), + state::ChildStateApi::to_delegate(child_state), + chain::ChainApi::to_delegate(chain), + maybe_offchain_rpc, + author::AuthorApi::to_delegate(author), + system::SystemApi::to_delegate(system), + rpc_extensions_builder.build(deny_unsafe), + )) + }; + let rpc = start_rpc_servers(&config, gen_handler)?; + // This is used internally, so don't restrict access to unsafe RPC + let rpc_handlers = gen_handler(sc_rpc::DenyUnsafe::No); + + // The network worker is responsible for gathering all network messages and processing + // them. This is quite a heavy task, and at the time of the writing of this comment it + // frequently happens that this future takes several seconds or in some situations + // even more than a minute until it has processed its entire queue. This is clearly an + // issue, and ideally we would like to fix the network future to take as little time as + // possible, but we also take the extra harm-prevention measure to execute the networking + // future using `spawn_blocking`. 
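+	// (`spawn_blocking` is used instead of `spawn` here so that a long-running
+	// poll of the network future does not tie up the executor threads shared
+	// with the lighter service tasks.)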
+ spawn_handle.spawn_blocking( + "network-worker", + build_network_future( + config.role.clone(), + network_mut, + client.clone(), + network_status_sinks.clone(), + system_rpc_rx, + has_bootnodes, + config.announce_block, + ), + ); - let telemetry_connection_sinks: Arc>>> = Default::default(); - - // Telemetry - let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { - let is_authority = config.role.is_authority(); - let network_id = network.local_peer_id().to_base58(); - let name = config.network.node_name.clone(); - let impl_name = config.impl_name.to_owned(); - let version = version.clone(); - let chain_name = config.chain_spec.name().to_owned(); - let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); - let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig { - endpoints, - wasm_external_transport: config.telemetry_external_transport.take(), - }); - let startup_time = SystemTime::UNIX_EPOCH.elapsed() - .map(|dur| dur.as_millis()) - .unwrap_or(0); - let future = telemetry.clone() - .for_each(move |event| { - // Safe-guard in case we add more events in the future. - let sc_telemetry::TelemetryEvent::Connected = event; - - telemetry!(SUBSTRATE_INFO; "system.connected"; - "name" => name.clone(), - "implementation" => impl_name.clone(), - "version" => version.clone(), - "config" => "", - "chain" => chain_name.clone(), - "authority" => is_authority, - "startup_time" => startup_time, - "network_id" => network_id.clone() - ); + let telemetry_connection_sinks: Arc>>> = Default::default(); + + // Telemetry + let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { + let is_authority = config.role.is_authority(); + let network_id = network.local_peer_id().to_base58(); + let name = config.network.node_name.clone(); + let impl_name = config.impl_name.to_owned(); + let version = version.clone(); + let chain_name = config.chain_spec.name().to_owned(); + let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); + let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig { + endpoints, + wasm_external_transport: config.telemetry_external_transport.take(), + }); + let startup_time = SystemTime::UNIX_EPOCH.elapsed() + .map(|dur| dur.as_millis()) + .unwrap_or(0); + let future = telemetry.clone() + .for_each(move |event| { + // Safe-guard in case we add more events in the future. 
+ let sc_telemetry::TelemetryEvent::Connected = event; + + telemetry!(SUBSTRATE_INFO; "system.connected"; + "name" => name.clone(), + "implementation" => impl_name.clone(), + "version" => version.clone(), + "config" => "", + "chain" => chain_name.clone(), + "authority" => is_authority, + "startup_time" => startup_time, + "network_id" => network_id.clone() + ); - telemetry_connection_sinks_.lock().retain(|sink| { - sink.unbounded_send(()).is_ok() - }); - ready(()) + telemetry_connection_sinks_.lock().retain(|sink| { + sink.unbounded_send(()).is_ok() }); + ready(()) + }); - spawn_handle.spawn( - "telemetry-worker", - future, - ); - - telemetry - }); + spawn_handle.spawn( + "telemetry-worker", + future, + ); - // Instrumentation - if let Some(tracing_targets) = config.tracing_targets.as_ref() { - let subscriber = sc_tracing::ProfilingSubscriber::new( - config.tracing_receiver, tracing_targets - ); - match tracing::subscriber::set_global_default(subscriber) { - Ok(_) => (), - Err(e) => error!(target: "tracing", "Unable to set global default subscriber {}", e), - } - } + telemetry + }); - // Spawn informant task - let network_status_sinks_1 = network_status_sinks.clone(); - let informant_future = sc_informant::build( - client.clone(), - move |interval| { - let (sink, stream) = tracing_unbounded("mpsc_network_status"); - network_status_sinks_1.lock().push(interval, sink); - stream - }, - transaction_pool.clone(), - sc_informant::OutputFormat { enable_color: true, prefix: informant_prefix }, + // Instrumentation + if let Some(tracing_targets) = config.tracing_targets.as_ref() { + let subscriber = sc_tracing::ProfilingSubscriber::new( + config.tracing_receiver, tracing_targets ); - spawn_handle.spawn("informant", informant_future); - - Ok(Service { - client, - task_manager, - network, - network_status_sinks, - select_chain, - transaction_pool, - essential_failed_tx, - essential_failed_rx, - rpc_handlers, - _rpc: rpc, - _telemetry: telemetry, - _offchain_workers: offchain_workers, - _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), - keystore, - marker: PhantomData::, - prometheus_registry: config.prometheus_config.map(|config| config.registry), - }) + match tracing::subscriber::set_global_default(subscriber) { + Ok(_) => (), + Err(e) => error!(target: "tracing", "Unable to set global default subscriber {}", e), + } } + + // Spawn informant task + let network_status_sinks_1 = network_status_sinks.clone(); + let informant_future = sc_informant::build( + client.clone(), + move |interval| { + let (sink, stream) = tracing_unbounded("mpsc_network_status"); + network_status_sinks_1.lock().push(interval, sink); + stream + }, + transaction_pool.clone(), + sc_informant::OutputFormat { enable_color: true, prefix: informant_prefix }, + ); + spawn_handle.spawn("informant", informant_future); + + Ok(Service { + client, + task_manager, + network, + network_status_sinks, + select_chain, + transaction_pool, + essential_failed_tx, + essential_failed_rx, + rpc_handlers, + _rpc: rpc, + _telemetry: telemetry, + _offchain_workers: offchain_workers, + _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), + keystore, + marker: PhantomData::, + prometheus_registry: config.prometheus_config.map(|config| config.registry), + }) } diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs deleted file mode 100644 index cb4ed24b60b62..0000000000000 --- a/client/service/src/chain_ops.rs +++ /dev/null @@ -1,614 +0,0 @@ -// This file is part of Substrate. 
- -// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Chain utilities. - -use crate::error; -use crate::builder::{ServiceBuilderCommand, ServiceBuilder}; -use crate::error::Error; -use sc_chain_spec::ChainSpec; -use log::{warn, info}; -use futures::{future, prelude::*}; -use sp_runtime::traits::{ - Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion, MaybeSerializeDeserialize, -}; -use sp_runtime::generic::{BlockId, SignedBlock}; -use codec::{Decode, Encode, IoReader as CodecIoReader}; -use crate::client::{Client, LocalCallExecutor}; -use sp_consensus::{ - BlockOrigin, - import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue}, -}; -use sc_executor::{NativeExecutor, NativeExecutionDispatch}; -use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageChild, StorageMap}; -use sc_client_api::{StorageProvider, BlockBackend, UsageProvider}; - -use std::{io::{Read, Write, Seek}, pin::Pin, collections::HashMap}; -use std::time::{Duration, Instant}; -use futures_timer::Delay; -use std::task::Poll; -use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer}; -use std::convert::{TryFrom, TryInto}; -use sp_runtime::traits::{CheckedDiv, Saturating}; - -/// Number of blocks we will add to the queue before waiting for the queue to catch up. -const MAX_PENDING_BLOCKS: u64 = 1_024; - -/// Number of milliseconds to wait until next poll. -const DELAY_TIME: u64 = 2_000; - -/// Number of milliseconds that must have passed between two updates. -const TIME_BETWEEN_UPDATES: u64 = 3_000; - -/// Build a chain spec json -pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result { - spec.as_json(raw).map_err(Into::into) -} - - -/// Helper enum that wraps either a binary decoder (from parity-scale-codec), or a JSON decoder (from serde_json). -/// Implements the Iterator Trait, calling `next()` will decode the next SignedBlock and return it. -enum BlockIter where - R: std::io::Read + std::io::Seek, -{ - Binary { - // Total number of blocks we are expecting to decode. - num_expected_blocks: u64, - // Number of blocks we have decoded thus far. - read_block_count: u64, - // Reader to the data, used for decoding new blocks. - reader: CodecIoReader, - }, - Json { - // Nubmer of blocks we have decoded thus far. - read_block_count: u64, - // Stream to the data, used for decoding new blocks. - reader: StreamDeserializer<'static, JsonIoRead, SignedBlock>, - }, -} - -impl BlockIter where - R: Read + Seek + 'static, - B: BlockT + MaybeSerializeDeserialize, -{ - fn new(input: R, binary: bool) -> Result { - if binary { - let mut reader = CodecIoReader(input); - // If the file is encoded in binary format, it is expected to first specify the number - // of blocks that are going to be decoded. We read it and add it to our enum struct. 
- let num_expected_blocks: u64 = Decode::decode(&mut reader) - .map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?; - Ok(BlockIter::Binary { - num_expected_blocks, - read_block_count: 0, - reader, - }) - } else { - let stream_deser = Deserializer::from_reader(input) - .into_iter::>(); - Ok(BlockIter::Json { - reader: stream_deser, - read_block_count: 0, - }) - } - } - - /// Returns the number of blocks read thus far. - fn read_block_count(&self) -> u64 { - match self { - BlockIter::Binary { read_block_count, .. } - | BlockIter::Json { read_block_count, .. } - => *read_block_count, - } - } - - /// Returns the total number of blocks to be imported, if possible. - fn num_expected_blocks(&self) -> Option { - match self { - BlockIter::Binary { num_expected_blocks, ..} => Some(*num_expected_blocks), - BlockIter::Json {..} => None - } - } -} - -impl Iterator for BlockIter where - R: Read + Seek + 'static, - B: BlockT + MaybeSerializeDeserialize, -{ - type Item = Result, String>; - - fn next(&mut self) -> Option { - match self { - BlockIter::Binary { num_expected_blocks, read_block_count, reader } => { - if read_block_count < num_expected_blocks { - let block_result: Result, _> = SignedBlock::::decode(reader) - .map_err(|e| e.to_string()); - *read_block_count += 1; - Some(block_result) - } else { - // `read_block_count` == `num_expected_blocks` so we've read enough blocks. - None - } - } - BlockIter::Json { reader, read_block_count } => { - let res = Some(reader.next()?.map_err(|e| e.to_string())); - *read_block_count += 1; - res - } - } - } -} - -/// Imports the SignedBlock to the queue. -fn import_block_to_queue( - signed_block: SignedBlock, - queue: &mut TImpQu, - force: bool -) where - TBl: BlockT + MaybeSerializeDeserialize, - TImpQu: 'static + ImportQueue, -{ - let (header, extrinsics) = signed_block.block.deconstruct(); - let hash = header.hash(); - // import queue handles verification and importing it into the client. - queue.import_blocks(BlockOrigin::File, vec![ - IncomingBlock:: { - hash, - header: Some(header), - body: Some(extrinsics), - justification: signed_block.justification, - origin: None, - allow_missing_state: false, - import_existing: force, - } - ]); -} - -/// Returns true if we have imported every block we were supposed to import, else returns false. -fn importing_is_done( - num_expected_blocks: Option, - read_block_count: u64, - imported_blocks: u64 -) -> bool { - if let Some(num_expected_blocks) = num_expected_blocks { - imported_blocks >= num_expected_blocks - } else { - imported_blocks >= read_block_count - } -} - -/// Structure used to log the block importing speed. -struct Speedometer { - best_number: NumberFor, - last_number: Option>, - last_update: Instant, -} - -impl Speedometer { - /// Creates a fresh Speedometer. - fn new() -> Self { - Self { - best_number: NumberFor::::from(0), - last_number: None, - last_update: Instant::now(), - } - } - - /// Calculates `(best_number - last_number) / (now - last_update)` and - /// logs the speed of import. - fn display_speed(&self) { - // Number of milliseconds elapsed since last time. - let elapsed_ms = { - let elapsed = self.last_update.elapsed(); - let since_last_millis = elapsed.as_secs() * 1000; - let since_last_subsec_millis = elapsed.subsec_millis() as u64; - since_last_millis + since_last_subsec_millis - }; - - // Number of blocks that have been imported since last time. 
- let diff = match self.last_number { - None => return, - Some(n) => self.best_number.saturating_sub(n) - }; - - if let Ok(diff) = TryInto::::try_into(diff) { - // If the number of blocks can be converted to a regular integer, then it's easy: just - // do the math and turn it into a `f64`. - let speed = diff.saturating_mul(10_000).checked_div(u128::from(elapsed_ms)) - .map_or(0.0, |s| s as f64) / 10.0; - info!("📦 Current best block: {} ({:4.1} bps)", self.best_number, speed); - } else { - // If the number of blocks can't be converted to a regular integer, then we need a more - // algebraic approach and we stay within the realm of integers. - let one_thousand = NumberFor::::from(1_000); - let elapsed = NumberFor::::from( - >::try_from(elapsed_ms).unwrap_or(u32::max_value()) - ); - - let speed = diff.saturating_mul(one_thousand).checked_div(&elapsed) - .unwrap_or_else(Zero::zero); - info!("📦 Current best block: {} ({} bps)", self.best_number, speed) - } - } - - /// Updates the Speedometer. - fn update(&mut self, best_number: NumberFor) { - self.last_number = Some(self.best_number); - self.best_number = best_number; - self.last_update = Instant::now(); - } - - // If more than TIME_BETWEEN_UPDATES has elapsed since last update, - // then print and update the speedometer. - fn notify_user(&mut self, best_number: NumberFor) { - let delta = Duration::from_millis(TIME_BETWEEN_UPDATES); - if Instant::now().duration_since(self.last_update) >= delta { - self.display_speed(); - self.update(best_number); - } - } -} - -/// Different State that the `import_blocks` future could be in. -enum ImportState where - R: Read + Seek + 'static, - B: BlockT + MaybeSerializeDeserialize, -{ - /// We are reading from the BlockIter structure, adding those blocks to the queue if possible. - Reading{block_iter: BlockIter}, - /// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to catch up. - WaitingForImportQueueToCatchUp{ - block_iter: BlockIter, - delay: Delay, - block: SignedBlock - }, - // We have added all the blocks to the queue but they are still being processed. 
- WaitingForImportQueueToFinish{ - num_expected_blocks: Option, - read_block_count: u64, - delay: Delay, - }, -} - -impl< - TBl, TRtApi, TBackend, - TExecDisp, TFchr, TSc, TImpQu, TFprb, TFpp, - TExPool, TRpc, Backend -> ServiceBuilderCommand for ServiceBuilder< - TBl, TRtApi, - Client>, TBl, TRtApi>, - TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend -> where - TBl: BlockT + MaybeSerializeDeserialize, - TBackend: 'static + sc_client_api::backend::Backend + Send, - TExecDisp: 'static + NativeExecutionDispatch, - TImpQu: 'static + ImportQueue, - TRtApi: 'static + Send + Sync, - Self: Send + 'static, -{ - type Block = TBl; - type NativeDispatch = TExecDisp; - - fn import_blocks( - mut self, - input: impl Read + Seek + Send + 'static, - force: bool, - binary: bool, - ) -> Pin> + Send>> { - struct WaitLink { - imported_blocks: u64, - has_error: bool, - } - - impl WaitLink { - fn new() -> WaitLink { - WaitLink { - imported_blocks: 0, - has_error: false, - } - } - } - - impl Link for WaitLink { - fn blocks_processed( - &mut self, - imported: usize, - _num_expected_blocks: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)> - ) { - self.imported_blocks += imported as u64; - - for result in results { - if let (Err(err), hash) = result { - warn!("There was an error importing block with hash {:?}: {:?}", hash, err); - self.has_error = true; - break; - } - } - } - } - - let mut link = WaitLink::new(); - let block_iter_res: Result, String> = BlockIter::new(input, binary); - - let block_iter = match block_iter_res { - Ok(block_iter) => block_iter, - Err(e) => { - // We've encountered an error while creating the block iterator - // so we can just return a future that returns an error. - return future::ready(Err(Error::Other(e))).boxed() - } - }; - - let mut state = Some(ImportState::Reading{block_iter}); - let mut speedometer = Speedometer::::new(); - - // Importing blocks is implemented as a future, because we want the operation to be - // interruptible. - // - // Every time we read a block from the input or import a bunch of blocks from the import - // queue, the `Future` re-schedules itself and returns `Poll::Pending`. - // This makes it possible either to interleave other operations in-between the block imports, - // or to stop the operation completely. - let import = future::poll_fn(move |cx| { - let client = &self.client; - let queue = &mut self.import_queue; - match state.take().expect("state should never be None; qed") { - ImportState::Reading{mut block_iter} => { - match block_iter.next() { - None => { - // The iterator is over: we now need to wait for the import queue to finish. - let num_expected_blocks = block_iter.num_expected_blocks(); - let read_block_count = block_iter.read_block_count(); - let delay = Delay::new(Duration::from_millis(DELAY_TIME)); - state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay}); - }, - Some(block_result) => { - let read_block_count = block_iter.read_block_count(); - match block_result { - Ok(block) => { - if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS { - // The queue is full, so do not add this block and simply wait until - // the queue has made some progress. - let delay = Delay::new(Duration::from_millis(DELAY_TIME)); - state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block}); - } else { - // Queue is not full, we can keep on adding blocks to the queue. 
- import_block_to_queue(block, queue, force); - state = Some(ImportState::Reading{block_iter}); - } - } - Err(e) => { - return Poll::Ready( - Err(Error::Other(format!("Error reading block #{}: {}", read_block_count, e)))) - } - } - } - } - }, - ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => { - let read_block_count = block_iter.read_block_count(); - if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS { - // Queue is still full, so wait until there is room to insert our block. - match Pin::new(&mut delay).poll(cx) { - Poll::Pending => { - state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block}); - return Poll::Pending - }, - Poll::Ready(_) => { - delay.reset(Duration::from_millis(DELAY_TIME)); - }, - } - state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block}); - } else { - // Queue is no longer full, so we can add our block to the queue. - import_block_to_queue(block, queue, force); - // Switch back to Reading state. - state = Some(ImportState::Reading{block_iter}); - } - }, - ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, mut delay} => { - // All the blocks have been added to the queue, which doesn't mean they - // have all been properly imported. - if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) { - // Importing is done, we can log the result and return. - info!( - "🎉 Imported {} blocks. Best: #{}", - read_block_count, client.chain_info().best_number - ); - return Poll::Ready(Ok(())) - } else { - // Importing is not done, we still have to wait for the queue to finish. - // Wait for the delay, because we know the queue is lagging behind. - match Pin::new(&mut delay).poll(cx) { - Poll::Pending => { - state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay}); - return Poll::Pending - }, - Poll::Ready(_) => { - delay.reset(Duration::from_millis(DELAY_TIME)); - }, - } - - state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay}); - } - } - } - - queue.poll_actions(cx, &mut link); - - let best_number = client.chain_info().best_number; - speedometer.notify_user(best_number); - - if link.has_error { - return Poll::Ready(Err( - Error::Other( - format!("Stopping after #{} blocks because of an error", link.imported_blocks) - ) - )) - } - - cx.waker().wake_by_ref(); - Poll::Pending - }); - Box::pin(import) - } - - fn export_blocks( - self, - mut output: impl Write + 'static, - from: NumberFor, - to: Option>, - binary: bool - ) -> Pin>>> { - let mut block = from; - - let last = match to { - Some(v) if v.is_zero() => One::one(), - Some(v) => v, - None => self.client.chain_info().best_number, - }; - - let mut wrote_header = false; - - // Exporting blocks is implemented as a future, because we want the operation to be - // interruptible. - // - // Every time we write a block to the output, the `Future` re-schedules itself and returns - // `Poll::Pending`. - // This makes it possible either to interleave other operations in-between the block exports, - // or to stop the operation completely. 
- let export = future::poll_fn(move |cx| { - let client = &self.client; - - if last < block { - return Poll::Ready(Err("Invalid block range specified".into())); - } - - if !wrote_header { - info!("Exporting blocks from #{} to #{}", block, last); - if binary { - let last_: u64 = last.saturated_into::(); - let block_: u64 = block.saturated_into::(); - let len: u64 = last_ - block_ + 1; - output.write_all(&len.encode())?; - } - wrote_header = true; - } - - match client.block(&BlockId::number(block))? { - Some(block) => { - if binary { - output.write_all(&block.encode())?; - } else { - serde_json::to_writer(&mut output, &block) - .map_err(|e| format!("Error writing JSON: {}", e))?; - } - }, - // Reached end of the chain. - None => return Poll::Ready(Ok(())), - } - if (block % 10000.into()).is_zero() { - info!("#{}", block); - } - if block == last { - return Poll::Ready(Ok(())); - } - block += One::one(); - - // Re-schedule the task in order to continue the operation. - cx.waker().wake_by_ref(); - Poll::Pending - }); - - Box::pin(export) - } - - fn revert_chain( - &self, - blocks: NumberFor - ) -> Result<(), Error> { - let reverted = self.client.revert(blocks)?; - let info = self.client.chain_info(); - - if reverted.is_zero() { - info!("There aren't any non-finalized blocks to revert."); - } else { - info!("Reverted {} blocks. Best: #{} ({})", reverted, info.best_number, info.best_hash); - } - Ok(()) - } - - fn check_block( - self, - block_id: BlockId - ) -> Pin> + Send>> { - match self.client.block(&block_id) { - Ok(Some(block)) => { - let mut buf = Vec::new(); - 1u64.encode_to(&mut buf); - block.encode_to(&mut buf); - let reader = std::io::Cursor::new(buf); - self.import_blocks(reader, true, true) - } - Ok(None) => Box::pin(future::err("Unknown block".into())), - Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())), - } - } - - fn export_raw_state( - &self, - block: Option>, - ) -> Result { - let block = block.unwrap_or_else( - || BlockId::Hash(self.client.usage_info().chain.best_hash) - ); - - let empty_key = StorageKey(Vec::new()); - let mut top_storage = self.client.storage_pairs(&block, &empty_key)?; - let mut children_default = HashMap::new(); - - // Remove all default child storage roots from the top storage and collect the child storage - // pairs. - while let Some(pos) = top_storage - .iter() - .position(|(k, _)| k.0.starts_with(well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX)) { - let (key, _) = top_storage.swap_remove(pos); - - let key = StorageKey( - key.0[well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.len()..].to_vec(), - ); - let child_info = ChildInfo::new_default(&key.0); - - let keys = self.client.child_storage_keys(&block, &child_info, &empty_key)?; - let mut pairs = StorageMap::new(); - keys.into_iter().try_for_each(|k| { - if let Some(value) = self.client.child_storage(&block, &child_info, &k)? { - pairs.insert(k.0, value.0); - } - - Ok::<_, Error>(()) - })?; - - children_default.insert(key.0, StorageChild { child_info, data: pairs }); - } - - let top = top_storage.into_iter().map(|(k, v)| (k.0, v.0)).collect(); - Ok(Storage { top, children_default }) - } -} diff --git a/client/service/src/chain_ops/check_block.rs b/client/service/src/chain_ops/check_block.rs new file mode 100644 index 0000000000000..648204dd7f839 --- /dev/null +++ b/client/service/src/chain_ops/check_block.rs @@ -0,0 +1,54 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. 
+ +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use crate::error::Error; +use futures::{future, prelude::*}; +use sp_runtime::traits::Block as BlockT; +use sp_runtime::generic::BlockId; +use codec::Encode; +use crate::client::Client; +use sp_consensus::import_queue::ImportQueue; +use sc_client_api::client::BlockBackend; + +use std::pin::Pin; +use std::sync::Arc; +use crate::chain_ops::import_blocks; + +/// Re-validate known block. +pub fn check_block( + client: Arc>, + import_queue: IQ, + block_id: BlockId +) -> Pin> + Send>> +where + B: BlockT + for<'de> serde::Deserialize<'de>, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: ImportQueue + 'static, + RA: Send + Sync + 'static, +{ + match client.block(&block_id) { + Ok(Some(block)) => { + let mut buf = Vec::new(); + 1u64.encode_to(&mut buf); + block.encode_to(&mut buf); + let reader = std::io::Cursor::new(buf); + import_blocks(client, import_queue, reader, true, true) + } + Ok(None) => Box::pin(future::err("Unknown block".into())), + Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())), + } +} diff --git a/client/service/src/chain_ops/export_blocks.rs b/client/service/src/chain_ops/export_blocks.rs new file mode 100644 index 0000000000000..1b51ab9d11136 --- /dev/null +++ b/client/service/src/chain_ops/export_blocks.rs @@ -0,0 +1,109 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use crate::error::Error; +use log::info; +use futures::{future, prelude::*}; +use sp_runtime::traits::{ + Block as BlockT, NumberFor, One, Zero, SaturatedConversion +}; +use sp_runtime::generic::BlockId; +use codec::Encode; +use crate::client::Client; + +use std::{io::Write, pin::Pin}; +use sc_client_api::BlockBackend; +use std::sync::Arc; +use std::task::Poll; + +/// Performs the blocks export. 
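+///
+/// Illustrative sketch only — the `client` handle and the output path are
+/// assumptions for the example, not part of this function's contract:
+///
+/// ```ignore
+/// let output = std::fs::File::create("blocks.bin")?;
+/// // Export blocks #1..=#1000 in the binary format.
+/// let export = export_blocks(client.clone(), output, 1u32.into(), Some(1000u32.into()), true);
+/// futures::executor::block_on(export)?;
+/// ```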
+pub fn export_blocks( + client: Arc>, + mut output: impl Write + 'static, + from: NumberFor, + to: Option>, + binary: bool +) -> Pin>>> +where + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, +{ + let mut block = from; + + let last = match to { + Some(v) if v.is_zero() => One::one(), + Some(v) => v, + None => client.chain_info().best_number, + }; + + let mut wrote_header = false; + + // Exporting blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we write a block to the output, the `Future` re-schedules itself and returns + // `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block exports, + // or to stop the operation completely. + let export = future::poll_fn(move |cx| { + let client = &client; + + if last < block { + return Poll::Ready(Err("Invalid block range specified".into())); + } + + if !wrote_header { + info!("Exporting blocks from #{} to #{}", block, last); + if binary { + let last_: u64 = last.saturated_into::(); + let block_: u64 = block.saturated_into::(); + let len: u64 = last_ - block_ + 1; + output.write_all(&len.encode())?; + } + wrote_header = true; + } + + match client.block(&BlockId::number(block))? { + Some(block) => { + if binary { + output.write_all(&block.encode())?; + } else { + serde_json::to_writer(&mut output, &block) + .map_err(|e| format!("Error writing JSON: {}", e))?; + } + }, + // Reached end of the chain. + None => return Poll::Ready(Ok(())), + } + if (block % 10000.into()).is_zero() { + info!("#{}", block); + } + if block == last { + return Poll::Ready(Ok(())); + } + block += One::one(); + + // Re-schedule the task in order to continue the operation. + cx.waker().wake_by_ref(); + Poll::Pending + }); + + Box::pin(export) +} + + diff --git a/client/service/src/chain_ops/export_raw_state.rs b/client/service/src/chain_ops/export_raw_state.rs new file mode 100644 index 0000000000000..96ad94718b069 --- /dev/null +++ b/client/service/src/chain_ops/export_raw_state.rs @@ -0,0 +1,73 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use crate::error::Error; +use sp_runtime::traits::Block as BlockT; +use sp_runtime::generic::BlockId; +use crate::client::Client; +use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageChild, StorageMap}; +use sc_client_api::{StorageProvider, UsageProvider}; + +use std::{collections::HashMap, sync::Arc}; + +/// Export the raw state at the given `block`. If `block` is `None`, the +/// best block will be used. 
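+///
+/// Illustrative sketch (the `client` handle and the use made of the returned
+/// `Storage` are assumptions for the example):
+///
+/// ```ignore
+/// // Dump the state of the current best block.
+/// let raw_state = export_raw_state(client.clone(), None)?;
+/// println!("{} top-level storage keys", raw_state.top.len());
+/// ```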
+pub fn export_raw_state<B, BA, CE, RA>(
+	client: Arc<Client<BA, CE, B, RA>>,
+	block: Option<BlockId<B>>,
+) -> Result<Storage, Error>
+where
+	B: BlockT,
+	BA: sc_client_api::backend::Backend<B> + 'static,
+	CE: sc_client_api::call_executor::CallExecutor<B> + Send + Sync + 'static,
+	RA: Send + Sync + 'static,
+{
+	let block = block.unwrap_or_else(
+		|| BlockId::Hash(client.usage_info().chain.best_hash)
+	);
+
+	let empty_key = StorageKey(Vec::new());
+	let mut top_storage = client.storage_pairs(&block, &empty_key)?;
+	let mut children_default = HashMap::new();
+
+	// Remove all default child storage roots from the top storage and collect the child storage
+	// pairs.
+	while let Some(pos) = top_storage
+		.iter()
+		.position(|(k, _)| k.0.starts_with(well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX)) {
+		let (key, _) = top_storage.swap_remove(pos);
+
+		let key = StorageKey(
+			key.0[well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.len()..].to_vec(),
+		);
+		let child_info = ChildInfo::new_default(&key.0);
+
+		let keys = client.child_storage_keys(&block, &child_info, &empty_key)?;
+		let mut pairs = StorageMap::new();
+		keys.into_iter().try_for_each(|k| {
+			if let Some(value) = client.child_storage(&block, &child_info, &k)? {
+				pairs.insert(k.0, value.0);
+			}
+
+			Ok::<_, Error>(())
+		})?;
+
+		children_default.insert(key.0, StorageChild { child_info, data: pairs });
+	}
+
+	let top = top_storage.into_iter().map(|(k, v)| (k.0, v.0)).collect();
+	Ok(Storage { top, children_default })
+}
diff --git a/client/service/src/chain_ops/import_blocks.rs b/client/service/src/chain_ops/import_blocks.rs
new file mode 100644
index 0000000000000..7e3f57665fb47
--- /dev/null
+++ b/client/service/src/chain_ops/import_blocks.rs
@@ -0,0 +1,474 @@
+// This file is part of Substrate.

+// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.

+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.

+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.

+use crate::error;
+use crate::error::Error;
+use sc_chain_spec::ChainSpec;
+use log::{warn, info};
+use futures::{future, prelude::*};
+use sp_runtime::traits::{
+	Block as BlockT, NumberFor, Zero, Header, MaybeSerializeDeserialize,
+};
+use sp_runtime::generic::SignedBlock;
+use codec::{Decode, IoReader as CodecIoReader};
+use crate::client::Client;
+use sp_consensus::{
+	BlockOrigin,
+	import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue},
+};
+
+use std::{io::{Read, Seek}, pin::Pin};
+use std::time::{Duration, Instant};
+use futures_timer::Delay;
+use std::task::Poll;
+use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
+use std::convert::{TryFrom, TryInto};
+use sp_runtime::traits::{CheckedDiv, Saturating};
+
+/// Number of blocks we will add to the queue before waiting for the queue to catch up.
+const MAX_PENDING_BLOCKS: u64 = 1_024;
+
+/// Number of milliseconds to wait until next poll.
+const DELAY_TIME: u64 = 2_000;
+
+/// Number of milliseconds that must have passed between two updates.
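+/// Used by the `Speedometer` below to throttle how often the import speed is logged.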
+const TIME_BETWEEN_UPDATES: u64 = 3_000; + +use std::sync::Arc; + +/// Build a chain spec json +pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result { + spec.as_json(raw).map_err(Into::into) +} + + +/// Helper enum that wraps either a binary decoder (from parity-scale-codec), or a JSON decoder +/// (from serde_json). Implements the Iterator Trait, calling `next()` will decode the next +/// SignedBlock and return it. +enum BlockIter where + R: std::io::Read + std::io::Seek, +{ + Binary { + // Total number of blocks we are expecting to decode. + num_expected_blocks: u64, + // Number of blocks we have decoded thus far. + read_block_count: u64, + // Reader to the data, used for decoding new blocks. + reader: CodecIoReader, + }, + Json { + // Nubmer of blocks we have decoded thus far. + read_block_count: u64, + // Stream to the data, used for decoding new blocks. + reader: StreamDeserializer<'static, JsonIoRead, SignedBlock>, + }, +} + +impl BlockIter where + R: Read + Seek + 'static, + B: BlockT + MaybeSerializeDeserialize, +{ + fn new(input: R, binary: bool) -> Result { + if binary { + let mut reader = CodecIoReader(input); + // If the file is encoded in binary format, it is expected to first specify the number + // of blocks that are going to be decoded. We read it and add it to our enum struct. + let num_expected_blocks: u64 = Decode::decode(&mut reader) + .map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?; + Ok(BlockIter::Binary { + num_expected_blocks, + read_block_count: 0, + reader, + }) + } else { + let stream_deser = Deserializer::from_reader(input) + .into_iter::>(); + Ok(BlockIter::Json { + reader: stream_deser, + read_block_count: 0, + }) + } + } + + /// Returns the number of blocks read thus far. + fn read_block_count(&self) -> u64 { + match self { + BlockIter::Binary { read_block_count, .. } + | BlockIter::Json { read_block_count, .. } + => *read_block_count, + } + } + + /// Returns the total number of blocks to be imported, if possible. + fn num_expected_blocks(&self) -> Option { + match self { + BlockIter::Binary { num_expected_blocks, ..} => Some(*num_expected_blocks), + BlockIter::Json {..} => None + } + } +} + +impl Iterator for BlockIter where + R: Read + Seek + 'static, + B: BlockT + MaybeSerializeDeserialize, +{ + type Item = Result, String>; + + fn next(&mut self) -> Option { + match self { + BlockIter::Binary { num_expected_blocks, read_block_count, reader } => { + if read_block_count < num_expected_blocks { + let block_result: Result, _> = SignedBlock::::decode(reader) + .map_err(|e| e.to_string()); + *read_block_count += 1; + Some(block_result) + } else { + // `read_block_count` == `num_expected_blocks` so we've read enough blocks. + None + } + } + BlockIter::Json { reader, read_block_count } => { + let res = Some(reader.next()?.map_err(|e| e.to_string())); + *read_block_count += 1; + res + } + } + } +} + +/// Imports the SignedBlock to the queue. +fn import_block_to_queue( + signed_block: SignedBlock, + queue: &mut TImpQu, + force: bool +) where + TBl: BlockT + MaybeSerializeDeserialize, + TImpQu: 'static + ImportQueue, +{ + let (header, extrinsics) = signed_block.block.deconstruct(); + let hash = header.hash(); + // import queue handles verification and importing it into the client. 
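+	// `import_existing` is set to `force`, so already-known blocks are imported again when the
+	// caller explicitly asks for it (for example when re-validating a single block).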
+ queue.import_blocks(BlockOrigin::File, vec![ + IncomingBlock:: { + hash, + header: Some(header), + body: Some(extrinsics), + justification: signed_block.justification, + origin: None, + allow_missing_state: false, + import_existing: force, + } + ]); +} + +/// Returns true if we have imported every block we were supposed to import, else returns false. +fn importing_is_done( + num_expected_blocks: Option, + read_block_count: u64, + imported_blocks: u64 +) -> bool { + if let Some(num_expected_blocks) = num_expected_blocks { + imported_blocks >= num_expected_blocks + } else { + imported_blocks >= read_block_count + } +} + +/// Structure used to log the block importing speed. +struct Speedometer { + best_number: NumberFor, + last_number: Option>, + last_update: Instant, +} + +impl Speedometer { + /// Creates a fresh Speedometer. + fn new() -> Self { + Self { + best_number: NumberFor::::from(0), + last_number: None, + last_update: Instant::now(), + } + } + + /// Calculates `(best_number - last_number) / (now - last_update)` and + /// logs the speed of import. + fn display_speed(&self) { + // Number of milliseconds elapsed since last time. + let elapsed_ms = { + let elapsed = self.last_update.elapsed(); + let since_last_millis = elapsed.as_secs() * 1000; + let since_last_subsec_millis = elapsed.subsec_millis() as u64; + since_last_millis + since_last_subsec_millis + }; + + // Number of blocks that have been imported since last time. + let diff = match self.last_number { + None => return, + Some(n) => self.best_number.saturating_sub(n) + }; + + if let Ok(diff) = TryInto::::try_into(diff) { + // If the number of blocks can be converted to a regular integer, then it's easy: just + // do the math and turn it into a `f64`. + let speed = diff.saturating_mul(10_000).checked_div(u128::from(elapsed_ms)) + .map_or(0.0, |s| s as f64) / 10.0; + info!("📦 Current best block: {} ({:4.1} bps)", self.best_number, speed); + } else { + // If the number of blocks can't be converted to a regular integer, then we need a more + // algebraic approach and we stay within the realm of integers. + let one_thousand = NumberFor::::from(1_000); + let elapsed = NumberFor::::from( + >::try_from(elapsed_ms).unwrap_or(u32::max_value()) + ); + + let speed = diff.saturating_mul(one_thousand).checked_div(&elapsed) + .unwrap_or_else(Zero::zero); + info!("📦 Current best block: {} ({} bps)", self.best_number, speed) + } + } + + /// Updates the Speedometer. + fn update(&mut self, best_number: NumberFor) { + self.last_number = Some(self.best_number); + self.best_number = best_number; + self.last_update = Instant::now(); + } + + // If more than TIME_BETWEEN_UPDATES has elapsed since last update, + // then print and update the speedometer. + fn notify_user(&mut self, best_number: NumberFor) { + let delta = Duration::from_millis(TIME_BETWEEN_UPDATES); + if Instant::now().duration_since(self.last_update) >= delta { + self.display_speed(); + self.update(best_number); + } + } +} + +/// Different State that the `import_blocks` future could be in. +enum ImportState where + R: Read + Seek + 'static, + B: BlockT + MaybeSerializeDeserialize, +{ + /// We are reading from the BlockIter structure, adding those blocks to the queue if possible. + Reading{block_iter: BlockIter}, + /// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to + /// catch up. 
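+	/// The decoded block that did not fit in the queue is kept here so it can be pushed as soon
+	/// as there is room again.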
+ WaitingForImportQueueToCatchUp{ + block_iter: BlockIter, + delay: Delay, + block: SignedBlock + }, + // We have added all the blocks to the queue but they are still being processed. + WaitingForImportQueueToFinish{ + num_expected_blocks: Option, + read_block_count: u64, + delay: Delay, + }, +} + +/// Starts the process of importing blocks. +pub fn import_blocks( + client: Arc>, + mut import_queue: IQ, + input: impl Read + Seek + Send + 'static, + force: bool, + binary: bool, +) -> Pin> + Send>> +where + B: BlockT + for<'de> serde::Deserialize<'de>, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: ImportQueue + 'static, + RA: Send + Sync + 'static, +{ + struct WaitLink { + imported_blocks: u64, + has_error: bool, + } + + impl WaitLink { + fn new() -> WaitLink { + WaitLink { + imported_blocks: 0, + has_error: false, + } + } + } + + impl Link for WaitLink { + fn blocks_processed( + &mut self, + imported: usize, + _num_expected_blocks: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)> + ) { + self.imported_blocks += imported as u64; + + for result in results { + if let (Err(err), hash) = result { + warn!("There was an error importing block with hash {:?}: {:?}", hash, err); + self.has_error = true; + break; + } + } + } + } + + let mut link = WaitLink::new(); + let block_iter_res: Result, String> = BlockIter::new(input, binary); + + let block_iter = match block_iter_res { + Ok(block_iter) => block_iter, + Err(e) => { + // We've encountered an error while creating the block iterator + // so we can just return a future that returns an error. + return future::ready(Err(Error::Other(e))).boxed() + } + }; + + let mut state = Some(ImportState::Reading{block_iter}); + let mut speedometer = Speedometer::::new(); + + // Importing blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we read a block from the input or import a bunch of blocks from the import + // queue, the `Future` re-schedules itself and returns `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block imports, + // or to stop the operation completely. + let import = future::poll_fn(move |cx| { + let client = &client; + let queue = &mut import_queue; + match state.take().expect("state should never be None; qed") { + ImportState::Reading{mut block_iter} => { + match block_iter.next() { + None => { + // The iterator is over: we now need to wait for the import queue to finish. + let num_expected_blocks = block_iter.num_expected_blocks(); + let read_block_count = block_iter.read_block_count(); + let delay = Delay::new(Duration::from_millis(DELAY_TIME)); + state = Some(ImportState::WaitingForImportQueueToFinish { + num_expected_blocks, read_block_count, delay + }); + }, + Some(block_result) => { + let read_block_count = block_iter.read_block_count(); + match block_result { + Ok(block) => { + if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS { + // The queue is full, so do not add this block and simply wait + // until the queue has made some progress. + let delay = Delay::new(Duration::from_millis(DELAY_TIME)); + state = Some(ImportState::WaitingForImportQueueToCatchUp { + block_iter, delay, block + }); + } else { + // Queue is not full, we can keep on adding blocks to the queue. 
+ import_block_to_queue(block, queue, force); + state = Some(ImportState::Reading{block_iter}); + } + } + Err(e) => { + return Poll::Ready( + Err(Error::Other( + format!("Error reading block #{}: {}", read_block_count, e) + ))) + } + } + } + } + }, + ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => { + let read_block_count = block_iter.read_block_count(); + if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS { + // Queue is still full, so wait until there is room to insert our block. + match Pin::new(&mut delay).poll(cx) { + Poll::Pending => { + state = Some(ImportState::WaitingForImportQueueToCatchUp { + block_iter, delay, block + }); + return Poll::Pending + }, + Poll::Ready(_) => { + delay.reset(Duration::from_millis(DELAY_TIME)); + }, + } + state = Some(ImportState::WaitingForImportQueueToCatchUp { + block_iter, delay, block + }); + } else { + // Queue is no longer full, so we can add our block to the queue. + import_block_to_queue(block, queue, force); + // Switch back to Reading state. + state = Some(ImportState::Reading{block_iter}); + } + }, + ImportState::WaitingForImportQueueToFinish { + num_expected_blocks, read_block_count, mut delay + } => { + // All the blocks have been added to the queue, which doesn't mean they + // have all been properly imported. + if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) { + // Importing is done, we can log the result and return. + info!( + "🎉 Imported {} blocks. Best: #{}", + read_block_count, client.chain_info().best_number + ); + return Poll::Ready(Ok(())) + } else { + // Importing is not done, we still have to wait for the queue to finish. + // Wait for the delay, because we know the queue is lagging behind. + match Pin::new(&mut delay).poll(cx) { + Poll::Pending => { + state = Some(ImportState::WaitingForImportQueueToFinish { + num_expected_blocks, read_block_count, delay + }); + return Poll::Pending + }, + Poll::Ready(_) => { + delay.reset(Duration::from_millis(DELAY_TIME)); + }, + } + + state = Some(ImportState::WaitingForImportQueueToFinish { + num_expected_blocks, read_block_count, delay + }); + } + } + } + + queue.poll_actions(cx, &mut link); + + let best_number = client.chain_info().best_number; + speedometer.notify_user(best_number); + + if link.has_error { + return Poll::Ready(Err( + Error::Other( + format!("Stopping after #{} blocks because of an error", link.imported_blocks) + ) + )) + } + + cx.waker().wake_by_ref(); + Poll::Pending + }); + Box::pin(import) +} diff --git a/client/service/src/chain_ops/mod.rs b/client/service/src/chain_ops/mod.rs new file mode 100644 index 0000000000000..af6e6f632fc06 --- /dev/null +++ b/client/service/src/chain_ops/mod.rs @@ -0,0 +1,29 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Chain utilities. 
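+//!
+//! One-shot operations on the chain: re-validating a known block, exporting and importing
+//! blocks, exporting the raw state, and reverting the chain.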
+
+mod check_block;
+mod export_blocks;
+mod export_raw_state;
+mod import_blocks;
+mod revert_chain;
+
+pub use check_block::*;
+pub use export_blocks::*;
+pub use export_raw_state::*;
+pub use import_blocks::*;
+pub use revert_chain::*;
diff --git a/client/service/src/chain_ops/revert_chain.rs b/client/service/src/chain_ops/revert_chain.rs
new file mode 100644
index 0000000000000..3eff1425bfa50
--- /dev/null
+++ b/client/service/src/chain_ops/revert_chain.rs
@@ -0,0 +1,44 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+use crate::error::Error;
+use log::info;
+use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
+use crate::client::Client;
+
+use std::sync::Arc;
+
+/// Performs a revert of `blocks` blocks.
+pub fn revert_chain<B, BA, CE, RA>(
+	client: Arc<Client<BA, CE, B, RA>>,
+	blocks: NumberFor<B>
+) -> Result<(), Error>
+where
+	B: BlockT,
+	BA: sc_client_api::backend::Backend<B> + 'static,
+	CE: sc_client_api::call_executor::CallExecutor<B> + Send + Sync + 'static,
+	RA: Send + Sync + 'static,
+{
+	let reverted = client.revert(blocks)?;
+	let info = client.chain_info();
+
+	if reverted.is_zero() {
+		info!("There aren't any non-finalized blocks to revert.");
+	} else {
+		info!("Reverted {} blocks.
Best: #{} ({})", reverted, info.best_number, info.best_hash); + } + Ok(()) +} diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 67ac7bdb4fbd7..41a6a375a6687 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -23,7 +23,6 @@ #![recursion_limit="128"] pub mod config; -#[macro_use] pub mod chain_ops; pub mod error; @@ -36,6 +35,9 @@ mod client; mod status_sinks; mod task_manager; +use crate::config::RpcMethods; +pub use sp_consensus::import_queue::ImportQueue; +pub use builder::*; use std::{io, pin::Pin}; use std::marker::PhantomData; use std::net::SocketAddr; @@ -45,7 +47,7 @@ use wasm_timer::Instant; use std::task::{Poll, Context}; use parking_lot::Mutex; -use client::Client; +pub use client::Client; use futures::{ Future, FutureExt, Stream, StreamExt, compat::*, @@ -61,12 +63,7 @@ use parity_util_mem::MallocSizeOf; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; pub use self::error::Error; -pub use self::builder::{ - new_full_client, new_client, - ServiceBuilder, ServiceBuilderCommand, TFullClient, TLightClient, TFullBackend, TLightBackend, - TFullCallExecutor, TLightCallExecutor, RpcExtensionBuilder, -}; -pub use config::{Configuration, DatabaseConfig, PruningMode, Role, RpcMethods, TaskType}; +pub use config::{Configuration, Role, PruningMode, DatabaseConfig}; pub use sc_chain_spec::{ ChainSpec, GenericChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension, NoExtension, ChainType, diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index 553ca9c326d8b..072c41b76b764 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -184,7 +184,7 @@ impl TaskManager { self.spawn_handle().spawn(name, task) } - pub(super) fn spawn_handle(&self) -> SpawnTaskHandle { + pub fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), executor: self.executor.clone(), diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 206153082505c..7d619aff966ef 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -34,11 +34,10 @@ use sc_service::{ GenericChainSpec, ChainSpecExtension, Configuration, - config::{DatabaseConfig, KeystoreConfig}, + config::{DatabaseConfig, KeystoreConfig, TaskType}, RuntimeGenesis, Role, Error, - TaskType, }; use sp_blockchain::HeaderBackend; use sc_network::{multiaddr, Multiaddr}; @@ -258,7 +257,9 @@ impl TestNet where for (key, authority) in authorities { let task_executor = { let executor = executor.clone(); - Arc::new(move |fut: Pin + Send>>, _| executor.spawn(fut.unit_error().compat())) + Arc::new(move |fut: Pin + Send>>, _| { + executor.spawn(fut.unit_error().compat()) + }) }; let node_config = node_config( self.nodes, diff --git a/primitives/consensus/common/src/import_queue/buffered_link.rs b/primitives/consensus/common/src/import_queue/buffered_link.rs index d85121a710ecb..371542573a549 100644 --- a/primitives/consensus/common/src/import_queue/buffered_link.rs +++ b/primitives/consensus/common/src/import_queue/buffered_link.rs @@ -39,6 +39,7 @@ //! use futures::prelude::*; +use futures::stream::FusedStream; use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded}; use std::{pin::Pin, task::Context, task::Poll}; @@ -139,6 +140,10 @@ impl BufferedLinkReceiver { /// it is as if this method always returned `Poll::Pending`. 
 	pub fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
 		loop {
+			if self.rx.is_terminated() {
+				break;
+			}
+
 			let msg = if let Poll::Ready(Some(msg)) = Stream::poll_next(Pin::new(&mut self.rx), cx) {
 				msg
 			} else {