Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
91 changes: 61 additions & 30 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures::{channel::oneshot, FutureExt as _};
use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
peer_set::MAX_NOTIFICATION_SIZE,
v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_primitives::approval::{
Expand Down Expand Up @@ -1381,14 +1382,7 @@ impl State {
"Sending assignments to unified peer",
);

sender
.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer_id],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments_to_send),
)),
))
.await;
send_assignments_batched(sender, assignments_to_send, peer_id).await;
}

if !approvals_to_send.is_empty() {
Expand All @@ -1399,14 +1393,7 @@ impl State {
"Sending approvals to unified peer",
);

sender
.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer_id],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals_to_send),
)),
))
.await;
send_approvals_batched(sender, approvals_to_send, peer_id).await;
}
}

Expand Down Expand Up @@ -1605,23 +1592,11 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi
// Send messages in accumulated packets, assignments preceding approvals.

for (peer, assignments_packet) in peer_assignments {
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments_packet),
)),
))
.await;
send_assignments_batched(ctx.sender(), assignments_packet, peer).await;
}

for (peer, approvals_packet) in peer_approvals {
ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage(
vec![peer],
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals_packet),
)),
))
.await;
send_approvals_batched(ctx.sender(), approvals_packet, peer).await;
}
}

Expand Down Expand Up @@ -1758,3 +1733,59 @@ impl<Context> ApprovalDistribution {
SpawnedSubsystem { name: "approval-distribution-subsystem", future }
}
}

/// The maximum number of assignments or approvals sent to a peer in a single batch.
///
/// Computed as one third of `MAX_NOTIFICATION_SIZE` divided by the in-memory size of an
/// `(IndirectAssignmentCert, CandidateIndex)` pair. The one-third factor is an arbitrary value.
/// Bumping it up increases the maximum amount of approvals or assignments we send in a single
/// message to peers. Exceeding `MAX_NOTIFICATION_SIZE` will violate the protocol configuration.
///
/// NOTE(review): `size_of` measures the in-memory layout, not the encoded wire size; this
/// presumably serves as a conservative estimate - confirm against the encoding in use.
///
/// The value is checked during const evaluation to be non-zero: the batching helpers take
/// `MAX_BATCH_SIZE` items per iteration, so a zero here would never drain their input.
pub const MAX_BATCH_SIZE: usize = ensure_batch_size_not_zero(
    MAX_NOTIFICATION_SIZE as usize /
        std::mem::size_of::<(IndirectAssignmentCert, CandidateIndex)>() /
        3,
);

/// Returns `size` unchanged, aborting const evaluation (and thus the build) if it is zero.
const fn ensure_batch_size_not_zero(size: usize) -> usize {
    if size == 0 {
        panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)");
    }
    size
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be safer if we guaranteed via min that it is at least as large as a single message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the size the same for approvals and assignments? I would assume that assignments are larger, so you just picked the larger one. Still, having two constants here would be hardly any more work, but it would be more correct and a bit more robust against future changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. Initially I had 2 constants, but I decided that it would be simpler with one, without compromising on anything else. I'll change it back to 2 constants and also provide the said guarantee.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/// Deliver `assignments` to `peer`, split into notifications of at most `MAX_BATCH_SIZE`
/// entries each so that the `max_notification_size` of the protocol is honored.
///
/// Sending several smaller notifications instead of one large one allows more granular
/// processing at the destination, such that the subsystem doesn't get stuck for long
/// processing a batch of assignments and can `select!` other tasks.
///
/// Nothing is sent when `assignments` is empty.
pub(crate) async fn send_assignments_batched(
    sender: &mut impl overseer::ApprovalDistributionSenderTrait,
    assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>,
    peer: PeerId,
) {
    let mut remaining = assignments.into_iter();

    // `as_slice` exposes the part of the vector not yet consumed, so the loop runs until
    // every assignment has been moved into some batch.
    while !remaining.as_slice().is_empty() {
        let chunk = remaining.by_ref().take(MAX_BATCH_SIZE).collect::<Vec<_>>();
        let payload = protocol_v1::ApprovalDistributionMessage::Assignments(chunk);
        let message = NetworkBridgeTxMessage::SendValidationMessage(
            vec![peer],
            Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(payload)),
        );

        sender.send_message(message).await;
    }
}

/// Deliver `approvals` to `peer`, split into notifications of at most `MAX_BATCH_SIZE`
/// entries each so that the `max_notification_size` of the protocol is honored.
///
/// Nothing is sent when `approvals` is empty.
pub(crate) async fn send_approvals_batched(
    sender: &mut impl overseer::ApprovalDistributionSenderTrait,
    approvals: Vec<IndirectSignedApprovalVote>,
    peer: PeerId,
) {
    let mut remaining = approvals.into_iter();

    // `as_slice` exposes the part of the vector not yet consumed, so the loop runs until
    // every approval has been moved into some batch.
    while !remaining.as_slice().is_empty() {
        let chunk = remaining.by_ref().take(MAX_BATCH_SIZE).collect::<Vec<_>>();
        let payload = protocol_v1::ApprovalDistributionMessage::Approvals(chunk);
        let message = NetworkBridgeTxMessage::SendValidationMessage(
            vec![peer],
            Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(payload)),
        );

        sender.send_message(message).await;
    }
}
128 changes: 128 additions & 0 deletions node/network/approval-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2276,3 +2276,131 @@ fn resends_messages_periodically() {
virtual_overseer
});
}

