From 1d98c08667bedba66a5b4bc9f1292808b2c5429b Mon Sep 17 00:00:00 2001 From: Sergey Shulepov Date: Wed, 28 Oct 2020 17:27:34 +0100 Subject: [PATCH 01/13] UMP: Update the impl guide --- .../implementers-guide/src/runtime/router.md | 18 +++++++++++++----- .../implementers-guide/src/types/runtime.md | 6 +++--- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/roadmap/implementers-guide/src/runtime/router.md b/roadmap/implementers-guide/src/runtime/router.md index 5907d1650b85..293960e58846 100644 --- a/roadmap/implementers-guide/src/runtime/router.md +++ b/roadmap/implementers-guide/src/runtime/router.md @@ -21,14 +21,24 @@ RelayDispatchQueues: map ParaId => Vec<(ParachainDispatchOrigin, RawDispatchable /// First item in the tuple is the count of messages and second /// is the total length (in bytes) of the message payloads. /// -/// Note that this is an auxilary mapping: it's possible to tell the byte size and the number of +/// Note that this is an auxilary mapping: it's possible to tell the byte size and the number of /// messages only looking at `RelayDispatchQueues`. This mapping is separate to avoid the cost of /// loading the whole message queue if only the total size and count are required. +/// +/// Invariant: +/// - The set of keys should exactly match the set of keys of `RelayDispatchQueues`. RelayDispatchQueueSize: map ParaId => (u32, u32); /// The ordered list of `ParaId`s that have a `RelayDispatchQueue` entry. +/// +/// Invariant: +/// - The set of items from this vector should be exactly the set of the keys in +/// `RelayDispatchQueues` and `RelayDispatchQueueSize`. NeedsDispatch: Vec; -/// This is the para that will get dispatched first during the next upward dispatchable queue +/// This is the para that gets dispatched first during the next upward dispatchable queue /// execution round. +/// +/// Invariant: +/// - If `Some(para)`, then `para` must be present in `NeedsDispatch`. NextDispatchRoundStartWith: Option; ``` @@ -260,9 +270,7 @@ any of dispatchables return an error. 1. Decode `D` into a dispatchable. Otherwise, if succeeded: 1. If `weight_of(D) > config.dispatchable_upward_message_critical_weight` then skip the dispatchable. Otherwise: 1. Execute `D` and add the actual amount of weight consumed to `T`. - 1. If `weight_of(D) + T > config.preferred_dispatchable_upward_messages_step_weight`, set `NextDispatchRoundStartWith` to `P` and finish processing. - > NOTE that in practice we would need to approach the weight calculation more thoroughly, i.e. incorporate all operations - > that could take place on the course of handling these dispatchables. + 1. If `T >= config.preferred_dispatchable_upward_messages_step_weight`, set `NextDispatchRoundStartWith` to `P` and finish processing. 1. If `RelayDispatchQueues` for `P` became empty, remove `P` from `NeedsDispatch`. 1. If `NeedsDispatch` became empty then finish processing and set `NextDispatchRoundStartWith` to `None`. diff --git a/roadmap/implementers-guide/src/types/runtime.md b/roadmap/implementers-guide/src/types/runtime.md index 9bcecb7aa5b2..24befc8f5ce4 100644 --- a/roadmap/implementers-guide/src/types/runtime.md +++ b/roadmap/implementers-guide/src/types/runtime.md @@ -35,7 +35,7 @@ struct HostConfiguration { /// The amount of blocks ahead to schedule parathreads. pub scheduling_lookahead: u32, /// Total number of individual messages allowed in the parachain -> relay-chain message queue. - pub max_upward_queue_count: u32, + pub max_upward_queue_capacity: u32, /// Total size of messages allowed in the parachain -> relay-chain message queue before which /// no further messages may be added to it. If it exceeds this then the queue may contain only /// a single message. @@ -44,7 +44,7 @@ struct HostConfiguration { /// stage. /// /// NOTE that this is a soft limit and could be exceeded. - pub preferred_dispatchable_upward_messages_step_weight: u32, + pub preferred_dispatchable_upward_messages_step_weight: Weight, /// Any dispatchable upward message that requests more than the critical amount is rejected. /// /// The parameter value is picked up so that no dispatchable can make the block weight exceed @@ -52,7 +52,7 @@ struct HostConfiguration { /// and `dispatchable_upward_message_critical_weight` doesn't exceed the amount of weight left /// under a typical worst case (e.g. no upgrades, etc) weight consumed by the required phases of /// block execution (i.e. initialization, finalization and inherents). - pub dispatchable_upward_message_critical_weight: u32, + pub dispatchable_upward_message_critical_weight: Weight, /// The maximum number of messages that a candidate can contain. pub max_upward_message_num_per_candidate: u32, /// The maximum size of a message that can be put in a downward message queue. From 90e42ba205774f09352c7d11a0b441acd690e374 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 14 Sep 2020 12:22:32 +0200 Subject: [PATCH 02/13] UMP: Incorporate XCM related changes into the guide --- roadmap/implementers-guide/src/messaging.md | 26 ++-- .../src/runtime/inclusioninherent.md | 2 +- .../implementers-guide/src/runtime/router.md | 123 +++++++++--------- .../implementers-guide/src/types/messages.md | 64 +-------- .../implementers-guide/src/types/runtime.md | 8 -- 5 files changed, 88 insertions(+), 135 deletions(-) diff --git a/roadmap/implementers-guide/src/messaging.md b/roadmap/implementers-guide/src/messaging.md index 1e782e155be1..edc810e03415 100644 --- a/roadmap/implementers-guide/src/messaging.md +++ b/roadmap/implementers-guide/src/messaging.md @@ -26,20 +26,28 @@ The downward message queue doesn't have a cap on its size and it is up to the re that prevent spamming in place. Upward Message Passing (UMP) is a mechanism responsible for delivering messages in the opposite direction: -from a parachain up to the relay chain. Upward messages can serve different purposes and can be of different - kinds. +from a parachain up to the relay chain. Upward messages are essentially byte blobs. However, they are interpreted +by the relay-chain according to the XCM standard. -One kind of message is `Dispatchable`. They could be thought of similarly to extrinsics sent to a relay chain: they also -invoke exposed runtime entrypoints, they consume weight and require fees. The difference is that they originate from -a parachain. Each parachain has a queue of dispatchables to be executed. There can be only so many dispatchables at a time. +The XCM standard is a common vocabulary of messages. The XCM standard doesn't require a particular interpretation of +a message. However, the parachains host (e.g. Polkadot) guarantees certain semantics for those. + +Moreover, while most XCM messages are handled by the on-chain XCM interpreter, some of the messages are special +cased. Specifically, those messages can be checked during the acceptance criteria and thus invalid +messages would lead to rejecting the candidate itself. + +One kind of such a message is `Xcm::Transact`. This upward message can be seen as a way for a parachain +to execute arbitrary entrypoints on the relay-chain. `Xcm::Transact` messages resemble regular extrinsics with the exception that they +originate from a parachain. + +The payload of `Xcm::Transact` messages is referred as to `Dispatchable`. When a candidate with such a message is enacted +the dispatchables are put into a queue corresponding to the parachain. There can be only so many dispatchables in that queue at once. The weight that processing of the dispatchables can consume is limited by a preconfigured value. Therefore, it is possible that some dispatchables will be left for later blocks. To make the dispatching more fair, the queues are processed turn-by-turn in a round robin fashion. -Upward messages are also used by a parachain to request opening and closing HRMP channels (HRMP will be described below). - -Other kinds of upward messages can be introduced in the future as well. Potential candidates are -new validation code signalling, or other requests to the relay chain. +The second category of special cased XCM messages are for horizontal messaging channel management, +namely messages meant to request opening and closing HRMP channels (HRMP will be described below). ## Horizontal Message Passing diff --git a/roadmap/implementers-guide/src/runtime/inclusioninherent.md b/roadmap/implementers-guide/src/runtime/inclusioninherent.md index 990fd4a32b9a..9290025e2d05 100644 --- a/roadmap/implementers-guide/src/runtime/inclusioninherent.md +++ b/roadmap/implementers-guide/src/runtime/inclusioninherent.md @@ -22,5 +22,5 @@ Included: Option<()>, 1. Invoke `Scheduler::schedule(freed)` 1. Invoke the `Inclusion::process_candidates` routine with the parameters `(backed_candidates, Scheduler::scheduled(), Scheduler::group_validators)`. 1. Call `Scheduler::occupied` using the return value of the `Inclusion::process_candidates` call above, first sorting the list of assigned core indices. - 1. Call the `Router::process_pending_upward_dispatchables` routine to execute all messages in upward dispatch queues. + 1. Call the `Router::process_pending_upward_messages` routine to execute all messages in upward dispatch queues. 1. If all of the above succeeds, set `Included` to `Some(())`. diff --git a/roadmap/implementers-guide/src/runtime/router.md b/roadmap/implementers-guide/src/runtime/router.md index 293960e58846..af9f861c8469 100644 --- a/roadmap/implementers-guide/src/runtime/router.md +++ b/roadmap/implementers-guide/src/runtime/router.md @@ -15,9 +15,15 @@ OutgoingParas: Vec; ### Upward Message Passing (UMP) ```rust -/// Dispatchable objects ready to be dispatched onto the relay chain. The messages are processed in FIFO order. -RelayDispatchQueues: map ParaId => Vec<(ParachainDispatchOrigin, RawDispatchable)>; +/// The messages waiting to be handled by the relay-chain originating from a certain parachain. +/// +/// Note that some upward messages might have been already processed by the inclusion logic. E.g. +/// channel management messages. +/// +/// The messages are processed in FIFO order. +RelayDispatchQueues: map ParaId => Vec; /// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`. +/// /// First item in the tuple is the count of messages and second /// is the total length (in bytes) of the message payloads. /// @@ -166,36 +172,8 @@ No initialization routine runs for this module. Candidate Acceptance Function: * `check_upward_messages(P: ParaId, Vec`): - 1. Checks that there are at most `config.max_upward_message_num_per_candidate` messages. - 1. Checks each upward message `M` individually depending on its kind: - 1. If the message kind is `Dispatchable`: - 1. Verify that `RelayDispatchQueueSize` for `P` has enough capacity for the message (NOTE that should include all processed - upward messages of the `Dispatchable` kind up to this point!) - 1. If the message kind is `HrmpInitOpenChannel(recipient, max_places, max_message_size)`: - 1. Check that the `P` is not `recipient`. - 1. Check that `max_places` is less or equal to `config.hrmp_channel_max_places`. - 1. Check that `max_message_size` is less or equal to `config.hrmp_channel_max_message_size`. - 1. Check that `recipient` is a valid para. - 1. Check that there is no existing channel for `(P, recipient)` in `HrmpChannels`. - 1. Check that there is no existing open channel request (`P`, `recipient`) in `HrmpOpenChannelRequests`. - 1. Check that the sum of the number of already opened HRMP channels by the `P` (the size - of the set found `HrmpEgressChannelsIndex` for `P`) and the number of open requests by the - `P` (the value from `HrmpOpenChannelRequestCount` for `P`) doesn't exceed the limit of - channels (`config.hrmp_max_parachain_outbound_channels` or `config.hrmp_max_parathread_outbound_channels`) minus 1. - 1. Check that `P`'s balance is more or equal to `config.hrmp_sender_deposit` - 1. If the message kind is `HrmpAcceptOpenChannel(sender)`: - 1. Check that there is an existing request between (`sender`, `P`) in `HrmpOpenChannelRequests` - 1. Check that it is not confirmed. - 1. Check that `P`'s balance is more or equal to `config.hrmp_recipient_deposit`. - 1. Check that the sum of the number of inbound HRMP channels opened to `P` (the size of the set - found in `HrmpIngressChannelsIndex` for `P`) and the number of accepted open requests by the `P` - (the value from `HrmpAcceptedChannelRequestCount` for `P`) doesn't exceed the limit of channels - (`config.hrmp_max_parachain_inbound_channels` or `config.hrmp_max_parathread_inbound_channels`) - minus 1. - 1. If the message kind is `HrmpCloseChannel(ch)`: - 1. Check that `P` is either `ch.sender` or `ch.recipient` - 1. Check that `HrmpChannels` for `ch` exists. - 1. Check that `ch` is not in the `HrmpCloseChannelRequests` set. + 1. Checks that there are at most `config.max_upward_message_num_per_candidate` messages. + 1. Verify that `RelayDispatchQueueSize` for `P` has enough capacity for the messages * `check_processed_downward_messages(P: ParaId, processed_downward_messages)`: 1. Checks that `DownwardMessageQueues` for `P` is at least `processed_downward_messages` long. 1. Checks that `processed_downward_messages` is at least 1 if `DownwardMessageQueues` for `P` is not empty. @@ -232,56 +210,85 @@ Candidate Enactment: * `prune_dmq(P: ParaId, processed_downward_messages)`: 1. Remove the first `processed_downward_messages` from the `DownwardMessageQueues` of `P`. * `enact_upward_messages(P: ParaId, Vec)`: - 1. Process all upward messages in order depending on their kinds: - 1. If the message kind is `Dispatchable`: + 1. Process each upward message `M` in order: 1. Append the message to `RelayDispatchQueues` for `P` 1. Increment the size and the count in `RelayDispatchQueueSize` for `P`. 1. Ensure that `P` is present in `NeedsDispatch`. - 1. If the message kind is `HrmpInitOpenChannel(recipient, max_places, max_message_size)`: - 1. Increase `HrmpOpenChannelRequestCount` by 1 for `P`. - 1. Append `(P, recipient)` to `HrmpOpenChannelRequestsList`. - 1. Add a new entry to `HrmpOpenChannelRequests` for `(sender, recipient)` - 1. Set `sender_deposit` to `config.hrmp_sender_deposit` - 1. Set `limit_used_places` to `max_places` - 1. Set `limit_message_size` to `max_message_size` - 1. Set `limit_used_bytes` to `config.hrmp_channel_max_size` - 1. Reserve the deposit for the `P` according to `config.hrmp_sender_deposit` - 1. If the message kind is `HrmpAcceptOpenChannel(sender)`: - 1. Reserve the deposit for the `P` according to `config.hrmp_recipient_deposit` - 1. For the request in `HrmpOpenChannelRequests` identified by `(sender, P)`, set `confirmed` flag to `true`. - 1. Increase `HrmpAcceptedChannelRequestCount` by 1 for `P`. - 1. If the message kind is `HrmpCloseChannel(ch)`: - 1. If not already there, insert a new entry `Some(())` to `HrmpCloseChannelRequests` for `ch` - and append `ch` to `HrmpCloseChannelRequestsList`. The following routine is intended to be called in the same time when `Paras::schedule_para_cleanup` is called. `schedule_para_cleanup(ParaId)`: 1. Add the para into the `OutgoingParas` vector maintaining the sorted order. -The following routine is meant to execute pending entries in upward dispatchable queues. This function doesn't fail, even if -any of dispatchables return an error. +The following routine is meant to execute pending entries in upward message queues. This function doesn't fail, even if +dispatcing any of individual upward messages returns an error. -`process_pending_upward_dispatchables()`: +`process_pending_upward_messages()`: 1. Initialize a cumulative weight counter `T` to 0 1. Iterate over items in `NeedsDispatch` cyclically, starting with `NextDispatchRoundStartWith`. If the item specified is `None` start from the beginning. For each `P` encountered: - 1. Dequeue `D` the first dispatchable `D` from `RelayDispatchQueues` for `P` + 1. Dequeue the first upward message `D` from `RelayDispatchQueues` for `P` 1. Decrement the size of the message from `RelayDispatchQueueSize` for `P` - 1. Decode `D` into a dispatchable. Otherwise, if succeeded: - 1. If `weight_of(D) > config.dispatchable_upward_message_critical_weight` then skip the dispatchable. Otherwise: - 1. Execute `D` and add the actual amount of weight consumed to `T`. + 1. Delegate processing of the message to the runtime. The weight consumed is added to `T`. 1. If `T >= config.preferred_dispatchable_upward_messages_step_weight`, set `NextDispatchRoundStartWith` to `P` and finish processing. 1. If `RelayDispatchQueues` for `P` became empty, remove `P` from `NeedsDispatch`. 1. If `NeedsDispatch` became empty then finish processing and set `NextDispatchRoundStartWith` to `None`. + > NOTE that in practice we would need to approach the weight calculation more thoroughly, i.e. incorporate all operations + > that could take place on the course of handling these upward messages. Utility routines. `queue_downward_message(P: ParaId, M: DownwardMessage)`: - 1. Check if the serialized size of `M` exceeds the `config.critical_downward_message_size`. If so, return an error. + 1. Check if the size of `M` exceeds the `config.max_downward_message_size`. If so, return an error. 1. Wrap `M` into `InboundDownwardMessage` using the current block number for `sent_at`. 1. Obtain a new MQC link for the resulting `InboundDownwardMessage` and replace `DownwardMessageQueueHeads` for `P` with the resulting hash. 1. Add the resulting `InboundDownwardMessage` into `DownwardMessageQueues` for `P`. +## Entry-points + +The following entry-points are meant to be used for HRMP channel management. + +Those entry-points are meant to be called from a parachain. `origin` is defined as the `ParaId` of +the parachain executed the message. + +* `hrmp_init_open_channel(recipient, max_places, max_message_size)`: + 1. Check that the `origin` is not `recipient`. + 1. Check that `max_places` is less or equal to `config.hrmp_channel_max_places` and greater than zero. + 1. Check that `max_message_size` is less or equal to `config.hrmp_channel_max_message_size` and greater than zero. + 1. Check that `recipient` is a valid para. + 1. Check that there is no existing channel for `(origin, recipient)` in `HrmpChannels`. + 1. Check that there is no existing open channel request (`origin`, `recipient`) in `HrmpOpenChannelRequests`. + 1. Check that the sum of the number of already opened HRMP channels by the `origin` (the size + of the set found `HrmpEgressChannelsIndex` for `origin`) and the number of open requests by the + `origin` (the value from `HrmpOpenChannelRequestCount` for `origin`) doesn't exceed the limit of + channels (`config.hrmp_max_parachain_outbound_channels` or `config.hrmp_max_parathread_outbound_channels`) minus 1. + 1. Check that `origin`'s balance is more or equal to `config.hrmp_sender_deposit` + 1. Reserve the deposit for the `origin` according to `config.hrmp_sender_deposit` + 1. Increase `HrmpOpenChannelRequestCount` by 1 for `origin`. + 1. Append `(origin, recipient)` to `HrmpOpenChannelRequestsList`. + 1. Add a new entry to `HrmpOpenChannelRequests` for `(origin, recipient)` + 1. Set `sender_deposit` to `config.hrmp_sender_deposit` + 1. Set `limit_used_places` to `max_places` + 1. Set `limit_message_size` to `max_message_size` + 1. Set `limit_used_bytes` to `config.hrmp_channel_max_size` +* `hrmp_accept_open_channel(sender)`: + 1. Check that there is an existing request between (`sender`, `origin`) in `HrmpOpenChannelRequests` + 1. Check that it is not confirmed. + 1. Check that the sum of the number of inbound HRMP channels opened to `origin` (the size of the set + found in `HrmpIngressChannelsIndex` for `origin`) and the number of accepted open requests by the `origin` + (the value from `HrmpAcceptedChannelRequestCount` for `origin`) doesn't exceed the limit of channels + (`config.hrmp_max_parachain_inbound_channels` or `config.hrmp_max_parathread_inbound_channels`) + minus 1. + 1. Check that `origin`'s balance is more or equal to `config.hrmp_recipient_deposit`. + 1. Reserve the deposit for the `origin` according to `config.hrmp_recipient_deposit` + 1. For the request in `HrmpOpenChannelRequests` identified by `(sender, P)`, set `confirmed` flag to `true`. + 1. Increase `HrmpAcceptedChannelRequestCount` by 1 for `origin`. +* `hrmp_close_channel(ch)`: + 1. Check that `origin` is either `ch.sender` or `ch.recipient` + 1. Check that `HrmpChannels` for `ch` exists. + 1. Check that `ch` is not in the `HrmpCloseChannelRequests` set. + 1. If not already there, insert a new entry `Some(())` to `HrmpCloseChannelRequests` for `ch` + and append `ch` to `HrmpCloseChannelRequestsList`. + ## Session Change 1. Drain `OutgoingParas`. For each `P` happened to be in the list: diff --git a/roadmap/implementers-guide/src/types/messages.md b/roadmap/implementers-guide/src/types/messages.md index 9551bb627964..2462b969165e 100644 --- a/roadmap/implementers-guide/src/types/messages.md +++ b/roadmap/implementers-guide/src/types/messages.md @@ -12,6 +12,9 @@ Types required for message passing between the relay-chain and a parachain. Actual contents of the messages is specified by the XCM standard. ```rust,ignore +/// A message sent from a parachain to the relay-chain. +type UpwardMessage = Vec; + /// A message sent from the relay-chain down to a parachain. /// /// The size of the message is limited by the `config.max_downward_message_size` @@ -28,6 +31,8 @@ struct InboundDownwardMessage { } ``` +## Horizontal Message Passing + ## HrmpChannelId A type that uniquely identifies an HRMP channel. An HRMP channel is established between two paras. @@ -46,65 +51,6 @@ struct HrmpChannelId { } ``` -## Upward Message - -A type of messages dispatched from a parachain to the relay chain. - -```rust,ignore -enum ParachainDispatchOrigin { - /// As a simple `Origin::Signed`, using `ParaId::account_id` as its value. This is good when - /// interacting with standard modules such as `balances`. - Signed, - /// As the special `Origin::Parachain(ParaId)`. This is good when interacting with parachain- - /// aware modules which need to succinctly verify that the origin is a parachain. - Parachain, - /// As the simple, superuser `Origin::Root`. This can only be done on specially permissioned - /// parachains. - Root, -} - -/// An opaque byte buffer that encodes an entrypoint and the arguments that should be -/// provided to it upon the dispatch. -/// -/// NOTE In order to be executable the byte buffer should be decoded which potentially can fail if -/// the encoding was changed. -type RawDispatchable = Vec; - -enum UpwardMessage { - /// This upward message is meant to schedule execution of a provided dispatchable. - Dispatchable { - /// The origin with which the dispatchable should be executed. - origin: ParachainDispatchOrigin, - /// The dispatchable to be executed in its raw form. - dispatchable: RawDispatchable, - }, - /// A message for initiation of opening a new HRMP channel between the origin para and the - /// given `recipient`. - /// - /// Let `origin` be the parachain that sent this upward message. In that case the channel - /// to be opened is (`origin` -> `recipient`). - HrmpInitOpenChannel { - /// The receiving party in the channel. - recipient: ParaId, - /// How many messages can be stored in the channel at most. - max_places: u32, - /// The maximum size of a message in this channel. - max_message_size: u32, - }, - /// A message that is meant to confirm the HRMP open channel request initiated earlier by the - /// `HrmpInitOpenChannel` by the given `sender`. - /// - /// Let `origin` be the parachain that sent this upward message. In that case the channel - /// (`origin` -> `sender`) will be opened during the session change. - HrmpAcceptOpenChannel(ParaId), - /// A message for closing the specified existing channel `ch`. - /// - /// The channel to be closed is `(ch.sender -> ch.recipient)`. The parachain that sent this - /// upward message must be either `ch.sender` or `ch.recipient`. - HrmpCloseChannel(HrmpChannelId), -} -``` - ## Horizontal Message This is a message sent from a parachain to another parachain that travels through the relay chain. diff --git a/roadmap/implementers-guide/src/types/runtime.md b/roadmap/implementers-guide/src/types/runtime.md index 24befc8f5ce4..c745f96bbd21 100644 --- a/roadmap/implementers-guide/src/types/runtime.md +++ b/roadmap/implementers-guide/src/types/runtime.md @@ -45,14 +45,6 @@ struct HostConfiguration { /// /// NOTE that this is a soft limit and could be exceeded. pub preferred_dispatchable_upward_messages_step_weight: Weight, - /// Any dispatchable upward message that requests more than the critical amount is rejected. - /// - /// The parameter value is picked up so that no dispatchable can make the block weight exceed - /// the total budget. I.e. that the sum of `preferred_dispatchable_upward_messages_step_weight` - /// and `dispatchable_upward_message_critical_weight` doesn't exceed the amount of weight left - /// under a typical worst case (e.g. no upgrades, etc) weight consumed by the required phases of - /// block execution (i.e. initialization, finalization and inherents). - pub dispatchable_upward_message_critical_weight: Weight, /// The maximum number of messages that a candidate can contain. pub max_upward_message_num_per_candidate: u32, /// The maximum size of a message that can be put in a downward message queue. From 02ce8c6d917d43b1af792436ff06ace87a31618e Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Wed, 9 Sep 2020 15:25:34 +0200 Subject: [PATCH 03/13] UMP: Data structures and configuration --- node/core/candidate-validation/src/lib.rs | 4 +- parachain/src/primitives.rs | 38 +------------ primitives/src/v0.rs | 2 +- primitives/src/v1.rs | 3 +- runtime/parachains/src/configuration.rs | 69 +++++++++++++++++++++++ 5 files changed, 74 insertions(+), 42 deletions(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index eea1d757dd5f..2e0c87136601 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -501,7 +501,7 @@ fn validate_candidate_exhaustive( mod tests { use super::*; use polkadot_node_subsystem_test_helpers as test_helpers; - use polkadot_primitives::v1::{HeadData, BlockData}; + use polkadot_primitives::v1::{HeadData, BlockData, UpwardMessage}; use sp_core::testing::TaskExecutor; use futures::executor; use assert_matches::assert_matches; @@ -847,7 +847,7 @@ mod tests { assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => { assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1])); - assert_eq!(outputs.upward_messages, Vec::new()); + assert_eq!(outputs.upward_messages, Vec::::new()); assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into())); assert_eq!(used_validation_data, validation_data); }); diff --git a/parachain/src/primitives.rs b/parachain/src/primitives.rs index 83edc4e0f2a6..e66fb7c0931e 100644 --- a/parachain/src/primitives.rs +++ b/parachain/src/primitives.rs @@ -198,44 +198,8 @@ impl AccountIdConversion for Id { } } -/// Which origin a parachain's message to the relay chain should be dispatched from. -#[derive(Clone, PartialEq, Eq, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Hash))] -#[repr(u8)] -pub enum ParachainDispatchOrigin { - /// As a simple `Origin::Signed`, using `ParaId::account_id` as its value. This is good when - /// interacting with standard modules such as `balances`. - Signed, - /// As the special `Origin::Parachain(ParaId)`. This is good when interacting with parachain- - /// aware modules which need to succinctly verify that the origin is a parachain. - Parachain, - /// As the simple, superuser `Origin::Root`. This can only be done on specially permissioned - /// parachains. - Root, -} - -impl sp_std::convert::TryFrom for ParachainDispatchOrigin { - type Error = (); - fn try_from(x: u8) -> core::result::Result { - const SIGNED: u8 = ParachainDispatchOrigin::Signed as u8; - const PARACHAIN: u8 = ParachainDispatchOrigin::Parachain as u8; - Ok(match x { - SIGNED => ParachainDispatchOrigin::Signed, - PARACHAIN => ParachainDispatchOrigin::Parachain, - _ => return Err(()), - }) - } -} - /// A message from a parachain to its Relay Chain. -#[derive(Clone, PartialEq, Eq, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Hash))] -pub struct UpwardMessage { - /// The origin for the message to be sent from. - pub origin: ParachainDispatchOrigin, - /// The message data. - pub data: Vec, -} +pub type UpwardMessage = Vec; /// Validation parameters for evaluating the parachain validity function. // TODO: balance downloads (https://github.com/paritytech/polkadot/issues/220) diff --git a/primitives/src/v0.rs b/primitives/src/v0.rs index c058d22dbc56..a01b3c743fa9 100644 --- a/primitives/src/v0.rs +++ b/primitives/src/v0.rs @@ -41,7 +41,7 @@ pub use polkadot_core_primitives::*; pub use parity_scale_codec::Compact; pub use polkadot_parachain::primitives::{ - Id, ParachainDispatchOrigin, LOWEST_USER_ID, UpwardMessage, HeadData, BlockData, + Id, LOWEST_USER_ID, UpwardMessage, HeadData, BlockData, ValidationCode, }; diff --git a/primitives/src/v1.rs b/primitives/src/v1.rs index 167ac16219aa..5347849f7f81 100644 --- a/primitives/src/v1.rs +++ b/primitives/src/v1.rs @@ -36,8 +36,7 @@ pub use polkadot_core_primitives::v1::{ // Export some polkadot-parachain primitives pub use polkadot_parachain::primitives::{ - Id, ParachainDispatchOrigin, LOWEST_USER_ID, UpwardMessage, HeadData, BlockData, - ValidationCode, + Id, LOWEST_USER_ID, UpwardMessage, HeadData, BlockData, ValidationCode, }; // Export some basic parachain primitives from v0. diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index 14a57ded184e..e964c1bc3c97 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -58,6 +58,12 @@ pub struct HostConfiguration { pub thread_availability_period: BlockNumber, /// The amount of blocks ahead to schedule parachains and parathreads. pub scheduling_lookahead: u32, + /// Total number of individual messages allowed in the parachain -> relay-chain message queue. + pub max_upward_queue_capacity: u32, + /// Total size of messages allowed in the parachain -> relay-chain message queue before which + /// no further messages may be added to it. If it exceeds this then the queue may contain only + /// a single message. + pub max_upward_queue_size: u32, /// The maximum size of a message that can be put in a downward message queue. /// /// Since we require receiving at least one DMP message the obvious upper bound of the size is @@ -65,6 +71,13 @@ pub struct HostConfiguration { /// decide to do with its PoV so this value in practice will be picked as a fraction of the PoV /// size. pub max_downward_message_size: u32, + /// The amount of weight we wish to devote to the processing the dispatchable upward messages + /// stage. + /// + /// NOTE that this is a soft limit and could be exceeded. + pub preferred_dispatchable_upward_messages_step_weight: Weight, + /// The maximum number of messages that a candidate can contain. + pub max_upward_message_num_per_candidate: u32, } pub trait Trait: frame_system::Trait { } @@ -198,6 +211,26 @@ decl_module! { Ok(()) } + /// Sets the maximum items that can present in a upward dispatch queue at once. + #[weight = (1_000, DispatchClass::Operational)] + pub fn set_max_upward_queue_capacity(origin, new: u32) -> DispatchResult { + ensure_root(origin)?; + Self::update_config_member(|config| { + sp_std::mem::replace(&mut config.max_upward_queue_capacity, new) != new + }); + Ok(()) + } + + /// Sets the maximum total size of items that can present in a upward dispatch queue at once. + #[weight = (1_000, DispatchClass::Operational)] + pub fn set_max_upward_queue_size(origin, new: u32) -> DispatchResult { + ensure_root(origin)?; + Self::update_config_member(|config| { + sp_std::mem::replace(&mut config.max_upward_queue_size, new) != new + }); + Ok(()) + } + /// Set the critical downward message size. #[weight = (1_000, DispatchClass::Operational)] pub fn set_max_downward_message_size(origin, new: u32) -> DispatchResult { @@ -207,6 +240,26 @@ decl_module! { }); Ok(()) } + + /// Sets the soft limit for the phase of dispatching dispatchable upward messages. + #[weight = (1_000, DispatchClass::Operational)] + pub fn set_preferred_dispatchable_upward_messages_step_weight(origin, new: Weight) -> DispatchResult { + ensure_root(origin)?; + Self::update_config_member(|config| { + sp_std::mem::replace(&mut config.preferred_dispatchable_upward_messages_step_weight, new) != new + }); + Ok(()) + } + + /// Sets the maximum number of messages that a candidate can contain. + #[weight = (1_000, DispatchClass::Operational)] + pub fn set_max_upward_message_num_per_candidate(origin, new: u32) -> DispatchResult { + ensure_root(origin)?; + Self::update_config_member(|config| { + sp_std::mem::replace(&mut config.max_upward_message_num_per_candidate, new) != new + }); + Ok(()) + } } } @@ -285,7 +338,11 @@ mod tests { chain_availability_period: 10, thread_availability_period: 8, scheduling_lookahead: 3, + max_upward_queue_capacity: 1337, + max_upward_queue_size: 228, max_downward_message_size: 2048, + preferred_dispatchable_upward_messages_step_weight: 20000, + max_upward_message_num_per_candidate: 5, }; assert!(::PendingConfig::get().is_none()); @@ -323,9 +380,21 @@ mod tests { Configuration::set_scheduling_lookahead( Origin::root(), new_config.scheduling_lookahead, ).unwrap(); + Configuration::set_max_upward_queue_capacity( + Origin::root(), new_config.max_upward_queue_capacity, + ).unwrap(); + Configuration::set_max_upward_queue_size( + Origin::root(), new_config.max_upward_queue_size, + ).unwrap(); Configuration::set_max_downward_message_size( Origin::root(), new_config.max_downward_message_size, ).unwrap(); + Configuration::set_preferred_dispatchable_upward_messages_step_weight( + Origin::root(), new_config.preferred_dispatchable_upward_messages_step_weight, + ).unwrap(); + Configuration::set_max_upward_message_num_per_candidate( + Origin::root(), new_config.max_upward_message_num_per_candidate, + ).unwrap(); assert_eq!(::PendingConfig::get(), Some(new_config)); }) From e0f9af1671a1bcb2b84bc74184ed9e4610e7cfe6 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Thu, 10 Sep 2020 15:34:36 +0200 Subject: [PATCH 04/13] UMP: Initial plumbing --- runtime/parachains/src/inclusion.rs | 17 ++++++ runtime/parachains/src/inclusion_inherent.rs | 4 ++ runtime/parachains/src/router.rs | 1 + runtime/parachains/src/router/ump.rs | 59 ++++++++++++++++++++ 4 files changed, 81 insertions(+) create mode 100644 runtime/parachains/src/router/ump.rs diff --git a/runtime/parachains/src/inclusion.rs b/runtime/parachains/src/inclusion.rs index e5e831bcc38b..d5a6e92674b8 100644 --- a/runtime/parachains/src/inclusion.rs +++ b/runtime/parachains/src/inclusion.rs @@ -155,6 +155,8 @@ decl_error! { InternalError, /// The downward message queue is not processed correctly. IncorrectDownwardMessageHandling, + /// At least one upward message sent does not pass the acceptance criteria. + InvalidUpwardMessages, } } @@ -412,6 +414,7 @@ impl Module { &candidate.candidate.commitments.head_data, &candidate.candidate.commitments.new_validation_code, candidate.candidate.commitments.processed_downward_messages, + &candidate.candidate.commitments.upward_messages, )?; for (i, assignment) in scheduled[skip..].iter().enumerate() { @@ -544,6 +547,7 @@ impl Module { &validation_outputs.head_data, &validation_outputs.new_validation_code, validation_outputs.processed_downward_messages, + &validation_outputs.upward_messages, ) } @@ -570,6 +574,10 @@ impl Module { receipt.descriptor.para_id, commitments.processed_downward_messages, ); + weight += >::enact_upward_messages( + receipt.descriptor.para_id, + commitments.upward_messages, + ); Self::deposit_event( Event::::CandidateIncluded(plain, commitments.head_data.clone()) @@ -693,6 +701,7 @@ impl CandidateCheckContext { head_data: &HeadData, new_validation_code: &Option, processed_downward_messages: u32, + upward_messages: &[primitives::v1::UpwardMessage], ) -> Result<(), DispatchError> { ensure!( head_data.0.len() <= self.config.max_head_data_size as _, @@ -722,6 +731,14 @@ impl CandidateCheckContext { ), Error::::IncorrectDownwardMessageHandling, ); + ensure!( + >::check_upward_messages( + &self.config, + para_id, + upward_messages, + ), + Error::::InvalidUpwardMessages, + ); Ok(()) } diff --git a/runtime/parachains/src/inclusion_inherent.rs b/runtime/parachains/src/inclusion_inherent.rs index f9a7465d9128..d827b359c821 100644 --- a/runtime/parachains/src/inclusion_inherent.rs +++ b/runtime/parachains/src/inclusion_inherent.rs @@ -35,6 +35,7 @@ use frame_system::ensure_none; use crate::{ inclusion, scheduler::{self, FreedReason}, + router, }; use inherents::{InherentIdentifier, InherentData, MakeFatalError, ProvideInherent}; @@ -115,6 +116,9 @@ decl_module! { // Note which of the scheduled cores were actually occupied by a backed candidate. >::occupied(&occupied); + // Give some time slice to dispatch pending upward messages. + >::process_pending_upward_messages(); + // And track that we've finished processing the inherent for this block. Included::set(Some(())); diff --git a/runtime/parachains/src/router.rs b/runtime/parachains/src/router.rs index ad12a33bc8c7..c4cd29155962 100644 --- a/runtime/parachains/src/router.rs +++ b/runtime/parachains/src/router.rs @@ -29,6 +29,7 @@ use frame_support::{decl_error, decl_module, decl_storage, weights::Weight}; use primitives::v1::{Id as ParaId, InboundDownwardMessage, Hash}; mod dmp; +mod ump; pub use dmp::QueueDownwardMessageError; diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs new file mode 100644 index 000000000000..246a2c989058 --- /dev/null +++ b/runtime/parachains/src/router/ump.rs @@ -0,0 +1,59 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::{Trait, Module}; +use crate::configuration::HostConfiguration; +use sp_std::prelude::*; +use frame_support::weights::Weight; +use primitives::v1::{Id as ParaId, UpwardMessage}; + +impl Module { + /// Check that all the upward messages sent by a candidate pass the acceptance criteria. Returns + /// false, if any of the messages doesn't pass. + pub(crate) fn check_upward_messages( + config: &HostConfiguration, + para: ParaId, + upward_messages: &[UpwardMessage], + ) -> bool { + drop(para); + + if upward_messages.len() as u32 > config.max_upward_message_num_per_candidate { + return false; + } + + for _ in upward_messages { + return false; + } + + true + } + + /// Enacts all the upward messages sent by a candidate. + pub(crate) fn enact_upward_messages(para: ParaId, upward_messages: Vec) -> Weight { + drop(para); + + for _ in upward_messages { + todo!() + } + + 0 + } + + /// Devote some time into dispatching pending upward messages. + pub(crate) fn process_pending_upward_messages() { + // no-op for now, will be filled in the following commits + } +} From e9dde74b4f60d27cea4d75074075f58a86c6ca1d Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Thu, 17 Sep 2020 19:15:08 +0200 Subject: [PATCH 05/13] UMP: Data layout --- runtime/parachains/src/router.rs | 42 +++++++++++++++++++++++++++- runtime/parachains/src/router/ump.rs | 17 +++++++++-- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/runtime/parachains/src/router.rs b/runtime/parachains/src/router.rs index c4cd29155962..6b168268a3b8 100644 --- a/runtime/parachains/src/router.rs +++ b/runtime/parachains/src/router.rs @@ -26,7 +26,8 @@ use crate::{ }; use sp_std::prelude::*; use frame_support::{decl_error, decl_module, decl_storage, weights::Weight}; -use primitives::v1::{Id as ParaId, InboundDownwardMessage, Hash}; +use sp_std::collections::vec_deque::VecDeque; +use primitives::v1::{Id as ParaId, InboundDownwardMessage, Hash, UpwardMessage}; mod dmp; mod ump; @@ -57,6 +58,44 @@ decl_storage! { /// - `B`: is the relay-chain block number in which a message was appended. /// - `H(M)`: is the hash of the message being appended. DownwardMessageQueueHeads: map hasher(twox_64_concat) ParaId => Hash; + + /* + * Upward Message Passing (UMP) + * + * Storage layout required for UMP, specifically dispatchable upward messages. + */ + + /// The messages waiting to be handled by the relay-chain originating from a certain parachain. + /// + /// Note that some upward messages might have been already processed by the inclusion logic. E.g. + /// channel management messages. + /// + /// The messages are processed in FIFO order. + RelayDispatchQueues: map hasher(twox_64_concat) ParaId => VecDeque; + /// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`. + /// + /// First item in the tuple is the count of messages and second + /// is the total length (in bytes) of the message payloads. + /// + /// Note that this is an auxilary mapping: it's possible to tell the byte size and the number of + /// messages only looking at `RelayDispatchQueues`. This mapping is separate to avoid the cost of + /// loading the whole message queue if only the total size and count are required. + /// + /// Invariant: + /// - The set of keys should exactly match the set of keys of `RelayDispatchQueues`. + RelayDispatchQueueSize: map hasher(twox_64_concat) ParaId => (u32, u32); + /// The ordered list of `ParaId`s that have a `RelayDispatchQueue` entry. + /// + /// Invariant: + /// - The set of items from this vector should be exactly the set of the keys in + /// `RelayDispatchQueues` and `RelayDispatchQueueSize`. + NeedsDispatch: Vec; + /// This is the para that gets will get dispatched first during the next upward dispatchable queue + /// execution round. + /// + /// Invariant: + /// - If `Some(para)`, then `para` must be present in `NeedsDispatch`. + NextDispatchRoundStartWith: Option; } } @@ -87,6 +126,7 @@ impl Module { let outgoing = OutgoingParas::take(); for outgoing_para in outgoing { Self::clean_dmp_after_outgoing(outgoing_para); + Self::clean_ump_after_outgoing(outgoing_para); } } diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs index 246a2c989058..33ff9bf36e24 100644 --- a/runtime/parachains/src/router/ump.rs +++ b/runtime/parachains/src/router/ump.rs @@ -14,13 +14,26 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use super::{Trait, Module}; +use super::{Trait, Module, Store}; use crate::configuration::HostConfiguration; use sp_std::prelude::*; -use frame_support::weights::Weight; +use frame_support::{StorageMap, StorageValue, weights::Weight}; use primitives::v1::{Id as ParaId, UpwardMessage}; impl Module { + pub(super) fn clean_ump_after_outgoing(outgoing_para: ParaId) { + ::RelayDispatchQueueSize::remove(&outgoing_para); + ::RelayDispatchQueues::remove(&outgoing_para); + ::NeedsDispatch::mutate(|v| { + if let Ok(i) = v.binary_search(&outgoing_para) { + v.remove(i); + } + }); + ::NextDispatchRoundStartWith::mutate(|v| { + *v = v.filter(|p| *p == outgoing_para) + }); + } + /// Check that all the upward messages sent by a candidate pass the acceptance criteria. Returns /// false, if any of the messages doesn't pass. pub(crate) fn check_upward_messages( From c583a41077d25078a9e4d077e5332a14c57c8cbc Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Thu, 17 Sep 2020 19:15:48 +0200 Subject: [PATCH 06/13] UMP: Acceptance criteria & enactment --- runtime/common/src/paras_registrar.rs | 4 +- runtime/parachains/src/mock.rs | 4 +- runtime/parachains/src/router.rs | 9 +- runtime/parachains/src/router/ump.rs | 611 +++++++++++++++++++++++++- runtime/rococo-v1/src/lib.rs | 4 +- runtime/test-runtime/src/lib.rs | 4 +- 6 files changed, 620 insertions(+), 16 deletions(-) diff --git a/runtime/common/src/paras_registrar.rs b/runtime/common/src/paras_registrar.rs index 7c9ed36a9751..ab58878abddd 100644 --- a/runtime/common/src/paras_registrar.rs +++ b/runtime/common/src/paras_registrar.rs @@ -425,7 +425,9 @@ mod tests { type WeightInfo = (); } - impl router::Trait for Test { } + impl router::Trait for Test { + type UmpSink = (); + } impl pallet_session::historical::Trait for Test { type FullIdentification = pallet_staking::Exposure; diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index 490c8083ad8f..4fdd6e0e6155 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -108,7 +108,9 @@ impl crate::paras::Trait for Test { type Origin = Origin; } -impl crate::router::Trait for Test { } +impl crate::router::Trait for Test { + type UmpSink = crate::router::MockUmpSink; +} impl crate::scheduler::Trait for Test { } diff --git a/runtime/parachains/src/router.rs b/runtime/parachains/src/router.rs index 6b168268a3b8..b773bd9c1270 100644 --- a/runtime/parachains/src/router.rs +++ b/runtime/parachains/src/router.rs @@ -33,8 +33,15 @@ mod dmp; mod ump; pub use dmp::QueueDownwardMessageError; +pub use ump::UmpSink; -pub trait Trait: frame_system::Trait + configuration::Trait {} +#[cfg(test)] +pub use ump::mock_sink::MockUmpSink; + +pub trait Trait: frame_system::Trait + configuration::Trait { + /// A place where all received upward messages are funneled. + type UmpSink: UmpSink; +} decl_storage! { trait Store for Module as Router { diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs index 33ff9bf36e24..203575bcfeab 100644 --- a/runtime/parachains/src/router/ump.rs +++ b/runtime/parachains/src/router/ump.rs @@ -15,15 +15,51 @@ // along with Polkadot. If not, see . use super::{Trait, Module, Store}; -use crate::configuration::HostConfiguration; +use crate::configuration::{self, HostConfiguration}; use sp_std::prelude::*; -use frame_support::{StorageMap, StorageValue, weights::Weight}; +use sp_std::collections::{btree_map::BTreeMap, vec_deque::VecDeque}; +use frame_support::{StorageMap, StorageValue, weights::Weight, traits::Get}; use primitives::v1::{Id as ParaId, UpwardMessage}; +/// All upward messages coming from parachains will be funneled into an implementation of this trait. +/// +/// The message is opaque from the perspective of UMP. The message size can range from 0 and there is no upper bound. +/// +/// It's up to the implementation of this trait to decide what to do with a message as long as it +/// returns the amount of weight consumed in the process of handling. Ignoring a message is a valid +/// strategy. +/// +/// There are no guarantees on how much time it takes for the message sent by a candidate to end up +/// in the sink after the candidate was enacted. That typically depends on the UMP traffic, the sizes +/// of upward messages and the configuration of UMP. +/// +/// It is possible that by the time the message is sank the origin parachain was offboarded. It is +/// up to the implementer to check that if it cares. +pub trait UmpSink { + /// Process an incoming upward message and return the amount of weight it consumed. + /// + /// See the trait docs for more details. + fn process_upward_message(origin: ParaId, msg: Vec) -> Weight; +} + +/// An implementation of a sink that just swallows the message without consuming any weight. +impl UmpSink for () { + fn process_upward_message(_: ParaId, _: Vec) -> Weight { + 0 + } +} + +/// Routines related to the upward message passing. impl Module { pub(super) fn clean_ump_after_outgoing(outgoing_para: ParaId) { ::RelayDispatchQueueSize::remove(&outgoing_para); ::RelayDispatchQueues::remove(&outgoing_para); + + // Remove the outgoing para from the `NeedsDispatch` list and from + // `NextDispatchRoundStartWith`. + // + // That's needed for maintaining invariant that `NextDispatchRoundStartWith` points to an + // existing item in `NeedsDispatch`. ::NeedsDispatch::mutate(|v| { if let Ok(i) = v.binary_search(&outgoing_para) { v.remove(i); @@ -41,13 +77,23 @@ impl Module { para: ParaId, upward_messages: &[UpwardMessage], ) -> bool { - drop(para); - if upward_messages.len() as u32 > config.max_upward_message_num_per_candidate { return false; } - for _ in upward_messages { + let (mut para_queue_count, mut para_queue_size) = + ::RelayDispatchQueueSize::get(¶); + + for msg in upward_messages { + para_queue_count += 1; + para_queue_size += msg.len() as u32; + } + + // make sure that the queue is not overfilled. + // we do it here only once since returning false invalidates the whole relay-chain block. + if para_queue_count > config.max_upward_queue_capacity + || para_queue_size > config.max_upward_queue_size + { return false; } @@ -55,18 +101,561 @@ impl Module { } /// Enacts all the upward messages sent by a candidate. - pub(crate) fn enact_upward_messages(para: ParaId, upward_messages: Vec) -> Weight { - drop(para); + pub(crate) fn enact_upward_messages( + para: ParaId, + upward_messages: Vec, + ) -> Weight { + let mut weight = 0; + + if !upward_messages.is_empty() { + let (extra_cnt, extra_size) = upward_messages + .iter() + .fold((0, 0), |(cnt, size), d| (cnt + 1, size + d.len() as u32)); + + ::RelayDispatchQueues::mutate(¶, |v| { + v.extend(upward_messages.into_iter()) + }); - for _ in upward_messages { - todo!() + ::RelayDispatchQueueSize::mutate( + ¶, + |(ref mut cnt, ref mut size)| { + *cnt += extra_cnt; + *size += extra_size; + }, + ); + + ::NeedsDispatch::mutate(|v| { + if let Err(i) = v.binary_search(¶) { + v.insert(i, para); + } + }); + + weight += T::DbWeight::get().reads_writes(3, 3); } - 0 + weight } /// Devote some time into dispatching pending upward messages. pub(crate) fn process_pending_upward_messages() { - // no-op for now, will be filled in the following commits + let mut used_weight_so_far = 0; + + let config = >::config(); + let mut cursor = NeedsDispatchCursor::new::(); + let mut queue_cache = QueueCache::new(); + + while let Some(dispatchee) = cursor.peek() { + if used_weight_so_far >= config.preferred_dispatchable_upward_messages_step_weight { + // Then check whether we've reached or overshoot the + // preferred weight for the dispatching stage. + // + // if so - bail. + break; + } + + // dequeue the next message from the queue of the dispatchee + let (upward_message, became_empty) = queue_cache.dequeue::(dispatchee); + if let Some(upward_message) = upward_message { + used_weight_so_far += + T::UmpSink::process_upward_message(dispatchee, upward_message); + } + + if became_empty { + // the queue is empty now - this para doesn't need attention anymore. + cursor.remove(); + } else { + cursor.advance(); + } + } + + cursor.flush::(); + queue_cache.flush::(); + } +} + +/// To avoid constant fetching, deserializing and serialization the queues are cached. +/// +/// After an item dequeued from a queue for the first time, the queue is stored in this struct rather +/// than being serialized and persisted. +/// +/// This implementation works best when: +/// +/// 1. when the queues are shallow +/// 2. the dispatcher makes more than one cycle +/// +/// if the queues are deep and there are many we would load and keep the queues for a long time, +/// thus increasing the peak memory consumption of the wasm runtime. Under such conditions persisting +/// queues might play better since it's unlikely that they are going to be requested once more. +/// +/// On the other hand, the situation when deep queues exist and it takes more than one dipsatcher +/// cycle to traverse the queues is already sub-optimal and better be avoided. +/// +/// This struct is not supposed to be dropped but rather to be consumed by [`flush`]. +struct QueueCache(BTreeMap); + +struct QueueCacheEntry { + queue: VecDeque, + count: u32, + total_size: u32, +} + +impl QueueCache { + fn new() -> Self { + Self(BTreeMap::new()) + } + + /// Dequeues one item from the upward message queue of the given para. + /// + /// Returns `(upward_message, became_empty)`, where + /// + /// - `upward_message` a dequeued message or `None` if the queue _was_ empty. + /// - `became_empty` is true if the queue _became_ empty. + fn dequeue(&mut self, para: ParaId) -> (Option, bool) { + let cache_entry = self.0.entry(para).or_insert_with(|| { + let queue = as Store>::RelayDispatchQueues::get(¶); + let (count, total_size) = as Store>::RelayDispatchQueueSize::get(¶); + QueueCacheEntry { + queue, + count, + total_size, + } + }); + let upward_message = cache_entry.queue.pop_front(); + if let Some(ref msg) = upward_message { + cache_entry.count -= 1; + cache_entry.total_size -= msg.len() as u32; + } + + let became_empty = cache_entry.queue.is_empty(); + (upward_message, became_empty) + } + + /// Flushes the updated queues into the storage. + fn flush(self) { + // NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics + // within runtime. It is dangerous to use because of double-panics and flushing on a panic + // is not necessary as well. + for ( + para, + QueueCacheEntry { + queue, + count, + total_size, + }, + ) in self.0 + { + if queue.is_empty() { + // remove the entries altogether. + as Store>::RelayDispatchQueues::remove(¶); + as Store>::RelayDispatchQueueSize::remove(¶); + } else { + as Store>::RelayDispatchQueues::insert(¶, queue); + as Store>::RelayDispatchQueueSize::insert(¶, (count, total_size)); + } + } + } +} + +/// A cursor that iterates over all entries in `NeedsDispatch`. +/// +/// This cursor will start with the para indicated by `NextDispatchRoundStartWith` storage entry. +/// This cursor is cyclic meaning that after reaching the end it will jump to the beginning. Unlike +/// an iterator, this cursor allows removing items during the iteration. +/// +/// Each iteration cycle *must be* concluded with a call to either `advance` or `remove`. +/// +/// This struct is not supposed to be dropped but rather to be consumed by [`flush`]. +#[derive(Debug)] +struct NeedsDispatchCursor { + needs_dispatch: Vec, + cur_idx: usize, +} + +impl NeedsDispatchCursor { + fn new() -> Self { + let needs_dispatch: Vec = as Store>::NeedsDispatch::get(); + let start_with = as Store>::NextDispatchRoundStartWith::get(); + + let start_with_idx = match start_with { + Some(para) => match needs_dispatch.binary_search(¶) { + Ok(found_idx) => found_idx, + Err(_supposed_idx) => { + // well that's weird because we maintain an invariant that + // `NextDispatchRoundStartWith` must point into one of the items in + // `NeedsDispatch`. + // + // let's select 0 as the starting index as a safe bet. + debug_assert!(false); + 0 + } + }, + None => 0, + }; + + Self { + needs_dispatch, + cur_idx: start_with_idx, + } + } + + /// Returns the item the cursor points to. + fn peek(&self) -> Option { + self.needs_dispatch.get(self.cur_idx).cloned() + } + + /// Moves the cursor to the next item. + fn advance(&mut self) { + if self.needs_dispatch.is_empty() { + return; + } + self.cur_idx = (self.cur_idx + 1) % self.needs_dispatch.len(); + } + + /// Removes the item under the cursor. + fn remove(&mut self) { + if self.needs_dispatch.is_empty() { + return; + } + let _ = self.needs_dispatch.remove(self.cur_idx); + } + + /// Flushes the dispatcher state into the persistent storage. + fn flush(self) { + let next_one = self.peek(); + as Store>::NextDispatchRoundStartWith::set(next_one); + as Store>::NeedsDispatch::put(self.needs_dispatch); + } +} + +#[cfg(test)] +pub(crate) mod mock_sink { + //! An implementation of a mock UMP sink that allows attaching a probe for mocking the weights + //! and checking the sent messages. + //! + //! A default behavior of the UMP sink is to ignore an incoming message and return 0 weight. + //! + //! A probe can be attached to the mock UMP sink. When attached, the mock sink would consult the + //! probe to check whether the received message was expected and what weight it should return. + //! + //! There are two rules on how to use a probe: + //! + //! 1. There can be only one active probe at a time. Creation of another probe while there is + //! already an active one leads to a panic. The probe is scoped to a thread where it was created. + //! + //! 2. All messages expected by the probe must be received by the time of dropping it. Unreceived + //! messages will lead to a panic while dropping a probe. + + use super::{UmpSink, UpwardMessage, ParaId}; + use std::cell::RefCell; + use std::collections::vec_deque::VecDeque; + use frame_support::weights::Weight; + + #[derive(Debug)] + struct UmpExpectation { + expected_origin: ParaId, + expected_msg: UpwardMessage, + mock_weight: Weight, + } + + std::thread_local! { + // `Some` here indicates that there is an active probe. + static HOOK: RefCell>> = RefCell::new(None); + } + + pub struct MockUmpSink; + impl UmpSink for MockUmpSink { + fn process_upward_message(actual_origin: ParaId, actual_msg: Vec) -> Weight { + HOOK.with(|opt_hook| match &mut *opt_hook.borrow_mut() { + Some(hook) => { + let UmpExpectation { + expected_origin, + expected_msg, + mock_weight, + } = match hook.pop_front() { + Some(expectation) => expectation, + None => { + panic!( + "The probe is active but didn't expect the message:\n\n\t{:?}.", + actual_msg, + ); + } + }; + assert_eq!(expected_origin, actual_origin); + assert_eq!(expected_msg, actual_msg); + mock_weight + } + None => 0, + }) + } + } + + pub struct Probe { + _private: (), + } + + impl Probe { + pub fn new() -> Self { + HOOK.with(|opt_hook| { + let prev = opt_hook.borrow_mut().replace(VecDeque::default()); + + // that can trigger if there were two probes were created during one session which + // is may be a bit strict, but may save time figuring out what's wrong. + // if you land here and you do need the two probes in one session consider + // dropping the the existing probe explicitly. + assert!(prev.is_none()); + }); + Self { _private: () } + } + + /// Add an expected message. + /// + /// The enqueued messages are processed in FIFO order. + pub fn assert_msg( + &mut self, + expected_origin: ParaId, + expected_msg: UpwardMessage, + mock_weight: Weight, + ) { + HOOK.with(|opt_hook| { + opt_hook + .borrow_mut() + .as_mut() + .unwrap() + .push_back(UmpExpectation { + expected_origin, + expected_msg, + mock_weight, + }) + }); + } + } + + impl Drop for Probe { + fn drop(&mut self) { + let _ = HOOK.try_with(|opt_hook| { + let prev = opt_hook.borrow_mut().take().expect( + "this probe was created and hasn't been yet destroyed; + the probe cannot be replaced; + there is only one probe at a time allowed; + thus it cannot be `None`; + qed", + ); + + if !prev.is_empty() { + // some messages are left unchecked. We should notify the developer about this. + // however, we do so only if the thread doesn't panic already. Otherwise, the + // developer would get a SIGILL or SIGABRT without a meaningful error message. + if !std::thread::panicking() { + panic!( + "the probe is dropped and not all expected messages arrived: {:?}", + prev + ); + } + } + }); + // an `Err` here signals here that the thread local was already destroyed. + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use super::mock_sink::Probe; + use crate::router::tests::default_genesis_config; + use crate::mock::{Configuration, Router, new_test_ext}; + use frame_support::IterableStorageMap; + use std::collections::HashSet; + + struct GenesisConfigBuilder { + max_upward_message_num_per_candidate: u32, + max_upward_queue_capacity: u32, + max_upward_queue_size: u32, + preferred_dispatchable_upward_messages_step_weight: Weight, + } + + impl Default for GenesisConfigBuilder { + fn default() -> Self { + Self { + max_upward_message_num_per_candidate: 2, + max_upward_queue_capacity: 4, + max_upward_queue_size: 64, + preferred_dispatchable_upward_messages_step_weight: 1000, + } + } + } + + impl GenesisConfigBuilder { + fn build(self) -> crate::mock::GenesisConfig { + let mut genesis = default_genesis_config(); + let config = &mut genesis.configuration.config; + + config.max_upward_message_num_per_candidate = self.max_upward_message_num_per_candidate; + config.max_upward_queue_capacity = self.max_upward_queue_capacity; + config.max_upward_queue_size = self.max_upward_queue_size; + config.preferred_dispatchable_upward_messages_step_weight = + self.preferred_dispatchable_upward_messages_step_weight; + genesis + } + } + + fn queue_upward_msg(para: ParaId, msg: UpwardMessage) { + let msgs = vec![msg]; + assert!(Router::check_upward_messages( + &Configuration::config(), + para, + &msgs, + )); + let _ = Router::enact_upward_messages(para, msgs); + } + + fn assert_storage_consistency_exhaustive() { + // check that empty queues don't clutter the storage. + for (_para, queue) in ::RelayDispatchQueues::iter() { + assert!(!queue.is_empty()); + } + + // actually count the counts and sizes in queues and compare them to the bookkeeped version. + for (para, queue) in ::RelayDispatchQueues::iter() { + let (expected_count, expected_size) = + ::RelayDispatchQueueSize::get(para); + let (actual_count, actual_size) = + queue.into_iter().fold((0, 0), |(acc_count, acc_size), x| { + (acc_count + 1, acc_size + x.len() as u32) + }); + + assert_eq!(expected_count, actual_count); + assert_eq!(expected_size, actual_size); + } + + // since we wipe the empty queues the sets of paras in queue contents, queue sizes and + // need dispatch set should all be equal. + let queue_contents_set = ::RelayDispatchQueues::iter() + .map(|(k, _)| k) + .collect::>(); + let queue_sizes_set = ::RelayDispatchQueueSize::iter() + .map(|(k, _)| k) + .collect::>(); + let needs_dispatch_set = ::NeedsDispatch::get() + .into_iter() + .collect::>(); + assert_eq!(queue_contents_set, queue_sizes_set); + assert_eq!(queue_contents_set, needs_dispatch_set); + + // `NextDispatchRoundStartWith` should point into a para that is tracked. + if let Some(para) = ::NextDispatchRoundStartWith::get() { + assert!(queue_contents_set.contains(¶)); + } + + // `NeedsDispatch` is always sorted. + assert!(::NeedsDispatch::get() + .windows(2) + .all(|xs| xs[0] <= xs[1])); + } + + #[test] + fn dispatch_empty() { + new_test_ext(default_genesis_config()).execute_with(|| { + assert_storage_consistency_exhaustive(); + + // make sure that the case with empty queues is handled properly + Router::process_pending_upward_messages(); + + assert_storage_consistency_exhaustive(); + }); + } + + #[test] + fn dispatch_single_message() { + let a = ParaId::from(228); + let msg = vec![1, 2, 3]; + + new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { + let mut probe = Probe::new(); + + probe.assert_msg(a, msg.clone(), 0); + queue_upward_msg(a, msg); + + Router::process_pending_upward_messages(); + + assert_storage_consistency_exhaustive(); + }); + } + + #[test] + fn dispatch_resume_after_exceeding_dispatch_stage_weight() { + let a = ParaId::from(128); + let c = ParaId::from(228); + let q = ParaId::from(911); + + let a_msg_1 = vec![1, 2, 3]; + let a_msg_2 = vec![3, 2, 1]; + let c_msg_1 = vec![4, 5, 6]; + let c_msg_2 = vec![9, 8, 7]; + let q_msg = b"we are Q".to_vec(); + + new_test_ext( + GenesisConfigBuilder { + preferred_dispatchable_upward_messages_step_weight: 500, + ..Default::default() + } + .build(), + ) + .execute_with(|| { + queue_upward_msg(q, q_msg.clone()); + queue_upward_msg(c, c_msg_1.clone()); + queue_upward_msg(a, a_msg_1.clone()); + queue_upward_msg(a, a_msg_2.clone()); + + assert_storage_consistency_exhaustive(); + + // we expect only two first messages to fit in the first iteration. + { + let mut probe = Probe::new(); + + probe.assert_msg(a, a_msg_1.clone(), 300); + probe.assert_msg(c, c_msg_1.clone(), 300); + Router::process_pending_upward_messages(); + assert_storage_consistency_exhaustive(); + + drop(probe); + } + + queue_upward_msg(c, c_msg_2.clone()); + assert_storage_consistency_exhaustive(); + + // second iteration should process the second message. + { + let mut probe = Probe::new(); + + probe.assert_msg(q, q_msg.clone(), 500); + Router::process_pending_upward_messages(); + assert_storage_consistency_exhaustive(); + + drop(probe); + } + + // 3rd iteration. + { + let mut probe = Probe::new(); + + probe.assert_msg(a, a_msg_2.clone(), 100); + probe.assert_msg(c, c_msg_2.clone(), 100); + Router::process_pending_upward_messages(); + assert_storage_consistency_exhaustive(); + + drop(probe); + } + + // finally, make sure that the queue is empty. + { + let probe = Probe::new(); + + Router::process_pending_upward_messages(); + assert_storage_consistency_exhaustive(); + + drop(probe); + } + }); } } diff --git a/runtime/rococo-v1/src/lib.rs b/runtime/rococo-v1/src/lib.rs index f6195e48f5b9..9123a7c4b79d 100644 --- a/runtime/rococo-v1/src/lib.rs +++ b/runtime/rococo-v1/src/lib.rs @@ -765,7 +765,9 @@ impl parachains_paras::Trait for Runtime { type Origin = Origin; } -impl parachains_router::Trait for Runtime {} +impl parachains_router::Trait for Runtime { + type UmpSink = (); // TODO: #1873 To be handled by the XCM receiver. +} impl parachains_inclusion_inherent::Trait for Runtime {} diff --git a/runtime/test-runtime/src/lib.rs b/runtime/test-runtime/src/lib.rs index 6e4913a0dc88..0153d87b4782 100644 --- a/runtime/test-runtime/src/lib.rs +++ b/runtime/test-runtime/src/lib.rs @@ -452,7 +452,9 @@ impl paras::Trait for Runtime { type Origin = Origin; } -impl router::Trait for Runtime {} +impl router::Trait for Runtime { + type UmpSink = (); +} impl scheduler::Trait for Runtime {} From ea5d29cb3476fb6a948cc2b4b4107d34d17936eb Mon Sep 17 00:00:00 2001 From: Sergey Shulepov Date: Tue, 27 Oct 2020 18:52:40 +0100 Subject: [PATCH 07/13] UMP: Fix dispatcher bug and add the test for it --- runtime/parachains/src/router/ump.rs | 48 ++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs index 203575bcfeab..3b0b6672b08e 100644 --- a/runtime/parachains/src/router/ump.rs +++ b/runtime/parachains/src/router/ump.rs @@ -317,6 +317,12 @@ impl NeedsDispatchCursor { return; } let _ = self.needs_dispatch.remove(self.cur_idx); + + // we might've removed the last element and that doesn't necessarily mean that `needs_dispatch` + // became empty. Reposition the cursor in this case to the beginning. + if self.needs_dispatch.get(self.cur_idx).is_none() { + self.cur_idx = 0; + } } /// Flushes the dispatcher state into the persistent storage. @@ -658,4 +664,46 @@ mod tests { } }); } + + #[test] + fn dispatch_correctly_handle_remove_of_latest() { + let a = ParaId::from(1991); + let b = ParaId::from(1999); + + let a_msg_1 = vec![1, 2, 3]; + let a_msg_2 = vec![3, 2, 1]; + let b_msg_1 = vec![4, 5, 6]; + + new_test_ext( + GenesisConfigBuilder { + preferred_dispatchable_upward_messages_step_weight: 900, + ..Default::default() + } + .build(), + ) + .execute_with(|| { + // We want to test here an edge case, where we remove the queue with the highest + // para id (i.e. last in the needs_dispatch order). + // + // If the last entry was removed we should proceed execution, assuming we still have + // weight available. + + queue_upward_msg(a, a_msg_1.clone()); + queue_upward_msg(a, a_msg_2.clone()); + queue_upward_msg(b, b_msg_1.clone()); + + { + let mut probe = Probe::new(); + + probe.assert_msg(a, a_msg_1.clone(), 300); + probe.assert_msg(b, b_msg_1.clone(), 300); + probe.assert_msg(a, a_msg_2.clone(), 300); + + Router::process_pending_upward_messages(); + + drop(probe); + } + }); + } + } From 294b97e415c6085ce6ed420c74502c7bae950c1a Mon Sep 17 00:00:00 2001 From: Sergey Shulepov Date: Wed, 28 Oct 2020 18:28:57 +0100 Subject: [PATCH 08/13] UMP: Constrain the maximum size of an UMP message This commit addresses the UMP part of https://github.com/paritytech/polkadot/issues/1869 --- .../implementers-guide/src/runtime/router.md | 1 + .../implementers-guide/src/types/runtime.md | 6 ++++++ runtime/parachains/src/configuration.rs | 20 +++++++++++++++++++ runtime/parachains/src/router/ump.rs | 9 +++++++-- 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/roadmap/implementers-guide/src/runtime/router.md b/roadmap/implementers-guide/src/runtime/router.md index af9f861c8469..734081604371 100644 --- a/roadmap/implementers-guide/src/runtime/router.md +++ b/roadmap/implementers-guide/src/runtime/router.md @@ -173,6 +173,7 @@ Candidate Acceptance Function: * `check_upward_messages(P: ParaId, Vec`): 1. Checks that there are at most `config.max_upward_message_num_per_candidate` messages. + 1. Checks that no message exceeds `config.max_upward_message_size`. 1. Verify that `RelayDispatchQueueSize` for `P` has enough capacity for the messages * `check_processed_downward_messages(P: ParaId, processed_downward_messages)`: 1. Checks that `DownwardMessageQueues` for `P` is at least `processed_downward_messages` long. diff --git a/roadmap/implementers-guide/src/types/runtime.md b/roadmap/implementers-guide/src/types/runtime.md index c745f96bbd21..fc1868c32c60 100644 --- a/roadmap/implementers-guide/src/types/runtime.md +++ b/roadmap/implementers-guide/src/types/runtime.md @@ -45,7 +45,13 @@ struct HostConfiguration { /// /// NOTE that this is a soft limit and could be exceeded. pub preferred_dispatchable_upward_messages_step_weight: Weight, + /// The maximum size of an upward message that can be sent by a candidate. + /// + /// This parameter affects the upper bound of size of `CandidateCommitments`. + pub max_upward_message_size: u32, /// The maximum number of messages that a candidate can contain. + /// + /// This parameter affects the upper bound of size of `CandidateCommitments`. pub max_upward_message_num_per_candidate: u32, /// The maximum size of a message that can be put in a downward message queue. /// diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index e964c1bc3c97..3f7f32f39153 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -76,7 +76,13 @@ pub struct HostConfiguration { /// /// NOTE that this is a soft limit and could be exceeded. pub preferred_dispatchable_upward_messages_step_weight: Weight, + /// The maximum size of an upward message that can be sent by a candidate. + /// + /// This parameter affects the size upper bound of the `CandidateCommitments`. + pub max_upward_message_size: u32, /// The maximum number of messages that a candidate can contain. + /// + /// This parameter affects the size upper bound of the `CandidateCommitments`. pub max_upward_message_num_per_candidate: u32, } @@ -251,6 +257,16 @@ decl_module! { Ok(()) } + /// Sets the maximum size of an upward message that can be sent by a candidate. + #[weight = (1_000, DispatchClass::Operational)] + pub fn set_max_upward_message_size(origin, new: u32) -> DispatchResult { + ensure_root(origin)?; + Self::update_config_member(|config| { + sp_std::mem::replace(&mut config.max_upward_message_size, new) != new + }); + Ok(()) + } + /// Sets the maximum number of messages that a candidate can contain. #[weight = (1_000, DispatchClass::Operational)] pub fn set_max_upward_message_num_per_candidate(origin, new: u32) -> DispatchResult { @@ -342,6 +358,7 @@ mod tests { max_upward_queue_size: 228, max_downward_message_size: 2048, preferred_dispatchable_upward_messages_step_weight: 20000, + max_upward_message_size: 448, max_upward_message_num_per_candidate: 5, }; @@ -392,6 +409,9 @@ mod tests { Configuration::set_preferred_dispatchable_upward_messages_step_weight( Origin::root(), new_config.preferred_dispatchable_upward_messages_step_weight, ).unwrap(); + Configuration::set_max_upward_message_size( + Origin::root(), new_config.max_upward_message_size, + ).unwrap(); Configuration::set_max_upward_message_num_per_candidate( Origin::root(), new_config.max_upward_message_num_per_candidate, ).unwrap(); diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs index 3b0b6672b08e..484004dbfa19 100644 --- a/runtime/parachains/src/router/ump.rs +++ b/runtime/parachains/src/router/ump.rs @@ -23,7 +23,8 @@ use primitives::v1::{Id as ParaId, UpwardMessage}; /// All upward messages coming from parachains will be funneled into an implementation of this trait. /// -/// The message is opaque from the perspective of UMP. The message size can range from 0 and there is no upper bound. +/// The message is opaque from the perspective of UMP. The message size can range from 0 to +/// `config.max_upward_message_size`. /// /// It's up to the implementation of this trait to decide what to do with a message as long as it /// returns the amount of weight consumed in the process of handling. Ignoring a message is a valid @@ -85,8 +86,12 @@ impl Module { ::RelayDispatchQueueSize::get(¶); for msg in upward_messages { + let msg_size = msg.len() as u32; + if msg_size > config.max_upward_message_size { + return false; + } para_queue_count += 1; - para_queue_size += msg.len() as u32; + para_queue_size += msg_size; } // make sure that the queue is not overfilled. From d1dbf727c6f6a5ae034cd042df3bc22ec639b683 Mon Sep 17 00:00:00 2001 From: Sergey Shulepov Date: Thu, 29 Oct 2020 15:54:18 +0100 Subject: [PATCH 09/13] Fix failing test due to misconfiguration --- runtime/parachains/src/router/ump.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs index 484004dbfa19..1949d8f5bc2a 100644 --- a/runtime/parachains/src/router/ump.rs +++ b/runtime/parachains/src/router/ump.rs @@ -479,6 +479,7 @@ mod tests { use std::collections::HashSet; struct GenesisConfigBuilder { + max_upward_message_size: u32, max_upward_message_num_per_candidate: u32, max_upward_queue_capacity: u32, max_upward_queue_size: u32, @@ -488,6 +489,7 @@ mod tests { impl Default for GenesisConfigBuilder { fn default() -> Self { Self { + max_upward_message_size: 16, max_upward_message_num_per_candidate: 2, max_upward_queue_capacity: 4, max_upward_queue_size: 64, @@ -501,6 +503,7 @@ mod tests { let mut genesis = default_genesis_config(); let config = &mut genesis.configuration.config; + config.max_upward_message_size = self.max_upward_message_size; config.max_upward_message_num_per_candidate = self.max_upward_message_num_per_candidate; config.max_upward_queue_capacity = self.max_upward_queue_capacity; config.max_upward_queue_size = self.max_upward_queue_size; From 72654e82ac016bbf2d58764c690cc40f633a8195 Mon Sep 17 00:00:00 2001 From: Sergey Shulepov Date: Thu, 29 Oct 2020 15:57:17 +0100 Subject: [PATCH 10/13] Make the type of RelayDispatchQueueSize be more apparent in the guide --- roadmap/implementers-guide/src/runtime/router.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roadmap/implementers-guide/src/runtime/router.md b/roadmap/implementers-guide/src/runtime/router.md index 734081604371..4310aa711fc9 100644 --- a/roadmap/implementers-guide/src/runtime/router.md +++ b/roadmap/implementers-guide/src/runtime/router.md @@ -33,7 +33,7 @@ RelayDispatchQueues: map ParaId => Vec; /// /// Invariant: /// - The set of keys should exactly match the set of keys of `RelayDispatchQueues`. -RelayDispatchQueueSize: map ParaId => (u32, u32); +RelayDispatchQueueSize: map ParaId => (u32, u32); // (num_messages, total_bytes) /// The ordered list of `ParaId`s that have a `RelayDispatchQueue` entry. /// /// Invariant: From 25c94f4897ddc73b5fc21553439fa3264d267bbd Mon Sep 17 00:00:00 2001 From: Sergey Shulepov Date: Thu, 29 Oct 2020 16:39:49 +0100 Subject: [PATCH 11/13] Revert renaming `max_upward_queue_capacity` to `max_upward_queue_count` --- roadmap/implementers-guide/src/types/runtime.md | 2 +- runtime/parachains/src/configuration.rs | 12 ++++++------ runtime/parachains/src/router/ump.rs | 8 ++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/roadmap/implementers-guide/src/types/runtime.md b/roadmap/implementers-guide/src/types/runtime.md index fc1868c32c60..c2f166026cb6 100644 --- a/roadmap/implementers-guide/src/types/runtime.md +++ b/roadmap/implementers-guide/src/types/runtime.md @@ -35,7 +35,7 @@ struct HostConfiguration { /// The amount of blocks ahead to schedule parathreads. pub scheduling_lookahead: u32, /// Total number of individual messages allowed in the parachain -> relay-chain message queue. - pub max_upward_queue_capacity: u32, + pub max_upward_queue_count: u32, /// Total size of messages allowed in the parachain -> relay-chain message queue before which /// no further messages may be added to it. If it exceeds this then the queue may contain only /// a single message. diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index 3f7f32f39153..dc29882da0a7 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -59,7 +59,7 @@ pub struct HostConfiguration { /// The amount of blocks ahead to schedule parachains and parathreads. pub scheduling_lookahead: u32, /// Total number of individual messages allowed in the parachain -> relay-chain message queue. - pub max_upward_queue_capacity: u32, + pub max_upward_queue_count: u32, /// Total size of messages allowed in the parachain -> relay-chain message queue before which /// no further messages may be added to it. If it exceeds this then the queue may contain only /// a single message. @@ -219,10 +219,10 @@ decl_module! { /// Sets the maximum items that can present in a upward dispatch queue at once. #[weight = (1_000, DispatchClass::Operational)] - pub fn set_max_upward_queue_capacity(origin, new: u32) -> DispatchResult { + pub fn set_max_upward_queue_count(origin, new: u32) -> DispatchResult { ensure_root(origin)?; Self::update_config_member(|config| { - sp_std::mem::replace(&mut config.max_upward_queue_capacity, new) != new + sp_std::mem::replace(&mut config.max_upward_queue_count, new) != new }); Ok(()) } @@ -354,7 +354,7 @@ mod tests { chain_availability_period: 10, thread_availability_period: 8, scheduling_lookahead: 3, - max_upward_queue_capacity: 1337, + max_upward_queue_count: 1337, max_upward_queue_size: 228, max_downward_message_size: 2048, preferred_dispatchable_upward_messages_step_weight: 20000, @@ -397,8 +397,8 @@ mod tests { Configuration::set_scheduling_lookahead( Origin::root(), new_config.scheduling_lookahead, ).unwrap(); - Configuration::set_max_upward_queue_capacity( - Origin::root(), new_config.max_upward_queue_capacity, + Configuration::set_max_upward_queue_count( + Origin::root(), new_config.max_upward_queue_count, ).unwrap(); Configuration::set_max_upward_queue_size( Origin::root(), new_config.max_upward_queue_size, diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs index 1949d8f5bc2a..12562e9db2b3 100644 --- a/runtime/parachains/src/router/ump.rs +++ b/runtime/parachains/src/router/ump.rs @@ -96,7 +96,7 @@ impl Module { // make sure that the queue is not overfilled. // we do it here only once since returning false invalidates the whole relay-chain block. - if para_queue_count > config.max_upward_queue_capacity + if para_queue_count > config.max_upward_queue_count || para_queue_size > config.max_upward_queue_size { return false; @@ -481,7 +481,7 @@ mod tests { struct GenesisConfigBuilder { max_upward_message_size: u32, max_upward_message_num_per_candidate: u32, - max_upward_queue_capacity: u32, + max_upward_queue_count: u32, max_upward_queue_size: u32, preferred_dispatchable_upward_messages_step_weight: Weight, } @@ -491,7 +491,7 @@ mod tests { Self { max_upward_message_size: 16, max_upward_message_num_per_candidate: 2, - max_upward_queue_capacity: 4, + max_upward_queue_count: 4, max_upward_queue_size: 64, preferred_dispatchable_upward_messages_step_weight: 1000, } @@ -505,7 +505,7 @@ mod tests { config.max_upward_message_size = self.max_upward_message_size; config.max_upward_message_num_per_candidate = self.max_upward_message_num_per_candidate; - config.max_upward_queue_capacity = self.max_upward_queue_capacity; + config.max_upward_queue_count = self.max_upward_queue_count; config.max_upward_queue_size = self.max_upward_queue_size; config.preferred_dispatchable_upward_messages_step_weight = self.preferred_dispatchable_upward_messages_step_weight; From c67b929c31d51a49a65189d5ccec129728ff40a7 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Thu, 29 Oct 2020 17:45:47 +0100 Subject: [PATCH 12/13] convert spaces to tabs Co-authored-by: Bernhard Schuster --- runtime/parachains/src/configuration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index dc29882da0a7..8bb957cce189 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -355,7 +355,7 @@ mod tests { thread_availability_period: 8, scheduling_lookahead: 3, max_upward_queue_count: 1337, - max_upward_queue_size: 228, + max_upward_queue_size: 228, max_downward_message_size: 2048, preferred_dispatchable_upward_messages_step_weight: 20000, max_upward_message_size: 448, From 5e92fe0500c7d97afcac604759b16374bdcbe02c Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Thu, 29 Oct 2020 18:24:10 +0100 Subject: [PATCH 13/13] Update runtime/parachains/src/router/ump.rs Co-authored-by: Bernhard Schuster --- runtime/parachains/src/router/ump.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/runtime/parachains/src/router/ump.rs b/runtime/parachains/src/router/ump.rs index 12562e9db2b3..19a81a59fabb 100644 --- a/runtime/parachains/src/router/ump.rs +++ b/runtime/parachains/src/router/ump.rs @@ -96,13 +96,8 @@ impl Module { // make sure that the queue is not overfilled. // we do it here only once since returning false invalidates the whole relay-chain block. - if para_queue_count > config.max_upward_queue_count - || para_queue_size > config.max_upward_queue_size - { - return false; - } - - true + para_queue_count <= config.max_upward_queue_count + && para_queue_size <= config.max_upward_queue_size } /// Enacts all the upward messages sent by a candidate.