Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
afdc964
core/finality-grandpa: Pass Grandpa msg sender up to UntilImported
mxinden Oct 8, 2019
61ac9dd
core/finality-grandpa: Track senders to maybe later request blocks
mxinden Oct 8, 2019
ce9db3f
core/finality-grandpa: Make BlockStatus pub only within crate
mxinden Oct 10, 2019
3cd6de0
core/finality-grandpa: Abstract NetworkBridge with BlockSyncRequester
mxinden Oct 10, 2019
fd4800d
core/finality-grandpa: Pass BlockSyncRequester to UntilImported
mxinden Oct 10, 2019
0f82795
core/finality-grandpa: Track block number of pending within UntilImpo…
mxinden Oct 10, 2019
a2f6684
core/finality-grandpa: Request block sync on long wait
mxinden Oct 10, 2019
9f9e15f
core/finality-grandpa: Adjust unit tests to previous changes
mxinden Oct 11, 2019
126c35e
core/finality-grandpa: Fix line length
mxinden Oct 11, 2019
72516c0
core/finality-grandpa: Add comment explaining in & out vote combination
mxinden Oct 15, 2019
3b4adca
core/finality-grandpa: Log after, not before, timeout expired
mxinden Oct 17, 2019
74d6637
core/finality-grandpa: Collect senders as HashSet for deduplication
mxinden Oct 18, 2019
c138cfb
Merge remote-tracking branch 'paritytech/master' into sync_request_in…
mxinden Oct 21, 2019
6a1eec8
Revert "core/finality-grandpa: Track senders to maybe later request b…
mxinden Oct 22, 2019
8e7663c
Revert "core/finality-grandpa: Pass Grandpa msg sender up to UntilImp…
mxinden Oct 22, 2019
0805651
core/network/sync: Ask for block from all peers if none provided
mxinden Oct 22, 2019
6fda6a6
core/network/sync: Request specific fork sync from peers ahead or on par
mxinden Oct 23, 2019
2c5422f
Merge remote-tracking branch 'paritytech/master' into sync_request_in…
mxinden Oct 23, 2019
411e101
grandpa: fix tests
andresilva Oct 23, 2019
6d695d1
grandpa: fix warnings
andresilva Oct 23, 2019
202bced
grandpa: add test for block sync request on until_imported
andresilva Oct 23, 2019
284ad34
grandpa: rename Environment field inner to client
andresilva Oct 23, 2019
f030ec6
grandpa: fix minor nits
andresilva Oct 23, 2019
10790e7
grandpa: minor nits in until_imported
andresilva Oct 23, 2019
3705a2c
grandpa: copy docs for set_sync_fork_request
andresilva Oct 23, 2019
d668931
grandpa: remove stale TODO on UntilImported
andresilva Oct 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion core/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use network::{consensus_gossip as network_gossip, NetworkService};
use network_gossip::ConsensusMessage;
use codec::{Encode, Decode};
use primitives::Pair;
use sr_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use sr_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO};
use tokio_executor::Executor;

Expand Down Expand Up @@ -129,6 +129,14 @@ pub trait Network<Block: BlockT>: Clone + Send + 'static {

/// Inform peers that a block with given hash should be downloaded.
fn announce(&self, block: Block::Hash, associated_data: Vec<u8>);

/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}

/// Create a unique topic for a round and set-id combo.
Expand Down Expand Up @@ -216,6 +224,10 @@ impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
self.announce_block(block, associated_data)
}

fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
NetworkService::set_sync_fork_request(self, peers, hash, number)
}
}

/// A stream used by NetworkBridge in its implementation of Network. Given a oneshot that eventually returns a channel
Expand Down Expand Up @@ -468,6 +480,9 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
format!("Failed to receive on unbounded receiver for round {}", round.0)
));

// Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single
// vote-import-pipeline.
let incoming = incoming.select(out_rx);

(incoming, outgoing)
Expand Down Expand Up @@ -514,6 +529,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {

(incoming, outgoing)
}

/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
pub(crate) fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
self.service.set_sync_fork_request(peers, hash, number)
}
}

