@@ -48,7 +48,7 @@ use frame_system::{ensure_none, ensure_root};
4848use polkadot_parachain:: primitives:: RelayChainBlockNumber ;
4949use scale_info:: TypeInfo ;
5050use sp_runtime:: {
51- traits:: { Block as BlockT , BlockNumberProvider , Hash } ,
51+ traits:: { Block as BlockT , BlockNumberProvider , Hash , Zero } ,
5252 transaction_validity:: {
5353 InvalidTransaction , TransactionLongevity , TransactionSource , TransactionValidity ,
5454 ValidTransaction ,
@@ -59,11 +59,16 @@ use xcm::latest::XcmHash;
5959
6060mod migration;
6161mod relay_state_snapshot;
62+ mod unincluded_segment;
6263#[ macro_use]
6364pub mod validate_block;
6465#[ cfg( test) ]
6566mod tests;
6667
68+ use unincluded_segment:: {
69+ Ancestor , HrmpChannelUpdate , SegmentTracker , TotalBandwidthLimits , UsedBandwidth ,
70+ } ;
71+
6772/// Register the `validate_block` function that is used by parachains to validate blocks on a
6873/// validator.
6974///
@@ -232,7 +237,7 @@ pub mod pallet {
232237 } ,
233238 } ;
234239
235- <PendingUpwardMessages < T > >:: mutate ( |up| {
240+ let ( ump_msg_count , ump_total_bytes ) = <PendingUpwardMessages < T > >:: mutate ( |up| {
236241 let ( count, size) = relevant_messaging_state. relay_dispatch_queue_size ;
237242
238243 let available_capacity = cmp:: min (
@@ -243,24 +248,32 @@ pub mod pallet {
243248
244249 // Count the number of messages we can possibly fit in the given constraints, i.e.
245250 // available_capacity and available_size.
246- let num = up
251+ let ( num, total_size ) = up
247252 . iter ( )
248- . scan ( ( available_capacity as usize , available_size as usize ) , |state, msg| {
249- let ( cap_left, size_left) = * state;
250- match ( cap_left. checked_sub ( 1 ) , size_left. checked_sub ( msg. len ( ) ) ) {
251- ( Some ( new_cap) , Some ( new_size) ) => {
253+ . scan ( ( 0u32 , 0u32 ) , |state, msg| {
254+ let ( cap_used, size_used) = * state;
255+ let new_cap = cap_used. saturating_add ( 1 ) ;
256+ let new_size = size_used. saturating_add ( msg. len ( ) as u32 ) ;
257+ match available_capacity
258+ . checked_sub ( new_cap)
259+ . and ( available_size. checked_sub ( new_size) )
260+ {
261+ Some ( _) => {
252262 * state = ( new_cap, new_size) ;
253- Some ( ( ) )
263+ Some ( * state )
254264 } ,
255265 _ => None ,
256266 }
257267 } )
258- . count ( ) ;
268+ . last ( )
269+ . unwrap_or_default ( ) ;
259270
260271 // TODO: #274 Return back messages that do not longer fit into the queue.
261272
262- UpwardMessages :: < T > :: put ( & up[ ..num] ) ;
263- * up = up. split_off ( num) ;
273+ UpwardMessages :: < T > :: put ( & up[ ..num as usize ] ) ;
274+ * up = up. split_off ( num as usize ) ;
275+
276+ ( num, total_size)
264277 } ) ;
265278
266279 // Sending HRMP messages is a little bit more involved. There are the following
@@ -282,6 +295,43 @@ pub mod pallet {
282295 . map ( |( recipient, data) | OutboundHrmpMessage { recipient, data } )
283296 . collect :: < Vec < _ > > ( ) ;
284297
298+ if MaxUnincludedLen :: < T > :: get ( ) . map_or ( false , |max_len| !max_len. is_zero ( ) ) {
299+ // NOTE: these limits don't account for the amount of processed messages from
300+ // downward and horizontal queues.
301+ //
302+ // This is correct because:
303+ // - inherent never contains messages that were previously processed.
304+ // - current implementation always attempts to exhaust each message queue.
305+ //
306+ // <https://github.com/paritytech/cumulus/issues/2472>
307+ let limits = TotalBandwidthLimits :: new ( & relevant_messaging_state) ;
308+
309+ let hrmp_outgoing = outbound_messages
310+ . iter ( )
311+ . map ( |msg| {
312+ (
313+ msg. recipient ,
314+ HrmpChannelUpdate { msg_count : 1 , total_bytes : msg. data . len ( ) as u32 } ,
315+ )
316+ } )
317+ . collect ( ) ;
318+ let used_bandwidth =
319+ UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing } ;
320+ // The bandwidth constructed was ensured to satisfy relay chain constraints.
321+ let ancestor = Ancestor :: new_unchecked ( used_bandwidth) ;
322+
323+ let watermark = HrmpWatermark :: < T > :: get ( ) ;
324+ AggregatedUnincludedSegment :: < T > :: mutate ( |agg| {
325+ let agg = agg. get_or_insert_with ( SegmentTracker :: default) ;
326+ // TODO: In order of this panic to be correct, outbound message source should
327+ // respect bandwidth limits as well.
328+ // <https://github.com/paritytech/cumulus/issues/2471>
329+ agg. append ( & ancestor, watermark, & limits)
330+ . expect ( "unincluded segment limits exceeded" ) ;
331+ } ) ;
332+ // Check in `on_initialize` guarantees there's space for this block.
333+ UnincludedSegment :: < T > :: append ( ancestor) ;
334+ }
285335 HrmpOutboundMessages :: < T > :: put ( outbound_messages) ;
286336 }
287337
@@ -296,6 +346,23 @@ pub mod pallet {
296346 weight += T :: DbWeight :: get ( ) . writes ( 1 ) ;
297347 }
298348
349+ // New para head was unknown during block finalization, update it.
350+ if MaxUnincludedLen :: < T > :: get ( ) . map_or ( false , |max_len| !max_len. is_zero ( ) ) {
351+ <UnincludedSegment < T > >:: mutate ( |chain| {
352+ if let Some ( ancestor) = chain. last_mut ( ) {
353+ let parent = frame_system:: Pallet :: < T > :: parent_hash ( ) ;
354+ // Ancestor is the latest finalized block, thus current parent is
355+ // its output head.
356+ ancestor. replace_para_head_hash ( parent) ;
357+ }
358+ } ) ;
359+ weight += T :: DbWeight :: get ( ) . reads_writes ( 1 , 1 ) ;
360+
361+ // Weight used during finalization.
362+ weight += T :: DbWeight :: get ( ) . reads_writes ( 2 , 2 ) ;
363+ }
364+ weight += T :: DbWeight :: get ( ) . reads ( 1 ) ;
365+
299366 // Remove the validation from the old block.
300367 ValidationData :: < T > :: kill ( ) ;
301368 ProcessedDownwardMessages :: < T > :: kill ( ) ;
@@ -336,6 +403,9 @@ pub mod pallet {
336403 4 + hrmp_max_message_num_per_candidate as u64 ,
337404 ) ;
338405
406+ // Always try to read `MaxUnincludedLen` in `on_finalize`.
407+ weight += T :: DbWeight :: get ( ) . reads ( 1 ) ;
408+
339409 weight
340410 }
341411 }
@@ -364,6 +434,12 @@ pub mod pallet {
364434 "ValidationData must be updated only once in a block" ,
365435 ) ;
366436
437+ // NOTE: the inherent data is expected to be unique, even if this block is built
438+ // in the context of the same relay parent as the previous one. In particular,
439+ // the inherent shouldn't contain messages that were already processed by any of the
440+ // ancestors.
441+ //
442+ // This invariant should be upheld by the `ProvideInherent` implementation.
367443 let ParachainInherentData {
368444 validation_data : vfp,
369445 relay_chain_state,
@@ -442,6 +518,7 @@ pub mod pallet {
442518 horizontal_messages,
443519 vfp. relay_parent_number ,
444520 ) ;
521+ total_weight += Self :: maybe_drop_included_ancestors ( & relay_state_proof) ;
445522
446523 Ok ( PostDispatchInfo { actual_weight : Some ( total_weight) , pays_fee : Pays :: No } )
447524 }
@@ -544,6 +621,29 @@ pub mod pallet {
544621 Unauthorized ,
545622 }
546623
624+ /// Maximum number of latest included block descendants the runtime is allowed to accept. In other words,
625+ /// these are ancestor of the block being currently executed, not yet sent to the relay chain runtime.
626+ ///
627+ /// This value is optional, but once set to `Some` by the governance, should never go back to `None`.
628+ /// Requires latest included para head to be present in the relay chain storage proof.
629+ #[ pallet:: storage]
630+ pub ( super ) type MaxUnincludedLen < T : Config > = StorageValue < _ , T :: BlockNumber , OptionQuery > ;
631+
632+ /// Latest included block descendants the runtime accepted. In other words, these are
633+ /// ancestors of the block being currently executed, not yet sent to the relay chain runtime.
634+ ///
635+ /// The segment length is limited by [`MaxUnincludedLen`].
636+ #[ pallet:: storage]
637+ pub ( super ) type UnincludedSegment < T : Config > =
638+ StorageValue < _ , Vec < Ancestor < T :: Hash > > , ValueQuery > ;
639+
640+ /// Storage field that keeps track of bandwidth used by the unincluded segment along with the latest
641+ /// the latest HRMP watermark. Used for limiting the acceptance of new blocks with respect to relay
642+ /// chain constraints.
643+ #[ pallet:: storage]
644+ pub ( super ) type AggregatedUnincludedSegment < T : Config > =
645+ StorageValue < _ , SegmentTracker < T :: Hash > , OptionQuery > ;
646+
547647 /// In case of a scheduled upgrade, this storage field contains the validation code to be applied.
548648 ///
549649 /// As soon as the relay chain gives us the go-ahead signal, we will overwrite the [`:code`][well_known_keys::CODE]
@@ -960,6 +1060,69 @@ impl<T: Config> Pallet<T> {
9601060 weight_used
9611061 }
9621062
1063+ /// Drop blocks from the unincluded segment with respect to the latest parachain head.
1064+ ///
1065+ /// No-op if [`MaxUnincludedLen`] is not set.
1066+ fn maybe_drop_included_ancestors ( relay_state_proof : & RelayChainStateProof ) -> Weight {
1067+ let mut weight_used = Weight :: zero ( ) ;
1068+ // If `MaxUnincludedLen` is present in the storage, parachain head
1069+ // is always expected to be included into the relay storage proof.
1070+ let para_head_with_len = <MaxUnincludedLen < T > >:: get ( ) . map ( |max_len| {
1071+ (
1072+ relay_state_proof
1073+ . read_included_para_head ( )
1074+ . expect ( "Invalid para head in relay chain state proof" ) ,
1075+ max_len,
1076+ )
1077+ } ) ;
1078+ weight_used += T :: DbWeight :: get ( ) . reads ( 1 ) ;
1079+ let Some ( ( para_head, max_len) ) = para_head_with_len else { return weight_used } ;
1080+
1081+ let para_head_hash = T :: Hashing :: hash ( & para_head. 0 ) ;
1082+ if !max_len. is_zero ( ) {
1083+ let ( dropped, left_count) : ( Vec < Ancestor < T :: Hash > > , u32 ) =
1084+ <UnincludedSegment < T > >:: mutate ( |chain| {
1085+ // Drop everything up to the block with an included para head, if present.
1086+ let idx = chain
1087+ . iter ( )
1088+ . position ( |block| {
1089+ let head_hash = block. para_head_hash ( ) . expect (
1090+ "para head hash is updated during block initialization; qed" ,
1091+ ) ;
1092+ head_hash == & para_head_hash
1093+ } )
1094+ . map_or ( 0 , |idx| idx + 1 ) ; // inclusive.
1095+
1096+ let left_count = ( idx..chain. len ( ) ) . count ( ) as u32 ;
1097+ let dropped = chain. drain ( ..idx) . collect ( ) ;
1098+ ( dropped, left_count)
1099+ } ) ;
1100+ weight_used += T :: DbWeight :: get ( ) . reads_writes ( 1 , 1 ) ;
1101+
1102+ // sanity-check there's place for the block at finalization phase.
1103+ //
1104+ // If this fails, the max segment len is reached and parachain should wait
1105+ // for ancestor's inclusion.
1106+ assert ! (
1107+ max_len > left_count. into( ) ,
1108+ "no space left for the block in the unincluded segment"
1109+ ) ;
1110+
1111+ if !dropped. is_empty ( ) {
1112+ <AggregatedUnincludedSegment < T > >:: mutate ( |agg| {
1113+ let agg = agg. as_mut ( ) . expect (
1114+ "dropped part of the segment wasn't empty, hence value exists; qed" ,
1115+ ) ;
1116+ for block in dropped {
1117+ agg. subtract ( & block) ;
1118+ }
1119+ } ) ;
1120+ weight_used += T :: DbWeight :: get ( ) . reads_writes ( 1 , 1 ) ;
1121+ }
1122+ }
1123+ weight_used
1124+ }
1125+
9631126 /// Put a new validation function into a particular location where polkadot
9641127 /// monitors for updates. Calling this function notifies polkadot that a new
9651128 /// upgrade has been scheduled.
0 commit comments