diff --git a/Cargo.lock b/Cargo.lock index 00b8e0437af..e9619c4da96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1025,7 +1025,6 @@ dependencies = [ name = "cumulus-collator" version = "0.1.0" dependencies = [ - "cumulus-consensus", "cumulus-network", "cumulus-primitives", "cumulus-runtime", @@ -1045,7 +1044,6 @@ dependencies = [ "polkadot-service", "polkadot-test-client", "polkadot-validation", - "sc-block-builder", "sc-cli", "sc-client-api", "sp-api", @@ -1064,8 +1062,10 @@ dependencies = [ name = "cumulus-consensus" version = "0.1.0" dependencies = [ + "cumulus-test-client", + "cumulus-test-runtime", "futures 0.3.8", - "log", + "futures-timer 3.0.2", "parity-scale-codec", "polkadot-primitives", "polkadot-runtime", @@ -1077,8 +1077,10 @@ dependencies = [ "sp-core", "sp-inherents", "sp-runtime", + "sp-tracing", "substrate-prometheus-endpoint", "tokio 0.1.22", + "tracing", ] [[package]] @@ -1175,7 +1177,6 @@ dependencies = [ "memory-db", "parity-scale-codec", "polkadot-parachain", - "sc-block-builder", "sc-client-api", "sc-executor", "sp-blockchain", @@ -1211,6 +1212,7 @@ dependencies = [ "sp-core", "sp-inherents", "sp-runtime", + "tracing", ] [[package]] diff --git a/collator/Cargo.toml b/collator/Cargo.toml index 90d74d0ae9d..35b11729191 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dependencies] # Substrate dependencies -sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } @@ -16,7 +15,6 @@ sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = " sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-block-builder = { git = "https://github.com/paritytech/substrate", branch = "master" } # Polkadot dependencies polkadot-service = { git = "https://github.com/paritytech/polkadot", features = [ "real-overseer" ] , branch = "master" } @@ -28,7 +26,6 @@ polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = " polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "master" } # Cumulus dependencies -cumulus-consensus = { path = "../consensus" } cumulus-network = { path = "../network" } cumulus-primitives = { path = "../primitives" } cumulus-runtime = { path = "../runtime" } diff --git a/collator/src/lib.rs b/collator/src/lib.rs index 9130c4f7300..33abd35618f 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -24,8 +24,7 @@ use cumulus_primitives::{ }; use cumulus_runtime::ParachainBlockData; -use sc_client_api::{BlockBackend, Finalizer, StateBackend, UsageProvider}; -use sp_blockchain::HeaderBackend; +use sc_client_api::{BlockBackend, StateBackend}; use sp_consensus::{ BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Environment, Error as ConsensusError, ForkChoiceStrategy, Proposal, Proposer, RecordProof, @@ -522,13 +521,12 @@ where } /// Parameters for [`start_collator`]. -pub struct StartCollatorParams { +pub struct StartCollatorParams { pub proposer_factory: PF, pub inherent_data_providers: InherentDataProviders, pub backend: Arc, pub block_import: BI, pub block_status: Arc, - pub client: Arc, pub announce_block: Arc) + Send + Sync>, pub overseer_handler: OverseerHandler, pub spawner: Spawner, @@ -543,7 +541,6 @@ pub async fn start_collator< PF, BI, Backend, - Client, BS, Spawner, PClient, @@ -557,7 +554,6 @@ pub async fn start_collator< backend, block_import, block_status, - client, announce_block, mut overseer_handler, spawner, @@ -565,7 +561,7 @@ pub async fn start_collator< key, polkadot_client, polkadot_backend, - }: StartCollatorParams, + }: StartCollatorParams, ) -> Result<(), String> where PF: Environment + Send + 'static, @@ -574,14 +570,6 @@ where + Sync + 'static, Backend: sc_client_api::Backend + 'static, - Client: Finalizer - + UsageProvider - + HeaderBackend - + Send - + Sync - + BlockBackend - + 'static, - for<'a> &'a Client: BlockImport, BS: BlockBackend + Send + Sync + 'static, Spawner: SpawnNamed + Clone + Send + Sync + 'static, PBackend: sc_client_api::Backend + 'static, @@ -591,18 +579,6 @@ where PBackend2: sc_client_api::Backend + 'static, PBackend2::State: StateBackend, { - let follow = match cumulus_consensus::follow_polkadot( - para_id, - client, - polkadot_client.clone(), - announce_block.clone(), - ) { - Ok(follow) => follow, - Err(e) => return Err(format!("Could not start following polkadot: {:?}", e)), - }; - - spawner.spawn("cumulus-follow-polkadot", follow.map(|_| ()).boxed()); - let collator = Collator::new( para_id, proposer_factory, @@ -644,13 +620,12 @@ mod tests { use super::*; use std::{pin::Pin, time::Duration}; - use sc_block_builder::BlockBuilderProvider; use sp_core::{testing::TaskExecutor, Pair}; use sp_inherents::InherentData; use sp_runtime::traits::DigestFor; use cumulus_test_client::{ - generate_block_inherents, Client, DefaultTestClientBuilderExt, TestClientBuilder, + Client, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, }; use cumulus_test_runtime::{Block, Header}; @@ -700,19 +675,12 @@ mod tests { fn propose( self, _: InherentData, - digest: DigestFor, + _: DigestFor, _: Duration, - record_proof: RecordProof, + _: RecordProof, ) -> Self::Proposal { let block_id = BlockId::Hash(self.header.hash()); - let mut builder = self - .client - .new_block_at(&block_id, digest, record_proof.yes()) - .expect("Initializes new block"); - - generate_block_inherents(&*self.client, None) - .into_iter() - .for_each(|e| builder.push(e).expect("Pushes an inherent")); + let builder = self.client.init_block_builder_at(&block_id, None); let (block, storage_changes, proof) = builder.build().expect("Creates block").into_inner(); @@ -766,14 +734,13 @@ mod tests { }; let collator_start = - start_collator::<_, _, _, _, _, _, _, _, polkadot_service::FullBackend, _, _>( + start_collator::<_, _, _, _, _, _, _, polkadot_service::FullBackend, _, _>( StartCollatorParams { proposer_factory: DummyFactory(client.clone()), inherent_data_providers: Default::default(), backend, block_import: client.clone(), block_status: client.clone(), - client: client.clone(), announce_block: Arc::new(announce_block), overseer_handler: handler, spawner, diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 1a45ac4c346..53b67ba09df 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -6,7 +6,7 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -# substrate deps +# Substrate deps sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } @@ -17,12 +17,23 @@ sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "mas sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } -# polkadot deps +# Polkadot deps polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-runtime = { git = "https://github.com/paritytech/polkadot", branch = "master" } -# other deps -futures = { version = "0.3.1", features = ["compat"] } +# Other deps +futures = { version = "0.3.8", features = ["compat"] } tokio = "0.1.22" codec = { package = "parity-scale-codec", version = "1.3.0", features = [ "derive" ] } -log = "0.4" +tracing = "0.1.22" + +[dev-dependencies] +# Substrate deps +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } + +# Cumulus dependencies +cumulus-test-runtime = { path = "../test/runtime" } +cumulus-test-client = { path = "../test/client" } + +# Other deps +futures-timer = "3.0.2" diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 8f06140c440..4eb7f660aaa 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -14,7 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use sc_client_api::{Backend, BlockBackend, Finalizer, UsageProvider}; +use sc_client_api::{ + Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, +}; use sp_api::ProvideRuntimeApi; use sp_blockchain::{Error as ClientError, Result as ClientResult}; use sp_consensus::{ @@ -27,12 +29,11 @@ use sp_runtime::{ }; use polkadot_primitives::v1::{ - Block as PBlock, Hash as PHash, Id as ParaId, OccupiedCoreAssumption, ParachainHost, + Block as PBlock, Id as ParaId, OccupiedCoreAssumption, ParachainHost, }; use codec::Decode; -use futures::{future, Future, FutureExt, Stream, StreamExt}; -use log::{error, trace, warn}; +use futures::{future, select, FutureExt, Stream, StreamExt}; use std::{marker::PhantomData, sync::Arc}; @@ -47,17 +48,8 @@ pub enum Error { InvalidHeadData, } -/// A parachain head update. -pub struct HeadUpdate { - /// The relay-chain's block hash where the parachain head updated. - pub relay_hash: PHash, - /// The parachain head-data. - pub head_data: Vec, -} - -/// Helper for the Polkadot client. This is expected to be a lightweight handle -/// like an `Arc`. -pub trait PolkadotClient: Clone + 'static { +/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`. +pub trait RelaychainClient: Clone + 'static { /// The error type for interacting with the Polkadot client. type Error: std::fmt::Debug + Send; @@ -78,165 +70,319 @@ pub trait PolkadotClient: Clone + 'static { ) -> ClientResult>>; } -/// Finalize the given block in the Parachain. -fn finalize_block(client: &T, hash: Block::Hash) -> ClientResult +/// Follow the finalized head of the given parachain. +/// +/// For every finalized block of the relay chain, it will get the included parachain header +/// corresponding to `para_id` and will finalize it in the parachain. +async fn follow_finalized_head( + para_id: ParaId, + parachain: Arc

, + relay_chain: R, +) -> ClientResult<()> where Block: BlockT, - T: Finalizer + UsageProvider, + P: Finalizer + UsageProvider, + R: RelaychainClient, B: Backend, { - // don't finalize the same block multiple times. - if client.usage_info().chain.finalized_hash != hash { - match client.finalize_block(BlockId::hash(hash), None, true) { - Ok(()) => Ok(true), - Err(e) => match e { - ClientError::UnknownBlock(_) => Ok(false), - _ => Err(e), - }, + let mut finalized_heads = relay_chain.finalized_heads(para_id)?; + + loop { + let finalized_head = if let Some(h) = finalized_heads.next().await { + h + } else { + tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head."); + return Ok(()); + }; + + let header = match Block::Header::decode(&mut &finalized_head[..]) { + Ok(header) => header, + Err(err) => { + tracing::warn!( + target: "cumulus-consensus", + error = ?err, + "Could not decode parachain header while following finalized heads.", + ); + continue; + } + }; + + let hash = header.hash(); + + // don't finalize the same block multiple times. + if parachain.usage_info().chain.finalized_hash != hash { + if let Err(e) = parachain.finalize_block(BlockId::hash(hash), None, true) { + match e { + ClientError::UnknownBlock(_) => tracing::debug!( + target: "cumulus-consensus", + block_hash = ?hash, + "Could not finalize block because it is unknown.", + ), + _ => tracing::warn!( + target: "cumulus-consensus", + error = ?e, + block_hash = ?hash, + "Failed to finalize block", + ), + } + } } - } else { - Ok(true) } } -/// Spawns a future that follows the Polkadot relay chain for the given parachain. -pub fn follow_polkadot( +/// Run the parachain consensus. +/// +/// This will follow the given `relay_chain` to act as consesus for the parachain that corresponds +/// to the given `para_id`. It will set the new best block of the parachain as it gets aware of it. +/// The same happens for the finalized block. +/// +/// # Note +/// +/// This will access the backend of the parachain and thus, this future should be spawned as blocking +/// task. +pub async fn run_parachain_consensus( para_id: ParaId, - local: Arc, - polkadot: P, + parachain: Arc

, + relay_chain: R, announce_block: Arc) + Send + Sync>, -) -> ClientResult + Send + Unpin> +) -> ClientResult<()> where Block: BlockT, - L: Finalizer + UsageProvider + Send + Sync + BlockBackend, - for<'a> &'a L: BlockImport, - P: PolkadotClient, + P: Finalizer + + UsageProvider + + Send + + Sync + + BlockBackend + + BlockchainEvents, + for<'a> &'a P: BlockImport, + R: RelaychainClient, B: Backend, { - let follow_finalized = { - let local = local.clone(); - - polkadot - .finalized_heads(para_id)? - .filter_map(|head_data| { - let res = match <::Header>::decode(&mut &head_data[..]) { - Ok(header) => Some(header), - Err(err) => { - warn!( - target: "cumulus-consensus", - "Could not decode Parachain header for finalizing: {:?}", - err, - ); - None - } - }; - - future::ready(res) - }) - .for_each(move |p_head| { - if let Err(e) = finalize_block(&*local, p_head.hash()) { - warn!( - target: "cumulus-consensus", - "Failed to finalize block: {:?}", - e, - ); - } - - future::ready(()) - }) - }; - - Ok(future::select( - follow_finalized, - follow_new_best(para_id, local, polkadot, announce_block)?, - ) - .map(|_| ())) + let follow_new_best = follow_new_best( + para_id, + parachain.clone(), + relay_chain.clone(), + announce_block, + ); + let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain); + select! { + r = follow_new_best.fuse() => r, + r = follow_finalized_head.fuse() => r, + } } /// Follow the relay chain new best head, to update the Parachain new best head. -fn follow_new_best( +async fn follow_new_best( para_id: ParaId, - local: Arc, - polkadot: P, + parachain: Arc

, + relay_chain: R, announce_block: Arc) + Send + Sync>, -) -> ClientResult + Send + Unpin> +) -> ClientResult<()> where Block: BlockT, - L: Finalizer + UsageProvider + Send + Sync + BlockBackend, - for<'a> &'a L: BlockImport, - P: PolkadotClient, + P: Finalizer + + UsageProvider + + Send + + Sync + + BlockBackend + + BlockchainEvents, + for<'a> &'a P: BlockImport, + R: RelaychainClient, B: Backend, { - Ok(polkadot - .new_best_heads(para_id)? - .filter_map(|head_data| { - let res = match <::Header>::decode(&mut &head_data[..]) { - Ok(header) => Some(header), - Err(err) => { - warn!( - target: "cumulus-consensus", - "Could not decode Parachain header: {:?}", err); - None - } - }; - - future::ready(res) - }) - .for_each(move |h| { - let hash = h.hash(); - - if local.usage_info().chain.best_hash == hash { - trace!( - target: "cumulus-consensus", - "Skipping set new best block, because block `{}` is already the best.", - hash, - ) - } else { - // Make sure the block is already known or otherwise we skip setting new best. - match local.block_status(&BlockId::Hash(hash)) { - Ok(BlockStatus::InChainWithState) => { - // Make it the new best block - let mut block_import_params = - BlockImportParams::new(BlockOrigin::ConsensusBroadcast, h); - block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true)); - block_import_params.import_existing = true; - - if let Err(err) = - (&*local).import_block(block_import_params, Default::default()) - { - warn!( - target: "cumulus-consensus", - "Failed to set new best block `{}` with error: {:?}", - hash, err - ); - } - - (*announce_block)(hash, Vec::new()); - } - Ok(BlockStatus::InChainPruned) => { - error!( - target: "cumulus-collator", - "Trying to set pruned block `{:?}` as new best!", - hash, + let mut new_best_heads = relay_chain.new_best_heads(para_id)?.fuse(); + let mut imported_blocks = parachain.import_notification_stream().fuse(); + // The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain + // block before the parachain block it included. In this case we need to wait for this block to + // be imported to set it as new best. + let mut unset_best_header = None; + + loop { + select! { + h = new_best_heads.next() => { + match h { + Some(h) => handle_new_best_parachain_head( + h, + &*parachain, + &*announce_block, + &mut unset_best_header, + ), + None => { + tracing::debug!( + target: "cumulus-consensus", + "Stopping following new best.", ); + return Ok(()) } - Err(e) => { - error!( - target: "cumulus-collator", - "Failed to get block status of block `{:?}`: {:?}", - hash, - e, + } + }, + i = imported_blocks.next() => { + match i { + Some(i) => handle_new_block_imported( + i, + &mut unset_best_header, + &*parachain, + &*announce_block, + ), + None => { + tracing::debug!( + target: "cumulus-consensus", + "Stopping following imported blocks.", ); + return Ok(()) } - _ => {} } } + } + } +} + +/// Handle a new import block of the parachain. +fn handle_new_block_imported( + notification: BlockImportNotification, + unset_best_header_opt: &mut Option, + parachain: &P, + announce_block: &dyn Fn(Block::Hash, Vec), +) where + Block: BlockT, + P: UsageProvider + Send + Sync + BlockBackend, + for<'a> &'a P: BlockImport, +{ + let unset_best_header = match (notification.is_new_best, &unset_best_header_opt) { + // If this is the new best block or we don't have any unset block, we can end it here. + (true, _) | (_, None) => return, + (false, Some(ref u)) => u, + }; + + let unset_hash = if notification.header.number() < unset_best_header.number() { + return; + } else if notification.header.number() == unset_best_header.number() { + let unset_hash = unset_best_header.hash(); + + if unset_hash != notification.hash { + return; + } else { + unset_hash + } + } else { + unset_best_header.hash() + }; + + match parachain.block_status(&BlockId::Hash(unset_hash)) { + Ok(BlockStatus::InChainWithState) => { + drop(unset_best_header); + let unset_best_header = unset_best_header_opt + .take() + .expect("We checked above that the value is set; qed"); + + import_block_as_new_best(unset_hash, unset_best_header, parachain, announce_block); + } + state => tracing::debug!( + target: "cumulus-consensus", + unset_best_header = ?unset_best_header, + imported_header = ?notification.header, + ?state, + "Unexpected state for unset best header.", + ), + } +} + +/// Handle the new best parachain head as extracted from the new best relay chain. +fn handle_new_best_parachain_head( + head: Vec, + parachain: &P, + announce_block: &dyn Fn(Block::Hash, Vec), + unset_best_header: &mut Option, +) where + Block: BlockT, + P: UsageProvider + Send + Sync + BlockBackend, + for<'a> &'a P: BlockImport, +{ + let parachain_head = match <::Header>::decode(&mut &head[..]) { + Ok(header) => header, + Err(err) => { + tracing::warn!( + target: "cumulus-consensus", + error = ?err, + "Could not decode Parachain header while following best heads.", + ); + return; + } + }; + + let hash = parachain_head.hash(); + + if parachain.usage_info().chain.best_hash == hash { + tracing::debug!( + target: "cumulus-consensus", + block_hash = ?hash, + "Skipping set new best block, because block is already the best.", + ) + } else { + // Make sure the block is already known or otherwise we skip setting new best. + match parachain.block_status(&BlockId::Hash(hash)) { + Ok(BlockStatus::InChainWithState) => { + unset_best_header.take(); + + import_block_as_new_best(hash, parachain_head, parachain, announce_block); + } + Ok(BlockStatus::InChainPruned) => { + tracing::error!( + target: "cumulus-collator", + block_hash = ?hash, + "Trying to set pruned block as new best!", + ); + } + Ok(BlockStatus::Unknown) => { + *unset_best_header = Some(parachain_head); + + tracing::debug!( + target: "cumulus-collator", + block_hash = ?hash, + "Parachain block not yet imported, waiting for import to enact as best block.", + ); + } + Err(e) => { + tracing::error!( + target: "cumulus-collator", + block_hash = ?hash, + error = ?e, + "Failed to get block status of block.", + ); + } + _ => {} + } + } +} - future::ready(()) - })) +fn import_block_as_new_best( + hash: Block::Hash, + header: Block::Header, + parachain: &P, + announce_block: &dyn Fn(Block::Hash, Vec), +) where + Block: BlockT, + P: UsageProvider + Send + Sync + BlockBackend, + for<'a> &'a P: BlockImport, +{ + // Make it the new best block + let mut block_import_params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header); + block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(true)); + block_import_params.import_existing = true; + + if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()) { + tracing::warn!( + target: "cumulus-consensus", + block_hash = ?hash, + error = ?err, + "Failed to set new best block.", + ); + } else { + (*announce_block)(hash, Vec::new()); + } } -impl PolkadotClient for Arc +impl RelaychainClient for Arc where T: sc_client_api::BlockchainEvents + ProvideRuntimeApi + 'static + Send + Sync, >::Api: ParachainHost, @@ -329,7 +475,7 @@ impl Clone for SelectChain { impl SelectChainT for SelectChain where Block: BlockT, - PC: PolkadotClient + Clone + Send + Sync, + PC: RelaychainClient + Clone + Send + Sync, PC::Error: ToString, SC: SelectChainT, { @@ -364,3 +510,323 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + + use codec::Encode; + use cumulus_test_client::{ + runtime::{Block, Header}, + Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, + }; + use futures::{channel::mpsc, executor::block_on}; + use futures_timer::Delay; + use std::{sync::Mutex, time::Duration}; + + struct RelaychainInner { + new_best_heads: Option>, + finalized_heads: Option>, + new_best_heads_sender: mpsc::UnboundedSender

, + finalized_heads_sender: mpsc::UnboundedSender
, + } + + impl RelaychainInner { + fn new() -> Self { + let (new_best_heads_sender, new_best_heads) = mpsc::unbounded(); + let (finalized_heads_sender, finalized_heads) = mpsc::unbounded(); + + Self { + new_best_heads_sender, + finalized_heads_sender, + new_best_heads: Some(new_best_heads), + finalized_heads: Some(finalized_heads), + } + } + } + + #[derive(Clone)] + struct Relaychain { + inner: Arc>, + } + + impl Relaychain { + fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(RelaychainInner::new())), + } + } + } + + impl RelaychainClient for Relaychain { + type Error = ClientError; + + type HeadStream = Box> + Send + Unpin>; + fn new_best_heads(&self, _: ParaId) -> ClientResult { + let stream = self + .inner + .lock() + .unwrap() + .new_best_heads + .take() + .expect("Should only be called once"); + + Ok(Box::new(stream.map(|v| v.encode()))) + } + + fn finalized_heads(&self, _: ParaId) -> ClientResult { + let stream = self + .inner + .lock() + .unwrap() + .finalized_heads + .take() + .expect("Should only be called once"); + + Ok(Box::new(stream.map(|v| v.encode()))) + } + + fn parachain_head_at( + &self, + _: &BlockId, + _: ParaId, + ) -> ClientResult>> { + unimplemented!("Not required for tests") + } + } + + fn build_and_import_block(mut client: Arc) -> Block { + let builder = client.init_block_builder(None); + + let block = builder.build().unwrap().block; + let (header, body) = block.clone().deconstruct(); + + let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header); + block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false)); + block_import_params.body = Some(body); + + client + .import_block(block_import_params, Default::default()) + .unwrap(); + assert_eq!(0, client.chain_info().best_number); + + block + } + + #[test] + fn follow_new_best_works() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone()); + let relay_chain = Relaychain::new(); + let new_best_heads_sender = relay_chain + .inner + .lock() + .unwrap() + .new_best_heads_sender + .clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + + let work = async move { + new_best_heads_sender + .unbounded_send(block.header().clone()) + .unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.best_hash { + break; + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); + } + + #[test] + fn follow_finalized_works() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone()); + let relay_chain = Relaychain::new(); + let finalized_sender = relay_chain + .inner + .lock() + .unwrap() + .finalized_heads_sender + .clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + + let work = async move { + finalized_sender + .unbounded_send(block.header().clone()) + .unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.finalized_hash { + break; + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); + } + + #[test] + fn follow_finalized_does_not_stop_on_unknown_block() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone()); + + let unknown_block = { + let block_builder = client.init_block_builder_at(&BlockId::Hash(block.hash()), None); + block_builder.build().unwrap().block + }; + + let relay_chain = Relaychain::new(); + let finalized_sender = relay_chain + .inner + .lock() + .unwrap() + .finalized_heads_sender + .clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + + let work = async move { + for _ in 0..3usize { + finalized_sender + .unbounded_send(unknown_block.header().clone()) + .unwrap(); + + Delay::new(Duration::from_millis(100)).await; + } + + finalized_sender + .unbounded_send(block.header().clone()) + .unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.finalized_hash { + break; + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); + } + + // It can happen that we first import a relay chain block, while not yet having the parachain + // block imported that would be set to the best block. We need to make sure to import this + // block as new best block in the moment it is imported. + #[test] + fn follow_new_best_sets_best_after_it_is_imported() { + sp_tracing::try_init_simple(); + + let mut client = Arc::new(TestClientBuilder::default().build()); + + let block = build_and_import_block(client.clone()); + + let unknown_block = { + let block_builder = client.init_block_builder_at(&BlockId::Hash(block.hash()), None); + block_builder.build().unwrap().block + }; + + let relay_chain = Relaychain::new(); + let new_best_heads_sender = relay_chain + .inner + .lock() + .unwrap() + .new_best_heads_sender + .clone(); + + let consensus = + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + + let work = async move { + new_best_heads_sender + .unbounded_send(block.header().clone()) + .unwrap(); + + loop { + Delay::new(Duration::from_millis(100)).await; + if block.hash() == client.usage_info().chain.best_hash { + break; + } + } + + // Announce the unknown block + new_best_heads_sender + .unbounded_send(unknown_block.header().clone()) + .unwrap(); + + // Do some iterations. As this is a local task executor, only one task can run at a time. + // Meaning that it should already have processed the unknown block. + for _ in 0..3usize { + Delay::new(Duration::from_millis(100)).await; + } + + let (header, body) = unknown_block.clone().deconstruct(); + + let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header); + block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false)); + block_import_params.body = Some(body); + + // Now import the unkown block to make it "known" + client + .import_block(block_import_params, Default::default()) + .unwrap(); + + loop { + Delay::new(Duration::from_millis(100)).await; + if unknown_block.hash() == client.usage_info().chain.best_hash { + break; + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = work.fuse() => {}, + } + }); + } +} diff --git a/message-broker/src/lib.rs b/message-broker/src/lib.rs index eec8f2570c9..e0c59cd3b78 100644 --- a/message-broker/src/lib.rs +++ b/message-broker/src/lib.rs @@ -33,8 +33,8 @@ use sp_std::{cmp, prelude::*}; use cumulus_primitives::{ inherents::{MessageIngestionType, MESSAGE_INGESTION_IDENTIFIER}, - well_known_keys, DownwardMessageHandler, HrmpMessageHandler, OutboundHrmpMessage, ParaId, - UpwardMessage, UpwardMessageSender, HrmpMessageSender, + well_known_keys, DownwardMessageHandler, HrmpMessageHandler, HrmpMessageSender, + OutboundHrmpMessage, ParaId, UpwardMessage, UpwardMessageSender, }; // TODO: these should be not a constant, but sourced from the relay-chain configuration. diff --git a/parachain-upgrade/src/lib.rs b/parachain-upgrade/src/lib.rs index d0a0aa1298f..e99d9ccb736 100644 --- a/parachain-upgrade/src/lib.rs +++ b/parachain-upgrade/src/lib.rs @@ -497,10 +497,13 @@ mod tests { let inherent_data = { let mut inherent_data = InherentData::default(); inherent_data - .put_data(INHERENT_IDENTIFIER, &ValidationDataType { - validation_data: vfp.clone(), - relay_chain_state: sp_state_machine::StorageProof::empty(), - }) + .put_data( + INHERENT_IDENTIFIER, + &ValidationDataType { + validation_data: vfp.clone(), + relay_chain_state: sp_state_machine::StorageProof::empty(), + }, + ) .expect("failed to put VFP inherent"); inherent_data }; diff --git a/rococo-parachains/runtime/src/lib.rs b/rococo-parachains/runtime/src/lib.rs index b75d111d30c..0df526d32a9 100644 --- a/rococo-parachains/runtime/src/lib.rs +++ b/rococo-parachains/runtime/src/lib.rs @@ -55,15 +55,15 @@ pub use sp_runtime::{Perbill, Permill}; // XCM imports use polkadot_parachain::primitives::Sibling; -use xcm::v0::{MultiLocation, NetworkId, Junction}; +use xcm::v0::{Junction, MultiLocation, NetworkId}; use xcm_builder::{ - ParentIsDefault, SiblingParachainConvertsVia, AccountId32Aliases, LocationInverter, - SovereignSignedViaLocation, SiblingParachainAsNative, - RelayChainAsNative, SignedAccountId32AsNative, CurrencyAdapter + AccountId32Aliases, CurrencyAdapter, LocationInverter, ParentIsDefault, RelayChainAsNative, + SiblingParachainAsNative, SiblingParachainConvertsVia, SignedAccountId32AsNative, + SovereignSignedViaLocation, }; use xcm_executor::{ - XcmExecutor, Config, - traits::{NativeAsset, IsConcrete}, + traits::{IsConcrete, NativeAsset}, + Config, XcmExecutor, }; pub type SessionHandlers = (); @@ -254,17 +254,16 @@ type LocationConverter = ( AccountId32Aliases, ); -type LocalAssetTransactor = - CurrencyAdapter< - // Use this currency: - Balances, - // Use this currency when it is a fungible asset matching the given location or name: - IsConcrete, - // Do a simple punn to convert an AccountId32 MultiLocation into a native chain account ID: - LocationConverter, - // Our chain's account ID type (we can't get away without mentioning it explicitly): - AccountId, - >; +type LocalAssetTransactor = CurrencyAdapter< + // Use this currency: + Balances, + // Use this currency when it is a fungible asset matching the given location or name: + IsConcrete, + // Do a simple punn to convert an AccountId32 MultiLocation into a native chain account ID: + LocationConverter, + // Our chain's account ID type (we can't get away without mentioning it explicitly): + AccountId, +>; type LocalOriginConverter = ( SovereignSignedViaLocation, diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index b7815678966..69a3d767771 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -28,7 +28,6 @@ sp-externalities = { git = "https://github.com/paritytech/substrate", default-fe parachain = { package = "polkadot-parachain", git = "https://github.com/paritytech/polkadot", default-features = false, features = [ "wasm-api" ] , branch = "master" } [dev-dependencies] -sc-block-builder = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/runtime/src/validate_block/tests.rs b/runtime/src/validate_block/tests.rs index 48420083d24..a2881006420 100644 --- a/runtime/src/validate_block/tests.rs +++ b/runtime/src/validate_block/tests.rs @@ -18,13 +18,11 @@ use crate::ParachainBlockData; use cumulus_primitives::{PersistedValidationData, ValidationData}; use cumulus_test_client::{ - generate_block_inherents, runtime::{Block, Hash, Header, UncheckedExtrinsic, WASM_BINARY}, - transfer, Client, DefaultTestClientBuilderExt, LongestChain, TestClientBuilder, - TestClientBuilderExt, + transfer, Client, DefaultTestClientBuilderExt, InitBlockBuilder, LongestChain, + TestClientBuilder, TestClientBuilderExt, }; use parachain::primitives::{BlockData, HeadData, ValidationParams, ValidationResult}; -use sc_block_builder::BlockBuilderProvider; use sc_executor::{ error::Result, sp_wasm_interface::HostFunctions, WasmExecutionMethod, WasmExecutor, }; @@ -89,12 +87,8 @@ fn build_block_with_proof( parent_head: Header, ) -> (Block, sp_trie::StorageProof) { let block_id = BlockId::Hash(client.info().best_hash); - let mut builder = client - .new_block_at(&block_id, Default::default(), true) - .expect("Initializes new block"); - - generate_block_inherents( - client, + let mut builder = client.init_block_builder_at( + &block_id, Some(ValidationData { persisted: PersistedValidationData { block_number: 1, @@ -103,13 +97,11 @@ fn build_block_with_proof( }, ..Default::default() }), - ) - .into_iter() - .for_each(|e| builder.push(e).expect("Pushes an inherent")); + ); extra_extrinsics .into_iter() - .for_each(|e| builder.push(e).expect("Pushes an extrinsic")); + .for_each(|e| builder.push(e).unwrap()); let built_block = builder.build().expect("Creates block"); diff --git a/service/Cargo.toml b/service/Cargo.toml index fe27627ec93..b1e9fe16365 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -28,3 +28,4 @@ polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = " # Other deps futures = "0.3.6" +tracing = "0.1.22" diff --git a/service/src/lib.rs b/service/src/lib.rs index f7bb1ca78a4..ef5f73d5bfc 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -23,7 +23,9 @@ use futures::{Future, FutureExt}; use polkadot_overseer::OverseerHandler; use polkadot_primitives::v1::{Block as PBlock, CollatorId, CollatorPair}; use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection}; -use sc_client_api::{Backend as BackendT, BlockBackend, Finalizer, UsageProvider, StateBackend}; +use sc_client_api::{ + Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, StateBackend, UsageProvider, +}; use sc_service::{error::Result as ServiceResult, Configuration, Role, TaskManager}; use sp_blockchain::HeaderBackend; use sp_consensus::{BlockImport, Environment, Error as ConsensusError, Proposer}; @@ -36,7 +38,18 @@ use std::{marker::PhantomData, sync::Arc}; type PFullNode = polkadot_service::NewFull; /// Parameters given to [`start_collator`]. -pub struct StartCollatorParams<'a, Block: BlockT, PF, BI, BS, Client, Backend, Spawner, PClient, PBackend> { +pub struct StartCollatorParams< + 'a, + Block: BlockT, + PF, + BI, + BS, + Client, + Backend, + Spawner, + PClient, + PBackend, +> { pub proposer_factory: PF, pub inherent_data_providers: InherentDataProviders, pub backend: Arc, @@ -91,6 +104,7 @@ where + Send + Sync + BlockBackend + + BlockchainEvents + 'static, for<'b> &'b Client: BlockImport, Backend: BackendT + 'static, @@ -99,13 +113,20 @@ where PBackend: BackendT + 'static, PBackend::State: StateBackend, { + polkadot_full_node.client.execute_with(StartConsensus { + para_id, + announce_block: announce_block.clone(), + client: client.clone(), + task_manager, + _phantom: PhantomData, + })?; + polkadot_full_node .client .execute_with(StartCollator { proposer_factory, inherent_data_providers, backend, - client, announce_block, overseer_handler: polkadot_full_node .overseer_handler @@ -124,13 +145,12 @@ where Ok(()) } -struct StartCollator { +struct StartCollator { proposer_factory: PF, inherent_data_providers: InherentDataProviders, backend: Arc, block_import: BI, block_status: Arc, - client: Arc, announce_block: Arc) + Send + Sync>, overseer_handler: OverseerHandler, spawner: Spawner, @@ -139,8 +159,8 @@ struct StartCollator, } -impl polkadot_service::ExecuteWithClient - for StartCollator +impl polkadot_service::ExecuteWithClient + for StartCollator where Block: BlockT, PF: Environment + Send + 'static, @@ -152,14 +172,6 @@ where + Sync + 'static, BS: BlockBackend + Send + Sync + 'static, - Client: Finalizer - + UsageProvider - + HeaderBackend - + Send - + Sync - + BlockBackend - + 'static, - for<'b> &'b Client: BlockImport, Backend: BackendT + 'static, Spawner: SpawnNamed + Clone + Send + Sync + 'static, PBackend2: sc_client_api::Backend + 'static, @@ -182,7 +194,6 @@ where backend: self.backend, block_import: self.block_import, block_status: self.block_status, - client: self.client, announce_block: self.announce_block, overseer_handler: self.overseer_handler, spawner: self.spawner, @@ -227,12 +238,13 @@ where + Send + Sync + BlockBackend + + BlockchainEvents + 'static, for<'a> &'a Client: BlockImport, Backend: BackendT + 'static, PClient: ClientHandle, { - polkadot_full_node.client.execute_with(StartFullNode { + polkadot_full_node.client.execute_with(StartConsensus { announce_block, para_id, client, @@ -245,7 +257,7 @@ where Ok(()) } -struct StartFullNode<'a, Block: BlockT, Client, Backend> { +struct StartConsensus<'a, Block: BlockT, Client, Backend> { para_id: ParaId, announce_block: Arc) + Send + Sync>, client: Arc, @@ -254,7 +266,7 @@ struct StartFullNode<'a, Block: BlockT, Client, Backend> { } impl<'a, Block, Client, Backend> polkadot_service::ExecuteWithClient - for StartFullNode<'a, Block, Client, Backend> + for StartConsensus<'a, Block, Client, Backend> where Block: BlockT, Client: Finalizer @@ -262,6 +274,7 @@ where + Send + Sync + BlockBackend + + BlockchainEvents + 'static, for<'b> &'b Client: BlockImport, Backend: BackendT + 'static, @@ -276,15 +289,25 @@ where Api: RuntimeApiCollection, PClient: AbstractClient + 'static, { - let future = cumulus_consensus::follow_polkadot( + let consensus = cumulus_consensus::run_parachain_consensus( self.para_id, self.client, client, self.announce_block, - )?; - self.task_manager - .spawn_essential_handle() - .spawn("cumulus-consensus", future); + ); + + self.task_manager.spawn_essential_handle().spawn( + "cumulus-consensus", + consensus.then(|r| async move { + if let Err(e) = r { + tracing::error!( + target: "cumulus-service", + error = %e, + "Parachain consensus failed.", + ) + } + }), + ); Ok(()) } diff --git a/test/client/src/block_builder.rs b/test/client/src/block_builder.rs index a2ff162afab..a25fa620cb7 100644 --- a/test/client/src/block_builder.rs +++ b/test/client/src/block_builder.rs @@ -14,52 +14,86 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use crate::Client; -use cumulus_primitives::{inherents::{VALIDATION_DATA_IDENTIFIER, ValidationDataType}, ValidationData}; -use cumulus_test_runtime::GetLastTimestamp; +use crate::{Backend, Client}; +use cumulus_primitives::{ + inherents::{ValidationDataType, VALIDATION_DATA_IDENTIFIER}, + ValidationData, +}; +use cumulus_test_runtime::{Block, GetLastTimestamp}; use polkadot_primitives::v1::BlockNumber as PBlockNumber; -use sc_block_builder::BlockBuilderApi; +use sc_block_builder::{BlockBuilder, BlockBuilderProvider}; use sp_api::ProvideRuntimeApi; -use sp_blockchain::HeaderBackend; -use sp_core::ExecutionContext; use sp_runtime::generic::BlockId; -/// Generate the inherents required by the test runtime. -/// -/// - `validation_data`: The [`ValidationData`] that will be passed as inherent -/// data into the runtime when building the inherents. If -/// `None` is passed, the default value will be used. -pub fn generate_block_inherents( - client: &Client, - validation_data: Option>, -) -> Vec { - let mut inherent_data = sp_inherents::InherentData::new(); - let block_id = BlockId::Hash(client.info().best_hash); - let last_timestamp = client - .runtime_api() - .get_last_timestamp(&block_id) - .expect("Get last timestamp"); - let timestamp = last_timestamp + cumulus_test_runtime::MinimumPeriod::get(); +/// An extension for the Cumulus test client to init a block builder. +pub trait InitBlockBuilder { + /// Init a specific block builder that works for the test runtime. + /// + /// This will automatically create and push the inherents for you to make the block + /// valid for the test runtime. + fn init_block_builder( + &self, + validation_data: Option>, + ) -> sc_block_builder::BlockBuilder; - inherent_data - .put_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp) - .expect("Put timestamp failed"); - inherent_data - .put_data( - VALIDATION_DATA_IDENTIFIER, - &ValidationDataType { - validation_data: validation_data.unwrap_or_default(), - relay_chain_state: sp_state_machine::StorageProof::empty(), - }, - ) - .expect("Put validation function params failed"); + /// Init a specific block builder at a specific block that works for the test runtime. + /// + /// Same as [`InitBlockBuilder::init_block_builder`] besides that it takes a + /// [`BlockId`] to say which should be the parent block of the block that is being build. + fn init_block_builder_at( + &self, + at: &BlockId, + validation_data: Option>, + ) -> sc_block_builder::BlockBuilder; +} + +impl InitBlockBuilder for Client { + fn init_block_builder( + &self, + validation_data: Option>, + ) -> BlockBuilder { + let chain_info = self.chain_info(); + self.init_block_builder_at(&BlockId::Hash(chain_info.best_hash), validation_data) + } + + fn init_block_builder_at( + &self, + at: &BlockId, + validation_data: Option>, + ) -> BlockBuilder { + let mut block_builder = self + .new_block_at(at, Default::default(), true) + .expect("Creates new block builder for test runtime"); + + let mut inherent_data = sp_inherents::InherentData::new(); + let last_timestamp = self + .runtime_api() + .get_last_timestamp(&at) + .expect("Get last timestamp"); + + let timestamp = last_timestamp + cumulus_test_runtime::MinimumPeriod::get(); + + inherent_data + .put_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp) + .expect("Put timestamp failed"); + inherent_data + .put_data( + VALIDATION_DATA_IDENTIFIER, + &ValidationDataType { + validation_data: validation_data.unwrap_or_default(), + relay_chain_state: sp_state_machine::StorageProof::empty(), + }, + ) + .expect("Put validation function params failed"); + + let inherents = block_builder + .create_inherents(inherent_data) + .expect("Creates inherents"); + + inherents + .into_iter() + .for_each(|ext| block_builder.push(ext).expect("Pushes inherent")); - client - .runtime_api() - .inherent_extrinsics_with_context( - &BlockId::number(0), - ExecutionContext::BlockConstruction, - inherent_data, - ) - .expect("Get inherents failed") + block_builder + } } diff --git a/xcm-handler/src/lib.rs b/xcm-handler/src/lib.rs index 0295ca365b7..3b679df9c29 100644 --- a/xcm-handler/src/lib.rs +++ b/xcm-handler/src/lib.rs @@ -22,20 +22,17 @@ #![cfg_attr(not(feature = "std"), no_std)] -use codec::{Encode, Decode}; -use sp_std::convert::{TryFrom, TryInto}; -use frame_support::{ - decl_module, decl_event, decl_error, - sp_runtime::traits::Hash, -}; -use frame_system::ensure_root; +use codec::{Decode, Encode}; use cumulus_primitives::{ - ParaId, InboundHrmpMessage, InboundDownwardMessage, OutboundHrmpMessage, - DownwardMessageHandler, HrmpMessageHandler, UpwardMessageSender, HrmpMessageSender, + DownwardMessageHandler, HrmpMessageHandler, HrmpMessageSender, InboundDownwardMessage, + InboundHrmpMessage, OutboundHrmpMessage, ParaId, UpwardMessageSender, }; +use frame_support::{decl_error, decl_event, decl_module, sp_runtime::traits::Hash}; +use frame_system::ensure_root; +use sp_std::convert::{TryFrom, TryInto}; use xcm::{ + v0::{Error as XcmError, ExecuteXcm, Junction, MultiLocation, SendXcm, Xcm}, VersionedXcm, - v0::{Xcm, MultiLocation, Error as XcmError, Junction, SendXcm, ExecuteXcm} }; pub trait Config: frame_system::Config { @@ -125,7 +122,10 @@ impl HrmpMessageHandler for Module { frame_support::debug::print!("Processing HRMP XCM: {:?}", &hash); match VersionedXcm::decode(&mut &msg.data[..]).map(Xcm::try_from) { Ok(Ok(xcm)) => { - match T::XcmExecutor::execute_xcm(Junction::Parachain { id: sender.into() }.into(), xcm) { + match T::XcmExecutor::execute_xcm( + Junction::Parachain { id: sender.into() }.into(), + xcm, + ) { Ok(..) => RawEvent::Success(hash), Err(e) => RawEvent::Fail(hash, e), }; @@ -151,7 +151,8 @@ impl SendXcm for Module { let data = msg.encode(); let hash = T::Hashing::hash(&data); - T::UpwardMessageSender::send_upward_message(data).map_err(|_| XcmError::Undefined)?; + T::UpwardMessageSender::send_upward_message(data) + .map_err(|_| XcmError::Undefined)?; Self::deposit_event(RawEvent::UpwardMessageSent(hash)); Ok(()) @@ -165,7 +166,8 @@ impl SendXcm for Module { data, }; // TODO: Better error here - T::HrmpMessageSender::send_hrmp_message(message).map_err(|_| XcmError::Undefined)?; + T::HrmpMessageSender::send_hrmp_message(message) + .map_err(|_| XcmError::Undefined)?; Self::deposit_event(RawEvent::HrmpMessageSent(hash)); Ok(()) }