@@ -93,6 +93,9 @@ const VC_THRESHOLD: usize = 2;
9393
9494const LOG_TARGET : & str = "parachain::statement-distribution" ;
9595
96+ /// Large statements should be rare.
97+ const MAX_LARGE_STATEMENTS_PER_SENDER : usize = 20 ;
98+
9699/// The statement distribution subsystem.
97100pub struct StatementDistribution {
98101 /// Pointer to a keystore, which is required for determining this nodes validator index.
@@ -194,6 +197,29 @@ struct PeerRelayParentKnowledge {
194197 seconded_counts : HashMap < ValidatorIndex , VcPerPeerTracker > ,
195198 /// How many statements we've received for each candidate that we're aware of.
196199 received_message_count : HashMap < CandidateHash , usize > ,
200+
201+
202+ /// How many large statements this peer already sent us.
203+ ///
204+ /// Flood protection for large statements is rather hard and as soon as we get
205+ /// https://github.com/paritytech/polkadot/issues/2979 implemented also no longer necessary.
206+ /// Reason: We keep messages around until we fetched the payload, but if a node makes up
207+ /// statements and never provides the data, we will keep it around for the slot duration. Not
208+ /// even signature checking would help, as the sender, if a validator, can just sign arbitrary
209+ /// invalid statements and will not face any consequences as long as it won't provide the
210+ /// payload.
211+ ///
212+ /// Quick and temporary fix, only accept `MAX_LARGE_STATEMENTS_PER_SENDER` per connected node.
213+ ///
214+ /// Large statements should be rare, if they were not, we would run into problems anyways, as
215+ /// we would not be able to distribute them in a timely manner. Therefore
216+ /// `MAX_LARGE_STATEMENTS_PER_SENDER` can be set to a relatively small number. It is also not
217+ /// per candidate hash, but in total as candidate hashes can be made up, as illustrated above.
218+ ///
219+ /// An attacker could still try to fill up our memory, by repeatedly disconnecting and
220+ /// connecting again with new peer ids, but we assume that the resulting effective bandwidth
221+ /// for such an attack would be too low.
222+ large_statement_count : usize ,
197223}
198224
199225impl PeerRelayParentKnowledge {
@@ -318,6 +344,15 @@ impl PeerRelayParentKnowledge {
318344 Ok ( self . received_candidates . insert ( candidate_hash. clone ( ) ) )
319345 }
320346
347+ /// Note a received large statement metadata.
348+ fn receive_large_statement ( & mut self ) -> std:: result:: Result < ( ) , Rep > {
349+ if self . large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER {
350+ return Err ( COST_APPARENT_FLOOD ) ;
351+ }
352+ self . large_statement_count += 1 ;
353+ Ok ( ( ) )
354+ }
355+
321356 /// This method does the same checks as `receive` without modifying the internal state.
322357 /// Returns an error if the peer should not have sent us this message according to protocol
323358 /// rules for flood protection.
@@ -458,6 +493,17 @@ impl PeerData {
458493 . ok_or ( COST_UNEXPECTED_STATEMENT ) ?
459494 . check_can_receive ( fingerprint, max_message_count)
460495 }
496+
497+ /// Basic flood protection for large statements.
498+ fn receive_large_statement (
499+ & mut self ,
500+ relay_parent : & Hash ,
501+ ) -> std:: result:: Result < ( ) , Rep > {
502+ self . view_knowledge
503+ . get_mut ( relay_parent)
504+ . ok_or ( COST_UNEXPECTED_STATEMENT ) ?
505+ . receive_large_statement ( )
506+ }
461507}
462508
463509// A statement stored while a relay chain head is active.
@@ -1278,6 +1324,20 @@ async fn handle_incoming_message<'a>(
12781324 }
12791325 } ;
12801326
1327+ if let protocol_v1:: StatementDistributionMessage :: LargeStatement ( _) = message {
1328+ if let Err ( rep) = peer_data. receive_large_statement ( & relay_parent) {
1329+ tracing:: debug!(
1330+ target: LOG_TARGET ,
1331+ ?peer,
1332+ ?message,
1333+ ?rep,
1334+ "Unexpected large statement." ,
1335+ ) ;
1336+ report_peer ( ctx, peer, rep) . await ;
1337+ return None ;
1338+ }
1339+ }
1340+
12811341 let fingerprint = message. get_fingerprint ( ) ;
12821342 let candidate_hash = fingerprint. 0 . candidate_hash ( ) . clone ( ) ;
12831343 let handle_incoming_span = active_head. span . child ( "handle-incoming" )
@@ -3471,6 +3531,176 @@ mod tests {
34713531 executor:: block_on ( future:: join ( test_fut, bg) ) ;
34723532 }
34733533
3534+ #[ test]
3535+ fn peer_cant_flood_with_large_statements ( ) {
3536+ sp_tracing:: try_init_simple ( ) ;
3537+ let hash_a = Hash :: repeat_byte ( 1 ) ;
3538+
3539+ let candidate = {
3540+ let mut c = CommittedCandidateReceipt :: default ( ) ;
3541+ c. descriptor . relay_parent = hash_a;
3542+ c. descriptor . para_id = 1 . into ( ) ;
3543+ c. commitments . new_validation_code = Some ( ValidationCode ( vec ! [ 1 , 2 , 3 ] ) ) ;
3544+ c
3545+ } ;
3546+
3547+ let peer_a = PeerId :: random ( ) ; // Alice
3548+
3549+ let validators = vec ! [
3550+ Sr25519Keyring :: Alice . pair( ) ,
3551+ Sr25519Keyring :: Bob . pair( ) ,
3552+ Sr25519Keyring :: Charlie . pair( ) ,
3553+ // other group
3554+ Sr25519Keyring :: Dave . pair( ) ,
3555+ // We:
3556+ Sr25519Keyring :: Ferdie . pair( ) ,
3557+ ] ;
3558+
3559+ let first_group = vec ! [ 0 , 1 , 2 , 4 ] ;
3560+ let session_info = make_session_info (
3561+ validators,
3562+ vec ! [ first_group, vec![ 3 ] ]
3563+ ) ;
3564+
3565+ let session_index = 1 ;
3566+
3567+ let pool = sp_core:: testing:: TaskExecutor :: new ( ) ;
3568+ let ( ctx, mut handle) = polkadot_node_subsystem_test_helpers:: make_subsystem_context ( pool) ;
3569+
3570+ let bg = async move {
3571+ let s = StatementDistribution { metrics : Default :: default ( ) , keystore : make_ferdie_keystore ( ) } ;
3572+ s. run ( ctx) . await . unwrap ( ) ;
3573+ } ;
3574+
3575+ let ( _, rx_reqs) = mpsc:: channel ( 1 ) ;
3576+
3577+ let test_fut = async move {
3578+ handle. send ( FromOverseer :: Communication {
3579+ msg : StatementDistributionMessage :: StatementFetchingReceiver ( rx_reqs)
3580+ } ) . await ;
3581+
3582+ // register our active heads.
3583+ handle. send ( FromOverseer :: Signal ( OverseerSignal :: ActiveLeaves ( ActiveLeavesUpdate {
3584+ activated : vec ! [ ActivatedLeaf {
3585+ hash: hash_a,
3586+ number: 1 ,
3587+ span: Arc :: new( jaeger:: Span :: Disabled ) ,
3588+ } ] . into ( ) ,
3589+ deactivated : vec ! [ ] . into ( ) ,
3590+ } ) ) ) . await ;
3591+
3592+ assert_matches ! (
3593+ handle. recv( ) . await ,
3594+ AllMessages :: RuntimeApi (
3595+ RuntimeApiMessage :: Request ( r, RuntimeApiRequest :: SessionIndexForChild ( tx) )
3596+ )
3597+ if r == hash_a
3598+ => {
3599+ let _ = tx. send( Ok ( session_index) ) ;
3600+ }
3601+ ) ;
3602+
3603+ assert_matches ! (
3604+ handle. recv( ) . await ,
3605+ AllMessages :: RuntimeApi (
3606+ RuntimeApiMessage :: Request ( r, RuntimeApiRequest :: SessionInfo ( sess_index, tx) )
3607+ )
3608+ if r == hash_a && sess_index == session_index
3609+ => {
3610+ let _ = tx. send( Ok ( Some ( session_info) ) ) ;
3611+ }
3612+ ) ;
3613+
3614+ // notify of peers and view
3615+ handle. send ( FromOverseer :: Communication {
3616+ msg : StatementDistributionMessage :: NetworkBridgeUpdateV1 (
3617+ NetworkBridgeEvent :: PeerConnected (
3618+ peer_a. clone ( ) ,
3619+ ObservedRole :: Full ,
3620+ Some ( Sr25519Keyring :: Alice . public ( ) . into ( ) )
3621+ )
3622+ )
3623+ } ) . await ;
3624+
3625+ handle. send ( FromOverseer :: Communication {
3626+ msg : StatementDistributionMessage :: NetworkBridgeUpdateV1 (
3627+ NetworkBridgeEvent :: PeerViewChange ( peer_a. clone ( ) , view ! [ hash_a] )
3628+ )
3629+ } ) . await ;
3630+
3631+ // receive a seconded statement from peer A.
3632+ let statement = {
3633+ let signing_context = SigningContext {
3634+ parent_hash : hash_a,
3635+ session_index,
3636+ } ;
3637+
3638+ let keystore: SyncCryptoStorePtr = Arc :: new ( LocalKeystore :: in_memory ( ) ) ;
3639+ let alice_public = CryptoStore :: sr25519_generate_new (
3640+ & * keystore, ValidatorId :: ID , Some ( & Sr25519Keyring :: Alice . to_seed ( ) )
3641+ ) . await . unwrap ( ) ;
3642+
3643+ SignedFullStatement :: sign (
3644+ & keystore,
3645+ Statement :: Seconded ( candidate. clone ( ) ) ,
3646+ & signing_context,
3647+ ValidatorIndex ( 0 ) ,
3648+ & alice_public. into ( ) ,
3649+ ) . await . ok ( ) . flatten ( ) . expect ( "should be signed" )
3650+ } ;
3651+
3652+ let metadata =
3653+ protocol_v1:: StatementDistributionMessage :: Statement ( hash_a, statement. clone ( ) . into ( ) ) . get_metadata ( ) ;
3654+
3655+ for _ in 0 ..MAX_LARGE_STATEMENTS_PER_SENDER + 1 {
3656+ handle. send ( FromOverseer :: Communication {
3657+ msg : StatementDistributionMessage :: NetworkBridgeUpdateV1 (
3658+ NetworkBridgeEvent :: PeerMessage (
3659+ peer_a. clone ( ) ,
3660+ protocol_v1:: StatementDistributionMessage :: LargeStatement ( metadata. clone ( ) ) ,
3661+ )
3662+ )
3663+ } ) . await ;
3664+ }
3665+
3666+ // We should try to fetch the data:
3667+ assert_matches ! (
3668+ handle. recv( ) . await ,
3669+ AllMessages :: NetworkBridge (
3670+ NetworkBridgeMessage :: SendRequests (
3671+ mut reqs, IfDisconnected :: ImmediateError
3672+ )
3673+ ) => {
3674+ let reqs = reqs. pop( ) . unwrap( ) ;
3675+ let outgoing = match reqs {
3676+ Requests :: StatementFetching ( outgoing) => outgoing,
3677+ _ => panic!( "Unexpected request" ) ,
3678+ } ;
3679+ let req = outgoing. payload;
3680+ assert_eq!( req. relay_parent, metadata. relay_parent) ;
3681+ assert_eq!( req. candidate_hash, metadata. candidate_hash) ;
3682+ assert_eq!( outgoing. peer, Recipient :: Peer ( peer_a) ) ;
3683+ // Just drop request - should trigger error.
3684+ }
3685+ ) ;
3686+
3687+ // Then we should punish peer:
3688+ assert_matches ! (
3689+ handle. recv( ) . await ,
3690+ AllMessages :: NetworkBridge (
3691+ NetworkBridgeMessage :: ReportPeer ( p, r)
3692+ ) if p == peer_a && r == COST_APPARENT_FLOOD => { }
3693+ ) ;
3694+
3695+ handle. send ( FromOverseer :: Signal ( OverseerSignal :: Conclude ) ) . await ;
3696+ } ;
3697+
3698+ futures:: pin_mut!( test_fut) ;
3699+ futures:: pin_mut!( bg) ;
3700+
3701+ executor:: block_on ( future:: join ( test_fut, bg) ) ;
3702+ }
3703+
34743704 fn make_session_info ( validators : Vec < Pair > , groups : Vec < Vec < u32 > > ) -> SessionInfo {
34753705
34763706 let validator_groups: Vec < Vec < ValidatorIndex > > = groups
0 commit comments