diff --git a/Cargo.lock b/Cargo.lock index 2bd85a4c9d72..0cc944998918 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3308,6 +3308,8 @@ dependencies = [ "futures-timer 3.0.2", "kv-log-macro", "log 0.4.8", + "polkadot-primitives", + "polkadot-statement-table", "streamunordered", ] @@ -7186,6 +7188,18 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" +[[package]] +name = "statement-gossip-subsystem" +version = "0.1.0" +dependencies = [ + "exit-future", + "futures 0.3.5", + "overseer", + "polkadot-network", + "polkadot-primitives", + "polkadot-statement-table", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index dd266485c91d..9192ff666d08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ members = [ "statement-table", "service", "validation", - + "statement-gossip-subsystem", "parachain/test-parachains", "parachain/test-parachains/adder", "parachain/test-parachains/adder/collator", diff --git a/overseer/Cargo.toml b/overseer/Cargo.toml index bcd0a8e9e529..d04fbe4ad81b 100644 --- a/overseer/Cargo.toml +++ b/overseer/Cargo.toml @@ -9,6 +9,8 @@ futures = "0.3.5" log = "0.4.8" futures-timer = "3.0.2" streamunordered = "0.5.1" +polkadot-primitives = { path = "../primitives" } +polkadot-statement-table = { path = "../statement-table" } [dev-dependencies] futures = { version = "0.3.5", features = ["thread-pool"] } diff --git a/overseer/examples/minimal-example.rs b/overseer/examples/minimal-example.rs index 4dd37dbafea1..f1ef0125d189 100644 --- a/overseer/examples/minimal-example.rs +++ b/overseer/examples/minimal-example.rs @@ -101,6 +101,16 @@ impl Subsystem for Subsystem2 { } } + +struct DummySubsystem; + +impl Subsystem for DummySubsystem { + fn start(&mut self, _: SubsystemContext) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(Delay::new(Duration::from_secs(10)))) + } +} + + fn main() { femme::with_level(femme::LevelFilter::Trace); let spawner = executor::ThreadPool::new().unwrap(); @@ -113,6 +123,7 @@ fn main() { let (overseer, _handler) = Overseer::new( Box::new(Subsystem2), Box::new(Subsystem1), + Box::new(DummySubsystem), spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index f7ac6cac5079..6ecab6d5c31a 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -69,6 +69,8 @@ use futures::{ }; use futures_timer::Delay; use streamunordered::{StreamYield, StreamUnordered}; +use polkadot_primitives::Hash; +use polkadot_statement_table::SignedStatement; /// An error type that describes faults that may happen /// @@ -225,9 +227,9 @@ pub struct SubsystemContext{ #[derive(Debug)] pub enum OverseerSignal { /// `Subsystem` should start working. - StartWork, + StartWork(Hash), /// `Subsystem` should stop working. - StopWork, + StopWork(Hash), /// Conclude the work of the `Overseer` and all `Subsystem`s. Conclude, } @@ -249,6 +251,16 @@ pub enum CandidateBackingSubsystemMessage { Second, } + +#[derive(Debug)] +/// A message type used by the StatementGossip [`Subsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html +pub enum StatementGossipSubsystemMessage { + ToGossip { relay_parent: Hash, statement: SignedStatement }, + Received { relay_parent: Hash, statement: SignedStatement } +} + /// A message type tying together all message types that are used across [`Subsystem`]s. /// /// [`Subsystem`]: trait.Subsystem.html @@ -256,6 +268,7 @@ pub enum CandidateBackingSubsystemMessage { pub enum AllMessages { Validation(ValidationSubsystemMessage), CandidateBacking(CandidateBackingSubsystemMessage), + StatementGossip(StatementGossipSubsystemMessage) } /// A message type that a [`Subsystem`] receives from the [`Overseer`]. @@ -355,6 +368,9 @@ pub struct Overseer { /// A candidate backing subsystem candidate_backing_subsystem: OverseenSubsystem, + /// A statement gossip subsystem + statement_gossip_subsystem: OverseenSubsystem, + /// Spawner to spawn tasks to. s: S, @@ -473,6 +489,7 @@ where pub fn new( validation: Box + Send>, candidate_backing: Box + Send>, + statement_gossip: Box + Send>, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -498,9 +515,17 @@ where candidate_backing, )?; + let statement_gossip_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + statement_gossip, + )?; + let this = Self { validation_subsystem, candidate_backing_subsystem, + statement_gossip_subsystem, s, running_subsystems, running_subsystems_rx, @@ -588,6 +613,11 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } + AllMessages::StatementGossip(msg) => { + if let Some(ref mut s) = self.statement_gossip_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } } } diff --git a/statement-gossip-subsystem/Cargo.toml b/statement-gossip-subsystem/Cargo.toml new file mode 100644 index 000000000000..ec16eae6e773 --- /dev/null +++ b/statement-gossip-subsystem/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "statement-gossip-subsystem" +version = "0.1.0" +authors = ["Ashley Ruglys "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +polkadot-network = { path = "../network" } +statement-table = { package = "polkadot-statement-table", path = "../statement-table" } +primitives = { package = "polkadot-primitives", path = "../primitives" } +overseer = { path = "../overseer" } +exit-future = "0.2.0" +futures = "0.3.5" diff --git a/statement-gossip-subsystem/src/lib.rs b/statement-gossip-subsystem/src/lib.rs new file mode 100644 index 000000000000..d0e4d6919a94 --- /dev/null +++ b/statement-gossip-subsystem/src/lib.rs @@ -0,0 +1,106 @@ +use statement_table::SignedStatement; +use primitives::Hash; +use std::collections::HashMap; +use futures::channel::mpsc::{Sender, Receiver, channel}; +use futures::prelude::*; +use futures::future::ready; +use polkadot_network::legacy::GossipService; +use polkadot_network::legacy::gossip::{GossipMessage, GossipStatement}; +use overseer::*; + +type Jobs = HashMap)>; +type Message = StatementGossipSubsystemMessage; + +pub struct StatementGossipSubsystem { + /// This comes from the legacy networking code, so it is likely to be changed. + gossip_service: GS, +} + +impl StatementGossipSubsystem { + pub fn new(gossip_service: GS) -> Self { + Self { + gossip_service + } + } +} + +impl Subsystem for StatementGossipSubsystem { + fn start(&mut self, mut ctx: SubsystemContext) -> SpawnedSubsystem { + let mut jobs = Jobs::new(); + let gossip_service = self.gossip_service.clone(); + let (jobs_to_subsystem_s, mut jobs_to_subsystem_r) = channel(1024); + + SpawnedSubsystem(Box::pin(async move { + loop { + match ctx.try_recv().await { + Ok(Some(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)))) => { + let (signal, exit) = exit_future::signal(); + let (subsystem_to_job_s, subsystem_to_job_r) = channel(1024); + + ctx.spawn(statement_gossip_job( + gossip_service.clone(), relay_parent, exit, jobs_to_subsystem_s.clone(), subsystem_to_job_r, + ).boxed()).await.unwrap(); + + jobs.insert(relay_parent, (signal, subsystem_to_job_s)); + }, + Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)))) => { + if let Some((signal, _)) = jobs.remove(&relay_parent) { + signal.fire().unwrap(); + } else { + println!("Error: No jobs for {:?} running", relay_parent); + } + }, + Ok(Some(FromOverseer::Communication { msg: Message::ToGossip { relay_parent, statement }})) => { + if let Some((_, sender)) = jobs.get_mut(&relay_parent) { + sender.try_send(statement).unwrap(); + } + }, + Ok(Some(FromOverseer::Communication { msg: Message::Received { .. }})) => {}, + Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) | Ok(None) | Err(_) => return, + } + + if let Some(msg) = jobs_to_subsystem_r.next().await { + let _ = ctx.send_msg(AllMessages::StatementGossip(msg)).await; + } + + } + })) + } +} + +async fn statement_gossip_job( + gossip_service: GS, + // The relay parent that this job is running for. + relay_parent: Hash, + // A future that resolves with the associated `exit_future::Signal` is fired. + exit_future: exit_future::Exit, + // A cloned sender of messages to the overseer. This channel is shared between all jobs. + mut overseer_sender: Sender, + // A receiver of messages from the subsystem. This channel is exclusive to this job. + subsystem_receiver: Receiver, +) { + println!("Job for {:?} started", relay_parent); + + let mut incoming = gossip_service.gossip_messages_for(relay_parent) + .filter_map(|(gossip_message, _)| match gossip_message { + GossipMessage::Statement(statement) => ready(Some(statement)), + _ => ready(None) + }) + .for_each(|statement| { + overseer_sender.try_send(StatementGossipSubsystemMessage::Received { relay_parent, statement: statement.signed_statement }).unwrap(); + ready(()) + }) + .fuse(); + + let mut outgoing = subsystem_receiver.for_each(|statement| { + gossip_service.gossip_message(relay_parent, GossipMessage::Statement(GossipStatement { relay_chain_leaf: relay_parent, signed_statement: statement })); + ready(()) + }); + + futures::select! { + _ = exit_future.fuse() => {}, + _ = incoming => {}, + _ = outgoing => {}, + } + println!("Job for {:?} stopped", relay_parent); +}