Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Prev Previous commit
Next Next commit
awaiting and handling collations. rename collators to CollatorPool
  • Loading branch information
rphmeier committed Jul 10, 2018
commit b04aa4e33c287568024b3db8ed46dcde56c2a314
5 changes: 3 additions & 2 deletions polkadot/collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ pub fn collate<'a, R, P>(
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
);

let signature = key.sign(&block_data.0[..]).into();
let block_data_hash = block_data.hash();
let signature = key.sign(&block_data_hash.0[..]).into();
let pubkey_bytes: [u8; 32] = key.public().into();

let receipt = parachain::CandidateReceipt {
Expand All @@ -168,7 +169,7 @@ pub fn collate<'a, R, P>(
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 0,
block_data_hash: block_data.hash(),
block_data_hash,
};

parachain::Collation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@

use polkadot_primitives::{AccountId, Hash};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use substrate_network::{PeerId, Context};

use futures::prelude::*;
use futures::sync::oneshot;

use std::collections::hash_map::{HashMap, Entry};
use std::sync::Arc;
use parking_lot::Mutex;

/// The role of the collator. Whether they're the primary or backup for this parachain.
pub enum Role {
Expand All @@ -43,35 +39,85 @@ pub enum Action {
NewRole(AccountId, Role),
}

/// State of collation availability for a given (relay parent, parachain) pair.
enum CollationSlot {
	/// No collations received, and nobody waiting for one.
	Blank,
	// not queried yet
	/// Collations that have arrived but have not been handed out yet.
	Pending(Vec<Collation>),
	// waiting for next to arrive.
	/// Senders that will be fulfilled when the next collation arrives.
	Awaiting(Vec<oneshot::Sender<Collation>>),
}

impl CollationSlot {
	/// Deliver a newly-arrived collation: hand a copy to every waiting
	/// request, or queue it until someone asks for it.
	fn received_collation(&mut self, collation: Collation) {
		let prior = ::std::mem::replace(self, CollationSlot::Blank);
		*self = match prior {
			CollationSlot::Awaiting(waiting) => {
				// fulfil every outstanding request with its own copy,
				// then reset the slot.
				for tx in waiting {
					let _ = tx.send(collation.clone());
				}

				CollationSlot::Blank
			}
			CollationSlot::Blank => CollationSlot::Pending(vec![collation]),
			CollationSlot::Pending(mut queued) => {
				queued.push(collation);
				CollationSlot::Pending(queued)
			}
		};
	}

