Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
core/finality-grandpa: Make NeighborPacket aware of a Block for its hash
  • Loading branch information
mxinden committed Oct 3, 2019
commit dd2a75ed800041fc76296f792e58c5ef16df73bd
88 changes: 57 additions & 31 deletions core/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ use log::{trace, debug, warn};
use futures::prelude::*;
use futures::sync::mpsc;

use crate::{environment, CatchUp, CompactCommit, SignedMessage, BlockImportedChecker};
use crate::{environment, CatchUp, CompactCommit, SignedMessage, BlockStatus};
use super::{cost, benefit, Round, SetId};

use std::collections::{HashMap, VecDeque};
Expand Down Expand Up @@ -252,15 +252,15 @@ pub(super) enum GossipMessage<Block: BlockT> {
/// Grandpa commit message with round and set info.
Commit(FullCommitMessage<Block>),
/// A neighbor packet. Not repropagated.
Neighbor(VersionedNeighborPacket<NumberFor<Block>>),
Neighbor(VersionedNeighborPacket<Block>),
/// Grandpa catch up request message with round and set info. Not repropagated.
CatchUpRequest(CatchUpRequestMessage),
/// Grandpa catch up message with round and set info. Not repropagated.
CatchUp(FullCatchUpMessage<Block>),
}

impl<Block: BlockT> From<NeighborPacket<NumberFor<Block>>> for GossipMessage<Block> {
fn from(neighbor: NeighborPacket<NumberFor<Block>>) -> Self {
impl<Block: BlockT> From<NeighborPacket<Block>> for GossipMessage<Block> {
fn from(neighbor: NeighborPacket<Block>) -> Self {
GossipMessage::Neighbor(VersionedNeighborPacket::V1(neighbor))
}
}
Expand Down Expand Up @@ -292,33 +292,35 @@ pub(super) struct FullCommitMessage<Block: BlockT> {
/// V1 neighbor packet. Neighbor packets are sent from nodes to their peers
/// and are not repropagated. These contain information about the node's state.
#[derive(Debug, Encode, Decode, Clone)]
pub(super) struct NeighborPacket<N> {
pub(super) struct NeighborPacket<Block: BlockT> where
NumberFor<Block>: Ord,
{
/// The round the node is currently at.
pub(super) round: Round,
/// The set ID the node is currently at.
pub(super) set_id: SetId,
/// The highest finalizing commit observed.
pub(super) commit_finalized_height: N,
pub(super) commit_finalized_height: NumberFor<Block>,
/// Hashes of the blocks which:
///
/// - the node has synced
///
/// - have been mentioned by nodes in vote (Prevote, Precommit) messages
///
/// - within the advertised Round set in `round` field.
// TODO: Replace `()` with some hash trait bound.
pub(super) known_vote_blocks: Vec<()>,
pub(super) known_vote_blocks: Vec<Block::Hash>,
}

/// A versioned neighbor packet.
#[derive(Debug, Encode, Decode)]
pub(super) enum VersionedNeighborPacket<N> {
pub(super) enum VersionedNeighborPacket<Block: BlockT> {
// TODO: Should we be updating this version given that this effort is a breaking change?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should add a new variant here V2 wrapping the new neighbor packet type. In principle I think it should be possible to make this backwards-compatible (but I didn't think it through tbqh.)

#[codec(index = "1")]
V1(NeighborPacket<N>),
V1(NeighborPacket<Block>),
}

impl<N> VersionedNeighborPacket<N> {
fn into_neighbor_packet(self) -> NeighborPacket<N> {
impl<Block: BlockT> VersionedNeighborPacket<Block> {
fn into_neighbor_packet(self) -> NeighborPacket<Block> {
match self {
VersionedNeighborPacket::V1(p) => p,
}
Expand Down Expand Up @@ -411,17 +413,20 @@ impl<N> PeerInfo<N> {
}

/// The peers we're connected to in gossip.
struct Peers<N> {
inner: HashMap<PeerId, PeerInfo<N>>,
struct Peers<Block: BlockT> {
inner: HashMap<PeerId, PeerInfo<NumberFor<Block>>>,
}

impl<N> Default for Peers<N> {
impl<Block: BlockT> Default for Peers<Block> {
fn default() -> Self {
Peers { inner: HashMap::new() }
}
}

impl<N: Ord> Peers<N> {
impl<Block> Peers<Block> where
Block: BlockT,
NumberFor<Block>: Ord,
{
fn new_peer(&mut self, who: PeerId, roles: Roles) {
self.inner.insert(who, PeerInfo::new(roles));
}
Expand All @@ -431,8 +436,8 @@ impl<N: Ord> Peers<N> {
}

// returns a reference to the new view, if the peer is known.
fn update_peer_state(&mut self, who: &PeerId, update: NeighborPacket<N>)
-> Result<Option<&View<N>>, Misbehavior>
fn update_peer_state(&mut self, who: &PeerId, update: NeighborPacket<Block>)
-> Result<Option<&View<NumberFor<Block>>>, Misbehavior>
{
let peer = match self.inner.get_mut(who) {
None => return Ok(None),
Expand All @@ -459,7 +464,7 @@ impl<N: Ord> Peers<N> {
Ok(Some(&peer.view))
}

fn update_commit_height(&mut self, who: &PeerId, new_height: N) -> Result<(), Misbehavior> {
fn update_commit_height(&mut self, who: &PeerId, new_height: NumberFor<Block>) -> Result<(), Misbehavior> {
let peer = match self.inner.get_mut(who) {
None => return Ok(()),
Some(p) => p,
Expand All @@ -477,7 +482,7 @@ impl<N: Ord> Peers<N> {
Ok(())
}

fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo<N>> {
fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo<NumberFor<Block>>> {
self.inner.get(who)
}
}
Expand Down Expand Up @@ -511,22 +516,27 @@ enum PendingCatchUp {
}

/// The inner lock-protected struct of a GossipValidator, enabling GossipValidator itself to be std::marker::Sync.
// TODO: Why does GossipValidator need to be Sync? Does Grandpa logic need to be parallel?
// TODO: Why does GossipValidator need to be Sync? Does Grandpa logic need to be parallel? I guess in this case related
// to the fact that the GossipValidator also acts as a network Validator (trait) thus is shared by two substrate
// components (network & finality-grandpa).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Precisely. The network layer uses the validator to validate messages, while grandpa code needs to keep it up-to-date with the current view of the GRANDPA protocol (which set we're in, round, etc.)

struct Inner<Block: BlockT, Client> {
local_view: Option<View<NumberFor<Block>>>,
peers: Peers<NumberFor<Block>>,
peers: Peers<Block>,
live_topics: KeepTopics<Block>,
authorities: Vec<AuthorityId>,
config: crate::Config,
next_rebroadcast: Instant,
pending_catch_up: PendingCatchUp,
catch_up_enabled: bool,
client: Client,
// TODO: Add doc comment
/// Blocks we have seen mentioned in votes. One need to check ad-hoc whether they are available locally.
known_vote_blocks: Vec<Block::Hash>,
}

type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)>;
type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<Block>)>;

impl<Block: BlockT, Client> Inner<Block, Client> {
impl<Block: BlockT, Client: BlockStatus<Block>> Inner<Block, Client> {
fn new(config: crate::Config, catch_up_enabled: bool, client: Client) -> Self {
Inner {
local_view: None,
Expand All @@ -538,12 +548,16 @@ impl<Block: BlockT, Client> Inner<Block, Client> {
catch_up_enabled,
config,
client,
known_vote_blocks: vec![],
}
}

/// Note a round in the current set has started.
fn note_round(&mut self, round: Round) -> MaybeMessage<Block> {
{
// Given that we start a new round, reset the known vote blocks tracking blocks of the last round.
self.known_vote_blocks = vec![];

let local_view = match self.local_view {
None => return None,
Some(ref mut v) => if v.round == round {
Expand Down Expand Up @@ -649,6 +663,13 @@ impl<Block: BlockT, Client> Inner<Block, Client> {
return Action::Discard(cost::BAD_SIGNATURE);
}

// TODO: Adding a block in a function called 'validate' seems missleading.
//
// TODO: Is it save to give these vote and commit messages any attention at this point? Do they need to be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we have already done checked that the message is well-formed and the signature is valid. Any further validation will only happen once we have the full ancestry of the block imported now.

// further validated before we add their block hash to the list?
//
// self.known_vote_blocks.push(/* hash of the block*/);

let topic = super::round_topic::<Block>(full.round.0, full.set_id.0);
Action::Keep(topic, benefit::ROUND_MESSAGE)
}
Expand Down Expand Up @@ -867,7 +888,7 @@ impl<Block: BlockT, Client> Inner<Block, Client> {
(catch_up, report)
}

fn import_neighbor_message(&mut self, who: &PeerId, update: NeighborPacket<NumberFor<Block>>)
fn import_neighbor_message(&mut self, who: &PeerId, update: NeighborPacket<Block>)
-> (Vec<Block::Hash>, Action<Block::Hash>, Option<GossipMessage<Block>>, Option<Report>)
{
let update_res = self.peers.update_peer_state(who, update);
Expand All @@ -894,11 +915,16 @@ impl<Block: BlockT, Client> Inner<Block, Client> {

fn multicast_neighbor_packet(&self) -> MaybeMessage<Block> {
self.local_view.as_ref().map(|local_view| {
let known_vote_blocks = self.known_vote_blocks.iter().filter(|h| {
// TODO: Remove unwrap.
self.client.block_number(**h).unwrap().map(|_| true).unwrap_or(false)
}).map(|h| h.clone()).collect();

let packet = NeighborPacket {
round: local_view.round,
set_id: local_view.set_id,
commit_finalized_height: local_view.last_commit.unwrap_or(Zero::zero()),
known_vote_blocks: vec![],
known_vote_blocks: known_vote_blocks,
};

let peers = self.peers.inner.keys().cloned().collect();
Expand Down Expand Up @@ -947,7 +973,7 @@ pub(super) struct GossipValidator<Block: BlockT, Client> {
report_sender: mpsc::UnboundedSender<PeerReport>,
}

impl<Block: BlockT, Client> GossipValidator<Block, Client> {
impl<Block: BlockT, Client: BlockStatus<Block>> GossipValidator<Block, Client> {
/// Create a new gossip-validator. The current set is initialized to 0. If
/// `catch_up_enabled` is set to false then the validator will not issue any
/// catch up requests (useful e.g. when running just the GRANDPA observer).
Expand All @@ -969,7 +995,7 @@ impl<Block: BlockT, Client> GossipValidator<Block, Client> {

/// Note a round in the current set has started.
pub(super) fn note_round<F>(&self, round: Round, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
where F: FnOnce(Vec<PeerId>, NeighborPacket<Block>)
{
let maybe_msg = self.inner.write().note_round(round);
if let Some((to, msg)) = maybe_msg {
Expand All @@ -980,7 +1006,7 @@ impl<Block: BlockT, Client> GossipValidator<Block, Client> {
/// Note that a voter set with given ID has started. Updates the current set to given
/// value and initializes the round to 0.
pub(super) fn note_set<F>(&self, set_id: SetId, authorities: Vec<AuthorityId>, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
where F: FnOnce(Vec<PeerId>, NeighborPacket<Block>)
{
let maybe_msg = self.inner.write().note_set(set_id, authorities);
if let Some((to, msg)) = maybe_msg {
Expand All @@ -990,7 +1016,7 @@ impl<Block: BlockT, Client> GossipValidator<Block, Client> {

/// Note that we've imported a commit finalizing a given block.
pub(super) fn note_commit_finalized<F>(&self, finalized: NumberFor<Block>, send_neighbor: F)
where F: FnOnce(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)
where F: FnOnce(Vec<PeerId>, NeighborPacket<Block>)
{
let maybe_msg = self.inner.write().note_commit_finalized(finalized);
if let Some((to, msg)) = maybe_msg {
Expand Down Expand Up @@ -1059,7 +1085,7 @@ impl<Block: BlockT, Client> GossipValidator<Block, Client> {
}
}

impl<Block: BlockT, Client: Send + Sync> network_gossip::Validator<Block> for GossipValidator<Block, Client> {
impl<Block: BlockT, Client: BlockStatus<Block> + Send + Sync> network_gossip::Validator<Block> for GossipValidator<Block, Client> {
fn new_peer(&self, context: &mut dyn ValidatorContext<Block>, who: &PeerId, roles: Roles) {
let packet = {
let mut inner = self.inner.write();
Expand Down
13 changes: 6 additions & 7 deletions core/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO};
use tokio_executor::Executor;

use crate::{
CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error,
Message, SignedMessage, BlockImportedChecker,
BlockStatus, CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error, Message, SignedMessage,
};
use crate::environment::HasVoted;
use gossip::{
Expand Down Expand Up @@ -233,14 +232,14 @@ impl Stream for NetworkStream {
}

/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>, Client: BlockImportedChecker<B>> {
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>, Client> {
service: N,
validator: Arc<GossipValidator<B, Client>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
announce_sender: periodic::BlockAnnounceSender<B>,
}

impl<B: BlockT, N: Network<B>, Client: BlockImportedChecker<B> + Send + Sync + 'static> NetworkBridge<B, N, Client> {
impl<B: BlockT, N: Network<B>, Client: BlockStatus<B> + Send + Sync + 'static> NetworkBridge<B, N, Client> {
/// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle and a future that must be polled to completion to finish startup.
/// On creation it will register previous rounds' votes with the gossip
Expand Down Expand Up @@ -493,7 +492,7 @@ impl<B: BlockT, N: Network<B>, Client: BlockImportedChecker<B> + Send + Sync + '
}
}

fn incoming_global<B: BlockT, N: Network<B>, Client: Send + Sync + 'static>(
fn incoming_global<B: BlockT, N: Network<B>, Client: BlockStatus<B> + Send + Sync + 'static>(
mut service: N,
topic: B::Hash,
voters: Arc<VoterSet<AuthorityId>>,
Expand Down Expand Up @@ -624,7 +623,7 @@ fn incoming_global<B: BlockT, N: Network<B>, Client: Send + Sync + 'static>(
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}

impl<B: BlockT, N: Network<B>, Client: BlockImportedChecker<B>> Clone for NetworkBridge<B, N, Client> {
impl<B: BlockT, N: Network<B>, Client> Clone for NetworkBridge<B, N, Client> {
fn clone(&self) -> Self {
NetworkBridge {
service: self.service.clone(),
Expand Down Expand Up @@ -955,7 +954,7 @@ impl<Block: BlockT, N: Network<Block>, Client> CommitsOut<Block, N, Client> {
}
}

impl<Block: BlockT, N: Network<Block>, Client> Sink for CommitsOut<Block, N, Client> {
impl<Block: BlockT, N: Network<Block>, Client: BlockStatus<Block>> Sink for CommitsOut<Block, N, Client> {
type SinkItem = (RoundNumber, Commit<Block>);
type SinkError = Error;

Expand Down
8 changes: 4 additions & 4 deletions core/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use log::{debug, warn};
use tokio_timer::Delay;

use network::PeerId;
use sr_primitives::traits::{NumberFor, Block as BlockT};
use sr_primitives::traits::{Block as BlockT};
use super::{gossip::{NeighborPacket, GossipMessage}, Network};

// how often to rebroadcast, if no other
Expand All @@ -44,15 +44,15 @@ fn rebroadcast_instant() -> Instant {
/// A sender used to send neighbor packets to a background job.
#[derive(Clone)]
pub(super) struct NeighborPacketSender<B: BlockT>(
mpsc::UnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>
mpsc::UnboundedSender<(Vec<PeerId>, NeighborPacket<B>)>
);

impl<B: BlockT> NeighborPacketSender<B> {
/// Send a neighbor packet for the background worker to gossip to peers.
pub fn send(
&self,
who: Vec<network::PeerId>,
neighbor_packet: NeighborPacket<NumberFor<B>>,
neighbor_packet: NeighborPacket<B>,
) {
if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) {
debug!(target: "afg", "Failed to send neighbor packet: {:?}", err);
Expand All @@ -72,7 +72,7 @@ pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (
N: Network<B>,
{
let mut last = None;
let (tx, mut rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>();
let (tx, mut rx) = mpsc::unbounded::<(Vec<PeerId>, NeighborPacket<B>)>();
let mut delay = Delay::new(rebroadcast_instant());

let work = futures::future::poll_fn(move || {
Expand Down
10 changes: 0 additions & 10 deletions core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,13 +887,3 @@ fn authority_id<'a, I>(
None => None,
}
}

pub(crate) trait BlockImportedChecker<Block: BlockT> {
fn is_imported(h: Block::Hash) -> bool;
}

impl<Backend, E, Block: BlockT, RA> BlockImportedChecker<Block> for Arc<Client<Backend, E, Block, RA>>{
fn is_imported(_h: Block::Hash) -> bool {
false
}
}