Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Prev Previous commit
Next Next commit
plug collators into main polkadot-network
  • Loading branch information
rphmeier committed Jul 10, 2018
commit 11875ec74e5dc371ef8fc5a78439d8bd311a2dd9
5 changes: 2 additions & 3 deletions polkadot/network/src/collator_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::sync::oneshot;
use std::collections::hash_map::{HashMap, Entry};

/// The role of the collator. Whether they're the primary or backup for this parachain.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Serialize, Deserialize)]
pub enum Role {
/// Primary collators should send collations whenever it's time.
Primary,
Expand All @@ -34,6 +34,7 @@ pub enum Role {

/// A maintenance action for the collator set.
#[derive(PartialEq, Debug)]
#[allow(dead_code)]
pub enum Action {
/// Disconnect the given collator.
Disconnect(AccountId),
Expand Down Expand Up @@ -96,7 +97,6 @@ struct ParachainCollators {
/// Manages connected collators and role assignments from the perspective of a validator.
pub struct CollatorPool {
collators: HashMap<AccountId, ParaId>,
bad_collators: Vec<AccountId>,
parachain_collators: HashMap<ParaId, ParachainCollators>,
collations: HashMap<(Hash, ParaId), CollationSlot>,
}
Expand All @@ -106,7 +106,6 @@ impl CollatorPool {
pub fn new() -> Self {
CollatorPool {
collators: HashMap::new(),
bad_collators: Vec::new(),
parachain_collators: HashMap::new(),
collations: HashMap::new(),
}
Expand Down
2 changes: 1 addition & 1 deletion polkadot/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,6 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Collators for ConsensusNetwork
}

fn note_bad_collator(&self, collator: AccountId) {
self.network.with_spec(|spec, ctx| spec.disconnect_collator(ctx, collator));
self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator));
}
}
63 changes: 51 additions & 12 deletions polkadot/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use substrate_network::consensus_gossip::ConsensusGossip;
use substrate_network::{message, generic_message};
use substrate_network::specialization::Specialization;
use substrate_network::StatusMessage as GenericFullStatus;
use self::collator_pool::CollatorPool;
use self::collator_pool::{CollatorPool, Role, Action};

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -199,6 +199,8 @@ pub enum Message {
RequestBlockData(RequestId, Hash),
/// Provide block data by candidate hash or nothing if unknown.
BlockData(RequestId, Option<BlockData>),
/// Tell a collator their role.
CollatorRole(Role),
/// A collation provided by a peer. Relay parent and collation.
Collation(Hash, Collation),
}
Expand Down Expand Up @@ -263,7 +265,10 @@ impl PolkadotProtocol {
let parent_hash = consensus.parent_hash;
let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash);

for (id, info) in self.peers.iter_mut().filter(|&(_, ref info)| info.validator) {
// TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks.
for (id, info) in self.peers.iter_mut()
.filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some())
{
send_polkadot_message(
ctx,
*id,
Expand Down Expand Up @@ -367,6 +372,7 @@ impl PolkadotProtocol {
}
Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data),
Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation),
Message::CollatorRole(_) => unimplemented!(),
}
}

Expand Down Expand Up @@ -398,12 +404,22 @@ impl Specialization<Block> for PolkadotProtocol {
Some(status) => status,
None => {
ctx.disable_peer(peer_id);
return;
return
}
};

if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
if self.collator_peer_id(acc_id.clone()).is_some() {
ctx.disable_peer(peer_id);
return
}

let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
send_polkadot_message(
ctx,
peer_id,
Message::CollatorRole(collator_role),
);
}

let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
Expand All @@ -429,9 +445,15 @@ impl Specialization<Block> for PolkadotProtocol {
fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) {
if let Some(info) = self.peers.remove(&peer_id) {
if let Some((acc_id, _)) = info.status.collating_for {
if let Some(new_primary) = self.collators.on_disconnect(acc_id) {
// TODO: send new primary a role-change message.
unimplemented!()
let new_primary = self.collators.on_disconnect(acc_id)
.and_then(|new_primary| self.collator_peer_id(new_primary));

if let Some(new_primary) = new_primary {
send_polkadot_message(
ctx,
new_primary,
Message::CollatorRole(Role::Primary),
)
}
}

Expand Down Expand Up @@ -488,6 +510,19 @@ impl Specialization<Block> for PolkadotProtocol {
fn maintain_peers(&mut self, ctx: &mut Context<Block>) {
self.consensus_gossip.collect_garbage(None);
self.dispatch_pending_requests(ctx);

for collator_action in self.collators.maintain_peers() {
match collator_action {
Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
Action::NewRole(account_id, role) => if let Some(collator) = self.collator_peer_id(account_id) {
send_polkadot_message(
ctx,
collator,
Message::CollatorRole(role),
)
},
}
}
}
}

Expand All @@ -514,15 +549,19 @@ impl PolkadotProtocol {
rx
}

// disconnect a collator by account-id.
fn disconnect_collator(&mut self, ctx: &mut Context<Block>, account_id: AccountId) {
let bad_peers = self.peers
// get connected peer with given account ID for collation.
fn collator_peer_id(&self, account_id: AccountId) -> Option<PeerId> {
self.peers
.iter()
.filter(|&(_, info)| info.status.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

125 chars. please wrap ;) .

.map(|(peer_id, _)| *peer_id);
.map(|(peer_id, _)| *peer_id)
.next()
}

for peer in bad_peers {
ctx.disable_peer(peer);
// disconnect a collator by account-id.
fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) {
if let Some(peer_id) = self.collator_peer_id(account_id) {
ctx.disable_peer(peer_id)
}
}
}