Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Next Next commit
skeleton of collators object
  • Loading branch information
rphmeier committed Jul 9, 2018
commit e457a2973ef841cdeec2fff4f8fcdf206b13d0bb
157 changes: 157 additions & 0 deletions polkadot/network/src/collators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Bridge between the network and consensus service for getting collations to it.

use polkadot_primitives::{AccountId, Hash};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use substrate_network::{PeerId, Context};

use futures::prelude::*;
use futures::sync::oneshot;

use std::collections::hash_map::{HashMap, Entry};
use std::sync::Arc;
use parking_lot::Mutex;

/// The role of the collator. Whether they're the primary or backup for this parachain.
pub enum Role {
/// Primary collators should send collations whenever it's time.
Primary,
/// Backup collators should not.
Backup,
}

/// A maintenance action for the collator set.
pub enum Action {
/// Disconnect the given collator.
Disconnect(AccountId),
/// Give the collator a new role.
NewRole(AccountId, Role),
}

/// Manages connected collators and role assignments from the perspective of a validator.
#[derive(Clone)]
pub struct Collators {
inner: Arc<Mutex<Inner>>,
}

impl Collators {
/// Create a new `Collators` object.
pub fn new() -> Self {
Collators {
inner: Arc::new(Mutex::new(Inner {
collators: HashMap::new(),
bad_collators: Vec::new(),
parachain_collators: HashMap::new(),
}))
}
}

/// Call when a new collator is authenticated. Returns the role.
pub fn on_new_collator(&self, account_id: AccountId, para_id: ParaId) -> Role {
let mut inner = self.inner.lock();

inner.collators.insert(account_id.clone(), para_id);
match inner.parachain_collators.entry(para_id) {
Entry::Vacant(mut vacant) => {
vacant.insert(ParachainCollators {
primary: account_id,
backup: Vec::new(),
collations: HashMap::new(),
});

Role::Primary
},
Entry::Occupied(mut occupied) => {
occupied.get_mut().backup.push(account_id);

Role::Backup
}
}
}

/// Called when a collator disconnects. If it was the primary, returns a new primary for that
/// parachain.
pub fn on_disconnect(&self, account_id: AccountId) -> Option<(AccountId, ParaId)> {
self.inner.lock().on_disconnect(account_id)
}

/// Call periodically to perform collator set maintenance.
/// Returns a set of actions.
pub fn maintain_peers(&self) -> Vec<Action> {
// get rid of all bad peers.
let mut inner = self.inner.lock();
let mut actions = Vec::new();
let bad = ::std::mem::replace(&mut inner.bad_collators, Vec::new());
for account in bad {
actions.push(Action::Disconnect(account));
if let Some((new_primary, _)) = inner.on_disconnect(account) {
actions.push(Action::NewRole(new_primary, Role::Primary));
}
}

// TODO: put underperforming collators on the back-burner.

actions
}
}

struct Inner {
collators: HashMap<AccountId, ParaId>,
bad_collators: Vec<AccountId>,
parachain_collators: HashMap<ParaId, ParachainCollators>,
}

impl Inner {
fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> {
self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) {
Entry::Vacant(_) => None,
Entry::Occupied(mut occ) => {
if occ.get().primary == account_id {
if occ.get().backup.is_empty() {
occ.remove();
None
} else {
let mut collators = occ.get_mut();
collators.primary = collators.backup.pop().expect("backup non-empty; qed");
Some((collators.primary, para_id))
}
} else {
None
}
}
})
}
}

enum CollationSlot {
// not queried yet
Pending(Vec<Collation>),
// waiting for next to arrive.
Awaiting(oneshot::Sender<Collation>),
}

struct ParachainCollators {
primary: AccountId,
backup: Vec<AccountId>,
collations: HashMap<Hash, CollationSlot>,
}

#[cfg(test)]
mod tests {

}
30 changes: 15 additions & 15 deletions polkadot/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ extern crate rhododendron;
#[macro_use]
extern crate log;

