Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Integration test paused queues
Signed-off-by: Oliver Tale-Yazdi <[email protected]>
  • Loading branch information
ggwpez committed Feb 20, 2023
commit 4d18efed31bbcbc65eaca98f42541b43aeb52c9c
134 changes: 123 additions & 11 deletions frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
#![cfg(test)]

use crate::{
mock::{
new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed,
},
mock::{new_test_ext, IntoWeight, MockedWeightInfo, NumMessagesProcessed, SuspendedQueues},
mock_helpers::MessageOrigin,
*,
};

Expand All @@ -39,6 +38,7 @@ use sp_runtime::{
testing::Header,
traits::{BlakeTwo256, IdentityLookup},
};
use std::collections::{BTreeMap, BTreeSet};

type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic<Test>;
type Block = frame_system::mocking::MockBlock<Test>;
Expand Down Expand Up @@ -90,7 +90,7 @@ parameter_types! {
impl Config for Test {
type RuntimeEvent = RuntimeEvent;
type WeightInfo = MockedWeightInfo;
type MessageProcessor = CountingMessageProcessor;
type MessageProcessor = mock::CountingMessageProcessor;
type Size = u32;
type QueueChangeHandler = ();
type HeapSize = HeapSize;
Expand All @@ -100,7 +100,8 @@ impl Config for Test {

/// Simulates heavy usage by enqueueing and processing large amounts of messages.
///
/// Best to run with `-r`, `RUST_LOG=info` and `RUSTFLAGS='-Cdebug-assertions=y'`.
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
Expand Down Expand Up @@ -130,29 +131,131 @@ fn stress_test_enqueue_and_service() {
let mut msgs_remaining = 0;
for _ in 0..blocks {
// Start by enqueuing a large number of messages.
let (enqueued, _) =
let enqueued =
enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng);
msgs_remaining += enqueued;

// Pick a fraction of all messages currently in queue and process them.
let processed = rng.gen_range(1..=msgs_remaining);
log::info!("Processing {} of all messages {}", processed, msgs_remaining);
process_messages(processed); // This also advances the block.
process_some_messages(processed); // This also advances the block.
msgs_remaining -= processed;
}
log::info!("Processing all remaining {} messages", msgs_remaining);
process_messages(msgs_remaining);
process_all_messages(msgs_remaining);
post_conditions();
});
}

/// Simulates heavy usage of the suspension logic via `Yield`.
///
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
/// ```pre
/// Enqueued 11776 messages across 2526 queues. Payload 173.94 KiB
/// Suspended 63 and resumed 7 queues of 2526 in total
/// Processing 593 messages. Resumed msgs: 11599, All msgs: 11776
/// Enqueued 30104 messages across 5533 queues. Payload 416.62 KiB
/// Suspended 24 and resumed 15 queues of 5533 in total
/// Processing 12841 messages. Resumed msgs: 40857, All msgs: 41287
/// Processing all 28016 remaining resumed messages
/// Resumed all 64 suspended queues
/// Processing all remaining 430 messages
/// ```
#[test]
#[ignore] // Only run in the CI.
fn stress_test_queue_suspension() {
let blocks = 20;
let max_queues = 10_000;
let max_messages_per_queue = 10_000;
let (max_suspend_per_block, max_resume_per_block) = (100, 50);
let max_msg_len = MaxMessageLenOf::<Test>::get();
let mut rng = StdRng::seed_from_u64(41);

new_test_ext::<Test>().execute_with(|| {
let mut suspended = BTreeSet::<u32>::new();
let mut msgs_remaining = 0;

for _ in 0..blocks {
// Start by enqueuing a large number of messages.
let enqueued =
enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng);
msgs_remaining += enqueued;
let per_queue = msgs_per_queue();

// Suspend a random subset of queues.
let to_suspend = rng.gen_range(0..max_suspend_per_block).min(per_queue.len());
for _ in 0..to_suspend {
let q = rng.gen_range(0..per_queue.len());
suspended.insert(*per_queue.iter().nth(q).map(|(q, _)| q).unwrap());
}
// Resume a random subst of suspended queues.
let to_resume = rng.gen_range(0..max_resume_per_block).min(suspended.len());
for _ in 0..to_resume {
let q = rng.gen_range(0..suspended.len());
suspended.remove(&suspended.iter().nth(q).unwrap().clone());
}
log::info!(
"Suspended {} and resumed {} queues of {} in total",
to_suspend,
to_resume,
per_queue.len()
);
SuspendedQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect());

// Pick a fraction of all messages currently in queue and process them.
let resumed_messages =
per_queue.iter().filter(|(q, _)| !suspended.contains(q)).map(|(_, n)| n).sum();
let processed = rng.gen_range(1..=resumed_messages);
log::info!(
"Processing {} messages. Resumed msgs: {}, All msgs: {}",
processed,
resumed_messages,
msgs_remaining
);
process_some_messages(processed); // This also advances the block.
msgs_remaining -= processed;
}
let per_queue = msgs_per_queue();
let resumed_messages =
per_queue.iter().filter(|(q, _)| !suspended.contains(q)).map(|(_, n)| n).sum();
log::info!("Processing all {} remaining resumed messages", resumed_messages);
process_all_messages(resumed_messages);
msgs_remaining -= resumed_messages;

let resumed = SuspendedQueues::take();
log::info!("Resumed all {} suspended queues", resumed.len());
log::info!("Processing all remaining {} messages", msgs_remaining);
process_all_messages(msgs_remaining);
post_conditions();
});
}

/// How many messages are in each queue.
fn msgs_per_queue() -> BTreeMap<u32, u32> {
let mut per_queue = BTreeMap::new();
for (o, q) in BookStateFor::<Test>::iter() {
let MessageOrigin::Everywhere(o) = o else {
unreachable!();
};
per_queue.insert(o, q.message_count as u32);
}
per_queue
}

/// Enqueue a random number of random messages into a random number of queues.
///
/// Returns the total number of enqueued messages, their combined length and the number of messages
/// per queue.
fn enqueue_messages(
max_queues: u32,
max_per_queue: u32,
max_msg_len: u32,
rng: &mut StdRng,
) -> (u32, usize) {
) -> u32 {
let num_queues = rng.gen_range(1..max_queues);
let mut num_messages = 0;
let mut total_msg_len = 0;
Expand All @@ -179,11 +282,11 @@ fn enqueue_messages(
num_queues,
total_msg_len as f64 / 1024.0
);
(num_messages, total_msg_len as usize)
num_messages
}

/// Process the number of messages.
fn process_messages(num_msgs: u32) {
fn process_some_messages(num_msgs: u32) {
let weight = (num_msgs as u64).into_weight();
ServiceWeight::set(Some(weight));
let consumed = next_block();
Expand All @@ -192,6 +295,15 @@ fn process_messages(num_msgs: u32) {
assert_eq!(NumMessagesProcessed::take(), num_msgs as usize);
}

/// Process all remaining messages and assert their number.
fn process_all_messages(expected: u32) {
ServiceWeight::set(Some(Weight::MAX));
let consumed = next_block();

assert_eq!(consumed, Weight::from_all(expected as u64));
assert_eq!(NumMessagesProcessed::take(), expected as usize);
}

/// Returns the weight consumed by `MessageQueue::on_initialize()`.
fn next_block() -> Weight {
MessageQueue::on_finalize(System::block_number());
Expand Down