Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test that gossip streams are cleaned up correctly
  • Loading branch information
rphmeier committed Mar 12, 2020
commit c35dae86374c89dd63ec7e331c40989135a0e6f0
4 changes: 1 addition & 3 deletions network/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl NetworkServiceOps for PolkadotNetworkService {
}

/// Operations that a handle to a gossip network should provide.
trait GossipOps: Clone + crate::legacy::GossipService {
trait GossipOps: Clone + Send + crate::legacy::GossipService + 'static {
fn new_local_leaf(
&self,
relay_parent: Hash,
Expand Down Expand Up @@ -374,8 +374,6 @@ struct ConsensusNetworkingInstance {
_drop_signal: exit_future::Signal,
}

type RegisteredMessageValidator = crate::legacy::gossip::RegisteredMessageValidator<crate::PolkadotProtocol>;

/// A utility future that resolves when the receiving end of a channel has hung up.
///
/// This is an `.await`-friendly interface around `poll_canceled`.
Expand Down
92 changes: 81 additions & 11 deletions network/src/protocol/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use polkadot_primitives::{Block, Header, BlockId};
use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId,
Retriable, CollatorId, AbridgedCandidateReceipt,
GlobalValidationSchedule, LocalValidationData,
GlobalValidationSchedule, LocalValidationData, ErasureChunk,
};
use polkadot_validation::SharedTable;

use av_store::Store as AvailabilityStore;
use av_store::{Store as AvailabilityStore, ErasureNetworking};
use sc_network_gossip::TopicNotification;
use sp_blockchain::Result as ClientResult;
use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi};
use sp_runtime::traits::{Block as BlockT, HasherFor, NumberFor};
use sp_runtime::traits::{Block as BlockT, HashFor, NumberFor};
use sp_state_machine::ChangesTrieState;
use sp_core::{crypto::Pair, NativeOrEncoded, ExecutionContext};
use sp_keyring::Sr25519Keyring;
Expand All @@ -44,17 +44,27 @@ struct Recorded {
notifications: Vec<(PeerId, Message)>,
}

// Test setup registers receivers of gossip messages as well as signals that
// fire when they are taken.
type GossipStreamEntry = (mpsc::UnboundedReceiver<TopicNotification>, oneshot::Sender<()>);

#[derive(Default, Clone)]
struct MockGossip {
inner: Arc<Mutex<HashMap<Hash, mpsc::UnboundedReceiver<TopicNotification>>>>,
inner: Arc<Mutex<HashMap<Hash, GossipStreamEntry>>>,
}

impl MockGossip {
#[allow(unused)]
fn add_gossip_stream(&self, topic: Hash) -> mpsc::UnboundedSender<TopicNotification> {
fn add_gossip_stream(&self, topic: Hash)
-> (mpsc::UnboundedSender<TopicNotification>, oneshot::Receiver<()>)
{
let (tx, rx) = mpsc::unbounded();
self.inner.lock().insert(topic, rx);
tx
let (o_tx, o_rx) = oneshot::channel();
self.inner.lock().insert(topic, (rx, o_tx));
(tx, o_rx)
}

fn contains_listener(&self, topic: &Hash) -> bool {
self.inner.lock().contains_key(topic)
}
}

Expand Down Expand Up @@ -82,7 +92,10 @@ impl crate::legacy::GossipService for MockGossip {
fn gossip_messages_for(&self, topic: Hash) -> crate::legacy::GossipMessageStream {
crate::legacy::GossipMessageStream::new(match self.inner.lock().remove(&topic) {
None => Box::pin(stream::empty()),
Some(rx) => Box::pin(rx),
Some((rx, o_rx)) => {
let _ = o_rx.send(());
Box::pin(rx)
}
})
}

Expand Down Expand Up @@ -174,7 +187,7 @@ impl ApiErrorExt for RuntimeApi {
}

impl ApiExt<Block> for RuntimeApi {
type StateBackend = sp_state_machine::InMemoryBackend<sp_api::HasherFor<Block>>;
type StateBackend = sp_state_machine::InMemoryBackend<sp_api::HashFor<Block>>;

fn map_api_result<F: FnOnce(&Self) -> Result<R, E>, R, E>(
&self,
Expand All @@ -196,7 +209,7 @@ impl ApiExt<Block> for RuntimeApi {
fn into_storage_changes(
&self,
_: &Self::StateBackend,
_: Option<&ChangesTrieState<HasherFor<Block>, NumberFor<Block>>>,
_: Option<&ChangesTrieState<HashFor<Block>, NumberFor<Block>>>,
_: <Block as sp_api::BlockT>::Hash,
) -> std::result::Result<sp_api::StorageChanges<Self::StateBackend, Block>, String>
where Self: Sized
Expand Down Expand Up @@ -500,3 +513,60 @@ fn validator_key_spillover_cleaned() {
}).await);
});
}

#[test]
fn erasure_fetch_drop_also_drops_gossip_sender() {
let (mut service, gossip, worker_task) = test_setup(Config { collating_for: None });
let candidate_hash = [1; 32].into();

let expected_index = 1;

let executor = EXECUTOR.clone();
executor.spawn(worker_task).unwrap();

let topic = crate::erasure_coding_topic(&candidate_hash);
let (mut gossip_tx, gossip_taken_rx) = gossip.add_gossip_stream(topic);

futures::executor::block_on(async move {
let chunk_listener = service.fetch_erasure_chunk(
&candidate_hash,
expected_index,
);

// spawn an abortable handle to the chunk listener future.
// we will wait until this future has proceeded enough to start grabbing
// messages from gossip, and then we will abort the future.
let (chunk_listener, abort_handle) = future::abortable(chunk_listener);
executor.spawn(chunk_listener.map(|_| ())).unwrap();
gossip_taken_rx.await.unwrap();

// gossip listener was taken. and is active.
assert!(!gossip.contains_listener(&topic));
assert!(!gossip_tx.is_closed());

abort_handle.abort();

loop {
// if dropping the sender leads to the gossip listener
// being cleaned up, we will eventually be unable to send a message
// on the sender.
if gossip_tx.is_closed() { break }

let fake_chunk = GossipMessage::ErasureChunk(
crate::legacy::gossip::ErasureChunkMessage {
chunk: ErasureChunk {
chunk: vec![],
index: expected_index + 1,
proof: vec![],
},
candidate_hash,
}
).encode();

match gossip_tx.send(TopicNotification { message: fake_chunk, sender: None }).await {
Err(e) => { assert!(e.is_disconnected()); break },
Ok(_) => continue,
}
}
});
}