diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml
index dc5d9b15d7b3..abcc714892b7 100644
--- a/node/network/statement-distribution/Cargo.toml
+++ b/node/network/statement-distribution/Cargo.toml
@@ -7,6 +7,7 @@ edition.workspace = true
 
 [dependencies]
 futures = "0.3.21"
+futures-timer = "3"
 gum = { package = "tracing-gum", path = "../../gum" }
 polkadot-primitives = { path = "../../../primitives" }
 sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs
index 11d765d7aa95..55ff5f5ffcfc 100644
--- a/node/network/statement-distribution/src/lib.rs
+++ b/node/network/statement-distribution/src/lib.rs
@@ -103,6 +103,9 @@ enum MuxedMessage {
 	Responder(Option<vstaging::ResponderMessage>),
 	/// Messages from answered requests.
 	Response(vstaging::UnhandledResponse),
+	/// Message that a request is ready to be retried. This just acts as a signal that we should
+	/// dispatch all pending requests again.
+	RetryRequest(()),
 }
 #[overseer::contextbounds(StatementDistribution, prefix = self::overseer)]
 impl MuxedMessage {
@@ -114,19 +117,22 @@ impl MuxedMessage {
 		from_v1_responder: &mut mpsc::Receiver<V1ResponderMessage>,
 		from_responder: &mut mpsc::Receiver<vstaging::ResponderMessage>,
 	) -> MuxedMessage {
+		let (request_manager, response_manager) = state.request_and_response_managers();
 		// We are only fusing here to make `select` happy, in reality we will quit if one of those
 		// streams end:
 		let from_orchestra = ctx.recv().fuse();
 		let from_v1_requester = from_v1_requester.next();
 		let from_v1_responder = from_v1_responder.next();
 		let from_responder = from_responder.next();
-		let receive_response = vstaging::receive_response(state).fuse();
+		let receive_response = vstaging::receive_response(response_manager).fuse();
+		let retry_request = vstaging::next_retry(request_manager).fuse();
 		futures::pin_mut!(
 			from_orchestra,
 			from_v1_requester,
 			from_v1_responder,
 			from_responder,
-			receive_response
+			receive_response,
+			retry_request,
 		);
 		futures::select! {
 			msg = from_orchestra => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
@@ -134,6 +140,7 @@ impl MuxedMessage {
 			msg = from_v1_responder => MuxedMessage::V1Responder(msg),
 			msg = from_responder => MuxedMessage::Responder(msg),
 			msg = receive_response => MuxedMessage::Response(msg),
+			msg = retry_request => MuxedMessage::RetryRequest(msg),
 		}
 	}
 }
@@ -190,6 +197,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
 		.map_err(FatalError::SpawnTask)?;
 
 		loop {
+			// Wait for the next message.
 			let message = MuxedMessage::receive(
 				&mut ctx,
 				&mut state,
@@ -198,6 +206,7 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
 				&mut res_receiver,
 			)
 			.await;
+
 			match message {
 				MuxedMessage::Subsystem(result) => {
 					let result = self
@@ -244,6 +253,11 @@ impl<R: rand::Rng> StatementDistributionSubsystem<R> {
 				MuxedMessage::Response(result) => {
 					vstaging::handle_response(&mut ctx, &mut state, result).await;
 				},
+				MuxedMessage::RetryRequest(()) => {
+					// A pending request is ready to retry. This is only a signal to call
+					// `dispatch_requests` again.
+					()
+				},
 			};
 
 			vstaging::dispatch_requests(&mut ctx, &mut state).await;
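The main loop above falls through to `vstaging::dispatch_requests` after every muxed message, so `RetryRequest(())` carries no payload: the timer branch exists purely to wake the loop. A minimal sketch of that pattern, with hypothetical names (`run`, `dispatch_requests`) and assuming only the `futures` and `futures-timer` crates:

```rust
use futures::{channel::mpsc, pin_mut, select, FutureExt, StreamExt};
use std::time::Duration;

async fn run(mut messages: mpsc::Receiver<String>) {
    loop {
        let next_msg = messages.next();
        // The timer is just another `select!` branch; its output carries no data.
        let retry = futures_timer::Delay::new(Duration::from_secs(1)).fuse();
        pin_mut!(next_msg, retry);
        select! {
            msg = next_msg => match msg {
                Some(m) => println!("handle {m}"),
                None => break,
            },
            () = retry => {
                // Nothing to do here: this branch only wakes the loop so the
                // dispatch step below runs again.
            },
        }
        dispatch_requests().await; // stand-in for `vstaging::dispatch_requests`
    }
}

async fn dispatch_requests() {}

fn main() {
    let (mut tx, rx) = mpsc::channel(4);
    tx.try_send("hello".to_owned()).unwrap();
    drop(tx);
    futures::executor::block_on(run(rx));
}
```

Because both branches fall through to the same dispatch step, retries reuse exactly the same dispatch path as fresh requests.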
diff --git a/node/network/statement-distribution/src/vstaging/mod.rs b/node/network/statement-distribution/src/vstaging/mod.rs
index 5fe39096c1cb..4ee9a0453fe2 100644
--- a/node/network/statement-distribution/src/vstaging/mod.rs
+++ b/node/network/statement-distribution/src/vstaging/mod.rs
@@ -58,9 +58,12 @@ use futures::{
 	SinkExt, StreamExt,
 };
 
-use std::collections::{
-	hash_map::{Entry, HashMap},
-	HashSet,
+use std::{
+	collections::{
+		hash_map::{Entry, HashMap},
+		HashSet,
+	},
+	time::{Duration, Instant},
 };
 
 use crate::{
@@ -74,7 +77,7 @@ use groups::Groups;
 use requests::{CandidateIdentifier, RequestProperties};
 use statement_store::{StatementOrigin, StatementStore};
 
-pub use requests::{RequestManager, UnhandledResponse};
+pub use requests::{RequestManager, ResponseManager, UnhandledResponse};
 
 mod candidates;
 mod cluster;
@@ -121,6 +124,9 @@ const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid st
 const BENEFIT_VALID_STATEMENT_FIRST: Rep =
 	Rep::BenefitMajorFirst("Peer was the first to provide a given valid statement");
 
+/// The amount of time to wait before retrying a request we sent that was dropped or went unanswered.
+pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
+
 struct PerRelayParentState {
 	local_validator: Option<LocalValidatorState>,
 	statement_store: StatementStore,
@@ -201,6 +207,7 @@ pub(crate) struct State {
 	keystore: SyncCryptoStorePtr,
 	authorities: HashMap<AuthorityDiscoveryId, PeerId>,
 	request_manager: RequestManager,
+	response_manager: ResponseManager,
 }
 
 impl State {
@@ -215,8 +222,15 @@ impl State {
 			keystore,
 			authorities: HashMap::new(),
 			request_manager: RequestManager::new(),
+			response_manager: ResponseManager::new(),
 		}
 	}
+
+	pub(crate) fn request_and_response_managers(
+		&mut self,
+	) -> (&mut RequestManager, &mut ResponseManager) {
+		(&mut self.request_manager, &mut self.response_manager)
+	}
 }
 
 // For the provided validator index, if there is a connected peer controlling the given authority
@@ -2313,6 +2327,10 @@ async fn apply_post_confirmation(
 /// Dispatch pending requests for candidate data & statements.
 #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
 pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut State) {
+	if !state.request_manager.has_pending_requests() {
+		return
+	}
+
 	let peers = &state.peers;
 	let peer_advertised = |identifier: &CandidateIdentifier, peer: &_| {
 		let peer_data = peers.get(peer)?;
@@ -2374,7 +2392,11 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
 		})
 	};
 
-	while let Some(request) = state.request_manager.next_request(request_props, peer_advertised) {
+	while let Some(request) = state.request_manager.next_request(
+		&mut state.response_manager,
+		request_props,
+		peer_advertised,
+	) {
 		// Peer is supposedly connected.
 		ctx.send_message(NetworkBridgeTxMessage::SendRequests(
 			vec![Requests::AttestedCandidateVStaging(request)],
@@ -2387,13 +2409,24 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
 /// Wait on the next incoming response. If there are no requests pending, this
 /// future never resolves. It is the responsibility of the user of this API
 /// to interrupt the future.
-pub(crate) async fn receive_response(state: &mut State) -> UnhandledResponse {
-	match state.request_manager.await_incoming().await {
+pub(crate) async fn receive_response(response_manager: &mut ResponseManager) -> UnhandledResponse {
+	match response_manager.incoming().await {
 		Some(r) => r,
 		None => futures::future::pending().await,
 	}
}
 
+/// Wait until the soonest pending retry is due. If there are no retries pending, this
+/// future never resolves. Note that this only signals that a request is ready to be retried;
+/// the user of this API must then call `dispatch_requests`.
+pub(crate) async fn next_retry(request_manager: &mut RequestManager) {
+	match request_manager.next_retry_time() {
+		Some(instant) =>
+			futures_timer::Delay::new(instant.saturating_duration_since(Instant::now())).await,
+		None => futures::future::pending().await,
+	}
+}
+
 /// Handles an incoming response. This does the actual work of validating the response,
 /// importing statements, sending acknowledgements, etc.
 #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
@@ -2615,7 +2648,7 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
 }
 
 /// Messages coming from the background respond task.
-pub struct ResponderMessage {
+pub(crate) struct ResponderMessage {
 	request: IncomingRequest<AttestedCandidateRequest>,
 	sent_feedback: oneshot::Sender<()>,
 }
@@ -2623,7 +2656,7 @@ pub(crate) struct ResponderMessage {
 /// A fetching task, taking care of fetching candidates via request/response.
 ///
 /// Runs in a background task and feeds request to [`answer_request`] through [`MuxedMessage`].
-pub async fn respond_task(
+pub(crate) async fn respond_task(
 	mut receiver: IncomingRequestReceiver<AttestedCandidateRequest>,
 	mut sender: mpsc::Sender<ResponderMessage>,
 ) {
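`next_retry` collapses all pending retries into a single timer on the soonest deadline. `Instant::saturating_duration_since` clamps to zero rather than panicking when the deadline has already passed, so an overdue retry simply produces a zero-length `Delay` that fires on the next poll; that is why the code needs no special case for deadlines in the past. A small standard-library-only demonstration:

```rust
use std::time::{Duration, Instant};

fn main() {
    let deadline = Instant::now();
    std::thread::sleep(Duration::from_millis(10));
    // The deadline is now in the past: the saturating difference clamps to zero,
    // so a `futures_timer::Delay` built from it would resolve immediately.
    assert_eq!(deadline.saturating_duration_since(Instant::now()), Duration::ZERO);
}
```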
diff --git a/node/network/statement-distribution/src/vstaging/requests.rs b/node/network/statement-distribution/src/vstaging/requests.rs
index 507bbbb0ef18..355c2b0a85b3 100644
--- a/node/network/statement-distribution/src/vstaging/requests.rs
+++ b/node/network/statement-distribution/src/vstaging/requests.rs
@@ -29,6 +29,7 @@ use super::{
 	BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE,
 	COST_INVALID_RESPONSE, COST_INVALID_SIGNATURE, COST_UNREQUESTED_RESPONSE_STATEMENT,
+	REQUEST_RETRY_DELAY,
 };
 use crate::LOG_TARGET;
 
@@ -49,9 +50,12 @@ use polkadot_primitives::vstaging::{
 
 use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
 
-use std::collections::{
-	hash_map::{Entry as HEntry, HashMap},
-	HashSet, VecDeque,
+use std::{
+	collections::{
+		hash_map::{Entry as HEntry, HashMap},
+		HashSet, VecDeque,
+	},
+	time::Instant,
 };
 
 /// An identifier for a candidate.
@@ -84,7 +88,27 @@ struct TaggedResponse {
 pub struct RequestedCandidate {
 	priority: Priority,
 	known_by: VecDeque<PeerId>,
+	/// Has the request been sent out and a response not yet received?
 	in_flight: bool,
+	/// The timestamp for the next time we should retry, set when a response fails.
+	next_retry_time: Option<Instant>,
 }
+
+impl RequestedCandidate {
+	fn is_pending(&self) -> bool {
+		if self.in_flight {
+			return false
+		}
+
+		if let Some(next_retry_time) = self.next_retry_time {
+			let can_retry = Instant::now() >= next_retry_time;
+			if !can_retry {
+				return false
+			}
+		}
+
+		true
+	}
+}
 
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
@@ -130,7 +154,6 @@ impl<'a> Entry<'a> {
 
 /// A manager for outgoing requests.
 pub struct RequestManager {
-	pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
 	requests: HashMap<CandidateIdentifier, RequestedCandidate>,
 	// sorted by priority.
 	by_priority: Vec<(Priority, CandidateIdentifier)>,
@@ -142,7 +165,6 @@ impl RequestManager {
 	/// Create a new [`RequestManager`].
 	pub fn new() -> Self {
 		RequestManager {
-			pending_responses: FuturesUnordered::new(),
 			requests: HashMap::new(),
 			by_priority: Vec::new(),
 			unique_identifiers: HashMap::new(),
@@ -166,6 +188,7 @@ impl RequestManager {
 					priority: Priority { attempts: 0, origin: Origin::Unspecified },
 					known_by: VecDeque::new(),
 					in_flight: false,
+					next_retry_time: None,
 				}),
 				true,
 			),
@@ -241,6 +264,30 @@ impl RequestManager {
 		}
 	}
 
+	/// Returns true if there are pending requests that are dispatchable.
+	pub fn has_pending_requests(&self) -> bool {
+		for (_id, entry) in &self.requests {
+			if entry.is_pending() {
+				return true
+			}
+		}
+
+		false
+	}
+
+	/// Returns the instant at which the soonest pending retry will be ready.
+	pub fn next_retry_time(&mut self) -> Option<Instant> {
+		let mut next = None;
+		for (_id, request) in &self.requests {
+			if let Some(next_retry_time) = request.next_retry_time {
+				if next.map_or(true, |next| next_retry_time < next) {
+					next = Some(next_retry_time);
+				}
+			}
+		}
+		next
+	}
+
 	/// Yields the next request to dispatch, if there is any.
 	///
 	/// This function accepts two closures as an argument.
@@ -254,10 +301,11 @@ impl RequestManager {
 	/// threshold and returns `None` if the peer is no longer connected.
 	pub fn next_request(
 		&mut self,
+		response_manager: &mut ResponseManager,
 		request_props: impl Fn(&CandidateIdentifier) -> Option<RequestProperties>,
 		peer_advertised: impl Fn(&CandidateIdentifier, &PeerId) -> Option<StatementFilter>,
 	) -> Option<OutgoingRequest<AttestedCandidateRequest>> {
-		if self.pending_responses.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
+		if response_manager.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
 			return None
 		}
@@ -282,7 +330,7 @@ impl RequestManager {
 			Some(e) => e,
 		};
 
-		if entry.in_flight {
+		if !entry.is_pending() {
 			continue
 		}
@@ -313,7 +361,7 @@ impl RequestManager {
 		);
 
 		let stored_id = id.clone();
-		self.pending_responses.push(Box::pin(async move {
+		response_manager.push(Box::pin(async move {
 			TaggedResponse {
 				identifier: stored_id,
 				requested_peer: target,
@@ -343,15 +391,34 @@ impl RequestManager {
 
 		res
 	}
+}
+
+/// A manager for pending responses.
+pub struct ResponseManager {
+	pending_responses: FuturesUnordered<BoxFuture<'static, TaggedResponse>>,
+}
+
+impl ResponseManager {
+	pub fn new() -> Self {
+		Self { pending_responses: FuturesUnordered::new() }
+	}
 
 	/// Await the next incoming response to a sent request, or immediately
 	/// return `None` if there are no pending responses.
-	pub async fn await_incoming(&mut self) -> Option<UnhandledResponse> {
+	pub async fn incoming(&mut self) -> Option<UnhandledResponse> {
 		self.pending_responses
 			.next()
 			.await
 			.map(|response| UnhandledResponse { response })
 	}
+
+	fn len(&self) -> usize {
+		self.pending_responses.len()
+	}
+
+	fn push(&mut self, response: BoxFuture<'static, TaggedResponse>) {
+		self.pending_responses.push(response);
+	}
 }
 
 /// Properties used in target selection and validation of a request.
@@ -484,6 +551,8 @@ impl UnhandledResponse {
 			Err(_) => unreachable!("requested candidates always have a priority entry; qed"),
 		};
 
+		// Set the next retry time before clearing the `in_flight` flag.
+		entry.next_retry_time = Some(Instant::now() + REQUEST_RETRY_DELAY);
 		entry.in_flight = false;
 		entry.priority.attempts += 1;
@@ -884,6 +953,7 @@ mod tests {
 	#[test]
 	fn handle_outdated_response_due_to_requests_for_different_identifiers() {
 		let mut request_manager = RequestManager::new();
+		let mut response_manager = ResponseManager::new();
 
 		let relay_parent = Hash::from_low_u64_le(1);
 		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
@@ -924,9 +994,13 @@ mod tests {
 		let peer_advertised = |_identifier: &CandidateIdentifier, _peer: &_| {
 			Some(StatementFilter::full(group_size))
 		};
-		let outgoing = request_manager.next_request(request_props, peer_advertised).unwrap();
+		let outgoing = request_manager
+			.next_request(&mut response_manager, request_props, peer_advertised)
+			.unwrap();
 		assert_eq!(outgoing.payload.candidate_hash, candidate);
-		let outgoing = request_manager.next_request(request_props, peer_advertised).unwrap();
+		let outgoing = request_manager
+			.next_request(&mut response_manager, request_props, peer_advertised)
+			.unwrap();
 		assert_eq!(outgoing.payload.candidate_hash, candidate);
 	}
@@ -1009,6 +1083,7 @@ mod tests {
 	#[test]
 	fn handle_outdated_response_due_to_garbage_collection() {
 		let mut request_manager = RequestManager::new();
+		let mut response_manager = ResponseManager::new();
 
 		let relay_parent = Hash::from_low_u64_le(1);
 		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
@@ -1038,7 +1113,9 @@ mod tests {
 		{
 			let request_props =
 				|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
-			let outgoing = request_manager.next_request(request_props, peer_advertised).unwrap();
+			let outgoing = request_manager
+				.next_request(&mut response_manager, request_props, peer_advertised)
+				.unwrap();
 			assert_eq!(outgoing.payload.candidate_hash, candidate);
 		}
@@ -1083,6 +1160,7 @@ mod tests {
 	#[test]
 	fn should_clean_up_after_successful_requests() {
 		let mut request_manager = RequestManager::new();
+		let mut response_manager = ResponseManager::new();
 
 		let relay_parent = Hash::from_low_u64_le(1);
 		let mut candidate_receipt = test_helpers::dummy_committed_candidate_receipt(relay_parent);
@@ -1115,7 +1193,9 @@ mod tests {
 		{
 			let request_props =
 				|_identifier: &CandidateIdentifier| Some((&request_properties).clone());
-			let outgoing = request_manager.next_request(request_props, peer_advertised).unwrap();
+			let outgoing = request_manager
+				.next_request(&mut response_manager, request_props, peer_advertised)
+				.unwrap();
 			assert_eq!(outgoing.payload.candidate_hash, candidate);
 		}
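Together, `is_pending`, `has_pending_requests`, and the timestamp written in `UnhandledResponse` form a small per-candidate lifecycle: dispatchable, in flight, cooling down, dispatchable again. A trimmed-down model of that lifecycle (the `in_flight`/`next_retry_time`/`attempts` fields mirror `RequestedCandidate`; everything else is a hypothetical sketch):

```rust
use std::time::{Duration, Instant};

const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);

// Hypothetical, trimmed-down lifecycle of one requested candidate.
#[derive(Default)]
struct Lifecycle {
    in_flight: bool,
    next_retry_time: Option<Instant>,
    attempts: u32,
}

impl Lifecycle {
    fn dispatch(&mut self) {
        self.in_flight = true;
    }

    fn on_failed_response(&mut self) {
        // As in `UnhandledResponse`: stamp the retry time before clearing
        // `in_flight`, then count the attempt.
        self.next_retry_time = Some(Instant::now() + REQUEST_RETRY_DELAY);
        self.in_flight = false;
        self.attempts += 1;
    }

    fn is_pending(&self) -> bool {
        !self.in_flight && self.next_retry_time.map_or(true, |t| Instant::now() >= t)
    }
}

fn main() {
    let mut c = Lifecycle::default();
    assert!(c.is_pending()); // fresh entries are dispatchable immediately
    c.dispatch();
    assert!(!c.is_pending()); // in flight: wait for a response first
    c.on_failed_response();
    assert!(!c.is_pending()); // cooling down for REQUEST_RETRY_DELAY
    std::thread::sleep(REQUEST_RETRY_DELAY);
    assert!(c.is_pending()); // ready to be retried
    assert_eq!(c.attempts, 1);
}
```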
diff --git a/node/network/statement-distribution/src/vstaging/tests/mod.rs b/node/network/statement-distribution/src/vstaging/tests/mod.rs
index f290c860ac5c..7ac5dae2f1a1 100644
--- a/node/network/statement-distribution/src/vstaging/tests/mod.rs
+++ b/node/network/statement-distribution/src/vstaging/tests/mod.rs
@@ -29,6 +29,7 @@ use polkadot_node_subsystem::messages::{
 };
 use polkadot_node_subsystem_test_helpers as test_helpers;
 use polkadot_node_subsystem_types::{jaeger, ActivatedLeaf, LeafStatus};
+use polkadot_node_subsystem_util::TimeoutExt;
 use polkadot_primitives::vstaging::{
 	AssignmentPair, AsyncBackingParameters, BlockNumber, CommittedCandidateReceipt, CoreState,
 	GroupRotationInfo, HeadData, Header, IndexedVec, PersistedValidationData, ScheduledCore,
@@ -582,6 +583,14 @@ async fn send_new_topology(virtual_overseer: &mut VirtualOverseer, topology: New
 		.await;
 }
 
+async fn overseer_recv_with_timeout(
+	overseer: &mut VirtualOverseer,
+	timeout: Duration,
+) -> Option<AllMessages> {
+	gum::trace!("waiting for message...");
+	overseer.recv().timeout(timeout).await
+}
+
 fn next_group_index(
 	group_index: GroupIndex,
 	validator_count: usize,
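`overseer_recv_with_timeout` builds on the `TimeoutExt` combinator from `polkadot-node-subsystem-util`, which resolves to `None` when nothing arrives in time; that is exactly what the new test needs in order to assert the node stays silent while a retry is cooling down. A standalone approximation of that combinator (illustrative only; the real trait's shape may differ):

```rust
use futures::{pin_mut, select, FutureExt};
use std::{future::Future, time::Duration};

// Illustrative stand-in for `TimeoutExt::timeout`: `None` means "timed out".
async fn with_timeout<F: Future>(fut: F, dur: Duration) -> Option<F::Output> {
    let fut = fut.fuse();
    let timeout = futures_timer::Delay::new(dur).fuse();
    pin_mut!(fut, timeout);
    select! {
        out = fut => Some(out),
        _ = timeout => None,
    }
}

fn main() {
    // A future that never resolves times out and yields `None`.
    let res: Option<()> = futures::executor::block_on(with_timeout(
        futures::future::pending::<()>(),
        Duration::from_millis(50),
    ));
    assert!(res.is_none());
}
```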
diff --git a/node/network/statement-distribution/src/vstaging/tests/requests.rs b/node/network/statement-distribution/src/vstaging/tests/requests.rs
index 313f2831a992..73804dd46a8a 100644
--- a/node/network/statement-distribution/src/vstaging/tests/requests.rs
+++ b/node/network/statement-distribution/src/vstaging/tests/requests.rs
@@ -573,6 +573,7 @@ fn peer_reported_for_not_enough_statements() {
 		),
 	)
 	.await;
+
 	let c_seconded = state
 		.sign_statement(
 			v_c,
@@ -1570,3 +1571,301 @@ fn local_node_respects_statement_mask() {
 		overseer
 	});
 }
+
+#[test]
+fn should_delay_before_retrying_dropped_requests() {
+	let validator_count = 6;
+	let group_size = 3;
+	let config = TestConfig {
+		validator_count,
+		group_size,
+		local_validator: true,
+		async_backing_params: None,
+	};
+
+	let relay_parent = Hash::repeat_byte(1);
+	let peer_c = PeerId::random();
+	let peer_d = PeerId::random();
+	let peer_e = PeerId::random();
+
+	test_harness(config, |state, mut overseer| async move {
+		let local_validator = state.local.clone().unwrap();
+
+		let other_group =
+			next_group_index(local_validator.group_index, validator_count, group_size);
+		let other_para = ParaId::from(other_group.0);
+
+		let test_leaf = state.make_dummy_leaf(relay_parent);
+
+		let (candidate_1, pvd_1) = make_candidate(
+			relay_parent,
+			1,
+			other_para,
+			test_leaf.para_data(other_para).head_data.clone(),
+			vec![4, 5, 6].into(),
+			Hash::repeat_byte(42).into(),
+		);
+		let (candidate_2, pvd_2) = make_candidate(
+			relay_parent,
+			1,
+			other_para,
+			test_leaf.para_data(other_para).head_data.clone(),
+			vec![7, 8, 9].into(),
+			Hash::repeat_byte(43).into(),
+		);
+		let candidate_hash_1 = candidate_1.hash();
+		let candidate_hash_2 = candidate_2.hash();
+
+		let target_group_validators = state.group_validators(other_group, true);
+		let v_c = target_group_validators[0];
+		let v_d = target_group_validators[1];
+		let v_e = target_group_validators[2];
+
+		// Connect C, D, E
+		{
+			connect_peer(
+				&mut overseer,
+				peer_c.clone(),
+				Some(vec![state.discovery_id(v_c)].into_iter().collect()),
+			)
+			.await;
+
+			connect_peer(
+				&mut overseer,
+				peer_d.clone(),
+				Some(vec![state.discovery_id(v_d)].into_iter().collect()),
+			)
+			.await;
+
+			connect_peer(
+				&mut overseer,
+				peer_e.clone(),
+				Some(vec![state.discovery_id(v_e)].into_iter().collect()),
+			)
+			.await;
+
+			send_peer_view_change(&mut overseer, peer_c.clone(), view![relay_parent]).await;
+			send_peer_view_change(&mut overseer, peer_d.clone(), view![relay_parent]).await;
+			send_peer_view_change(&mut overseer, peer_e.clone(), view![relay_parent]).await;
+		}
+
+		activate_leaf(&mut overseer, &test_leaf, &state, true).await;
+
+		answer_expected_hypothetical_depth_request(
+			&mut overseer,
+			vec![],
+			Some(relay_parent),
+			false,
+		)
+		.await;
+
+		// Send gossip topology.
+		send_new_topology(&mut overseer, state.make_dummy_topology()).await;
+
+		// `1` indicates statements NOT to request.
+		let mask = StatementFilter::blank(group_size);
+
+		// Send a manifest for the first candidate and drop the node's resulting request.
+		{
+			let manifest = BackedCandidateManifest {
+				relay_parent,
+				candidate_hash: candidate_hash_1,
+				group_index: other_group,
+				para_id: other_para,
+				parent_head_data_hash: pvd_1.parent_head.hash(),
+				statement_knowledge: StatementFilter {
+					seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1],
+					validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 0],
+				},
+			};
+
+			// Peer sends an announcement.
+			send_peer_message(
+				&mut overseer,
+				peer_c.clone(),
+				protocol_vstaging::StatementDistributionMessage::BackedCandidateManifest(
+					manifest.clone(),
+				),
+			)
+			.await;
+
+			// We send a request to the peer. Drop the request without sending a response.
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(mut requests, IfDisconnected::ImmediateError)) => {
+					assert_eq!(requests.len(), 1);
+					assert_matches!(
+						requests.pop().unwrap(),
+						Requests::AttestedCandidateVStaging(outgoing) => {
+							assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
+							assert_eq!(outgoing.payload.candidate_hash, candidate_hash_1);
+							assert_eq!(outgoing.payload.mask, mask);
+						}
+					);
+				}
+			);
+
+			// No retry should be dispatched before `REQUEST_RETRY_DELAY` has elapsed.
+			assert_matches!(
+				overseer_recv_with_timeout(&mut overseer, Duration::from_millis(100)).await,
+				None
+			);
+		}
+
+		// We still send requests for other candidates as usual.
+		{
+			let manifest = BackedCandidateManifest {
+				relay_parent,
+				candidate_hash: candidate_hash_2,
+				group_index: other_group,
+				para_id: other_para,
+				parent_head_data_hash: pvd_2.parent_head.hash(),
+				statement_knowledge: StatementFilter {
+					seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1],
+					validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 0],
+				},
+			};
+
+			// Peer sends an announcement.
+			send_peer_message(
+				&mut overseer,
+				peer_c.clone(),
+				protocol_vstaging::StatementDistributionMessage::BackedCandidateManifest(
+					manifest.clone(),
+				),
+			)
+			.await;
+
+			let statements = vec![
+				state
+					.sign_statement(
+						v_c,
+						CompactStatement::Seconded(candidate_hash_2),
+						&SigningContext { parent_hash: relay_parent, session_index: 1 },
+					)
+					.as_unchecked()
+					.clone(),
+				state
+					.sign_statement(
+						v_d,
+						CompactStatement::Seconded(candidate_hash_2),
+						&SigningContext { parent_hash: relay_parent, session_index: 1 },
+					)
+					.as_unchecked()
+					.clone(),
+				state
+					.sign_statement(
+						v_e,
+						CompactStatement::Seconded(candidate_hash_2),
+						&SigningContext { parent_hash: relay_parent, session_index: 1 },
+					)
+					.as_unchecked()
+					.clone(),
+			];
+
+			// Don't drop this request.
+			handle_sent_request(
+				&mut overseer,
+				peer_c,
+				candidate_hash_2,
+				mask.clone(),
+				candidate_2.clone(),
+				pvd_2.clone(),
+				statements.clone(),
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_STATEMENT
+			);
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_STATEMENT
+			);
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_STATEMENT
+			);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_RESPONSE
+			);
+
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		}
+
+		// Sleep for the retry delay; the first candidate should then be ready to be retried.
+		futures_timer::Delay::new(REQUEST_RETRY_DELAY).await;
+
+		// We re-try the first request.
+		{
+			let statements = vec![
+				state
+					.sign_statement(
+						v_c,
+						CompactStatement::Seconded(candidate_hash_1),
+						&SigningContext { parent_hash: relay_parent, session_index: 1 },
+					)
+					.as_unchecked()
+					.clone(),
+				state
+					.sign_statement(
+						v_d,
+						CompactStatement::Seconded(candidate_hash_1),
+						&SigningContext { parent_hash: relay_parent, session_index: 1 },
+					)
+					.as_unchecked()
+					.clone(),
+				state
+					.sign_statement(
+						v_e,
+						CompactStatement::Seconded(candidate_hash_1),
+						&SigningContext { parent_hash: relay_parent, session_index: 1 },
+					)
+					.as_unchecked()
+					.clone(),
+			];
+			handle_sent_request(
+				&mut overseer,
+				peer_c,
+				candidate_hash_1,
+				mask,
+				candidate_1.clone(),
+				pvd_1.clone(),
+				statements.clone(),
+			)
+			.await;
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_STATEMENT
+			);
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_STATEMENT
+			);
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_STATEMENT
+			);
+
+			assert_matches!(
+				overseer.recv().await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(p, r))
+					if p == peer_c && r == BENEFIT_VALID_RESPONSE
+			);
+
+			answer_expected_hypothetical_depth_request(&mut overseer, vec![], None, false).await;
+		}
+
+		overseer
+	});
+}