This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
24 commits
ab710bf
Updating substrate-demo
arkpar Sep 4, 2018
a5604d6
Consenus fixes
arkpar Sep 4, 2018
d5a0270
Reverted toolchain change
arkpar Sep 4, 2018
6a5ea57
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 4, 2018
eab11ad
Adjusted timeout formula
arkpar Aug 23, 2018
04eb184
Simplfied proposal creation
arkpar Sep 4, 2018
91041c3
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 5, 2018
67107bf
Fixed tests
arkpar Sep 5, 2018
8b7a8bb
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 5, 2018
f3e9299
Fixed a few small issues
arkpar Sep 5, 2018
b3a96ad
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 5, 2018
dbf6d0c
2017->2018
arkpar Sep 6, 2018
3e61f36
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 6, 2018
7d8ef2c
Style
arkpar Sep 6, 2018
7353474
More style
arkpar Sep 6, 2018
946e5e4
Renamed demo executable to substrate
arkpar Sep 6, 2018
a02f2b7
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 6, 2018
9464a68
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 7, 2018
c17c8b4
Style
arkpar Sep 7, 2018
a599454
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 10, 2018
f882159
Fixed compilation after merge
arkpar Sep 10, 2018
544bb89
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 10, 2018
b3295d6
Style
arkpar Sep 10, 2018
18d67cb
Merge branch 'master' of github.com:paritytech/substrate into a-demo
arkpar Sep 10, 2018
Simplfied proposal creation
arkpar committed Sep 4, 2018
commit 04eb18438ecb5d6df6806ae9048a433b4eb571a9
2 changes: 1 addition & 1 deletion demo/build.rs
@@ -20,5 +20,5 @@ const ERROR_MSG: &'static str = "Failed to generate metadata files";

fn main() {
vergen::vergen(vergen::SHORT_SHA).expect(ERROR_MSG);
println!("cargo:rerun-if-changed=../.git/HEAD");
println!("cargo:rerun-if-changed=../../.git/HEAD");
}
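
The change above only retargets the cargo:rerun-if-changed path so that Cargo re-runs the build script whenever the repository's .git/HEAD moves, keeping the short SHA that vergen embeds in the binary up to date. For context, here is a minimal sketch of the whole script as it reads after this commit; the extern crate line is an assumption, since the hunk only shows the tail of the file.

// Sketch of demo/build.rs after this commit. Only the println! path is
// changed by the diff; the vergen call is reproduced from the hunk above.
extern crate vergen;

const ERROR_MSG: &'static str = "Failed to generate metadata files";

fn main() {
    // Embed the short git SHA (and related metadata) at compile time.
    vergen::vergen(vergen::SHORT_SHA).expect(ERROR_MSG);
    // Re-run this build script whenever the checked-out commit changes.
    println!("cargo:rerun-if-changed=../../.git/HEAD");
}
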
238 changes: 88 additions & 150 deletions demo/consensus/src/lib.rs
@@ -36,7 +36,6 @@ extern crate rhododendron;

#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate futures;

#[macro_use]
@@ -46,15 +45,15 @@ extern crate log;
extern crate substrate_keyring;

use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::{self, Duration, Instant};

use codec::{Decode, Encode};
use demo_api::Api;
use demo_primitives::{AccountId, Hash, Block, BlockId, BlockNumber, Header, Timestamp, SessionKey};
use primitives::AuthorityId;
use transaction_pool::TransactionPool;
use tokio::runtime::TaskExecutor;
use tokio::timer::{Delay, Interval};
use tokio::timer::Delay;

use futures::prelude::*;
use futures::future;
@@ -122,6 +121,9 @@ impl<N, P> bft::Environment<Block> for ProposerFactory<N, P>
) -> Result<(Self::Proposer, Self::Input, Self::Output), Error> {
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};

// force delay in evaluation this long.
const FORCE_DELAY: Timestamp = 5;

let parent_hash = parent_header.hash().into();

let id = BlockId::hash(parent_hash);
@@ -152,6 +154,7 @@ impl<N, P> bft::Environment<Block> for ProposerFactory<N, P>
transaction_pool: self.transaction_pool.clone(),
offline: self.offline.clone(),
validators,
minimum_timestamp: current_timestamp() + FORCE_DELAY,
};

Ok((proposer, input, output))
@@ -170,6 +173,7 @@ pub struct Proposer<C: Api + Send + Sync> {
transaction_pool: Arc<TransactionPool<C>>,
offline: SharedOfflineTracker,
validators: Vec<AccountId>,
minimum_timestamp: u64,
}

impl<C: Api + Send + Sync> Proposer<C> {
@@ -187,32 +191,89 @@ impl<C> bft::Proposer<Block> for Proposer<C>
where
C: Api + Send + Sync,
{
type Create = Result<Block, Error>;
type Error = Error;
type Create = future::Either<
CreateProposal<C>,
future::FutureResult<Block, Error>,
>;
type Evaluate = Box<Future<Item=bool, Error=Error>>;

fn propose(&self) -> Self::Create {
const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100);
fn propose(&self) -> Result<Block, Error> {
use demo_api::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
use demo_primitives::InherentData;

let now = Instant::now();
let timing = ProposalTiming {
start: self.start.clone(),
attempt_propose: Interval::new(now + ATTEMPT_PROPOSE_EVERY, ATTEMPT_PROPOSE_EVERY),
const MAX_VOTE_OFFLINE_SECONDS: Duration = Duration::from_secs(60);

// TODO: handle case when current timestamp behind that in state.
let timestamp = ::std::cmp::max(self.minimum_timestamp, current_timestamp());

let elapsed_since_start = self.start.elapsed();
let offline_indices = if elapsed_since_start > MAX_VOTE_OFFLINE_SECONDS {
Vec::new()
} else {
self.offline.read().reports(&self.validators[..])
};

future::Either::A(CreateProposal {
parent_hash: self.parent_hash.clone(),
parent_number: self.parent_number.clone(),
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
offline: self.offline.clone(),
validators: self.validators.clone(),
timing,
})
if !offline_indices.is_empty() {
info!(
"Submitting offline validators {:?} for slash-vote",
offline_indices.iter().map(|&i| self.validators[i as usize]).collect::<Vec<_>>(),
)
}

let inherent_data = InherentData {
timestamp,
offline_indices,
};

let mut block_builder = self.client.build_block(&self.parent_id, inherent_data)?;

{
let mut unqueue_invalid = Vec::new();
let result = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending_iterator| {
let mut pending_size = 0;
for pending in pending_iterator {
if pending_size + pending.verified.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }

match block_builder.push_extrinsic(pending.original.clone()) {
Ok(()) => {
pending_size += pending.verified.encoded_size();
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(pending.verified.hash().clone());
}
}
}
});
if let Err(e) = result {
warn!("Unable to get the pending set: {:?}", e);
}

self.transaction_pool.remove(&unqueue_invalid, false);
}

let block = block_builder.bake()?;

info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]",
block.header.number,
Hash::from(block.header.hash()),
block.header.parent_hash,
block.extrinsics.iter()
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
.collect::<Vec<_>>()
.join(", ")
);

let substrate_block = Decode::decode(&mut block.encode().as_slice())
.expect("blocks are defined to serialize to substrate blocks correctly; qed");

assert!(evaluation::evaluate_initial(
&substrate_block,
[Review comment (Member): extra tab]
timestamp,
&self.parent_hash,
self.parent_number,
).is_ok());
[Review comment (Member): two extra tabs]

Ok(substrate_block)
}

fn evaluate(&self, unchecked_proposal: &Block) -> Self::Evaluate {
@@ -241,9 +302,11 @@ impl<C> bft::Proposer<Block> for Proposer<C>
let now = Instant::now();

// the duration until the given timestamp is current
let proposed_timestamp = proposal.timestamp();
let proposed_timestamp = ::std::cmp::max(self.minimum_timestamp, proposal.timestamp());
let timestamp_delay = if proposed_timestamp > current_timestamp {
Some(now + Duration::from_secs(proposed_timestamp - current_timestamp))
let delay_s = proposed_timestamp - current_timestamp;
debug!(target: "bft", "Delaying evaluation of proposal for {} seconds", delay_s);
Some(now + Duration::from_secs(delay_s))
} else {
None
};
@@ -377,132 +440,7 @@ impl<C> bft::Proposer<Block> for Proposer<C>
}

fn current_timestamp() -> Timestamp {
use std::time;

time::SystemTime::now().duration_since(time::UNIX_EPOCH)
.expect("now always later than unix epoch; qed")
.as_secs()
}

struct ProposalTiming {
start: Instant,
attempt_propose: Interval,
}

impl ProposalTiming {
// whether it's time to attempt a proposal.
// shouldn't be called outside of the context of a task.
fn poll(&mut self) -> Poll<(), ErrorKind> {
// first drain from the interval so when the minimum delay is up
// we don't have any notifications built up.
if let Async::Ready(x) = self.attempt_propose.poll().map_err(ErrorKind::Timer)? {
x.expect("timer still alive; intervals never end; qed");
}
Ok(Async::Ready(()))
}
}

/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal<C: Api + Send + Sync> {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
client: Arc<C>,
transaction_pool: Arc<TransactionPool<C>>,
timing: ProposalTiming,
validators: Vec<AccountId>,
offline: SharedOfflineTracker,
}

impl<C> CreateProposal<C> where C: Api + Send + Sync {
fn propose(&self) -> Result<Block, Error> {
use demo_api::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
use demo_primitives::InherentData;

const MAX_VOTE_OFFLINE_SECONDS: Duration = Duration::from_secs(60);

// TODO: handle case when current timestamp behind that in state.
let timestamp = current_timestamp();

let elapsed_since_start = self.timing.start.elapsed();
let offline_indices = if elapsed_since_start > MAX_VOTE_OFFLINE_SECONDS {
Vec::new()
} else {
self.offline.read().reports(&self.validators[..])
};

if !offline_indices.is_empty() {
info!(
"Submitting offline validators {:?} for slash-vote",
offline_indices.iter().map(|&i| self.validators[i as usize]).collect::<Vec<_>>(),
)
}

let inherent_data = InherentData {
timestamp,
offline_indices,
};

let mut block_builder = self.client.build_block(&self.parent_id, inherent_data)?;

{
let mut unqueue_invalid = Vec::new();
let result = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending_iterator| {
let mut pending_size = 0;
for pending in pending_iterator {
if pending_size + pending.verified.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }

match block_builder.push_extrinsic(pending.original.clone()) {
Ok(()) => {
pending_size += pending.verified.encoded_size();
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(pending.verified.hash().clone());
}
}
}
});
if let Err(e) = result {
warn!("Unable to get the pending set: {:?}", e);
}

self.transaction_pool.remove(&unqueue_invalid, false);
}

let block = block_builder.bake()?;

info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]",
block.header.number,
Hash::from(block.header.hash()),
block.header.parent_hash,
block.extrinsics.iter()
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
.collect::<Vec<_>>()
.join(", ")
);

let substrate_block = Decode::decode(&mut block.encode().as_slice())
.expect("blocks are defined to serialize to substrate blocks correctly; qed");

assert!(evaluation::evaluate_initial(
&substrate_block,
timestamp,
&self.parent_hash,
self.parent_number,
).is_ok());

Ok(substrate_block)
}
}

impl<C> Future for CreateProposal<C> where C: Api + Send + Sync {
type Item = Block;
type Error = Error;

fn poll(&mut self) -> Poll<Block, Error> {
try_ready!(self.timing.poll());
self.propose().map(Async::Ready)
}
}
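
Taken together, the changes to demo/consensus/src/lib.rs drop the timer-driven CreateProposal future (and its 100 ms ATTEMPT_PROPOSE_EVERY interval) and build the block synchronously inside propose. The forced startup delay that start_bft used to impose with a five-second Delay future is now expressed as a minimum_timestamp captured when the proposer is created: an authored block never carries a timestamp earlier than current_timestamp() + FORCE_DELAY, and evaluation of an incoming proposal is postponed until its (clamped) timestamp is no longer in the future. A condensed, standalone sketch of that timestamp handling follows; the trimmed-down Proposer keeps only the relevant field, and everything else from the real type is elided.

// Standalone sketch of the minimum-timestamp logic introduced by this commit.
// Only FORCE_DELAY, current_timestamp and the clamping expressions are taken
// from the diff; the stripped-down Proposer type is illustrative.
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

type Timestamp = u64;

// Force proposals/evaluation at least this far ahead of creation time.
const FORCE_DELAY: Timestamp = 5;

fn current_timestamp() -> Timestamp {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("now always later than unix epoch; qed")
        .as_secs()
}

struct Proposer {
    minimum_timestamp: Timestamp,
}

impl Proposer {
    // Mirrors the factory: the minimum timestamp is fixed at creation time.
    fn new() -> Self {
        Proposer { minimum_timestamp: current_timestamp() + FORCE_DELAY }
    }

    // Timestamp placed into the inherent data of a freshly authored block.
    fn proposal_timestamp(&self) -> Timestamp {
        ::std::cmp::max(self.minimum_timestamp, current_timestamp())
    }

    // How long evaluation of an incoming proposal must be delayed so that its
    // (clamped) timestamp is no longer in the future relative to local time.
    fn evaluation_delay(&self, proposed: Timestamp) -> Option<Instant> {
        let proposed = ::std::cmp::max(self.minimum_timestamp, proposed);
        let now_ts = current_timestamp();
        if proposed > now_ts {
            Some(Instant::now() + Duration::from_secs(proposed - now_ts))
        } else {
            None
        }
    }
}

fn main() {
    let proposer = Proposer::new();
    // Right after creation the proposal timestamp sits ~5 s in the future, so
    // peers delay evaluation of the first proposal by roughly FORCE_DELAY.
    let ts = proposer.proposal_timestamp();
    println!("timestamp: {}, evaluation delay: {:?}", ts, proposer.evaluation_delay(ts));
}
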
37 changes: 8 additions & 29 deletions demo/consensus/src/service.rs
@@ -33,7 +33,7 @@ use transaction_pool::TransactionPool;
use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle;
use tokio::runtime::TaskExecutor as ThreadPoolHandle;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::{Delay, Interval};
use tokio::timer::Interval;

use super::{Network, ProposerFactory};
use error;
@@ -53,35 +53,14 @@ fn start_bft<F, C>(
<F::Proposer as bft::Proposer<Block>>::Error: ::std::fmt::Display + Into<error::Error>,
<F as bft::Environment<Block>>::Error: ::std::fmt::Display
{
const DELAY_UNTIL: Duration = Duration::from_millis(5000);

let mut handle = LocalThreadHandle::current();
let work = Delay::new(Instant::now() + DELAY_UNTIL)
.then(move |res| {
if let Err(e) = res {
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
}

match bft_service.build_upon(&header) {
Ok(maybe_bft_work) => {
if maybe_bft_work.is_some() {
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
DELAY_UNTIL);
}

maybe_bft_work
}
Err(e) => {
warn!(target: "bft", "BFT agreement error: {}", e);
None
}
}
})
.map(|_| ());

if let Err(e) = handle.spawn_local(Box::new(work)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
match bft_service.build_upon(&header) {
Ok(Some(bft_work)) => if let Err(e) = handle.spawn_local(Box::new(bft_work)) {
warn!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
Ok(None) => trace!(target: "bft", "Could not start agreement on top of {}", header.hash()),
Err(e) => warn!(target: "bft", "BFT agreement error: {}", e),
}
}

/// Consensus service. Starts working when created.
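
With the forced delay now handled by the proposer's minimum_timestamp, start_bft no longer wraps build_upon in a five-second Delay future: it calls it directly and spawns the returned agreement future on the local thread, logging the Ok(None) and Err cases. A minimal, self-contained sketch of that control flow follows; the closure parameters stand in for the real BftService and tokio executor, which would otherwise pull in the surrounding crates.

// Stand-alone sketch of the simplified start_bft control flow shown above.
// The real code calls bft_service.build_upon(&header) and spawns the work on
// tokio's current-thread executor; closures keep this sketch runnable alone.
fn start_agreement<W, B, S>(build_upon: B, mut spawn_local: S)
where
    B: FnOnce() -> Result<Option<W>, String>,
    S: FnMut(W) -> Result<(), String>,
{
    match build_upon() {
        // Agreement started: hand the BFT work future to the local executor.
        Ok(Some(bft_work)) => {
            if let Err(e) = spawn_local(bft_work) {
                eprintln!("Couldn't initialize BFT agreement: {:?}", e);
            }
        }
        // Nothing to build on top of this header (e.g. agreement already done).
        Ok(None) => eprintln!("Could not start agreement on top of this header"),
        // Constructing the agreement itself failed.
        Err(e) => eprintln!("BFT agreement error: {}", e),
    }
}

fn main() {
    // Pretend build_upon produced some agreement "work" and spawn it locally.
    start_agreement(
        || Ok::<_, String>(Some("agreement future")),
        |work| { println!("spawned: {}", work); Ok(()) },
    );
}
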
2 changes: 1 addition & 1 deletion substrate/bft/src/lib.rs
@@ -472,7 +472,7 @@ impl<B, P, I> BftService<B, P, I>
hash: None,
start_round: 0,
})),
round_timeout_multiplier: 4,
round_timeout_multiplier: 10,
key: key, // TODO: key changing over time.
factory,
}
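
The final hunk raises the BFT service's round_timeout_multiplier from 4 to 10, in line with the "Adjusted timeout formula" commit in this PR. The timeout formula itself lives elsewhere in substrate/bft and is not part of this diff, so the sketch below is an illustration only, under the assumption that the multiplier scales an exponential, per-round-doubling back-off measured in seconds.

// Illustration only: the real formula is not shown in this diff. Assumes the
// round timeout doubles each round and is scaled by round_timeout_multiplier.
fn round_timeout_secs(round: u32, round_timeout_multiplier: u64) -> u64 {
    let shift = std::cmp::min(round, 63);
    1u64.checked_shl(shift)
        .unwrap_or(u64::max_value())
        .saturating_mul(round_timeout_multiplier)
}

fn main() {
    // Raising the multiplier from 4 to 10 makes every round wait 2.5x longer.
    for round in 0..4 {
        println!(
            "round {}: old {} s -> new {} s",
            round,
            round_timeout_secs(round, 4),
            round_timeout_secs(round, 10),
        );
    }
}
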