Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
838d729
Initial commit
cecton Apr 7, 2020
34c80e0
Change substrate and polkadot branch to cecton-cumulus-branch
cecton Apr 7, 2020
94571a5
WIP
cecton Apr 7, 2020
730390a
WIP
cecton Apr 7, 2020
73543b2
WIP
cecton Apr 7, 2020
fd70177
WIP
cecton Apr 7, 2020
911645f
WIP
cecton Apr 8, 2020
132bfea
WIP
cecton Apr 8, 2020
df1e696
WIP
cecton Apr 8, 2020
45db8aa
WIP
cecton Apr 8, 2020
b173102
WIP
cecton Apr 8, 2020
bb9e23b
WIP
cecton Apr 8, 2020
e16af0a
WIP
cecton Apr 8, 2020
5880ab1
update .editorconfig
cecton Apr 8, 2020
5fbdbc0
should probably be --dev
cecton Apr 8, 2020
17f36db
formatting
cecton Apr 8, 2020
c7e6723
Change substrate & polkadot branch to cecton-keep-unpinned-para-blocks
cecton Apr 8, 2020
3c3b103
WIP
cecton Apr 8, 2020
dadf70c
WIP
cecton Apr 8, 2020
2de11f8
WIP
cecton Apr 8, 2020
04b57c7
WIP
cecton Apr 9, 2020
8af9a33
WIP
cecton Apr 9, 2020
06a2b6d
WIP
cecton Apr 9, 2020
152f041
Update polkadot & substrate branches
cecton Apr 9, 2020
1fde472
WIP
cecton Apr 9, 2020
2711ef0
Merge branch 'cecton-cumulus-branch' into cecton-keep-unpinned-para-b…
cecton Apr 9, 2020
786dbaf
Some fixes
bkchr Apr 10, 2020
c455b98
Updated polkadot
cecton Apr 10, 2020
f4ed303
Merge branch 'cecton-cumulus-branch' into cecton-keep-unpinned-para-b…
cecton Apr 10, 2020
d856406
WIP
cecton Apr 15, 2020
74a291e
WIP
cecton Apr 20, 2020
7b90a9d
Merge commit 7431075d0131d9df25ea26f11ccc86e8c080cb5d (conflicts)
cecton Apr 21, 2020
280ad92
Merge commit b9793bc2f69ade2c7e2780d88c5dbedfc6adb7d0 (no conflict)
cecton Apr 21, 2020
3fc94f9
Cargo.lock
cecton Apr 21, 2020
402da3c
WIP
cecton Apr 21, 2020
78e7614
WIP
cecton Apr 22, 2020
dc50e02
Update substrate & polkadot branches
cecton Apr 22, 2020
67c7003
Clean-up
cecton Apr 22, 2020
d08f452
WIP
cecton Apr 22, 2020
7961e9a
WIP
cecton Apr 22, 2020
124dbc3
WIP
cecton Apr 22, 2020
ff626ac
WIP
cecton Apr 24, 2020
562caab
WIP
cecton Apr 27, 2020
33e2efe
Use JustifiedBlockAnnounceValidator
cecton Apr 28, 2020
2404121
debug
cecton Apr 28, 2020
63613cc
Revert "debug"
cecton Apr 28, 2020
2c914c7
Revert "Use JustifiedBlockAnnounceValidator"
cecton Apr 28, 2020
2febd33
Revert branch to cumulus-branch
cecton Apr 28, 2020
9264807
clean-up
cecton Apr 28, 2020
94e0455
clean-up
cecton Apr 28, 2020
a6b4fec
cleanup
cecton Apr 28, 2020
b2957a1
cleanup
cecton Apr 28, 2020
377ebf1
Update branches
cecton Apr 29, 2020
be519b2
debug
cecton Apr 29, 2020
0c82c85
Cancel previous task when new one is created
cecton Apr 30, 2020
5c22c2d
Remove stream-cancel
cecton Apr 30, 2020
c3c1ad5
Clean-up
cecton Apr 30, 2020
b968c26
cleanup
cecton Apr 30, 2020
5deb2ad
cleanup
cecton Apr 30, 2020
c744ebd
Merge commit cd7e06c6439f6b9022f0b52e159e20b01d8542a1 (conflicts)
cecton May 5, 2020
9bdcaa7
Merge commit aaee2b410e6f4159552ad32cb9ee3813c969d8b4 (no conflict)
cecton May 5, 2020
54e2632
Fix merge issue
cecton May 5, 2020
aa15d45
Update polkadot_chainspec.json
cecton May 5, 2020
d3dfe3b
Update network/src/lib.rs
cecton May 5, 2020
995b907
Update network/src/lib.rs
cecton May 5, 2020
497cecf
Use closure instead of NetworkService to announce_block
cecton May 5, 2020
616d0bc
doc
cecton May 5, 2020
50a7c16
WIP
cecton May 5, 2020
cbdcfff
Move disable announcement to its own function with doc
cecton May 6, 2020
c097c63
Merge commit bd14f0ed0c42198354295a73a39daf32a60cb26f (no conflict)
cecton May 6, 2020
19cab74
Merge commit 45a7fe2a94be55a7056f4e14b1d443fb3cecc08b (conflicts)
cecton May 6, 2020
fffde6b
Change substrate and polkadot branch to cumulus-master
cecton May 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ polkadot-validation = { git = "https://github.com/paritytech/polkadot", branch =

# Cumulus dependencies
cumulus-consensus = { path = "../consensus" }
cumulus-runtime = { path = "../runtime" }
cumulus-network = { path = "../network" }
cumulus-primitives = { path = "../primitives" }
cumulus-runtime = { path = "../runtime" }

# Other dependencies
log = "0.4.8"
Expand Down
61 changes: 52 additions & 9 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Cumulus Collator implementation for Substrate.

use cumulus_network::WaitToAnnounce;
use cumulus_primitives::{
inherents::VALIDATION_FUNCTION_PARAMS_IDENTIFIER as VFP_IDENT,
validation_function_params::ValidationFunctionParams,
Expand All @@ -30,6 +31,7 @@ use sp_inherents::{InherentData, InherentDataProviders};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HashFor};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sc_client_api::{StateBackend, UsageProvider, Finalizer, BlockchainEvents};
use sc_service::Configuration;

use polkadot_collator::{
BuildParachainContext, InvalidHead, Network as CollatorNetwork, ParachainContext,
Expand All @@ -44,7 +46,8 @@ use codec::{Decode, Encode};

use log::{error, trace};

use futures::{task::Spawn, Future, future, FutureExt};
use futures::task::Spawn;
use futures::prelude::*;

use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration, pin::Pin};

Expand All @@ -57,28 +60,39 @@ struct HeadData<Block: BlockT> {
}

/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block, PF, BI> {
pub struct Collator<Block: BlockT, PF, BI> {
proposer_factory: Arc<Mutex<PF>>,
_phantom: PhantomData<Block>,
inherent_data_providers: InherentDataProviders,
collator_network: Arc<dyn CollatorNetwork>,
block_import: Arc<Mutex<BI>>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
}

impl<Block, PF, BI> Collator<Block, PF, BI> {
impl<Block: BlockT, PF, BI> Collator<Block, PF, BI> {
/// Create a new instance.
fn new(
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
collator_network: impl CollatorNetwork + Clone + 'static,
block_import: BI,
spawner: Arc<dyn Spawn + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> Self {
let collator_network = Arc::new(collator_network);
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(
spawner,
announce_block,
collator_network.clone(),
)));

Self {
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
inherent_data_providers,
_phantom: PhantomData,
collator_network: Arc::new(collator_network),
collator_network,
block_import: Arc::new(Mutex::new(block_import)),
wait_to_announce,
}
}

Expand Down Expand Up @@ -115,14 +129,15 @@ impl<Block, PF, BI> Collator<Block, PF, BI> {
}
}

impl<Block, PF, BI> Clone for Collator<Block, PF, BI> {
impl<Block: BlockT, PF, BI> Clone for Collator<Block, PF, BI> {
fn clone(&self) -> Self {
Self {
proposer_factory: self.proposer_factory.clone(),
inherent_data_providers: self.inherent_data_providers.clone(),
_phantom: PhantomData,
collator_network: self.collator_network.clone(),
block_import: self.block_import.clone(),
wait_to_announce: self.wait_to_announce.clone(),
}
}
}
Expand All @@ -147,7 +162,7 @@ where

fn produce_candidate(
&mut self,
_relay_chain_parent: PHash,
relay_chain_parent: PHash,
global_validation: GlobalValidationSchedule,
local_validation: LocalValidationData,
) -> Self::ProduceCandidate {
Expand All @@ -169,6 +184,8 @@ where
.lock()
.init(&last_head.header);

let wait_to_announce = self.wait_to_announce.clone();

Box::pin(async move {
let parent_state_root = *last_head.header.state_root();

Expand Down Expand Up @@ -245,15 +262,24 @@ where
}

let block_data = BlockData(b.encode());
let header = b.into_header();
let encoded_header = header.encode();
let hash = header.hash();
let head_data = HeadData::<Block> {
header: b.into_header(),
header,
};

let candidate = (
block_data,
parachain::HeadData(head_data.encode()),
);

wait_to_announce.lock().wait_to_announce(
hash,
relay_chain_parent,
encoded_header,
);

trace!(target: "cumulus-collator", "Produced candidate: {:?}", candidate);

Ok(candidate)
Expand All @@ -268,6 +294,7 @@ pub struct CollatorBuilder<Block: BlockT, PF, BI, Backend, Client> {
block_import: BI,
para_id: ParaId,
client: Arc<Client>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
_marker: PhantomData<(Block, Backend)>,
}

Expand All @@ -281,13 +308,15 @@ impl<Block: BlockT, PF, BI, Backend, Client>
block_import: BI,
para_id: ParaId,
client: Arc<Client>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
) -> Self {
Self {
proposer_factory,
inherent_data_providers,
block_import,
para_id,
client,
announce_block,
_marker: PhantomData,
}
}
Expand All @@ -313,7 +342,7 @@ where
self,
polkadot_client: Arc<PClient>,
spawner: Spawner,
network: impl CollatorNetwork + Clone + 'static,
polkadot_network: impl CollatorNetwork + Clone + 'static,
) -> Result<Self::ParachainContext, ()>
where
PClient: ProvideRuntimeApi<PBlock> + Send + Sync + BlockchainEvents<PBlock> + 'static,
Expand Down Expand Up @@ -342,12 +371,24 @@ where
Ok(Collator::new(
self.proposer_factory,
self.inherent_data_providers,
network,
polkadot_network,
self.block_import,
Arc::new(spawner),
self.announce_block,
))
}
}

/// Prepare the collator's node condifugration
///
/// This function will disable the default announcement of Substrate for the parachain in favor
/// of the one of Cumulus.
pub fn prepare_collator_config(mut parachain_config: Configuration) -> Configuration {
parachain_config.announce_block = false;

parachain_config
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -460,13 +501,15 @@ mod tests {
let id = ParaId::from(100);
let _ = env_logger::try_init();
let spawner = futures::executor::ThreadPool::new().unwrap();
let announce_block = |_, _| ();

let builder = CollatorBuilder::new(
DummyFactory,
InherentDataProviders::default(),
TestClientBuilder::new().build(),
id,
Arc::new(TestClientBuilder::new().build()),
Arc::new(announce_block),
);
let context = builder
.build(
Expand Down
3 changes: 3 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "cumulu
sp-api = { git = "https://github.com/paritytech/substrate", branch = "cumulus-branch" }

# polkadot deps
polkadot-collator = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" }
polkadot-statement-table = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" }
polkadot-validation = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" }
polkadot-network = { git = "https://github.com/paritytech/polkadot", branch = "cumulus-branch" }

# other deps
codec = { package = "parity-scale-codec", version = "1.3.0", features = [ "derive" ] }
futures = { version = "0.3.1", features = ["compat"] }
log = "0.4.8"
118 changes: 117 additions & 1 deletion network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ use sp_blockchain::Error as ClientError;
use sp_consensus::block_validation::{BlockAnnounceValidator, Validation};
use sp_runtime::{generic::BlockId, traits::Block as BlockT};

use polkadot_collator::Network as CollatorNetwork;
use polkadot_network::legacy::gossip::{GossipMessage, GossipStatement};
use polkadot_primitives::{
parachain::{ParachainHost, ValidatorId},
Block as PBlock,
Block as PBlock, Hash as PHash,
};
use polkadot_statement_table::{SignedStatement, Statement};
use polkadot_validation::check_statement;

use codec::{Decode, Encode};
use futures::{pin_mut, select, StreamExt};
use futures::channel::oneshot;
use futures::future::FutureExt;
use futures::task::Spawn;
use log::{error, trace};

use std::{marker::PhantomData, sync::Arc};

Expand Down Expand Up @@ -141,3 +147,113 @@ where
Ok(Validation::Success)
}
}

/// Wait before announcing a block that a candidate message has been received for this block, then
/// add this message as justification for the block announcement.
///
/// This object will spawn a new task every time the method `wait_to_announce` is called and cancel
/// the previous task running.
pub struct WaitToAnnounce<Block: BlockT> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some docs would be nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! I was waiting for you to validate the implementation first to be sure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

spawner: Arc<dyn Spawn + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
collator_network: Arc<dyn CollatorNetwork>,
current_trigger: oneshot::Sender<()>,
}

impl<Block: BlockT> WaitToAnnounce<Block> {
/// Create the `WaitToAnnounce` object
pub fn new(
spawner: Arc<dyn Spawn + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
collator_network: Arc<dyn CollatorNetwork>,
) -> WaitToAnnounce<Block> {
let (tx, _rx) = oneshot::channel();

WaitToAnnounce {
spawner,
announce_block,
collator_network,
current_trigger: tx,
}
}

/// Wait for a candidate message for the block, then announce the block. The candidate
/// message will be added as justification to the block announcement.
pub fn wait_to_announce(
&mut self,
hash: <Block as BlockT>::Hash,
relay_chain_leaf: PHash,
head_data: Vec<u8>,
) {
let (tx, rx) = oneshot::channel();
let announce_block = self.announce_block.clone();
let collator_network = self.collator_network.clone();

self.current_trigger = tx;

if let Err(err) = self.spawner.spawn_obj(Box::pin(async move {
let t1 = wait_to_announce::<Block>(
hash,
relay_chain_leaf,
announce_block,
collator_network,
&head_data,
).fuse();
let t2 = rx.fuse();

pin_mut!(t1, t2);

trace!(
target: "cumulus-network",
"waiting for announce block in a background task...",
);

select! {
_ = t1 => {
trace!(
target: "cumulus-network",
"block announcement finished",
);
},
_ = t2 => {
trace!(
target: "cumulus-network",
"previous task that waits for announce block has been canceled",
);
}
}
}).into()) {
error!(
target: "cumulus-network",
"Could not spawn a new task to wait for the announce block: {:?}",
err,
);
}
}
}

async fn wait_to_announce<Block: BlockT>(
hash: <Block as BlockT>::Hash,
relay_chain_leaf: PHash,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
collator_network: Arc<dyn CollatorNetwork>,
head_data: &Vec<u8>,
) {
let mut checked_statements = collator_network.checked_statements(relay_chain_leaf);

while let Some(statement) = checked_statements.next().await {
match &statement.statement {
Statement::Candidate(c) if &c.head_data.0 == head_data => {
let gossip_message: GossipMessage = GossipStatement {
relay_chain_leaf,
signed_statement: statement,
}.into();

announce_block(hash, gossip_message.encode());

break;
},
_ => {},
}
}
}
Loading