Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 48d5c45

Browse files
arkpargavofyork
authored andcommitted
Batch gossip messages (#4055)
1 parent e676a10 commit 48d5c45

File tree

3 files changed

+111
-75
lines changed

3 files changed

+111
-75
lines changed

core/network/src/protocol.rs

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,14 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
6969
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);
7070

7171
/// Current protocol version.
72-
pub(crate) const CURRENT_VERSION: u32 = 4;
72+
pub(crate) const CURRENT_VERSION: u32 = 5;
7373
/// Lowest version we support
7474
pub(crate) const MIN_VERSION: u32 = 3;
7575

7676
// Maximum allowed entries in `BlockResponse`
7777
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
78+
// Maximum allowed entries in `ConsensusBatch`
79+
const MAX_CONSENSUS_MESSAGES: usize = 256;
7880
/// When light node connects to the full node and the full node is behind light node
7981
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful
8082
/// and disconnect to free connection slot.
@@ -298,7 +300,7 @@ pub trait Context<B: BlockT> {
298300
fn disconnect_peer(&mut self, who: PeerId);
299301

300302
/// Send a consensus message to a peer.
301-
fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage);
303+
fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>);
302304

303305
/// Send a chain-specific message to a peer.
304306
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>);
@@ -330,13 +332,33 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
330332
self.behaviour.disconnect_peer(&who)
331333
}
332334

333-
fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) {
334-
send_message::<B> (
335-
self.behaviour,
336-
&mut self.context_data.stats,
337-
&who,
338-
GenericMessage::Consensus(consensus)
339-
)
335+
fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>) {
336+
if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) {
337+
let mut batch = Vec::new();
338+
let len = messages.len();
339+
for (index, message) in messages.into_iter().enumerate() {
340+
batch.reserve(MAX_CONSENSUS_MESSAGES);
341+
batch.push(message);
342+
if batch.len() == MAX_CONSENSUS_MESSAGES || index == len - 1 {
343+
send_message::<B> (
344+
self.behaviour,
345+
&mut self.context_data.stats,
346+
&who,
347+
GenericMessage::ConsensusBatch(std::mem::replace(&mut batch, Vec::new())),
348+
)
349+
}
350+
}
351+
} else {
352+
// Backwards compatibility
353+
for message in messages {
354+
send_message::<B> (
355+
self.behaviour,
356+
&mut self.context_data.stats,
357+
&who,
358+
GenericMessage::Consensus(message)
359+
)
360+
}
361+
}
340362
}
341363

342364
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) {
@@ -598,13 +620,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
598620
GenericMessage::RemoteReadChildRequest(request) =>
599621
self.on_remote_read_child_request(who, request),
600622
GenericMessage::Consensus(msg) => {
601-
if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) {
602-
self.consensus_gossip.on_incoming(
603-
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
604-
who,
605-
msg,
606-
);
607-
}
623+
self.consensus_gossip.on_incoming(
624+
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
625+
who,
626+
vec![msg],
627+
);
628+
}
629+
GenericMessage::ConsensusBatch(messages) => {
630+
self.consensus_gossip.on_incoming(
631+
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
632+
who,
633+
messages,
634+
);
608635
}
609636
GenericMessage::ChainSpecific(msg) => self.specialization.on_message(
610637
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),

core/network/src/protocol/consensus_gossip.rs

Lines changed: 65 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
171171

172172
/// Send addressed message to a peer.
173173
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
174-
self.protocol.send_consensus(who.clone(), ConsensusMessage {
174+
self.protocol.send_consensus(who.clone(), vec![ConsensusMessage {
175175
engine_id: self.engine_id,
176176
data: message,
177-
});
177+
}]);
178178
}
179179

180180
/// Send all messages with given topic to a peer.
@@ -190,7 +190,7 @@ fn propagate<'a, B: BlockT, I>(
190190
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
191191
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
192192
)
193-
where I: IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
193+
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
194194
{
195195
let mut check_fns = HashMap::new();
196196
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
@@ -206,8 +206,9 @@ fn propagate<'a, B: BlockT, I>(
206206
(check_fn)(who, intent, topic, &message.data)
207207
};
208208

209-
for (message_hash, topic, message) in messages {
210-
for (id, ref mut peer) in peers.iter_mut() {
209+
for (id, ref mut peer) in peers.iter_mut() {
210+
let mut batch = Vec::new();
211+
for (message_hash, topic, message) in messages.clone() {
211212
let previous_attempts = peer.filtered_messages
212213
.get(&message_hash)
213214
.cloned()
@@ -245,8 +246,9 @@ fn propagate<'a, B: BlockT, I>(
245246
peer.known_messages.insert(message_hash.clone());
246247

247248
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
248-
protocol.send_consensus(id.clone(), message.clone());
249+
batch.push(message.clone())
249250
}
251+
protocol.send_consensus(id.clone(), batch);
250252
}
251253
}
252254

@@ -477,65 +479,68 @@ impl<B: BlockT> ConsensusGossip<B> {
477479
&mut self,
478480
protocol: &mut dyn Context<B>,
479481
who: PeerId,
480-
message: ConsensusMessage,
482+
messages: Vec<ConsensusMessage>,
481483
) {
482-
let message_hash = HashFor::<B>::hash(&message.data[..]);
484+
trace!(target:"gossip", "Received {} messages from peer {}", messages.len(), who);
485+
for message in messages {
486+
let message_hash = HashFor::<B>::hash(&message.data[..]);
483487

484-
if self.known_messages.contains_key(&message_hash) {
485-
trace!(target:"gossip", "Ignored already known message from {}", who);
486-
protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE);
487-
return;
488-
}
488+
if self.known_messages.contains_key(&message_hash) {
489+
trace!(target:"gossip", "Ignored already known message from {}", who);
490+
protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE);
491+
continue;
492+
}
489493

490-
let engine_id = message.engine_id;
491-
// validate the message
492-
let validation = self.validators.get(&engine_id)
493-
.cloned()
494-
.map(|v| {
495-
let mut context = NetworkContext { gossip: self, protocol, engine_id };
496-
v.validate(&mut context, &who, &message.data)
497-
});
494+
let engine_id = message.engine_id;
495+
// validate the message
496+
let validation = self.validators.get(&engine_id)
497+
.cloned()
498+
.map(|v| {
499+
let mut context = NetworkContext { gossip: self, protocol, engine_id };
500+
v.validate(&mut context, &who, &message.data)
501+
});
498502

499-
let validation_result = match validation {
500-
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
501-
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
502-
Some(ValidationResult::Discard) => None,
503-
None => {
504-
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
505-
protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE);
506-
protocol.disconnect_peer(who);
507-
return;
508-
}
509-
};
503+
let validation_result = match validation {
504+
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
505+
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
506+
Some(ValidationResult::Discard) => None,
507+
None => {
508+
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
509+
protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE);
510+
protocol.disconnect_peer(who.clone());
511+
continue;
512+
}
513+
};
510514