	/// Register interest in a collation: answer immediately from the queue
	/// if one is pending, otherwise remember the sender for later.
	fn await_with(&mut self, sender: oneshot::Sender<Collation>) {
		let prior = ::std::mem::replace(self, CollationSlot::Blank);
		*self = match prior {
			CollationSlot::Pending(mut queued) => {
				// hand out the most recently queued collation.
				let next = queued.pop().expect("empty variant is always `Blank`; qed");
				let _ = sender.send(next);

				if queued.is_empty() {
					CollationSlot::Blank
				} else {
					CollationSlot::Pending(queued)
				}
			}
			CollationSlot::Awaiting(mut waiting) => {
				waiting.push(sender);
				CollationSlot::Awaiting(waiting)
			}
			CollationSlot::Blank => CollationSlot::Awaiting(vec![sender]),
		};
	}
}

/// The set of connected collators assigned to a single parachain.
struct ParachainCollators {
	/// The collator currently acting as primary for this parachain.
	primary: AccountId,
	/// Backup collators; one may be promoted to primary if the primary disconnects.
	backup: Vec<AccountId>,
}

/// Manages connected collators and role assignments from the perspective of a validator.
#[derive(Clone)]
pub struct Collators {
inner: Arc<Mutex<Inner>>,
pub struct CollatorPool {
	// the parachain each authenticated collator claimed to collate for.
	collators: HashMap<AccountId, ParaId>,
	// collators flagged for disconnection at the next maintenance pass.
	bad_collators: Vec<AccountId>,
	// primary/backup role assignments, per parachain.
	parachain_collators: HashMap<ParaId, ParachainCollators>,
	// collation slots keyed by (relay-chain parent hash, parachain id).
	collations: HashMap<(Hash, ParaId), CollationSlot>,
}

impl Collators {
/// Create a new `Collators` object.
impl CollatorPool {
/// Create a new `CollatorPool` object.
pub fn new() -> Self {
Collators {
inner: Arc::new(Mutex::new(Inner {
collators: HashMap::new(),
bad_collators: Vec::new(),
parachain_collators: HashMap::new(),
}))
CollatorPool {
collators: HashMap::new(),
bad_collators: Vec::new(),
parachain_collators: HashMap::new(),
collations: HashMap::new(),
}
}

/// Call when a new collator is authenticated. Returns the role.
pub fn on_new_collator(&self, account_id: AccountId, para_id: ParaId) -> Role {
let mut inner = self.inner.lock();

inner.collators.insert(account_id.clone(), para_id);
match inner.parachain_collators.entry(para_id) {
pub fn on_new_collator(&mut self, account_id: AccountId, para_id: ParaId) -> Role {
self.collators.insert(account_id.clone(), para_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that any AccountId can at most be part of one parachain collator group ever, and never be part of two or more parachains in its lifetime? Also, that it never switches parachains? Because we overwrite any potential entry here without updating the corresponding ParachainCollators if it exists (which could lead to an incoherent state and cause locking — for example, not removing a primary AccountId when it disconnects, because we aren't aware it is still stored there.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#271 will ensure that we are connected to only one peer ID with a claimed AccountId at a time, and each AccountId can only collate for one parachain during the span of the connection in the current protocol.

match self.parachain_collators.entry(para_id) {
Entry::Vacant(mut vacant) => {
vacant.insert(ParachainCollators {
primary: account_id,
backup: Vec::new(),
collations: HashMap::new(),
});

Role::Primary
Expand All @@ -86,38 +132,7 @@ impl Collators {

/// Called when a collator disconnects. If it was the primary, returns a new primary for that
/// parachain.
pub fn on_disconnect(&self, account_id: AccountId) -> Option<(AccountId, ParaId)> {
self.inner.lock().on_disconnect(account_id)
}

/// Call periodically to perform collator set maintenance.
/// Returns a set of actions.
pub fn maintain_peers(&self) -> Vec<Action> {
// get rid of all bad peers.
let mut inner = self.inner.lock();
let mut actions = Vec::new();
let bad = ::std::mem::replace(&mut inner.bad_collators, Vec::new());
for account in bad {
actions.push(Action::Disconnect(account));
if let Some((new_primary, _)) = inner.on_disconnect(account) {
actions.push(Action::NewRole(new_primary, Role::Primary));
}
}

// TODO: put underperforming collators on the back-burner.

actions
}
}

struct Inner {
collators: HashMap<AccountId, ParaId>,
bad_collators: Vec<AccountId>,
parachain_collators: HashMap<ParaId, ParachainCollators>,
}

impl Inner {
fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> {
pub fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> {
self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) {
Entry::Vacant(_) => None,
Entry::Occupied(mut occ) => {
Expand All @@ -136,19 +151,44 @@ impl Inner {
}
})
}
}

enum CollationSlot {
// not queried yet
Pending(Vec<Collation>),
// waiting for next to arrive.
Awaiting(oneshot::Sender<Collation>),
}
/// Called when a collation is received.
/// The collator should be registered for the parachain of the collation as a precondition of this function.
/// The collation should have been checked for integrity of signature before passing to this function.
pub fn on_collation(&mut self, account_id: AccountId, relay_parent: Hash, collation: Collation) {
if let Some(para_id) = self.collators.get(&account_id) {
debug_assert_eq!(para_id, &collation.receipt.parachain_index);

struct ParachainCollators {
primary: AccountId,
backup: Vec<AccountId>,
collations: HashMap<Hash, CollationSlot>,
self.collations.entry((relay_parent, para_id.clone()))
.or_insert_with(|| CollationSlot::Blank)
.received_collation(collation);
}
}

/// Wait for a collation from the given parachain at the given relay parent.
/// The sender is fulfilled as soon as a matching collation is available.
pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
	let slot = self.collations
		.entry((relay_parent, para_id))
		.or_insert_with(|| CollationSlot::Blank);

	slot.await_with(sender);
}

/// Call periodically to perform collator set maintenance.
/// Returns a set of actions to perform on the network level.
pub fn maintain_peers(&mut self) -> Vec<Action> {
	// take the accumulated list of misbehaving collators, leaving it empty.
	let flagged = ::std::mem::replace(&mut self.bad_collators, Vec::new());

	let mut actions = Vec::new();
	for bad_account in flagged {
		actions.push(Action::Disconnect(bad_account));
		match self.on_disconnect(bad_account) {
			Some((promoted, _)) => actions.push(Action::NewRole(promoted, Role::Primary)),
			None => {}
		}
	}

	// TODO: put underperforming collators on the back-burner.

	actions
}
}

#[cfg(test)]
Expand Down
39 changes: 32 additions & 7 deletions polkadot/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,22 @@ extern crate rhododendron;
#[macro_use]
extern crate log;

mod collators;
mod collator_pool;
mod router;
pub mod consensus;

use codec::Slicable;
use futures::sync::oneshot;
use parking_lot::Mutex;
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
use polkadot_primitives::{Block, SessionKey, Hash};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use polkadot_primitives::{AccountId, Block, SessionKey, Hash};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation};
use substrate_network::{PeerId, RequestId, Context};
use substrate_network::consensus_gossip::ConsensusGossip;
use substrate_network::{message, generic_message};
use substrate_network::specialization::Specialization;
use substrate_network::StatusMessage as GenericFullStatus;
use self::collators::Collators;
use self::collator_pool::CollatorPool;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -199,6 +199,8 @@ pub enum Message {
RequestBlockData(RequestId, Hash),
/// Provide block data by candidate hash or nothing if unknown.
BlockData(RequestId, Option<BlockData>),
/// A collation provided by a peer. Relay parent and collation.
Collation(Hash, Collation),
}

fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
Expand All @@ -210,7 +212,7 @@ fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message)
pub struct PolkadotProtocol {
peers: HashMap<PeerId, PeerInfo>,
consensus_gossip: ConsensusGossip<Block>,
collators: Collators,
collators: CollatorPool,
live_consensus: Option<CurrentConsensus>,
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
pending: Vec<BlockDataRequest>,
Expand All @@ -223,7 +225,7 @@ impl PolkadotProtocol {
PolkadotProtocol {
peers: HashMap::new(),
consensus_gossip: ConsensusGossip::new(),
collators: Collators::new(),
collators: CollatorPool::new(),
live_consensus: None,
in_flight: HashMap::new(),
pending: Vec::new(),
Expand Down Expand Up @@ -364,6 +366,7 @@ impl PolkadotProtocol {
send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data));
}
Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data),
Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation),
}
}

Expand All @@ -387,7 +390,7 @@ impl PolkadotProtocol {

impl Specialization<Block> for PolkadotProtocol {
fn status(&self) -> Vec<u8> {
Status { collating_for: self.collating_for.clone() }.encode()
Status { collating_for: None }.encode()
}

fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, status: FullStatus) {
Expand Down Expand Up @@ -487,3 +490,25 @@ impl Specialization<Block> for PolkadotProtocol {
self.dispatch_pending_requests(ctx);
}
}

impl PolkadotProtocol {
// we received a collation from a peer
fn on_collation(&mut self, ctx: &mut Context<Block>, from: PeerId, relay_parent: Hash, collation: Collation) {
let collation_para = collation.receipt.parachain_index;
let collated_acc = collation.receipt.collator;

match self.peers.get(&from) {
None => ctx.disconnect_peer(from),
Some(peer_info) => match peer_info.status.collating_for {
None => ctx.disable_peer(from),
Some((ref acc_id, ref para_id))
if para_id != &collation_para || acc_id != &collated_acc || collation.receipt.check_signature().is_err() => ctx.disable_peer(from),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That line is really too long to read ;) (150 chars)

Some((ref acc_id, _)) => self.collators.on_collation(acc_id.clone(), relay_parent, collation),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_collation above states that we must have checked the signature prior, and although this happens in the match check before, arriving here, that hasn't necessarily happened, does it? Also relying on a check to have happened in the match prior sounds like something that could easily be missed when changing this later. I'd prefer to have a more explicit guard around it, that makes sure check_signature has been called before on_collation regardless. Something like

Some((ref acc_id, ref para_id)) => {
  if para_id == &collation_para && acc_id == &collated_acc && collation.receipt.check_signature().is_ok() {
      self.collators.on_collation(acc_id.clone(), relay_parent, collation)
  } else {
    ctx.disable_peer(from)
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have rewritten to this before seeing this comment. Agree

},
}
}

	/// Register a sender to be fulfilled with the collation for the given
	/// relay parent and parachain, once one arrives in the collator pool.
	fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
		self.collators.await_collation(relay_parent, para_id, sender);
	}
}
13 changes: 12 additions & 1 deletion polkadot/primitives/src/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub struct CandidateReceipt {
pub parachain_index: Id,
/// The collator's relay-chain account ID
pub collator: super::AccountId,
/// Signature on block data by collator.
/// Signature on blake2-256 of the block data by collator.
pub signature: CandidateSignature,
/// The head-data
pub head_data: HeadData,
Expand Down Expand Up @@ -195,6 +195,17 @@ impl CandidateReceipt {
use runtime_primitives::traits::{BlakeTwo256, Hashing};
BlakeTwo256::hash_of(self)
}

/// Check integrity of the receipt's signature.
///
/// Verifies the collator's signature against the blake2-256 hash of the
/// block data committed to in `block_data_hash`, which is what the
/// collator signs when producing the collation.
pub fn check_signature(&self) -> Result<(), ()> {
	use runtime_primitives::traits::Verify;

	// the message signed is the block-data hash, not the signature itself.
	if self.signature.verify(&self.block_data_hash.0[..], &self.collator) {
		Ok(())
	} else {
		Err(())
	}
}
}

impl PartialOrd for CandidateReceipt {
Expand Down
2 changes: 1 addition & 1 deletion substrate/network/src/specialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static {
fn maintain_peers(&mut self, _ctx: &mut Context<B>) { }

/// Called when a block is _imported_ at the head of the chain (not during major sync).
/// Default implementation does nothing; parameters are underscored as they are unused here.
fn on_block_imported(&mut self, _ctx: &mut Context<B>, _hash: B::Hash, _header: &B::Header) { }
}