fn incoming_global<B: BlockT, N: Network<B>>(
Expand Down
4 changes: 4 additions & 0 deletions core/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio::runtime::current_thread;
use std::sync::Arc;
use keyring::Ed25519Keyring;
use codec::Encode;
use sr_primitives::traits::NumberFor;

use crate::environment::SharedVoterSetState;
use super::gossip::{self, GossipValidator};
Expand Down Expand Up @@ -91,6 +92,9 @@ impl super::Network<Block> for TestNetwork {
fn announce(&self, block: Hash, _associated_data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::Announce(block));
}

/// Notify the sync service to try syncing the given chain.
fn set_sync_fork_request(&self, _peers: Vec<network::PeerId>, _hash: Hash, _number: NumberFor<Block>) {}
}

impl network_gossip::ValidatorContext<Block> for TestNetwork {
Expand Down
27 changes: 14 additions & 13 deletions core/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl<Block: BlockT> SharedVoterSetState<Block> {

/// The environment we run GRANDPA in.
pub(crate) struct Environment<B, E, Block: BlockT, N: Network<Block>, RA, SC, VR> {
pub(crate) inner: Arc<Client<B, E, Block, RA>>,
pub(crate) client: Arc<Client<B, E, Block, RA>>,
pub(crate) select_chain: SC,
pub(crate) voters: Arc<VoterSet<AuthorityId>>,
pub(crate) config: Config,
Expand Down Expand Up @@ -413,7 +413,7 @@ where
NumberFor<Block>: BlockNumberOps,
{
fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result<Vec<Block::Hash>, GrandpaError> {
ancestry(&self.inner, base, block)
ancestry(&self.client, base, block)
}

fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
Expand All @@ -434,7 +434,7 @@ where

match self.select_chain.finality_target(block, None) {
Ok(Some(best_hash)) => {
let base_header = self.inner.header(&BlockId::Hash(block)).ok()?
let base_header = self.client.header(&BlockId::Hash(block)).ok()?
.expect("Header known to exist after `best_containing` call; qed");

if let Some(limit) = limit {
Expand All @@ -449,7 +449,7 @@ where
}
}

let best_header = self.inner.header(&BlockId::Hash(best_hash)).ok()?
let best_header = self.client.header(&BlockId::Hash(best_hash)).ok()?
.expect("Header known to exist after `best_containing` call; qed");

// check if our vote is currently being limited due to a pending change
Expand All @@ -473,7 +473,7 @@ where
break;
}

target_header = self.inner.header(&BlockId::Hash(*target_header.parent_hash())).ok()?
target_header = self.client.header(&BlockId::Hash(*target_header.parent_hash())).ok()?
.expect("Header known to exist after `best_containing` call; qed");
}

Expand All @@ -492,7 +492,7 @@ where
// authority set limit filter, which can be considered a
// mandatory/implicit voting rule.
self.voting_rule
.restrict_vote(&*self.inner, &base_header, &best_header, target_header)
.restrict_vote(&*self.client, &base_header, &best_header, target_header)
.or(Some((target_header.hash(), *target_header.number())))
},
Ok(None) => {
Expand Down Expand Up @@ -601,8 +601,9 @@ where
// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = Box::new(UntilVoteTargetImported::new(
self.inner.import_notification_stream(),
self.inner.clone(),
self.client.import_notification_stream(),
self.network.clone(),
self.client.clone(),
incoming,
"round",
).map_err(Into::into));
Expand Down Expand Up @@ -650,7 +651,7 @@ where
current_rounds,
};

crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

Ok(Some(set_state))
})?;
Expand Down Expand Up @@ -691,7 +692,7 @@ where
current_rounds,
};

crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

Ok(Some(set_state))
})?;
Expand Down Expand Up @@ -742,7 +743,7 @@ where
current_rounds,
};

crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

Ok(Some(set_state))
})?;
Expand Down Expand Up @@ -800,7 +801,7 @@ where
current_rounds,
};

crate::aux_schema::write_voter_set_state(&*self.inner, &set_state)?;
crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

