Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix test subsystem context
  • Loading branch information
rphmeier committed Mar 27, 2021
commit 8b50bc26934d2d3faa31770dd639257bf15c69af
58 changes: 38 additions & 20 deletions node/subsystem-test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::{
FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem,
SpawnedSubsystem, OverseerSignal,
SpawnedSubsystem, OverseerSignal, SubsystemSender,
};
use polkadot_node_subsystem_util::TimeoutExt;

Expand Down Expand Up @@ -156,9 +156,41 @@ pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
(SingleItemSink(inner.clone()), SingleItemStream(inner))
}

/// A test subsystem sender.
#[derive(Clone)]
pub struct TestSubsystemSender {
tx: mpsc::UnboundedSender<AllMessages>,
}

#[async_trait::async_trait]
impl SubsystemSender for TestSubsystemSender {
async fn send_message(&mut self, msg: AllMessages) {
self.tx
.send(msg)
.await
.expect("test overseer no longer live");
}

async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = AllMessages> + Send,
T::IntoIter: Send,
{
let mut iter = stream::iter(msgs.into_iter().map(Ok));
self.tx
.send_all(&mut iter)
.await
.expect("test overseer no longer live");
}

fn send_unbounded_message(&mut self, msg: AllMessages) {
self.tx.unbounded_send(msg).expect("test overseer no longer live");
}
}

/// A test subsystem context.
pub struct TestSubsystemContext<M, S> {
tx: mpsc::UnboundedSender<AllMessages>,
tx: TestSubsystemSender,
rx: SingleItemStream<FromOverseer<M>>,
spawn: S,
}
Expand All @@ -168,6 +200,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
for TestSubsystemContext<M, S>
{
type Message = M;
type Sender = TestSubsystemSender;

async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) {
Expand Down Expand Up @@ -198,23 +231,8 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
Ok(())
}

async fn send_message(&mut self, msg: AllMessages) {
self.tx
.send(msg)
.await
.expect("test overseer no longer live");
}

async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = AllMessages> + Send,
T::IntoIter: Send,
{
let mut iter = stream::iter(msgs.into_iter().map(Ok));
self.tx
.send_all(&mut iter)
.await
.expect("test overseer no longer live");
fn sender(&mut self) -> &mut TestSubsystemSender {
&mut self.tx
}
}

Expand Down Expand Up @@ -260,7 +278,7 @@ pub fn make_subsystem_context<M, S>(

(
TestSubsystemContext {
tx: all_messages_tx,
tx: TestSubsystemSender { tx: all_messages_tx },
rx: overseer_rx,
spawn,
},
Expand Down