/// Sends `message_count` assignments followed by `message_count` approvals through the
/// batched senders and checks that the overseer receives them split into consecutive
/// batches of `MAX_BATCH_SIZE` items, with only the last batch allowed to be shorter.
fn batch_test_round(message_count: usize) {
    use polkadot_node_subsystem::SubsystemContext;
    let pool = sp_core::testing::TaskExecutor::new();
    let mut state = State::default();

    let (mut context, mut virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
    let subsystem = ApprovalDistribution::new(Default::default());
    let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(12345);
    let mut sender = context.sender().clone();
    let subsystem = subsystem.run_inner(context, &mut state, &mut rng);

    let test_fut = async move {
        // One assignment and one approval per fake validator index.
        let assignments: Vec<_> = (0..message_count)
            .map(|index| (fake_assignment_cert(Hash::zero(), ValidatorIndex(index as u32)), 0))
            .collect();

        let approvals: Vec<_> = (0..message_count)
            .map(|index| IndirectSignedApprovalVote {
                block_hash: Hash::zero(),
                candidate_index: 0,
                validator: ValidatorIndex(index as u32),
                signature: dummy_signature(),
            })
            .collect();

        let peer = PeerId::random();
        send_assignments_batched(&mut sender, assignments.clone(), peer).await;
        send_approvals_batched(&mut sender, approvals.clone(), peer).await;

        // Every received assignments message must match the next `MAX_BATCH_SIZE`-sized
        // chunk of the input; the final chunk holds whatever remains.
        for expected_batch in assignments.chunks(super::MAX_BATCH_SIZE) {
            assert_matches!(
                overseer_recv(&mut virtual_overseer).await,
                AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
                    peers,
                    Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
                        protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments)
                    ))
                )) => {
                    assert_eq!(peers.len(), 1);
                    assert_eq!(sent_assignments.as_slice(), expected_batch);
                }
            );
        }

        // Same check for the approval votes, which are sent after all assignments.
        for expected_batch in approvals.chunks(super::MAX_BATCH_SIZE) {
            assert_matches!(
                overseer_recv(&mut virtual_overseer).await,
                AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
                    peers,
                    Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
                        protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals)
                    ))
                )) => {
                    assert_eq!(peers.len(), 1);
                    assert_eq!(sent_approvals.as_slice(), expected_batch);
                }
            );
        }

        virtual_overseer
    };

    futures::pin_mut!(test_fut);
    futures::pin_mut!(subsystem);

    // Once the checks are done, tell the subsystem to conclude so that the joined future
    // can complete.
    let conclude = async move {
        let mut overseer = test_fut.await;
        overseer
            .send(FromOrchestra::Signal(OverseerSignal::Conclude))
            .timeout(TIMEOUT)
            .await
            .expect("Conclude send timeout");
    };

    executor::block_on(future::join(conclude, subsystem));
}

/// A single message fits into one (partial) batch.
#[test]
fn batch_sending_1_msg() {
    let message_count = 1;
    batch_test_round(message_count);
}

/// The message count fills exactly one batch.
#[test]
fn batch_sending_exactly_one_batch() {
    let message_count = super::MAX_BATCH_SIZE;
    batch_test_round(message_count);
}

/// Two full batches followed by a partial batch of four messages.
#[test]
fn batch_sending_partial_batch() {
    let message_count = super::MAX_BATCH_SIZE * 2 + 4;
    batch_test_round(message_count);
}

/// Ten batches, all of the same (maximum) length.
#[test]
fn batch_sending_multiple_same_len() {
    let message_count = super::MAX_BATCH_SIZE * 10;
    batch_test_round(message_count);
}

/// Half of the maximum batch size.
#[test]
fn batch_sending_half_batch() {
    let message_count = super::MAX_BATCH_SIZE / 2;
    batch_test_round(message_count);
}
2 changes: 1 addition & 1 deletion node/network/protocol/src/peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const LEGACY_COLLATION_PROTOCOL_V1: &str = "/polkadot/collation/1";
const LEGACY_PROTOCOL_VERSION_V1: u32 = 1;

/// Max notification size is currently constant.
const MAX_NOTIFICATION_SIZE: u64 = 100 * 1024;
pub const MAX_NOTIFICATION_SIZE: u64 = 100 * 1024;

/// The peer-sets and thus the protocols which are used for the network.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)]
Expand Down