Ok(Some(set_state))
})?;
Expand All @@ -816,7 +817,7 @@ where
commit: Commit<Block>,
) -> Result<(), Self::Error> {
finalize_block(
&*self.inner,
&*self.client,
&self.authority_set,
&self.consensus_changes,
Some(self.config.justification_period.into()),
Expand Down
45 changes: 33 additions & 12 deletions core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
//!
//! # Usage
//!
//! First, create a block-import wrapper with the `block_import` function.
//! The GRANDPA worker needs to be linked together with this block import object,
//! so a `LinkHalf` is returned as well. All blocks imported (from network or consensus or otherwise)
//! must pass through this wrapper, otherwise consensus is likely to break in
//! unexpected ways.
//! First, create a block-import wrapper with the `block_import` function. The
//! GRANDPA worker needs to be linked together with this block import object, so
//! a `LinkHalf` is returned as well. All blocks imported (from network or
//! consensus or otherwise) must pass through this wrapper, otherwise consensus
//! is likely to break in unexpected ways.
//!
//! Next, use the `LinkHalf` and a local configuration to `run_grandpa_voter`.
//! This requires a `Network` implementation. The returned future should be
Expand Down Expand Up @@ -242,7 +242,7 @@ impl From<ClientError> for Error {
}

/// Something which can determine if a block is known.
pub trait BlockStatus<Block: BlockT> {
pub(crate) trait BlockStatus<Block: BlockT> {
/// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block
/// is definitely known and has been imported.
/// If an unexpected error occurs, return that.
Expand All @@ -261,6 +261,26 @@ impl<B, E, Block: BlockT<Hash=H256>, RA> BlockStatus<Block> for Arc<Client<B, E,
}
}

/// Something that one can ask to do a block sync request.
pub(crate) trait BlockSyncRequester<Block: BlockT> {
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}

impl<Block, N> BlockSyncRequester<Block> for NetworkBridge<Block, N> where
Block: BlockT,
N: communication::Network<Block>,
{
fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: Block::Hash, number: NumberFor<Block>) {
NetworkBridge::set_sync_fork_request(self, peers, hash, number)
}
}

/// A new authority set along with the canonical block it changed at.
#[derive(Debug)]
pub(crate) struct NewAuthoritySet<H, N> {
Expand Down Expand Up @@ -429,6 +449,7 @@ fn global_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
// block commit and catch up messages until relevant blocks are imported.
let global_in = UntilGlobalMessageBlocksImported::new(
client.import_notification_stream(),
network.clone(),
client.clone(),
global_in,
"global",
Expand Down Expand Up @@ -617,7 +638,7 @@ where

let voters = persistent_data.authority_set.current_authorities();
let env = Arc::new(Environment {
inner: client,
client,
select_chain,
voting_rule,
voters: Arc::new(voters),
Expand Down Expand Up @@ -656,7 +677,7 @@ where
"authority_id" => authority_id.to_string(),
);

let chain_info = self.env.inner.info();
let chain_info = self.env.client.info();
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"number" => ?chain_info.chain.finalized_number,
"hash" => ?chain_info.chain.finalized_hash,
Expand All @@ -680,7 +701,7 @@ where
let global_comms = global_communication(
self.env.set_id,
&self.env.voters,
&self.env.inner,
&self.env.client,
&self.env.network,
&self.env.config.keystore,
);
Expand Down Expand Up @@ -728,7 +749,7 @@ where
(new.canon_hash, new.canon_number),
);

aux_schema::write_voter_set_state(&*self.env.inner, &set_state)?;
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;

Expand All @@ -737,7 +758,7 @@ where
set_id: new.set_id,
voter_set_state: self.env.voter_set_state.clone(),
// Fields below are simply transferred and not updated.
inner: self.env.inner.clone(),
client: self.env.client.clone(),
select_chain: self.env.select_chain.clone(),
config: self.env.config.clone(),
authority_set: self.env.authority_set.clone(),
Expand All @@ -757,7 +778,7 @@ where
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };

aux_schema::write_voter_set_state(&*self.env.inner, &set_state)?;
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;

Expand Down
2 changes: 1 addition & 1 deletion core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ fn grandpa_environment_respects_voting_rules() {
authority_set: authority_set.clone(),
config: config.clone(),
consensus_changes: consensus_changes.clone(),
inner: link.client.clone(),
client: link.client.clone(),
select_chain: link.select_chain.clone(),
set_id: authority_set.set_id(),
voter_set_state: set_state.clone(),
Expand Down
Loading