mod collators;
mod router;
pub mod consensus;

Expand All @@ -57,10 +58,12 @@ use substrate_network::consensus_gossip::ConsensusGossip;
use substrate_network::{message, generic_message};
use substrate_network::specialization::Specialization;
use substrate_network::StatusMessage as GenericFullStatus;
use self::collators::Collators;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;


#[cfg(test)]
mod tests;

Expand All @@ -75,16 +78,16 @@ pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol>;
/// Status of a Polkadot node.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Status {
collating_for: Option<ParaId>,
collating_for: Option<(AccountId, ParaId)>,
}

impl Slicable for Status {
fn encode(&self) -> Vec<u8> {
let mut v = Vec::new();
match self.collating_for {
Some(ref id) => {
Some(ref details) => {
v.push(1);
id.using_encoded(|s| v.extend(s));
details.using_encoded(|s| v.extend(s));
}
None => {
v.push(0);
Expand All @@ -96,7 +99,7 @@ impl Slicable for Status {
fn decode<I: ::codec::Input>(input: &mut I) -> Option<Self> {
let collating_for = match input.read_byte()? {
0 => None,
1 => Some(ParaId::decode(input)?),
1 => Some(Slicable::decode(input)?),
_ => return None,
};
Some(Status { collating_for })
Expand Down Expand Up @@ -207,8 +210,7 @@ fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message)
pub struct PolkadotProtocol {
peers: HashMap<PeerId, PeerInfo>,
consensus_gossip: ConsensusGossip<Block>,
collators: HashMap<ParaId, Vec<PeerId>>,
collating_for: Option<ParaId>,
collators: Collators,
live_consensus: Option<CurrentConsensus>,
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
pending: Vec<BlockDataRequest>,
Expand All @@ -221,8 +223,7 @@ impl PolkadotProtocol {
PolkadotProtocol {
peers: HashMap::new(),
consensus_gossip: ConsensusGossip::new(),
collators: HashMap::new(),
collating_for: None,
collators: Collators::new(),
live_consensus: None,
in_flight: HashMap::new(),
pending: Vec::new(),
Expand Down Expand Up @@ -398,10 +399,8 @@ impl Specialization<Block> for PolkadotProtocol {
}
};

if let Some(ref para_id) = local_status.collating_for {
self.collators.entry(para_id.clone())
.or_insert_with(Vec::new)
.push(peer_id);
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
}

let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
Expand All @@ -426,9 +425,10 @@ impl Specialization<Block> for PolkadotProtocol {

fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) {
if let Some(info) = self.peers.remove(&peer_id) {
if let Some(collators) = info.status.collating_for.and_then(|id| self.collators.get_mut(&id)) {
if let Some(pos) = collators.iter().position(|x| x == &peer_id) {
collators.swap_remove(pos);
if let Some((acc_id, _)) = info.status.collating_for {
if let Some((new_primary, _)) = self.collators.on_disconnect(acc_id) {
// TODO: send new primary a role-change message.
unimplemented!()
}
}

Expand Down
5 changes: 5 additions & 0 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> where B::Header: HeaderT<Nu

pub fn on_block_imported(&self, io: &mut SyncIo, hash: B::Hash, header: &B::Header) {
self.sync.write().update_chain_info(&header);
self.specialization.write().on_block_imported(
&mut ProtocolContext::new(&self.context_data, io),
hash.clone(),
header
);

// blocks are not announced by light clients
if self.config.roles & Role::LIGHT == Role::LIGHT {
Expand Down
3 changes: 3 additions & 0 deletions substrate/network/src/specialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static {

/// Called periodically to maintain peers and handle timeouts.
fn maintain_peers(&mut self, _ctx: &mut Context<B>) { }

/// Called when a block is _imported_ at the head of the chain (not during major sync).
fn on_block_imported(&mut self, _ctx: &mut Context<B>, hash: B::Hash, header: &B::Header) { }
}