Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{
};

use futures::prelude::*;
use futures::channel::oneshot;
use futures::channel::{mpsc, oneshot};
use indexmap::IndexSet;

use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -476,6 +476,24 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}

/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
async fn inform_statement_listeners(
statement: &SignedFullStatement,
listeners: &mut Vec<mpsc::Sender<SignedFullStatement>>,
) {
// Ignore the errors since these will be removed later.
stream::iter(listeners.iter_mut()).for_each_concurrent(
None,
|listener| async move {
let _ = listener.send(statement.clone()).await;
}
).await;
// Remove any closed listeners.
listeners.retain(|tx| !tx.is_closed());
}

/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
Expand Down Expand Up @@ -821,6 +839,7 @@ async fn run(
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = View::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners: Vec<mpsc::Sender<SignedFullStatement>> = Vec::new();

loop {
let message = ctx.recv().await?;
Expand Down Expand Up @@ -874,14 +893,19 @@ async fn run(
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
FromOverseer::Communication { msg } => match msg {
StatementDistributionMessage::Share(relay_parent, statement) =>
StatementDistributionMessage::Share(relay_parent, statement) => {
inform_statement_listeners(
&statement,
&mut statement_listeners,
).await;
circulate_statement_and_dependents(
&mut peers,
&mut active_heads,
&mut ctx,
relay_parent,
statement,
).await?,
).await?;
}
StatementDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut peers,
Expand All @@ -890,6 +914,9 @@ async fn run(
&mut our_view,
event,
).await?,
StatementDistributionMessage::RegisterStatementListener(tx) => {
statement_listeners.push(tx);
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions node/subsystem/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ pub enum StatementDistributionMessage {
Share(Hash, SignedFullStatement),
/// Event from the network bridge.
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>),
/// Register a listener for shared statements.
RegisterStatementListener(mpsc::Sender<SignedFullStatement>),
}

impl StatementDistributionMessage {
Expand All @@ -423,6 +425,7 @@ impl StatementDistributionMessage {
match self {
Self::Share(hash, _) => Some(*hash),
Self::NetworkBridgeUpdateV1(_) => None,
Self::RegisterStatementListener(_) => None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions roadmap/implementers-guide/src/types/overseer-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ enum StatementDistributionMessage {
/// The statement distribution subsystem assumes that the statement should be correctly
/// signed.
Share(Hash, SignedFullStatement),
/// Register a listener to be notified on any new statements.
RegisterStatementListener(ResponseChannel<SignedFullStatement>),
}
```

Expand Down