Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
62184b4
BEGIN ASYNC candidate-backing CHANGES
rphmeier Apr 28, 2022
821ed42
rename & document modes
rphmeier Apr 29, 2022
1fc4928
answer prospective validation data requests
rphmeier May 18, 2022
39f2076
GetMinimumRelayParents request is now plural
rphmeier May 19, 2022
80af4be
implement an implicit view utility for backing subsystems
rphmeier May 19, 2022
7c58f7a
implicit-view: get allowed relay parents
rphmeier May 23, 2022
fbcedac
refactorings and improvements to implicit view
rphmeier May 23, 2022
126ed91
add some TODOs for tests
rphmeier May 23, 2022
a5994a7
split implicit view updates into 2 functions
rphmeier May 25, 2022
8944760
backing: define State to prepare for functional refactor
rphmeier May 25, 2022
c34d0e6
add some docs
rphmeier May 25, 2022
20cd422
backing: implement bones of new leaf activation logic
rphmeier May 25, 2022
2b7d883
backing: create per-relay-parent-states
rphmeier May 25, 2022
923b28a
use new handle_active_leaves_update
rphmeier May 25, 2022
b9280d1
begin extracting logic from CandidateBackingJob
rphmeier May 27, 2022
967156e
mostly extract statement import from job logic
rphmeier May 27, 2022
b25acb7
handle statement imports outside of job logic
rphmeier May 27, 2022
df95962
do some TODO planning for prospective parachains integration
rphmeier May 27, 2022
458d24d
finish rewriting backing subsystem in functional style
rphmeier May 27, 2022
22a9d5e
add prospective parachains mode to relay parent entries
rphmeier May 27, 2022
fc0c4e4
fmt
rphmeier May 27, 2022
a4df277
add a RejectedByProspectiveParachains error
rphmeier May 27, 2022
2f202d0
notify prospective parachains of seconded and backed candidates
rphmeier May 28, 2022
910b997
always validate candidates exhaustively in backing.
rphmeier May 28, 2022
19d7a43
return persisted_validation_data from validation
rphmeier May 28, 2022
7f24629
handle rejections by prospective parachains
rphmeier May 28, 2022
22ead26
implement seconding sanity check
rphmeier May 28, 2022
6738ee9
invoke validate_and_second
rphmeier May 28, 2022
c54e1ca
Alter statement table to allow multiple seconded messages per validator
rphmeier May 28, 2022
e855b6c
refactor backing to have statements carry PVD
rphmeier May 30, 2022
eb0ee29
clean up all warnings
rphmeier May 30, 2022
d80c190
Add tests for implicit view
slumber May 31, 2022
21a381b
fix per_relay_parent pruning in backing
rphmeier May 31, 2022
47a9832
BEGIN STATEMENT DISTRIBUTION WORK
rphmeier May 31, 2022
b8f5282
mostly make network bridge amenable to vstaging
rphmeier May 31, 2022
843cb7b
network-bridge: fully adapt to vstaging
rphmeier May 31, 2022
89cf386
add some TODOs for tests
rphmeier May 31, 2022
a320e44
fix fallout in bitfield-distribution
rphmeier May 31, 2022
3cc56d1
add some test TODOs
rphmeier May 31, 2022
00f410d
fix fallout in gossip-support
rphmeier May 31, 2022
1a60a0c
fmt
rphmeier May 31, 2022
6f16adf
collator-protocol: fix message fallout
rphmeier May 31, 2022
7e46d8a
collator-protocol: load PVD from runtime
rphmeier May 31, 2022
86007fb
make things compile
rphmeier Jun 2, 2022
faf00f6
fmt
rphmeier Jun 2, 2022
f43c8f6
begin extracting view logic to separate module
rphmeier Jun 2, 2022
759e1d1
create deeper submodule and add per-peer knowledge
rphmeier Jun 2, 2022
51ca0ef
add ProspectiveParachainsMessage to statement-distribution
rphmeier Jun 2, 2022
778627b
make recv_runtime public
rphmeier Jun 2, 2022
df6f7c0
add ChainApiMessage to outgoing of statement-dist
rphmeier Jun 2, 2022
9af0013
begin new handle_active_leaves_update
rphmeier Jun 2, 2022
ce6e3d8
instantiate relay-parent-info without prospective data
rphmeier Jun 2, 2022
5bdb0ec
add staging-network feature to protocol
rphmeier Jun 5, 2022
8c9158a
rename to network-protocol staging
rphmeier Jun 5, 2022
dc6ee0d
refactor view to better accommodate both protocols
rphmeier Jun 5, 2022
a45cb24
begin fleshing out with_prospective
rphmeier Jun 5, 2022
d5e8e88
extract some tests to without_prospective; comment the rest
rphmeier Jun 5, 2022
3a095e6
begin high-level View API
rphmeier Jun 5, 2022
406d87d
fmt
rphmeier Jun 5, 2022
5c8e048
Merge branch 'rh-async-backing-feature' into rh-async-backing-integra…
slumber Jun 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix fallout in bitfield-distribution
  • Loading branch information
