Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
CLI integration
  • Loading branch information
arkpar committed Mar 20, 2018
commit 158a0004edfcb567ef4575b3bad2fa3651642e1f
52 changes: 33 additions & 19 deletions polkadot/consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use std::sync::Arc;
use futures::{future, Future, Stream, Sink, Async, Canceled};
use parking_lot::Mutex;
use substrate_network as net;
use tokio_core::reactor::Core;
use tokio_core::reactor;
use client::BlockchainEvents;
use substrate_keyring::Keyring;
use primitives::{Hash, AuthorityId};
use primitives::block::{Id as BlockId, HeaderHash};
use primitives::block::{Id as BlockId, HeaderHash, Header};
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt};
use polkadot_api::PolkadotApi;
use bft::{self, BftService};
Expand Down Expand Up @@ -86,28 +86,29 @@ struct Network(Arc<net::ConsensusService>);

impl Service {
/// Create and start a new instance.
pub fn new<C>(client: Arc<C>, network: Arc<net::ConsensusService>, transaction_pool: Arc<Mutex<TransactionPool>>) -> Service
pub fn new<C>(client: Arc<C>, network: Arc<net::ConsensusService>, transaction_pool: Arc<Mutex<TransactionPool>>, best_header: &Header) -> Service
where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
{
let best_header = best_header.clone();
let thread = thread::spawn(move || {
let mut core = Core::new().expect("tokio::Core could not be created");
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let key = Arc::new(Keyring::One.into());
let factory = ProposerFactory {
client: client.clone(),
transaction_pool: transaction_pool.clone(),
network: Network(network.clone()),
};
let bft_service = BftService::new(client.clone(), key, factory);
// Kickstart BFT agreement on start.
if let Err(e) = Self::run_bft(&bft_service, network.clone(), &*client, core.handle(), &best_header) {
debug!("Error starting initial BFT agreement: {:?}", e);
}
loop {
let key = Arc::new(Keyring::One.into());
let factory = ProposerFactory {
client: client.clone(),
transaction_pool: transaction_pool.clone(),
network: Network(network.clone()),
};
let bft_service = BftService::new(client.clone(), key, factory);
let handle = core.handle();
let start_bft = client.import_notification_stream().map(|notification| {
let hash = notification.header.hash();
let authorities = client.authorities(&BlockId::Hash(hash))?;
let input = network.bft_messages()
.filter_map(move |message| Self::process_message(message, &authorities, hash.clone()))
.map_err(|_| bft::InputStreamConcluded.into());
let output = BftSink { network: network.clone(), _e: Default::default() };
bft_service.build_upon(&notification.header, input, output, handle.clone())
if let Err(e) = Self::run_bft(&bft_service, network.clone(), &*client, handle.clone(), &notification.header) {
debug!("Error starting BFT agreement: {:?}", e);
}
}).map_err(|e| debug!("BFT agreement error: {:?}", e));
if let Err(_e) = core.run(start_bft.into_future()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of doing this in a loop, couldn't we do

let bft_starter = client.import_notification_stream().for_each(|notification| ...) and core.run(bft_starter)?

debug!("BFT event loop stopped");
Expand All @@ -116,10 +117,23 @@ impl Service {
}
});
Service {
thread: Some(thread),
thread: Some(thread)
}
}

fn run_bft<C, P>(bft_service: &BftService<P, C>, network: Arc<net::ConsensusService>, client: &C, handle: reactor::Handle, header: &Header) -> Result<(), <P as bft::ProposerFactory>::Error> where
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: more idiomatic as a free function

C: bft::Authorities + bft::BlockImport + Send + Sync + 'static,
P: bft::ProposerFactory + 'static,
{
let hash = header.hash();
let authorities = client.authorities(&BlockId::Hash(hash))?;
let input = network.bft_messages()
.filter_map(move |message| Self::process_message(message, &authorities, hash.clone()))
.map_err(|_| bft::InputStreamConcluded.into());
let output = BftSink { network: network.clone(), _e: Default::default() };
bft_service.build_upon(&header, input, output, handle.clone())
}

fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_hash: HeaderHash) -> Option<bft::Communication> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this too

// TODO: check all signatures
Some(match msg {
Expand Down
14 changes: 13 additions & 1 deletion polkadot/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern crate polkadot_primitives;
extern crate polkadot_runtime;
extern crate polkadot_executor;
extern crate polkadot_api;
extern crate polkadot_consensus as consensus;
extern crate polkadot_transaction_pool as transaction_pool;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct Service {
thread: Option<thread::JoinHandle<()>>,
client: Arc<Client>,
network: Arc<network::Service>,
_consensus: Option<consensus::Service>,
}

struct TransactionPoolAdapter {
Expand Down Expand Up @@ -141,9 +143,11 @@ impl Service {
};

let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?);
let best_header = client.header(&BlockId::Hash(client.info()?.chain.best_hash))?.expect("Best header always exists; qed");
info!("Starting Polkadot. Best block is #{}", best_header.number);
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
pool: transaction_pool,
pool: transaction_pool.clone(),
client: client.clone(),
});
let network_params = network::Params {
Expand All @@ -156,6 +160,13 @@ impl Service {
};
let network = network::Service::new(network_params)?;

// Spin consensus service if configured
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR {
Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool, &best_header))
} else {
None
};

let thread_client = client.clone();
let thread_network = network.clone();
let thread = thread::spawn(move || {
Expand All @@ -177,6 +188,7 @@ impl Service {
thread: Some(thread),
client: client.clone(),
network: network.clone(),
_consensus: consensus_service,
})
}

Expand Down