Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add/fix session map tests
  • Loading branch information
maciejnems committed Jan 27, 2023
commit f35d172183c87b43b3c6484c579631c41afa15e4
247 changes: 176 additions & 71 deletions finality-aleph/src/session_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ mod tests {
struct MockProvider {
pub session_map: HashMap<NumberFor<TBlock>, SessionAuthorityData>,
pub next_session_map: HashMap<NumberFor<TBlock>, SessionAuthorityData>,
pub asked_for: Arc<Mutex<Vec<NumberFor<TBlock>>>>,
}

struct MockNotificator {
Expand All @@ -393,9 +392,15 @@ mod tests {
Self {
session_map: HashMap::new(),
next_session_map: HashMap::new(),
asked_for: Arc::new(Mutex::new(Vec::new())),
}
}

fn add_session(&mut self, session_id: u64) {
self.session_map
.insert(session_id, authority_data_for_session(session_id));
self.next_session_map
.insert(session_id, authority_data_for_session(session_id + 1));
}
}

impl MockNotificator {
Expand All @@ -409,14 +414,10 @@ mod tests {

impl AuthorityProvider<NumberFor<TBlock>> for MockProvider {
fn authority_data(&self, b: NumberFor<TBlock>) -> Option<SessionAuthorityData> {
let mut asked = self.asked_for.lock().unwrap();
asked.push(b);
self.session_map.get(&b).cloned()
}

fn next_authority_data(&self, b: NumberFor<TBlock>) -> Option<SessionAuthorityData> {
let mut asked = self.asked_for.lock().unwrap();
asked.push(b);
self.next_session_map.get(&b).cloned()
}
}
Expand Down Expand Up @@ -450,48 +451,64 @@ mod tests {
.collect()
}

fn authority_data_for_session(session_id: u64) -> SessionAuthorityData {
authority_data(session_id * 4, (session_id + 1) * 4)
}

fn to_notification(block: TBlock) -> FinalityNotification<TBlock> {
FinalityNotification {
hash: block.header.hash(),
header: block.header,
tree_route: Arc::new([]),
stale_heads: Arc::new([]),
}
}

#[tokio::test(flavor = "multi_thread")]
async fn genesis_catch_up() {
let (_sender, receiver) = tracing_unbounded("test");
let mut mock_provider = MockProvider::new();
let mock_notificator = MockNotificator::new(receiver);

mock_provider.add_session(0);

let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
let session_map = updater.readonly_session_map();

let _handle = tokio::spawn(updater.run());

// wait a bit
Delay::new(Duration::from_millis(50)).await;

assert_eq!(
session_map.get(SessionId(0)).await,
Some(authority_data(0, 4))
);
assert_eq!(
session_map.get(SessionId(1)).await,
Some(authority_data(4, 8))
);
}

#[tokio::test(flavor = "multi_thread")]
async fn updates_session_map_on_notifications() {
let mut client = Arc::new(TestClientBuilder::new().build());
let (sender, receiver) = tracing_unbounded("test");
let mut mock_provider = MockProvider::new();
let mock_notificator = MockNotificator::new(receiver);

mock_provider.session_map.insert(0, authority_data(0, 4));
mock_provider
.next_session_map
.insert(0, authority_data(4, 8));
mock_provider
.next_session_map
.insert(1, authority_data(8, 12));
mock_provider
.next_session_map
.insert(2, authority_data(12, 16));

let updater = SessionMapUpdater::new(mock_provider, mock_notificator);
mock_provider.add_session(0);
mock_provider.add_session(1);
mock_provider.add_session(2);

let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
let session_map = updater.readonly_session_map();

let blocks = n_new_blocks(&mut client, 2);
let block_1 = blocks.get(0).cloned().unwrap();
let block_2 = blocks.get(1).cloned().unwrap();
sender
.unbounded_send(FinalityNotification {
hash: block_1.header.hash(),
header: block_1.header,
tree_route: Arc::new([]),
stale_heads: Arc::new([]),
})
.unwrap();
sender
.unbounded_send(FinalityNotification {
hash: block_2.header.hash(),
header: block_2.header,
tree_route: Arc::new([]),
stale_heads: Arc::new([]),
})
.unwrap();
for block in n_new_blocks(&mut client, 2) {
sender.unbounded_send(to_notification(block)).unwrap();
}

let _handle = tokio::spawn(updater.run(SessionPeriod(1)));
let _handle = tokio::spawn(updater.run());

// wait a bit
Delay::new(Duration::from_millis(50)).await;
Expand All @@ -515,97 +532,185 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread")]
async fn updates_session_map_on_catching_up() {
async fn catch_up() {
let (_sender, receiver) = tracing_unbounded("test");
let mut mock_provider = MockProvider::new();
let mut mock_notificator = MockNotificator::new(receiver);

mock_provider.session_map.insert(0, authority_data(0, 4));
mock_provider
.next_session_map
.insert(0, authority_data(4, 8));
mock_provider
.next_session_map
.insert(1, authority_data(8, 12));
mock_provider
.next_session_map
.insert(2, authority_data(12, 16));
mock_provider.add_session(0);
mock_provider.add_session(1);
mock_provider.add_session(2);

mock_notificator.last_finalized = 2;

let updater = SessionMapUpdater::new(mock_provider, mock_notificator);
let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
let session_map = updater.readonly_session_map();

let _handle = tokio::spawn(updater.run(SessionPeriod(1)));
let _handle = tokio::spawn(updater.run());

// wait a bit
Delay::new(Duration::from_millis(50)).await;

assert_eq!(
session_map.get(SessionId(0)).await,
Some(authority_data(0, 4))
Some(authority_data_for_session(0))
);
assert_eq!(
session_map.get(SessionId(1)).await,
Some(authority_data(4, 8))
Some(authority_data_for_session(1))
);
assert_eq!(
session_map.get(SessionId(2)).await,
Some(authority_data(8, 12))
Some(authority_data_for_session(2))
);
assert_eq!(
session_map.get(SessionId(3)).await,
Some(authority_data(12, 16))
Some(authority_data_for_session(3))
);
}

#[tokio::test(flavor = "multi_thread")]
async fn prunes_old_sessions() {
async fn catch_up_old_sessions() {
let (_sender, receiver) = tracing_unbounded("test");
let mut mock_provider = MockProvider::new();
let mut mock_notificator = MockNotificator::new(receiver);

mock_provider.session_map.insert(0, authority_data(0, 4));
for i in 0..=2 * PRUNING_THRESHOLD {
mock_provider.next_session_map.insert(
i as u64,
authority_data(4 * (i + 1) as u64, 4 * (i + 2) as u64),
);
mock_provider.add_session(i as u64);
}

mock_notificator.last_finalized = 20;

let asked = mock_provider.asked_for.clone();
let updater = SessionMapUpdater::new(mock_provider, mock_notificator);
let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
let session_map = updater.readonly_session_map();

let _handle = tokio::spawn(updater.run(SessionPeriod(1)));
let _handle = tokio::spawn(updater.run());

// wait a bit
Delay::new(Duration::from_millis(50)).await;

{
let asked = asked.lock().unwrap();
assert_eq!((10..=20).into_iter().collect::<Vec<_>>(), *asked);
}
for i in 0..=20 - PRUNING_THRESHOLD {
for i in 0..=PRUNING_THRESHOLD {
assert_eq!(
session_map.get(SessionId(i)).await,
None,
"Session {:?} should be pruned",
i
);
}
for i in 21 - PRUNING_THRESHOLD..=20 {
for i in PRUNING_THRESHOLD + 1..=2 * PRUNING_THRESHOLD {
assert_eq!(
session_map.get(SessionId(i)).await,
Some(authority_data(4 * i as u64, 4 * (i + 1) as u64)),
Some(authority_data_for_session(i as u64)),
"Session {:?} should not be pruned",
i
);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn deals_with_database_pruned_authorities() {
let (_sender, receiver) = tracing_unbounded("test");
let mut mock_provider = MockProvider::new();
let mut mock_notificator = MockNotificator::new(receiver);

mock_provider.add_session(5);
mock_notificator.last_finalized = 5;

let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
let session_map = updater.readonly_session_map();

let _handle = tokio::spawn(updater.run());

// wait a bit
Delay::new(Duration::from_millis(50)).await;

for i in 0..5 {
assert_eq!(
session_map.get(SessionId(i)).await,
None,
"Session {:?} should not be available",
i
);
}

assert_eq!(
session_map.get(SessionId(5)).await,
Some(authority_data_for_session(5))
);
assert_eq!(
session_map.get(SessionId(6)).await,
Some(authority_data_for_session(6))
);
}

#[tokio::test(flavor = "multi_thread")]
async fn prunes_old_sessions() {
let mut client = Arc::new(TestClientBuilder::new().build());
let (sender, receiver) = tracing_unbounded("test");
let mut mock_provider = MockProvider::new();
let mock_notificator = MockNotificator::new(receiver);

for i in 0..=2 * PRUNING_THRESHOLD {
mock_provider.add_session(i as u64);
}

let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
let session_map = updater.readonly_session_map();

let _handle = tokio::spawn(updater.run());

let mut blocks = n_new_blocks(&mut client, 2 * PRUNING_THRESHOLD as u64);

for block in blocks.drain(..PRUNING_THRESHOLD as usize) {
sender.unbounded_send(to_notification(block)).unwrap();
}

// wait a bit
Delay::new(Duration::from_millis(50)).await;

for i in 0..=PRUNING_THRESHOLD + 1 {
assert_eq!(
session_map.get(SessionId(i)).await,
Some(authority_data_for_session(i as u64)),
"Session {:?} should be available",
i
);
}

for i in PRUNING_THRESHOLD + 2..=21 {
assert_eq!(
session_map.get(SessionId(i)).await,
None,
"Session {:?} should not be avalable yet",
i
);
}

for block in blocks {
sender.unbounded_send(to_notification(block)).unwrap();
}

Delay::new(Duration::from_millis(50)).await;

for i in 0..PRUNING_THRESHOLD {
assert_eq!(
session_map.get(SessionId(i)).await,
None,
"Session {:?} should be pruned",
i
);
}

for i in PRUNING_THRESHOLD + 1..=21 {
assert_eq!(
session_map.get(SessionId(i)).await,
Some(authority_data_for_session(i as u64)),
"Session {:?} should be avalable",
i
);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn subscription_with_already_defined_session_works() {
let mut shared = SharedSessionMap::new();
Expand Down