rphmeier committed May 31, 2022
commit a320e44747afd56574ce44a5082c74f81f9fc041
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/network/bitfield-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2021"

[dependencies]
always-assert = "0.1"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../gum" }
polkadot-primitives = { path = "../../../primitives" }
Expand Down
188 changes: 144 additions & 44 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@

#![deny(unused_crate_dependencies)]

use always_assert::never;
use futures::{channel::oneshot, FutureExt};

use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{
RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology,
},
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View,
v1 as protocol_v1, vstaging as protocol_vstaging, OurView, PeerId, ProtocolVersion,
UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_subsystem::{
jaeger, messages::*, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
Expand Down Expand Up @@ -69,25 +71,63 @@ struct BitfieldGossipMessage {
}

impl BitfieldGossipMessage {
fn into_validation_protocol(self) -> net_protocol::VersionedValidationProtocol {
self.into_network_message().into()
/// Convert this gossip message into the versioned validation-protocol
/// wire message matching the recipient's negotiated protocol version.
///
/// Delegates to [`Self::into_network_message`] for the version dispatch.
fn into_validation_protocol(
	self,
	recipient_version: ProtocolVersion,
) -> net_protocol::VersionedValidationProtocol {
	self.into_network_message(recipient_version).into()
}

fn into_network_message(self) -> net_protocol::BitfieldDistributionMessage {
Versioned::V1(protocol_v1::BitfieldDistributionMessage::Bitfield(
self.relay_parent,
self.signed_availability.into(),
))
/// Convert this gossip message into the `BitfieldDistributionMessage`
/// variant whose encoding matches `recipient_version`.
///
/// An unrecognized version is treated as a logic error (the network bridge
/// is expected to only report peers with supported versions): it trips a
/// debug-build assertion, logs a warning, and falls back to the v1 encoding
/// rather than dropping the message.
fn into_network_message(
	self,
	recipient_version: ProtocolVersion,
) -> net_protocol::BitfieldDistributionMessage {
	match recipient_version {
		x if x == protocol_v1::VERSION =>
			Versioned::V1(protocol_v1::BitfieldDistributionMessage::Bitfield(
				self.relay_parent,
				self.signed_availability.into(),
			)),
		x if x == protocol_vstaging::VERSION =>
			Versioned::VStaging(protocol_vstaging::BitfieldDistributionMessage::Bitfield(
				self.relay_parent,
				self.signed_availability.into(),
			)),
		_ => {
			// Panics in debug builds only; release builds continue to the
			// warning and fallback below.
			never!("Peers should only have supported protocol versions.");

			gum::warn!(
				target: LOG_TARGET,
				version = recipient_version,
				"Unknown protocol version provided for message recipient"
			);

			// fall back to v1 to avoid dropping the message
			Versioned::V1(protocol_v1::BitfieldDistributionMessage::Bitfield(
				self.relay_parent,
				self.signed_availability.into(),
			))
		},
	}
}
}

/// Data stored on a per-peer basis.
#[derive(Debug)]
pub struct PeerData {
	/// The peer's view, used to decide whether a bitfield's relay parent
	/// is of interest to this peer before relaying.
	view: View,
	/// The peer's protocol version, recorded when the peer connected;
	/// selects the wire encoding used when sending messages to it.
	version: ProtocolVersion,
}

/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Debug)]
struct ProtocolState {
/// Track all active peers and their views
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,
peer_data: HashMap<PeerId, PeerData>,

/// The current and previous gossip topologies
topologies: SessionBoundGridTopologyStorage,
Expand Down Expand Up @@ -334,7 +374,7 @@ async fn handle_bitfield_distribution<Context>(
ctx,
job_data,
topology,
&mut state.peer_views,
&mut state.peer_data,
validator,
msg,
required_routing,
Expand All @@ -353,7 +393,7 @@ async fn relay_message<Context>(
ctx: &mut Context,
job_data: &mut PerRelayParentData,
topology: &SessionGridTopology,
peer_views: &mut HashMap<PeerId, View>,
peers: &mut HashMap<PeerId, PeerData>,
validator: ValidatorId,
message: BitfieldGossipMessage,
required_routing: RequiredRouting,
Expand All @@ -371,16 +411,16 @@ async fn relay_message<Context>(
.await;

drop(_span);
let total_peers = peer_views.len();
let total_peers = peers.len();
let mut random_routing: RandomRouting = Default::default();

let _span = span.child("interested-peers");
// pass on the bitfield distribution to all interested peers
let interested_peers = peer_views
let interested_peers = peers
.iter()
.filter_map(|(peer, view)| {
.filter_map(|(peer, data)| {
// check interest in the peer in this message's relay parent
if view.contains(&message.relay_parent) {
if data.view.contains(&message.relay_parent) {
let message_needed =
job_data.message_from_validator_needed_by_peer(&peer, &validator);
if message_needed {
Expand All @@ -395,7 +435,7 @@ async fn relay_message<Context>(
};

if need_routing {
Some(peer.clone())
Some((peer.clone(), data.version))
} else {
None
}
Expand All @@ -406,9 +446,9 @@ async fn relay_message<Context>(
None
}
})
.collect::<Vec<PeerId>>();
.collect::<Vec<(PeerId, ProtocolVersion)>>();

interested_peers.iter().for_each(|peer| {
interested_peers.iter().for_each(|(peer, _)| {
// track the message as sent for this peer
job_data
.message_sent_to_peer
Expand All @@ -427,11 +467,33 @@ async fn relay_message<Context>(
);
} else {
let _span = span.child("gossip");
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
interested_peers,
message.into_validation_protocol(),
))
.await;
let v1_interested_peers = interested_peers
.iter()
.filter(|(_, v)| v == &protocol_v1::VERSION)
.map(|(p, _)| p.clone())
.collect::<Vec<_>>();

let vstaging_interested_peers = interested_peers
.iter()
.filter(|(_, v)| v == &protocol_vstaging::VERSION)
.map(|(p, _)| p.clone())
.collect::<Vec<_>>();

if !v1_interested_peers.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
v1_interested_peers,
message.clone().into_validation_protocol(protocol_v1::VERSION),
))
.await;
}

if !vstaging_interested_peers.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vstaging_interested_peers,
message.into_validation_protocol(protocol_vstaging::VERSION),
))
.await
}
}
}

Expand All @@ -442,10 +504,20 @@ async fn process_incoming_peer_message<Context>(
state: &mut ProtocolState,
metrics: &Metrics,
origin: PeerId,
message: protocol_v1::BitfieldDistributionMessage,
message: net_protocol::BitfieldDistributionMessage,
rng: &mut (impl CryptoRng + Rng),
) {
let protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) = message;
let (relay_parent, bitfield) = match message {
Versioned::V1(protocol_v1::BitfieldDistributionMessage::Bitfield(
relay_parent,
bitfield,
)) => (relay_parent, bitfield),
Versioned::VStaging(protocol_vstaging::BitfieldDistributionMessage::Bitfield(
relay_parent,
bitfield,
)) => (relay_parent, bitfield),
};

gum::trace!(
target: LOG_TARGET,
peer = %origin,
Expand Down Expand Up @@ -543,7 +615,7 @@ async fn process_incoming_peer_message<Context>(
ctx,
job_data,
topology,
&mut state.peer_views,
&mut state.peer_data,
validator,
message,
required_routing,
Expand All @@ -567,15 +639,18 @@ async fn handle_network_msg<Context>(
let _timer = metrics.time_handle_network_msg();

match bridge_message {
NetworkBridgeEvent::PeerConnected(peer, role, _, _) => {
NetworkBridgeEvent::PeerConnected(peer, role, version, _) => {
gum::trace!(target: LOG_TARGET, ?peer, ?role, "Peer connected");
// insert if none already present
state.peer_views.entry(peer).or_default();
state
.peer_data
.entry(peer)
.or_insert_with(|| PeerData { view: View::default(), version });
},
NetworkBridgeEvent::PeerDisconnected(peer) => {
gum::trace!(target: LOG_TARGET, ?peer, "Peer disconnected");
// get rid of superfluous data
state.peer_views.remove(&peer);
state.peer_data.remove(&peer);
},
NetworkBridgeEvent::NewGossipTopology(gossip_topology) => {
let session_index = gossip_topology.session;
Expand All @@ -590,12 +665,21 @@ async fn handle_network_msg<Context>(
);

for new_peer in newly_added {
// in case we already knew that peer in the past
// it might have had an existing view, we use to initialize
// and minimize the delta on `PeerViewChange` to be sent
if let Some(old_view) = state.peer_views.remove(&new_peer) {
handle_peer_view_change(ctx, state, new_peer, old_view, rng).await;
}
let old_view = match state.peer_data.get_mut(&new_peer) {
Some(d) => {
// in case we already knew that peer in the past
// it might have had an existing view, we use to initialize
// and minimize the delta on `PeerViewChange` to be sent
std::mem::replace(&mut d.view, Default::default())
},
None => {
// For peers which are currently unknown, we'll send topology-related
// messages to them when they connect and send their first view update.
continue
},
};

handle_peer_view_change(ctx, state, new_peer, old_view, rng).await;
}
},
NetworkBridgeEvent::PeerViewChange(peerid, new_view) => {
Expand All @@ -606,7 +690,7 @@ async fn handle_network_msg<Context>(
gum::trace!(target: LOG_TARGET, ?new_view, "Our view change");
handle_our_view_change(state, new_view);
},
NetworkBridgeEvent::PeerMessage(remote, Versioned::V1(message)) =>
NetworkBridgeEvent::PeerMessage(remote, message) =>
process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await,
}
}
Expand Down Expand Up @@ -635,6 +719,9 @@ fn handle_our_view_change(state: &mut ProtocolState, view: OurView) {

// Send the difference between two views which were not sent
// to that particular peer.
//
// This requires that there is an entry in the `peer_data` field for the
// peer.
#[overseer::contextbounds(BitfieldDistribution, prefix=self::overseer)]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
Expand All @@ -643,13 +730,20 @@ async fn handle_peer_view_change<Context>(
view: View,
rng: &mut (impl CryptoRng + Rng),
) {
let added = state
.peer_views
.entry(origin.clone())
.or_default()
.replace_difference(view)
.cloned()
.collect::<Vec<_>>();
let peer_data = match state.peer_data.get_mut(&origin) {
None => {
gum::warn!(
target: LOG_TARGET,
peer = ?origin,
"Attempted to update peer view for unknown peer."
);

return
},
Some(pd) => pd,
};

let added = peer_data.view.replace_difference(view).cloned().collect::<Vec<_>>();

let topology = state.topologies.get_current_topology();
let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin);
Expand Down Expand Up @@ -716,6 +810,12 @@ async fn send_tracked_gossip_message<Context>(
"Sending gossip message"
);

let version = if let Some(peer_data) = state.peer_data.get(&dest) {
peer_data.version
} else {
return
};

job_data
.message_sent_to_peer
.entry(dest.clone())
Expand All @@ -724,7 +824,7 @@ async fn send_tracked_gossip_message<Context>(

ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![dest],
message.into_validation_protocol(),
message.into_validation_protocol(version),
))
.await;
}
Expand Down