Skip to content
Prev Previous commit
Next Next commit
Join periodic and normal broadcast last Instant
  • Loading branch information
maciejnems committed Dec 28, 2022
commit c989c8e754206c682f197bc521e2a3e62c679c13
53 changes: 47 additions & 6 deletions finality-aleph/src/sync/ticker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use tokio::time::{sleep, Duration, Instant};
/// This struct is used for determining when we should broadcast justification so that it does not happen too often.
pub struct BroadcastTicker {
last_broadcast: Instant,
last_periodic_broadcast: Instant,
current_periodic_timeout: Duration,
max_timeout: Duration,
min_timeout: Duration,
Expand All @@ -13,7 +12,6 @@ impl BroadcastTicker {
pub fn new(max_timeout: Duration, min_timeout: Duration) -> Self {
Self {
last_broadcast: Instant::now(),
last_periodic_broadcast: Instant::now(),
current_periodic_timeout: max_timeout,
max_timeout,
min_timeout,
Expand All @@ -39,12 +37,12 @@ impl BroadcastTicker {
}

/// Sleeps until next periodic broadcast should happen.
/// In case time elapsed, sets last periodic broadcast to now and periodic timeout to `self.max_timeout`.
/// In case enough time elapsed, sets last broadcast to now and periodic timeout to `self.max_timeout`.
pub async fn wait_for_periodic_broadcast(&mut self) {
let since_last = Instant::now().saturating_duration_since(self.last_periodic_broadcast);
let since_last = Instant::now().saturating_duration_since(self.last_broadcast);
sleep(self.current_periodic_timeout.saturating_sub(since_last)).await;
self.current_periodic_timeout = self.max_timeout;
self.last_periodic_broadcast = Instant::now();
self.last_broadcast = Instant::now();
}
}

Expand All @@ -59,6 +57,7 @@ mod tests {
let max_timeout = Duration::from_millis(700);
let min_timeout = Duration::from_millis(100);
let mut ticker = BroadcastTicker::new(max_timeout, min_timeout);

assert!(!ticker.try_broadcast());
sleep(min_timeout).await;
assert!(ticker.try_broadcast());
Expand All @@ -70,6 +69,26 @@ mod tests {
let max_timeout = Duration::from_millis(700);
let min_timeout = Duration::from_millis(100);
let mut ticker = BroadcastTicker::new(max_timeout, min_timeout);

assert_ne!(
timeout(2 * min_timeout, ticker.wait_for_periodic_broadcast()).await,
Ok(())
);
assert_eq!(
timeout(max_timeout, ticker.wait_for_periodic_broadcast()).await,
Ok(())
);
}

#[tokio::test]
async fn wait_for_periodic_broadcast_after_try_broadcast_true() {
let max_timeout = Duration::from_millis(700);
let min_timeout = Duration::from_millis(100);
let mut ticker = BroadcastTicker::new(max_timeout, min_timeout);

sleep(min_timeout).await;
assert!(ticker.try_broadcast());

assert_ne!(
timeout(2 * min_timeout, ticker.wait_for_periodic_broadcast()).await,
Ok(())
Expand All @@ -81,11 +100,13 @@ mod tests {
}

#[tokio::test]
async fn wait_for_periodic_broadcast_after_try_broadcast() {
async fn wait_for_periodic_broadcast_after_try_broadcast_false() {
let max_timeout = Duration::from_millis(700);
let min_timeout = Duration::from_millis(100);
let mut ticker = BroadcastTicker::new(max_timeout, min_timeout);

assert!(!ticker.try_broadcast());

assert_eq!(
timeout(2 * min_timeout, ticker.wait_for_periodic_broadcast()).await,
Ok(())
Expand All @@ -99,4 +120,24 @@ mod tests {
Ok(())
);
}

#[tokio::test]
async fn try_broadcast_after_wait_for_periodic_broadcast() {
let max_timeout = Duration::from_millis(700);
let min_timeout = Duration::from_millis(100);
let mut ticker = BroadcastTicker::new(max_timeout, min_timeout);

assert_eq!(
timeout(
max_timeout + min_timeout,
ticker.wait_for_periodic_broadcast()
)
.await,
Ok(())
);

assert!(!ticker.try_broadcast());
sleep(min_timeout).await;
assert!(ticker.try_broadcast());
}
}