511-
if let Some((topic, keep)) = validation_result {
512-
protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE);
513-
if let Some(ref mut peer) = self.peers.get_mut(&who) {
514-
peer.known_messages.insert(message_hash);
515-
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
516-
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
517-
entry.get_mut().retain(|sink| {
518-
if let Err(e) = sink.unbounded_send(TopicNotification {
519-
message: message.data.clone(),
520-
sender: Some(who.clone())
521-
}) {
522-
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
515+
if let Some((topic, keep)) = validation_result {
516+
protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE);
517+
if let Some(ref mut peer) = self.peers.get_mut(&who) {
518+
peer.known_messages.insert(message_hash);
519+
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
520+
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
521+
entry.get_mut().retain(|sink| {
522+
if let Err(e) = sink.unbounded_send(TopicNotification {
523+
message: message.data.clone(),
524+
sender: Some(who.clone())
525+
}) {
526+
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
527+
}
528+
!sink.is_closed()
529+
});
530+
if entry.get().is_empty() {
531+
entry.remove_entry();
523532
}
524-
!sink.is_closed()
525-
});
526-
if entry.get().is_empty() {
527-
entry.remove_entry();
528533
}
529-
}
530-
if keep {
531-
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
534+
if keep {
535+
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
536+
}
537+
} else {
538+
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
539+
protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE);
532540
}
533541
} else {
534-
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
535-
protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE);
542+
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
536543
}
537-
} else {
538-
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
539544
}
540545
}
541546

@@ -555,6 +560,7 @@ impl<B: BlockT> ConsensusGossip<B> {
555560
};
556561

557562
if let Some(ref mut peer) = self.peers.get_mut(who) {
563+
let mut batch = Vec::new();
558564
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
559565
let intent = if force {
560566
MessageIntent::ForcedBroadcast
@@ -585,11 +591,12 @@ impl<B: BlockT> ConsensusGossip<B> {
585591
peer.known_messages.insert(entry.message_hash.clone());
586592

587593
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
588-
protocol.send_consensus(who.clone(), ConsensusMessage {
594+
batch.push(ConsensusMessage {
589595
engine_id: engine_id.clone(),
590596
data: entry.message.data.clone(),
591597
});
592598
}
599+
protocol.send_consensus(who.clone(), batch);
593600
}
594601
}
595602

@@ -626,8 +633,7 @@ impl<B: BlockT> ConsensusGossip<B> {
626633

627634
peer.filtered_messages.remove(&message_hash);
628635
peer.known_messages.insert(message_hash);
629-
630-
protocol.send_consensus(who.clone(), message.clone());
636+
protocol.send_consensus(who.clone(), vec![message.clone()]);
631637
}
632638
}
633639

@@ -812,7 +818,7 @@ mod tests {
812818
impl<B: BlockT> Context<B> for DummyNetworkContext {
813819
fn report_peer(&mut self, _who: PeerId, _reputation: i32) {}
814820
fn disconnect_peer(&mut self, _who: PeerId) {}
815-
fn send_consensus(&mut self, _who: PeerId, _consensus: ConsensusMessage) {}
821+
fn send_consensus(&mut self, _who: PeerId, _consensus: Vec<ConsensusMessage>) {}
816822
fn send_chain_specific(&mut self, _who: PeerId, _message: Vec<u8>) {}
817823
}
818824

core/network/src/protocol/message.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ pub mod generic {
217217
FinalityProofRequest(FinalityProofRequest<Hash>),
218218
/// Finality proof reponse.
219219
FinalityProofResponse(FinalityProofResponse<Hash>),
220+
/// Batch of consensus protocol messages.
221+
ConsensusBatch(Vec<ConsensusMessage>),
220222
/// Chain-specific message.
221223
#[codec(index = "255")]
222224
ChainSpecific(Vec<u8>),
@@ -243,6 +245,7 @@ pub mod generic {
243245
Message::RemoteReadChildRequest(_) => "RemoteReadChildRequest",
244246
Message::FinalityProofRequest(_) => "FinalityProofRequest",
245247
Message::FinalityProofResponse(_) => "FinalityProofResponse",
248+
Message::ConsensusBatch(_) => "ConsensusBatch",
246249
Message::ChainSpecific(_) => "ChainSpecific",
247250
}
248251
}

0 commit comments

Comments
 (0)