Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Prev Previous commit
Next Next commit
Implement block import
  • Loading branch information
bkchr committed Oct 22, 2019
commit d3ed1df0ea840afc46ef472423eb997dae8f6fc1
88 changes: 73 additions & 15 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
use cumulus_runtime::ParachainBlockData;

use sr_primitives::traits::{Block as BlockT, Header as HeaderT};
use consensus_common::{Environment, Proposer};
use consensus_common::{
BlockImport, Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin,
Error as ConsensusError,
};
use inherents::InherentDataProviders;
use substrate_primitives::Blake2Hasher;

Expand Down Expand Up @@ -53,44 +56,49 @@ struct HeadData<Block: BlockT> {
}

/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block, PF> {
pub struct Collator<Block, PF, BI> {
proposer_factory: Arc<Mutex<PF>>,
_phantom: PhantomData<Block>,
inherent_data_providers: InherentDataProviders,
collator_network: Arc<dyn CollatorNetwork>,
block_import: Arc<Mutex<BI>>,
}

impl<Block: BlockT, PF: Environment<Block>> Collator<Block, PF> {
impl<Block, PF, BI> Collator<Block, PF, BI> {
/// Create a new instance.
fn new(
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
collator_network: Arc<dyn CollatorNetwork>,
block_import: BI,
) -> Self {
Self {
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
inherent_data_providers,
_phantom: PhantomData,
collator_network,
block_import: Arc::new(Mutex::new(block_import)),
}
}
}

impl<Block, PF> Clone for Collator<Block, PF> {
impl<Block, PF, BI> Clone for Collator<Block, PF, BI> {
fn clone(&self) -> Self {
Self {
proposer_factory: self.proposer_factory.clone(),
inherent_data_providers: self.inherent_data_providers.clone(),
_phantom: PhantomData,
collator_network: self.collator_network.clone(),
block_import: self.block_import.clone(),
}
}
}

impl<Block, PF> ParachainContext for Collator<Block, PF>
impl<Block, PF, BI> ParachainContext for Collator<Block, PF, BI>
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
BI: BlockImport<Block, Error=ConsensusError> + Send + Sync + 'static,
{
type ProduceCandidate = Box<
dyn Future<Item=(BlockData, parachain::HeadData, OutgoingMessages), Error=InvalidHead>
Expand All @@ -107,24 +115,35 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>

let factory = self.proposer_factory.clone();
let inherent_providers = self.inherent_data_providers.clone();
let block_import = self.block_import.clone();

let res = HeadData::<Block>::decode(&mut &status.head_data.0[..])
.map_err(|_| InvalidHead)
.map_err(|e| {
error!(target: "cumulus-collator", "Could not decode the head data: {:?}", e);
InvalidHead
})
.into_future()
.and_then(move |last_head| {
let parent_state_root = *last_head.header.state_root();

factory.lock()
.init(&last_head.header)
.map_err(|e| {
//TODO: Do we want to return the real error?
error!("Could not create proposer: {:?}", e);
error!(
target: "cumulus-collator",
"Could not create proposer: {:?}",
e,
);
InvalidHead
})
.and_then(|mut proposer| {
let inherent_data = inherent_providers.create_inherent_data()
.map_err(|e| {
error!("Failed to create inherent data: {:?}", e);
error!(
target: "cumulus-collator",
"Failed to create inherent data: {:?}",
e,
);
InvalidHead
})?;

Expand All @@ -136,7 +155,11 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
true,
)
.map_err(|e| {
error!("Proposing failed: {:?}", e);
error!(
target: "cumulus-collator",
"Proposing failed: {:?}",
e,
);
InvalidHead
})
.and_then(move |(block, proof)| {
Expand All @@ -155,7 +178,10 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
)
}
None => {
error!("Proposer did not return the requested proof.");
error!(
target: "cumulus-collator",
"Proposer did not return the requested proof.",
);
Err(InvalidHead)
}
};
Expand All @@ -168,6 +194,36 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
})
})
.flatten()
.and_then(move |b| {
let block_import_params = BlockImportParams {
origin: BlockOrigin::Own,
header: b.header().clone(),
justification: None,
post_digests: vec![],
body: Some(b.extrinsics().to_vec()),
finalized: false,
auxiliary: vec![], // block-weight is written in block import.
// TODO: block-import handles fork choice and this shouldn't even have the
// option to specify one.
// https://github.com/paritytech/substrate/issues/3623
fork_choice: ForkChoiceStrategy::LongestChain,
};

if let Err(err) = block_import.lock().import_block(
block_import_params,
Default::default(),
) {
error!(
target: "cumulus-collator",
"Error importing build block (at {:?}): {:?}",
b.header().parent_hash(),
err,
);
Err(InvalidHead)
} else {
Ok(b)
}
})
.map(|b| {
let block_data = BlockData(b.encode());
let head_data = HeadData::<Block> { header: b.into_header() };
Expand Down Expand Up @@ -201,7 +257,7 @@ impl<Block, SP> CollatorBuilder<Block, SP> {
}

impl<Block: BlockT, SP: SetupParachain<Block>> BuildParachainContext for CollatorBuilder<Block, SP> {
type ParachainContext = Collator<Block, SP::ProposerFactory>;
type ParachainContext = Collator<Block, SP::ProposerFactory, SP::BlockImport>;

fn build<B, E>(
self,
Expand All @@ -213,25 +269,27 @@ impl<Block: BlockT, SP: SetupParachain<Block>> BuildParachainContext for Collato
B: substrate_client::backend::Backend<PBlock, Blake2Hasher> + 'static,
E: substrate_client::CallExecutor<PBlock, Blake2Hasher> + Clone + Send + Sync + 'static
{
let (proposer_factory, inherent_data_providers) = self.setup_parachain
let (proposer_factory, block_import, inherent_data_providers) = self.setup_parachain
.setup_parachain(client, task_executor)
.map_err(|e| error!("Error setting up the parachain: {}", e))?;

Ok(Collator::new(proposer_factory, inherent_data_providers, network))
Ok(Collator::new(proposer_factory, inherent_data_providers, network, block_import))
}
}

/// Something that can setup a parachain.
pub trait SetupParachain<Block: BlockT>: Send {
/// The proposer factory of the parachain to build blocks.
type ProposerFactory: Environment<Block> + Send + 'static;
/// The block import for importing the blocks build by the collator.
type BlockImport: BlockImport<Block, Error=ConsensusError> + Send + Sync + 'static;

/// Setup the parachain.
fn setup_parachain<P: cumulus_consensus::PolkadotClient>(
self,
polkadot_client: P,
task_executor: TaskExecutor,
) -> Result<(Self::ProposerFactory, InherentDataProviders), String>;
) -> Result<(Self::ProposerFactory, Self::BlockImport, InherentDataProviders), String>;
}

/// Run a collator with the given proposer factory.
Expand Down
10 changes: 10 additions & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,14 @@ impl<B: BlockT> ParachainBlockData<B> {
pub fn into_header(self) -> B::Header {
self.header
}

/// Returns the header.
pub fn header(&self) -> &B::Header {
&self.header
}

/// Returns the extrinsics.
pub fn extrinsics(&self) -> &[B::Extrinsic] {
&self.extrinsics
}
}
22 changes: 16 additions & 6 deletions test/parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,32 +91,42 @@ pub fn run_collator<C: Send + Default + 'static, E: crate::cli::IntoExit + Send
};

let on_exit = service.on_exit();

let setup_parachain = SetupParachain { service, inherent_data_providers, proposer_factory, exit };
let block_import = service.client();

let setup_parachain = SetupParachain {
service,
inherent_data_providers,
proposer_factory,
exit,
block_import,
};

cumulus_collator::run_collator(setup_parachain, crate::PARA_ID, on_exit, key, version)
}

struct SetupParachain<S, PF, E> {
struct SetupParachain<S, PF, E, BI> {
service: S,
proposer_factory: PF,
exit: E,
inherent_data_providers: InherentDataProviders,
block_import: BI,
}

impl<S, PF, E> cumulus_collator::SetupParachain<Block> for SetupParachain<S, PF, E>
impl<S, PF, E, BI> cumulus_collator::SetupParachain<Block> for SetupParachain<S, PF, E, BI>
where
S: AbstractService,
E: Send + crate::cli::IntoExit,
PF: consensus_common::Environment<Block> + Send + 'static,
BI: consensus_common::BlockImport<Block, Error=consensus_common::Error> + Send + Sync + 'static,
{
type ProposerFactory = PF;
type BlockImport = BI;

fn setup_parachain<P: cumulus_consensus::PolkadotClient>(
self,
polkadot_client: P,
task_executor: polkadot_collator::TaskExecutor,
) -> Result<(Self::ProposerFactory, InherentDataProviders), String> {
) -> Result<(Self::ProposerFactory, Self::BlockImport, InherentDataProviders), String> {
let client = self.service.client();

let follow = match cumulus_consensus::follow_polkadot(crate::PARA_ID, client, polkadot_client) {
Expand All @@ -139,6 +149,6 @@ impl<S, PF, E> cumulus_collator::SetupParachain<Block> for SetupParachain<S, PF,
),
).map_err(|_| "Could not spawn parachain server!")?;

Ok((self.proposer_factory, self.inherent_data_providers))
Ok((self.proposer_factory, self.block_import, self.inherent_data_providers))
}
}