This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Block import notifications
arkpar committed Feb 25, 2018
commit f22cbdafcb615d1b785e190952bc130986bd5906
54 changes: 54 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions substrate/client/Cargo.toml
@@ -9,6 +9,7 @@ log = "0.3"
parking_lot = "0.4"
triehash = "0.1"
hex-literal = "0.1"
multiqueue = "0.3"
ed25519 = { path = "../ed25519" }
substrate-bft = { path = "../bft" }
substrate-codec = { path = "../codec" }
56 changes: 53 additions & 3 deletions substrate/client/src/client.rs
@@ -16,6 +16,8 @@

//! Substrate Client

use multiqueue;
use parking_lot::Mutex;
use primitives::{self, block, AuthorityId};
use primitives::block::Id as BlockId;
use primitives::storage::{StorageKey, StorageData};
@@ -28,10 +30,11 @@ use blockchain::{self, Info as ChainInfo, Backend as ChainBackend};
use {error, in_mem, block_builder, runtime_io, bft};

/// Polkadot Client
#[derive(Debug)]
pub struct Client<B, E> where B: backend::Backend {
backend: B,
executor: E,
import_notification_sink: Mutex<multiqueue::BroadcastFutSender<BlockImportNotification>>,
import_notification_stream: Mutex<multiqueue::BroadcastFutReceiver<BlockImportNotification>>,
}

/// Client info
@@ -82,6 +85,32 @@ pub enum BlockStatus {
Unknown,
}

/// Block data origin.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BlockOrigin {
/// Genesis block built into the client.
Genesis,
/// Block is part of the initial sync with the network.
NetworkInitialSync,
/// Block was broadcasted on the network.
NetworkBroadcast,
Contributor:
Aren't all blocks broadcast over the network in some sense? Doesn't it make more sense to provide hints about whether a block is the head of the chain or not?

Member Author:
The head hint is provided as a separate parameter to the import function. This is for something else, mainly for logging and possibly for skipping certain checks on local blocks.

/// Block that was collated by this node.
Own,
Contributor:
What does this mean in the context of the 2-phase commit?

Member Author:
This means that the block body was generated on this machine and was not received from the network.

Contributor:
The block body is almost always generated on another machine, but any block bodies coming directly from the consensus process have already been evaluated.

Member Author:
Added a variant for that.

/// Block was imported from a file.
File,
}
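
As noted in the discussion above, the origin is intended mainly for logging and possibly for skipping certain checks on locally produced blocks. A minimal sketch of what such a branch could look like (a hypothetical helper, not part of this PR):

// Hypothetical helper: decide whether a block still needs full
// verification based on where it came from. Not part of this PR.
fn needs_full_verification(origin: &BlockOrigin) -> bool {
    match *origin {
        // Locally built or built-in blocks have already been evaluated.
        BlockOrigin::Genesis | BlockOrigin::Own => false,
        // Blocks received from the network or read from a file must be checked.
        BlockOrigin::NetworkInitialSync
            | BlockOrigin::NetworkBroadcast
            | BlockOrigin::File => true,
    }
}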

/// Summary of an imported block
#[derive(Clone, Debug)]
pub struct BlockImportNotification {
/// Imported block origin.
pub origin: BlockOrigin,
/// Imported block header.
pub header: block::Header,
/// Is this the new best block.
pub is_new_best: bool,
}

/// A header paired with a justification which has already been checked.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct JustifiedHeader {
@@ -122,6 +151,7 @@ impl<B, E> Client<B, E> where
where
F: FnOnce() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>)
{
let (sink, stream) = multiqueue::broadcast_fut_queue(64);
if backend.blockchain().header(BlockId::Number(0))?.is_none() {
trace!("Empty database, writing genesis block");
let (genesis_header, genesis_store) = build_genesis();
@@ -133,9 +163,16 @@
Ok(Client {
backend,
executor,
import_notification_sink: Mutex::new(sink),
import_notification_stream: Mutex::new(stream),
})
}

/// Get block import event stream.
pub fn import_notification_stream(&self) -> multiqueue::BroadcastFutReceiver<BlockImportNotification> {
self.import_notification_stream.lock().add_stream()
}
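
A sketch of how a subscriber might consume this stream, assuming the returned receiver implements the futures 0.1 Stream trait with a unit error type (as multiqueue's futures API suggests); `handle` and the logging are illustrative:

// Hypothetical consumer: log every block import notification.
use futures::{Future, Stream};

let imports = client.import_notification_stream()
    .for_each(|n| {
        println!("imported #{} (origin: {:?}, new best: {})",
                 n.header.number, n.origin, n.is_new_best);
        Ok(())
    });
handle.spawn(imports); // `handle` is an assumed tokio-core reactor handle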

/// Get a reference to the state at a given block.
pub fn state_at(&self, block: &BlockId) -> error::Result<B::State> {
self.backend.state_at(*block)
@@ -236,6 +273,7 @@ impl<B, E> Client<B, E> where
/// Queue a block for import.
pub fn import_block(
&self,
origin: BlockOrigin,
header: JustifiedHeader,
body: Option<block::Body>,
) -> error::Result<ImportResult> {
@@ -261,9 +299,21 @@

let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
trace!("Imported {}, (#{}), best={}", block::HeaderHash::from(header.blake2_256()), header.number, is_new_best);
transaction.set_block_data(header, body, Some(justification.uncheck().into()), is_new_best)?;
transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
transaction.set_storage(overlay.drain())?;
self.backend.commit_operation(transaction)?;

if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own {
let notification = BlockImportNotification {
origin: origin,
header: header,
is_new_best: is_new_best,
};
if let Err(e) = self.import_notification_sink.lock().try_send(notification) {
Contributor:
This fails if the buffer is full; it seems dangerous to discard the notification in this case. Maybe use the Sink::send API and wait for the future to resolve?

Member Author:
Import is more important and should not block on the notification handler. Normally this should never happen, but if it does, skipping a notification is fine since there will be a new block eventually anyway. The handlers are not supposed to expect to receive notifications for every block. It is quite possible that the node goes to sleep or loses connectivity for a while, and when it comes back online there won't be any notifications for the blocks that arrived in the meantime.

Contributor (@rphmeier, Mar 20, 2018):
"skipping a notification is fine since there will be a new block eventually anyway" -- this is reasoning about "eventually" a little too long-term for me to be comfortable using it in practice. There won't be a new block if not enough authorities start the agreement process (e.g. if 1/3 are offline and one of the "honest" nodes skips this notification; then the agreement would only complete when that node is restarted).

Member Author:
Increased the buffer to 65536 for now. Unfortunately it is preallocated.

Contributor (@rphmeier, Mar 26, 2018):
Still dangerous, but probably workable for now. We should write a fix after this PR -- maybe a 50ms timer where the same chain head being observed twice in a row starts consensus?

Member Author:
I'll address that in the following PR, probably by replacing multiqueue with a vector of mpsc channels.

warn!("Error queueing block import notification: {:?}", e);
}
}

Ok(ImportResult::Queued)
}
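
The follow-up the author mentions in the thread above is to replace the bounded multiqueue with a vector of mpsc channels, so a slow or dropped subscriber can never block the import path. A minimal sketch of that idea, assuming the futures 0.1 unbounded channel API; the type and method names are illustrative:

use futures::sync::mpsc;
use parking_lot::Mutex;

// Hypothetical notification hub: one unbounded sender per subscriber.
struct NotificationHub<T: Clone> {
    sinks: Mutex<Vec<mpsc::UnboundedSender<T>>>,
}

impl<T: Clone> NotificationHub<T> {
    /// Register a subscriber and hand back its receiving end.
    fn subscribe(&self) -> mpsc::UnboundedReceiver<T> {
        let (sink, stream) = mpsc::unbounded();
        self.sinks.lock().push(sink);
        stream
    }

    /// Notify all live subscribers without blocking the caller; senders
    /// whose receiver has been dropped are removed.
    fn notify(&self, value: T) {
        self.sinks.lock().retain(|sink| sink.unbounded_send(value.clone()).is_ok());
    }
}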

@@ -335,7 +385,7 @@ impl<B, E> bft::BlockImport for Client<B, E>
justification,
};

let _ = self.import_block(justified_header, Some(block.transactions));
let _ = self.import_block(BlockOrigin::Genesis, justified_header, Some(block.transactions));
}
}

3 changes: 2 additions & 1 deletion substrate/client/src/lib.rs
@@ -31,6 +31,7 @@ extern crate ed25519;

extern crate triehash;
extern crate parking_lot;
extern crate multiqueue;
#[cfg(test)] #[macro_use] extern crate hex_literal;
#[macro_use] extern crate error_chain;
#[macro_use] extern crate log;
@@ -43,5 +44,5 @@ pub mod genesis;
pub mod block_builder;
mod client;

pub use client::{Client, ClientInfo, CallResult, ImportResult, BlockStatus, new_in_mem};
pub use client::{Client, ClientInfo, CallResult, ImportResult, BlockStatus, BlockOrigin, new_in_mem};
pub use blockchain::Info as ChainInfo;
11 changes: 6 additions & 5 deletions substrate/network/src/chain.rs
@@ -16,15 +16,15 @@

//! Blockchain access trait

use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus};
use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin};
use client::error::Error;
use state_machine;
use primitives::block::{self, Id as BlockId};
use primitives::bft::Justification;

pub trait Client: Send + Sync {
/// Given a hash return a header
fn import(&self, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error>;
/// Import a new block. The parent is expected to already exist in the blockchain.
fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error>;

/// Get blockchain info.
fn info(&self) -> Result<ClientInfo, Error>;
@@ -50,10 +50,11 @@ impl<B, E> Client for PolkadotClient<B, E> where
E: state_machine::CodeExecutor + Send + Sync + 'static,
Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, {

fn import(&self, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error> {
fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error> {
// TODO: defer justification check.
let justified_header = self.check_justification(header, justification.into())?;
(self as &PolkadotClient<B, E>).import_block(justified_header, body)
let origin = if is_best { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync };
(self as &PolkadotClient<B, E>).import_block(origin, justified_header, body)
}

fn info(&self) -> Result<ClientInfo, Error> {
10 changes: 9 additions & 1 deletion substrate/network/src/sync.rs
@@ -84,9 +84,13 @@ impl ChainSync {
}
}

fn best_seen_block(&self) -> Option<BlockNumber> {
self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number)
}

/// Returns sync status
pub fn status(&self) -> Status {
let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number);
let best_seen = self.best_seen_block();
let state = match &best_seen {
&Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5 => SyncState::Downloading,
_ => SyncState::Idle,
@@ -97,6 +101,7 @@
}
}

/// Handle new connected peer.
pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
if let Some(info) = protocol.peer_info(peer_id) {
match (protocol.chain().block_status(&BlockId::Hash(info.best_hash)), info.best_number) {
@@ -211,6 +216,7 @@ impl ChainSync {
vec![]
};

let best_seen = self.best_seen_block();
// Blocks in the response/drain should be in ascending order.
for block in new_blocks {
let origin = block.origin;
Expand All @@ -220,7 +226,9 @@ impl ChainSync {
let number = header.number;
let hash = header_hash(&header);
let parent = header.parent_hash;
let is_best = best_seen.as_ref().map_or(false, |n| number >= *n);
let result = protocol.chain().import(
is_best,
header,
justification,
block.body
4 changes: 2 additions & 2 deletions substrate/network/src/test/mod.rs
@@ -19,7 +19,7 @@ mod sync;
use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc;
use parking_lot::RwLock;
use client::{self, genesis};
use client::{self, genesis, BlockOrigin};
use client::block_builder::BlockBuilder;
use primitives::block::Id as BlockId;
use primitives;
@@ -185,7 +185,7 @@ impl Peer {
trace!("Generating {}, (#{})", primitives::block::HeaderHash::from(block.header.blake2_256()), block.header.number);
let justification = Self::justify(&block.header);
let justified = self.client.check_justification(block.header, justification).unwrap();
self.client.import_block(justified, Some(block.transactions)).unwrap();
self.client.import_block(BlockOrigin::File, justified, Some(block.transactions)).unwrap();
}
}
