Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
network/src/legacy/gossip: Wrap GossipEngine in Arc Mutex & lock it o…
…n use

`GossipEngine` in itself has no need to be Send and Sync, given that it
does not rely on separately spawned background tasks anymore.
`RegisteredMessageValidator` needs to be `Send` and `Sync` due to the
inherited trait bounds from implementing `GossipService`. In addition
`RegisteredMessageValidator` derives `Clone`. Thereby `GossipEngine`
needs to be wrapped in an `Arc` and `Mutex` to keep the status quo.
  • Loading branch information
mxinden committed Mar 9, 2020
commit c19d707505c463c5b3d0e391f736c87a9f62efa6
30 changes: 19 additions & 11 deletions network/src/legacy/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use std::sync::Arc;

use arrayvec::ArrayVec;
use futures::prelude::*;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};

use crate::legacy::{GossipMessageStream, GossipService};

Expand Down Expand Up @@ -289,20 +289,28 @@ pub fn register_validator<C: ChainContext + 'static>(
});

let gossip_side = validator.clone();
let gossip_engine = sc_network_gossip::GossipEngine::new(
let gossip_engine = Arc::new(Mutex::new(sc_network_gossip::GossipEngine::new(
service.clone(),
POLKADOT_ENGINE_ID,
POLKADOT_PROTOCOL_NAME,
gossip_side,
);
)));

// Spawn gossip engine.
//
// Ideally this would not be spawned as an orphaned task, but polled by
// `RegisteredMessageValidator` which in turn would be polled by a `ValidationNetwork`.
let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(gossip_engine.clone())));
{
let gossip_engine = gossip_engine.clone();
let fut = futures::future::poll_fn(move |cx| {
gossip_engine.lock().poll_unpin(cx)
});
let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));

// Note: we consider the chances of an error to spawn a background task almost null.
if spawn_res.is_err() {
log::error!(target: "polkadot-gossip", "Failed to spawn background task");
// Note: we consider the chances of an error to spawn a background task almost null.
if spawn_res.is_err() {
log::error!(target: "polkadot-gossip", "Failed to spawn background task");
}
}

RegisteredMessageValidator {
Expand Down Expand Up @@ -350,7 +358,7 @@ pub struct RegisteredMessageValidator {
// Note: this is always `Some` in real code and `None` in tests.
service: Option<Arc<NetworkService<Block, Hash>>>,
// Note: this is always `Some` in real code and `None` in tests.
gossip_engine: Option<sc_network_gossip::GossipEngine<Block>>,
gossip_engine: Option<Arc<Mutex<sc_network_gossip::GossipEngine<Block>>>>,
}

impl RegisteredMessageValidator {
Expand Down Expand Up @@ -398,7 +406,7 @@ impl RegisteredMessageValidator {

pub(crate) fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.messages_for(topic)
gossip_engine.lock().messages_for(topic)
} else {
log::error!("Called gossip_messages_for on a test engine");
futures::channel::mpsc::unbounded().1
Expand All @@ -409,7 +417,7 @@ impl RegisteredMessageValidator {

pub(crate) fn gossip_message(&self, topic: Hash, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.gossip_message(
gossip_engine.lock().gossip_message(
topic,
message.encode(),
false,
Expand All @@ -421,7 +429,7 @@ impl RegisteredMessageValidator {

pub(crate) fn send_message(&self, who: PeerId, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.send_message(vec![who], message.encode());
gossip_engine.lock().send_message(vec![who], message.encode());
} else {
log::error!("Called send_message on a test engine");
}
Expand Down