Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 27 additions & 49 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use polkadot_primitives::v1::{
Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash,
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, OurView, UnifiedReputationChange as Rep,
v1 as protocol_v1, View, PeerId, UnifiedReputationChange as Rep,
};

use futures::prelude::*;
Expand Down Expand Up @@ -740,7 +740,6 @@ async fn report_peer(
async fn handle_incoming_message<'a>(
peer: PeerId,
peer_data: &mut PeerData,
our_view: &View,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: protocol_v1::StatementDistributionMessage,
Expand All @@ -750,27 +749,16 @@ async fn handle_incoming_message<'a>(
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
};

if !our_view.contains(&relay_parent) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?statement,
"Unexpected statement"
);
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
return None;
}

let active_head = match active_heads.get_mut(&relay_parent) {
Some(h) => h,
None => {
// This should never be out-of-sync with our view if the view updates
// correspond to actual `StartWork` messages. So we just log and ignore.
tracing::warn!(
tracing::debug!(
target: LOG_TARGET,
requested_relay_parent = %relay_parent,
"our view out-of-sync with active heads; head not found",
?peer,
?relay_parent,
"Unknown (or outdated) relay parent"
);
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
return None;
}
};
Expand Down Expand Up @@ -898,7 +886,6 @@ async fn handle_network_update(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
our_view: &mut OurView,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
) {
Expand Down Expand Up @@ -929,7 +916,6 @@ async fn handle_network_update(
handle_incoming_message(
peer,
data,
&*our_view,
active_heads,
ctx,
message,
Expand Down Expand Up @@ -977,25 +963,8 @@ async fn handle_network_update(
None => (),
}
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
"Own view change",
);
let old_view = std::mem::replace(our_view, view);
active_heads.retain(|head, _| our_view.contains(head));

for new in our_view.difference(&old_view) {
if !active_heads.contains_key(&new) {
tracing::warn!(
target: LOG_TARGET,
unknown_hash = %new,
"Our network bridge view update \
inconsistent with `StartWork` messages we have received from overseer. \
Contains unknown hash.",
);
}
}
NetworkBridgeEvent::OurViewChange(_view) => {
// handled by `ActiveLeavesUpdate`
}
}

Expand All @@ -1008,19 +977,23 @@ impl StatementDistribution {
mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>,
) -> SubsystemResult<()> {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = OurView::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let metrics = self.metrics;

loop {
let message = ctx.recv().await?;
match message {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
let _timer = metrics.time_active_leaves_update();

for activated in activated {
let relay_parent = activated.hash;
let span = PerLeafSpan::new(activated.span, "statement-distribution");
tracing::trace!(
target: LOG_TARGET,
hash = ?relay_parent,
"New active leaf",
);

let (validators, session_index) = {
let (val_tx, val_rx) = oneshot::channel();
Expand Down Expand Up @@ -1063,6 +1036,18 @@ impl StatementDistribution {
active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index, span));
}

active_heads.retain(|h, _| {
let live = !deactivated.contains(h);
if !live {
tracing::trace!(
target: LOG_TARGET,
hash = ?h,
"Deactivating leaf",
);
}
live
});
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {
// do nothing
Expand All @@ -1088,7 +1073,6 @@ impl StatementDistribution {
&mut peers,
&mut active_heads,
&mut ctx,
&mut our_view,
event,
&metrics,
).await;
Expand Down Expand Up @@ -1189,7 +1173,7 @@ mod tests {
use futures::executor::{self, block_on};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore;
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_subsystem::{jaeger, ActivatedLeaf};

#[test]
Expand Down Expand Up @@ -1801,12 +1785,6 @@ mod tests {
)
}).await;

handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![hash_a])
)
}).await;

// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
// candidate backing.
let statement = {
Expand Down