diff --git a/Cargo.lock b/Cargo.lock index af133aaa8b5..7ae68589e5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -933,6 +933,7 @@ dependencies = [ "futures 0.3.4", "log 0.4.8", "parity-scale-codec", + "parking_lot 0.10.2", "polkadot-collator", "polkadot-network", "polkadot-primitives", @@ -1028,6 +1029,7 @@ dependencies = [ "async-std", "cumulus-collator", "cumulus-consensus", + "cumulus-network", "cumulus-test-parachain-runtime", "derive_more 0.15.0", "exit-future 0.1.4", diff --git a/collator/src/lib.rs b/collator/src/lib.rs index c5fa6a7de2a..e9f5d87b32f 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -16,7 +16,9 @@ //! Cumulus Collator implementation for Substrate. -use cumulus_network::WaitToAnnounce; +use cumulus_network::{ + DelayedBlockAnnounceValidator, JustifiedBlockAnnounceValidator, WaitToAnnounce, +}; use cumulus_primitives::{ inherents::VALIDATION_FUNCTION_PARAMS_IDENTIFIER as VFP_IDENT, validation_function_params::ValidationFunctionParams, @@ -295,6 +297,7 @@ pub struct CollatorBuilder { para_id: ParaId, client: Arc, announce_block: Arc) + Send + Sync>, + delayed_block_announce_validator: DelayedBlockAnnounceValidator, _marker: PhantomData<(Block, Backend)>, } @@ -309,6 +312,7 @@ impl para_id: ParaId, client: Arc, announce_block: Arc) + Send + Sync>, + delayed_block_announce_validator: DelayedBlockAnnounceValidator, ) -> Self { Self { proposer_factory, @@ -317,6 +321,7 @@ impl para_id, client, announce_block, + delayed_block_announce_validator, _marker: PhantomData, } } @@ -351,6 +356,10 @@ where Spawner: Spawn + Clone + Send + Sync + 'static, Extrinsic: codec::Codec + Send + Sync + 'static, { + self.delayed_block_announce_validator.set( + Box::new(JustifiedBlockAnnounceValidator::new(Vec::new(), polkadot_client.clone())), + ); + let follow = match cumulus_consensus::follow_polkadot(self.para_id, self.client, polkadot_client) { Ok(follow) => follow, @@ -502,6 +511,7 @@ mod tests { let _ = env_logger::try_init(); let spawner = futures::executor::ThreadPool::new().unwrap(); let announce_block = |_, _| (); + let block_announce_validator = DelayedBlockAnnounceValidator::new(); let builder = CollatorBuilder::new( DummyFactory, @@ -510,6 +520,7 @@ mod tests { id, Arc::new(TestClientBuilder::new().build()), Arc::new(announce_block), + block_announce_validator, ); let context = builder .build( diff --git a/network/Cargo.toml b/network/Cargo.toml index 9f8920e0695..0465fcd1d77 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -23,3 +23,4 @@ polkadot-network = { git = "https://github.com/paritytech/polkadot", branch = "c codec = { package = "parity-scale-codec", version = "1.3.0", features = [ "derive" ] } futures = { version = "0.3.1", features = ["compat"] } log = "0.4.8" +parking_lot = "0.10.2" diff --git a/network/src/lib.rs b/network/src/lib.rs index 3229ea8fcba..c000525a37b 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -40,6 +40,7 @@ use futures::task::Spawn; use log::{error, trace}; use std::{marker::PhantomData, sync::Arc}; +use parking_lot::Mutex; /// Validate that data is a valid justification from a relay-chain validator that the block is a /// valid parachain-block candidate. @@ -148,6 +149,38 @@ where } } +/// A `BlockAnnounceValidator` that will be able to validate data when its internal +/// `BlockAnnounceValidator` is set. +pub struct DelayedBlockAnnounceValidator(Arc + Send>>>>); + +impl DelayedBlockAnnounceValidator { + pub fn new() -> DelayedBlockAnnounceValidator { + DelayedBlockAnnounceValidator(Arc::new(Mutex::new(None))) + } + + pub fn set(&self, validator: Box + Send>) { + *self.0.lock() = Some(validator); + } +} + +impl Clone for DelayedBlockAnnounceValidator { + fn clone(&self) -> DelayedBlockAnnounceValidator { + DelayedBlockAnnounceValidator(self.0.clone()) + } +} + +impl BlockAnnounceValidator for DelayedBlockAnnounceValidator { + fn validate( + &mut self, + header: &B::Header, + data: &[u8], + ) -> Result> { + self.0.lock().as_mut() + .expect("BlockAnnounceValidator is set before validating the first announcement; qed") + .validate(header, data) + } +} + /// Wait before announcing a block that a candidate message has been received for this block, then /// add this message as justification for the block announcement. /// diff --git a/test/parachain/Cargo.toml b/test/parachain/Cargo.toml index 48a0bb70515..64bf80f61cb 100644 --- a/test/parachain/Cargo.toml +++ b/test/parachain/Cargo.toml @@ -42,6 +42,7 @@ sc-finality-grandpa = { git = "https://github.com/paritytech/substrate", branch # Cumulus dependencies cumulus-consensus = { path = "../../consensus" } cumulus-collator = { path = "../../collator" } +cumulus-network = { path = "../../network" } # Polkadot dependencies polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" } diff --git a/test/parachain/src/service.rs b/test/parachain/src/service.rs index 81627d24f14..8fe361f7a6f 100644 --- a/test/parachain/src/service.rs +++ b/test/parachain/src/service.rs @@ -22,6 +22,7 @@ use polkadot_primitives::parachain::CollatorPair; use cumulus_collator::{CollatorBuilder, prepare_collator_config}; use futures::FutureExt; pub use sc_executor::NativeExecutor; +use cumulus_network::DelayedBlockAnnounceValidator; // Our native executor instance. native_executor_instance!( @@ -78,12 +79,17 @@ pub fn run_collator( .register_provider(sp_timestamp::InherentDataProvider) .unwrap(); + let block_announce_validator = DelayedBlockAnnounceValidator::new(); + let block_announce_validator_copy = block_announce_validator.clone(); let service = builder .with_finality_proof_provider(|client, backend| { // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider let provider = client as Arc>; Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _) })? + .with_block_announce_validator(|_client| { + Box::new(block_announce_validator_copy) + })? .build()?; let proposer_factory = sc_basic_authorship::ProposerFactory::new( @@ -104,6 +110,7 @@ pub fn run_collator( crate::PARA_ID, client, announce_block, + block_announce_validator, ); let polkadot_future = polkadot_collator::start_collator( diff --git a/test/parachain/tests/integration_test.rs b/test/parachain/tests/integration_test.rs index 85595c82f22..9aad67bee54 100644 --- a/test/parachain/tests/integration_test.rs +++ b/test/parachain/tests/integration_test.rs @@ -21,6 +21,7 @@ use assert_cmd::cargo::cargo_bin; use async_std::{net, task::sleep}; use codec::Encode; use futures::{future::FutureExt, join, pin_mut, select}; +use jsonrpsee::{raw::RawClient, transport::http::HttpTransportClient}; use polkadot_primitives::parachain::{Info, Scheduling}; use polkadot_primitives::Hash as PHash; use polkadot_runtime::{Header, OnlyStakingAndClaims, Runtime, SignedExtra, SignedPayload}; @@ -144,6 +145,25 @@ async fn wait_for_tcp(address: A) { } } +/// wait for parachain blocks to be produced +async fn wait_for_blocks(number_of_blocks: usize, mut client: &mut RawClient) { + let mut previous_blocks = HashSet::with_capacity(number_of_blocks); + + loop { + let current_block_hash = Chain::block_hash(&mut client, None).await.unwrap().unwrap(); + + if previous_blocks.insert(current_block_hash) { + eprintln!("new parachain block: {}", current_block_hash); + + if previous_blocks.len() == number_of_blocks { + break; + } + } + + sleep(Duration::from_secs(2)).await; + } +} + #[async_std::test] #[ignore] async fn integration_test() { @@ -310,13 +330,13 @@ async fn integration_test() { .await .unwrap(); - // run cumulus - let cumulus_dir = tempdir().unwrap(); - let mut cumulus = Command::new(cargo_bin("cumulus-test-parachain-collator")) + // run cumulus charlie + let cumulus_charlie_dir = tempdir().unwrap(); + let mut cumulus_charlie = Command::new(cargo_bin("cumulus-test-parachain-collator")) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .arg("--base-path") - .arg(cumulus_dir.path()) + .arg(cumulus_charlie_dir.path()) .arg("--unsafe-rpc-expose") .arg("--rpc-port=27017") .arg("--port=27117") @@ -329,35 +349,51 @@ async fn integration_test() { "--bootnodes=/ip4/127.0.0.1/tcp/27116/p2p/{}", polkadot_bob_id )) + .arg("--charlie") .spawn() .unwrap(); - let cumulus_helper = ChildHelper::new("cumulus", &mut cumulus); + let cumulus_charlie_helper = ChildHelper::new("cumulus-charlie", &mut cumulus_charlie); wait_for_tcp("127.0.0.1:27017").await; // connect rpc client to cumulus - let transport_client_cumulus = + let transport_client_cumulus_charlie = jsonrpsee::transport::http::HttpTransportClient::new("http://127.0.0.1:27017"); - let mut client_cumulus = jsonrpsee::raw::RawClient::new(transport_client_cumulus); + let mut client_cumulus_charlie = + jsonrpsee::raw::RawClient::new(transport_client_cumulus_charlie); - // wait for parachain blocks to be produced - let number_of_blocks = 4; - let mut previous_blocks = HashSet::with_capacity(number_of_blocks); - loop { - let current_block_hash = Chain::block_hash(&mut client_cumulus, None) - .await - .unwrap() - .unwrap(); + wait_for_blocks(4, &mut client_cumulus_charlie).await; - if previous_blocks.insert(current_block_hash) { - eprintln!("new parachain block: {}", current_block_hash); + // run cumulus dave + let cumulus_dave_dir = tempdir().unwrap(); + let mut cumulus_dave = Command::new(cargo_bin("cumulus-test-parachain-collator")) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .arg("--base-path") + .arg(cumulus_dave_dir.path()) + .arg("--unsafe-rpc-expose") + .arg("--rpc-port=27018") + .arg("--port=27118") + .arg("--") + .arg(format!( + "--bootnodes=/ip4/127.0.0.1/tcp/27115/p2p/{}", + polkadot_alice_id + )) + .arg(format!( + "--bootnodes=/ip4/127.0.0.1/tcp/27116/p2p/{}", + polkadot_bob_id + )) + .arg("--dave") + .spawn() + .unwrap(); + let cumulus_dave_helper = ChildHelper::new("cumulus-dave", &mut cumulus_dave); + wait_for_tcp("127.0.0.1:27018").await; - if previous_blocks.len() == number_of_blocks { - break; - } - } + // connect rpc client to cumulus + let transport_client_cumulus_dave = + jsonrpsee::transport::http::HttpTransportClient::new("http://127.0.0.1:27018"); + let mut client_cumulus_dave = jsonrpsee::raw::RawClient::new(transport_client_cumulus_dave); - sleep(Duration::from_secs(2)).await; - } + wait_for_blocks(4, &mut client_cumulus_dave).await; } .fuse();