Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{
};

use futures::prelude::*;
use futures::channel::oneshot;
use futures::channel::{mpsc, oneshot};
use indexmap::IndexSet;

use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -476,6 +476,21 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}

/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
async fn inform_statement_listeners(
statement: SignedFullStatement,
listeners: &mut Vec<mpsc::Sender<SignedFullStatement>>,
) {
// Ignore the errors since these will be removed later.
for listener in listeners.iter_mut() {
let _ = listener.send(statement.clone()).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a DQ: shouldn't we fan out sending and await the joined future?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like futures::stream::iter(listeners.iter_mut()).for_each_concurrent(None, |listener| listener.send(statement.clone()) is probably a good idea. Then again, I have the impression that we're rarely going to have more than a few listeners at a time, so it may not make any practical difference.

}
// Remove any closed listeners.
listeners.retain(|tx| !tx.is_closed());
}

/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
Expand Down Expand Up @@ -821,6 +836,7 @@ async fn run(
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = View::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners: Vec<mpsc::Sender<SignedFullStatement>> = Vec::new();

loop {
let message = ctx.recv().await?;
Expand Down Expand Up @@ -874,14 +890,19 @@ async fn run(
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
FromOverseer::Communication { msg } => match msg {
StatementDistributionMessage::Share(relay_parent, statement) =>
StatementDistributionMessage::Share(relay_parent, statement) => {
inform_statement_listeners(
statement.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's cloned inside the function should we pass statement there as a reference to save us one clone?

&mut statement_listeners,
).await;
circulate_statement_and_dependents(
&mut peers,
&mut active_heads,
&mut ctx,
relay_parent,
statement,
).await?,
).await?;
}
StatementDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut peers,
Expand All @@ -890,6 +911,9 @@ async fn run(
&mut our_view,
event,
).await?,
StatementDistributionMessage::RegisterStatementListener(tx) => {
statement_listeners.push(tx);
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions node/subsystem/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ pub enum StatementDistributionMessage {
Share(Hash, SignedFullStatement),
/// Event from the network bridge.
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>),
/// Register a listener for shared statements.
RegisterStatementListener(mpsc::Sender<SignedFullStatement>),
}

impl StatementDistributionMessage {
Expand All @@ -423,6 +425,7 @@ impl StatementDistributionMessage {
match self {
Self::Share(hash, _) => Some(*hash),
Self::NetworkBridgeUpdateV1(_) => None,
Self::RegisterStatementListener(_) => None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions roadmap/implementers-guide/src/types/overseer-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ enum StatementDistributionMessage {
/// The statement distribution subsystem assumes that the statement should be correctly
/// signed.
Share(Hash, SignedFullStatement),
/// Register a listener to be notified on any new statements.
RegisterStatementListener(ResponseChannel<SignedFullStatement>),
}
```

Expand Down